Plugin 是 CMDC 注入业务切面的标准方式。本章给出最小可运行模板、13 hook × 8 action 矩阵、以及 5 个完整 Plugin 范例。


Plugin behaviour

实现 CMDC.Plugin 三个 callback 即可:

defmodule MyApp.MyPlugin do
  @behaviour CMDC.Plugin

  @impl true
  def init(opts) do
    # opts 来自 create_agent 时传的 {MyPlugin, opts}
    {:ok, %{counter: 0, opts: opts}}
  end

  @impl true
  def priority, do: 100  # 1-1000,小的先执行;同 priority 内顺序未定

  @impl true
  def handle_event({:before_tool, name, args}, state, ctx) do
    # state 是本 plugin 自己的状态;ctx 是 CMDC.Context.t()
    {:continue, %{state | counter: state.counter + 1}}
  end

  def handle_event(_event, state, _ctx), do: {:continue, state}
end

挂载:

{:ok, session} = CMDC.create_agent(
  model: "...",
  plugins: [{MyApp.MyPlugin, [my_opt: :foo]}]
)

13 个 hook 速查

hook触发时机
:session_startAgent 会话刚启动
:session_endAgent 会话正常结束
{:after_turn, payload}每 turn 回 idle 前(finish + abort 双路径)
{:before_prompt, text}用户 prompt 提交前
{:before_request, messages}LLM 请求前(可改 messages)
{:after_response, assistant_msg}LLM 回复后
{:before_tool, name, args}单工具执行前
{:on_tool_error, name, call_id, error, attempt}工具失败、retry 前
{:after_tool, name, call_id, result}单工具执行后
{:after_tool_batch, results}批工具全部完成后
:before_finishAgent 准备返回最终结果前
{:before_compact, messages}上下文压缩前
{:before_steering, text}steer/2 中段软中断入队前

8 种 action

action元组形式含义
continue{:continue, state}继续下一个 plugin
intervene{:intervene, prompt, state}注入提示文本(多个 plugin 同时 intervene 时按 priority 顺序拼接)
abort{:abort, reason, state}短路 + Agent 回 idle + emit :agent_abort
skip{:skip, state}短路 Pipeline,不影响 Agent 主流程
block_tool{:block_tool, reason, state}:before_tool,阻止当前工具,注入 synthetic error result
replace_tool_args{:replace_tool_args, new_args, state}:before_tool,覆盖参数
replace_tool_result{:replace_tool_result, new_result, state}:after_tool,覆盖结果
emit{:emit, {name, payload}, state} 等 4 种形态广播自定义事件(累积)
switch_model{:switch_model, model, state}{:switch_model, model, state, opts}运行期换模型

Hook × Action 矩阵

表示该 hook 接受该 action;- 表示忽略;每个 hook 都隐含支持 :continue:emit

Hook \ Actioncontinueinterveneabortskipblock_toolreplace_argsreplace_resultemitswitch_model
:session_start------
:session_end-------
{:after_turn, p}-------
{:before_prompt, t}----
{:before_request, m}---
{:after_response, m}---
{:before_tool, n, a}---
{:on_tool_error, ...}-----*
{:after_tool, ...}---
{:after_tool_batch, r}----
:before_finish-----
{:before_compact, m}------
{:before_steering, t}-----

\*:on_tool_error 在 Task retry 内部 Pipeline 触发,无完整 Agent state 上下文,:switch_model 被收集但不应用。要在工具失败后切模型请改用 :after_tool 钩子匹配 {:error, _} result。


范例 1 — 危险命令拦截器(block_tool)

defmodule MyApp.DangerousCommandGuard do
  @behaviour CMDC.Plugin

  @denied ~w(rm dd mkfs sudo curl wget)

  @impl true
  def init(_), do: {:ok, %{}}

  @impl true
  def priority, do: 50  # 早于 HumanApproval

  @impl true
  def handle_event({:before_tool, "shell", %{"command" => cmd}}, state, _ctx) do
    bin = cmd |> String.split(" ", parts: 2) |> hd() |> Path.basename()

    if bin in @denied do
      {:block_tool, "Command '#{bin}' is in the deny list.", state}
    else
      {:continue, state}
    end
  end

  def handle_event(_, state, _), do: {:continue, state}
end

范例 2 — Prompt 审计日志(emit + 文件落盘)

defmodule MyApp.PromptAuditLog do
  @behaviour CMDC.Plugin

  @impl true
  def init(opts), do: {:ok, %{file: Keyword.fetch!(opts, :file)}}

  @impl true
  def priority, do: 200

  @impl true
  def handle_event({:before_prompt, text}, state, ctx) do
    line = "#{DateTime.utc_now()} #{ctx.session_id} #{inspect(text)}\n"
    File.write!(state.file, line, [:append])

    {:emit, {:prompt_audited, %{session_id: ctx.session_id, length: byte_size(text)}}, state}
  end

  def handle_event(_, state, _), do: {:continue, state}
end

范例 3 — 成本预算 abort(after_response)

defmodule MyApp.BudgetGuard do
  @behaviour CMDC.Plugin

  @impl true
  def init(opts) do
    {:ok, %{max_usd: Keyword.fetch!(opts, :max_usd)}}
  end

  @impl true
  def priority, do: 300

  @impl true
  def handle_event({:after_response, _msg}, state, ctx) do
    if ctx.cost_usd > state.max_usd do
      {:abort, {:budget_exceeded, ctx.cost_usd, state.max_usd}, state}
    else
      {:continue, state}
    end
  end

  def handle_event(_, state, _), do: {:continue, state}
