This document provides comprehensive examples demonstrating common use cases and patterns for the Elixir Codex SDK. Runnable counterparts live under /examples; each script can be executed with mix run examples/<name>.exs ....
Auth defaults: all examples prefer CODEX_API_KEY (or auth.json OPENAI_API_KEY) when present; otherwise they fall back to your Codex CLI login stored under CODEX_HOME (default ~/.codex).
Table of Contents
- Basic Usage
- Streaming Responses
- Structured Output
- Multi-Turn Conversations
- File Operations
- Command Execution
- Error Handling
- Configuration
- Advanced Patterns
- Production Patterns
- Live Usage & Compaction
- Live Exec Controls
- Live Telemetry Stream
- Additional Live Examples
- App-server Transport
- Realtime Voice Interactions
- Voice Pipeline
Basic Usage
Simple Question and Answer
The most basic usage: ask a question and get a response.
defmodule BasicExample do
def ask_question do
# Start a new thread
{:ok, thread} = Codex.start_thread()
# Run a turn
{:ok, result} = Codex.Thread.run(thread, "What is a GenServer in Elixir?")
# Print the response
case result.final_response do
%Codex.Items.AgentMessage{text: text} ->
IO.puts(text)
_ ->
IO.puts("The turn did not produce a final response.")
end
# Check token usage
IO.puts("\nTokens used:")
IO.puts(" Input: #{result.usage["input_tokens"]}")
IO.puts(" Output: #{result.usage["output_tokens"]}")
IO.puts(" Total: #{result.usage["total_tokens"]}")
end
endExamining All Items
Access all items produced during a turn.
defmodule ItemsExample do
def explore_items do
{:ok, thread} = Codex.start_thread()
{:ok, result} = Codex.Thread.run(
thread,
"Explain how Elixir processes work, and give me a simple example"
)
# Iterate through completed items in the event stream
Enum.each(result.events, fn
%Codex.Events.ItemCompleted{item: %Codex.Items.AgentMessage{text: text}} ->
IO.puts("\n[Agent Message]")
IO.puts(text)
%Codex.Events.ItemCompleted{item: %Codex.Items.Reasoning{text: text}} ->
IO.puts("\n[Reasoning]")
IO.puts(text)
%Codex.Events.ItemCompleted{
item: %Codex.Items.CommandExecution{command: cmd, exit_code: code, status: status}
} ->
IO.puts("\n[Command Execution]")
IO.puts("Command: #{cmd}")
IO.puts("Exit Code: #{inspect(code)} (#{status})")
%Codex.Events.ItemCompleted{item: %Codex.Items.FileChange{changes: changes}} ->
IO.puts("\n[File Changes]")
Enum.each(changes, fn %{path: path, kind: kind} ->
IO.puts(" #{kind}: #{path}")
end)
%Codex.Events.ItemCompleted{item: other} ->
IO.puts("\n[Other Item]")
IO.inspect(other, label: "item")
_ ->
:ok
end)
end
endStreaming Responses
Real-Time Event Processing
Process events as they arrive for responsive UIs.
defmodule StreamingExample do
def stream_response do
{:ok, thread} = Codex.start_thread()
{:ok, stream} = Codex.Thread.run_streamed(
thread,
"Analyze the files in this directory and suggest improvements"
)
# Process events in real-time
Enum.each(stream, fn event ->
case event do
%Codex.Events.ThreadStarted{thread_id: id} ->
IO.puts("Started thread: #{id}")
%Codex.Events.TurnStarted{} ->
IO.puts("Turn started...")
%Codex.Events.ItemStarted{item: item} ->
IO.puts("New item: #{item.type}")
%Codex.Events.ItemUpdated{item: %{type: :command_execution} = cmd} ->
if cmd.status == :in_progress do
IO.write(".") # Progress indicator
end
%Codex.Events.ItemCompleted{item: item} ->
case item do
%{type: :agent_message, text: text} ->
IO.puts("\n\n[Response]")
IO.puts(text)
%{type: :command_execution, command: cmd, exit_code: 0} ->
IO.puts("\n✓ Command succeeded: #{cmd}")
%{type: :file_change, changes: changes} ->
IO.puts("\n[Files Changed]")
Enum.each(changes, fn %{path: path, kind: kind} ->
IO.puts(" #{kind}: #{path}")
end)
_ ->
:ok
end
%Codex.Events.TurnCompleted{usage: usage} ->
IO.puts("\n\nTurn completed!")
IO.puts("Tokens: #{usage.input_tokens + usage.output_tokens}")
_ ->
:ok
end
end)
end
endProgressive Display
Stream text responses character by character (simulated).
defmodule ProgressiveDisplay do
def display_progressively do
{:ok, thread} = Codex.start_thread()
{:ok, stream} = Codex.Thread.run_streamed(
thread,
"Write a short story about a robot learning to code"
)
# Accumulate text and display progressively
stream
|> Stream.filter(fn
%Codex.Events.ItemCompleted{item: %{type: :agent_message}} -> true
_ -> false
end)
|> Enum.each(fn %{item: %{text: text}} ->
# Simulate streaming by printing chunks
text
|> String.graphemes()
|> Enum.each(fn char ->
IO.write(char)
Process.sleep(10) # Adjust for desired speed
end)
end)
IO.puts("\n")
end
endLive Tooling Stream (Codex CLI)
Run a live streamed turn that shows shell and MCP tool events without requiring an API key (uses your Codex CLI login if present):
mix run examples/live_tooling_stream.exs "Summarize this repository and run one quick check"
The example prints started/updated/completed tool events (including arguments and streamed results) and falls back to the last agent message if the final response isn’t explicitly included in turn.completed.
Structured Output
JSON Schema with Validation
Request structured data conforming to a schema.
defmodule StructuredOutputExample do
def analyze_code_quality do
schema = %{
"type" => "object",
"properties" => %{
"overall_score" => %{
"type" => "integer",
"minimum" => 0,
"maximum" => 100
},
"issues" => %{
"type" => "array",
"items" => %{
"type" => "object",
"properties" => %{
"severity" => %{
"type" => "string",
"enum" => ["low", "medium", "high"]
},
"description" => %{"type" => "string"},
"file" => %{"type" => "string"},
"line" => %{"type" => "integer"}
},
"required" => ["severity", "description"]
}
},
"suggestions" => %{
"type" => "array",
"items" => %{"type" => "string"}
}
},
"required" => ["overall_score", "issues", "suggestions"]
}
{:ok, thread} = Codex.start_thread()
turn_opts = %Codex.Turn.Options{output_schema: schema}
{:ok, result} = Codex.Thread.run(
thread,
"Analyze the code quality of this Elixir project",
turn_opts
)
case Codex.Turn.Result.json(result) do
{:ok, data} ->
IO.puts("Overall Score: #{data["overall_score"]}/100")
IO.puts("\nIssues Found:")
Enum.each(data["issues"], fn issue ->
severity = String.upcase(issue["severity"])
IO.puts(" [#{severity}] #{issue["description"]}")
if issue["file"] do
IO.puts(" File: #{issue["file"]}:#{issue["line"]}")
end
end)
IO.puts("\nSuggestions:")
Enum.each(data["suggestions"], fn suggestion ->
IO.puts(" - #{suggestion}")
end)
{:error, _} ->
IO.puts("Failed to parse structured response")
end
end
endresult.final_response.parsed contains the decoded payload whenever the response matches
the requested schema. Codex.Turn.Result.json/1 returns the same data alongside helpful
error metadata.
Using with TypedStruct
Define Elixir structs that match your schema.
defmodule MyApp.CodeAnalysis do
use TypedStruct
typedstruct do
field :overall_score, integer(), enforce: true
field :issues, [issue()], default: []
field :suggestions, [String.t()], default: []
end
typedstruct module: Issue do
field :severity, String.t(), enforce: true
field :description, String.t(), enforce: true
field :file, String.t()
field :line, integer()
end
def schema do
%{
"type" => "object",
"properties" => %{
"overall_score" => %{"type" => "integer", "minimum" => 0, "maximum" => 100},
"issues" => %{
"type" => "array",
"items" => %{
"type" => "object",
"properties" => %{
"severity" => %{"type" => "string", "enum" => ["low", "medium", "high"]},
"description" => %{"type" => "string"},
"file" => %{"type" => "string"},
"line" => %{"type" => "integer"}
},
"required" => ["severity", "description"]
}
},
"suggestions" => %{"type" => "array", "items" => %{"type" => "string"}}
},
"required" => ["overall_score", "issues", "suggestions"]
}
end
def from_map(%{} = data) do
issues = parse_issues(data["issues"])
{:ok, %__MODULE__{
overall_score: data["overall_score"],
issues: issues,
suggestions: data["suggestions"] || []
}}
end
defp parse_issues(issues) when is_list(issues) do
Enum.map(issues, fn issue ->
%Issue{
severity: issue["severity"],
description: issue["description"],
file: issue["file"],
line: issue["line"]
}
end)
end
end
# Usage
{:ok, thread} = Codex.start_thread()
turn_opts = %Codex.Turn.Options{output_schema: MyApp.CodeAnalysis.schema()}
{:ok, result} = Codex.Thread.run(thread, "Analyze code", turn_opts)
case {result.final_response, Codex.Turn.Result.json(result)} do
{%Codex.Items.AgentMessage{parsed: parsed}, {:ok, parsed}} ->
{:ok, analysis} = MyApp.CodeAnalysis.from_map(parsed)
IO.puts("Score: #{analysis.overall_score}")
{_, {:error, reason}} ->
IO.inspect(reason, label: "structured output error")
endMulti-Turn Conversations
Context Retention
Maintain context across multiple turns.
defmodule ConversationExample do
def multi_turn_conversation do
{:ok, thread} = Codex.start_thread()
# Turn 1: Provide context
{:ok, result1} = Codex.Thread.run(
thread,
"I have a bug in my GenServer. It crashes when I send it a {:stop, reason} message."
)
IO.puts("Agent: #{render(result1.final_response)}")
# Turn 2: Ask follow-up (agent remembers previous context)
{:ok, result2} = Codex.Thread.run(
thread,
"Can you show me an example of how to handle that message correctly?"
)
IO.puts("\nAgent: #{render(result2.final_response)}")
# Turn 3: More specific
{:ok, result3} = Codex.Thread.run(
thread,
"What if I want to perform cleanup before stopping?"
)
IO.puts("\nAgent: #{render(result3.final_response)}")
# Save thread_id for later
IO.puts("\nThread ID: #{thread.thread_id}")
end
defp render(%Codex.Items.AgentMessage{text: text}), do: text
defp render(_), do: "(no response produced)"
endResuming Sessions
Resume a previous conversation.
defmodule ResumeExample do
def resume_previous_session(thread_id) do
# Resume the thread
{:ok, thread} = Codex.resume_thread(thread_id)
# Continue the conversation
{:ok, result} = Codex.Thread.run(
thread,
"Can you remind me what we were discussing?"
)
IO.puts(render(result.final_response))
end
def save_and_resume do
# Start thread and have conversation
{:ok, thread} = Codex.start_thread()
{:ok, result1} = Codex.Thread.run(thread, "Remember the number 42")
IO.puts(render(result1.final_response))
# Save thread_id (e.g., to database, file, etc.)
thread_id = thread.thread_id
File.write!("thread_id.txt", thread_id)
# Simulate restart: read thread_id and resume
saved_id = File.read!("thread_id.txt")
{:ok, resumed_thread} = Codex.resume_thread(saved_id)
{:ok, result2} = Codex.Thread.run(resumed_thread, "What number should I remember?")
IO.puts("\nAfter resuming: #{render(result2.final_response)}")
end
defp render(%Codex.Items.AgentMessage{text: text}), do: text
defp render(_), do: "(no response produced)"
endAgent Runner Loop
Automatically follow continuation tokens with the multi-turn runner.
defmodule AgentRunnerExample do
def run do
{:ok, thread} = Codex.start_thread()
Codex.AgentRunner.run(thread, "Plan a checklist",
agent: %{instructions: "Keep responses short"},
run_config: %{max_turns: 3}
)
end
endRun mix run examples/agent_runner_multi_turn.exs for a runnable script.
File Operations
Tracking File Changes
Monitor file modifications made by the agent.
defmodule FileOperationsExample do
def track_file_changes do
{:ok, thread} = Codex.start_thread()
{:ok, stream} = Codex.Thread.run_streamed(
thread,
"Add comprehensive documentation to all modules in lib/"
)
# Track changes
changes = stream
|> Stream.filter(fn
%Codex.Events.ItemCompleted{item: %{type: :file_change}} -> true
_ -> false
end)
|> Enum.map(fn %{item: file_change} ->
Enum.map(file_change.changes, fn change ->
{change.kind, change.path}
end)
end)
|> List.flatten()
# Summarize
IO.puts("File Changes:")
changes
|> Enum.group_by(fn {kind, _} -> kind end)
|> Enum.each(fn {kind, files} ->
IO.puts("\n#{String.upcase(to_string(kind))}:")
Enum.each(files, fn {_, path} ->
IO.puts(" - #{path}")
end)
end)
end
def review_before_apply do
{:ok, thread} = Codex.start_thread()
{:ok, result} = Codex.Thread.run(
thread,
"Refactor the authentication module"
)
# Extract file changes
file_changes = result.items
|> Enum.filter(fn
%{type: :file_change} -> true
_ -> false
end)
# Review each change
Enum.each(file_changes, fn change ->
IO.puts("\nProposed Changes:")
Enum.each(change.changes, fn %{path: path, kind: kind} ->
IO.puts(" #{kind}: #{path}")
# Could display diffs here
if kind == :update do
# Read current file and show diff
# (Would need access to diff from codex-rs)
end
end)
# Prompt user
IO.puts("\nApply these changes? (y/n)")
response = IO.gets("")
if String.trim(response) == "y" do
IO.puts("Changes applied (by codex-rs)")
else
IO.puts("Changes rejected - would need to revert")
end
end)
end
endCommand Execution
Monitoring Commands
Track commands executed by the agent.
defmodule CommandExample do
def monitor_commands do
{:ok, thread} = Codex.start_thread()
{:ok, stream} = Codex.Thread.run_streamed(
thread,
"List the top-level files and print the working directory"
)
# Collect all commands
commands = stream
|> Stream.filter(fn
%Codex.Events.ItemCompleted{item: %{type: :command_execution}} -> true
_ -> false
end)
|> Enum.map(fn %{item: cmd} -> cmd end)
# Display results
IO.puts("Commands Executed:")
Enum.each(commands, fn cmd ->
status_icon = case cmd.status do
:completed when cmd.exit_code == 0 -> "✓"
:completed -> "✗"
:failed -> "✗"
_ -> "?"
end
IO.puts("\n#{status_icon} #{cmd.command}")
IO.puts(" Exit Code: #{cmd.exit_code || "N/A"}")
if cmd.aggregated_output != "" do
IO.puts(" Output:")
cmd.aggregated_output
|> String.split("\n")
|> Enum.take(5) # First 5 lines
|> Enum.each(fn line ->
IO.puts(" #{line}")
end)
end
end)
end
endError Handling
Graceful Error Recovery
Handle errors gracefully in production.
defmodule ErrorHandlingExample do
require Logger
def robust_turn(thread, input, opts \\ %Codex.Turn.Options{}) do
case Codex.Thread.run(thread, input, opts) do
{:ok, result} ->
{:ok, result}
{:error, {:turn_failed, error}} ->
Logger.error("Turn failed: #{error.message}")
handle_turn_failure(error)
{:error, {:process, reason}} ->
Logger.error("Process error: #{inspect(reason)}")
{:error, :process_error}
{:error, {:config, reason}} ->
Logger.error("Configuration error: #{inspect(reason)}")
{:error, :config_error}
{:error, reason} ->
Logger.error("Unknown error: #{inspect(reason)}")
{:error, :unknown}
end
end
defp handle_turn_failure(error) do
# Could implement retry logic, fallback, etc.
if retryable?(error) do
Logger.info("Error is retryable, consider retry logic")
end
{:error, :turn_failed}
end
defp retryable?(error) do
# Check if error message indicates transient issue
error.message =~ ~r/(rate limit|timeout|temporarily unavailable)/i
end
def with_retry(thread, input, max_attempts \\ 3) do
do_with_retry(thread, input, max_attempts, 1)
end
defp do_with_retry(thread, input, max_attempts, attempt) do
case Codex.Thread.run(thread, input) do
{:ok, result} ->
{:ok, result}
{:error, {:turn_failed, error}} when attempt < max_attempts ->
if retryable?(error) do
Logger.info("Retry attempt #{attempt + 1}/#{max_attempts}")
# Exponential backoff
Process.sleep(1000 * :math.pow(2, attempt - 1) |> round())
do_with_retry(thread, input, max_attempts, attempt + 1)
else
{:error, {:turn_failed, error}}
end
error ->
error
end
end
endValidation and Sanitization
Validate inputs and outputs.
defmodule ValidationExample do
def safe_run(thread, input) do
with :ok <- validate_input(input),
{:ok, result} <- Codex.Thread.run(thread, input),
:ok <- validate_result(result) do
{:ok, result}
end
end
defp validate_input(input) do
cond do
!is_binary(input) ->
{:error, :input_must_be_string}
String.length(input) == 0 ->
{:error, :input_cannot_be_empty}
String.length(input) > 100_000 ->
{:error, :input_too_long}
true ->
:ok
end
end
defp validate_result(result) do
cond do
!result.usage ->
{:error, :missing_usage}
result.usage.input_tokens + result.usage.output_tokens == 0 ->
{:error, :no_tokens_used}
true ->
:ok
end
end
endConfiguration
Environment-Based Configuration
Configure based on environment.
The default model is gpt-5.3-codex (unless overridden by CODEX_MODEL, OPENAI_DEFAULT_MODEL, or CODEX_MODEL_DEFAULT) and remote model metadata is gated behind features.remote_models = true in the effective Codex config (system /etc/codex/config.toml, user $CODEX_HOME/config.toml, and .codex/config.toml layers between cwd and the project root; root markers default to .git and are configurable via project_root_markers). When enabled, the SDK merges the remote /models list (or bundled models.json) with local presets.
defmodule MyApp.Codex do
def start_thread do
codex_opts = %Codex.Options{
api_key: api_key(),
base_url: base_url(),
codex_path_override: codex_path()
}
thread_opts = %Codex.Thread.Options{
model: model(),
sandbox_mode: sandbox_mode(),
working_directory: working_directory()
}
Codex.start_thread(codex_opts, thread_opts)
end
defp api_key do
System.get_env("CODEX_API_KEY") ||
Application.get_env(:my_app, :codex_api_key)
end
defp base_url do
Application.get_env(:my_app, :codex_base_url)
end
defp codex_path do
Application.get_env(:my_app, :codex_path)
end
defp model do
Application.get_env(:my_app, :codex_model, "o1")
end
defp sandbox_mode do
case Mix.env() do
:prod -> :read_only
_ -> :workspace_write
end
end
defp working_directory do
File.cwd!()
end
endPer-Request Configuration
Override configuration per request.
defmodule ConfigExample do
def analyze_with_different_models(input) do
models = ["gpt-4", "o1", "gpt-4-turbo"]
results = Enum.map(models, fn model ->
thread_opts = %Codex.Thread.Options{model: model}
{:ok, thread} = Codex.start_thread(%Codex.Options{}, thread_opts)
{:ok, result} = Codex.Thread.run(thread, input)
{model, result}
end)
# Compare results
Enum.each(results, fn {model, result} ->
IO.puts("\n#{model}:")
IO.puts(String.slice(result.final_response, 0..200))
IO.puts("Tokens: #{result.usage.input_tokens + result.usage.output_tokens}")
end)
end
endYou can also pass CLI config overrides and shell environment policy entries via thread options:
{:ok, thread_opts} =
Codex.Thread.Options.new(
model_provider: "mistral",
base_instructions: "Keep answers brief.",
shell_environment_policy: %{
inherit: "core",
exclude: ["AWS_*"]
},
config_overrides: %{
"model_reasoning_summary" => "concise"
}
)
{:ok, thread} = Codex.start_thread(%Codex.Options{}, thread_opts)Options-level global overrides are also supported and are emitted before derived/thread/turn overrides (later layers win):
{:ok, codex_opts} =
Codex.Options.new(
config: %{"model_reasoning_summary" => "concise"}
)
{:ok, thread_opts} =
Codex.Thread.Options.new(
config_overrides: %{"model_reasoning_summary" => "detailed"}
)
{:ok, thread} = Codex.start_thread(codex_opts, thread_opts)
# Turn-level override wins last:
{:ok, _result} =
Codex.Thread.run(thread, "Reply with exactly ok", %{
config_overrides: %{"model_reasoning_summary" => "none"}
})Advanced Patterns
Concurrent Turns
Execute multiple turns concurrently.
defmodule ConcurrentExample do
def parallel_analysis(files) do
tasks = Enum.map(files, fn file ->
Task.async(fn ->
{:ok, thread} = Codex.start_thread()
{:ok, result} = Codex.Thread.run(thread, "Analyze #{file}")
{file, result}
end)
end)
results = Task.await_many(tasks, 60_000)
Enum.each(results, fn {file, result} ->
preview =
case result.final_response do
%Codex.Items.AgentMessage{text: text} -> String.slice(text, 0..200)
_ -> "(no response produced)"
end
IO.puts("\n#{file}:")
IO.puts(preview)
end)
end
def map_reduce_pattern(items) do
# Map: Process each item concurrently
tasks = Enum.map(items, fn item ->
Task.async(fn ->
{:ok, thread} = Codex.start_thread()
{:ok, result} = Codex.Thread.run(thread, "Process #{item}")
case result.final_response do
%Codex.Items.AgentMessage{text: text} -> text
_ -> ""
end
end)
end)
responses = Task.await_many(tasks, 60_000)
# Reduce: Combine results
{:ok, thread} = Codex.start_thread()
summary_prompt = """
Summarize these analyses:
#{Enum.join(responses, "\n\n---\n\n")}
"""
{:ok, result} = Codex.Thread.run(thread, summary_prompt)
case result.final_response do
%Codex.Items.AgentMessage{text: text} ->
IO.puts("Summary:")
IO.puts(text)
_ ->
IO.puts("Summary not available")
end
end
endAgent Collaboration
Multiple agents working together.
defmodule CollaborationExample do
def collaborative_code_review(file) do
# Analyzer agent
{:ok, analyzer} = Codex.start_thread()
{:ok, analysis} = Codex.Thread.run(
analyzer,
"Analyze #{file} for potential issues"
)
# Security expert agent
{:ok, security} = Codex.start_thread()
{:ok, security_review} = Codex.Thread.run(
security,
"""
Review this code for security issues:
Analysis: #{analysis.final_response.text}
"""
)
# Performance expert agent
{:ok, performance} = Codex.start_thread()
{:ok, perf_review} = Codex.Thread.run(
performance,
"""
Review this code for performance issues:
Analysis: #{analysis.final_response.text}
"""
)
# Synthesizer agent
{:ok, synthesizer} = Codex.start_thread()
synthesis_prompt = """
Synthesize these reviews into actionable recommendations:
Security Review:
#{security_review.final_response.text}
Performance Review:
#{perf_review.final_response.text}
"""
{:ok, final} = Codex.Thread.run(synthesizer, synthesis_prompt)
case final.final_response do
%Codex.Items.AgentMessage{text: text} ->
IO.puts("Final Recommendations:")
IO.puts(text)
_ ->
IO.puts("No recommendations produced.")
end
end
endTool Output Bridging and Auto-Run (Transport Notes)
The SDK includes a tool registry and can collect structured tool outputs during auto-run cycles.
However, the default transport (codex exec --json) does not currently expose a
supported mechanism to inject tool outputs back into the CLI. Treat the example below as a
conceptual demo until a tool-capable transport (e.g. MCP-hosted tools or protocol/app-server) is
adopted.
defmodule ToolBridgingExample do
def run_with_registered_tool do
defmodule MathTool do
use Codex.Tool, name: "math_tool", description: "adds two numbers"
@impl true
def invoke(%{"x" => x, "y" => y}, _context), do: {:ok, %{"sum" => x + y}}
end
{:ok, _handle} = Codex.Tools.register(MathTool)
{:ok, thread} =
Codex.start_thread(approval_policy: Codex.Approvals.StaticPolicy.allow())
{:ok, result} =
Codex.Thread.run_auto(thread, "Ask the math tool to add 4 and 5", max_attempts: 2)
IO.inspect(result.raw[:tool_outputs], label: "tool outputs captured by SDK")
IO.inspect(result.thread.pending_tool_outputs, label: "pending outputs after turn")
end
endStreaming with State Accumulation
Build up state while streaming.
defmodule StatefulStreamingExample do
def accumulate_state do
{:ok, thread} = Codex.start_thread()
{:ok, stream} = Codex.Thread.run_streamed(thread, "Implement a new feature")
final_state = Enum.reduce(stream, initial_state(), fn event, state ->
case event do
%Codex.Events.ThreadStarted{thread_id: id} ->
%{state | thread_id: id}
%Codex.Events.ItemCompleted{item: %Codex.Items.CommandExecution{} = cmd} ->
%{state | commands: [cmd | state.commands]}
%Codex.Events.ItemCompleted{item: %Codex.Items.FileChange{} = file} ->
%{state | files: [file | state.files]}
%Codex.Events.ItemCompleted{item: %Codex.Items.AgentMessage{text: text}} ->
%{state | messages: [text | state.messages]}
%Codex.Events.TurnCompleted{usage: usage} when is_map(usage) ->
%{state | usage: usage}
_ ->
state
end
end)
display_summary(final_state)
end
defp initial_state do
%{
thread_id: nil,
commands: [],
files: [],
messages: [],
usage: nil
}
end
defp display_summary(state) do
IO.puts("Turn Summary:")
IO.puts(" Thread ID: #{state.thread_id}")
IO.puts(" Commands: #{length(state.commands)}")
IO.puts(" Files Changed: #{count_file_changes(state.files)}")
IO.puts(" Messages: #{length(state.messages)}")
if state.usage do
IO.puts(" Tokens: #{state.usage.input_tokens + state.usage.output_tokens}")
end
end
defp count_file_changes(files) do
files
|> Enum.flat_map(fn file -> file.changes end)
|> length()
end
endProduction Patterns
Telemetry Integration
Monitor SDK usage in production.
defmodule MyApp.TelemetryHandler do
require Logger
def setup do
:telemetry.attach_many(
"my-app-codex-handler",
[
[:codex, :turn, :start],
[:codex, :turn, :stop],
[:codex, :turn, :exception]
],
&handle_event/4,
nil
)
end
def handle_event([:codex, :turn, :start], measurements, metadata, _config) do
Logger.info("Codex turn started", thread_id: metadata.thread_id)
# Could send to metrics system
:telemetry.execute(
[:my_app, :codex, :turn, :count],
%{count: 1},
metadata
)
end
def handle_event([:codex, :turn, :stop], measurements, metadata, _config) do
Logger.info("Codex turn completed",
thread_id: metadata.thread_id,
duration_ms: measurements.duration / 1_000_000,
tokens: metadata.usage.input_tokens + metadata.usage.output_tokens
)
# Send metrics
:telemetry.execute(
[:my_app, :codex, :turn, :duration],
%{duration: measurements.duration},
metadata
)
:telemetry.execute(
[:my_app, :codex, :tokens],
%{
input: metadata.usage.input_tokens,
output: metadata.usage.output_tokens,
total: metadata.usage.input_tokens + metadata.usage.output_tokens
},
metadata
)
end
def handle_event([:codex, :turn, :exception], measurements, metadata, _config) do
Logger.error("Codex turn failed",
thread_id: metadata.thread_id,
error: metadata.error,
duration_ms: measurements.duration / 1_000_000
)
# Alert on failures
:telemetry.execute(
[:my_app, :codex, :turn, :error],
%{count: 1},
metadata
)
end
end
# In application.ex
def start(_type, _args) do
MyApp.TelemetryHandler.setup()
# ... rest of supervision tree
endRate Limiting
Implement rate limiting for API calls.
defmodule MyApp.RateLimiter do
use GenServer
def start_link(_) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def acquire do
GenServer.call(__MODULE__, :acquire)
end
@impl true
def init(_) do
state = %{
tokens: 10,
last_refill: System.monotonic_time(:second)
}
schedule_refill()
{:ok, state}
end
@impl true
def handle_call(:acquire, _from, state) do
state = refill_tokens(state)
if state.tokens > 0 do
{:reply, :ok, %{state | tokens: state.tokens - 1}}
else
{:reply, {:error, :rate_limited}, state}
end
end
@impl true
def handle_info(:refill, state) do
schedule_refill()
{:noreply, %{state | tokens: 10}}
end
defp refill_tokens(state) do
now = System.monotonic_time(:second)
elapsed = now - state.last_refill
if elapsed >= 60 do
%{state | tokens: 10, last_refill: now}
else
state
end
end
defp schedule_refill do
Process.send_after(self(), :refill, 60_000)
end
end
# Usage
defmodule MyApp.Codex do
def run(thread, input) do
case MyApp.RateLimiter.acquire() do
:ok ->
Codex.Thread.run(thread, input)
{:error, :rate_limited} ->
{:error, :rate_limited}
end
end
endSupervised Turn Execution
Run turns under supervision.
defmodule MyApp.TurnSupervisor do
use Supervisor
def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_init_arg) do
children = [
{Task.Supervisor, name: MyApp.TurnTaskSupervisor}
]
Supervisor.init(children, strategy: :one_for_one)
end
def run_supervised(thread, input, opts \\ %{}) do
Task.Supervisor.async(MyApp.TurnTaskSupervisor, fn ->
Codex.Thread.run(thread, input, opts)
end)
|> Task.await()
end
endLive Usage & Compaction
examples/live_usage_and_compaction.exs runs against the live Codex backend (requires CODEX_API_KEY or a CLI login) and mirrors the latest defaults:
- Uses the SDK default model (
gpt-5.3-codex) and reasoning effort. - Streams events, printing token-usage deltas and turn diffs as they arrive.
- Captures explicit compaction notifications (including usage deltas) and merges them into the displayed token totals.
- Prints the final agent response alongside merged usage.
Run it with:
mix run examples/live_usage_and_compaction.exs "summarize recent changes"
Note: examples/run_all.sh exports CODEX_MODEL=gpt-5.3-codex by default, and examples/conversation_and_resume.exs, examples/live_session_walkthrough.exs, and examples/live_mcp_and_sessions.exs also explicitly set model: "gpt-5.3-codex". Update those scripts if you want a different model.
Live Exec Controls
examples/live_exec_controls.exs streams against the live Codex CLI while forwarding per-turn env,
cancellation tokens, and custom timeouts to codex exec. Use it to validate env injection for
tooling or to wire cancellation tokens from upstream request lifecycles.
mix run examples/live_exec_controls.exs \
"List three repo files and echo \$CODEX_DEMO_ENV"
Events with requires_approval: false bypass approval hooks automatically; only flagged operations
invoke your configured approval policy or hook.
Overrides remain available:
# Custom env, timeout, and cancellation token
mix run examples/live_exec_controls.exs \
"List three repo files and echo \$CODEX_DEMO_ENV" \
--env CODEX_DEMO_ENV=from-docs \
--timeout-ms 45000 \
--cancel demo-token-123
If your Codex CLI is older and does not yet support --cancellation-token, rerun without
--cancel or upgrade via npm install -g @openai/codex.
Live Telemetry Stream
examples/live_telemetry_stream.exs attaches telemetry handlers to the live Codex CLI so you can observe thread/turn identifiers, source metadata, token-usage deltas, diff updates, and compaction savings as they stream in. It uses a low reasoning effort and a short default prompt to return quickly.
mix run examples/live_telemetry_stream.exs
Auth falls back to your Codex CLI login when CODEX_API_KEY is not set.
Additional Live Examples
examples/live_collaboration_modes.exs— lists collaboration presets and runs a turnexamples/live_personality.exs— compares friendly, pragmatic, and none personality overrides (including app-server:none)examples/live_config_overrides.exs— nested config override auto-flattening (thread and turn level)examples/live_options_config_overrides.exs— options-level global config overrides, precedence, and validationexamples/live_thread_management.exs— demonstrates thread read/fork/rollback/loaded listexamples/live_web_search_modes.exs— toggles web search modes and reports web search items (explicit disable emitsweb_search="disabled")examples/live_rate_limits.exs— prints rate limit snapshots from token usage events
App-server Transport
App-server (codex app-server) is a stateful, bidirectional transport that unlocks upstream v2 APIs (threads list/archive, skills/models/config, server-driven approvals, etc.).
See guides/05-app-server-transport.md for the complete guide, and run the live scripts:
mix run examples/live_app_server_basic.exs
mix run examples/live_app_server_streaming.exs "Reply with exactly ok and nothing else."
mix run examples/live_app_server_approvals.exs
mix run examples/live_collaboration_modes.exs
mix run examples/live_personality.exs
mix run examples/live_thread_management.exs
Minimal usage with existing thread APIs:
{:ok, codex_opts} = Codex.Options.new(%{})
{:ok, conn} = Codex.AppServer.connect(codex_opts)
{:ok, thread} =
Codex.start_thread(codex_opts, %{
transport: {:app_server, conn},
working_directory: File.cwd!()
})
{:ok, result} = Codex.Thread.run(thread, "List the available skills for this repo")
IO.inspect(result.final_response, label: "final_response")Custom prompts and skills helpers
{:ok, prompts} = Codex.Prompts.list()
{:ok, expanded} = Codex.Prompts.expand(Enum.at(prompts, 0), "FILE=lib/app.ex")
{:ok, %{"data" => skills}} = Codex.Skills.list(conn, skills_enabled: true)
first_skill = skills |> List.first() |> Map.get("skills") |> List.first()
{:ok, skill_body} = Codex.Skills.load(first_skill, skills_enabled: true)Realtime Voice Interactions
Basic Realtime Session
Set up a bidirectional voice session using the OpenAI Realtime API:
defmodule RealtimeExample do
alias Codex.Realtime
alias Codex.Realtime.Config.{RunConfig, SessionModelSettings, TurnDetectionConfig}
def start_session do
# Create a realtime agent
agent = Realtime.agent(
name: "VoiceAssistant",
instructions: "You are a helpful voice assistant."
)
# Configure session with voice and turn detection
config = %RunConfig{
model_settings: %SessionModelSettings{
voice: "alloy",
turn_detection: %TurnDetectionConfig{
type: :semantic_vad,
eagerness: :medium
}
}
}
# Start the session
{:ok, session} = Realtime.start_session(agent, config)
# Subscribe to events
Realtime.subscribe(session, self())
# Session is now ready for audio
session
end
def handle_events do
receive do
{:realtime_event, %Codex.Realtime.Events.RealtimeAudioEvent{} = event} ->
# Handle audio from the agent
play_audio(event.audio)
handle_events()
{:realtime_event, %Codex.Realtime.Events.RealtimeAgentStateEvent{state: state}} ->
IO.puts("Agent state: #{state}")
handle_events()
{:realtime_event, event} ->
IO.inspect(event, label: "Event")
handle_events()
after
30_000 -> :timeout
end
end
endRealtime with Function Tools
Add function calling capabilities to realtime agents:
defmodule RealtimeToolsExample do
alias Codex.Realtime
def create_agent_with_tools do
Realtime.agent(
name: "WeatherAssistant",
instructions: "Help users check the weather.",
tools: [
%{
name: "get_weather",
description: "Get weather for a location",
parameters: %{
type: "object",
properties: %{
location: %{type: "string", description: "City name"}
},
required: ["location"]
}
}
]
)
end
endMulti-Agent Handoffs
Hand off conversations between specialized agents:
defmodule RealtimeHandoffsExample do
alias Codex.Realtime
def create_agents do
greeter = Realtime.agent(
name: "Greeter",
instructions: "Greet users and hand off to specialists."
)
specialist = Realtime.agent(
name: "TechSupport",
instructions: "Provide technical assistance."
)
# Configure greeter to hand off to specialist
greeter_with_handoff = Realtime.add_handoff(greeter, specialist,
condition: "When user needs technical help"
)
{greeter_with_handoff, specialist}
end
endRun realtime examples:
export CODEX_API_KEY=your-key
# or export OPENAI_API_KEY=your-key
mix run examples/realtime_basic.exs
mix run examples/realtime_tools.exs
mix run examples/realtime_handoffs.exs
mix run examples/live_realtime_voice.exs
Voice Pipeline
Basic STT -> Workflow -> TTS Pipeline
Process audio through speech-to-text, custom workflow, and text-to-speech:
defmodule VoicePipelineExample do
alias Codex.Voice.{Pipeline, SimpleWorkflow, Config}
alias Codex.Voice.Config.TTSSettings
alias Codex.Voice.Input.AudioInput
def run_pipeline(audio_data) do
# Create a simple workflow
workflow = SimpleWorkflow.new(
fn transcribed_text ->
# Process the transcribed text
response = "I heard you say: #{transcribed_text}"
[response]
end,
greeting: "Hello! How can I help you?"
)
# Configure the pipeline
config = %Config{
workflow_name: "SimpleVoice",
tts_settings: %TTSSettings{voice: :nova}
}
# Create and start the pipeline
{:ok, pipeline} = Pipeline.start_link(workflow: workflow, config: config)
# Create audio input
input = AudioInput.new(audio_data, format: :wav)
# Run the pipeline
{:ok, result} = Pipeline.run(pipeline, input)
# Collect audio output
result
|> Enum.filter(fn
%Codex.Voice.Events.VoiceStreamEventAudio{} -> true
_ -> false
end)
|> Enum.map(& &1.data)
|> IO.iodata_to_binary()
end
endMulti-Turn Voice Conversations
Maintain conversation context across multiple turns:
defmodule VoiceMultiTurnExample do
alias Codex.Voice.{Pipeline, AgentWorkflow, Config}
alias Codex.Voice.Input.StreamedAudioInput
def create_conversational_pipeline do
# Use AgentWorkflow for multi-turn conversations
workflow = AgentWorkflow.new(
agent: %{
instructions: "You are a helpful assistant. Remember context from earlier in the conversation."
}
)
config = %Config{
workflow_name: "ConversationalAssistant",
tts_settings: %Config.TTSSettings{voice: :alloy}
}
{:ok, pipeline} = Pipeline.start_link(
workflow: workflow,
config: config
)
pipeline
end
def stream_conversation(pipeline, audio_stream) do
# Create streaming input
input = StreamedAudioInput.new()
# Start processing
{:ok, result_stream} = Pipeline.run_streamed(pipeline, input)
# Feed audio chunks
Task.start(fn ->
for chunk <- audio_stream do
StreamedAudioInput.push(input, chunk)
end
StreamedAudioInput.close(input)
end)
# Process results
result_stream
end
endVoice with Codex Agent
Integrate the voice pipeline with a full Codex agent:
defmodule VoiceWithAgentExample do
alias Codex.Voice.{Pipeline, AgentWorkflow, Config}
def create_agent_voice_pipeline do
# Create workflow backed by Codex.Agent
workflow = AgentWorkflow.new(
agent: %{
instructions: """
You are a coding assistant accessible via voice.
When asked about code, provide clear explanations.
""",
tools: [Codex.Tools.FileSearchTool]
}
)
config = %Config{
workflow_name: "CodingAssistant",
stt_settings: %Config.STTSettings{
model: "gpt-4o-transcribe"
},
tts_settings: %Config.TTSSettings{
voice: :echo,
model: "gpt-4o-mini-tts"
}
}
Pipeline.start_link(workflow: workflow, config: config)
end
endRun voice pipeline examples:
export CODEX_API_KEY=your-key
# or export OPENAI_API_KEY=your-key
mix run examples/voice_pipeline.exs
mix run examples/voice_multi_turn.exs
mix run examples/voice_with_agent.exs
Conclusion
These examples demonstrate the flexibility and power of the Elixir Codex SDK. Key patterns include:
- Streaming for responsive UIs
- Structured output for data extraction
- Multi-turn for complex conversations
- Error handling for robustness
- Concurrent execution for performance
- Production patterns for observability and reliability
- Realtime voice for bidirectional audio interactions
- Voice pipeline for STT -> Workflow -> TTS processing
Refer to the API Guide for complete documentation of all functions and types.