Examples and Usage Patterns

View Source

This document provides comprehensive examples demonstrating common use cases and patterns for the Elixir Codex SDK.

Table of Contents

  1. Basic Usage
  2. Streaming Responses
  3. Structured Output
  4. Multi-Turn Conversations
  5. File Operations
  6. Command Execution
  7. Error Handling
  8. Configuration
  9. Advanced Patterns
  10. Production Patterns
  11. Testing Patterns

Basic Usage

Simple Question and Answer

The most basic usage: ask a question and get a response.

defmodule BasicExample do
  @moduledoc """
  Minimal single-turn example: open a thread, ask one question, and print the
  answer together with the token counts reported for the turn.
  """

  @doc """
  Starts a new thread, runs one turn, and prints the final response followed
  by the input, output, and combined token counts.

  Raises `MatchError` if either SDK call does not return an `{:ok, _}` tuple.
  """
  def ask_question do
    {:ok, thread} = Codex.start_thread()
    {:ok, result} = Codex.Thread.run(thread, "What is a GenServer in Elixir?")

    IO.puts(result.final_response)

    # Token accounting for the turn that just finished.
    IO.puts("\nTokens used:")
    usage = result.usage
    IO.puts("  Input: #{usage.input_tokens}")
    IO.puts("  Output: #{usage.output_tokens}")
    IO.puts("  Total: #{usage.input_tokens + usage.output_tokens}")
  end
end

Examining All Items

Access all items produced during a turn.

defmodule ItemsExample do
  @moduledoc """
  Shows how to walk every item produced by a turn and print each one
  according to its struct type.
  """

  @doc """
  Runs a single turn and prints each item in `result.items`.

  Agent messages and reasoning items print their text, command executions
  print the command and exit code, and any other item prints only its `type`.
  """
  def explore_items do
    {:ok, thread} = Codex.start_thread()
    prompt = "Explain how Elixir processes work, and give me a simple example"
    {:ok, result} = Codex.Thread.run(thread, prompt)

    Enum.each(result.items, &print_item/1)
  end

  # One clause per item struct; the catch-all relies on the item exposing `type`.
  defp print_item(%Codex.Items.AgentMessage{text: body}) do
    IO.puts("\n[Agent Message]")
    IO.puts(body)
  end

  defp print_item(%Codex.Items.Reasoning{text: body}) do
    IO.puts("\n[Reasoning]")
    IO.puts(body)
  end

  defp print_item(%Codex.Items.CommandExecution{command: command, exit_code: status}) do
    IO.puts("\n[Command Execution]")
    IO.puts("Command: #{command}")
    IO.puts("Exit Code: #{status}")
  end

  defp print_item(other) do
    IO.puts("\n[Other Item: #{other.type}]")
  end
end

Streaming Responses

Real-Time Event Processing

Process events as they arrive for responsive UIs.

defmodule StreamingExample do
  @moduledoc """
  Consumes a streamed turn event by event and prints progress as each event
  arrives.
  """

  @doc """
  Runs a streamed turn and reports every event to stdout.

  Returns `:ok` once the stream is exhausted.
  """
  def stream_response do
    {:ok, thread} = Codex.start_thread()
    prompt = "Analyze the files in this directory and suggest improvements"
    {:ok, stream} = Codex.Thread.run_streamed(thread, prompt)

    Enum.each(stream, &report/1)
  end

  # Event dispatch: one clause per event struct, unknown events are ignored.
  defp report(%Codex.Events.ThreadStarted{thread_id: id}) do
    IO.puts("Started thread: #{id}")
  end

  defp report(%Codex.Events.TurnStarted{}) do
    IO.puts("Turn started...")
  end

  defp report(%Codex.Events.ItemStarted{item: item}) do
    IO.puts("New item: #{item.type}")
  end

  defp report(%Codex.Events.ItemUpdated{item: %{type: :command_execution} = running}) do
    # Emit one dot per update while the command is still in progress.
    if running.status == :in_progress, do: IO.write(".")
  end

  defp report(%Codex.Events.ItemCompleted{item: item}) do
    report_completed(item)
  end

  defp report(%Codex.Events.TurnCompleted{usage: usage}) do
    IO.puts("\n\nTurn completed!")
    IO.puts("Tokens: #{usage.input_tokens + usage.output_tokens}")
  end

  defp report(_event), do: :ok

  # Completed items: only messages, successful commands and file changes print.
  defp report_completed(%{type: :agent_message, text: body}) do
    IO.puts("\n\n[Response]")
    IO.puts(body)
  end

  defp report_completed(%{type: :command_execution, command: command, exit_code: 0}) do
    IO.puts("\n✓ Command succeeded: #{command}")
  end

  defp report_completed(%{type: :file_change, changes: changes}) do
    IO.puts("\n[Files Changed]")

    Enum.each(changes, fn %{path: path, kind: kind} ->
      IO.puts("  #{kind}: #{path}")
    end)
  end

  defp report_completed(_item), do: :ok
end

