Examples and Usage Patterns

Copy Markdown View Source

This document provides comprehensive examples demonstrating common use cases and patterns for the Elixir Codex SDK. Runnable counterparts live under /examples; each script can be executed with mix run examples/<name>.exs, followed by any script-specific arguments.

Auth defaults: CLI-backed examples prefer CODEX_API_KEY (or auth.json OPENAI_API_KEY) when present, and otherwise fall back to your CLI/ChatGPT auth stored under CODEX_HOME (default ~/.codex). The dedicated OAuth example uses Codex.OAuth directly and defaults to an isolated temporary CODEX_HOME so it does not mutate your real login state unless you opt into that explicitly.

Table of Contents

  1. Basic Usage
  2. Streaming Responses
  3. Structured Output
  4. Multi-Turn Conversations
  5. File Operations
  6. Command Execution
  7. Error Handling
  8. Configuration
  9. Advanced Patterns
  10. Production Patterns
  11. Live Usage & Compaction
  12. Live Exec Controls
  13. Live Telemetry Stream
  14. Additional Live Examples
  15. App-server Transport
  16. Realtime Voice Interactions
  17. Voice Pipeline

Basic Usage

Simple Question and Answer

The most basic usage: ask a question and get a response.

# Demonstrates the minimal request/response cycle: start a thread, run one
# turn, print the agent's reply, then report token usage.
defmodule BasicExample do
  # Asks a single question and prints the response plus usage counters.
  def ask_question do
    # Start a new thread
    {:ok, thread} = Codex.start_thread()

    # Run a turn
    {:ok, result} = Codex.Thread.run(thread, "What is a GenServer in Elixir?")

    # Print the response
    case result.final_response do
      %Codex.Items.AgentMessage{text: text} ->
        IO.puts(text)

      _ ->
        IO.puts("The turn did not produce a final response.")
    end

    # Check token usage
    # NOTE(review): usage is read with string keys here, while other examples
    # in this document use atom keys (result.usage.input_tokens) — confirm
    # which shape Codex.Thread.run/2 actually returns.
    IO.puts("\nTokens used:")
    IO.puts("  Input: #{result.usage["input_tokens"]}")
    IO.puts("  Output: #{result.usage["output_tokens"]}")
    IO.puts("  Total: #{result.usage["total_tokens"]}")
  end
end

Examining All Items

Access all items produced during a turn.

defmodule ItemsExample do
  @moduledoc """
  Walks every completed item a turn produced and prints a labelled view of
  each known item type (agent message, reasoning, command execution, file
  change), with a fallback for anything else.
  """

  def explore_items do
    {:ok, thread} = Codex.start_thread()

    {:ok, result} = Codex.Thread.run(
      thread,
      "Explain how Elixir processes work, and give me a simple example"
    )

    # Iterate through completed items in the event stream.
    # (Fixed: several clause bodies were over-indented by two extra spaces;
    # normalized to standard mix format indentation.)
    Enum.each(result.events, fn
      %Codex.Events.ItemCompleted{item: %Codex.Items.AgentMessage{text: text}} ->
        IO.puts("\n[Agent Message]")
        IO.puts(text)

      %Codex.Events.ItemCompleted{item: %Codex.Items.Reasoning{text: text}} ->
        IO.puts("\n[Reasoning]")
        IO.puts(text)

      %Codex.Events.ItemCompleted{
        item: %Codex.Items.CommandExecution{command: cmd, exit_code: code, status: status}
      } ->
        IO.puts("\n[Command Execution]")
        IO.puts("Command: #{cmd}")
        IO.puts("Exit Code: #{inspect(code)} (#{status})")

      %Codex.Events.ItemCompleted{item: %Codex.Items.FileChange{changes: changes}} ->
        IO.puts("\n[File Changes]")
        Enum.each(changes, fn %{path: path, kind: kind} ->
          IO.puts("  #{kind}: #{path}")
        end)

      # Item types without a dedicated renderer above.
      %Codex.Events.ItemCompleted{item: other} ->
        IO.puts("\n[Other Item]")
        IO.inspect(other, label: "item")

      # Non-item events (turn lifecycle, deltas) are ignored here.
      _ ->
        :ok
    end)
  end
end

Streaming Responses

Real-Time Event Processing

Process events as they arrive for responsive UIs.

defmodule StreamingExample do
  @moduledoc """
  Consumes a streamed turn event-by-event, printing progress information as
  each event arrives.
  """

  def stream_response do
    {:ok, thread} = Codex.start_thread()

    {:ok, stream} = Codex.Thread.run_streamed(
      thread,
      "Analyze the files in this directory and suggest improvements"
    )

    # Dispatch each event to a dedicated handler clause as it arrives.
    Enum.each(stream, &handle_event/1)
  end

  defp handle_event(%Codex.Events.ThreadStarted{thread_id: id}),
    do: IO.puts("Started thread: #{id}")

  defp handle_event(%Codex.Events.TurnStarted{}),
    do: IO.puts("Turn started...")

  defp handle_event(%Codex.Events.ItemStarted{item: item}),
    do: IO.puts("New item: #{item.type}")

  # Emit a dot per in-progress command update as a progress indicator.
  defp handle_event(%Codex.Events.ItemUpdated{item: %{type: :command_execution} = cmd}) do
    if cmd.status == :in_progress do
      IO.write(".")
    end
  end

  defp handle_event(%Codex.Events.ItemCompleted{item: %{type: :agent_message, text: text}}) do
    IO.puts("\n\n[Response]")
    IO.puts(text)
  end

  defp handle_event(%Codex.Events.ItemCompleted{
         item: %{type: :command_execution, command: cmd, exit_code: 0}
       }),
       do: IO.puts("\n✓ Command succeeded: #{cmd}")

  defp handle_event(%Codex.Events.ItemCompleted{item: %{type: :file_change, changes: changes}}) do
    IO.puts("\n[Files Changed]")

    Enum.each(changes, fn %{path: path, kind: kind} ->
      IO.puts("  #{kind}: #{path}")
    end)
  end

  # Completed items of any other shape are ignored.
  defp handle_event(%Codex.Events.ItemCompleted{}), do: :ok

  defp handle_event(%Codex.Events.TurnCompleted{usage: usage}) do
    IO.puts("\n\nTurn completed!")
    IO.puts("Tokens: #{usage.input_tokens + usage.output_tokens}")
  end

  # Any remaining event types are ignored.
  defp handle_event(_event), do: :ok
end

Progressive Display

Stream text responses character by character (simulated).

defmodule ProgressiveDisplay do
  @moduledoc """
  Prints each agent message one grapheme at a time to simulate a typing
  effect while consuming a streamed turn.
  """

  # Per-character delay in milliseconds; adjust for desired speed.
  @char_delay_ms 10

  def display_progressively do
    {:ok, thread} = Codex.start_thread()

    {:ok, stream} = Codex.Thread.run_streamed(
      thread,
      "Write a short story about a robot learning to code"
    )

    # The comprehension filter keeps only completed agent messages.
    for %Codex.Events.ItemCompleted{item: %{type: :agent_message, text: text}} <- stream do
      type_out(text)
    end

    IO.puts("\n")
  end

  # Write the text grapheme-by-grapheme with a short pause between each.
  defp type_out(text) do
    text
    |> String.graphemes()
    |> Enum.each(fn grapheme ->
      IO.write(grapheme)
      Process.sleep(@char_delay_ms)
    end)
  end
end

