BB.MCP.EventBuffer (bb_mcp v0.1.0)

Copy Markdown View Source

Per-session ring buffer for BB pubsub events.

Each MCP session subscribes to all configured robots' pubsub trees and records arriving messages in a bounded buffer that the agent can query via the query_events tool.

The buffer is stored directly in Anubis.Server.Frame.assigns[:event_buffer] and updated on every handle_info({:bb, path, %BB.Message{}}, frame) call — no extra processes per session.

Summary

Functions

Capacity sourced from :bb_mcp app config, falling back to the default.

Build an empty buffer with the given capacity.

Append a pubsub event to the buffer, dropping the oldest entry if at capacity.

Return matching events, newest first, serialised to JSON-safe maps.

Record a subscription pair so the session can unsubscribe later.

Types

entry()

@type entry() :: %{
  monotonic_ns: integer(),
  received_ns: integer(),
  robot: String.t(),
  path: [atom()],
  payload_module: module(),
  message: BB.Message.t()
}

filters()

@type filters() :: %{
  optional(:robot) => String.t(),
  optional(:message_type) => String.t(),
  optional(:path_prefix) => String.t(),
  optional(:since_ms) => non_neg_integer(),
  optional(:limit) => pos_integer()
}

t()

@type t() :: %{
  events: [entry()],
  capacity: pos_integer(),
  subscriptions: [{String.t(), module()}]
}

Functions

configured_capacity()

@spec configured_capacity() :: pos_integer()

Capacity sourced from :bb_mcp app config, falling back to the default.

new(capacity \\ 1000)

@spec new(pos_integer()) :: t()

Build an empty buffer with the given capacity.

push(buffer, robot_name, path, message)

@spec push(t(), String.t(), [atom()], BB.Message.t()) :: t()

Append a pubsub event to the buffer, dropping the oldest entry if at capacity.

query(buffer, filters \\ %{})

@spec query(t(), filters()) :: [map()]

Return matching events, newest first, serialised to JSON-safe maps.

record_subscription(buffer, robot_name, robot_module)

@spec record_subscription(t(), String.t(), module()) :: t()

Record a subscription pair so the session can unsubscribe later.