anthropic/streaming/accumulator

Stream accumulator for building complete messages from streaming events

This module provides utilities for accumulating streaming events into a complete message response. It tracks content blocks, handles deltas, and produces the final message with usage statistics.

Types

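State for accumulating streaming events into a complete message. It holds the message metadata (id, message_type, role, model), the content blocks seen so far keyed by an Int, the stop reason and stop sequence, the input and output token counts, a completion flag, and the stream error, if any.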

pub type AccumulatorState {
  AccumulatorState(
    id: option.Option(String),
    message_type: option.Option(String),
    role: option.Option(message.Role),
    model: option.Option(String),
    content_blocks: dict.Dict(Int, ContentBlockState),
    stop_reason: option.Option(request.StopReason),
    stop_sequence: option.Option(String),
    input_tokens: Int,
    output_tokens: Int,
    is_complete: Bool,
    error: option.Option(streaming.StreamError),
  )
}

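Constructors

  • AccumulatorState(
      id: option.Option(String),
      message_type: option.Option(String),
      role: option.Option(message.Role),
      model: option.Option(String),
      content_blocks: dict.Dict(Int, ContentBlockState),
      stop_reason: option.Option(request.StopReason),
      stop_sequence: option.Option(String),
      input_tokens: Int,
      output_tokens: Int,
      is_complete: Bool,
      error: option.Option(streaming.StreamError),
    )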

State for a content block being accumulated

pub type ContentBlockState {
  TextBlockState(text: String)
  ToolUseBlockState(id: String, name: String, input: String)
}

Constructors

  • TextBlockState(text: String)

    Text block being accumulated

  • ToolUseBlockState(id: String, name: String, input: String)

    Tool use block being accumulated
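
Because ContentBlockState has exactly two constructors, a case expression can handle every block exhaustively. The following is a minimal sketch; describe_block is a hypothetical helper, not part of the library, and the wording of the summaries is arbitrary.

```gleam
import anthropic/streaming/accumulator.{
  type ContentBlockState, TextBlockState, ToolUseBlockState,
}
import gleam/int
import gleam/string

// Hypothetical helper: produce a one-line summary of a content block.
pub fn describe_block(block: ContentBlockState) -> String {
  case block {
    // Text accumulated so far for this block.
    TextBlockState(text) ->
      "text block with " <> int.to_string(string.length(text)) <> " characters"
    // The input is a plain String and may still be partial mid-stream.
    ToolUseBlockState(id, name, input) ->
      "tool use " <> id <> " calling " <> name <> " with input " <> input
  }
}
```

The compiler rejects the case expression if a constructor is left out, so a new block kind added to the type would surface as a compile error here.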

Values

pub fn accumulate(
  events: List(streaming.StreamEvent),
) -> Result(request.CreateMessageResponse, String)

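Accumulate a list of events and build the final response in one step. The Result is an Error carrying a String message when no response can be built.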
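
A minimal sketch of calling accumulate on a list of events that has already been collected. The import path anthropic/types/streaming for the streaming module is an assumption, and report is a hypothetical helper name.

```gleam
import anthropic/streaming/accumulator
// Assumed module path for the StreamEvent type; adjust to your install.
import anthropic/types/streaming
import gleam/io

// Hypothetical helper: print whether a finished stream produced a message.
pub fn report(events: List(streaming.StreamEvent)) -> Nil {
  case accumulator.accumulate(events) {
    Ok(_response) -> io.println("message complete")
    // The error side is a plain String, so it can be printed directly.
    Error(reason) -> io.println("could not build message: " <> reason)
  }
}
```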

pub fn build_response(
  state: AccumulatorState,
) -> Result(request.CreateMessageResponse, String)

Build a CreateMessageResponse from the accumulated state
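
When the state is built up manually, it may be worth checking it before calling build_response. The sketch below is one possible guard, not the library's own logic: finish is a hypothetical helper, the error strings are invented for illustration, and the import path anthropic/types/request is an assumption.

```gleam
import anthropic/streaming/accumulator
// Assumed module path for CreateMessageResponse; adjust to your install.
import anthropic/types/request

// Hypothetical helper: only build a response from a finished, error-free state.
pub fn finish(
  state: accumulator.AccumulatorState,
) -> Result(request.CreateMessageResponse, String) {
  case accumulator.has_error(state), state.is_complete {
    True, _ -> Error("stream reported an error")
    False, False -> Error("stream has not completed yet")
    False, True -> accumulator.build_response(state)
  }
}
```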

pub fn get_accumulated_text(state: AccumulatorState) -> String

Get the accumulated text so far

pub fn get_accumulated_tool_inputs(
  state: AccumulatorState,
) -> List(#(String, String, String))

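Get the tool inputs accumulated so far, as one tuple of three strings per tool use block (presumably the id, name, and input fields of ToolUseBlockState, in that order).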
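
A short sketch of consuming the returned tuples. The signature does not say what each position holds, so the code below avoids naming them; tool_lines is a hypothetical helper.

```gleam
import anthropic/streaming/accumulator
import gleam/list

// Hypothetical helper: render each tool input tuple as one line of text.
pub fn tool_lines(state: accumulator.AccumulatorState) -> List(String) {
  accumulator.get_accumulated_tool_inputs(state)
  |> list.map(fn(entry) {
    // Destructure the three String fields without assuming their meaning.
    let #(first, second, third) = entry
    first <> " | " <> second <> " | " <> third
  })
}
```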

pub fn has_content(state: AccumulatorState) -> Bool

Check if accumulator has any content

pub fn has_error(state: AccumulatorState) -> Bool

Check if accumulator encountered an error

pub fn new() -> AccumulatorState

Create a new accumulator state

pub fn process_event(
  state: AccumulatorState,
  event: streaming.StreamEvent,
) -> AccumulatorState

Process a single event and update accumulator state
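
Since process_event takes the state first and the event second, it can be passed directly to a fold. The sketch below stops folding at the first event that puts the accumulator into an error state; process_until_error is a hypothetical helper, and the import path anthropic/types/streaming is an assumption.

```gleam
import anthropic/streaming/accumulator
// Assumed module path for the StreamEvent type; adjust to your install.
import anthropic/types/streaming
import gleam/list

// Hypothetical helper: fold events one at a time, stopping on the first error.
pub fn process_until_error(
  events: List(streaming.StreamEvent),
) -> accumulator.AccumulatorState {
  list.fold_until(events, accumulator.new(), fn(state, event) {
    let next = accumulator.process_event(state, event)
    case accumulator.has_error(next) {
      True -> list.Stop(next)
      False -> list.Continue(next)
    }
  })
}
```

Without the early exit, the same loop is simply list.fold(events, accumulator.new(), accumulator.process_event).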

pub fn process_events(
  events: List(streaming.StreamEvent),
) -> AccumulatorState

Process multiple events and return final state

pub fn process_events_with_callback(
  events: List(streaming.StreamEvent),
  callback: fn(streaming.StreamEvent, AccumulatorState) -> Nil,
) -> AccumulatorState

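Process multiple events, calling the callback with each event and an accumulator state, and return the final state. The callback returns Nil, so it is useful only for side effects such as logging or rendering partial output.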
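
A minimal sketch of a callback that prints the text accumulated so far on every event. print_progress is a hypothetical helper, and the import path anthropic/types/streaming is an assumption. Whether the state handed to the callback reflects the current event is not stated in this reference, so treat the output as approximate.

```gleam
import anthropic/streaming/accumulator
// Assumed module path for the StreamEvent type; adjust to your install.
import anthropic/types/streaming
import gleam/io

// Hypothetical helper: print the running text while events are processed.
pub fn print_progress(
  events: List(streaming.StreamEvent),
) -> accumulator.AccumulatorState {
  accumulator.process_events_with_callback(events, fn(_event, state) {
    // io.println returns Nil, which matches the callback's return type.
    io.println(accumulator.get_accumulated_text(state))
  })
}
```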

pub fn total_tokens(state: AccumulatorState) -> Int

Get the total token count (presumably input_tokens plus output_tokens)
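
The query functions combine naturally into a status summary. The sketch below is one way to do that; status_line is a hypothetical helper and the message wording is arbitrary.

```gleam
import anthropic/streaming/accumulator
import gleam/int

// Hypothetical helper: summarise the accumulator state in one line.
pub fn status_line(state: accumulator.AccumulatorState) -> String {
  case accumulator.has_content(state) {
    False -> "no content yet"
    True -> int.to_string(accumulator.total_tokens(state)) <> " tokens so far"
  }
}
```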