Live Tooling Stream (Codex CLI)

Run a live streamed turn that shows shell and MCP tool events without requiring an API key (uses your Codex CLI login if present):

mix run examples/live_tooling_stream.exs "Summarize this repository and run one quick check"

The example prints started/updated/completed tool events (including arguments and streamed results) and falls back to the last agent message if the final response isn’t explicitly included in turn.completed.


Structured Output

JSON Schema with Validation

Request structured data conforming to a schema.

defmodule StructuredOutputExample do
  @moduledoc """
  Requests structured JSON output conforming to a code-quality schema and
  prints the decoded score, issues, and suggestions.
  """

  def analyze_code_quality do
    {:ok, thread} = Codex.start_thread()

    turn_opts = %Codex.Turn.Options{output_schema: quality_schema()}

    {:ok, result} = Codex.Thread.run(
      thread,
      "Analyze the code quality of this Elixir project",
      turn_opts
    )

    case Codex.Turn.Result.json(result) do
      {:ok, data} ->
        print_report(data)

      {:error, _} ->
        IO.puts("Failed to parse structured response")
    end
  end

  # JSON schema the model's response must conform to.
  defp quality_schema do
    %{
      "type" => "object",
      "properties" => %{
        "overall_score" => %{
          "type" => "integer",
          "minimum" => 0,
          "maximum" => 100
        },
        "issues" => %{
          "type" => "array",
          "items" => %{
            "type" => "object",
            "properties" => %{
              "severity" => %{
                "type" => "string",
                "enum" => ["low", "medium", "high"]
              },
              "description" => %{"type" => "string"},
              "file" => %{"type" => "string"},
              "line" => %{"type" => "integer"}
            },
            "required" => ["severity", "description"]
          }
        },
        "suggestions" => %{
          "type" => "array",
          "items" => %{"type" => "string"}
        }
      },
      "required" => ["overall_score", "issues", "suggestions"]
    }
  end

  # Render the decoded payload: score, then issues, then suggestions.
  defp print_report(data) do
    IO.puts("Overall Score: #{data["overall_score"]}/100")

    IO.puts("\nIssues Found:")

    Enum.each(data["issues"], fn issue ->
      IO.puts("  [#{String.upcase(issue["severity"])}] #{issue["description"]}")

      if issue["file"] do
        IO.puts("    File: #{issue["file"]}:#{issue["line"]}")
      end
    end)

    IO.puts("\nSuggestions:")

    Enum.each(data["suggestions"], fn suggestion ->
      IO.puts("  - #{suggestion}")
    end)
  end
end

result.final_response.parsed contains the decoded payload whenever the response matches the requested schema. Codex.Turn.Result.json/1 returns the same data alongside helpful error metadata.

Using with TypedStruct

Define Elixir structs that match your schema.

defmodule MyApp.CodeAnalysis do
  @moduledoc """
  Typed struct mirroring the JSON schema requested from Codex, plus helpers
  to build the schema and decode a response map into structs.
  """

  use TypedStruct

  # Issue is defined before the parent struct so its type is available when
  # the parent's field types are declared.
  typedstruct module: Issue do
    field :severity, String.t(), enforce: true
    field :description, String.t(), enforce: true
    field :file, String.t()
    field :line, integer()
  end

  typedstruct do
    field :overall_score, integer(), enforce: true
    # Fixed: the field type previously referenced the undefined `issue()`
    # type; it must name the nested struct's type, `Issue.t()`.
    field :issues, [Issue.t()], default: []
    field :suggestions, [String.t()], default: []
  end

  # JSON schema matching the struct above; pass as a turn's output_schema.
  def schema do
    %{
      "type" => "object",
      "properties" => %{
        "overall_score" => %{"type" => "integer", "minimum" => 0, "maximum" => 100},
        "issues" => %{
          "type" => "array",
          "items" => %{
            "type" => "object",
            "properties" => %{
              "severity" => %{"type" => "string", "enum" => ["low", "medium", "high"]},
              "description" => %{"type" => "string"},
              "file" => %{"type" => "string"},
              "line" => %{"type" => "integer"}
            },
            "required" => ["severity", "description"]
          }
        },
        "suggestions" => %{"type" => "array", "items" => %{"type" => "string"}}
      },
      "required" => ["overall_score", "issues", "suggestions"]
    }
  end

  @doc """
  Builds a `%MyApp.CodeAnalysis{}` from a decoded JSON map. Always returns
  `{:ok, struct}`; missing optional keys fall back to empty lists.
  """
  def from_map(%{} = data) do
    {:ok, %__MODULE__{
      overall_score: data["overall_score"],
      issues: parse_issues(data["issues"]),
      suggestions: data["suggestions"] || []
    }}
  end

  defp parse_issues(issues) when is_list(issues) do
    Enum.map(issues, fn issue ->
      %Issue{
        severity: issue["severity"],
        description: issue["description"],
        file: issue["file"],
        line: issue["line"]
      }
    end)
  end

  # Fixed: a missing or non-list "issues" payload previously raised
  # FunctionClauseError; treat it as "no issues".
  defp parse_issues(_), do: []
end

# Usage
{:ok, thread} = Codex.start_thread()
turn_opts = %Codex.Turn.Options{output_schema: MyApp.CodeAnalysis.schema()}
{:ok, result} = Codex.Thread.run(thread, "Analyze code", turn_opts)

case {result.final_response, Codex.Turn.Result.json(result)} do
  {%Codex.Items.AgentMessage{parsed: parsed}, {:ok, parsed}} ->
    {:ok, analysis} = MyApp.CodeAnalysis.from_map(parsed)
    IO.puts("Score: #{analysis.overall_score}")

  {_, {:error, reason}} ->
    IO.inspect(reason, label: "structured output error")

  # Fixed: a successful JSON decode paired with a missing (or mismatched)
  # final response previously raised CaseClauseError.
  {_, {:ok, _}} ->
    IO.puts("Structured payload decoded, but no matching final agent message was produced.")
end

Multi-Turn Conversations

Context Retention

Maintain context across multiple turns.

defmodule ConversationExample do
  @moduledoc """
  Shows that a single thread retains context across successive turns: each
  follow-up prompt relies on the agent remembering the earlier ones.
  """

  @prompts [
    "I have a bug in my GenServer. It crashes when I send it a {:stop, reason} message.",
    "Can you show me an example of how to handle that message correctly?",
    "What if I want to perform cleanup before stopping?"
  ]

  def multi_turn_conversation do
    {:ok, thread} = Codex.start_thread()

    # Run every prompt on the same thread; the agent keeps prior context.
    @prompts
    |> Enum.with_index()
    |> Enum.each(fn {prompt, index} ->
      {:ok, result} = Codex.Thread.run(thread, prompt)
      separator = if index == 0, do: "", else: "\n"
      IO.puts("#{separator}Agent: #{render(result.final_response)}")
    end)

    # Save thread_id for later
    IO.puts("\nThread ID: #{thread.thread_id}")
  end

  defp render(%Codex.Items.AgentMessage{text: text}), do: text
  defp render(_), do: "(no response produced)"
end

Resuming Sessions

Resume a previous conversation.