Progressive Display

Stream text responses character by character (simulated).

defmodule ProgressiveDisplay do
  @moduledoc """
  Simulates character-by-character output by replaying each completed agent
  message one grapheme at a time.
  """

  # Pause between graphemes in milliseconds; adjust for the desired speed.
  @delay_ms 10

  @doc """
  Runs a streamed turn and types out every completed agent message, then
  prints a trailing blank line.
  """
  def display_progressively do
    {:ok, thread} = Codex.start_thread()
    prompt = "Write a short story about a robot learning to code"
    {:ok, stream} = Codex.Thread.run_streamed(thread, prompt)

    stream
    |> Stream.filter(&match?(%Codex.Events.ItemCompleted{item: %{type: :agent_message}}, &1))
    |> Enum.each(fn %{item: %{text: text}} -> type_out(text) end)

    IO.puts("\n")
  end

  # Writes the text grapheme by grapheme, sleeping after each one.
  defp type_out(text) do
    for grapheme <- String.graphemes(text) do
      IO.write(grapheme)
      Process.sleep(@delay_ms)
    end

    :ok
  end
end

Structured Output

JSON Schema with Validation

Request structured data conforming to a schema.

defmodule StructuredOutputExample do
  @moduledoc """
  Requests a response constrained by a JSON schema, decodes it, and prints a
  readable report.
  """

  @doc """
  Runs one turn with `output_schema` set, decodes `final_response` as JSON,
  and prints the score, the issues, and the suggestions.

  Prints `"Failed to parse JSON response"` when the response is not valid
  JSON.
  """
  def analyze_code_quality do
    {:ok, thread} = Codex.start_thread()
    options = %Codex.Turn.Options{output_schema: quality_schema()}
    prompt = "Analyze the code quality of this Elixir project"

    {:ok, result} = Codex.Thread.run(thread, prompt, options)

    result.final_response
    |> Jason.decode()
    |> print_report()
  end

  # JSON schema: a 0-100 score, a list of issue objects, a list of suggestions.
  defp quality_schema do
    issue_schema = %{
      "type" => "object",
      "properties" => %{
        "severity" => %{
          "type" => "string",
          "enum" => ["low", "medium", "high"]
        },
        "description" => %{"type" => "string"},
        "file" => %{"type" => "string"},
        "line" => %{"type" => "integer"}
      },
      "required" => ["severity", "description"]
    }

    %{
      "type" => "object",
      "properties" => %{
        "overall_score" => %{
          "type" => "integer",
          "minimum" => 0,
          "maximum" => 100
        },
        "issues" => %{
          "type" => "array",
          "items" => issue_schema
        },
        "suggestions" => %{
          "type" => "array",
          "items" => %{"type" => "string"}
        }
      },
      "required" => ["overall_score", "issues", "suggestions"]
    }
  end

  defp print_report({:ok, data}) do
    IO.puts("Overall Score: #{data["overall_score"]}/100")

    IO.puts("\nIssues Found:")
    Enum.each(data["issues"], &print_issue/1)

    IO.puts("\nSuggestions:")

    for suggestion <- data["suggestions"] do
      IO.puts("  - #{suggestion}")
    end

    :ok
  end

  defp print_report({:error, _}) do
    IO.puts("Failed to parse JSON response")
  end

  # The file location line is only printed when the issue names a file.
  defp print_issue(issue) do
    label = String.upcase(issue["severity"])
    IO.puts("  [#{label}] #{issue["description"]}")

    if issue["file"] do
      IO.puts("    File: #{issue["file"]}:#{issue["line"]}")
    end
  end
end

Using with TypedStruct

Define Elixir structs that match your schema.

