claude-sdk-gleam
A Gleam SDK for the Anthropic Messages API with a built-in agentic tool-use loop, leveraging BEAM concurrency for parallel tool execution and fault isolation.
What this is
claude_sdk provides a typed Gleam interface to the Anthropic Claude API. Beyond simple message sending, it includes a full agent loop that automatically handles multi-turn tool-use conversations: send a prompt, let the model call tools, execute those tools concurrently on the BEAM, feed results back, and repeat until the model produces a final response.
Key features:
- Type-safe API for the Anthropic Messages endpoint
- Automatic agent loop with configurable iteration limits
- Concurrent tool execution – each tool call runs in its own BEAM process
- Event streaming via callbacks or OTP actor message passing
- SSE streaming support for the Messages API
- Extended thinking support
- Builder-pattern configuration
Requirements
- Gleam >= 1.0
- Erlang/OTP >= 27 (required by gleam_json v3)
- An Anthropic API key (set as ANTHROPIC_API_KEY or passed directly)
Installation
Add claude to your Gleam project:
gleam add claude
Quick start
import claude
import gleam/io
const weather_schema = "{
\"type\": \"object\",
\"properties\": {
\"location\": {
\"type\": \"string\",
\"description\": \"The city and state, e.g. San Francisco, CA\"
}
},
\"required\": [\"location\"]
}"
pub fn main() {
// Create a client from the ANTHROPIC_API_KEY environment variable
let assert Ok(client) = claude.from_env()
// Define a tool
let weather_tool =
claude.tool("get_weather", "Get the current weather for a location", weather_schema)
// Handle tool calls
let handler = fn(name, _input) {
case name {
"get_weather" ->
Ok("{\"temperature\": 72, \"condition\": \"sunny\"}")
_ -> Error("Unknown tool: " <> name)
}
}
// Run the agent loop
case claude.run(client, "What's the weather in San Francisco?", [weather_tool], handler) {
Ok(result) -> io.println(claude.result_text(result))
Error(_err) -> io.println("Agent error")
}
}
Core concepts
Client configuration
Create a client with an API key directly or from the environment:
// Direct API key
let client = claude.new("sk-ant-...")
// From ANTHROPIC_API_KEY environment variable
let assert Ok(client) = claude.from_env()
The client defaults to:
- Base URL: https://api.anthropic.com
- Model: claude-sonnet-4-5-20250929
- Max tokens: 4096
Override defaults with the builder functions on client.Config:
import claude/client
let client =
claude.new("sk-ant-...")
|> client.with_model("claude-opus-4-5-20251101")
|> client.with_max_tokens(8192)
|> client.with_base_url("https://my-proxy.example.com")
Defining tools
Tools are defined with a name, description, and a JSON Schema string for the input parameters:
let tool = claude.tool(
"get_weather",
"Get the current weather for a location",
"{\"type\": \"object\", \"properties\": {\"location\": {\"type\": \"string\"}}, \"required\": [\"location\"]}",
)
The input_schema is a raw JSON string that conforms to JSON Schema. The model uses this schema to understand what arguments to provide.
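Hand-escaping JSON inside a Gleam string literal is easy to get wrong. One alternative, sketched here on the assumption that gleam_json is available (the Requirements section mentions gleam_json v3), is to build the schema with gleam/json and serialise it. The function name weather_schema is illustrative and not part of the SDK.

```gleam
import gleam/json

// Builds the Quick start weather schema as a Json value and serialises it
// to the raw JSON string that claude.tool takes as its third argument.
pub fn weather_schema() -> String {
  json.object([
    #("type", json.string("object")),
    #(
      "properties",
      json.object([
        #(
          "location",
          json.object([
            #("type", json.string("string")),
            #(
              "description",
              json.string("The city and state, e.g. San Francisco, CA"),
            ),
          ]),
        ),
      ]),
    ),
    #("required", json.array(["location"], of: json.string)),
  ])
  |> json.to_string
}
```

The result can then be passed straight through, for example claude.tool("get_weather", "Get the current weather for a location", weather_schema()).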
Tool handlers
A tool handler is a function with the signature:
fn(String, String) -> Result(String, String)
The first argument is the tool name, the second is the JSON string of input arguments. Return Ok(json_string) on success or Error(error_message) on failure. The agent loop dispatches all tool calls through a single handler function:
let handler = fn(name, input) {
case name {
"get_weather" -> Ok("{\"temperature\": 72}")
"calculator" -> handle_calculator(input)
_ -> Error("Unknown tool: " <> name)
}
}
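Because the input arrives as a JSON string, a handler usually needs to decode it before doing any work. The following is a minimal sketch of that step, assuming gleam_json v3 and the gleam/dynamic/decode module from the standard library; the get_weather helper and its hard-coded temperature are placeholders, not SDK functions.

```gleam
import gleam/dynamic/decode
import gleam/json

// A handler with the fn(String, String) -> Result(String, String) shape.
pub fn handler(name: String, input: String) -> Result(String, String) {
  case name {
    "get_weather" -> get_weather(input)
    _ -> Error("Unknown tool: " <> name)
  }
}

// Hypothetical tool implementation: decodes {"location": "..."} and
// returns a JSON string with placeholder weather data.
fn get_weather(input: String) -> Result(String, String) {
  let decoder = {
    use location <- decode.field("location", decode.string)
    decode.success(location)
  }
  case json.parse(from: input, using: decoder) {
    Ok(location) ->
      json.object([
        #("location", json.string(location)),
        #("temperature", json.int(72)),
      ])
      |> json.to_string
      |> Ok
    // Malformed or incomplete input becomes an error message for the model.
    Error(_) -> Error("Invalid input: expected a \"location\" string")
  }
}
```

Returning Error for bad input, rather than crashing, lets the model see what went wrong and retry with corrected arguments.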
Running the agent loop
The primary entry point is claude.run:
pub fn run(
client: client.Config,
prompt: String,
tools: List(tool.Tool),
handler: ToolHandler,
) -> Result(AgentResult, AgentError)
AgentResult contains:
- final_message – the model's last Message response
- messages – the full conversation history as List(MessageParam)
- iterations – how many API round-trips were made
- total_input_tokens / total_output_tokens – cumulative token usage
AgentError is one of:
- ApiCallFailed(ApiError) – an API request failed
- MaxIterationsReached(messages, iterations) – the iteration limit was hit
What happens under the hood
1. The prompt is sent to the Messages API
2. The SDK checks the response's stop_reason
3. If stop_reason is tool_use, it extracts all ToolUse content blocks
4. Each tool call is executed concurrently in its own BEAM process
5. Tool results are collected (with per-tool timeout) and assembled into a tool_result message
6. The results are appended to the conversation history and sent back to the API
7. Steps 2–6 repeat until the model stops with end_turn, max_tokens, or another non-tool-use reason, or until max_iterations is reached
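The control flow above can be sketched as a small recursive function. This is not the SDK's implementation: Reply, LoopError, and the ask callback are hypothetical stand-ins for the real message types and the Messages API call, the history is simplified to a list of strings, and tools run sequentially here rather than concurrently.

```gleam
import gleam/list

// Hypothetical stand-in for a model response.
pub type Reply {
  // The model asked for tools to be run: #(tool_name, json_input)
  WantsTools(calls: List(#(String, String)))
  // The model stopped with a final answer
  Final(text: String)
}

pub type LoopError {
  MaxIterationsReached(iterations: Int)
}

/// `ask` stands in for one API round-trip over the history so far.
/// On success, returns the final text and the number of round-trips made.
pub fn loop(
  history: List(String),
  ask: fn(List(String)) -> Reply,
  handler: fn(String, String) -> Result(String, String),
  iteration: Int,
  max_iterations: Int,
) -> Result(#(String, Int), LoopError) {
  case iteration >= max_iterations {
    True -> Error(MaxIterationsReached(iteration))
    False ->
      case ask(history) {
        Final(text) -> Ok(#(text, iteration + 1))
        WantsTools(calls) -> {
          // Run every requested tool and record its outcome.
          let results =
            list.map(calls, fn(call) {
              let #(name, input) = call
              case handler(name, input) {
                Ok(output) -> name <> " ok: " <> output
                Error(reason) -> name <> " error: " <> reason
              }
            })
          // Append the results and go around again.
          let next = list.append(history, results)
          loop(next, ask, handler, iteration + 1, max_iterations)
        }
      }
  }
}
```

Checking the iteration count before each round-trip is what bounds the loop when a model keeps requesting tools.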
Advanced usage
Custom AgentConfig with builder pattern
For full control, build an AgentConfig and use claude.run_with_config:
import claude
import claude/agent/config
import claude/types/tool
import gleam/io
let assert Ok(client) = claude.from_env()
let tools = [claude.tool("get_weather", "Get weather", schema)]
let cfg =
config.new(client: client, tools: tools, tool_handler: handler)
|> config.with_system("You are a helpful weather assistant.")
|> config.with_model("claude-opus-4-5-20251101")
|> config.with_max_tokens(2048)
|> config.with_max_iterations(5)
|> config.with_thinking(10_000)
|> config.with_tool_timeout(60_000)
|> config.with_tool_choice(tool.Auto(disable_parallel: False))
case claude.run_with_config(cfg, "How's the weather in Tokyo?") {
Ok(result) -> io.println(claude.result_text(result))
Error(_) -> io.println("Error")
}
Available config builder functions:
| Function | Default | Description |
|---|---|---|
| with_system(String) | None | System prompt |
| with_model(String) | Client default | Model ID |
| with_max_tokens(Int) | Client default | Max output tokens per API call |
| with_max_iterations(Int) | 10 | Maximum agent loop iterations |
| with_thinking(Int) | None | Extended thinking token budget |
| with_tool_timeout(Int) | 30_000 | Per-tool execution timeout in ms |
| with_tool_choice(ToolChoice) | None (API default) | Tool selection strategy |
Continuing conversations with agent.run_with_messages
To continue an existing conversation or provide pre-built message history:
import claude/agent
import claude/agent/config
import claude/types/message
import claude/types/content
let cfg = config.new(client: client, tools: tools, tool_handler: handler)
// Build a message history manually
let messages = [
message.new_user("My name is Alice."),
message.new_assistant_blocks([content.TextParam(text: "Hello Alice!")]),
message.new_user("What's my name?"),
]
case agent.run_with_messages(cfg, messages) {
Ok(result) -> io.println(claude.result_text(result))
Error(_) -> io.println("Error")
}
Concurrent tool execution
When the model returns multiple tool calls in a single response, the SDK executes them concurrently. Each tool call runs in its own BEAM process via tool_runner.execute_concurrent. Results are collected with a per-tool timeout (default 30 seconds, configurable with with_tool_timeout). If a tool exceeds the timeout, the SDK records Error("Tool execution timed out") for that tool; other results are unaffected.
This happens automatically – no additional configuration is needed. The concurrency is a natural fit for the BEAM: each tool runs in a lightweight process with its own heap and fault isolation.
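The spawn-and-collect pattern behind this can be sketched with gleam/erlang/process alone. This is an illustration, not the code in tool_runner: run_all is a hypothetical name, tool calls are simplified to #(name, input) tuples, and process.spawn_unlinked assumes gleam_erlang 1.x.

```gleam
import gleam/erlang/process
import gleam/list

// Runs every call in its own process and collects results in call order.
pub fn run_all(
  calls: List(#(String, String)),
  handler: fn(String, String) -> Result(String, String),
  timeout_ms: Int,
) -> List(Result(String, String)) {
  calls
  |> list.map(fn(call) {
    let #(name, input) = call
    // Each call gets its own reply subject, owned by the calling process.
    let reply = process.new_subject()
    // Unlinked, so a crashing handler cannot take the caller down.
    let _pid =
      process.spawn_unlinked(fn() { process.send(reply, handler(name, input)) })
    reply
  })
  |> list.map(fn(reply) {
    // Wait for each reply in turn; a crashed or slow tool never replies,
    // so it surfaces as a timeout for that call only.
    case process.receive(reply, timeout_ms) {
      Ok(result) -> result
      Error(Nil) -> Error("Tool execution timed out")
    }
  })
}
```

All processes are started before the first receive, so the tools overlap in time. Because replies are awaited one after another, a later tool may effectively get slightly longer than timeout_ms in this simplified version.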
Event streaming with actor.run_with_events and actor.start
The claude/agent/actor module provides two ways to observe agent progress in real time.
Synchronous callback-based events:
import claude/agent/actor
actor.run_with_events(cfg, "What's 25 * 4?", fn(event) {
case event {
actor.Started(id) -> io.println("Started: " <> id)
actor.AssistantResponse(_msg) -> io.println("Got response")
actor.ToolExecuting(name, _id) -> io.println("Executing: " <> name)
actor.ToolCompleted(id, _result) -> io.println("Tool done: " <> id)
actor.Done(_result) -> io.println("Agent finished")
actor.Failed(_error) -> io.println("Agent failed")
}
})
Asynchronous OTP actor pattern:
The actor.start function spawns the agent in a separate BEAM process and sends events to a Subject that the caller can receive from:
import gleam/erlang/process
import claude/agent/actor
let events = process.new_subject()
let _pid = actor.start(cfg, "Hello", events)
// Receive the next event sent by the agent (waits up to 30 seconds)
case process.receive(events, 30_000) {
Ok(actor.Done(result)) -> io.println(claude.result_text(result))
Ok(actor.Failed(_err)) -> io.println("Failed")
_ -> io.println("Timeout or other event")
}
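A single process.receive returns only the next event, which will usually be an early lifecycle event rather than the final outcome. To wait for completion, the caller can keep receiving until a terminal event arrives. The sketch below shows that loop with a local Event type standing in for the SDK's AgentEvent; the payloads are simplified to strings and await_outcome is a hypothetical helper.

```gleam
import gleam/erlang/process.{type Subject}

// Simplified stand-in for the agent event type.
pub type Event {
  Started(session_id: String)
  ToolExecuting(tool_name: String)
  Done(text: String)
  Failed(reason: String)
}

/// Receives events until a terminal one (Done or Failed) arrives.
/// `timeout_ms` applies to each individual receive, not to the whole wait.
pub fn await_outcome(
  events: Subject(Event),
  timeout_ms: Int,
) -> Result(String, String) {
  case process.receive(events, timeout_ms) {
    Ok(Done(text)) -> Ok(text)
    Ok(Failed(reason)) -> Error(reason)
    // Progress events: ignore and keep waiting.
    Ok(_) -> await_outcome(events, timeout_ms)
    Error(Nil) -> Error("timed out waiting for the next event")
  }
}
```

The same shape applies to the real event type: match Done and Failed explicitly and recurse on everything else.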
The AgentEvent type covers the full lifecycle:
| Event | Description |
|---|---|
| Started(session_id) | Agent loop began |
| AssistantResponse(message) | Model returned a response |
| ToolExecuting(tool_name, tool_id) | A tool call is about to run |
| ToolCompleted(tool_id, result) | A tool call finished |
| Done(result) | Agent completed successfully |
| Failed(error) | Agent encountered an error |
Direct Messages API access
For one-shot messages without the agent loop:
// Simple helper
case claude.message(client, "What is 2 + 2?") {
Ok(msg) -> io.println(claude.text_content(msg))
Error(_) -> io.println("API error")
}
For full control over the Messages API call:
import claude/messages
import claude/types/message
import gleam/option.{None, Some}
case messages.create(
config: client,
model: Some("claude-opus-4-5-20251101"),
max_tokens: Some(1024),
messages: [message.new_user("Explain monads.")],
tools: [],
system: Some("You are a Haskell expert."),
tool_choice: None,
thinking: None,
) {
// msg is a Message
Ok(msg) -> io.println(claude.text_content(msg))
// the error value is an ApiError
Error(_api_error) -> io.println("API error")
}
Streaming SSE events
The SDK supports streaming responses via messages.create_stream. Note that gleam_httpc buffers the full response, so events are parsed from the complete SSE payload after the request completes:
import claude/messages
import claude/streaming.{TextDelta, ContentBlockDelta, MessageStop}
import claude/types/message
import gleam/io
import gleam/list
import gleam/option.{None}
case messages.create_stream(
config: client,
model: None,
max_tokens: None,
messages: [message.new_user("Tell me a story.")],
tools: [],
system: None,
tool_choice: None,
thinking: None,
) {
Ok(events) ->
list.each(events, fn(event) {
case event {
ContentBlockDelta(index: _, delta: TextDelta(text)) ->
io.print(text)
MessageStop -> io.println("\n[done]")
_ -> Nil
}
})
Error(_) -> io.println("Stream error")
}
StreamEvent variants: MessageStart, ContentBlockStart, ContentBlockDelta, ContentBlockStop, MessageDelta, MessageStop, Ping, UnknownEvent.
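Since the events arrive as a complete list, a common follow-up is to fold the text deltas into one string. The sketch below uses local stand-in types that mirror only the variants needed here; the real definitions live in claude/streaming and may carry more fields, and collect_text is a hypothetical helper.

```gleam
import gleam/list
import gleam/string

// Simplified stand-ins for the streaming types.
pub type Delta {
  TextDelta(text: String)
  OtherDelta
}

pub type StreamEvent {
  ContentBlockDelta(index: Int, delta: Delta)
  MessageStop
  Ping
}

/// Concatenates the text of every TextDelta, in order, ignoring
/// all other events.
pub fn collect_text(events: List(StreamEvent)) -> String {
  events
  |> list.filter_map(fn(event) {
    case event {
      ContentBlockDelta(index: _, delta: TextDelta(text)) -> Ok(text)
      _ -> Error(Nil)
    }
  })
  |> string.concat
}
```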
API reference
claude (top-level)
| Function | Description |
|---|---|
| new(api_key) -> Config | Create client with API key |
| from_env() -> Result(Config, EnvError) | Create client from ANTHROPIC_API_KEY |
| tool(name, description, input_schema) -> Tool | Define a tool |
| run(client, prompt, tools, handler) -> Result(AgentResult, AgentError) | Run the agent loop |
| run_with_config(config, prompt) -> Result(AgentResult, AgentError) | Run with custom config |
| message(client, prompt) -> Result(Message, ApiError) | Send a single message |
| text_content(message) -> String | Extract text from a Message |
| result_text(result) -> String | Extract text from an AgentResult |
| version() -> String | SDK version string |
claude/client
| Function | Description |
|---|---|
| new(api_key) -> Config | Create client config |
| with_model(config, model) -> Config | Set default model |
| with_base_url(config, url) -> Config | Set API base URL |
| with_max_tokens(config, n) -> Config | Set default max tokens |
claude/agent
| Function | Description |
|---|---|
| run(config, prompt) -> Result(AgentResult, AgentError) | Run agent loop |
| run_with_messages(config, messages) -> Result(AgentResult, AgentError) | Run with existing history |
| extract_tool_calls(content) -> List(ContentBlock) | Filter tool-use blocks |
| build_tool_results_message(results) -> MessageParam | Build tool results message |
claude/agent/config
| Function | Description |
|---|---|
| new(client, tools, tool_handler) -> AgentConfig | Create agent config |
| with_system(config, system) -> AgentConfig | Set system prompt |
| with_model(config, model) -> AgentConfig | Set model |
| with_max_tokens(config, n) -> AgentConfig | Set max tokens |
| with_max_iterations(config, n) -> AgentConfig | Set iteration limit |
| with_thinking(config, budget) -> AgentConfig | Enable extended thinking |
| with_tool_timeout(config, ms) -> AgentConfig | Set tool timeout |
| with_tool_choice(config, choice) -> AgentConfig | Set tool choice strategy |
claude/agent/actor
| Function | Description |
|---|---|
| run_with_events(config, prompt, on_event) -> Result(AgentResult, AgentError) | Synchronous with event callbacks |
| start(config, prompt, caller) -> Pid | Async agent in a new BEAM process |
claude/agent/tool_runner
| Function | Description |
|---|---|
| execute_concurrent(tool_calls, handler, timeout_ms) -> List(ToolResult) | Run tools concurrently |
claude/messages
| Function | Description |
|---|---|
| create(...) -> Result(Message, ApiError) | Send a Messages API request |
| create_simple(config, message) -> Result(Message, ApiError) | Simple one-shot message |
| create_stream(...) -> Result(List(StreamEvent), ApiError) | Streaming Messages API request |
| build_request(...) -> Request(String) | Build HTTP request without sending |
| build_stream_request(...) -> Request(String) | Build streaming HTTP request |
claude/streaming
| Function | Description |
|---|---|
| parse_sse(text) -> List(StreamEvent) | Parse SSE payload into events |
| parse_event(event_type, data) -> StreamEvent | Parse a single SSE event |
claude/types/tool
| Type | Description |
|---|---|
| Tool(name, description, input_schema) | Tool definition |
| ToolChoice | Auto, Any, SpecificTool(name), NoTools – each with disable_parallel flag |
claude/types/message
| Type | Description |
|---|---|
| Message | Full API response message with content, usage, stop_reason |
| MessageParam | Message sent to the API (role + content) |
| StopReason | EndTurn, ToolUseStop, MaxTokens, StopSequence, PauseTurn, Refusal |
| Usage | Token counts (input, output, cache creation, cache read) |
claude/types/content
| Type | Description |
|---|---|
| ContentBlock | Response content: Text, Thinking, ToolUse, ServerToolUse, WebSearchResult |
| ContentBlockParam | Request content: TextParam, ImageParam, DocumentParam, ToolUseParam, ToolResultParam |
claude/types/error
| Type | Description |
|---|---|
| ApiError | AuthenticationError, RateLimitError, BadRequestError, NotFoundError, ServerError, ConnectionError, TimeoutError, UnknownError |
Architecture
claude.gleam            -- Top-level public API (facade)
claude/
  client.gleam          -- Client config (API key, model, base URL)
  messages.gleam        -- HTTP layer: build and send Messages API requests
  streaming.gleam       -- SSE parser for streaming responses
  agent.gleam           -- Core agent loop (recursive, synchronous)
  agent/
    config.gleam        -- AgentConfig type and builder functions
    tool_runner.gleam   -- Concurrent tool execution via BEAM processes
    actor.gleam         -- Event-emitting agent loop + async OTP actor
  types/
    content.gleam       -- ContentBlock and ContentBlockParam types
    message.gleam       -- Message, MessageParam, Role, StopReason, Usage
    tool.gleam          -- Tool and ToolChoice types
    error.gleam         -- ApiError type and HTTP status mapping
  json/
    encode.gleam        -- JSON encoding for API request bodies
    decode.gleam        -- JSON decoding for API response bodies
The architecture takes advantage of the BEAM in three ways:
- Concurrent tool execution – When the model returns N tool calls, tool_runner spawns N lightweight BEAM processes that run in parallel. Results are collected with per-tool timeouts using process.receive. For I/O-bound tools, total latency approaches that of the slowest call rather than the sum of all calls.
- Fault isolation – Each tool runs in its own process with its own heap. If a tool handler crashes or times out, it does not affect the agent loop or other tool executions. The agent simply receives a timeout error for that specific tool.
- Actor-based event streaming – actor.start spawns the entire agent loop in a new BEAM process and communicates results via Subject message passing, fitting naturally into OTP supervision trees and concurrent application architectures.
Development
Build the project:
gleam build
Run tests:
gleam test
Format code:
gleam format
Run an example (add the example file to src/ first):
ANTHROPIC_API_KEY=sk-ant-... gleam run -m weather_agent
License
MIT