Architecture Guide

View Source

System Overview

The Elixir Codex SDK is a layered architecture that wraps the codex-rs CLI executable and provides an idiomatic OTP-based interface. The system is designed around three core principles:

  1. Process Isolation: Each turn execution runs in its own GenServer
  2. Clean Separation: Clear boundaries between client API, process management, and IPC
  3. Robust Error Handling: Failures are isolated and cleanly propagated

Component Architecture

High-Level Component Diagram


                        Client Code                             
  (User application using Codex SDK)                           

                 
                  Public API
                 

                      Codex Module                              
  - start_thread/2                                             
  - resume_thread/3                                            
  (Factory for Thread instances)                               

                 
                  Returns Thread struct
                 

                   Codex.Thread Module                          
  - run/3 (blocking)                                           
  - run_streamed/3 (streaming)                                 
  (Manages turn execution lifecycle)                           

                 
                  Starts GenServer
                 

                   Codex.Exec GenServer                         
  - Spawns codex-rs process                                    
  - Manages Port communication                                 
  - Parses JSONL events                                        
  - Handles process lifecycle                                  

                 
                  Port (stdin/stdout)
                 

                      codex-rs Process                          
  - OpenAI API integration                                     
  - Command execution                                          
  - File operations                                            
  - Event emission                                             

Module Breakdown

1. Codex Module

Purpose: Main entry point and factory for thread instances.

Responsibilities:

  • Validate global options (API key, base URL, codex path)
  • Create new thread instances
  • Resume existing threads from saved sessions

State: Stateless module (pure functions)

Key Functions:

@spec start_thread(Codex.Options.t(), Codex.Thread.Options.t()) ::
  {:ok, Codex.Thread.t()} | {:error, term()}

@spec resume_thread(String.t(), Codex.Options.t(), Codex.Thread.Options.t()) ::
  {:ok, Codex.Thread.t()} | {:error, term()}

Error Handling:

  • Validates codex binary exists and is executable
  • Validates options format
  • Returns descriptive errors for invalid configurations

2. Codex.Thread Module

Purpose: Manages individual conversation threads.

Responsibilities:

  • Execute turns (blocking and streaming modes)
  • Maintain thread ID and options
  • Coordinate with Exec GenServer
  • Handle structured output schemas

State: Encapsulated in %Codex.Thread{} struct

defstruct [
  :thread_id,          # String.t() | nil (populated after first turn)
  :codex_opts,         # %Codex.Options{}
  :thread_opts         # %Codex.Thread.Options{}
]

Key Functions:

@spec run(t(), String.t(), Codex.Turn.Options.t()) ::
  {:ok, Codex.Turn.Result.t()} | {:error, term()}

@spec run_streamed(t(), String.t(), Codex.Turn.Options.t()) ::
  {:ok, Enumerable.t()} | {:error, term()}

Execution Flow (Blocking Mode):

  1. Create output schema file if needed
  2. Start Codex.Exec GenServer with options
  3. Wait for events, accumulating items
  4. Extract final response from last AgentMessage
  5. Return TurnResult when TurnCompleted received
  6. Clean up schema file and Exec process

Execution Flow (Streaming Mode):

  1. Create output schema file if needed
  2. Start Codex.Exec GenServer with options
  3. Return Stream that yields events as they arrive
  4. Clean up when stream completes or is halted

3. Codex.Exec GenServer

Purpose: Manages the lifecycle of a single codex-rs process execution.

Responsibilities:

  • Spawn codex-rs process via Port
  • Send input prompt via stdin
  • Receive and parse JSONL events from stdout
  • Monitor process health and exit status
  • Clean up resources on completion or crash

State:

defstruct [
  :port,               # Port.t()
  :caller,             # pid() of requesting process
  :ref,                # reference() for synchronization
  :buffer,             # String.t() for incomplete lines
  :exit_status,        # integer() | nil
  :stderr_buffer       # String.t() for error messages
]

Lifecycle:

  1. init/1:

    • Build command args from options
    • Set environment variables
    • Spawn Port with codex-rs process
    • Send telemetry event (turn started)
  2. Message Handling:

    • {port, {:data, data}}: Parse JSONL lines, send events to caller
    • {port, {:exit_status, status}}: Handle process exit
    • {:EXIT, port, reason}: Handle unexpected crashes
  3. terminate/2:

    • Close port if still open
    • Send telemetry event (turn completed/failed)
    • Clean up any remaining resources