defmodule MyApp.CodeAnalysis do
  @moduledoc """
  Typed representation of a code-quality analysis, together with the JSON
  schema that describes it and a decoder that builds the struct from JSON.
  """

  use TypedStruct

  # The nested Issue struct is defined first so that `Issue.t()` below refers
  # to `MyApp.CodeAnalysis.Issue` rather than an undefined top-level module.
  typedstruct module: Issue do
    field :severity, String.t(), enforce: true
    field :description, String.t(), enforce: true
    field :file, String.t()
    field :line, integer()
  end

  alias __MODULE__.Issue

  typedstruct do
    field :overall_score, integer(), enforce: true
    field :issues, [Issue.t()], default: []
    field :suggestions, [String.t()], default: []
  end

  @doc """
  Returns the JSON schema (as a map with string keys) matching this struct.
  """
  def schema do
    %{
      "type" => "object",
      "properties" => %{
        "overall_score" => %{"type" => "integer", "minimum" => 0, "maximum" => 100},
        "issues" => %{
          "type" => "array",
          "items" => %{
            "type" => "object",
            "properties" => %{
              "severity" => %{"type" => "string", "enum" => ["low", "medium", "high"]},
              "description" => %{"type" => "string"},
              "file" => %{"type" => "string"},
              "line" => %{"type" => "integer"}
            },
            "required" => ["severity", "description"]
          }
        },
        "suggestions" => %{"type" => "array", "items" => %{"type" => "string"}}
      },
      "required" => ["overall_score", "issues", "suggestions"]
    }
  end

  @doc """
  Decodes `json_string` into a `%MyApp.CodeAnalysis{}`.

  Returns `{:ok, analysis}` on success, the `{:error, reason}` tuple from
  `Jason.decode/1` when the input is not valid JSON, or
  `{:error, :not_an_object}` when the JSON value is not an object.

  Missing `"issues"` or `"suggestions"` keys decode to empty lists.
  """
  def from_json(json_string) do
    with {:ok, data} when is_map(data) <- Jason.decode(json_string) do
      {:ok,
       %__MODULE__{
         overall_score: data["overall_score"],
         issues: parse_issues(data["issues"]),
         suggestions: data["suggestions"] || []
       }}
    else
      {:ok, _non_object} -> {:error, :not_an_object}
      {:error, _reason} = error -> error
    end
  end

  defp parse_issues(issues) when is_list(issues) do
    Enum.map(issues, fn issue ->
      %Issue{
        severity: issue["severity"],
        description: issue["description"],
        file: issue["file"],
        line: issue["line"]
      }
    end)
  end

  # An absent or non-list "issues" value is treated like an empty list,
  # mirroring the `|| []` fallback used for suggestions.
  defp parse_issues(_missing), do: []
end

# Usage: request schema-constrained output, then decode it into the struct.
{:ok, thread} = Codex.start_thread()

{:ok, result} =
  Codex.Thread.run(
    thread,
    "Analyze code",
    %Codex.Turn.Options{output_schema: MyApp.CodeAnalysis.schema()}
  )

{:ok, analysis} = MyApp.CodeAnalysis.from_json(result.final_response)
IO.puts("Score: #{analysis.overall_score}")

Multi-Turn Conversations

Context Retention

Maintain context across multiple turns.

defmodule ConversationExample do
  @moduledoc """
  Runs several turns on one thread so that follow-up questions are asked in
  the same conversation.
  """

  @doc """
  Sends three prompts in order on a single thread, printing each reply, and
  finally prints the thread id so the session can be resumed later.
  """
  def multi_turn_conversation do
    {:ok, thread} = Codex.start_thread()

    # {output prefix, prompt}: initial context first, then two follow-ups.
    turns = [
      {"Agent: ",
       "I have a bug in my GenServer. It crashes when I send it a {:stop, reason} message."},
      {"\nAgent: ", "Can you show me an example of how to handle that message correctly?"},
      {"\nAgent: ", "What if I want to perform cleanup before stopping?"}
    ]

    Enum.each(turns, fn {prefix, prompt} ->
      {:ok, reply} = Codex.Thread.run(thread, prompt)
      IO.puts("#{prefix}#{reply.final_response}")
    end)

    IO.puts("\nThread ID: #{thread.thread_id}")
  end
end

Resuming Sessions

Resume a previous conversation.

defmodule ResumeExample do
  @moduledoc """
  Demonstrates resuming a conversation from a previously saved thread id.
  """

  @id_file "thread_id.txt"

  @doc """
  Resumes the thread identified by `thread_id`, runs one more turn on it, and
  prints the reply.
  """
  def resume_previous_session(thread_id) do
    {:ok, thread} = Codex.resume_thread(thread_id)
    {:ok, result} = Codex.Thread.run(thread, "Can you remind me what we were discussing?")

    IO.puts(result.final_response)
  end

  @doc """
  Runs a turn, writes the thread id to `thread_id.txt`, reads it back, and
  continues the conversation on the resumed thread.
  """
  def save_and_resume do
    {:ok, thread} = Codex.start_thread()

    {:ok, first} = Codex.Thread.run(thread, "Remember the number 42")
    IO.puts(first.final_response)

    # Persist the id (a database or any other store would work the same way).
    File.write!(@id_file, thread.thread_id)

    # Simulated restart: only the stored id is used from here on.
    {:ok, resumed_thread} =
      @id_file
      |> File.read!()
      |> Codex.resume_thread()

    {:ok, second} = Codex.Thread.run(resumed_thread, "What number should I remember?")
    IO.puts("\nAfter resuming: #{second.final_response}")
  end
end

File Operations

Tracking File Changes

Monitor file modifications made by the agent.