defmodule ResumeExample do
  @moduledoc """
  Demonstrates resuming a conversation from a persisted thread id, both for
  a known id and via a save-to-disk/restart round trip.
  """

  @thread_id_file "thread_id.txt"

  # Resume an existing thread by id and ask for a recap.
  def resume_previous_session(thread_id) do
    {:ok, thread} = Codex.resume_thread(thread_id)

    {:ok, result} =
      Codex.Thread.run(thread, "Can you remind me what we were discussing?")

    IO.puts(render(result.final_response))
  end

  # Persist the thread id to disk, then resume it as if the app restarted.
  def save_and_resume do
    # Start thread and have conversation
    {:ok, thread} = Codex.start_thread()

    {:ok, first} = Codex.Thread.run(thread, "Remember the number 42")
    IO.puts(render(first.final_response))

    # Save thread_id (e.g., to database, file, etc.)
    File.write!(@thread_id_file, thread.thread_id)

    # Simulate restart: read thread_id and resume
    {:ok, resumed} =
      @thread_id_file
      |> File.read!()
      |> Codex.resume_thread()

    {:ok, second} = Codex.Thread.run(resumed, "What number should I remember?")
    IO.puts("\nAfter resuming: #{render(second.final_response)}")
  end

  defp render(%Codex.Items.AgentMessage{text: text}), do: text
  defp render(_), do: "(no response produced)"
end

Agent Runner Loop

Automatically follow continuation tokens with the multi-turn runner.

defmodule AgentRunnerExample do
  @moduledoc """
  Uses Codex.AgentRunner to follow continuation tokens automatically, with a
  turn cap so the loop always terminates.
  """

  def run do
    {:ok, thread} = Codex.start_thread()

    agent = %{instructions: "Keep responses short"}
    run_config = %{max_turns: 3}

    Codex.AgentRunner.run(thread, "Plan a checklist",
      agent: agent,
      run_config: run_config
    )
  end
end

Run mix run examples/agent_runner_multi_turn.exs for a runnable script.


File Operations

Tracking File Changes

Monitor file modifications made by the agent.

defmodule FileOperationsExample do
  @moduledoc """
  Tracks file modifications reported by the agent, and shows a manual
  review/confirm loop before accepting proposed changes.
  """

  # Streams a turn and prints every file change grouped by change kind.
  def track_file_changes do
    {:ok, thread} = Codex.start_thread()

    {:ok, stream} = Codex.Thread.run_streamed(
      thread,
      "Add comprehensive documentation to all modules in lib/"
    )

    # Track changes. Enum.flat_map replaces the previous map + List.flatten
    # two-pass version.
    changes =
      stream
      |> Stream.filter(fn
        %Codex.Events.ItemCompleted{item: %{type: :file_change}} -> true
        _ -> false
      end)
      |> Enum.flat_map(fn %{item: file_change} ->
        Enum.map(file_change.changes, fn change ->
          {change.kind, change.path}
        end)
      end)

    # Summarize, grouped by change kind.
    IO.puts("File Changes:")

    changes
    |> Enum.group_by(fn {kind, _} -> kind end)
    |> Enum.each(fn {kind, files} ->
      IO.puts("\n#{String.upcase(to_string(kind))}:")
      Enum.each(files, fn {_, path} ->
        IO.puts("  - #{path}")
      end)
    end)
  end

  # Runs a turn, then asks the operator to confirm each proposed change set.
  def review_before_apply do
    {:ok, thread} = Codex.start_thread()

    {:ok, result} = Codex.Thread.run(
      thread,
      "Refactor the authentication module"
    )

    # Extract file changes
    file_changes =
      Enum.filter(result.items, fn
        %{type: :file_change} -> true
        _ -> false
      end)

    # Review each change
    Enum.each(file_changes, fn change ->
      IO.puts("\nProposed Changes:")
      Enum.each(change.changes, fn %{path: path, kind: kind} ->
        IO.puts("  #{kind}: #{path}")

        # Could display diffs here
        if kind == :update do
          # Read current file and show diff
          # (Would need access to diff from codex-rs)
        end
      end)

      # Prompt user. Fixed: IO.gets/1 returns :eof when stdin is closed,
      # which previously crashed String.trim/1; treat :eof as a rejection.
      IO.puts("\nApply these changes? (y/n)")
      response = IO.gets("")

      if is_binary(response) and String.trim(response) == "y" do
        IO.puts("Changes applied (by codex-rs)")
      else
        IO.puts("Changes rejected - would need to revert")
      end
    end)
  end
end

Command Execution

Monitoring Commands

Track commands executed by the agent.

defmodule CommandExample do
  @moduledoc """
  Collects the commands the agent executed during a streamed turn and prints
  a per-command status summary with a short preview of each command's output.
  """

  # Number of output lines shown per command.
  @preview_lines 5

  def monitor_commands do
    {:ok, thread} = Codex.start_thread()

    {:ok, stream} = Codex.Thread.run_streamed(
      thread,
      "List the top-level files and print the working directory"
    )

    # Collect all completed command-execution items.
    commands =
      stream
      |> Stream.filter(fn
        %Codex.Events.ItemCompleted{item: %{type: :command_execution}} -> true
        _ -> false
      end)
      |> Enum.map(fn %{item: cmd} -> cmd end)

    # Display results
    IO.puts("Commands Executed:")

    Enum.each(commands, fn cmd ->
      IO.puts("\n#{status_icon(cmd)} #{cmd.command}")
      IO.puts("  Exit Code: #{cmd.exit_code || "N/A"}")
      print_output_preview(cmd.aggregated_output)
    end)
  end

  # ✓ for a clean exit, ✗ for any failure, ? for an unknown status.
  defp status_icon(%{status: :completed, exit_code: 0}), do: "✓"
  defp status_icon(%{status: :completed}), do: "✗"
  defp status_icon(%{status: :failed}), do: "✗"
  defp status_icon(_), do: "?"

  # Fixed: a nil aggregated_output previously passed the `!= ""` check and
  # crashed in String.split/2; only print when there is real output.
  defp print_output_preview(output) when is_binary(output) and output != "" do
    IO.puts("  Output:")

    output
    |> String.split("\n")
    |> Enum.take(@preview_lines)
    |> Enum.each(fn line ->
      IO.puts("    #{line}")
    end)
  end

  defp print_output_preview(_), do: :ok
end

Raw CLI Passthrough and PTY Sessions

When you need a Codex CLI command that is outside the structured thread API, use Codex.CLI.

defmodule CLIExample do
  @moduledoc """
  Raw Codex CLI passthrough: printing a shell completion script and driving
  an interactive prompt-mode PTY session to completion.
  """

  # Print the zsh completion script emitted by the CLI.
  def completion do
    {:ok, opts} = Codex.Options.new(%{})
    {:ok, completion_result} = Codex.CLI.completion("zsh", codex_opts: opts)
    IO.puts(completion_result.stdout)
  end

  # One-shot interactive session: send the prompt, close stdin, then collect
  # everything the session wrote.
  def prompt_mode do
    {:ok, opts} = Codex.Options.new(%{})

    {:ok, session} =
      Codex.CLI.interactive(
        "Summarize this repository in three bullets.",
        codex_opts: opts
      )

    :ok = Codex.CLI.Session.close_input(session)
    {:ok, collected} = Codex.CLI.Session.collect(session)
    IO.puts(collected.stdout)
  end
end

For future-proof access to commands the SDK does not wrap, Codex.CLI.run/2 and Codex.CLI.start/2 accept arbitrary argv:

{:ok, result} =
  Codex.CLI.run(
    ["cloud", "list", "--json"],
    codex_opts: codex_opts
  )

IO.puts(result.stdout)

Error Handling

Graceful Error Recovery

Handle errors gracefully in production.