Error Scenarios:

  • Spawn failure: Return error immediately
  • JSON parse error: Emit error event, continue processing
  • Non-zero exit: Emit TurnFailed with stderr contents
  • Process crash: Emit TurnFailed with crash reason

GenServer API:

@spec start_link(keyword()) :: GenServer.on_start()
@spec run_turn(pid(), String.t(), map()) :: {:ok, reference()}

4. Type Modules

Codex.Events

Defines all event types emitted during turn execution.

TypedStruct Definitions:

defmodule Codex.Events.ThreadStarted do
  use TypedStruct
  typedstruct do
    field :type, :thread_started, enforce: true
    field :thread_id, String.t(), enforce: true
  end
end

# Similar for:
# - TurnStarted
# - TurnCompleted (with Usage)
# - TurnFailed (with ThreadError)
# - ItemStarted (with ThreadItem)
# - ItemUpdated (with ThreadItem)
# - ItemCompleted (with ThreadItem)

Codex.Items

Defines all item types and their variants.

Item Types:

  • AgentMessage: Text or JSON response
  • Reasoning: Agent's thinking summary
  • CommandExecution: Command with output and exit code
  • FileChange: File modifications with changes array
  • McpToolCall: MCP tool invocation
  • WebSearch: Search query
  • TodoList: Agent's task list
  • Error: Non-fatal error

Example:

defmodule Codex.Items.CommandExecution do
  use TypedStruct
  typedstruct do
    field :id, String.t(), enforce: true
    field :type, :command_execution, default: :command_execution
    field :command, String.t(), enforce: true
    field :aggregated_output, String.t(), default: ""
    field :exit_code, integer()
    field :status, atom(), enforce: true
  end
end

Codex.Options

Configuration structs for each level.

defmodule Codex.Options do
  use TypedStruct
  typedstruct do
    field :codex_path_override, String.t()
    field :base_url, String.t()
    field :api_key, String.t()
  end
end

defmodule Codex.Thread.Options do
  use TypedStruct
  typedstruct do
    field :model, String.t()
    field :sandbox_mode, atom()  # :read_only | :workspace_write | :danger_full_access
    field :working_directory, String.t()
    field :skip_git_repo_check, boolean(), default: false
  end
end

defmodule Codex.Turn.Options do
  use TypedStruct
  typedstruct do
    field :output_schema, map()
  end
end

5. Utility Modules

Codex.OutputSchemaFile

Manages temporary JSON schema files.

Functions:

@spec create(map() | nil) :: {:ok, {String.t() | nil, function()}} | {:error, term()}

Implementation:

  • Creates temp directory in system tmp
  • Writes schema JSON to file
  • Returns path and cleanup function
  • Cleanup function removes directory recursively
  • Handles nil schema (no file created)

Data Flow Diagrams

Blocking Turn Execution

Client                  Thread              Exec GenServer         Port/Process
  |                       |                       |                     |
  |-- run(input) -------->|                       |                     |
  |                       |-- start_link() ------>|                     |
  |                       |                       |-- spawn() --------->|
  |                       |                       |                     |-- codex-rs starts
  |                       |-- call: run_turn ---->|                     |
  |                       |                       |-- write stdin ----->|
  |                       |                       |                     |
  |                       |<------- event --------|<-- stdout line -----|
  |                       |<------- event --------|<-- stdout line -----|
  |                       |<------- event --------|<-- stdout line -----|
  |                       |                       |                     |
  |                       |<-- TurnCompleted -----|<-- stdout line -----|
  |                       |                       |                     |-- codex-rs exits
  |                       |                       |<-- exit_status -----|
  |                       |-- stop() ------------>|                     |
  |                       |                       |-- cleanup --------->|
  |<-- {:ok, result} -----|                       |                     |

Streaming Turn Execution

Client                  Thread              Exec GenServer         Port/Process
  |                       |                       |                     |
  |-- run_streamed() ---->|                       |                     |
  |                       |-- start_link() ------>|                     |
  |                       |                       |-- spawn() --------->|
  |<-- {:ok, stream} -----|                       |                     |
  |                       |                       |                     |-- codex-rs starts
  |                       |-- call: run_turn ---->|                     |
  |                       |                       |-- write stdin ----->|
  |                       |                       |                     |
  |-- next event -------->|-- fetch event ------->|                     |
  |<-- ItemStarted -------|<----------------------|<-- stdout line -----|
  |                       |                       |                     |
  |-- next event -------->|-- fetch event ------->|                     |
  |<-- ItemCompleted -----|<----------------------|<-- stdout line -----|
  |                       |                       |                     |
  |-- next event -------->|-- fetch event ------->|                     |
  |<-- TurnCompleted -----|<----------------------|<-- stdout line -----|
  |                       |                       |                     |-- codex-rs exits
  |-- stream done ------->|-- stop() ------------>|                     |
  |                       |                       |-- cleanup --------->|