defmodule FileOperationsExample do
  @moduledoc """
  Examples for inspecting the file changes reported by a turn.
  """

  @doc """
  Runs a streamed turn, collects every `{kind, path}` pair from completed
  file-change items, and prints the paths grouped by change kind.
  """
  def track_file_changes do
    {:ok, thread} = Codex.start_thread()

    {:ok, stream} =
      Codex.Thread.run_streamed(
        thread,
        "Add comprehensive documentation to all modules in lib/"
      )

    # flat_map builds the flat {kind, path} list in one pass, with no
    # intermediate nested list to flatten afterwards.
    changes =
      stream
      |> Stream.filter(fn
        %Codex.Events.ItemCompleted{item: %{type: :file_change}} -> true
        _ -> false
      end)
      |> Enum.flat_map(fn %{item: file_change} ->
        Enum.map(file_change.changes, fn change ->
          {change.kind, change.path}
        end)
      end)

    IO.puts("File Changes:")

    changes
    |> Enum.group_by(fn {kind, _path} -> kind end, fn {_kind, path} -> path end)
    |> Enum.each(fn {kind, paths} ->
      IO.puts("\n#{String.upcase(to_string(kind))}:")
      Enum.each(paths, fn path -> IO.puts("  - #{path}") end)
    end)
  end

  @doc """
  Runs a turn, lists the changes of every file-change item, and asks on stdin
  whether to accept them.

  Only the answer `y` counts as approval; any other answer, end of input, or
  a read error counts as a rejection.
  """
  def review_before_apply do
    {:ok, thread} = Codex.start_thread()
    {:ok, result} = Codex.Thread.run(thread, "Refactor the authentication module")

    file_changes = Enum.filter(result.items, &match?(%{type: :file_change}, &1))

    Enum.each(file_changes, fn change ->
      IO.puts("\nProposed Changes:")

      # A diff for :update changes could be shown here
      # (would need access to the diff from codex-rs).
      Enum.each(change.changes, fn %{path: path, kind: kind} ->
        IO.puts("  #{kind}: #{path}")
      end)

      IO.puts("\nApply these changes? (y/n)")

      if approved?(IO.gets("")) do
        IO.puts("Changes applied (by codex-rs)")
      else
        IO.puts("Changes rejected - would need to revert")
      end
    end)
  end

  # IO.gets/1 returns :eof or {:error, reason} when no line can be read;
  # those are treated as "no" instead of crashing in String.trim/1.
  defp approved?(answer) when is_binary(answer), do: String.trim(answer) == "y"
  defp approved?(_eof_or_error), do: false
end

Command Execution

Monitoring Commands

Track commands executed by the agent.

defmodule CommandExample do
  @moduledoc """
  Examples for inspecting the commands reported by a turn.
  """

  # Number of output lines shown per command.
  @preview_lines 5

  @doc """
  Runs a streamed turn, collects every completed command-execution item, and
  prints its status icon, command line, exit code, and the first lines of its
  output.
  """
  def monitor_commands do
    {:ok, thread} = Codex.start_thread()
    {:ok, stream} = Codex.Thread.run_streamed(thread, "Run the test suite and fix any failures")

    commands =
      for %Codex.Events.ItemCompleted{item: %{type: :command_execution} = command} <- stream do
        command
      end

    IO.puts("Commands Executed:")
    Enum.each(commands, &print_command/1)
  end

  @doc """
  Runs a turn and looks for a command containing `"mix test"`.

  Returns `{:ok, :all_passed}` when that command exited with 0,
  `{:error, :tests_failed}` for any other exit code, and
  `{:error, :no_tests_run}` when no such command was found.
  """
  def verify_tests_pass do
    {:ok, thread} = Codex.start_thread()
    {:ok, result} = Codex.Thread.run(thread, "Run mix test and report if all tests pass")

    result.items
    |> Enum.find(&test_run?/1)
    |> summarize_test_run()
  end

  defp print_command(command) do
    IO.puts("\n#{status_icon(command)} #{command.command}")
    IO.puts("  Exit Code: #{command.exit_code || "N/A"}")

    if command.aggregated_output != "" do
      IO.puts("  Output:")

      preview =
        command.aggregated_output
        |> String.split("\n")
        |> Enum.take(@preview_lines)

      for line <- preview, do: IO.puts("    #{line}")
    end
  end

  # Check mark only for a completed command with exit code 0.
  defp status_icon(command) do
    cond do
      command.status == :completed and command.exit_code == 0 -> "✓"
      command.status == :completed -> "✗"
      command.status == :failed -> "✗"
      true -> "?"
    end
  end

  defp test_run?(%{type: :command_execution, command: line}) do
    String.contains?(line, "mix test")
  end

  defp test_run?(_item), do: false

  defp summarize_test_run(%{exit_code: 0}) do
    IO.puts("✓ All tests passed!")
    {:ok, :all_passed}
  end

  defp summarize_test_run(%{exit_code: status}) do
    IO.puts("✗ Tests failed with exit code #{status}")
    {:error, :tests_failed}
  end

  defp summarize_test_run(nil) do
    IO.puts("No test command found")
    {:error, :no_tests_run}
  end
end

Error Handling

Graceful Error Recovery

Handle errors gracefully in production.

