CMDC.EventBus (cmdc v0.5.0)

Copy Markdown View Source

基于 Registry 的会话事件发布/订阅系统,附带可选的 ring buffer 用于断线重连补帧。

基础订阅

CMDC.EventBus.subscribe("session-abc123")

receive do
  {:cmdc_event, "session-abc123", event} -> handle_event(event)
end

CMDC.EventBus.broadcast("session-abc123", {:stream_chunk, "session-abc123", "hello"})

CMDC.EventBus.subscribe_all()

Ring Buffer

适用场景:WebSocket / Channel 短暂断线重连时不丢事件。

Agent 启动时通过 CMDC.Options.new!(event_buffer_size: 100) 开启该会话的内存 ring buffer。Buffer 满后自动丢弃最早事件(FIFO)。默认 0(关闭,零内存开销)。

重连后用 :since 选项 replay:

{:ok, _pid} = CMDC.EventBus.subscribe("session-abc123", since: last_index)

receive do
  {:cmdc_event, _sid, event} -> # ... 包含 replay + 新事件
end

典型 buffer size 建议 50 ~ 200,对应 1-3 秒内的 stream_chunk 风暴。 Buffer 由 ETS 实现,session 终止时通过 disable_buffer/1 清理(CMDC.Agent 在 terminate/3 自动调用)。

Summary

Functions

广播事件到指定会话所有订阅者,并写入 ring buffer(若已开启)。

关闭 ring buffer 并清理该会话所有缓存事件(CMDC.Agent.terminate 自动调用)。

开启指定会话的 ring buffer(CMDC.Agent.init 自动调用)。

返回该会话最新事件 index(未开启 buffer 时为 nil)。

订阅当前进程对指定会话 ID 的事件。

订阅当前进程对所有会话的事件(监控/调试用)。

取消当前进程对指定会话 ID 的订阅。

取消当前进程的通配符订阅。

Functions

broadcast(session_id, event)

@spec broadcast(String.t(), term()) :: :ok | {:ok, pos_integer()}

广播事件到指定会话所有订阅者,并写入 ring buffer(若已开启)。

返回事件分配到的单调递增 index(开启 buffer 时)或 :ok(未开启)。 Registry 未启动时(如部分测试环境)静默返回 :ok

disable_buffer(session_id)

@spec disable_buffer(String.t()) :: :ok

关闭 ring buffer 并清理该会话所有缓存事件(CMDC.Agent.terminate 自动调用)。

enable_buffer(session_id, size)

@spec enable_buffer(String.t(), non_neg_integer()) :: :ok

开启指定会话的 ring buffer(CMDC.Agent.init 自动调用)。

buffer_size 必须为正整数;<= 0 等同 no-op。

last_index(session_id)

@spec last_index(String.t()) :: non_neg_integer() | nil

返回该会话最新事件 index(未开启 buffer 时为 nil)。

常用于断线重连前记录"上次看到的 index",重连时传 subscribe(sid, since: idx)

subscribe(session_id, opts \\ [])

@spec subscribe(
  String.t(),
  keyword()
) :: {:ok, pid()} | {:error, term()}

订阅当前进程对指定会话 ID 的事件。

接收 {:cmdc_event, session_id, event} 消息。

选项

  • :since :: non_neg_integer() — 从该 index 之后的事件开始 replay。要求该 会话已开启 ring buffer(Options.event_buffer_size > 0),否则该选项被忽略。 如 since 早于 buffer 起点(已被丢弃),仅 replay 还在 buffer 内的部分。
  • :types :: [atom()] — 只 replay 这些 type 的事件; 适用于 UI 重连补帧只想要 :stream_chunk + :agent_end 不想要 :tool_started 等内部事件的场景。事件 type 来自事件第一个元素(如 {:agent_end, _, _} 的 type 是 :agent_end,裸 atom 事件如 :agent_start type 就是它自己)。 此选项仅影响 replay 阶段;实时订阅不过滤,避免漏事件。

返回 {:ok, pid},replay 事件在订阅注册后按时序异步投递到当前进程。

subscribe_all()

@spec subscribe_all() :: {:ok, pid()} | {:error, {:already_registered, pid()}}

订阅当前进程对所有会话的事件(监控/调试用)。

unsubscribe(session_id)

@spec unsubscribe(String.t()) :: :ok

取消当前进程对指定会话 ID 的订阅。

unsubscribe_all()

@spec unsubscribe_all() :: :ok

取消当前进程的通配符订阅。