Process Model

Process Hierarchy

Application Supervisor
    
     Client Process (caller)
            
             Codex.Exec GenServer (per turn)
                    
                     Port (OS process)
                            
                             codex-rs

Key Points:

  • Exec GenServer is ephemeral (one per turn)
  • No persistent supervision tree needed
  • Client monitors Exec GenServer
  • Exec GenServer monitors Port
  • Clean shutdown cascades down hierarchy

Message Passing

Client → Thread (synchronous):

{:run, input, options}
{:run_streamed, input, options}

Thread → Exec (GenServer call):

{:run_turn, input, codex_args}

Port → Exec (Port messages):

{port, {:data, binary}}
{port, {:exit_status, integer}}
{:EXIT, port, reason}

Exec → Client (via reference):

{:event, ref, event_struct}
{:error, ref, error_term}
{:done, ref}

Error Handling Strategy

Error Categories

  1. Configuration Errors (fail fast)

    • Invalid options
    • Missing codex binary
    • Bad API credentials
    • Return: {:error, {:config, reason}}
  2. Process Errors (recoverable)

    • Spawn failure
    • Port crash
    • Return: {:error, {:process, reason}}
  3. Communication Errors (retryable)

    • JSON parse error
    • Protocol mismatch
    • Return: {:error, {:communication, reason}}
  4. Turn Errors (expected)

    • Agent failure
    • API rate limit
    • Model error
    • Return: {:error, {:turn_failed, error_struct}}

Error Propagation

codex-rs exit code  0
    
Port sends {:exit_status, code}
    
Exec GenServer receives exit
    
Exec parses stderr buffer
    
Exec sends {:error, ref, {:turn_failed, details}}
    
Thread receives error
    
Client gets {:error, {:turn_failed, details}}

Cleanup Guarantees

All cleanup happens in GenServer terminate/2:

  • Close Port
  • Kill OS process if still running
  • Remove temporary schema file
  • Send telemetry event

Cleanup is guaranteed even on:

  • Normal completion
  • Client crash
  • GenServer crash
  • VM shutdown

Streaming Implementation

Stream Creation

def run_streamed(thread, input, opts) do
  {schema_path, cleanup_fn} = OutputSchemaFile.create(opts.output_schema)

  stream = Stream.resource(
    # Start function
    fn ->
      {:ok, pid} = Exec.start_link(...)
      ref = Exec.run_turn(pid, input, ...)
      {pid, ref, cleanup_fn}
    end,

    # Next function
    fn {pid, ref, cleanup_fn} = acc ->
      receive do
        {:event, ^ref, event} -> {[event], acc}
        {:done, ^ref} -> {:halt, acc}
        {:error, ^ref, error} -> raise error
      after
        30_000 -> raise TimeoutError
      end
    end,

    # After function
    fn {pid, _ref, cleanup_fn} ->
      GenServer.stop(pid)
      cleanup_fn.()
    end
  )

  {:ok, stream}
end

Key Properties:

  • Lazy evaluation (events fetched on demand)
  • Backpressure support (caller controls rate)
  • Automatic cleanup (even if stream halted early)
  • Timeout protection (30s default)

Event Buffering

In Exec GenServer:

  • Small buffer (100 events) to handle bursts
  • Blocks Port reading if buffer full (backpressure)
  • Flush buffer on process exit

In Thread/Client:

  • No buffering (events consumed immediately)
  • Client controls pace via Stream consumption

Performance Considerations

Memory

Per Turn Overhead:

  • GenServer state: ~1 KB
  • Event buffers: ~10 KB
  • Port buffers: ~4 KB
  • Total: ~15 KB per concurrent turn

Streaming Benefits:

  • Constant memory (O(1) per turn)
  • Events processed and discarded
  • No accumulation of full turn history

Latency

Event Propagation:

  • codex-rs → stdout: < 1 ms
  • Port → Exec: < 1 ms
  • Exec → Client: < 1 ms
  • Total: < 5 ms end-to-end

Optimization Opportunities:

  • Batch small events
  • Binary protocol (vs JSON)
  • NIF for JSON parsing

Throughput