defmodule ErrorHandlingExample do
  @moduledoc """
  Patterns for turning SDK errors into application-level results, with an
  optional retry loop for transient failures.
  """

  require Logger

  @doc """
  Runs a turn and normalizes failures.

  Returns `{:ok, result}` on success. Failures are logged and mapped to
  `{:error, :turn_failed}`, `{:error, :process_error}`,
  `{:error, :config_error}`, or `{:error, :unknown}`.
  """
  def robust_turn(thread, input, opts \\ %Codex.Turn.Options{}) do
    thread
    |> Codex.Thread.run(input, opts)
    |> normalize()
  end

  @doc """
  Runs a turn, retrying up to `max_attempts` times in total when a turn
  failure looks transient.

  The wait before each retry doubles, starting at one second. Any other
  outcome is returned unchanged.
  """
  def with_retry(thread, input, max_attempts \\ 3) do
    do_with_retry(thread, input, max_attempts, 1)
  end

  defp normalize({:ok, _result} = success), do: success

  defp normalize({:error, {:turn_failed, failure}}) do
    Logger.error("Turn failed: #{failure.message}")
    handle_turn_failure(failure)
  end

  defp normalize({:error, {:process, reason}}) do
    Logger.error("Process error: #{inspect(reason)}")
    {:error, :process_error}
  end

  defp normalize({:error, {:config, reason}}) do
    Logger.error("Configuration error: #{inspect(reason)}")
    {:error, :config_error}
  end

  defp normalize({:error, reason}) do
    Logger.error("Unknown error: #{inspect(reason)}")
    {:error, :unknown}
  end

  # Hook for retry or fallback logic; currently only logs a hint.
  defp handle_turn_failure(failure) do
    if retryable?(failure) do
      Logger.info("Error is retryable, consider retry logic")
    end

    {:error, :turn_failed}
  end

  # A failure counts as transient when its message mentions one of these.
  defp retryable?(failure) do
    failure.message =~ ~r/(rate limit|timeout|temporarily unavailable)/i
  end

  defp do_with_retry(thread, input, max_attempts, attempt) do
    outcome = Codex.Thread.run(thread, input)

    with {:error, {:turn_failed, failure}} <- outcome,
         true <- attempt < max_attempts and retryable?(failure) do
      Logger.info("Retry attempt #{attempt + 1}/#{max_attempts}")

      # Exponential backoff: 1s, 2s, 4s, ...
      backoff_ms = round(1000 * :math.pow(2, attempt - 1))
      Process.sleep(backoff_ms)

      do_with_retry(thread, input, max_attempts, attempt + 1)
    else
      _ -> outcome
    end
  end
end

Validation and Sanitization

Validate inputs and outputs.

defmodule ValidationExample do
  @moduledoc """
  Wraps a turn with checks on the input before the call and on the result
  after it.
  """

  @max_input_length 100_000

  @doc """
  Validates `input`, runs the turn, and validates the result.

  Returns `{:ok, result}` when every step passes; otherwise returns the first
  `{:error, reason}` produced by a check or by the turn itself.
  """
  def safe_run(thread, input) do
    with :ok <- validate_input(input),
         {:ok, result} <- Codex.Thread.run(thread, input),
         :ok <- validate_result(result) do
      {:ok, result}
    end
  end

  # Input must be a non-empty string of at most 100_000 characters.
  defp validate_input(input) when not is_binary(input), do: {:error, :input_must_be_string}
  defp validate_input(""), do: {:error, :input_cannot_be_empty}

  defp validate_input(input) do
    if String.length(input) > @max_input_length do
      {:error, :input_too_long}
    else
      :ok
    end
  end

  # A result must carry usage data with a non-zero token total.
  defp validate_result(result) do
    usage = result.usage

    if usage do
      case usage.input_tokens + usage.output_tokens do
        0 -> {:error, :no_tokens_used}
        _ -> :ok
      end
    else
      {:error, :missing_usage}
    end
  end
end

Configuration

Environment-Based Configuration

Configure based on environment.

defmodule MyApp.Codex do
  @moduledoc """
  Application-level entry point that builds Codex options from the
  environment and application config before starting a thread.
  """

  # Decided at compile time. Mix is a build tool and may not be available once
  # the code runs from a release, so `Mix.env/0` must not be called at runtime.
  @sandbox_mode if Mix.env() in [:prod, :test], do: :read_only, else: :workspace_write

  @doc """
  Starts a thread using options built from the environment and the `:my_app`
  application config.

  Returns whatever `Codex.start_thread/2` returns.
  """
  def start_thread do
    codex_opts = %Codex.Options{
      api_key: api_key(),
      base_url: base_url(),
      codex_path_override: codex_path()
    }

    thread_opts = %Codex.Thread.Options{
      model: model(),
      sandbox_mode: sandbox_mode(),
      working_directory: working_directory()
    }

    Codex.start_thread(codex_opts, thread_opts)
  end

  # The CODEX_API_KEY environment variable wins over application config.
  defp api_key do
    System.get_env("CODEX_API_KEY") ||
      Application.get_env(:my_app, :codex_api_key)
  end

  defp base_url do
    Application.get_env(:my_app, :codex_base_url)
  end

  defp codex_path do
    Application.get_env(:my_app, :codex_path)
  end

  defp model do
    Application.get_env(:my_app, :codex_model, "o1")
  end

  # :read_only for builds made in :prod or :test, :workspace_write otherwise.
  defp sandbox_mode, do: @sandbox_mode

  defp working_directory do
    File.cwd!()
  end
