Agent 状态机与事件

Copy Markdown View Source

CMDC Agent 是一个 :gen_statem,4 个状态、7 大类事件。看完本章你能精确预测 任何 API 在任何状态下的行为。


4 个状态

状态含义主要外部 API 行为
:idle空闲,等下一条 promptprompt/2 立即开新 turn;steer/3 退化为 prompt
:running已发出 LLM 请求,等首个 chunkprompt/2 入队;steer/3 入 steering queue
:streaming正在接收 LLM 流式输出同上
:executing_toolsLLM 已停,工具批次执行中同上 + 当前批次结束才合并 steering

状态转换图

                         
                                                      
                                                      
     prompt/2     stream chunk 
      idle    running     
                              
                                            
         agent_end /              stream    streaming 
         abort                    done      
                                                 
                                 has tool_calls
                             finalize +    
                             decide next   
                           
                                  has tool_calls
                                 
                          
         executing_tools  
            batch done     
            +next turn

4 状态接收行为

prompt/2

状态行为
:idle立即开新 turn,转 :running
其他pending_messages 队列;本轮结束后自动消费下一条

steer/3(中段干预)

状态行为
:idle退化为 prompt/2
:running入 steering queue,下一 turn 间隙合并注入
:streaming同上
:executing_tools同上 + 当前批次内 killable 工具立即被 brutal_kill

队列满(默认 3)时返回 {:error, :queue_full},发出 :steering_received 事件 status=:rejected_full。Plugin 可在 {:before_steering, text} hook 返回 :abort 拒绝某条 steering。

abort/2