defmodule ErrorHandlingExample do
  @moduledoc """
  Error-handling patterns for turns: tagged-tuple dispatch with logging,
  retryability detection, and exponential-backoff retries.
  """

  require Logger

  # Runs a turn and normalizes every failure into a logged, tagged error.
  def robust_turn(thread, input, opts \\ %Codex.Turn.Options{}) do
    case Codex.Thread.run(thread, input, opts) do
      {:ok, result} ->
        {:ok, result}

      {:error, {:turn_failed, error}} ->
        Logger.error("Turn failed: #{error.message}")
        handle_turn_failure(error)

      {:error, {:process, reason}} ->
        Logger.error("Process error: #{inspect(reason)}")
        {:error, :process_error}

      {:error, {:config, reason}} ->
        Logger.error("Configuration error: #{inspect(reason)}")
        {:error, :config_error}

      {:error, reason} ->
        Logger.error("Unknown error: #{inspect(reason)}")
        {:error, :unknown}
    end
  end

  # Could implement retry logic, fallback, etc.
  defp handle_turn_failure(error) do
    if retryable?(error) do
      Logger.info("Error is retryable, consider retry logic")
    end

    {:error, :turn_failed}
  end

  # Heuristic: rate limits, timeouts, and transient-availability messages
  # are considered retryable.
  defp retryable?(error) do
    error.message =~ ~r/(rate limit|timeout|temporarily unavailable)/i
  end

  # Retries a failed turn up to max_attempts times with exponential backoff.
  def with_retry(thread, input, max_attempts \\ 3) do
    do_with_retry(thread, input, max_attempts, 1)
  end

  defp do_with_retry(thread, input, max_attempts, attempt) do
    case Codex.Thread.run(thread, input) do
      {:ok, result} ->
        {:ok, result}

      {:error, {:turn_failed, error}} when attempt < max_attempts ->
        if retryable?(error) do
          Logger.info("Retry attempt #{attempt + 1}/#{max_attempts}")
          # Exponential backoff: 1s, 2s, 4s, ... Fixed: the previous
          # `1000 * :math.pow(2, attempt - 1) |> round()` relied on pipe
          # precedence binding looser than `*`, which reads as if only the
          # pow result were rounded; this form is explicit.
          Process.sleep(round(1000 * :math.pow(2, attempt - 1)))
          do_with_retry(thread, input, max_attempts, attempt + 1)
        else
          {:error, {:turn_failed, error}}
        end

      error ->
        error
    end
  end
end

Validation and Sanitization

Validate inputs and outputs.

defmodule ValidationExample do
  @moduledoc """
  Validates inputs before running a turn and sanity-checks the result
  afterwards, short-circuiting on the first failure.
  """

  # Maximum accepted input length in graphemes.
  @max_input_length 100_000

  # Returns {:ok, result}, or the {:error, reason} of whichever step failed.
  def safe_run(thread, input) do
    with :ok <- validate_input(input),
         {:ok, result} <- Codex.Thread.run(thread, input),
         :ok <- validate_result(result) do
      {:ok, result}
    end
  end

  defp validate_input(input) when not is_binary(input), do: {:error, :input_must_be_string}

  # Fixed: previously tested emptiness with String.length/1 == 0, which
  # walks the entire string; pattern matching "" is O(1).
  defp validate_input(""), do: {:error, :input_cannot_be_empty}

  defp validate_input(input) do
    # String.length/1 counts graphemes (kept for parity with the original
    # limit semantics, even though it is O(n)).
    if String.length(input) > @max_input_length do
      {:error, :input_too_long}
    else
      :ok
    end
  end

  defp validate_result(result) do
    cond do
      !result.usage ->
        {:error, :missing_usage}

      result.usage.input_tokens + result.usage.output_tokens == 0 ->
        {:error, :no_tokens_used}

      true ->
        :ok
    end
  end
end

Configuration

Environment-Based Configuration

Configure based on environment.

The SDK default model is derived from the active catalog after applying auth-mode filtering and environment overrides (CODEX_MODEL, OPENAI_DEFAULT_MODEL, then CODEX_MODEL_DEFAULT). With the bundled catalog shipped in this repo, that currently resolves to gpt-5.4. The SDK always loads bundled upstream model metadata from priv/models.json, which is synced from codex/codex-rs/core/models.json, and ChatGPT-auth flows can still refresh /models and cache the result when available. Config layering still applies across system /etc/codex/config.toml, user $CODEX_HOME/config.toml, repo .codex/config.toml, and cwd config.toml project layers, with trust-aware enablement and configurable project-root markers.

For provider parity, layered config also honors openai_base_url and user-defined [model_providers.<id>] entries. See examples/live_config_overrides.exs and examples/live_options_config_overrides.exs for runnable demonstrations, including the reserved built-in provider restriction (openai, ollama, lmstudio cannot be redefined).

# Centralizes Codex configuration: resolves API key, base URL, binary path,
# model, sandbox mode, and working directory from env vars and app config.
defmodule MyApp.Codex do
  # Builds option structs from the helpers below and starts a thread.
  def start_thread do
    codex_opts = %Codex.Options{
      api_key: api_key(),
      base_url: base_url(),
      codex_path_override: codex_path()
    }

    thread_opts = %Codex.Thread.Options{
      model: model(),
      sandbox_mode: sandbox_mode(),
      working_directory: working_directory()
    }

    Codex.start_thread(codex_opts, thread_opts)
  end

  # Env var takes precedence over application config.
  defp api_key do
    System.get_env("CODEX_API_KEY") ||
      Application.get_env(:my_app, :codex_api_key)
  end

  defp base_url do
    Application.get_env(:my_app, :codex_base_url)
  end

  defp codex_path do
    Application.get_env(:my_app, :codex_path)
  end

  # Falls back to "o1" when no model is configured.
  defp model do
    Application.get_env(:my_app, :codex_model, "o1")
  end

  # Read-only sandbox in prod; writable workspace everywhere else.
  # NOTE(review): Mix is not available inside OTP releases, so Mix.env/0
  # will raise at runtime in a released prod build — confirm this example is
  # only run via `mix run`, or move the decision into application config.
  defp sandbox_mode do
    case Mix.env() do
      :prod -> :read_only
      _ -> :workspace_write
    end
  end

  defp working_directory do
    File.cwd!()
  end
end

Per-Request Configuration

Override configuration per request.

defmodule ConfigExample do
  @moduledoc """
  Runs the same prompt against several models and prints a short preview of
  each response alongside its token usage.
  """

  def analyze_with_different_models(input) do
    models = ["gpt-4", "o1", "gpt-4-turbo"]

    results = Enum.map(models, fn model ->
      thread_opts = %Codex.Thread.Options{model: model}
      {:ok, thread} = Codex.start_thread(%Codex.Options{}, thread_opts)

      {:ok, result} = Codex.Thread.run(thread, input)

      {model, result}
    end)

    # Compare results
    Enum.each(results, fn {model, result} ->
      IO.puts("\n#{model}:")

      # Fixed: final_response is an item struct (or nil), not a string —
      # String.slice/2 on it crashed; slice the message text instead.
      preview =
        case result.final_response do
          %Codex.Items.AgentMessage{text: text} -> String.slice(text, 0..200)
          _ -> "(no response produced)"
        end

      IO.puts(preview)
      IO.puts("Tokens: #{result.usage.input_tokens + result.usage.output_tokens}")
    end)
  end
end

You can also pass CLI config overrides and shell environment policy entries via thread options:

{:ok, thread_opts} =
  Codex.Thread.Options.new(
    model_provider: "mistral",
    base_instructions: "Keep answers brief.",
    shell_environment_policy: %{
      inherit: "core",
      exclude: ["AWS_*"]
    },
    config_overrides: %{
      "model_reasoning_summary" => "concise"
    }
  )

{:ok, thread} = Codex.start_thread(%Codex.Options{}, thread_opts)

Options-level global overrides are also supported and are emitted before derived/thread/turn overrides (later layers win):

{:ok, codex_opts} =
  Codex.Options.new(
    config: %{"model_reasoning_summary" => "concise"}
  )

{:ok, thread_opts} =
  Codex.Thread.Options.new(
    config_overrides: %{"model_reasoning_summary" => "detailed"}
  )

{:ok, thread} = Codex.start_thread(codex_opts, thread_opts)

# Turn-level override wins last:
{:ok, _result} =
  Codex.Thread.run(thread, "Reply with exactly ok", %{
    config_overrides: %{"model_reasoning_summary" => "none"}
  })

Advanced Patterns

Concurrent Turns

Execute multiple turns concurrently.

defmodule ConcurrentExample do
  @moduledoc """
  Runs independent Codex turns in parallel with Task.async/Task.await_many:
  a fan-out analysis pass and a map/reduce summarization pattern.
  """

  # Upper bound for all concurrent turns, in milliseconds.
  @await_timeout 60_000

  # Analyze several files concurrently, then print a preview per file.
  def parallel_analysis(files) do
    files
    |> Enum.map(fn file ->
      Task.async(fn ->
        {:ok, thread} = Codex.start_thread()
        {:ok, result} = Codex.Thread.run(thread, "Analyze #{file}")
        {file, result}
      end)
    end)
    |> Task.await_many(@await_timeout)
    |> Enum.each(fn {file, result} ->
      IO.puts("\n#{file}:")
      IO.puts(preview(result.final_response))
    end)
  end

  # Map: process each item on its own thread; Reduce: ask one final thread
  # to summarize the combined responses.
  def map_reduce_pattern(items) do
    responses =
      items
      |> Enum.map(fn item ->
        Task.async(fn ->
          {:ok, thread} = Codex.start_thread()
          {:ok, result} = Codex.Thread.run(thread, "Process #{item}")
          response_text(result.final_response)
        end)
      end)
      |> Task.await_many(@await_timeout)

    # Reduce: Combine results
    {:ok, thread} = Codex.start_thread()

    summary_prompt = """
    Summarize these analyses:

    #{Enum.join(responses, "\n\n---\n\n")}
    """

    {:ok, result} = Codex.Thread.run(thread, summary_prompt)

    case result.final_response do
      %Codex.Items.AgentMessage{text: text} ->
        IO.puts("Summary:")
        IO.puts(text)

      _ ->
        IO.puts("Summary not available")
    end
  end

  # Shared extraction helpers (consistent with the render/1 helpers used by
  # the other examples in this document).
  defp preview(%Codex.Items.AgentMessage{text: text}), do: String.slice(text, 0..200)
  defp preview(_), do: "(no response produced)"

  defp response_text(%Codex.Items.AgentMessage{text: text}), do: text
  defp response_text(_), do: ""
end

Agent Collaboration

Multiple agents working together.

defmodule CollaborationExample do
  @moduledoc """
  Chains several specialist threads (analysis, security, performance) and
  has a final thread synthesize their reviews into recommendations.
  """

  def collaborative_code_review(file) do
    # Analyzer agent
    {:ok, analyzer} = Codex.start_thread()
    {:ok, analysis} = Codex.Thread.run(
      analyzer,
      "Analyze #{file} for potential issues"
    )

    analysis_text = text(analysis.final_response)

    # Security expert agent
    {:ok, security} = Codex.start_thread()
    {:ok, security_review} = Codex.Thread.run(
      security,
      """
      Review this code for security issues:

      Analysis: #{analysis_text}
      """
    )

    # Performance expert agent
    {:ok, performance} = Codex.start_thread()
    {:ok, perf_review} = Codex.Thread.run(
      performance,
      """
      Review this code for performance issues:

      Analysis: #{analysis_text}
      """
    )

    # Synthesizer agent
    {:ok, synthesizer} = Codex.start_thread()

    synthesis_prompt = """
    Synthesize these reviews into actionable recommendations:

    Security Review:
    #{text(security_review.final_response)}

    Performance Review:
    #{text(perf_review.final_response)}
    """

    {:ok, final} = Codex.Thread.run(synthesizer, synthesis_prompt)

    case final.final_response do
      %Codex.Items.AgentMessage{text: text} ->
        IO.puts("Final Recommendations:")
        IO.puts(text)

      _ ->
        IO.puts("No recommendations produced.")
    end
  end

  # Fixed: reaching into final_response.text directly crashed whenever a
  # turn produced no final agent message; extract defensively like the other
  # examples in this document.
  defp text(%Codex.Items.AgentMessage{text: text}), do: text
  defp text(_), do: "(no response produced)"
end

Tool Output Bridging and Auto-Run (Transport Notes)

The SDK includes a tool registry and can collect structured tool outputs during auto-run cycles. However, the default transport (codex exec --json) does not currently expose a supported mechanism to inject tool outputs back into the CLI. Treat the example below as a conceptual demo until a tool-capable transport (e.g. MCP-hosted tools or protocol/app-server) is adopted.

# Conceptual demo of tool registration plus auto-run; the default exec
# transport cannot feed tool outputs back to the CLI (see the notes above).
defmodule ToolBridgingExample do
  # Registers an in-process tool and runs an auto-run turn that may use it.
  # NOTE(review): defining MathTool inside the function body re-defines the
  # module (with a compiler warning) every time this function is called —
  # acceptable for a one-shot example script, but hoist the module to the
  # top level in real code.
  def run_with_registered_tool do
    defmodule MathTool do
      use Codex.Tool, name: "math_tool", description: "adds two numbers"

      @impl true
      # Tool callback: receives decoded JSON arguments and an (unused) context.
      def invoke(%{"x" => x, "y" => y}, _context), do: {:ok, %{"sum" => x + y}}
    end

    {:ok, _handle} = Codex.Tools.register(MathTool)

    # Auto-approve so a tool invocation does not stall waiting for a human.
    {:ok, thread} =
      Codex.start_thread(approval_policy: Codex.Approvals.StaticPolicy.allow())

    {:ok, result} =
      Codex.Thread.run_auto(thread, "Ask the math tool to add 4 and 5", max_attempts: 2)

    IO.inspect(result.raw[:tool_outputs], label: "tool outputs captured by SDK")
    IO.inspect(result.thread.pending_tool_outputs, label: "pending outputs after turn")
  end
end

Streaming with State Accumulation

Build up state while streaming.