end

Per-Request Configuration

Override configuration per request.

defmodule ConfigExample do
  @moduledoc """
  Runs the same prompt against several models by overriding the thread
  options for each request.
  """

  @models ["gpt-4", "o1", "gpt-4-turbo"]

  @doc """
  Runs `input` once per model, each on its own thread, then prints the first
  part of every response together with its token total.
  """
  def analyze_with_different_models(input) do
    outcomes =
      for model <- @models do
        {:ok, thread} =
          Codex.start_thread(%Codex.Options{}, %Codex.Thread.Options{model: model})

        {:ok, result} = Codex.Thread.run(thread, input)
        {model, result}
      end

    # Print the per-model results one after another for comparison.
    for {model, result} <- outcomes do
      usage = result.usage

      IO.puts("\n#{model}:")
      IO.puts(String.slice(result.final_response, 0..200))
      IO.puts("Tokens: #{usage.input_tokens + usage.output_tokens}")
    end

    :ok
  end
end

Advanced Patterns

Concurrent Turns

Execute multiple turns concurrently.

defmodule ConcurrentExample do
  @moduledoc """
  Runs independent turns concurrently, one task and one thread per input.
  """

  # Upper bound, in milliseconds, for waiting on the whole batch of tasks.
  @await_timeout 60_000

  @doc """
  Analyzes every file in `files` concurrently and prints the first part of
  each response.
  """
  def parallel_analysis(files) do
    pending =
      for file <- files do
        Task.async(fn ->
          {:ok, thread} = Codex.start_thread()
          {:ok, result} = Codex.Thread.run(thread, "Analyze #{file}")
          {file, result}
        end)
      end

    pending
    |> Task.await_many(@await_timeout)
    |> Enum.each(fn {file, result} ->
      IO.puts("\n#{file}:")
      IO.puts(String.slice(result.final_response, 0..200))
    end)
  end

  @doc """
  Processes every item concurrently (map), then asks one more turn to
  summarize the collected responses (reduce) and prints that summary.
  """
  def map_reduce_pattern(items) do
    # Map phase: one task per item, each on a fresh thread.
    pending =
      for item <- items do
        Task.async(fn ->
          {:ok, worker} = Codex.start_thread()
          {:ok, partial} = Codex.Thread.run(worker, "Process #{item}")
          partial.final_response
        end)
      end

    responses = Task.await_many(pending, @await_timeout)

    # Reduce phase: a single turn over all responses joined by separators.
    {:ok, thread} = Codex.start_thread()

    summary_prompt = """
    Summarize these analyses:

    #{Enum.join(responses, "\n\n---\n\n")}
    """

    {:ok, summary} = Codex.Thread.run(thread, summary_prompt)

    IO.puts("Summary:")
    IO.puts(summary.final_response)
  end
end

Agent Collaboration

Multiple agents working together.

defmodule CollaborationExample do
  @moduledoc """
  Chains several independent threads: an analysis, two specialized reviews of
  that analysis, and a final synthesis of both reviews.
  """

  @doc """
  Analyzes `file`, reviews the analysis for security and for performance on
  separate threads, then prints recommendations synthesized from both
  reviews.
  """
  def collaborative_code_review(file) do
    analysis = consult("Analyze #{file} for potential issues")

    security_review =
      consult("""
      Review this code for security issues:

      Analysis: #{analysis.final_response}
      """)

    perf_review =
      consult("""
      Review this code for performance issues:

      Analysis: #{analysis.final_response}
      """)

    final =
      consult("""
      Synthesize these reviews into actionable recommendations:

      Security Review:
      #{security_review.final_response}

      Performance Review:
      #{perf_review.final_response}
      """)

    IO.puts("Final Recommendations:")
    IO.puts(final.final_response)
  end

  # Each "agent" is a brand-new thread that runs exactly one turn.
  defp consult(prompt) do
    {:ok, thread} = Codex.start_thread()
    {:ok, result} = Codex.Thread.run(thread, prompt)
    result
  end
end

Streaming with State Accumulation

Build up state while streaming.

