ClaudeCodeSDK.Orchestrator (claude_code_sdk v0.2.2)
Concurrent query orchestration with rate limiting and error recovery.
Enables parallel Claude query execution, sequential pipelines, and automatic retry with exponential backoff.
Features
- Parallel query execution with concurrency limits
- Sequential pipeline workflows (output → input)
- Automatic retry with exponential backoff
- Rate limiting (configurable queries/minute)
- Cost tracking and statistics
- Error aggregation and reporting
Usage
# Parallel queries
{:ok, results} = Orchestrator.query_parallel([
  {"Analyze file1.ex", opts},
  {"Analyze file2.ex", opts},
  {"Analyze file3.ex", opts}
], max_concurrent: 3)

# Pipeline (sequential with context)
{:ok, final_result} = Orchestrator.query_pipeline([
  {"Analyze code", analysis_opts},
  {"Suggest refactorings", refactor_opts},
  {"Generate tests", test_opts}
], use_context: true)

# Retry with backoff
{:ok, result} = Orchestrator.query_with_retry(
  prompt,
  options,
  max_retries: 3,
  backoff_ms: 1000
)
Summary
Functions
- query_parallel/2 - Executes multiple queries in parallel.
- query_pipeline/2 - Executes queries sequentially in a pipeline.
- query_with_retry/3 - Executes a query with automatic retry and exponential backoff.
Functions
@spec query_parallel([{String.t(), ClaudeCodeSDK.Options.t()}], keyword()) ::
        {:ok, [map()]} | {:error, term()}
Executes multiple queries in parallel.
Parameters
- queries - List of {prompt, options} tuples
- opts - Keyword list of options:
  - :max_concurrent - Maximum concurrent queries (default: 5)
  - :timeout - Timeout per query in ms (default: 300_000)
Returns
- {:ok, results} - List of result maps
- {:error, reason} - If any query fails critically
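To illustrate what the :max_concurrent and :timeout options mean in practice, here is a minimal sketch of bounded parallelism built on Task.async_stream/3 from the Elixir standard library. It is not the SDK's implementation: the ParallelSketch module, the run_fun argument (a stand-in for the real query call), and the :value and :error keys in the result maps are all assumptions made for this example.

```elixir
defmodule ParallelSketch do
  # Hypothetical helper, not part of ClaudeCodeSDK.
  # `run_fun` is an assumed 2-arity function standing in for the real query call.
  def run_parallel(queries, run_fun, opts \\ []) do
    max_concurrent = Keyword.get(opts, :max_concurrent, 5)
    timeout = Keyword.get(opts, :timeout, 300_000)

    results =
      queries
      |> Task.async_stream(
        fn {prompt, options} -> run_fun.(prompt, options) end,
        # At most `max_concurrent` tasks run at the same time.
        max_concurrency: max_concurrent,
        timeout: timeout,
        # A task that exceeds `timeout` is killed and reported as {:exit, :timeout}.
        on_timeout: :kill_task
      )
      # Results arrive in input order, so they can be paired with their prompts.
      |> Enum.zip(queries)
      |> Enum.map(fn
        {{:ok, value}, {prompt, _options}} ->
          %{prompt: prompt, success: true, value: value}

        {{:exit, reason}, {prompt, _options}} ->
          %{prompt: prompt, success: false, error: reason}
      end)

    {:ok, results}
  end
end
```

With this shape, a slow query occupies one of the concurrency slots until it finishes or times out, while the remaining queries wait their turn.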
Examples
alias ClaudeCodeSDK.{Options, Orchestrator}

queries = [
  {"What is 2+2?", %Options{}},
  {"What is 3+3?", %Options{}},
  {"What is 4+4?", %Options{}}
]

# Run at most two queries at a time
{:ok, results} = Orchestrator.query_parallel(queries, max_concurrent: 2)

# Each result map carries the prompt, a success flag, and the query cost
Enum.each(results, fn result ->
  IO.puts("Prompt: #{result.prompt}")
  IO.puts("Success: #{result.success}")
  IO.puts("Cost: $#{result.cost}")
end)
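Since each result map exposes success and cost fields in the example above, the results can be folded into a simple summary. The sketch below assumes every map has a boolean :success key and a numeric :cost key; the ResultSummary module is hypothetical and not part of the SDK.

```elixir
defmodule ResultSummary do
  # Hypothetical helper, not part of ClaudeCodeSDK.
  # Assumes each result map has :success (boolean) and :cost (number) keys.
  def summarize(results) do
    # Partition results into successful and failed queries.
    {succeeded, failed} = Enum.split_with(results, & &1.success)

    %{
      succeeded: length(succeeded),
      failed: length(failed),
      total_cost: results |> Enum.map(& &1.cost) |> Enum.sum()
    }
  end
end
```

Calling ResultSummary.summarize(results) after query_parallel would then give a single map to log or inspect, provided the real result maps match the assumed shape.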
@spec query_pipeline([{String.t(), ClaudeCodeSDK.Options.t()}], keyword()) ::
        {:ok, [ClaudeCodeSDK.Message.t()]} | {:error, term()}
Executes queries sequentially in a pipeline.
Each step can optionally use the output from the previous step.
Parameters
- steps - List of {prompt, options} tuples
- opts - Keyword list of options:
  - :use_context - Include previous output in next prompt (default: false)
  - :stop_on_error - Stop pipeline on first error (default: true)
Returns
- {:ok, messages} - Messages from final step
- {:error, {:step_failed, prompt, errors}} - If a step fails
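The pipeline behaviour described above (each step optionally receiving the previous output, and the run stopping at the first failure) can be sketched with Enum.reduce_while/3. This is only an illustration under stated assumptions: PipelineSketch and run_fun are hypothetical, step outputs are plain strings rather than message lists, the "Previous output:" prompt format is invented for the example, and only the stop_on_error: true case is modelled.

```elixir
defmodule PipelineSketch do
  # Hypothetical helper, not part of ClaudeCodeSDK.
  # `run_fun` is an assumed 2-arity function returning {:ok, output} or
  # {:error, reason}, where `output` is a string.
  def run_pipeline(steps, run_fun, opts \\ []) do
    use_context = Keyword.get(opts, :use_context, false)

    Enum.reduce_while(steps, {:ok, nil}, fn {prompt, options}, {:ok, previous} ->
      # Prepend the previous step's output only when requested and available.
      full_prompt =
        if use_context && previous != nil do
          "Previous output:\n#{previous}\n\n#{prompt}"
        else
          prompt
        end

      case run_fun.(full_prompt, options) do
        {:ok, output} -> {:cont, {:ok, output}}
        # Stop at the first failing step and report which prompt failed.
        {:error, reason} -> {:halt, {:error, {:step_failed, prompt, reason}}}
      end
    end)
  end
end
```

Threading the accumulator through reduce_while keeps the sketch free of mutable state: the value returned by the last successful step is the pipeline's result.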
Examples
steps = [
  {"Analyze this code: ...", analysis_opts},
  {"Suggest improvements", refactor_opts},
  {"Generate tests for improved code", test_opts}
]

{:ok, final_result} = Orchestrator.query_pipeline(steps, use_context: true)
@spec query_with_retry(String.t(), ClaudeCodeSDK.Options.t(), keyword()) ::
        {:ok, [ClaudeCodeSDK.Message.t()]} | {:error, term()}
Executes a query with automatic retry and exponential backoff.
Parameters
- prompt - Query prompt
- options - ClaudeCodeSDK.Options
- opts - Keyword list of retry options:
  - :max_retries - Maximum retry attempts (default: 3)
  - :backoff_ms - Initial backoff in ms (default: 1000)
  - :exponential - Use exponential backoff (default: true)
Returns
- {:ok, messages} - Successful result
- {:error, {:max_retries_exceeded, errors}} - If all retries fail
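As a rough picture of how the retry options might interact, the sketch below retries a zero-arity function with a growing delay. It is not the SDK's code. Two points are assumptions of this example: the delay doubles on each retry (the documentation only says "exponential"), and max_retries counts retries after the initial attempt, so max_retries: 3 allows up to four calls. RetrySketch and its functions are hypothetical names.

```elixir
defmodule RetrySketch do
  # Hypothetical helper, not part of ClaudeCodeSDK.

  # Delay before retry number `attempt` (0-based). Doubling per retry is an
  # assumption of this sketch. Integer.pow/2 requires Elixir 1.12 or later.
  def delay_ms(attempt, backoff_ms, exponential) do
    if exponential, do: backoff_ms * Integer.pow(2, attempt), else: backoff_ms
  end

  # `fun` is an assumed 0-arity function returning {:ok, result} or {:error, reason}.
  def with_retry(fun, opts \\ []) do
    max_retries = Keyword.get(opts, :max_retries, 3)
    backoff_ms = Keyword.get(opts, :backoff_ms, 1000)
    exponential = Keyword.get(opts, :exponential, true)
    attempt(fun, 0, max_retries, backoff_ms, exponential, [])
  end

  defp attempt(fun, n, max_retries, backoff_ms, exponential, errors) do
    case fun.() do
      {:ok, result} ->
        {:ok, result}

      # Retries remain: wait, then try again, remembering the error.
      {:error, reason} when n < max_retries ->
        Process.sleep(delay_ms(n, backoff_ms, exponential))
        attempt(fun, n + 1, max_retries, backoff_ms, exponential, [reason | errors])

      # Out of retries: return every collected error, oldest first.
      {:error, reason} ->
        {:error, {:max_retries_exceeded, Enum.reverse([reason | errors])}}
    end
  end
end
```

Under these assumptions, the defaults (backoff_ms: 1000, exponential: true) would wait 1000 ms, 2000 ms, and 4000 ms before the first, second, and third retries.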
Examples
{:ok, result} = Orchestrator.query_with_retry(
  "Analyze this code",
  options,
  max_retries: 5,
  backoff_ms: 2000
)