defmodule StatefulStreamingExample do
  @moduledoc """
  Folds a streamed turn into an accumulator map, then prints a summary of
  everything the turn produced.
  """

  def accumulate_state do
    {:ok, thread} = Codex.start_thread()
    {:ok, stream} = Codex.Thread.run_streamed(thread, "Implement a new feature")

    stream
    |> Enum.reduce(initial_state(), &apply_event/2)
    |> display_summary()
  end

  # One clause per event type we track; everything else leaves the
  # accumulator untouched.
  defp apply_event(%Codex.Events.ThreadStarted{thread_id: id}, state),
    do: %{state | thread_id: id}

  defp apply_event(%Codex.Events.ItemCompleted{item: %Codex.Items.CommandExecution{} = cmd}, state),
    do: %{state | commands: [cmd | state.commands]}

  defp apply_event(%Codex.Events.ItemCompleted{item: %Codex.Items.FileChange{} = file}, state),
    do: %{state | files: [file | state.files]}

  defp apply_event(%Codex.Events.ItemCompleted{item: %Codex.Items.AgentMessage{text: text}}, state),
    do: %{state | messages: [text | state.messages]}

  defp apply_event(%Codex.Events.TurnCompleted{usage: usage}, state) when is_map(usage),
    do: %{state | usage: usage}

  defp apply_event(_event, state), do: state

  defp initial_state do
    %{
      thread_id: nil,
      commands: [],
      files: [],
      messages: [],
      usage: nil
    }
  end

  defp display_summary(state) do
    IO.puts("Turn Summary:")
    IO.puts("  Thread ID: #{state.thread_id}")
    IO.puts("  Commands: #{length(state.commands)}")
    IO.puts("  Files Changed: #{count_file_changes(state.files)}")
    IO.puts("  Messages: #{length(state.messages)}")

    if state.usage do
      IO.puts("  Tokens: #{state.usage.input_tokens + state.usage.output_tokens}")
    end
  end

  # Total individual path changes across all file-change items.
  defp count_file_changes(files) do
    files
    |> Enum.flat_map(& &1.changes)
    |> length()
  end
end

Production Patterns

Telemetry Integration

Monitor SDK usage in production.

defmodule MyApp.TelemetryHandler do
  @moduledoc """
  Attaches to the SDK's turn telemetry events and re-emits app-level metrics
  for turn counts, durations, token usage, and failures.
  """

  require Logger

  # Attach a single handler for all codex turn lifecycle events.
  def setup do
    :telemetry.attach_many(
      "my-app-codex-handler",
      [
        [:codex, :turn, :start],
        [:codex, :turn, :stop],
        [:codex, :turn, :exception]
      ],
      &handle_event/4,
      nil
    )
  end

  # Fixed: this clause never reads measurements — the underscore prefix
  # silences the unused-variable compiler warning.
  def handle_event([:codex, :turn, :start], _measurements, metadata, _config) do
    Logger.info("Codex turn started", thread_id: metadata.thread_id)

    # Could send to metrics system
    :telemetry.execute(
      [:my_app, :codex, :turn, :count],
      %{count: 1},
      metadata
    )
  end

  def handle_event([:codex, :turn, :stop], measurements, metadata, _config) do
    # Assumes duration is reported in native (nanosecond) units — hence the
    # division by 1_000_000 for milliseconds; confirm against SDK docs.
    Logger.info("Codex turn completed",
      thread_id: metadata.thread_id,
      duration_ms: measurements.duration / 1_000_000,
      tokens: metadata.usage.input_tokens + metadata.usage.output_tokens
    )

    # Send metrics
    :telemetry.execute(
      [:my_app, :codex, :turn, :duration],
      %{duration: measurements.duration},
      metadata
    )

    :telemetry.execute(
      [:my_app, :codex, :tokens],
      %{
        input: metadata.usage.input_tokens,
        output: metadata.usage.output_tokens,
        total: metadata.usage.input_tokens + metadata.usage.output_tokens
      },
      metadata
    )
  end

  def handle_event([:codex, :turn, :exception], measurements, metadata, _config) do
    Logger.error("Codex turn failed",
      thread_id: metadata.thread_id,
      error: metadata.error,
      duration_ms: measurements.duration / 1_000_000
    )

    # Alert on failures
    :telemetry.execute(
      [:my_app, :codex, :turn, :error],
      %{count: 1},
      metadata
    )
  end
end

# In application.ex
# Application entry point: attach the Codex telemetry handlers before the
# supervision tree starts so no early turn events are missed.
def start(_type, _args) do
  MyApp.TelemetryHandler.setup()
  # ... rest of supervision tree
end

Rate Limiting

Implement rate limiting for API calls.

defmodule MyApp.RateLimiter do
  @moduledoc """
  Token-bucket rate limiter: #{10} tokens per 60-second window.

  Tokens are refilled both by a periodic timer message and lazily on
  `acquire/0` (in case the timer message is delayed in the mailbox).
  """

  use GenServer

  # Bucket capacity and refill window, previously hard-coded in several places.
  @bucket_size 10
  @window_seconds 60
  @refill_interval_ms 60_000

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc "Takes one token. Returns `:ok` or `{:error, :rate_limited}`."
  def acquire do
    GenServer.call(__MODULE__, :acquire)
  end

  @impl true
  def init(_) do
    state = %{
      tokens: @bucket_size,
      last_refill: System.monotonic_time(:second)
    }

    schedule_refill()
    {:ok, state}
  end

  @impl true
  def handle_call(:acquire, _from, state) do
    # Lazy refill covers the case where the :refill timer hasn't fired yet.
    state = refill_tokens(state)

    if state.tokens > 0 do
      {:reply, :ok, %{state | tokens: state.tokens - 1}}
    else
      {:reply, {:error, :rate_limited}, state}
    end
  end

  @impl true
  def handle_info(:refill, state) do
    schedule_refill()

    # Bug fix: the original reset only `tokens` here, leaving `last_refill`
    # stale — so the lazy refill in acquire/0 measured elapsed time from the
    # wrong instant. Reset both together.
    {:noreply,
     %{state | tokens: @bucket_size, last_refill: System.monotonic_time(:second)}}
  end

  # Refill the bucket if a full window has elapsed since the last refill.
  defp refill_tokens(state) do
    now = System.monotonic_time(:second)

    if now - state.last_refill >= @window_seconds do
      %{state | tokens: @bucket_size, last_refill: now}
    else
      state
    end
  end

  defp schedule_refill do
    Process.send_after(self(), :refill, @refill_interval_ms)
  end
end

# Usage
defmodule MyApp.Codex do
  # Runs a turn only when the rate limiter grants a token; a
  # {:error, :rate_limited} result from acquire/0 is passed through
  # to the caller unchanged.
  def run(thread, input) do
    with :ok <- MyApp.RateLimiter.acquire() do
      Codex.Thread.run(thread, input)
    end
  end
end

Supervised Turn Execution

Run turns under supervision.

defmodule MyApp.TurnSupervisor do
  @moduledoc """
  Supervises a `Task.Supervisor` used to run Codex turns in isolated,
  supervised tasks.
  """

  use Supervisor

  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    Supervisor.init(
      [{Task.Supervisor, name: MyApp.TurnTaskSupervisor}],
      strategy: :one_for_one
    )
  end

  # Runs a turn in a supervised task and blocks until it finishes.
  def run_supervised(thread, input, opts \\ %{}) do
    task =
      Task.Supervisor.async(MyApp.TurnTaskSupervisor, fn ->
        Codex.Thread.run(thread, input, opts)
      end)

    Task.await(task)
  end
end

Live Usage & Compaction

examples/live_usage_and_compaction.exs runs against the live Codex backend (requires CODEX_API_KEY or a CLI login) and mirrors the latest defaults:

  • Uses the SDK default model (Codex.Models.default_model/0, currently gpt-5.4 with the bundled catalog) and :low reasoning effort, coercing upward only when the selected model requires it.
  • Streams events, printing token-usage deltas and turn diffs as they arrive.
  • Captures explicit compaction notifications (including usage deltas) and merges them into the displayed token totals.
  • Prints the final agent response alongside merged usage.

