CMDC.Event (cmdc v0.5.3)

Copy Markdown View Source

Agent 运行时结构化事件。

CMDC.EventBus 广播的事件格式为 {:cmdc_event, session_id, CMDC.Event.t()}。 外部消费者(TUI / Phoenix / Bot)通过订阅 Registry 接收这些事件。

完整事件类型

会话生命周期

  • :agent_start — Agent 开始处理 prompt
  • {:agent_end, messages, token_usage} — 本轮全部完成,token_usage%CMDC.TokenUsage{} struct
  • {:agent_abort, reason} / :agent_abort — Agent 被中止
  • {:prompt_received, text} — 收到用户 prompt

流式响应

  • :message_start — LLM 开始生成文本(首个 content chunk 前)
  • {:message_delta, %{delta: text}} — LLM 流式 token 片段
  • {:response_complete, %CMDC.Message{}} — 本轮 LLM 响应完整结束(含工具调用解析)
  • :thinking_start — LLM 开始思考链(部分模型)
  • {:thinking_delta, %{delta: text}} — 思考链 token 片段
  • {:status_update, text} — LLM 响应中的 <status> XML 标签内容
  • {:title_generated, title} — LLM 响应中的 <title> XML 标签内容(限 60 字符)

Provider / 请求

  • {:request_start, %{model: model, messages: count}} — LLM 请求即将发起
  • {:stream_error, reason} — 流式响应出错
  • {:stream_stalled, elapsed_s} — 流式响应疑似卡住(超过 stall 阈值未收到 chunk)
  • {:retry, attempt, delay_ms, reason} — 即将重试
  • {:context_overflow, reason} — 上下文长度超限

工具执行

  • {:tool_calls, count} — 本轮 LLM 请求了 count 个工具调用
  • {:tool_execution_start, name, call_id, args} — 工具开始执行
  • {:tool_execution_end, name, call_id, result} — 工具执行完成,result 为 {:ok, text} | {:error, text}

  • {:tool_execution_metrics, name, call_id, meta} — 工具执行耗时指标; 入站时 meta = %{started_at_ms: integer}(紧随 tool_execution_start 后); 结束时 meta = %{started_at_ms: integer, ended_at_ms: integer, duration_ms: non_neg_integer} (紧随 tool_execution_end 后)。订阅方无需自己埋点即可统计 tool 耗时分布
  • {:tool_blocked, name, call_id, reason} — 工具被 Plugin 拦截(block_tool)
  • {:loop_detected, %{type: type, ...}} — 内建循环检测触发,type 为 :repeat_pattern | :file_loop_warn | :file_loop_abort

