System Overview
The Elixir Codex SDK uses a layered architecture that wraps the codex-rs CLI executable and provides an idiomatic OTP-based interface. The system is designed around three core principles:
- Process Isolation: Each turn execution runs in its own GenServer
- Clean Separation: Clear boundaries between client API, process management, and IPC
- Robust Error Handling: Failures are isolated and cleanly propagated
Transports
codex_sdk supports two upstream external transports:
- Exec JSONL (default): spawns codex exec --json and parses JSONL events
- App-server JSON-RPC (optional): maintains a stateful codex app-server subprocess and speaks newline-delimited JSON-RPC over stdio
Transport selection is per-thread via Codex.Thread.Options.transport:
{:ok, conn} = Codex.AppServer.connect(codex_opts)
{:ok, thread_opts} = Codex.Thread.Options.new(%{transport: {:app_server, conn}})
Component Architecture
High-Level Component Diagram
┌───────────────────────────────────────────────────────────────┐
│ Client Code │
│ (User application using Codex SDK) │
└────────────────┬──────────────────────────────────────────────┘
│
│ Public API
▼
┌───────────────────────────────────────────────────────────────┐
│ Codex Module │
│ - start_thread/2 │
│ - resume_thread/3 │
│ (Factory for Thread instances) │
└────────────────┬──────────────────────────────────────────────┘
│
│ Returns Thread struct
▼
┌───────────────────────────────────────────────────────────────┐
│ Codex.Thread Module │
│ - run/3 (blocking) │
│ - run_streamed/3 (streaming) │
│ (Manages turn execution lifecycle) │
└────────────────┬──────────────────────────────────────────────┘
│
│ Transport dispatch
▼
┌───────────────────────────────────────────────────────────────┐
│ Codex.Transport (behaviour) │
│ - Exec JSONL: Codex.Exec │
│ - App-server: Codex.AppServer.Connection │
└────────────────┬──────────────────────────────────────────────┘
│
│ Port (stdin/stdout)
▼
┌───────────────────────────────────────────────────────────────┐
│ codex-rs Process │
│ - OpenAI API integration │
│ - Command execution │
│ - File operations │
│ - Event emission │
└───────────────────────────────────────────────────────────────┘
Module Breakdown
1. Codex Module
Purpose: Main entry point and factory for thread instances.
Responsibilities:
- Validate global options (API key, base URL, codex path)
- Create new thread instances
- Resume existing threads from saved sessions
State: Stateless module (pure functions)
Key Functions:
@spec start_thread(Codex.Options.t(), Codex.Thread.Options.t()) ::
{:ok, Codex.Thread.t()} | {:error, term()}
@spec resume_thread(String.t(), Codex.Options.t(), Codex.Thread.Options.t()) ::
{:ok, Codex.Thread.t()} | {:error, term()}
Error Handling:
- Validates codex binary exists and is executable
- Validates options format
- Returns descriptive errors for invalid configurations
2. Codex.Thread Module
Purpose: Manages individual conversation threads.
Responsibilities:
- Execute turns (blocking and streaming modes)
- Maintain thread ID and options
- Coordinate with Exec GenServer
- Handle structured output schemas and rate limit snapshots
State: Encapsulated in %Codex.Thread{} struct (includes transport metadata)
defstruct [
:thread_id, # String.t() | nil (populated after first turn)
:codex_opts, # %Codex.Options{}
:thread_opts, # %Codex.Thread.Options{}
:rate_limits, # latest rate limit snapshot (if provided)
:transport # :exec | {:app_server, pid()}
]
Key Functions:
@spec run(t(), String.t() | [map()], Codex.Turn.Options.t()) ::
{:ok, Codex.Turn.Result.t()} | {:error, term()}
@spec run_streamed(t(), String.t() | [map()], Codex.Turn.Options.t()) ::
{:ok, Enumerable.t()} | {:error, term()}
The app-server transport accepts UserInput block lists (text/image/localImage); exec JSONL expects text input.
Execution Flow (Blocking Mode):
- Create output schema file if needed
- Start Codex.Exec GenServer with options
- Wait for events, accumulating items
- Extract final response from last AgentMessage
- Return TurnResult when TurnCompleted received
- Clean up schema file and Exec process
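The accumulate-until-done step of the blocking flow can be sketched in isolation. The module below is hypothetical (`BlockingTurnSketch` is not part of the SDK); it only assumes the `{:event, ref, event}`, `{:done, ref}` and `{:error, ref, reason}` message shapes described under Message Passing, and it stands in a plain spawned process for the Exec GenServer.

```elixir
defmodule BlockingTurnSketch do
  # Hypothetical helper, not an SDK module: gathers all events for one turn.
  @spec collect(reference(), timeout()) :: {:ok, [term()]} | {:error, term()}
  def collect(ref, timeout \\ 5_000), do: loop(ref, timeout, [])

  defp loop(ref, timeout, acc) do
    receive do
      # Only messages tagged with this turn's reference are consumed.
      {:event, ^ref, event} -> loop(ref, timeout, [event | acc])
      {:done, ^ref} -> {:ok, Enum.reverse(acc)}
      {:error, ^ref, reason} -> {:error, reason}
    after
      timeout -> {:error, :timeout}
    end
  end
end

# Simulate the producer side with a throwaway process.
ref = make_ref()
parent = self()

spawn(fn ->
  send(parent, {:event, ref, {:item_completed, "hello"}})
  send(parent, {:event, ref, :turn_completed})
  send(parent, {:done, ref})
end)

IO.inspect(BlockingTurnSketch.collect(ref))
```

Matching on the pinned reference keeps unrelated messages in the caller's mailbox untouched, which matters when the caller is itself a long-lived process.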
Execution Flow (Streaming Mode):
- Create output schema file if needed
- Start Codex.Exec GenServer with options
- Return Stream that yields events as they arrive
- Clean up when stream completes or is halted
3. Codex.Exec GenServer
Purpose: Manages the lifecycle of a single codex-rs process execution.
Responsibilities:
- Spawn codex-rs process via Port
- Send input prompt via stdin
- Receive and parse JSONL events from stdout
- Monitor process health and exit status
- Clean up resources on completion or crash
State:
defstruct [
:port, # Port.t()
:caller, # pid() of requesting process
:ref, # reference() for synchronization
:buffer, # String.t() for incomplete lines
:exit_status, # integer() | nil
:stderr_buffer # String.t() for error messages
]
Lifecycle:
init/1:
- Build command args from options
- Set environment variables
- Spawn Port with codex-rs process
- Send telemetry event (turn started)
Message Handling:
- {port, {:data, data}}: Parse JSONL lines, send events to caller
- {port, {:exit_status, status}}: Handle process exit
- {:EXIT, port, reason}: Handle unexpected crashes
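Port data arrives in arbitrary chunks, so a JSONL line may be split across two `{:data, ...}` messages. The sketch below shows one way the `:buffer` field could be used to reassemble lines; `LineBufferSketch` is a hypothetical name and the actual parsing code in the SDK may differ. JSON decoding is left out to keep the example dependency-free.

```elixir
defmodule LineBufferSketch do
  # Hypothetical helper: returns the complete lines found in buffer <> chunk
  # and the trailing partial line to keep for the next chunk.
  @spec push(binary(), binary()) :: {[binary()], binary()}
  def push(buffer, chunk) do
    parts = String.split(buffer <> chunk, "\n")
    # The last element is always the (possibly empty) unfinished line.
    {lines, [rest]} = Enum.split(parts, -1)
    {Enum.reject(lines, &(&1 == "")), rest}
  end
end

{lines, buffer} = LineBufferSketch.push("", ~s({"n":1}\n{"n":))
IO.inspect({lines, buffer})

{lines, buffer} = LineBufferSketch.push(buffer, ~s(2}\n))
IO.inspect({lines, buffer})
```

Each complete line would then be decoded and forwarded to the caller, while the remainder is stored back in the GenServer state.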
terminate/2:
- Close port if still open
- Send telemetry event (turn completed/failed)
- Clean up any remaining resources
Error Scenarios:
- Spawn failure: Return error immediately
- JSON parse error: Emit error event, continue processing
- Non-zero exit: Emit TurnFailed with stderr contents
- Process crash: Emit TurnFailed with crash reason
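The non-zero-exit scenario can be illustrated with a plain `Port` and the `:exit_status` option. This is a minimal sketch under stated assumptions: `PortExitSketch` is hypothetical, it runs `sh` rather than codex-rs, and it collects stdout only (a bare Port does not deliver stderr as a separate stream, so how the SDK fills its stderr buffer is not shown here).

```elixir
defmodule PortExitSketch do
  # Hypothetical helper: runs an executable and maps its exit status
  # to an ok/error tuple, collecting stdout along the way.
  def run(executable, args, timeout \\ 5_000) do
    port =
      Port.open({:spawn_executable, executable}, [
        :binary,
        :exit_status,
        args: args
      ])

    collect(port, "", timeout)
  end

  defp collect(port, acc, timeout) do
    receive do
      {^port, {:data, data}} -> collect(port, acc <> data, timeout)
      {^port, {:exit_status, 0}} -> {:ok, acc}
      {^port, {:exit_status, status}} -> {:error, {:exit_status, status, acc}}
    after
      timeout ->
        Port.close(port)
        {:error, :timeout}
    end
  end
end

sh = System.find_executable("sh")
IO.inspect(PortExitSketch.run(sh, ["-c", "echo failed; exit 3"]))
```

In the SDK's terms, the error branch is where a TurnFailed event would be built from the captured output and the exit code.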
GenServer API:
@spec start_link(keyword()) :: GenServer.on_start()
@spec run_turn(pid(), String.t(), map()) :: {:ok, reference()}
4. Type Modules
Codex.Events
Defines all event types emitted during turn execution.
TypedStruct Definitions:
defmodule Codex.Events.ThreadStarted do
use TypedStruct
typedstruct do
field :type, :thread_started, enforce: true
field :thread_id, String.t(), enforce: true
end
end
# Similar for:
# - TurnStarted
# - TurnCompleted (with Usage)
# - TurnFailed (with ThreadError)
# - ItemStarted (with ThreadItem)
# - ItemUpdated (with ThreadItem)
# - ItemCompleted (with ThreadItem)
Codex.Items
Defines all item types and their variants.
Item Types:
- AgentMessage: Text or JSON response
- Reasoning: Agent's thinking summary
- CommandExecution: Command with output and exit code
- FileChange: File modifications with changes array
- McpToolCall: MCP tool invocation
- WebSearch: Search query
- TodoList: Agent's task list
- Error: Non-fatal error
Example:
defmodule Codex.Items.CommandExecution do
use TypedStruct
typedstruct do
field :id, String.t(), enforce: true
field :type, :command_execution, default: :command_execution
field :command, String.t(), enforce: true
field :aggregated_output, String.t(), default: ""
field :exit_code, integer()
field :status, atom(), enforce: true
end
end
Codex.Options
Configuration structs for each level.
defmodule Codex.Options do
use TypedStruct
typedstruct do
field :codex_path_override, String.t()
field :base_url, String.t()
field :api_key, String.t()
end
end
defmodule Codex.Thread.Options do
use TypedStruct
typedstruct do
field :model, String.t()
field :sandbox_mode, atom() # :read_only | :workspace_write | :danger_full_access
field :working_directory, String.t()
field :skip_git_repo_check, boolean(), default: false
end
end
defmodule Codex.Turn.Options do
use TypedStruct
typedstruct do
field :output_schema, map()
end
end
5. Utility Modules
Codex.OutputSchemaFile
Manages temporary JSON schema files.
Functions:
@spec create(map() | nil) :: {:ok, {String.t() | nil, function()}} | {:error, term()}
Implementation:
- Creates temp directory in system tmp
- Writes schema JSON to file
- Returns path and cleanup function
- Cleanup function removes directory recursively
- Handles nil schema (no file created)
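The steps above can be sketched as follows. `SchemaFileSketch` is a hypothetical stand-in, not the SDK's `Codex.OutputSchemaFile`: to stay dependency-free it takes an already-encoded JSON string instead of a map, and the directory and file names are illustrative.

```elixir
defmodule SchemaFileSketch do
  # Hypothetical stand-in for the temp schema file helper.
  @spec create(binary() | nil) :: {:ok, {String.t() | nil, function()}} | {:error, term()}
  def create(nil), do: {:ok, {nil, fn -> :ok end}}

  def create(json) when is_binary(json) do
    # A unique directory per call keeps concurrent turns from colliding.
    dir = Path.join(System.tmp_dir!(), "schema-sketch-#{System.unique_integer([:positive])}")
    path = Path.join(dir, "schema.json")

    with :ok <- File.mkdir_p(dir),
         :ok <- File.write(path, json) do
      # The cleanup closure removes the whole directory recursively.
      {:ok, {path, fn -> File.rm_rf(dir) end}}
    end
  end
end

{:ok, {path, cleanup}} = SchemaFileSketch.create(~s({"type":"object"}))
IO.puts(File.read!(path))
cleanup.()
```

Returning a no-op cleanup function for the nil case lets callers invoke cleanup unconditionally.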
Data Flow Diagrams
Blocking Turn Execution
Client Thread Exec GenServer Port/Process
| | | |
|-- run(input) -------->| | |
| |-- start_link() ------>| |
| | |-- spawn() --------->|
| | | |-- codex-rs starts
| |-- call: run_turn ---->| |
| | |-- write stdin ----->|
| | | |
| |<------- event --------|<-- stdout line -----|
| |<------- event --------|<-- stdout line -----|
| |<------- event --------|<-- stdout line -----|
| | | |
| |<-- TurnCompleted -----|<-- stdout line -----|
| | | |-- codex-rs exits
| | |<-- exit_status -----|
| |-- stop() ------------>| |
| | |-- cleanup --------->|
|<-- {:ok, result} -----| | |
Streaming Turn Execution
Client Thread Exec GenServer Port/Process
| | | |
|-- run_streamed() ---->| | |
| |-- start_link() ------>| |
| | |-- spawn() --------->|
|<-- {:ok, stream} -----| | |
| | | |-- codex-rs starts
| |-- call: run_turn ---->| |
| | |-- write stdin ----->|
| | | |
|-- next event -------->|-- fetch event ------->| |
|<-- ItemStarted -------|<----------------------|<-- stdout line -----|
| | | |
|-- next event -------->|-- fetch event ------->| |
|<-- ItemCompleted -----|<----------------------|<-- stdout line -----|
| | | |
|-- next event -------->|-- fetch event ------->| |
|<-- TurnCompleted -----|<----------------------|<-- stdout line -----|
| | | |-- codex-rs exits
|-- stream done ------->|-- stop() ------------>| |
| | |-- cleanup --------->|
Process Model
Process Hierarchy
Application Supervisor
│
└─── Client Process (caller)
│
└─── Codex.Exec GenServer (per turn)
│
└─── Port (OS process)
│
└─── codex-rs
Key Points:
- Exec GenServer is ephemeral (one per turn)
- No persistent supervision tree needed
- Client monitors Exec GenServer
- Exec GenServer monitors Port
- Clean shutdown cascades down hierarchy
Message Passing
Client → Thread (synchronous):
{:run, input, options}
{:run_streamed, input, options}
Thread → Exec (GenServer call):
{:run_turn, input, codex_args}
Port → Exec (Port messages):
{port, {:data, binary}}
{port, {:exit_status, integer}}
{:EXIT, port, reason}
Exec → Client (via reference):
{:event, ref, event_struct}
{:error, ref, error_term}
{:done, ref}
Error Handling Strategy
Error Categories
Configuration Errors (fail fast)
- Invalid options
- Missing codex binary
- Bad API credentials
- Return: {:error, {:config, reason}}
Process Errors (recoverable)
- Spawn failure
- Port crash
- Return: {:error, {:process, reason}}
Communication Errors (retryable)
- JSON parse error
- Protocol mismatch
- Return: {:error, {:communication, reason}}
Turn Errors (expected)
- Agent failure
- API rate limit
- Model error
- Return: {:error, {:turn_failed, error_struct}}
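Because each category carries its own tag, callers can branch on the tuple shape. The sketch below is one possible policy, not behaviour the SDK provides: `ErrorPolicySketch` and its function names are hypothetical, and it retries only the category labelled retryable above.

```elixir
defmodule ErrorPolicySketch do
  # Hypothetical policy helper built on the tagged error tuples.
  def category({:error, {:config, _}}), do: :fail_fast
  def category({:error, {:process, _}}), do: :recoverable
  def category({:error, {:communication, _}}), do: :retryable
  def category({:error, {:turn_failed, _}}), do: :expected
  def category({:ok, _}), do: :ok

  # Re-runs `fun` while it returns a communication error, up to `attempts` tries.
  def run_with_retry(fun, attempts) when attempts > 0 do
    case fun.() do
      {:error, {:communication, _}} when attempts > 1 ->
        run_with_retry(fun, attempts - 1)

      other ->
        other
    end
  end
end

IO.inspect(ErrorPolicySketch.category({:error, {:config, :missing_binary}}))
IO.inspect(ErrorPolicySketch.run_with_retry(fn -> {:ok, :done} end, 3))
```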
Error Propagation
codex-rs exit code ≠ 0
↓
Port sends {:exit_status, code}
↓
Exec GenServer receives exit
↓
Exec parses stderr buffer
↓
Exec sends {:error, ref, {:turn_failed, details}}
↓
Thread receives error
↓
Client gets {:error, {:turn_failed, details}}
Cleanup Guarantees
All cleanup happens in GenServer terminate/2:
- Close Port
- Kill OS process if still running
- Remove temporary schema file
- Send telemetry event
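Cleanup is intended to cover the following cases (OTP skips terminate/2 when the process is killed with :kill, and a linked caller's exit reaches terminate/2 only if the GenServer traps exits):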
- Normal completion
- Client crash
- GenServer crash
- VM shutdown
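When `terminate/2` runs is determined by OTP rather than by the SDK, so it helps to see the behaviour in a toy process. `CleanupSketch` below is hypothetical and only notifies a pid where real code would close the port and delete temporary files.

```elixir
defmodule CleanupSketch do
  use GenServer

  # Hypothetical module; `notify` is a pid that is told when cleanup ran.
  def start(notify), do: GenServer.start(__MODULE__, notify)

  @impl true
  def init(notify) do
    # Trapping exits lets a linked parent's exit signal reach terminate/2
    # (not exercised in this unlinked demo).
    Process.flag(:trap_exit, true)
    {:ok, notify}
  end

  @impl true
  def terminate(reason, notify) do
    # Real code would close the port and remove temp files here.
    send(notify, {:cleaned_up, reason})
    :ok
  end
end

{:ok, pid} = CleanupSketch.start(self())
:ok = GenServer.stop(pid)

receive do
  {:cleaned_up, reason} -> IO.puts("terminate/2 ran: #{inspect(reason)}")
after
  1_000 -> IO.puts("terminate/2 did not run")
end
```

A process killed with `Process.exit(pid, :kill)` never reaches `terminate/2`, so resources that must not leak in that case need a second line of defence such as a monitoring process.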
Streaming Implementation
Stream Creation
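def run_streamed(thread, input, opts) do
  # create/1 returns {:ok, {path, cleanup_fn}}; the path feeds the elided Exec args
  {:ok, {schema_path, cleanup_fn}} = OutputSchemaFile.create(opts.output_schema)
  stream = Stream.resource(
    # Start function: runs lazily, in the process that enumerates the stream
    fn ->
      {:ok, pid} = Exec.start_link(...)
      # run_turn/3 returns {:ok, reference()}
      {:ok, ref} = Exec.run_turn(pid, input, ...)
      {pid, ref, cleanup_fn}
    end,
    # Next function: emit one event per message until the turn is done
    fn {_pid, ref, _cleanup_fn} = acc ->
      receive do
        {:event, ^ref, event} -> {[event], acc}
        {:done, ^ref} -> {:halt, acc}
        # raise/1 needs an exception or a message, not an arbitrary term
        {:error, ^ref, error} -> raise "turn failed: #{inspect(error)}"
      after
        30_000 -> raise "timed out waiting for the next event"
      end
    end,
    # After function: runs on completion, early halt, or a raise above
    fn {pid, _ref, cleanup_fn} ->
      GenServer.stop(pid)
      cleanup_fn.()
    end
  )
  {:ok, stream}
end
Key Properties: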
- Lazy evaluation (events fetched on demand)
- Backpressure support (caller controls rate)
- Automatic cleanup (even if stream halted early)
- Timeout protection (30s default)
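The lazy-evaluation and early-halt properties can be checked with a self-contained variant of the same `Stream.resource/3` pattern. `StreamSketch` is hypothetical: a spawned process plays the role of the Exec GenServer, and the `notify` pid exists only so the demo can observe that the after function ran.

```elixir
defmodule StreamSketch do
  # Hypothetical producer/consumer pair mirroring the message protocol above.
  def events(items, notify) do
    Stream.resource(
      fn ->
        ref = make_ref()
        # self() here is the process that enumerates the stream.
        consumer = self()

        pid =
          spawn(fn ->
            Enum.each(items, &send(consumer, {:event, ref, &1}))
            send(consumer, {:done, ref})
          end)

        {pid, ref}
      end,
      fn {_pid, ref} = acc ->
        receive do
          {:event, ^ref, event} -> {[event], acc}
          {:done, ^ref} -> {:halt, acc}
        after
          1_000 -> raise "timed out waiting for event"
        end
      end,
      # Runs when the stream completes or is halted early.
      fn {_pid, _ref} -> send(notify, :cleaned_up) end
    )
  end
end

stream = StreamSketch.events([:a, :b, :c], self())
IO.inspect(Enum.take(stream, 2))

receive do
  :cleaned_up -> IO.puts("after function ran despite the early halt")
after
  500 -> IO.puts("no cleanup observed")
end
```

Nothing is spawned until the stream is enumerated, and the receive happens in the enumerating process, so a stream built in one process and consumed in another still gets its events delivered to the consumer.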
Event Buffering
In Exec GenServer:
- Small buffer (100 events) to handle bursts
- Blocks Port reading if buffer full (backpressure)
- Flush buffer on process exit
In Thread/Client:
- No buffering (events consumed immediately)
- Client controls pace via Stream consumption
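A bounded buffer of the kind described for the Exec GenServer could look like the sketch below. The module name, the map-based representation, and the `{:full, buffer}` return value are assumptions for illustration; the source only specifies a small buffer of 100 events that is flushed on process exit.

```elixir
defmodule BoundedBufferSketch do
  # Hypothetical bounded FIFO built on Erlang's :queue.
  def new(max \\ 100), do: %{queue: :queue.new(), size: 0, max: max}

  # Refuse new events once the limit is reached so the producer can pause.
  def push(%{size: size, max: max} = buf, _event) when size >= max, do: {:full, buf}

  def push(buf, event) do
    {:ok, %{buf | queue: :queue.in(event, buf.queue), size: buf.size + 1}}
  end

  def pop(%{size: 0} = buf), do: {:empty, buf}

  def pop(buf) do
    {{:value, event}, queue} = :queue.out(buf.queue)
    {{:ok, event}, %{buf | queue: queue, size: buf.size - 1}}
  end

  # Returns all pending events in arrival order and an empty buffer.
  def flush(buf), do: {:queue.to_list(buf.queue), new(buf.max)}
end

buf = BoundedBufferSketch.new(2)
{:ok, buf} = BoundedBufferSketch.push(buf, :first)
{:ok, buf} = BoundedBufferSketch.push(buf, :second)
IO.inspect(BoundedBufferSketch.push(buf, :third) |> elem(0))
```

Tracking the size alongside the queue avoids calling `:queue.len/1`, which walks the whole queue on every push.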
Performance Considerations
Memory
Per Turn Overhead:
- GenServer state: ~1 KB
- Event buffers: ~10 KB
- Port buffers: ~4 KB
- Total: ~15 KB per concurrent turn
Streaming Benefits:
- Constant memory (O(1) per turn)
- Events processed and discarded
- No accumulation of full turn history
Latency
Event Propagation:
- codex-rs → stdout: < 1 ms
- Port → Exec: < 1 ms
- Exec → Client: < 1 ms
- Total: < 5 ms end-to-end
Optimization Opportunities:
- Batch small events
- Binary protocol (vs JSON)
- NIF for JSON parsing
Throughput
Bottlenecks:
- OpenAI API rate limits (primary)
- JSON parsing (secondary)
- Process scheduling (minimal)
Scalability:
- 100s of concurrent turns easily
- 1000s possible with tuning
- Limited by API, not SDK
Telemetry Integration
Events
[:codex, :turn, :start]
Measurements: %{system_time: integer()}
Metadata: %{thread_id: string(), input_length: integer()}
[:codex, :turn, :stop]
Measurements: %{duration: integer()}
Metadata: %{thread_id: string(), usage: Usage.t()}
[:codex, :turn, :exception]
Measurements: %{duration: integer()}
Metadata: %{thread_id: string(), error: term()}
[:codex, :item, :completed]
Measurements: %{system_time: integer()}
Metadata: %{thread_id: string(), item_type: atom(), item_id: string()}
Usage
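The attach call in this section points at `MyApp.TelemetryHandler.handle_event/4`, which the guide does not define. A minimal sketch of such a handler follows, under a different, hypothetical module name. It assumes `duration` is reported in native time units, which is the common telemetry convention but is not stated in the source.

```elixir
defmodule TelemetryHandlerSketch do
  require Logger

  # Hypothetical handler with the 4-arity shape that :telemetry invokes.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(format(event, measurements, metadata))
  end

  # Pure formatting is kept separate so it can be tested without Logger.
  def format([:codex, :turn, :start], _measurements, meta) do
    "turn start thread=#{meta.thread_id}"
  end

  def format([:codex, :turn, :stop], %{duration: duration}, meta) do
    "turn stop thread=#{meta.thread_id} ms=#{to_ms(duration)}"
  end

  def format([:codex, :turn, :exception], %{duration: duration}, meta) do
    "turn failed thread=#{meta.thread_id} ms=#{to_ms(duration)} error=#{inspect(meta.error)}"
  end

  def format(event, _measurements, _meta), do: "unhandled #{inspect(event)}"

  # Assumption: durations are in native units.
  defp to_ms(native), do: System.convert_time_unit(native, :native, :millisecond)
end

duration = System.convert_time_unit(250, :millisecond, :native)

IO.puts(
  TelemetryHandlerSketch.format([:codex, :turn, :stop], %{duration: duration}, %{thread_id: "t1"})
)
```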
:telemetry.attach_many(
"codex-handler",
[
[:codex, :turn, :start],
[:codex, :turn, :stop],
[:codex, :turn, :exception]
],
&MyApp.TelemetryHandler.handle_event/4,
nil
)
Security Considerations
Sandbox Modes
- :read_only: Codex can read files but not write
- :workspace_write: Codex can write within working directory
- :danger_full_access: Codex has unrestricted access
Recommendations:
- Use :read_only for analysis tasks
- Use :workspace_write for development
- Avoid :danger_full_access unless necessary
Input Validation
- Sanitize file paths
- Validate schema JSON
- Escape shell arguments (handled by codex-rs)
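One way to sanitize file paths is to reject anything that resolves outside the thread's working directory. `PathGuardSketch` below is a hypothetical helper, not an SDK function; it normalizes `..` segments with `Path.expand/2` but does not resolve symlinks, so it is a first filter rather than a complete defence.

```elixir
defmodule PathGuardSketch do
  # Hypothetical check: does `path` stay inside `root` after normalization?
  @spec within?(String.t(), String.t()) :: boolean()
  def within?(path, root) do
    root = Path.expand(root)
    # Relative paths are resolved against the root, absolute ones are kept.
    target = Path.expand(path, root)
    # The trailing slash prevents "/tmp/project-other" from matching "/tmp/project".
    target == root or String.starts_with?(target, root <> "/")
  end
end

IO.inspect(PathGuardSketch.within?("lib/app.ex", "/tmp/project"))
IO.inspect(PathGuardSketch.within?("../secrets", "/tmp/project"))
```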
Secrets Management
- Never log API keys
- Use environment variables
- Rotate keys regularly
- Use per-project API keys
Extension Points
Custom Event Handlers
defmodule MyApp.CodexHandler do
  # Logger macros need an explicit require
  require Logger

  alias Codex.Events.ItemCompleted
  alias Codex.Items.CommandExecution

  def handle_event(%ItemCompleted{item: %CommandExecution{} = cmd}) do
    Logger.info("Command: #{cmd.command}, exit: #{cmd.exit_code}")
  end

  # Ignore every other event type
  def handle_event(_), do: :ok
end
# Use with streaming
{:ok, stream} = Codex.Thread.run_streamed(thread, input)
Enum.each(stream, &MyApp.CodexHandler.handle_event/1)
Custom Telemetry
defmodule MyApp.Metrics do
def track_usage(%Usage{} = usage) do
:telemetry.execute(
[:my_app, :codex, :tokens],
%{total: usage.input_tokens + usage.output_tokens},
%{source: :codex}
)
end
end
Supervision
defmodule MyApp.CodexSupervisor do
use Supervisor
def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
children = [
{Task.Supervisor, name: MyApp.CodexTaskSupervisor}
]
Supervisor.init(children, strategy: :one_for_one)
end
end
# Use supervised tasks for concurrent turns
Task.Supervisor.async(MyApp.CodexTaskSupervisor, fn ->
  Codex.Thread.run(thread, input)
end)
Shared Runtime Modules
Extracted from duplicated patterns across the codebase, these modules centralize cross-cutting concerns:
- Codex.Runtime.Erlexec: Unified erlexec startup configuration shared by Exec, Connection, Sessions, ShellTool, and MCP Stdio
- Codex.Runtime.Env: Subprocess environment construction shared between Exec and AppServer.Connection; sets CODEX_INTERNAL_ORIGINATOR_OVERRIDE=codex_sdk_elixir by default
- Codex.Runtime.KeyringWarning: Deduplicated warn-once logic from Auth and MCP.OAuth
- Codex.Config.BaseURL: OPENAI_BASE_URL env fallback with explicit option precedence (option → env → default)
- Codex.Config.OptionNormalizers: Shared validation for reasoning summary, verbosity, and history persistence across Options and Thread.Options
- Codex.Config.Overrides: Config override serialization, nested map auto-flattening (flatten_config_map/1), TOML value validation, and deduplicated normalize_config_overrides/1
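The option → env → default precedence for the base URL can be sketched as below. `BaseUrlSketch` is hypothetical and does not reproduce `Codex.Config.BaseURL`; in particular the default URL is a placeholder, since the source does not state the real one, and treating empty strings as absent is an assumption.

```elixir
defmodule BaseUrlSketch do
  # Placeholder default; the SDK's actual default is not given in this guide.
  @default "https://api.example.invalid/v1"

  # `opts` may be a keyword list or map; `env` defaults to the process environment.
  @spec resolve(keyword() | map(), map()) :: String.t()
  def resolve(opts, env \\ System.get_env()) do
    first_present([opts[:base_url], env["OPENAI_BASE_URL"]]) || @default
  end

  # Treats nil and empty strings as "not set" (an assumption of this sketch).
  defp first_present(values) do
    Enum.find(values, &(is_binary(&1) and &1 != ""))
  end
end

env = %{"OPENAI_BASE_URL" => "https://env.example.invalid"}
IO.puts(BaseUrlSketch.resolve([base_url: "https://opt.example.invalid"], env))
IO.puts(BaseUrlSketch.resolve([], env))
IO.puts(BaseUrlSketch.resolve([], %{}))
```

Passing the environment as an argument keeps the precedence logic testable without mutating real environment variables.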
Realtime and Voice Modules
The SDK includes two subsystems for voice interactions that make direct API calls to OpenAI rather than wrapping the codex CLI.
Realtime API (Codex.Realtime.*)
Full integration with OpenAI's Realtime API for bidirectional voice streaming:
- Codex.Realtime.Session: WebSocket-based GenServer using WebSockex; traps linked socket exits and runs tool calls outside the callback path so the session stays responsive
- Codex.Realtime.Runner: High-level orchestrator for agent sessions with automatic tool call handling, handoff execution, and guardrail integration
- Codex.Realtime.Agent: Agent configuration with instructions, tools, and handoffs
- PubSub-based event broadcasting with idempotent subscribe/unsubscribe
- Semantic VAD turn detection with eagerness, silence duration, and prefix padding
Voice Pipeline (Codex.Voice.*)
Non-realtime STT -> Workflow -> TTS processing:
- Codex.Voice.Pipeline: Orchestrates speech-to-text, workflow processing, and text-to-speech with async_nolink via ephemeral TaskSupervisor
- Codex.Voice.Workflow: Behaviour for custom workflow implementations (SimpleWorkflow, AgentWorkflow)
- Codex.Voice.Model.*: Behaviours and implementations for STT/TTS models (OpenAI gpt-4o-transcribe and gpt-4o-mini-tts)
- StreamQueue-backed audio queues replacing Agent-backed queues for backpressure and close semantics
Auth precedence for both: CODEX_API_KEY → auth.json OPENAI_API_KEY → OPENAI_API_KEY.
Future Enhancements
Potential Improvements
- Native JSON Parsing: NIF for faster event parsing
- Binary Protocol: Reduce overhead vs JSONL
- WebSocket Streaming: Alternative to Port for long-running sessions
- Event Persistence: Store events for replay/debugging
- Distributed Turns: Run turns on remote nodes
- Rate Limiting: Built-in API rate limiting
- Caching: Cache common responses
- Metrics Dashboard: Real-time monitoring UI
API Stability
Stable (v1.0+):
- Core module interfaces
- Event/item struct shapes
- Option struct fields
Unstable (may change):
- Telemetry event names
- Internal GenServer implementation
- Error tuple formats
Experimental:
- Custom event handlers
- Advanced streaming modes
- Performance optimizations