状态默认行为(kill_tools: :killable
:idleno-op,但仍 emit :agent_abort 便于订阅方对账
:running / :streamingcancel stream task
:executing_tools杀非 immune 工具,immune 工具继续;同时 cancel stream

无论何种状态都保证 :agent_abort 事件 100ms 内到达订阅方。

approve/3 / reject/3

HumanApproval Plugin 在 :before_tool 拦下危险工具,把 Agent 暂时退到 :idle 等审批。approve/reject 在 idle 状态下:

  • :auto_resume 默认 true(approve)/ false(reject)
  • auto_resume 后 Agent emit {:agent_resumed, %{trigger: ...}},重新进 :running 让 LLM 重试

switch_model/2 / 3

状态行为
:idle立即切;下一次 prompt/2 用新模型;emit :model_switched
:running / :streaming / :executing_toolsstate.model 立即更新 + emit;本轮跑完,下一轮才用新模型

切到同一模型 = no-op,不发事件。messages / tools / plugin_states 全保留。


完整事件清单

事件外层格式:{:cmdc_event, session_id, event}。下面只列 event 部分。

会话生命周期

  • :agent_start — Agent 开始处理 prompt
  • {:agent_end, messages, %CMDC.TokenUsage{}} — 本轮完成
  • {:agent_abort, reason} / :agent_abort — 中止(reason 为 nil 时发裸 atom)
  • {:agent_resumed, %{trigger}} — 因外部信号续命;trigger:tool_approved | :tool_rejected | :tool_approval_timeout | :user_respond | :steering
  • {:prompt_received, text} — 收到用户 prompt
  • {:prompt_queued, text} — 忙碌中入队
  • {:prompt_dropped, text}abort(:clear_queue) 丢弃的 prompt
  • {:prompt_rejected, reason} — Plugin 拒绝

流式响应

  • :message_start
  • {:message_delta, %{delta: text}}
  • {:response_complete, %CMDC.Message{}}
  • :thinking_start
  • {:thinking_delta, %{delta}}
  • {:status_update, text} / {:title_generated, title}<status>...</status> / <title>...</title> 内联 XML 标签提取

Provider / 请求

  • {:request_start, %{model, messages}}
  • {:stream_error, reason} / {:stream_stalled, elapsed_s}
  • {:retry, attempt, delay_ms, reason}
  • {:context_overflow, reason}

工具执行

  • {:tool_calls, count} — LLM 请求了 N 个工具
  • {:tool_execution_start, name, call_id, args}
  • {:tool_execution_end, name, call_id, result} — result 为 {:ok, _} | {:error, _}

  • {:tool_execution_metrics, name, call_id, %{started_at_ms, ended_at_ms, duration_ms}} — 自动埋点
  • {:tool_blocked, name, call_id, reason} — Plugin block_tool
  • {:tool_killed, %{name, call_id, reason}} — abort 杀掉的
  • {:tool_skipped_for_steering, %{name, call_id, reason}} — Steering 跳过
  • {:tool_attached, name} / {:tool_detached, name}attach_tool/2 / detach_tool/2 调用结果
  • {:tools_updated, %{attached, detached}} — 批量原子操作汇总
  • {:tool_call_unknown, name, call_id} — LLM 引用了已 detach 的工具
  • {:loop_detected, %{type, ...}} — 内建循环检测;type 为 :repeat_pattern | :file_loop_warn | :file_loop_abort

人机交互

  • {:approval_required, approval_map} — 等待审批
  • {:approval_resolved, approval_map} — 已决定(含 :status:approved | :rejected | :timeout
  • {:tool_approved_always, %{tool, command_family}}approve_always 白名单写入
  • {:ask_user, sid, question, options, ref} — Agent 主动提问
  • {:user_responded, sid, ref, response} — 用户已答

中段干预

  • {:steering_received, %{ref, text, queued_at, status}}steer/2 调用结果
  • {:steering_applied, %{refs, count}} — queue 已合并到下一 turn
  • {:tool_skipped_for_steering, ...} — 工具因 steering 取消

上下文压缩

  • {:compact_start, sid} / {:compact_end, sid, removed_count}
  • {:before_compact, messages} — Plugin hook 同名

子代理

  • {:subagent_start, sid, child_sid, description}
  • {:subagent_end, sid, child_sid, result}
  • {:sub_agent_event, call_id, child_sid, event} — 子代理内部事件透传

模型 / 计划 / 通知

  • {:model_switched, %{from, to, provider_opts_changed?}}
  • {:todo_change, sid, todos}WriteTodos 工具更新
  • {:plan_generated, %CMDC.Plan{}}Planning Plugin 解析
  • {:memory_flushed, %{facts, count, sid}}MemoryFlush 写入
  • {:large_result_offloaded, %{tool, call_id, path, bytes}}
  • {:content_policy_violated, %{summary, triggered_policies}}
  • {:plugin_event, name, payload} — 自定义事件(payload map 自动注入 user_data)
  • {:intervention, prompt} — Agent 注入干预 prompt

订阅模板

defmodule MyApp.AgentObserver do
  use GenServer

  def start_link(session) do
    GenServer.start_link(__MODULE__, session)
  end

  def init(session) do
    CMDC.subscribe(session)
    CMDC.monitor(session)
    {:ok, %{session: session, deltas: []}}
  end

  def handle_info({:cmdc_event, _sid, {:message_delta, %{delta: t}}}, st) do
    IO.write(t)
    {:noreply, %{st | deltas: [t | st.deltas]}}
  end

  def handle_info({:cmdc_event, _sid, {:tool_execution_start, name, _, _}}, st) do
    IO.puts("\n  [tool] #{name} ...")
    {:noreply, st}
  end

  def handle_info({:cmdc_event, _sid, {:agent_end, _msgs, usage}}, st) do
    IO.puts("\n[done] tokens=#{usage.total_tokens} cost=#{usage.cost_usd}")
    {:noreply, st}
  end

  def handle_info({:cmdc_down, _ref, _sid, reason}, st) do
    IO.puts("[crashed] #{inspect(reason)}")
    {:stop, :normal, st}
  end

  def handle_info(_, st), do: {:noreply, st}
end

重连补帧(断网恢复)

启用 ring buffer:

{:ok, session} = CMDC.create_agent(
  model: "...",
  event_buffer_size: 200  # 默认 0 = 关闭
)

订阅时给 :since

last_index = MyApp.LastSeen.read(session_id)
{:ok, _} = CMDC.subscribe(session, since: last_index, types: [:message_delta, :agent_end])

可选 :types 白名单只 replay 关心的事件,省网络流量。


下一步