anthropic/streaming/handler

Streaming handler for the Anthropic API

This module provides sans-io functions for parsing Server-Sent Events (SSE) from the Anthropic streaming API. Because parsing is decoupled from I/O, you can pair it with any HTTP client that supports streaming.

Recommended: Use api.chat_stream

For most use cases, use api.chat_stream from the main API module:

import anthropic/api
import anthropic/client
import anthropic/types/message.{user_message}
import anthropic/types/request
import gleam/io

let assert Ok(client) = client.init()
let req = request.new("claude-sonnet-4-20250514", [user_message("Hello!")], 1024)

case api.chat_stream(client, req) {
  Ok(result) -> io.println(api.stream_text(result))
  Error(err) -> handle_error(err)
}

True Real-Time Streaming (Sans-IO)

For true real-time streaming where you process events as they arrive, use the incremental parsing functions with your own streaming HTTP client:

import anthropic/http
import anthropic/streaming/handler.{
  finalize_stream, new_streaming_state, process_chunk,
}
import gleam/io
import gleam/list

// 1. Build the streaming request
let http_request = http.build_streaming_request(api_key, base_url, request)

// 2. Start streaming with your HTTP client that supports chunked responses
// (e.g., using Erlang's httpc with stream option, or any other client)

// 3. Initialize streaming state
let state = new_streaming_state()

// 4. As each chunk arrives from your HTTP client, process it:
let #(events, new_state) = process_chunk(state, chunk)

// 5. Handle events in real-time as they arrive
list.each(events, fn(event) {
  case handler.get_event_text(event) {
    Ok(text) -> io.print(text)  // Print immediately!
    Error(_) -> Nil
  }
})

// 6. Continue with new_state for next chunk...

// 7. When the stream ends, finalize with the latest state to get any remaining events
let final_events = finalize_stream(new_state)

Batch Mode (Deprecated)

The batch functions stream_message and stream_message_with_callback in this module are deprecated. Use api.chat_stream and api.chat_stream_with_callback instead for a unified API.

Note: Batch mode waits for the complete response before returning. Use sans-io incremental parsing for true real-time streaming.

Types

Callback function type for processing events as they are parsed

pub type EventCallback =
  fn(streaming.StreamEvent) -> Nil

Error during streaming

pub type StreamError {
  HttpError(error: error.AnthropicError)
  SseParseError(message: String)
  EventDecodeError(message: String)
  ApiError(status: Int, body: String)
}

Constructors

  • HttpError(error: error.AnthropicError)

    HTTP error during request

  • SseParseError(message: String)

    Error parsing SSE data

  • EventDecodeError(message: String)

    Error decoding event JSON

  • ApiError(status: Int, body: String)

    API returned an error response
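
Example

A sketch of turning a StreamError into a log message, assuming the type and its constructors are imported unqualified; the message format is illustrative, not part of this module:

import gleam/int
import gleam/string

fn describe_stream_error(err: StreamError) -> String {
  case err {
    HttpError(error) -> "HTTP error: " <> string.inspect(error)
    SseParseError(message) -> "SSE parse error: " <> message
    EventDecodeError(message) -> "Event decode error: " <> message
    ApiError(status, body) ->
      "API error " <> int.to_string(status) <> ": " <> body
  }
}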

Result of streaming a message (batch mode)

pub type StreamResult {
  StreamResult(events: List(streaming.StreamEvent))
}

Constructors

  • StreamResult(events: List(streaming.StreamEvent))

    All events parsed from the stream

State for incremental streaming (sans-io)

Use this to track parsing state across multiple chunks when implementing real-time streaming with your own HTTP client.

pub type StreamingState {
  StreamingState(
    sse_state: sse.SseParserState,
    events: List(streaming.StreamEvent),
    completed: Bool,
    error: option.Option(StreamError),
  )
}

Constructors

  • StreamingState(
      sse_state: sse.SseParserState,
      events: List(streaming.StreamEvent),
      completed: Bool,
      error: option.Option(StreamError),
    )

Values

pub fn build_stream_result(state: StreamingState) -> StreamResult

Build a StreamResult from the final streaming state
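
Example

A minimal sketch, assuming final_state is the StreamingState left over after the last process_chunk call:

let result = build_stream_result(final_state)
io.println(get_full_text(result.events))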

pub fn finalize_stream(
  state: StreamingState,
) -> List(streaming.StreamEvent)

Finalize the stream and get any remaining events

Call this when your HTTP client signals the stream has ended. Returns any events that were buffered but not yet emitted.
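
Example

A minimal sketch, assuming final_state is the StreamingState returned by the last process_chunk call:

let remaining = finalize_stream(final_state)
list.each(remaining, fn(event) {
  case get_event_text(event) {
    Ok(text) -> io.print(text)
    Error(_) -> Nil
  }
})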

pub fn get_accumulated_events(
  state: StreamingState,
) -> List(streaming.StreamEvent)

Get all accumulated events from the streaming state
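
Example

For example, once the stream has ended you can read back everything that was parsed along the way (a sketch, with final_state being the last StreamingState):

let all_events = get_accumulated_events(final_state)
io.println(get_full_text(all_events))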

pub fn get_error(
  events: List(streaming.StreamEvent),
) -> Result(streaming.StreamError, Nil)

Get error from events if present
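
Example

A sketch of checking for an error event after parsing; the messages are illustrative and gleam/io and gleam/string are assumed to be imported:

case get_error(events) {
  Ok(stream_error) ->
    io.println("Stream error: " <> string.inspect(stream_error))
  Error(Nil) -> io.println("No error event in stream")
}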

pub fn get_event_text(
  event: streaming.StreamEvent,
) -> Result(String, Nil)

Extract text from a single event if it contains text content

Useful for real-time streaming to print text as it arrives.

Example

list.each(events, fn(event) {
  case get_event_text(event) {
    Ok(text) -> io.print(text)
    Error(_) -> Nil
  }
})
pub fn get_full_text(
  events: List(streaming.StreamEvent),
) -> String

Get the full text from a stream of events
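
Example

For example, with a batch-mode result (a sketch):

let text = get_full_text(result.events)
io.println(text)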

pub fn get_message_id(
  events: List(streaming.StreamEvent),
) -> Result(String, Nil)

Get the message ID from events

pub fn get_model(
  events: List(streaming.StreamEvent),
) -> Result(String, Nil)

Get the model from events
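
Example

A sketch covering both get_message_id and get_model; the output format is illustrative:

case get_message_id(events), get_model(events) {
  Ok(id), Ok(model) -> io.println("Message " <> id <> " from " <> model)
  _, _ -> io.println("Message metadata not available")
}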

pub fn get_stream_error(
  state: StreamingState,
) -> option.Option(StreamError)

Get the error from the stream if one occurred

pub fn get_text_deltas(
  events: List(streaming.StreamEvent),
) -> List(String)

Filter events to only text deltas
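
Example

For example, you might join the deltas or inspect them one by one (a sketch; gleam/io and gleam/string are assumed to be imported):

let deltas = get_text_deltas(events)
io.println(string.join(deltas, ""))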

pub fn has_error(events: List(streaming.StreamEvent)) -> Bool

Check if stream ended with an error (from event list)

pub fn has_stream_error(state: StreamingState) -> Bool

Check if the stream encountered an error

pub fn is_complete(events: List(streaming.StreamEvent)) -> Bool

Check if stream completed successfully (from event list)

pub fn is_stream_complete(state: StreamingState) -> Bool

Check if the stream has completed successfully
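
Example

A sketch of inspecting the final state once your HTTP client reports that the connection closed; the log messages are illustrative:

case is_stream_complete(final_state), has_stream_error(final_state) {
  True, False -> io.println("Stream completed successfully")
  _, True -> io.println("Stream ended with an error")
  _, _ -> io.println("Stream ended before completing")
}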

pub fn new_streaming_state() -> StreamingState

Create a new streaming state for incremental parsing

Use this when implementing true real-time streaming with your own HTTP client.

Example

let state = new_streaming_state()

// As chunks arrive from your streaming HTTP client:
let #(events, new_state) = process_chunk(state, chunk)
list.each(events, handle_event_immediately)
pub fn parse_sse_body(
  body: String,
) -> Result(StreamResult, StreamError)

Parse SSE body text into streaming events (batch mode)

Use this for batch parsing when you have the complete SSE text. For real-time streaming, use process_chunk instead.
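
Example

A minimal sketch, assuming sse_text holds the complete SSE body and handle_error is your own error handler:

case parse_sse_body(sse_text) {
  Ok(result) -> io.println(get_full_text(result.events))
  Error(err) -> handle_error(err)
}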

pub fn parse_sse_chunk(
  state: sse.SseParserState,
  chunk: String,
) -> #(List(streaming.StreamEvent), sse.SseParserState)

Parse a single SSE chunk and return events plus updated SSE state

Lower-level function that works directly with SSE parser state. Consider using process_chunk with StreamingState for a higher-level API.

pub fn parse_streaming_response(
  response: http.HttpResponse,
) -> Result(StreamResult, StreamError)

Parse an HTTP response body containing SSE events into StreamResult

This is for batch parsing of a complete response. For real-time streaming, use new_streaming_state + process_chunk instead.

Example

let http_response = HttpResponse(status: 200, headers: [], body: sse_body)
case parse_streaming_response(http_response) {
  Ok(result) -> get_full_text(result.events)
  Error(err) -> handle_error(err)
}
pub fn process_chunk(
  state: StreamingState,
  chunk: String,
) -> #(List(streaming.StreamEvent), StreamingState)

Process a chunk of SSE data and return parsed events

This is the core function for real-time streaming. Call it each time you receive a chunk from your streaming HTTP client.

Returns a tuple of:

  • List of events parsed from this chunk (process these immediately!)
  • Updated state to use for the next chunk

Example

// In your streaming HTTP callback:
fn on_chunk(state: StreamingState, chunk: String) -> StreamingState {
  let #(events, new_state) = process_chunk(state, chunk)

  // Process events immediately as they arrive
  list.each(events, fn(event) {
    case get_event_text(event) {
      Ok(text) -> io.print(text)  // Real-time output!
      Error(_) -> Nil
    }
  })

  new_state
}
pub fn stream_message(
  api_client: client.Client,
  message_request: request.CreateMessageRequest,
) -> Result(StreamResult, StreamError)

Deprecated: Use api.chat_stream instead

Stream a message request and return all events (batch mode)

@deprecated Use api.chat_stream instead for a unified streaming API

Note: This function collects ALL events before returning. It does NOT provide true real-time streaming. For real-time streaming, use the sans-io functions (new_streaming_state, process_chunk) with a streaming HTTP client.

Example

// Prefer using api.chat_stream instead:
case api.chat_stream(client, request) {
  Ok(result) -> api.stream_text(result)
  Error(err) -> handle_error(err)
}
pub fn stream_message_with_callback(
  api_client: client.Client,
  message_request: request.CreateMessageRequest,
  callback: fn(streaming.StreamEvent) -> Nil,
) -> Result(StreamResult, StreamError)

Deprecated: Use api.chat_stream_with_callback instead

Stream a message request with a callback for each event (batch mode)

@deprecated Use api.chat_stream_with_callback instead for a unified streaming API

Note: Despite accepting a callback, this function collects ALL events before the callback is invoked for any of them. It does NOT provide true real-time streaming. For real-time streaming, use the sans-io functions with a streaming HTTP client.

Example

// Prefer using api.chat_stream_with_callback instead:
api.chat_stream_with_callback(client, request, fn(event) {
  case api.event_text(event) {
    Ok(text) -> io.print(text)
    Error(_) -> Nil
  }
})