defmodule StatefulStreamingExample do
  @moduledoc """
  Folds a streamed turn into a summary map and prints it when the stream
  ends.
  """

  @doc """
  Runs a streamed turn, accumulates the thread id, commands, file changes,
  messages, and usage, and prints a summary.
  """
  def accumulate_state do
    {:ok, thread} = Codex.start_thread()
    {:ok, stream} = Codex.Thread.run_streamed(thread, "Implement a new feature")

    stream
    |> Enum.reduce(initial_state(), &absorb/2)
    |> display_summary()
  end

  defp initial_state do
    %{thread_id: nil, commands: [], files: [], messages: [], usage: nil}
  end

  # Reducer: each clause folds one kind of event into the accumulator.
  # Collected items are prepended, so the lists end up newest-first.
  defp absorb(%Codex.Events.ThreadStarted{thread_id: id}, acc) do
    %{acc | thread_id: id}
  end

  defp absorb(%Codex.Events.ItemCompleted{item: %{type: :command_execution} = command}, acc) do
    %{acc | commands: [command | acc.commands]}
  end

  defp absorb(%Codex.Events.ItemCompleted{item: %{type: :file_change} = change}, acc) do
    %{acc | files: [change | acc.files]}
  end

  defp absorb(%Codex.Events.ItemCompleted{item: %{type: :agent_message, text: text}}, acc) do
    %{acc | messages: [text | acc.messages]}
  end

  defp absorb(%Codex.Events.TurnCompleted{usage: usage}, acc) do
    %{acc | usage: usage}
  end

  defp absorb(_event, acc), do: acc

  defp display_summary(summary) do
    IO.puts("Turn Summary:")
    IO.puts("  Thread ID: #{summary.thread_id}")
    IO.puts("  Commands: #{length(summary.commands)}")
    IO.puts("  Files Changed: #{count_file_changes(summary.files)}")
    IO.puts("  Messages: #{length(summary.messages)}")

    if summary.usage do
      IO.puts("  Tokens: #{summary.usage.input_tokens + summary.usage.output_tokens}")
    end
  end

  # Total number of individual changes across all file-change items.
  defp count_file_changes(files) do
    Enum.reduce(files, 0, fn file, total -> total + length(file.changes) end)
  end
end

Production Patterns

Telemetry Integration

Monitor SDK usage in production.

defmodule MyApp.TelemetryHandler do
  @moduledoc """
  Attaches to the `[:codex, :turn, ...]` telemetry events, logs them, and
  re-emits application-level metrics under `[:my_app, :codex, ...]`.
  """

  require Logger

  @doc """
  Attaches `handle_event/4` to the turn start, stop, and exception events.

  Returns the result of `:telemetry.attach_many/4`.
  """
  def setup do
    :telemetry.attach_many(
      "my-app-codex-handler",
      [
        [:codex, :turn, :start],
        [:codex, :turn, :stop],
        [:codex, :turn, :exception]
      ],
      # A remote capture is used here because telemetry advises against
      # attaching local function captures.
      &__MODULE__.handle_event/4,
      nil
    )
  end

  @doc """
  Telemetry handler for the three turn events.

  Each clause logs the event and emits a follow-up `[:my_app, :codex, ...]`
  event carrying the same metadata.
  """
  def handle_event([:codex, :turn, :start], _measurements, metadata, _config) do
    Logger.info("Codex turn started", thread_id: metadata.thread_id)

    # Could send to metrics system
    :telemetry.execute(
      [:my_app, :codex, :turn, :count],
      %{count: 1},
      metadata
    )
  end

  def handle_event([:codex, :turn, :stop], measurements, metadata, _config) do
    usage = metadata.usage
    total_tokens = usage.input_tokens + usage.output_tokens

    # NOTE(review): dividing by 1_000_000 assumes the duration is reported in
    # nanoseconds - confirm against the SDK's telemetry documentation.
    Logger.info("Codex turn completed",
      thread_id: metadata.thread_id,
      duration_ms: measurements.duration / 1_000_000,
      tokens: total_tokens
    )

    :telemetry.execute(
      [:my_app, :codex, :turn, :duration],
      %{duration: measurements.duration},
      metadata
    )

    :telemetry.execute(
      [:my_app, :codex, :tokens],
      %{
        input: usage.input_tokens,
        output: usage.output_tokens,
        total: total_tokens
      },
      metadata
    )
  end

  def handle_event([:codex, :turn, :exception], measurements, metadata, _config) do
    Logger.error("Codex turn failed",
      thread_id: metadata.thread_id,
      error: metadata.error,
      duration_ms: measurements.duration / 1_000_000
    )

    # Alert on failures
    :telemetry.execute(
      [:my_app, :codex, :turn, :error],
      %{count: 1},
      metadata
    )
  end
end

# In application.ex
# Application start callback (fragment): attaches the telemetry handlers
# before the rest of the supervision tree is set up.
# NOTE(review): the elided part presumably starts the supervisor and returns
# its result - confirm in the real application module.
def start(_type, _args) do
  MyApp.TelemetryHandler.setup()
  # ... rest of supervision tree
end

Rate Limiting

Implement rate limiting for API calls.