人机交互(HITL)

  • {:approval_required, approval_map} — 等待人类审批,approval_map 含 id/tool/args/session_id/hint/requested_at
  • {:approval_resolved, approval_map} — 审批已决定,approval_map 含 status: :approved | :rejected | :timeout

  • {:agent_resumed, meta} — Agent 因外部 HITL 信号继续工作; meta.trigger 取值:
    • :tool_approved / :tool_rejected / :tool_approval_timeout — auto-resume 后从 idle 进 running (meta 含 :approval_id
    • :user_respondCMDC.user_respond/3 紧随 :user_responded 后发出(meta 含 :ref
    • :steering:steering_applied 后立即发出(meta 含 :refs:count

Agent 提问

注:AskUser 工具通过 EventBus 广播以下事件。

  • {:ask_user, session_id, question, options, ref} — Agent 主动向用户提问
  • {:user_responded, session_id, ref, response} — 用户已回答

规划

  • {:todo_change, session_id, todos} — Todo 列表变更,todos 为 [map()]

上下文压缩

  • {:compact_start, session_id} — 上下文压缩开始
  • {:compact_end, session_id, removed_count} — 上下文压缩完成

子代理

  • {:subagent_start, session_id, child_session_id, description} — 子代理启动
  • {:subagent_end, session_id, child_session_id, result} — 子代理结束,result 为 {:ok, text} | {:error, reason}

  • {:sub_agent_event, call_id, child_session_id, event} — 子代理内部事件透传

Plugin 自定义事件

  • {:plugin_event, name, payload} — Plugin 通过 emit action 发出的自定义事件; payload 是 map 时自动 merge state.user_data:user_data 字段; Plugin 可在 payload 加 :_no_user_data 键 opt-out(broadcast 前自动 pop,订阅方不可见)

Agent 内部干预

  • {:intervention, prompt} — Agent 检测到循环/异常,注入干预 prompt
  • {:stop_blocked, prompt} — abort 被 Plugin 拦截,注入继续 prompt
  • {:prompt_rejected, reason} — prompt 被 Plugin 拒绝
  • {:prompt_queued, text} — Agent 忙碌,prompt 已入队

中段干预(Steering)

详见 guides/cookbook.md 中段干预配方。

  • {:steering_received, %{ref, text, queued_at, status}}CMDC.steer/2 调用结果广播; status:queued | :rejected_full | :rejected_by_plugin
  • {:steering_applied, %{refs, count}} — Steering queue 已合并注入下一 turn
  • {:tool_skipped_for_steering, %{name, call_id, reason}} — 工具因 steering 被取消; reason:killed_by_steering | :pending_dispatch

Abort

  • {:prompt_dropped, text}CMDC.abort/2 默认清空 pending prompt 队列时,每条被丢弃的 user prompt 文本
  • {:tool_killed, %{name, call_id, reason}}CMDC.abort/2 杀掉的工具任务(mode :killable | :all); reason 取自 abort/2:reason 选项,未提供时为 :aborted

运行期切换模型

  • {:model_switched, %{from: old_model, to: new_model}} — 模型已切换; 由 CMDC.switch_model/2 或 Plugin {:switch_model, model, state} action 触发; same model → no-op 不发;下一次 LLM 请求生效

运行期挂载/卸载工具

  • {:tool_attached, name}CMDC.attach_tool/2 / attach_tools/2 / replace_tools/2 成功挂载新工具
  • {:tool_detached, name}CMDC.detach_tool/2 / detach_tools/2 / replace_tools/2 成功卸载工具
  • {:tools_updated, %{attached: [name, ...], detached: [name, ...]}} — 批量操作汇总事件; attach_tool/2 / detach_tool/2 单次操作不发此事件
  • {:tool_call_unknown, name, call_id} — LLM 调用了 Agent tools 表中找不到的工具; 通常发生在 detach 后的 streaming 已经引用了被卸载工具时; Agent 自动注入 {:error, "tool not found"} 的 synthetic tool_result 让 LLM 纠正

错误

  • {:error, session_id, reason} — 运行时错误

注意

  • 实际广播格式为 {:cmdc_event, session_id, event},消费者需匹配外层元组
  • agent_end / agent_abort 中不含 session_id(session_id 在外层元组)
  • approval_required / approval_resolved 的 payload 是 approval map,不是位置参数
  • tool_execution_start/endcall_id(区别于旧文档中的 tool_start/end

Ring Buffer 与重连补帧

Agent 可通过 Options.event_buffer_size > 0 开启 per-session 事件 ring buffer。 开启后所有事件除了实时广播给订阅者外,还会按 FIFO 缓存到 ETS; 订阅时通过 :since 选项 replay:

# 启动时开启
{:ok, session} = CMDC.create_agent(model: "...", event_buffer_size: 100)

# 重连后补帧
{:ok, _} = CMDC.subscribe(session, since: last_seen_index)

关闭时(默认 0)零内存开销。

Summary

Types

t()

Token 用量类型 — 统一为 CMDC.TokenUsage.t() struct; 保留 map 类型仅用于历史事件兼容。

Functions

返回所有已知事件类型原子标识列表。

从事件中提取 session_id。

判断给定元组或原子是否为合法的 CMDC 事件。

Types

approval_map()

@type approval_map() :: %{
  id: String.t(),
  tool: String.t(),
  args: map(),
  session_id: String.t(),
  hint: String.t(),
  requested_at: integer()
}

ref()

@type ref() :: String.t()

session_id()

@type session_id() :: String.t()

t()

@type t() ::
  :agent_start
  | {:agent_end, messages :: [CMDC.Message.t()], token_usage()}
  | {:agent_abort, reason :: term()}
  | :agent_abort
  | {:prompt_received, text :: String.t()}
  | {:prompt_rejected, reason :: term()}
  | {:prompt_queued, text :: String.t()}
  | :message_start
  | {:message_delta, %{delta: String.t()}}
  | {:response_complete, CMDC.Message.t()}
  | :thinking_start
  | {:thinking_delta, %{delta: String.t()}}
  | {:status_update, text :: String.t()}
  | {:title_generated, title :: String.t()}
  | {:request_start, %{model: term(), messages: non_neg_integer()}}
  | {:stream_error, reason :: term()}
  | {:stream_stalled, elapsed_s :: non_neg_integer()}
  | {:retry, attempt :: pos_integer(), delay_ms :: non_neg_integer(),
     reason :: term()}
  | {:context_overflow, reason :: term()}
  | {:tool_calls, count :: non_neg_integer()}
  | {:tool_execution_start, name :: String.t(), call_id :: String.t(),
     args :: map()}
  | {:tool_execution_end, name :: String.t(), call_id :: String.t(),
     result :: {:ok, String.t()} | {:error, String.t()}}
  | {:tool_execution_metrics, name :: String.t(), call_id :: String.t(),
     meta ::
       %{started_at_ms: integer()}
       | %{
           started_at_ms: integer(),
           ended_at_ms: integer(),
           duration_ms: non_neg_integer()
         }}
  | {:tool_blocked, name :: String.t(), call_id :: String.t(),
     reason :: String.t()}
  | {:loop_detected, %{type: atom()}}
  | {:approval_required, approval_map()}
  | {:approval_resolved, approval_map()}
  | {:agent_resumed, map()}
  | {:ask_user, session_id(), question :: String.t(), options :: [map()], ref()}
  | {:user_responded, session_id(), ref(), response :: term()}
  | {:todo_change, session_id(), todos :: [map()]}
  | {:compact_start, session_id()}
  | {:compact_end, session_id(), removed_count :: non_neg_integer()}
  | {:subagent_start, session_id(), child_session_id :: String.t(),
     description :: String.t()}
  | {:subagent_end, session_id(), child_session_id :: String.t(),
     result :: {:ok, String.t()} | {:error, term()}}
  | {:sub_agent_event, call_id :: String.t(), child_session_id :: String.t(),
     event :: t()}
  | {:plugin_event, name :: atom(), payload :: term()}
  | {:intervention, prompt :: String.t()}
  | {:stop_blocked, prompt :: String.t()}
  | {:steering_received,
     %{
       ref: reference(),
       text: String.t(),
       queued_at: integer(),
       status: :queued | :rejected_full | :rejected_by_plugin
     }}
  | {:steering_applied, %{refs: [reference()], count: pos_integer()}}
  | {:prompt_dropped, text :: String.t()}
  | {:tool_killed, %{name: String.t(), call_id: String.t(), reason: term()}}
  | {:tool_skipped_for_steering,
     %{
       name: String.t(),
       call_id: String.t(),
       reason: :killed_by_steering | :pending_dispatch
     }}
  | {:model_switched, %{from: String.t(), to: String.t()}}
  | {:tool_attached, name :: String.t()}
  | {:tool_detached, name :: String.t()}
  | {:tools_updated, %{attached: [String.t()], detached: [String.t()]}}
  | {:tool_call_unknown, name :: String.t(), call_id :: String.t()}
  | {:error, session_id(), reason :: term()}

token_usage()

@type token_usage() :: CMDC.TokenUsage.t()

Token 用量类型 — 统一为 CMDC.TokenUsage.t() struct; 保留 map 类型仅用于历史事件兼容。

Functions

all_types()

@spec all_types() :: [atom()]

返回所有已知事件类型原子标识列表。

session_id(event)

@spec session_id(t()) :: session_id() | nil

从事件中提取 session_id。

  • 对于含 session_id 字段的事件(如 compact_starttodo_change),返回第二个元素
  • 对于不含 session_id 的事件(如 agent_startmessage_delta),返回 nil

TUI 消费者通常从外层 {:cmdc_event, session_id, event} 元组读取 session_id, 无需调用此函数。

valid?(event)

@spec valid?(term()) :: boolean()

判断给定元组或原子是否为合法的 CMDC 事件。

iex> CMDC.Event.valid?({:message_delta, %{delta: "hello"}})
true
iex> CMDC.Event.valid?(:agent_start)
true
iex> CMDC.Event.valid?({:unknown, "sid"})
false