基于 Registry 的会话事件发布/订阅系统,附带可选的 ring buffer 用于断线重连补帧。
基础订阅
CMDC.EventBus.subscribe("session-abc123")
receive do
{:cmdc_event, "session-abc123", event} -> handle_event(event)
end
CMDC.EventBus.broadcast("session-abc123", {:stream_chunk, "session-abc123", "hello"})
CMDC.EventBus.subscribe_all()Ring Buffer (v0.2 RFC C10)
适用场景:WebSocket / Channel 短暂断线重连时不丢事件。
Agent 启动时通过 CMDC.Options.new!(event_buffer_size: 100) 开启该会话的内存
ring buffer。Buffer 满后自动丢弃最早事件(FIFO)。默认 0(关闭,零内存开销)。
重连后用 :since 选项 replay:
{:ok, _pid} = CMDC.EventBus.subscribe("session-abc123", since: last_index)
receive do
{:cmdc_event, _sid, event} -> # ... 包含 replay + 新事件
end典型 buffer size 建议 50 ~ 200,对应 1-3 秒内的 stream_chunk 风暴。
Buffer 由 ETS 实现,session 终止时通过 disable_buffer/1 清理(CMDC.Agent
在 terminate/3 自动调用)。
Summary
Functions
广播事件到指定会话所有订阅者,并写入 ring buffer(若已开启)。
关闭 ring buffer 并清理该会话所有缓存事件(CMDC.Agent.terminate 自动调用)。
开启指定会话的 ring buffer(CMDC.Agent.init 自动调用)。
返回该会话最新事件 index(未开启 buffer 时为 nil)。
订阅当前进程对指定会话 ID 的事件。
订阅当前进程对所有会话的事件(监控/调试用)。
取消当前进程对指定会话 ID 的订阅。
取消当前进程的通配符订阅。
Functions
@spec broadcast(String.t(), term()) :: :ok | {:ok, pos_integer()}
广播事件到指定会话所有订阅者,并写入 ring buffer(若已开启)。
返回事件分配到的单调递增 index(开启 buffer 时)或 :ok(未开启)。
Registry 未启动时(如部分测试环境)静默返回 :ok。
@spec disable_buffer(String.t()) :: :ok
关闭 ring buffer 并清理该会话所有缓存事件(CMDC.Agent.terminate 自动调用)。
@spec enable_buffer(String.t(), non_neg_integer()) :: :ok
开启指定会话的 ring buffer(CMDC.Agent.init 自动调用)。
buffer_size 必须为正整数;<= 0 等同 no-op。
@spec last_index(String.t()) :: non_neg_integer() | nil
返回该会话最新事件 index(未开启 buffer 时为 nil)。
常用于断线重连前记录"上次看到的 index",重连时传 subscribe(sid, since: idx)。
订阅当前进程对指定会话 ID 的事件。
接收 {:cmdc_event, session_id, event} 消息。
Options(v0.2 RFC C10 + v0.3 RFC 11G #C22)
:since :: non_neg_integer()— 从该 index 之后的事件开始 replay。要求该 会话已开启 ring buffer(Options.event_buffer_size > 0),否则该选项被忽略。 如since早于 buffer 起点(已被丢弃),仅 replay 还在 buffer 内的部分。:types :: [atom()]— 只 replay 这些 type 的事件(v0.3 RFC 11G #C22 新增); 适用于 UI 重连补帧只想要:stream_chunk+:agent_end不想要:tool_started等内部事件的场景。事件 type 来自事件第一个元素(如{:agent_end, _, _}的 type 是:agent_end,裸 atom 事件如:agent_starttype 就是它自己)。 此选项仅影响 replay 阶段;实时订阅不过滤,避免漏事件。
返回 {:ok, pid},replay 事件在订阅注册后按时序异步投递到当前进程。
订阅当前进程对所有会话的事件(监控/调试用)。
@spec unsubscribe(String.t()) :: :ok
取消当前进程对指定会话 ID 的订阅。
@spec unsubscribe_all() :: :ok
取消当前进程的通配符订阅。