defmodule MyApp.RateLimiter do
  @moduledoc """
  Fixed-window rate limiter: a bucket of `@max_tokens` permits that is reset
  to full once per `@refill_interval_ms`.

  The bucket is refilled by a periodic `:refill` message and, as a fallback,
  lazily on `acquire/0` when a whole interval has passed since the last
  refill. Both paths record the refill time, so a timer refill is never
  followed by a second, immediate lazy refill.
  """

  use GenServer

  @max_tokens 10
  @refill_interval_ms 60_000

  @doc """
  Starts the limiter registered under the module name. The argument is
  ignored.
  """
  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc """
  Takes one permit.

  Returns `:ok` when a permit was available, or `{:error, :rate_limited}`
  when the bucket is empty.
  """
  def acquire do
    GenServer.call(__MODULE__, :acquire)
  end

  @impl true
  def init(_) do
    schedule_refill()
    {:ok, %{tokens: @max_tokens, last_refill: now_ms()}}
  end

  @impl true
  def handle_call(:acquire, _from, state) do
    state = refill_tokens(state)

    if state.tokens > 0 do
      {:reply, :ok, %{state | tokens: state.tokens - 1}}
    else
      {:reply, {:error, :rate_limited}, state}
    end
  end

  @impl true
  def handle_info(:refill, state) do
    schedule_refill()

    # Recording the refill time keeps refill_tokens/1 from topping the bucket
    # up a second time on the next acquire.
    {:noreply, %{state | tokens: @max_tokens, last_refill: now_ms()}}
  end

  # Lazy refill: only applies once a full interval has elapsed since the
  # last refill (for example when the timer message is delayed).
  defp refill_tokens(state) do
    now = now_ms()

    if now - state.last_refill >= @refill_interval_ms do
      %{state | tokens: @max_tokens, last_refill: now}
    else
      state
    end
  end

  defp schedule_refill do
    Process.send_after(self(), :refill, @refill_interval_ms)
  end

  defp now_ms, do: System.monotonic_time(:millisecond)
end

# Usage
defmodule MyApp.Codex do
  @moduledoc """
  Runs turns only after a permit has been obtained from
  `MyApp.RateLimiter`.
  """

  @doc """
  Runs `input` on `thread` when the rate limiter grants a permit.

  Returns the result of `Codex.Thread.run/2`, or `{:error, :rate_limited}`
  when no permit is available.
  """
  def run(thread, input) do
    dispatch(MyApp.RateLimiter.acquire(), thread, input)
  end

  defp dispatch(:ok, thread, input), do: Codex.Thread.run(thread, input)
  defp dispatch({:error, :rate_limited} = denied, _thread, _input), do: denied
end

Supervised Turn Execution

Run turns under supervision.

defmodule MyApp.TurnSupervisor do
  @moduledoc """
  Supervisor that owns a `Task.Supervisor` under which turns are executed.
  """

  use Supervisor

  # A turn can take far longer than the 5 s default of Task.await/2; 60 s
  # matches the timeout used by the concurrent examples in this guide.
  @turn_timeout_ms 60_000

  @doc """
  Starts the supervisor registered under the module name.
  """
  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    children = [
      {Task.Supervisor, name: MyApp.TurnTaskSupervisor}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end

  @doc """
  Runs a turn inside a task started under `MyApp.TurnTaskSupervisor` and
  waits for its result.

  `opts` defaults to `%Codex.Turn.Options{}`, the same options struct the
  other examples pass to `Codex.Thread.run/3`.

  Returns whatever `Codex.Thread.run/3` returns. The calling process exits if
  the task does not finish within 60 seconds.
  """
  def run_supervised(thread, input, opts \\ %Codex.Turn.Options{}) do
    MyApp.TurnTaskSupervisor
    |> Task.Supervisor.async(fn -> Codex.Thread.run(thread, input, opts) end)
    |> Task.await(@turn_timeout_ms)
  end
end

Testing Patterns

Testing with Mocks

Test code that uses Codex without calling the API.

defmodule MyApp.CodeReviewer do
  @moduledoc """
  Example module under test: asks Codex to review a file and parses the
  textual response into a review map.
  """

  @doc """
  Runs one turn asking for a review of `file_path` and returns the parsed
  review as a map with `:issues` and `:suggestions` keys.

  Raises `MatchError` if either SDK call does not return an `{:ok, _}` tuple.
  """
  def review_code(file_path) do
    {:ok, thread} = Codex.start_thread()

    {:ok, result} =
      Codex.Thread.run(
        thread,
        "Review #{file_path} for issues"
      )

    parse_review(result.final_response)
  end

  # Placeholder parser: the response text is not inspected yet, so the
  # argument is underscored to avoid an unused-variable warning.
  defp parse_review(_text) do
    %{issues: [], suggestions: []}
  end
end

# Test
# Test skeleton for MyApp.CodeReviewer. The single test is a placeholder with
# no assertions yet.
defmodule MyApp.CodeReviewerTest do
  use ExUnit.Case, async: true

  # Placeholder: intended to cover the parsing of a review response.
  test "parses review correctly" do
    # Would use Mox to mock Codex.Thread.run
    # For now, test the parse_review function directly
  end
end

Conclusion

These examples demonstrate the flexibility and power of the Elixir Codex SDK. Key patterns include:

  • Streaming for responsive UIs
  • Structured output for data extraction
  • Multi-turn for complex conversations
  • Error handling for robustness
  • Concurrent execution for performance
  • Production patterns for observability and reliability

Refer to the API Reference for complete documentation of all functions and types.