ClaudeCodeSDK.Orchestrator (claude_code_sdk v0.2.2)

View Source

Concurrent query orchestration with rate limiting and error recovery.

Enables parallel Claude query execution, sequential pipelines, and automatic retry with exponential backoff.

Features

  • Parallel query execution with concurrency limits
  • Sequential pipeline workflows (output → input)
  • Automatic retry with exponential backoff
  • Rate limiting (configurable queries/minute)
  • Cost tracking and statistics
  • Error aggregation and reporting

Usage

# Parallel queries
{:ok, results} = Orchestrator.query_parallel([
  {"Analyze file1.ex", opts},
  {"Analyze file2.ex", opts},
  {"Analyze file3.ex", opts}
], max_concurrent: 3)

# Pipeline (sequential with context)
{:ok, final_result} = Orchestrator.query_pipeline([
  {"Analyze code", analysis_opts},
  {"Suggest refactorings", refactor_opts},
  {"Generate tests", test_opts}
], use_context: true)

# Retry with backoff
{:ok, result} = Orchestrator.query_with_retry(
  prompt,
  options,
  max_retries: 3,
  backoff_ms: 1000
)

Summary

Functions

Executes multiple queries in parallel.

Executes queries sequentially in a pipeline.

Executes a query with automatic retry and exponential backoff.

Functions

query_parallel(queries, opts \\ [])

@spec query_parallel(
  [{String.t(), ClaudeCodeSDK.Options.t()}],
  keyword()
) :: {:ok, [map()]} | {:error, term()}

Executes multiple queries in parallel.

Parameters

  • queries - List of {prompt, options} tuples
  • opts - Keyword list of options:
    • :max_concurrent - Maximum concurrent queries (default: 5)
    • :timeout - Timeout per query in ms (default: 300_000)

Returns

  • {:ok, results} - List of result maps
  • {:error, reason} - If any query fails critically

Examples

queries = [
  {"What is 2+2?", %Options{}},
  {"What is 3+3?", %Options{}},
  {"What is 4+4?", %Options{}}
]

{:ok, results} = Orchestrator.query_parallel(queries, max_concurrent: 2)

Enum.each(results, fn result ->
  IO.puts("Prompt: #{result.prompt}")
  IO.puts("Success: #{result.success}")
  IO.puts("Cost: $#{result.cost}")
end)

query_pipeline(steps, opts \\ [])

@spec query_pipeline(
  [{String.t(), ClaudeCodeSDK.Options.t()}],
  keyword()
) :: {:ok, [ClaudeCodeSDK.Message.t()]} | {:error, term()}

Executes queries sequentially in a pipeline.

Each step can optionally use the output from the previous step.

Parameters

  • steps - List of {prompt, options} tuples
  • opts - Keyword list of options:
    • :use_context - Include previous output in next prompt (default: false)
    • :stop_on_error - Stop pipeline on first error (default: true)

Returns

  • {:ok, messages} - Messages from final step
  • {:error, {:step_failed, prompt, errors}} - If a step fails

Examples

steps = [
  {"Analyze this code: ...", analysis_opts},
  {"Suggest improvements", refactor_opts},
  {"Generate tests for improved code", test_opts}
]

{:ok, final_result} = Orchestrator.query_pipeline(steps, use_context: true)

query_with_retry(prompt, options, opts \\ [])

@spec query_with_retry(String.t(), ClaudeCodeSDK.Options.t(), keyword()) ::
  {:ok, [ClaudeCodeSDK.Message.t()]} | {:error, term()}

Executes a query with automatic retry and exponential backoff.

Parameters

  • prompt - Query prompt
  • options - ClaudeCodeSDK.Options
  • opts - Keyword list of retry options:
    • :max_retries - Maximum retry attempts (default: 3)
    • :backoff_ms - Initial backoff in ms (default: 1000)
    • :exponential - Use exponential backoff (default: true)

Returns

  • {:ok, messages} - Successful result
  • {:error, {:max_retries_exceeded, errors}} - If all retries fail

Examples

{:ok, result} = Orchestrator.query_with_retry(
  "Analyze this code",
  options,
  max_retries: 5,
  backoff_ms: 2000
)