Run it with:

mix run examples/live_usage_and_compaction.exs "summarize recent changes"

Note: examples/run_all.sh exports CODEX_MODEL=gpt-5.4-mini by default for faster reproducible live runs, while the bundled SDK default remains gpt-5.4. Most live examples inherit Codex.Models.default_model/0 instead of pinning a model string and explicitly prefer reasoning_effort: :low, so overriding CODEX_MODEL is usually enough if you want a different model.

Live Exec Controls

examples/live_exec_controls.exs streams against the live Codex CLI while forwarding per-turn env, cancellation tokens, and custom timeouts to codex exec. Use it to validate env injection for tooling or to wire cancellation tokens from upstream request lifecycles.

mix run examples/live_exec_controls.exs \
  "List three repo files and echo \$CODEX_DEMO_ENV"

Events with requires_approval: false bypass approval hooks automatically; only flagged operations invoke your configured approval policy or hook.

Overrides remain available:

# Custom env, timeout, and cancellation token
mix run examples/live_exec_controls.exs \
  "List three repo files and echo \$CODEX_DEMO_ENV" \
  --env CODEX_DEMO_ENV=from-docs \
  --timeout-ms 45000 \
  --cancel demo-token-123

If your Codex CLI is older and does not yet support --cancellation-token, rerun without --cancel or upgrade via npm install -g @openai/codex.

Live Telemetry Stream

examples/live_telemetry_stream.exs attaches telemetry handlers to the live Codex CLI so you can observe thread/turn identifiers, source metadata, token-usage deltas, diff updates, and compaction savings as they stream in. It uses a low reasoning effort and a short default prompt to return quickly.

mix run examples/live_telemetry_stream.exs

Auth falls back to your Codex CLI login when CODEX_API_KEY is not set.

Additional Live Examples

  • examples/live_cli_passthrough.exs — direct wrappers for completion, features, login status, and arbitrary raw codex argv
  • examples/live_cli_session.exs — PTY-backed root codex prompt mode via Codex.CLI.interactive/2
  • examples/live_oauth_login.exs — native OAuth status/login/refresh demo with an isolated temporary CODEX_HOME; it prints the browser authorization URL, supports --browser, --device, and --no-browser, and can optionally show memory-mode app-server auth
  • examples/live_collaboration_modes.exs — opts into app-server experimentalApi, lists collaboration presets, and runs a turn with the server-advertised preset settings and built-in preset instructions when the connected build advertises collaborationMode/list
  • examples/live_subagent_host_controls.exs — enables features.multi_agent, runs a one-parent -> one-child workflow, exercises the full Codex.Subagents helper surface, and then drives spawn_agent, send_input, resume_agent, wait, and close_agent across live parent turns
  • examples/live_personality.exs — compares friendly, pragmatic, and none personality overrides (including app-server :none)
  • examples/live_config_overrides.exs — nested config override auto-flattening (thread and turn level)
  • examples/live_options_config_overrides.exs — options-level global config overrides, precedence, and validation
  • examples/live_thread_management.exs — demonstrates thread read/fork/rollback/loaded list
  • examples/live_web_search_modes.exs — toggles web search modes and reports web search items (explicit disable emits web_search="disabled")
  • examples/live_rate_limits.exs — prints rate limit snapshots from token usage events

App-server Transport

App-server (codex app-server) is a stateful, bidirectional transport that unlocks upstream v2 APIs (threads list/archive, skills/models/config, server-driven approvals, etc.).

See guides/05-app-server-transport.md for the complete guide, and run the live scripts:

mix run examples/live_app_server_basic.exs
mix run examples/live_app_server_filesystem.exs
mix run examples/live_app_server_plugins.exs
mix run examples/live_app_server_streaming.exs "Reply with exactly ok and nothing else."
mix run examples/live_app_server_approvals.exs
mix run examples/live_cli_passthrough.exs completion zsh
mix run examples/live_cli_session.exs "Summarize this repository in three bullets."
mix run examples/live_collaboration_modes.exs
mix run examples/live_personality.exs
mix run examples/live_thread_management.exs

