Architecture Guide
System Overview
The Elixir Codex SDK is a layered architecture that wraps the codex-rs CLI executable and provides an idiomatic OTP-based interface. The system is designed around three core principles:
- Process Isolation: Each turn execution runs in its own GenServer
- Clean Separation: Clear boundaries between client API, process management, and IPC
- Robust Error Handling: Failures are isolated and cleanly propagated
Component Architecture
High-Level Component Diagram
┌───────────────────────────────────────────────────────────────┐
│ Client Code │
│ (User application using Codex SDK) │
└────────────────┬──────────────────────────────────────────────┘
│
│ Public API
▼
┌───────────────────────────────────────────────────────────────┐
│ Codex Module │
│ - start_thread/2 │
│ - resume_thread/3 │
│ (Factory for Thread instances) │
└────────────────┬──────────────────────────────────────────────┘
│
│ Returns Thread struct
▼
┌───────────────────────────────────────────────────────────────┐
│ Codex.Thread Module │
│ - run/3 (blocking) │
│ - run_streamed/3 (streaming) │
│ (Manages turn execution lifecycle) │
└────────────────┬──────────────────────────────────────────────┘
│
│ Starts GenServer
▼
┌───────────────────────────────────────────────────────────────┐
│ Codex.Exec GenServer │
│ - Spawns codex-rs process │
│ - Manages Port communication │
│ - Parses JSONL events │
│ - Handles process lifecycle │
└────────────────┬──────────────────────────────────────────────┘
│
│ Port (stdin/stdout)
▼
┌───────────────────────────────────────────────────────────────┐
│ codex-rs Process │
│ - OpenAI API integration │
│ - Command execution │
│ - File operations │
│ - Event emission │
└───────────────────────────────────────────────────────────────┘
Module Breakdown
1. Codex Module
Purpose: Main entry point and factory for thread instances.
Responsibilities:
- Validate global options (API key, base URL, codex path)
- Create new thread instances
- Resume existing threads from saved sessions
State: Stateless module (pure functions)
Key Functions:
@spec start_thread(Codex.Options.t(), Codex.Thread.Options.t()) ::
        {:ok, Codex.Thread.t()} | {:error, term()}
@spec resume_thread(String.t(), Codex.Options.t(), Codex.Thread.Options.t()) ::
        {:ok, Codex.Thread.t()} | {:error, term()}
Error Handling:
- Validates codex binary exists and is executable
- Validates options format
- Returns descriptive errors for invalid configurations
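The binary check described above could look roughly like the following. This is a minimal sketch rather than the SDK's actual code: the module name `CodexPathSketch` and the inner reason atoms such as `:not_executable` are hypothetical, and the executable test relies on Unix permission bits.

```elixir
defmodule CodexPathSketch do
  import Bitwise

  @doc """
  Returns `{:ok, path}` when `path` is a regular file with at least one
  execute bit set, and a `{:error, {:config, reason}}` tuple otherwise.
  """
  def validate(path) when is_binary(path) do
    case File.stat(path) do
      # 0o111 masks the owner, group, and other execute bits
      {:ok, %File.Stat{type: :regular, mode: mode}} when (mode &&& 0o111) != 0 ->
        {:ok, path}

      {:ok, _stat} ->
        {:error, {:config, {:not_executable, path}}}

      {:error, posix_reason} ->
        {:error, {:config, {posix_reason, path}}}
    end
  end
end
```

Returning the offending path inside the error tuple keeps the message descriptive without the caller having to thread the path through separately.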
2. Codex.Thread Module
Purpose: Manages individual conversation threads.
Responsibilities:
- Execute turns (blocking and streaming modes)
- Maintain thread ID and options
- Coordinate with Exec GenServer
- Handle structured output schemas
State: Encapsulated in %Codex.Thread{} struct
defstruct [
  :thread_id,   # String.t() | nil (populated after first turn)
  :codex_opts,  # %Codex.Options{}
  :thread_opts  # %Codex.Thread.Options{}
]
Key Functions:
@spec run(t(), String.t(), Codex.Turn.Options.t()) ::
        {:ok, Codex.Turn.Result.t()} | {:error, term()}
@spec run_streamed(t(), String.t(), Codex.Turn.Options.t()) ::
        {:ok, Enumerable.t()} | {:error, term()}
Execution Flow (Blocking Mode):
- Create output schema file if needed
- Start Codex.Exec GenServer with options
- Wait for events, accumulating items
- Extract final response from last AgentMessage
- Return TurnResult when TurnCompleted received
- Clean up schema file and Exec process
Execution Flow (Streaming Mode):
- Create output schema file if needed
- Start Codex.Exec GenServer with options
- Return Stream that yields events as they arrive
- Clean up when stream completes or is halted
3. Codex.Exec GenServer
Purpose: Manages the lifecycle of a single codex-rs process execution.
Responsibilities:
- Spawn codex-rs process via Port
- Send input prompt via stdin
- Receive and parse JSONL events from stdout
- Monitor process health and exit status
- Clean up resources on completion or crash
State:
defstruct [
  :port,          # Port.t()
  :caller,        # pid() of requesting process
  :ref,           # reference() for synchronization
  :buffer,        # String.t() for incomplete lines
  :exit_status,   # integer() | nil
  :stderr_buffer  # String.t() for error messages
]
Lifecycle:
init/1:
- Build command args from options
- Set environment variables
- Spawn Port with codex-rs process
- Send telemetry event (turn started)
Message Handling:
- {port, {:data, data}}: Parse JSONL lines, send events to caller
- {port, {:exit_status, status}}: Handle process exit
- {:EXIT, port, reason}: Handle unexpected crashes
terminate/2:
- Close port if still open
- Send telemetry event (turn completed/failed)
- Clean up any remaining resources
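To make the Port lifecycle concrete, here is a minimal sketch of spawning an OS process and collecting its output until the exit status arrives. It is not the Exec GenServer itself: `PortSketch` is a hypothetical module, it runs inside a plain receive loop instead of GenServer callbacks, and the usage note runs `echo` as a stand-in because the codex-rs command-line arguments are not described here.

```elixir
defmodule PortSketch do
  @doc """
  Runs `executable` (a full path) with `args` and returns
  `{exit_status, stdout}` or `{:error, :timeout}`.
  """
  def run(executable, args, timeout \\ 5_000) do
    port =
      Port.open({:spawn_executable, executable}, [
        :binary,       # deliver data as binaries, not charlists
        :exit_status,  # ask for an {:exit_status, n} message on exit
        args: args
      ])

    collect(port, "", timeout)
  end

  # Accumulate stdout chunks until the process reports its exit status.
  defp collect(port, output, timeout) do
    receive do
      {^port, {:data, data}} ->
        collect(port, output <> data, timeout)

      {^port, {:exit_status, status}} ->
        {status, output}
    after
      timeout ->
        Port.close(port)
        {:error, :timeout}
    end
  end
end
```

For example, `PortSketch.run(System.find_executable("echo"), ["hello"])` collects the line printed by `echo` together with its exit status.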
Error Scenarios:
- Spawn failure: Return error immediately
- JSON parse error: Emit error event, continue processing
- Non-zero exit: Emit TurnFailed with stderr contents
- Process crash: Emit TurnFailed with crash reason
GenServer API:
@spec start_link(keyword()) :: GenServer.on_start()
@spec run_turn(pid(), String.t(), map()) :: {:ok, reference()}
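Port data arrives in arbitrary chunks, so a JSONL line may be split across two `{:data, binary}` messages; this is what the `:buffer` field is for. The following sketch shows one way to carry the incomplete tail between messages. The module and function names are hypothetical, and JSON decoding is left out because the SDK's choice of JSON library is not stated here.

```elixir
defmodule LineBufferSketch do
  @doc """
  Appends `chunk` to `buffer` and returns `{complete_lines, rest}`,
  where `rest` is the trailing partial line to keep for the next chunk.
  """
  def split(buffer, chunk) when is_binary(buffer) and is_binary(chunk) do
    parts = String.split(buffer <> chunk, "\n")
    # The last element is whatever followed the final newline
    # (an empty string when the chunk ended exactly on a line boundary).
    {rest, lines} = List.pop_at(parts, -1)
    {Enum.reject(lines, &(&1 == "")), rest}
  end
end
```

Each complete line can then be decoded and forwarded to the caller, while `rest` is stored back into the GenServer state.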
4. Type Modules
Codex.Events
Defines all event types emitted during turn execution.
TypedStruct Definitions:
defmodule Codex.Events.ThreadStarted do
  use TypedStruct
  typedstruct do
    field :type, :thread_started, enforce: true
    field :thread_id, String.t(), enforce: true
  end
end
# Similar for:
# - TurnStarted
# - TurnCompleted (with Usage)
# - TurnFailed (with ThreadError)
# - ItemStarted (with ThreadItem)
# - ItemUpdated (with ThreadItem)
# - ItemCompleted (with ThreadItem)
Codex.Items
Defines all item types and their variants.
Item Types:
- AgentMessage: Text or JSON response
- Reasoning: Agent's thinking summary
- CommandExecution: Command with output and exit code
- FileChange: File modifications with changes array
- McpToolCall: MCP tool invocation
- WebSearch: Search query
- TodoList: Agent's task list
- Error: Non-fatal error
Example:
defmodule Codex.Items.CommandExecution do
  use TypedStruct
  typedstruct do
    field :id, String.t(), enforce: true
    field :type, :command_execution, default: :command_execution
    field :command, String.t(), enforce: true
    field :aggregated_output, String.t(), default: ""
    field :exit_code, integer()
    field :status, atom(), enforce: true
  end
end
Codex.Options
Configuration structs for each level.
defmodule Codex.Options do
  use TypedStruct
  typedstruct do
    field :codex_path_override, String.t()
    field :base_url, String.t()
    field :api_key, String.t()
  end
end
defmodule Codex.Thread.Options do
  use TypedStruct
  typedstruct do
    field :model, String.t()
    field :sandbox_mode, atom() # :read_only | :workspace_write | :danger_full_access
    field :working_directory, String.t()
    field :skip_git_repo_check, boolean(), default: false
  end
end
defmodule Codex.Turn.Options do
  use TypedStruct
  typedstruct do
    field :output_schema, map()
  end
end
5. Utility Modules
Codex.OutputSchemaFile
Manages temporary JSON schema files.
Functions:
@spec create(map() | nil) :: {:ok, {String.t() | nil, function()}} | {:error, term()}
Implementation:
- Creates temp directory in system tmp
- Writes schema JSON to file
- Returns path and cleanup function
- Cleanup function removes directory recursively
- Handles nil schema (no file created)
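The steps above can be sketched as follows. This is an illustration under stated assumptions, not the SDK's implementation: `OutputSchemaFileSketch` is a hypothetical module, the directory and file names are made up, and the JSON encoder is passed in as a function so the sketch does not depend on a particular JSON library.

```elixir
defmodule OutputSchemaFileSketch do
  @doc """
  Returns `{:ok, {path | nil, cleanup_fun}}`. `encode` is any 1-arity
  function that turns the schema map into a JSON string.
  """
  # No schema: nothing is written and the cleanup function is a no-op.
  def create(nil, _encode), do: {:ok, {nil, fn -> :ok end}}

  def create(schema, encode) when is_map(schema) and is_function(encode, 1) do
    # A unique directory per call avoids collisions between concurrent turns.
    dir = Path.join(System.tmp_dir!(), "codex-schema-#{System.unique_integer([:positive])}")
    path = Path.join(dir, "schema.json")

    with :ok <- File.mkdir_p(dir),
         :ok <- File.write(path, encode.(schema)) do
      cleanup = fn ->
        # Remove the directory and everything in it.
        File.rm_rf(dir)
        :ok
      end

      {:ok, {path, cleanup}}
    end
  end
end
```

Returning the cleanup function alongside the path lets the caller decide when the file is no longer needed, for example in the after function of a stream.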
Data Flow Diagrams
Blocking Turn Execution
Client Thread Exec GenServer Port/Process
| | | |
|-- run(input) -------->| | |
| |-- start_link() ------>| |
| | |-- spawn() --------->|
| | | |-- codex-rs starts
| |-- call: run_turn ---->| |
| | |-- write stdin ----->|
| | | |
| |<------- event --------|<-- stdout line -----|
| |<------- event --------|<-- stdout line -----|
| |<------- event --------|<-- stdout line -----|
| | | |
| |<-- TurnCompleted -----|<-- stdout line -----|
| | | |-- codex-rs exits
| | |<-- exit_status -----|
| |-- stop() ------------>| |
| | |-- cleanup --------->|
|<-- {:ok, result} -----| | |
Streaming Turn Execution
Client Thread Exec GenServer Port/Process
| | | |
|-- run_streamed() ---->| | |
| |-- start_link() ------>| |
| | |-- spawn() --------->|
|<-- {:ok, stream} -----| | |
| | | |-- codex-rs starts
| |-- call: run_turn ---->| |
| | |-- write stdin ----->|
| | | |
|-- next event -------->|-- fetch event ------->| |
|<-- ItemStarted -------|<----------------------|<-- stdout line -----|
| | | |
|-- next event -------->|-- fetch event ------->| |
|<-- ItemCompleted -----|<----------------------|<-- stdout line -----|
| | | |
|-- next event -------->|-- fetch event ------->| |
|<-- TurnCompleted -----|<----------------------|<-- stdout line -----|
| | | |-- codex-rs exits
|-- stream done ------->|-- stop() ------------>| |
| | |-- cleanup --------->|
Process Model
Process Hierarchy
Application Supervisor
│
└─── Client Process (caller)
     │
     └─── Codex.Exec GenServer (per turn)
          │
          └─── Port (OS process)
               │
               └─── codex-rs
Key Points:
- Exec GenServer is ephemeral (one per turn)
- No persistent supervision tree needed
- Client monitors Exec GenServer
- Exec GenServer monitors Port
- Clean shutdown cascades down hierarchy
Message Passing
Client → Thread (synchronous):
{:run, input, options}
{:run_streamed, input, options}
Thread → Exec (GenServer call):
{:run_turn, input, codex_args}
Port → Exec (Port messages):
{port, {:data, binary}}
{port, {:exit_status, integer}}
{:EXIT, port, reason}
Exec → Client (via reference):
{:event, ref, event_struct}
{:error, ref, error_term}
{:done, ref}
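The reference-tagged messages make it easy for the receiving side to ignore unrelated mailbox traffic. A minimal sketch of a blocking collector for the three Exec → Client messages follows; the module name and the `{:error, :timeout}` return value are assumptions made for illustration.

```elixir
defmodule CollectSketch do
  @doc """
  Receives messages tagged with `ref` until `{:done, ref}` arrives.
  Returns `{:ok, events}` in arrival order, or `{:error, reason}`.
  """
  def collect(ref, timeout \\ 30_000, acc \\ []) do
    receive do
      # The pin (^ref) leaves messages for other turns in the mailbox.
      {:event, ^ref, event} -> collect(ref, timeout, [event | acc])
      {:done, ^ref} -> {:ok, Enum.reverse(acc)}
      {:error, ^ref, reason} -> {:error, reason}
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

Note that the timeout applies to each wait for the next message, not to the turn as a whole.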
Error Handling Strategy
Error Categories
Configuration Errors (fail fast)
- Invalid options
- Missing codex binary
- Bad API credentials
- Return: {:error, {:config, reason}}
Process Errors (recoverable)
- Spawn failure
- Port crash
- Return: {:error, {:process, reason}}
Communication Errors (retryable)
- JSON parse error
- Protocol mismatch
- Return: {:error, {:communication, reason}}
Turn Errors (expected)
- Agent failure
- API rate limit
- Model error
- Return: {:error, {:turn_failed, error_struct}}
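Because each category has its own tag, client code can dispatch on the tuple shape. The sketch below maps each category to a caller-side action; the action atoms (`:fail_fast`, `:restart_turn`, `:retry`, `:report`) and the module name are hypothetical and only mirror the labels in the list above.

```elixir
defmodule ErrorPolicySketch do
  @doc "Maps a turn result to a (hypothetical) action for the caller."
  def action({:ok, _result}), do: :continue
  # Configuration errors will not fix themselves; stop immediately.
  def action({:error, {:config, _reason}}), do: :fail_fast
  # Process errors are recoverable by starting a fresh turn.
  def action({:error, {:process, _reason}}), do: :restart_turn
  # Communication errors are worth retrying.
  def action({:error, {:communication, _reason}}), do: :retry
  # Turn errors are expected outcomes; surface them to the user.
  def action({:error, {:turn_failed, _error}}), do: :report
  # Anything unrecognized is treated conservatively.
  def action({:error, _other}), do: :fail_fast
end
```

Keeping a catch-all clause is a deliberate choice here, since a caller should not crash on an error shape it has not seen before.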
Error Propagation
codex-rs exit code ≠ 0
↓
Port sends {:exit_status, code}
↓
Exec GenServer receives exit
↓
Exec parses stderr buffer
↓
Exec sends {:error, ref, {:turn_failed, details}}
↓
Thread receives error
↓
Client gets {:error, {:turn_failed, details}}
Cleanup Guarantees
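All cleanup happens in GenServer terminate/2:
- Close Port
- Kill OS process if still running
- Remove temporary schema file
- Send telemetry event
Cleanup is designed to run on:
- Normal completion
- Client crash
- GenServer crash
- VM shutdown
OTP only calls terminate/2 in response to an exit signal if the GenServer traps exits, and never when the process is killed with :kill, so the crash and shutdown cases depend on the Exec process trapping exits.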
Streaming Implementation
Stream Creation
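def run_streamed(thread, input, opts) do
  # create/1 returns {:ok, {path | nil, cleanup_fn}} or {:error, reason}
  with {:ok, {schema_path, cleanup_fn}} <- OutputSchemaFile.create(opts.output_schema) do
    stream =
      Stream.resource(
        # Start function (runs when the stream is first consumed)
        fn ->
          {:ok, pid} = Exec.start_link(...)
          # run_turn/3 returns {:ok, reference()}
          {:ok, ref} = Exec.run_turn(pid, input, ...)
          {pid, ref, cleanup_fn}
        end,
        # Next function (one element per event message)
        fn {_pid, ref, _cleanup_fn} = acc ->
          receive do
            {:event, ^ref, event} -> {[event], acc}
            {:done, ^ref} -> {:halt, acc}
            {:error, ^ref, error} -> raise "turn failed: #{inspect(error)}"
          after
            30_000 -> raise "timed out waiting for the next event"
          end
        end,
        # After function (stop Exec and remove the schema file)
        fn {pid, _ref, cleanup_fn} ->
          GenServer.stop(pid)
          cleanup_fn.()
        end
      )
    {:ok, stream}
  end
end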
Key Properties:
- Lazy evaluation (events fetched on demand)
- Backpressure support (caller controls rate)
- Automatic cleanup (even if stream halted early)
- Timeout protection (30s default)
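The automatic-cleanup property can be checked in isolation with a small, self-contained stream. In this sketch the events are ordinary messages already sitting in the caller's mailbox, which stands in for the Exec GenServer; `StreamSketch` and the `on_cleanup` callback are hypothetical names.

```elixir
defmodule StreamSketch do
  @doc """
  Builds a lazy stream of events tagged with `ref`. `on_cleanup` is
  called once when the stream finishes or is halted early.
  """
  def events(ref, on_cleanup, timeout \\ 1_000) do
    Stream.resource(
      # Start: nothing to set up in this sketch, the ref is the state.
      fn -> ref end,
      # Next: block until the next tagged message arrives.
      fn ref ->
        receive do
          {:event, ^ref, event} -> {[event], ref}
          {:done, ^ref} -> {:halt, ref}
        after
          timeout -> raise "timed out waiting for the next event"
        end
      end,
      # After: invoked on normal completion and on early halt alike.
      fn _ref -> on_cleanup.() end
    )
  end
end
```

Taking only the first element with `Enum.take(stream, 1)` halts the stream early, and the cleanup callback still runs; the remaining events simply stay unread.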
Event Buffering
In Exec GenServer:
- Small buffer (100 events) to handle bursts
- Blocks Port reading if buffer full (backpressure)
- Flush buffer on process exit
In Thread/Client:
- No buffering (events consumed immediately)
- Client controls pace via Stream consumption
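A bounded buffer of this kind can be modelled with Erlang's `:queue` plus an explicit size counter. The sketch below is only an illustration of the data structure: `BoundedBufferSketch` is a hypothetical module, and how the Exec GenServer reacts to a `:full` result (for example by pausing reads) is outside its scope.

```elixir
defmodule BoundedBufferSketch do
  defstruct queue: :queue.new(), size: 0, max: 100

  def new(max \\ 100) when is_integer(max) and max > 0, do: %__MODULE__{max: max}

  @doc "Adds an event, or returns `{:full, buffer}` when the limit is reached."
  def push(%__MODULE__{size: size, max: max} = buf, _event) when size >= max do
    {:full, buf}
  end

  def push(%__MODULE__{} = buf, event) do
    {:ok, %{buf | queue: :queue.in(event, buf.queue), size: buf.size + 1}}
  end

  @doc "Removes the oldest event, or returns `{:empty, buffer}`."
  def pop(%__MODULE__{size: 0} = buf), do: {:empty, buf}

  def pop(%__MODULE__{} = buf) do
    {{:value, event}, queue} = :queue.out(buf.queue)
    {{:ok, event}, %{buf | queue: queue, size: buf.size - 1}}
  end

  @doc "Returns all buffered events in order and an empty buffer."
  def flush(%__MODULE__{} = buf), do: {:queue.to_list(buf.queue), new(buf.max)}
end
```

Tracking `size` separately avoids calling `:queue.len/1`, which walks the whole queue, on every push.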
Performance Considerations
Memory
Per Turn Overhead:
- GenServer state: ~1 KB
- Event buffers: ~10 KB
- Port buffers: ~4 KB
- Total: ~15 KB per concurrent turn
Streaming Benefits:
- Constant memory (O(1) per turn)
- Events processed and discarded
- No accumulation of full turn history
Latency
Event Propagation:
- codex-rs → stdout: < 1 ms
- Port → Exec: < 1 ms
- Exec → Client: < 1 ms
- Total: < 5 ms end-to-end
Optimization Opportunities:
- Batch small events
- Binary protocol (vs JSON)
- NIF for JSON parsing
Throughput
Bottlenecks:
- OpenAI API rate limits (primary)
- JSON parsing (secondary)
- Process scheduling (minimal)
Scalability:
- 100s of concurrent turns easily
- 1000s possible with tuning
- Limited by API, not SDK
Testing Strategy
Unit Tests
Codex Module:
- Option validation
- Thread creation
- Error cases
Thread Module:
- Turn execution (mocked Exec)
- Option passing
- Schema handling
Exec GenServer:
- Process spawning
- Event parsing
- Error handling
- Cleanup
Integration Tests
With Mock codex-rs:
- Script that emits test events
- No real API calls
- Fast and deterministic
With Real codex-rs:
- Tagged :integration
- Requires API key
- Slow but comprehensive
Property Tests
Event Parsing:
- Generate random valid events
- Verify round-trip JSON encoding
- Ensure no crashes
Stream Properties:
- Events in order
- No duplicates
- Complete consumption
Chaos Tests
Process Crashes:
- Kill Exec during turn
- Kill Port during turn
- Verify cleanup happens
Resource Exhaustion:
- Many concurrent turns
- Large event payloads
- Verify no leaks
Telemetry Integration
Events
[:codex, :turn, :start]
Measurements: %{system_time: integer()}
Metadata: %{thread_id: string(), input_length: integer()}
[:codex, :turn, :stop]
Measurements: %{duration: integer()}
Metadata: %{thread_id: string(), usage: Usage.t()}
[:codex, :turn, :exception]
Measurements: %{duration: integer()}
Metadata: %{thread_id: string(), error: term()}
[:codex, :item, :completed]
Measurements: %{system_time: integer()}
Metadata: %{thread_id: string(), item_type: atom(), item_id: string()}
Usage
:telemetry.attach_many(
  "codex-handler",
  [
    [:codex, :turn, :start],
    [:codex, :turn, :stop],
    [:codex, :turn, :exception]
  ],
  &MyApp.TelemetryHandler.handle_event/4,
  nil
)
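A handler module for the attachment above might look like the following sketch. The module name and log wording are hypothetical, and it assumes `duration` is reported in native time units, which is the usual telemetry convention but is not confirmed for this SDK.

```elixir
defmodule TelemetryHandlerSketch do
  require Logger

  # Telemetry handlers receive the event name, measurements, metadata,
  # and the config term given at attach time.
  def handle_event([:codex, :turn, :start], _measurements, metadata, _config) do
    Logger.info("turn started thread=#{inspect(metadata[:thread_id])}")
  end

  def handle_event([:codex, :turn, :stop], measurements, metadata, _config) do
    Logger.info(describe_stop(measurements, metadata))
  end

  def handle_event([:codex, :turn, :exception], _measurements, metadata, _config) do
    Logger.error("turn failed error=#{inspect(metadata[:error])}")
  end

  # Ignore events this handler does not know about.
  def handle_event(_event, _measurements, _metadata, _config), do: :ok

  @doc "Formats a stop event; assumes `duration` is in native time units."
  def describe_stop(%{duration: duration}, %{thread_id: thread_id}) do
    ms = System.convert_time_unit(duration, :native, :millisecond)
    "turn finished thread=#{inspect(thread_id)} duration_ms=#{ms}"
  end
end
```

Keeping the formatting in a pure function makes the handler easy to test without attaching it to telemetry.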
Security Considerations
Sandbox Modes
- :read_only: Codex can read files but not write
- :workspace_write: Codex can write within working directory
- :danger_full_access: Codex has unrestricted access
Recommendations:
- Use :read_only for analysis tasks
- Use :workspace_write for development
- Avoid :danger_full_access unless necessary
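Since `sandbox_mode` is typed as a plain `atom()`, a typo would otherwise pass through unnoticed. A minimal validation sketch follows; `SandboxModeSketch` and the `:invalid_sandbox_mode` reason are hypothetical names, and only the three mode atoms come from the list above.

```elixir
defmodule SandboxModeSketch do
  @modes [:read_only, :workspace_write, :danger_full_access]

  @doc "Accepts one of the three documented modes, rejects anything else."
  def validate(mode) when mode in @modes, do: {:ok, mode}
  def validate(other), do: {:error, {:config, {:invalid_sandbox_mode, other}}}
end
```

Running a check like this before the turn starts turns a misspelled mode into a configuration error instead of a surprise at execution time.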
Input Validation
- Sanitize file paths
- Validate schema JSON
- Escape shell arguments (handled by codex-rs)
Secrets Management
- Never log API keys
- Use environment variables
- Rotate keys regularly
- Use per-project API keys
Extension Points
Custom Event Handlers
defmodule MyApp.CodexHandler do
  require Logger
  alias Codex.Events.ItemCompleted
  alias Codex.Items.CommandExecution
  # Log every completed shell command; ignore all other events
  def handle_event(%ItemCompleted{item: %CommandExecution{} = cmd}) do
    Logger.info("Command: #{cmd.command}, exit: #{cmd.exit_code}")
  end
  def handle_event(_), do: :ok
end
# Use with streaming
{:ok, stream} = Codex.Thread.run_streamed(thread, input)
Enum.each(stream, &MyApp.CodexHandler.handle_event/1)
Custom Telemetry
defmodule MyApp.Metrics do
  def track_usage(%Usage{} = usage) do
    :telemetry.execute(
      [:my_app, :codex, :tokens],
      %{total: usage.input_tokens + usage.output_tokens},
      %{source: :codex}
    )
  end
end
Supervision
defmodule MyApp.CodexSupervisor do
  use Supervisor
  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end
  def init(_opts) do
    children = [
      {Task.Supervisor, name: MyApp.CodexTaskSupervisor}
    ]
    Supervisor.init(children, strategy: :one_for_one)
  end
end
# Use supervised tasks for concurrent turns
Task.Supervisor.async(MyApp.CodexTaskSupervisor, fn ->
  Thread.run(thread, input)
end)
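When many prompts need to run at once, `Task.Supervisor.async_stream_nolink/4` bounds concurrency and keeps a failed or slow turn from taking the caller down. The sketch below is one possible shape: `ConcurrentTurnsSketch` is a hypothetical module, `run_turn` is any 1-arity function (in real code it would wrap a call to Thread.run), and mapping task exits to `{:error, {:process, reason}}` is an assumption.

```elixir
defmodule ConcurrentTurnsSketch do
  @doc """
  Runs `run_turn` for every prompt under `supervisor`, at most
  `:max_concurrency` at a time, and returns results in prompt order.
  """
  def run_all(supervisor, prompts, run_turn, opts \\ []) do
    supervisor
    |> Task.Supervisor.async_stream_nolink(prompts, run_turn,
      max_concurrency: Keyword.get(opts, :max_concurrency, 4),
      timeout: Keyword.get(opts, :timeout, 60_000),
      # Kill a task that exceeds the timeout instead of exiting the caller.
      on_timeout: :kill_task
    )
    |> Enum.map(fn
      {:ok, result} -> result
      {:exit, reason} -> {:error, {:process, reason}}
    end)
  end
end
```

A call such as `ConcurrentTurnsSketch.run_all(MyApp.CodexTaskSupervisor, prompts, fn prompt -> Thread.run(thread, prompt) end)` would then return one result per prompt.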
Future Enhancements
Potential Improvements
- Native JSON Parsing: NIF for faster event parsing
- Binary Protocol: Reduce overhead vs JSONL
- WebSocket Streaming: Alternative to Port for long-running sessions
- Event Persistence: Store events for replay/debugging
- Distributed Turns: Run turns on remote nodes
- Rate Limiting: Built-in API rate limiting
- Caching: Cache common responses
- Metrics Dashboard: Real-time monitoring UI
API Stability
Stable (v1.0+):
- Core module interfaces
- Event/item struct shapes
- Option struct fields
Unstable (may change):
- Telemetry event names
- Internal GenServer implementation
- Error tuple formats
Experimental:
- Custom event handlers
- Advanced streaming modes
- Performance optimizations