Bottlenecks:

  1. OpenAI API rate limits (primary)
  2. JSON parsing (secondary)
  3. Process scheduling (minimal)

Scalability:

  • 100s of concurrent turns easily
  • 1000s possible with tuning
  • Limited by API, not SDK

Testing Strategy

Unit Tests

Codex Module:

  • Option validation
  • Thread creation
  • Error cases

Thread Module:

  • Turn execution (mocked Exec)
  • Option passing
  • Schema handling

Exec GenServer:

  • Process spawning
  • Event parsing
  • Error handling
  • Cleanup

Integration Tests

With Mock codex-rs:

  • Script that emits test events
  • No real API calls
  • Fast and deterministic

With Real codex-rs:

  • Tagged :integration
  • Requires API key
  • Slow but comprehensive

Property Tests

Event Parsing:

  • Generate random valid events
  • Verify round-trip JSON encoding
  • Ensure no crashes

Stream Properties:

  • Events in order
  • No duplicates
  • Complete consumption

Chaos Tests

Process Crashes:

  • Kill Exec during turn
  • Kill Port during turn
  • Verify cleanup happens

Resource Exhaustion:

  • Many concurrent turns
  • Large event payloads
  • Verify no leaks

Telemetry Integration

Events

[:codex, :turn, :start]
  Measurements: %{system_time: integer()}
  Metadata: %{thread_id: string(), input_length: integer()}

[:codex, :turn, :stop]
  Measurements: %{duration: integer()}
  Metadata: %{thread_id: string(), usage: Usage.t()}

[:codex, :turn, :exception]
  Measurements: %{duration: integer()}
  Metadata: %{thread_id: string(), error: term()}

[:codex, :item, :completed]
  Measurements: %{system_time: integer()}
  Metadata: %{thread_id: string(), item_type: atom(), item_id: string()}

Usage

:telemetry.attach_many(
  "codex-handler",
  [
    [:codex, :turn, :start],
    [:codex, :turn, :stop],
    [:codex, :turn, :exception]
  ],
  &MyApp.TelemetryHandler.handle_event/4,
  nil
)

Security Considerations

Sandbox Modes

  • :read_only: Codex can read files but not write
  • :workspace_write: Codex can write within working directory
  • :danger_full_access: Codex has unrestricted access

Recommendations:

  • Use :read_only for analysis tasks
  • Use :workspace_write for development
  • Avoid :danger_full_access unless necessary

Input Validation

  • Sanitize file paths
  • Validate schema JSON
  • Escape shell arguments (handled by codex-rs)

Secrets Management

  • Never log API keys
  • Use environment variables
  • Rotate keys regularly
  • Use per-project API keys

Extension Points

Custom Event Handlers

defmodule MyApp.CodexHandler do
  def handle_event(%ItemCompleted{item: %CommandExecution{} = cmd}) do
    Logger.info("Command: #{cmd.command}, exit: #{cmd.exit_code}")
  end

  def handle_event(_), do: :ok
end

# Use with streaming
{:ok, stream} = Thread.run_streamed(thread, input)
Enum.each(stream, &MyApp.CodexHandler.handle_event/1)

Custom Telemetry

defmodule MyApp.Metrics do
  def track_usage(%Usage{} = usage) do
    :telemetry.execute(
      [:my_app, :codex, :tokens],
      %{total: usage.input_tokens + usage.output_tokens},
      %{source: :codex}
    )
  end
end

Supervision

defmodule MyApp.CodexSupervisor do
  use Supervisor

  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    children = [
      {Task.Supervisor, name: MyApp.CodexTaskSupervisor}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

# Use supervised tasks for concurrent turns
Task.Supervisor.async(MyApp.CodexTaskSupervisor, fn ->
  Thread.run(thread, input)
end)

Future Enhancements

Potential Improvements

  1. Native JSON Parsing: NIF for faster event parsing
  2. Binary Protocol: Reduce overhead vs JSONL
  3. WebSocket Streaming: Alternative to Port for long-running sessions
  4. Event Persistence: Store events for replay/debugging
  5. Distributed Turns: Run turns on remote nodes
  6. Rate Limiting: Built-in API rate limiting
  7. Caching: Cache common responses
  8. Metrics Dashboard: Real-time monitoring UI

API Stability

Stable (v1.0+):

  • Core module interfaces
  • Event/item struct shapes
  • Option struct fields

Unstable (may change):

  • Telemetry event names
  • Internal GenServer implementation
  • Error tuple formats

Experimental:

  • Custom event handlers
  • Advanced streaming modes
  • Performance optimizations