live_app_server_filesystem.exs demonstrates the thin fs/* wrappers end to end, including base64 round-tripping, and self-skips when the connected build no longer advertises those legacy parity methods. live_app_server_plugins.exs creates a disposable repo-local marketplace and plugin bundle under the system temp directory, launches codex app-server with an isolated child cwd plus a temporary CODEX_HOME, then demonstrates plugin/list discovery followed by plugin/read detail loading without mutating your real Codex config. Because the example never installs or enables that temporary plugin in the isolated home, installed and enabled are expected to remain false, and no prior Codex login or plugin install is required. live_app_server_approvals.exs demonstrates command/file approvals, uses granular request_permissions: true for live permissions requests on an experimental_api: true connection, provisions a disposable temp workspace plus temporary CODEX_HOME, enables the under-development approval feature flags only inside that isolated home, retries without experimental fields when the connected build rejects them, and falls back to printing the exact structured grant payload when live permissions events are unavailable. It also surfaces typed command/file approval request events, guardian review, and serverRequest/resolved events without writing inside this repository. live_app_server_mcp.exs and live_mcp_and_sessions.exs print original MCP tool names alongside sanitized qualified names.

Minimal usage with existing thread APIs:

# Build default SDK options and open a persistent app-server connection.
{:ok, codex_opts} = Codex.Options.new(%{})
{:ok, conn} = Codex.AppServer.connect(codex_opts)

# Route the thread over the app-server transport instead of codex exec.
{:ok, thread} =
  Codex.start_thread(codex_opts, %{
    transport: {:app_server, conn},
    working_directory: File.cwd!()
  })

# Turns work exactly as with the default transport.
{:ok, result} = Codex.Thread.run(thread, "List the available skills for this repo")

Custom prompts and skills helpers

# List custom prompts and expand the first one with a KEY=VALUE argument.
{:ok, prompts} = Codex.Prompts.list()
{:ok, expanded} = Codex.Prompts.expand(Enum.at(prompts, 0), "FILE=lib/app.ex")

# Skills require an app-server connection; load the first advertised skill.
{:ok, %{"data" => skills}} = Codex.Skills.list(conn, skills_enabled: true)
first_skill = skills |> List.first() |> Map.get("skills") |> List.first()
{:ok, skill_body} = Codex.Skills.load(first_skill, skills_enabled: true)

Realtime Voice Interactions

Direct API examples support custom trust roots: CODEX_CA_CERTIFICATE is consulted first, then SSL_CERT_FILE; blank values are ignored. The live realtime/voice example files mention this in their headers so users can configure HTTPS/WSS trust before running them.

Basic Realtime Session

Set up a bidirectional voice session using the OpenAI Realtime API:

defmodule RealtimeExample do
  @moduledoc """
  Minimal bidirectional realtime voice session using semantic VAD.
  """

  alias Codex.Realtime
  alias Codex.Realtime.Config.{RunConfig, SessionModelSettings, TurnDetectionConfig}

  # Starts a realtime session, subscribes the calling process to its
  # events, and returns the session handle.
  def start_session do
    agent = Realtime.agent(
      name: "VoiceAssistant",
      instructions: "You are a helpful voice assistant."
    )

    config = %RunConfig{
      model_settings: %SessionModelSettings{
        voice: "alloy",
        turn_detection: %TurnDetectionConfig{
          type: :semantic_vad,
          eagerness: :medium
        }
      }
    }

    {:ok, session} = Realtime.run(agent, config: config)
    Realtime.subscribe(session, self())
    session
  end

  # Event loop: plays audio chunks, logs agent starts, inspects anything
  # else. Gives up after 30 seconds without an event.
  def handle_events do
    receive do
      {:session_event, %Codex.Realtime.Events.AudioEvent{audio: audio}} ->
        play_audio(audio.data)
        handle_events()

      {:session_event, %Codex.Realtime.Events.AgentStartEvent{agent: agent}} ->
        IO.puts("Agent started: #{agent.name}")
        handle_events()

      {:session_event, event} ->
        IO.inspect(event, label: "Event")
        handle_events()
    after
      30_000 -> :timeout
    end
  end

  # Fix: the original example called play_audio/1 without defining it,
  # which fails to compile. Placeholder sink — replace with a real
  # audio-output integration in your application.
  defp play_audio(_pcm_data), do: :ok
end

Realtime with Function Tools

Add function calling capabilities to realtime agents:

defmodule RealtimeToolsExample do
  alias Codex.Realtime

  # JSON-schema-style declaration for the single weather lookup tool,
  # lifted to a module attribute so the agent builder stays short.
  @weather_tool %{
    name: "get_weather",
    description: "Get weather for a location",
    parameters: %{
      type: "object",
      properties: %{
        location: %{type: "string", description: "City name"}
      },
      required: ["location"]
    }
  }

  # Builds a realtime agent that can call the weather tool.
  def create_agent_with_tools do
    Realtime.agent(
      name: "WeatherAssistant",
      instructions: "Help users check the weather.",
      tools: [@weather_tool]
    )
  end
end

Multi-Agent Handoffs

Hand off conversations between specialized agents:

defmodule RealtimeHandoffsExample do
  alias Codex.Realtime

  # Builds a {entry_agent, specialist} pair where the entry agent is able
  # to hand the conversation off to the specialist.
  def create_agents do
    base_greeter =
      Realtime.agent(
        name: "Greeter",
        instructions: "Greet users and hand off to specialists."
      )

    tech_support =
      Realtime.agent(
        name: "TechSupport",
        instructions: "Provide technical assistance."
      )

    # Rebuild the greeter with the handoff target attached.
    entry_agent =
      Realtime.agent(
        name: base_greeter.name,
        instructions: base_greeter.instructions,
        handoffs: [tech_support]
      )

    {entry_agent, tech_support}
  end
end

Run realtime examples:

export CODEX_API_KEY=your-key
# or export OPENAI_API_KEY=your-key
mix run examples/realtime_basic.exs
mix run examples/realtime_tools.exs
mix run examples/realtime_handoffs.exs
mix run examples/live_realtime_voice.exs

Voice Pipeline

Basic STT -> Workflow -> TTS Pipeline

Process audio through speech-to-text, custom workflow, and text-to-speech:

defmodule VoicePipelineExample do
  alias Codex.Voice.{Pipeline, SimpleWorkflow, Config}
  alias Codex.Voice.Config.TTSSettings
  alias Codex.Voice.Input.AudioInput

  # Runs audio through STT -> SimpleWorkflow -> TTS and returns the
  # synthesized reply as a single binary.
  def run_pipeline(audio_data) do
    # Workflow: echo back whatever was transcribed.
    echo_workflow =
      SimpleWorkflow.new(
        fn transcribed_text ->
          # Process the transcribed text
          response = "I heard you say: #{transcribed_text}"
          [response]
        end,
        greeting: "Hello! How can I help you?"
      )

    pipeline_config = %Config{
      workflow_name: "SimpleVoice",
      tts_settings: %TTSSettings{voice: :nova}
    }

    {:ok, pipeline} =
      Pipeline.start_link(workflow: echo_workflow, config: pipeline_config)

    audio_input = AudioInput.new(audio_data, format: :wav)

    {:ok, events} = Pipeline.run(pipeline, audio_input)

    # Keep only the audio events and concatenate their payloads.
    audio_chunks =
      for %Codex.Voice.Events.VoiceStreamEventAudio{data: data} <- events, do: data

    IO.iodata_to_binary(audio_chunks)
  end
end

Multi-Turn Voice Conversations

Maintain conversation context across multiple turns:

defmodule VoiceMultiTurnExample do
  alias Codex.Voice.{Pipeline, AgentWorkflow, Config}
  alias Codex.Voice.Input.StreamedAudioInput

  # Builds a pipeline backed by AgentWorkflow so conversation context is
  # kept across turns, and returns the pipeline pid.
  def create_conversational_pipeline do
    conversational_workflow =
      AgentWorkflow.new(
        agent: %{
          instructions: "You are a helpful assistant. Remember context from earlier in the conversation."
        }
      )

    pipeline_config = %Config{
      workflow_name: "ConversationalAssistant",
      tts_settings: %Config.TTSSettings{voice: :alloy}
    }

    {:ok, pipeline} =
      Pipeline.start_link(workflow: conversational_workflow, config: pipeline_config)

    pipeline
  end

  # Feeds an audio stream into the pipeline chunk by chunk and returns
  # the stream of pipeline results.
  def stream_conversation(pipeline, audio_stream) do
    streamed_input = StreamedAudioInput.new()

    {:ok, result_stream} = Pipeline.run_streamed(pipeline, streamed_input)

    # Push chunks from a background task so result consumption can start
    # immediately; close the input once the stream is exhausted.
    Task.start(fn ->
      Enum.each(audio_stream, &StreamedAudioInput.push(streamed_input, &1))
      StreamedAudioInput.close(streamed_input)
    end)

    result_stream
  end
end

Voice with Codex Agent

Integrate the voice pipeline with a full Codex agent:

defmodule VoiceWithAgentExample do
  alias Codex.Voice.{Pipeline, AgentWorkflow, Config}

  # Starts a voice pipeline whose workflow is backed by a full Codex agent.
  def create_agent_voice_pipeline do
    Pipeline.start_link(workflow: build_workflow(), config: build_config())
  end

  # Codex-agent workflow: a coding assistant with file search enabled.
  defp build_workflow do
    AgentWorkflow.new(
      agent: %{
        instructions: """
        You are a coding assistant accessible via voice.
        When asked about code, provide clear explanations.
        """,
        tools: [Codex.Tools.FileSearchTool]
      }
    )
  end

  # STT/TTS model and voice selection for the pipeline.
  defp build_config do
    %Config{
      workflow_name: "CodingAssistant",
      stt_settings: %Config.STTSettings{model: "gpt-4o-transcribe"},
      tts_settings: %Config.TTSSettings{voice: :echo, model: "gpt-4o-mini-tts"}
    }
  end
end

Run voice pipeline examples:

export CODEX_API_KEY=your-key
# or export OPENAI_API_KEY=your-key
mix run examples/voice_pipeline.exs
mix run examples/voice_multi_turn.exs
mix run examples/voice_with_agent.exs

Conclusion

These examples demonstrate the flexibility and power of the Elixir Codex SDK. Key patterns include:

  • Streaming for responsive UIs
  • Structured output for data extraction
  • Multi-turn for complex conversations
  • Error handling for robustness
  • Concurrent execution for performance
  • Production patterns for observability and reliability
  • Realtime voice for bidirectional audio interactions
  • Voice pipeline for STT -> Workflow -> TTS processing

Refer to the API Guide for complete documentation of all functions and types.