# `CMDC.EventBus`
[🔗](https://github.com/tuplehq/cmdc/blob/v0.4.0/lib/cmdc/event_bus.ex#L1)

基于 Registry 的会话事件发布/订阅系统，附带可选的 ring buffer 用于断线重连补帧。

## 基础订阅

    CMDC.EventBus.subscribe("session-abc123")

    receive do
      {:cmdc_event, "session-abc123", event} -> handle_event(event)
    end

    CMDC.EventBus.broadcast("session-abc123", {:stream_chunk, "session-abc123", "hello"})

    CMDC.EventBus.subscribe_all()

## Ring Buffer (v0.2 RFC C10)

适用场景：WebSocket / Channel 短暂断线重连时不丢事件。

Agent 启动时通过 `CMDC.Options.new!(event_buffer_size: 100)` 开启该会话的内存
ring buffer。Buffer 满后自动丢弃最早事件（FIFO）。默认 `0`（关闭，零内存开销）。

重连后用 `:since` 选项 replay：

    {:ok, _pid} = CMDC.EventBus.subscribe("session-abc123", since: last_index)

    receive do
      {:cmdc_event, _sid, event} -> # ... 包含 replay + 新事件
    end

典型 buffer size 建议 `50 ~ 200`，对应 1-3 秒内的 stream_chunk 风暴。
Buffer 由 ETS 实现，session 终止时通过 `disable_buffer/1` 清理（CMDC.Agent
在 `terminate/3` 自动调用）。

# `broadcast`

```elixir
@spec broadcast(String.t(), term()) :: :ok | {:ok, pos_integer()}
```

广播事件到指定会话所有订阅者，并写入 ring buffer（若已开启）。

返回事件分配到的单调递增 `index`（开启 buffer 时）或 `:ok`（未开启）。
Registry 未启动时（如部分测试环境）静默返回 `:ok`。

# `disable_buffer`

```elixir
@spec disable_buffer(String.t()) :: :ok
```

关闭 ring buffer 并清理该会话所有缓存事件（CMDC.Agent.terminate 自动调用）。

# `enable_buffer`

```elixir
@spec enable_buffer(String.t(), non_neg_integer()) :: :ok
```

开启指定会话的 ring buffer（CMDC.Agent.init 自动调用）。

`buffer_size` 必须为正整数；`<= 0` 等同 no-op。

# `last_index`

```elixir
@spec last_index(String.t()) :: non_neg_integer() | nil
```

返回该会话最新事件 index（未开启 buffer 时为 `nil`）。

常用于断线重连前记录"上次看到的 index"，重连时传 `subscribe(sid, since: idx)`。

# `subscribe`

```elixir
@spec subscribe(
  String.t(),
  keyword()
) :: {:ok, pid()} | {:error, term()}
```

订阅当前进程对指定会话 ID 的事件。

接收 `{:cmdc_event, session_id, event}` 消息。

## Options（v0.2 RFC C10 + v0.3 RFC 11G #C22）

- `:since :: non_neg_integer()` — 从该 index 之后的事件开始 replay。要求该
  会话已开启 ring buffer（`Options.event_buffer_size > 0`），否则该选项被忽略。
  如 `since` 早于 buffer 起点（已被丢弃），仅 replay 还在 buffer 内的部分。
- `:types :: [atom()]` — 只 replay 这些 type 的事件（**v0.3 RFC 11G #C22 新增**）;
  适用于 UI 重连补帧只想要 `:stream_chunk` + `:agent_end` 不想要 `:tool_started`
  等内部事件的场景。事件 type 来自事件第一个元素（如 `{:agent_end, _, _}` 的 type
  是 `:agent_end`，裸 atom 事件如 `:agent_start` type 就是它自己）。
  **此选项仅影响 replay 阶段**；实时订阅不过滤，避免漏事件。

返回 `{:ok, pid}`，replay 事件在订阅注册后按时序异步投递到当前进程。

# `subscribe_all`

```elixir
@spec subscribe_all() :: {:ok, pid()} | {:error, {:already_registered, pid()}}
```

订阅当前进程对**所有**会话的事件（监控/调试用）。

# `unsubscribe`

```elixir
@spec unsubscribe(String.t()) :: :ok
```

取消当前进程对指定会话 ID 的订阅。

# `unsubscribe_all`

```elixir
@spec unsubscribe_all() :: :ok
```

取消当前进程的通配符订阅。

---

*Consult [api-reference.md](api-reference.md) for complete listing*