end

挂载后超预算自动 abort,订阅方收到 {:agent_abort, {:budget_exceeded, ...}}


范例 4 — 工具失败降级换模型

:on_tool_error 不能切 model,改用 :after_tool 匹配 {:error, _}

defmodule MyApp.ToolFallbackGuard do
  @behaviour CMDC.Plugin

  @impl true
  def init(opts) do
    {:ok, %{
      fallback: Keyword.fetch!(opts, :fallback),
      threshold: Keyword.get(opts, :consecutive_failures, 3),
      counter: 0
    }}
  end

  @impl true
  def priority, do: 400

  @impl true
  def handle_event({:after_tool, _name, _id, {:error, _}}, %{counter: n} = state, _ctx) do
    new_state = %{state | counter: n + 1}

    if new_state.counter >= state.threshold do
      {:switch_model, state.fallback, %{new_state | counter: 0}}
    else
      {:continue, new_state}
    end
  end

  def handle_event({:after_tool, _, _, {:ok, _}}, state, _ctx) do
    {:continue, %{state | counter: 0}}
  end

  def handle_event(_, state, _), do: {:continue, state}
end

范例 5 — 敏感词拦截 :before_request

defmodule MyApp.SensitiveContentGuard do
  @behaviour CMDC.Plugin

  @impl true
  def init(opts) do
    {:ok, %{words: MapSet.new(Keyword.fetch!(opts, :words))}}
  end

  @impl true
  def priority, do: 100

  @impl true
  def handle_event({:before_request, messages}, state, _ctx) do
    text = messages |> Enum.map(& &1.content) |> Enum.join(" ") |> String.downcase()

    hit = Enum.find(state.words, fn w -> String.contains?(text, w) end)

    if hit do
      {:abort, {:sensitive_word_detected, hit}, state}
    else
      {:continue, state}
    end
  end

  def handle_event(_, state, _), do: {:continue, state}
end

范例 6 — v0.5 新增 :after_turn hook 写长期记忆

v0.3 起新增 :after_turn hook,每个 turn 回 idle 前触发(finish + abort 双路径), payload 含完整结构化信息(messages_diff / token_usage_diff / duration_ms 等)。 比 :session_end 触发更频繁,新 plugin 推荐用它写审计 / 长期记忆 / 计费等:

defmodule MyApp.TurnAuditor do
  @behaviour CMDC.Plugin

  @impl true
  def init(opts), do: {:ok, %{db_pool: Keyword.fetch!(opts, :db_pool)}}

  @impl true
  def priority, do: 950   # 低优先级,业务 plugin 之后跑

  @impl true
  def handle_event({:after_turn, payload}, state, ctx) do
    # payload 字段:
    #   :outcome - :finished | :aborted
    #   :abort_reason - term | nil
    #   :messages_diff - 本 turn 新增的 messages(按时间顺序)
    #   :token_usage_diff - %CMDC.TokenUsage{} 本 turn 增量
    #   :started_at_ms / :ended_at_ms / :duration_ms

    Task.Supervisor.start_child(MyApp.AsyncTaskSupervisor, fn ->
      MyApp.Audit.record_turn(state.db_pool, %{
        session_id: ctx.session_id,
        user_id: ctx.user_data[:user_id],
        outcome: payload.outcome,
        prompt_tokens: payload.token_usage_diff.prompt_tokens,
        completion_tokens: payload.token_usage_diff.completion_tokens,
        duration_ms: payload.duration_ms,
        message_count: length(payload.messages_diff)
      })
    end)

    {:continue, state}
  end

  def handle_event(_, state, _), do: {:continue, state}
end

异步走 Task.Supervisor 避免阻塞 Agent gen_statem callback。 内置 AutoCheckpoint Plugin 是这个模式的典型实现(按 turn 触发 + 异步 save + GC)。


Action 失败/嵌套语义

  • :abort 短路:Pipeline 立即停止,后续 plugin 不再执行;Agent 直接回 idle + 广播 {:agent_abort, reason} + 触发 :after_turn(outcome=:aborted)。
  • :block_tool 短路(仅 :before_tool):当前工具不执行,注入 synthetic error tool_result;同批其他工具继续按调度。
  • :skip 短路:跳过所有后续 plugin,但不影响 Agent 主流程(按"无 action" 继续推进)。
  • :intervene 累积式:多个 plugin 同时 intervene 时按 priority 顺序 拼接(\n\n 分隔),最终一次性注入;不短路。
  • :replace_tool_args / :replace_tool_result / :switch_model 覆盖式: 多个 plugin 同时返回时取最后执行(priority 最大)的值;不短路。
  • :emit 累积式:所有 plugin 的 emit 事件按时序追加,Pipeline 结束后 Agent 统一 broadcast;不短路。

emit 自动注入 user_data

emit 出来的 {:plugin_event, name, payload} 事件,当 payload 是 map 时 Pipeline 会自动 merge state.user_data:user_data 字段(除非 payload 含 :_no_user_data)。这让 plugin 不需要每次手动传 tenant_id / user_id。


17 个内置 Plugin(v0.5 新增 1 个)

CMDC 提供 16 个开箱即用的 Plugin(按职能分两组):

安全与控制

优化与记忆


下一步