CMDC.Provider.Registry (cmdc v0.5.3)

Copy Markdown View Source

命名 Provider Profile 注册表 — 多租户场景下的运行时 provider/opts 寻址中心。

让 Agent 创建路径上 model: "registry:tenant-A-anthropic:claude-sonnet-4-5" 字符串一句话替代 200 µs 量级的 per-Agent provider_opts 拼装。

Registry 是 CMDC.Config.providers(启动期静态多路由)的运行时动态补集, 二者并存:model 字符串带 "registry:" prefix 走 Registry,否则走 Config。

设计

  • 每节点本地 ETS:set + read_concurrency: true)—— lookup/1 ≤ 1 µs hot path
  • 写串行化 GenServer call — register/2 / unregister/1 走 GenServer 保证原子
  • 跨节点同步CMDC.Provider.Registry.Broadcaster behaviour 解耦 (默认 Broadcaster.PG best-effort,生产推荐 Phoenix.PubSub)
  • Local-first —— 节点重启 = 集成方自家持久层 init 时 re-register

5 公开 API

:ok = Registry.register("tenant-A-anthropic",
        provider: "anthropic",
        opts: [api_key: System.get_env("TENANT_A_KEY"),
               base_url: "https://litellm.tenant-a.internal"])

{:ok, %{provider: "anthropic", opts: [...]}} = Registry.lookup("tenant-A-anthropic")
{:error, :not_found} = Registry.lookup("not-exist")

:ok = Registry.unregister("tenant-A-anthropic")

["tenant-A-anthropic", "tenant-B-openai"] = Registry.list()

:ok = Registry.subscribe("tenant-A-anthropic")
# 本进程 mailbox 收 {:cmdc_registry, :profile_changed, name, new_opts | nil}

与 Agent / Checkpoint 的关系

  • Agent 启动时若 options.model"registry:profile:model_id"Agent.init/1 一次性 Registry.lookup/1 写入 state.config.provider_opts此后 profile 改动不再影响该 Agent(运行时一致性)
  • 想切到新 profile 走 CMDC.switch_model(sid, "registry:new-profile:model_id") 显式触发,所有切换都有 EventBus :model_switched 事件留痕
  • CMDC.Checkpoint.Snapshot.state.options.model 可能含 "registry:" prefix; resume 时该 profile 必须仍在 Registry,否则 CMDC.resume_session!/2{:error, {:registry_profile_missing, name}}

profile name 约束

  • 必须是非空 binary
  • 不能含 : 字符(否则 "registry:#{name}:#{model_id}" 解析时 name 会被吃掉一段)
  • 违反返 {:error, {:invalid_name, :contains_colon | :empty | :not_binary}}

Studio 现有 "tenant-#{tid}-#{provider}" 风格继续可用;UUID / 自定义 ID 含 : 时调用方需自行替换为 -_

跨节点同步语义

默认 Broadcaster.PG(基于 OTP :pg 模块)是 best-effort send-only

  • 无重试 —— send/2 失败丢消息
  • 无 ack —— 不知道远端是否收到
  • 网络分区会丢更新 —— 集群恢复后不会自动重放

生产多节点推荐实现 Broadcaster behaviour 接 Phoenix.PubSub / Redis Streams, 详见 CMDC.Provider.Registry.Broadcaster moduledoc。

配置

# config/config.exs(可选,默认 Broadcaster.PG)
config :cmdc, CMDC.Provider.Registry,
  broadcaster: MyApp.PhoenixPubSubBroadcaster

Telemetry

事件名metadata
[:cmdc, :provider, :registry, :lookup]%{name, hit?} + measurements %{duration_us}
[:cmdc, :provider, :registry, :register]%{name, broadcaster_called?}

详见 CMDC.Telemetry

Summary

Types

已注册的 Provider Profile 内部表示。

register/2 接受的入参 keyword。

register/2 校验失败原因。

Functions

列出所有已注册的 profile name(无序)。

按 name 精确查找 Profile(hot path,不走 GenServer,直接读 ETS)。

注册或覆盖一个 Provider Profile。同名 register 直接覆盖。

启动 Registry GenServer。CMDC.Application supervisor 树会自动拉起, 不需要业务代码手动调用。

当前进程订阅指定 profile 的变更通知。

注销 Profile。订阅者会收到 {:cmdc_registry, :profile_changed, name, nil}, 跨节点 broadcaster 也会派发 {:unregister, name} 事件。

Types

profile()

@type profile() :: %{provider: String.t(), opts: keyword(), registered_at: integer()}

已注册的 Provider Profile 内部表示。

profile_opts()

@type profile_opts() :: [provider: String.t(), opts: keyword()]

register/2 接受的入参 keyword。

register_error()

@type register_error() ::
  {:invalid_name, :contains_colon | :empty | :not_binary}
  | {:invalid_profile, atom()}

register/2 校验失败原因。

Functions

list()

@spec list() :: [String.t()]

列出所有已注册的 profile name(无序)。

仅用于运维 / 调试场景,不要在 hot path 调用:ets.tab2list 是 O(N) 全表扫描)。

lookup(name)

@spec lookup(String.t()) :: {:ok, profile()} | {:error, :not_found}

按 name 精确查找 Profile(hot path,不走 GenServer,直接读 ETS)。

返回

  • {:ok, profile()} —— %{provider:, opts:, registered_at:}
  • {:error, :not_found} —— profile 未注册或已 unregister

性能

ETS :set read_concurrency: true —— 单次查询 ~500 ns, 10000 次并发不阻塞写。Agent 创建路径仅一次 lookup。

register(name, profile_opts)

@spec register(String.t(), profile_opts()) :: :ok | {:error, register_error()}

注册或覆盖一个 Provider Profile。同名 register 直接覆盖。

参数

  • name — profile 唯一标识(不能含 :,否则字符串协议解析会出错)
  • profile_opts
    • :provider — 必填,req_llm provider 标识(如 "anthropic" / "openai"
    • :opts — 默认 [],透传给 CMDC.Provider.stream/4 的选项 (api_key / base_url / receive_timeout / temperature ...)

返回

  • :ok — 注册成功
  • {:error, {:invalid_name, reason}} — name 不合法
  • {:error, {:invalid_profile, reason}} — profile_opts 不合法

Examples

iex> Registry.register("tenant-A-anthropic",
...>   provider: "anthropic",
...>   opts: [api_key: "sk-tenant-a", base_url: "https://litellm.a.internal"])
:ok

iex> Registry.register("invalid:name", provider: "openai", opts: [])
{:error, {:invalid_name, :contains_colon}}

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

启动 Registry GenServer。CMDC.Application supervisor 树会自动拉起, 不需要业务代码手动调用。

选项

  • :broadcaster — 覆盖默认 broadcaster module(默认从 Application env 读,再 fallback Broadcaster.PG

subscribe(name)

@spec subscribe(String.t()) :: :ok

当前进程订阅指定 profile 的变更通知。

收到的消息格式:

{:cmdc_registry, :profile_changed, name :: String.t(),
 new_profile_opts :: keyword() | nil}
  • new_profile_opts 是 register 时传入的 [provider:, opts:]
  • nil 表示该 profile 被 unregister

自动清理

当订阅进程退出时(exit / GC),Registry GenServer 通过 monitor 自动清理订阅。 无需手动 unsubscribe

unregister(name)

@spec unregister(String.t()) :: :ok

注销 Profile。订阅者会收到 {:cmdc_registry, :profile_changed, name, nil}, 跨节点 broadcaster 也会派发 {:unregister, name} 事件。

幂等:name 不存在时也返 :ok,不返错误。