anthropic/streaming/accumulator
Stream accumulator for building complete messages from streaming events
This module provides utilities for accumulating streaming events into a complete message response. It tracks content blocks, handles deltas, and produces the final message with usage statistics.
Types
State for accumulating streaming events into a complete message
pub type AccumulatorState {
  AccumulatorState(
    id: option.Option(String),
    message_type: option.Option(String),
    role: option.Option(message.Role),
    model: option.Option(String),
    content_blocks: dict.Dict(Int, ContentBlockState),
    stop_reason: option.Option(request.StopReason),
    stop_sequence: option.Option(String),
    input_tokens: Int,
    output_tokens: Int,
    is_complete: Bool,
    error: option.Option(streaming.StreamError),
  )
}
Constructors
- AccumulatorState(id: option.Option(String), message_type: option.Option(String), role: option.Option(message.Role), model: option.Option(String), content_blocks: dict.Dict(Int, ContentBlockState), stop_reason: option.Option(request.StopReason), stop_sequence: option.Option(String), input_tokens: Int, output_tokens: Int, is_complete: Bool, error: option.Option(streaming.StreamError))
Arguments
- id: Message ID
- message_type: Message type
- role: Role of the response
- model: Model that generated the response
- content_blocks: Content blocks being accumulated (index -> accumulated content)
- stop_reason: Final stop reason
- stop_sequence: Final stop sequence
- input_tokens: Input tokens from message_start
- output_tokens: Output tokens from message_delta
- is_complete: Whether the stream is complete
- error: Error if one occurred
State for a content block being accumulated
pub type ContentBlockState {
  TextBlockState(text: String)
  ToolUseBlockState(id: String, name: String, input: String)
}
Constructors
- TextBlockState(text: String): Text block being accumulated
- ToolUseBlockState(id: String, name: String, input: String): Tool use block being accumulated
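To illustrate how per-index block state might grow as deltas arrive, here is a minimal, self-contained sketch. It redefines ContentBlockState locally so that it compiles on its own. The `Delta` type and the `apply_delta` function are hypothetical names invented for this example; the library's real delta events come from its streaming module and may be shaped differently.

```gleam
import gleam/dict.{type Dict}

// Local copy of the ContentBlockState type documented above.
pub type ContentBlockState {
  TextBlockState(text: String)
  ToolUseBlockState(id: String, name: String, input: String)
}

// Hypothetical delta type for this sketch only.
pub type Delta {
  TextDelta(text: String)
  InputJsonDelta(partial_json: String)
}

// Append a delta to the block stored at `index`.
// A delta that targets a missing index, or whose kind does not match
// the stored block, leaves the dictionary unchanged.
pub fn apply_delta(
  blocks: Dict(Int, ContentBlockState),
  index: Int,
  delta: Delta,
) -> Dict(Int, ContentBlockState) {
  case dict.get(blocks, index), delta {
    Ok(TextBlockState(text)), TextDelta(more) ->
      dict.insert(blocks, index, TextBlockState(text <> more))
    Ok(ToolUseBlockState(id, name, input)), InputJsonDelta(more) ->
      dict.insert(blocks, index, ToolUseBlockState(id, name, input <> more))
    _, _ -> blocks
  }
}
```

Silently ignoring mismatched deltas is a choice made for brevity here; an implementation could just as well record an error in the state instead.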
Values
pub fn accumulate(
  events: List(streaming.StreamEvent),
) -> Result(request.CreateMessageResponse, String)
Accumulate events and build final response
pub fn build_response(
  state: AccumulatorState,
) -> Result(request.CreateMessageResponse, String)
Build a CreateMessageResponse from the accumulated state
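Because the state holds several optional fields while the return type is a Result with a String error, one plausible reading is that building fails when a required field was never received. The sketch below shows that pattern on hypothetical, simplified `Partial` and `Response` types; the error messages and the choice of required fields are assumptions, not the library's documented behaviour.

```gleam
import gleam/option.{type Option}
import gleam/result

// Hypothetical, simplified stand-in for AccumulatorState.
pub type Partial {
  Partial(id: Option(String), model: Option(String), text: String)
}

// Hypothetical, simplified stand-in for request.CreateMessageResponse.
pub type Response {
  Response(id: String, model: String, text: String)
}

// Turn each optional field into a Result and stop at the first one
// that is missing. The error strings are invented for this sketch.
pub fn build(state: Partial) -> Result(Response, String) {
  use id <- result.try(option.to_result(state.id, "Missing message id"))
  use model <- result.try(option.to_result(state.model, "Missing model"))
  Ok(Response(id: id, model: model, text: state.text))
}
```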
pub fn get_accumulated_text(state: AccumulatorState) -> String
Get the accumulated text so far
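As a rough picture of what collecting text from the content_blocks dictionary could involve, the sketch below sorts blocks by index, keeps only the text blocks, and concatenates them. Joining in index order and skipping tool use blocks are assumptions made for this example; `accumulated_text` is a hypothetical name, and the type is redefined locally so the block stands alone.

```gleam
import gleam/dict.{type Dict}
import gleam/int
import gleam/list
import gleam/string

// Local copy of the ContentBlockState type documented above.
pub type ContentBlockState {
  TextBlockState(text: String)
  ToolUseBlockState(id: String, name: String, input: String)
}

// Concatenate the text of all text blocks, ordered by block index.
pub fn accumulated_text(blocks: Dict(Int, ContentBlockState)) -> String {
  blocks
  |> dict.to_list
  // Dictionaries are unordered, so sort by the Int index first.
  |> list.sort(fn(a, b) { int.compare(a.0, b.0) })
  // Keep text blocks, drop tool use blocks.
  |> list.filter_map(fn(entry) {
    case entry.1 {
      TextBlockState(text) -> Ok(text)
      ToolUseBlockState(..) -> Error(Nil)
    }
  })
  |> string.concat
}
```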
pub fn get_accumulated_tool_inputs(
  state: AccumulatorState,
) -> List(#(String, String, String))
Get the accumulated tool inputs so far
pub fn has_content(state: AccumulatorState) -> Bool
Check if accumulator has any content
pub fn has_error(state: AccumulatorState) -> Bool
Check if accumulator encountered an error
pub fn process_event(
  state: AccumulatorState,
  event: streaming.StreamEvent,
) -> AccumulatorState
Process a single event and update accumulator state
pub fn process_events(
  events: List(streaming.StreamEvent),
) -> AccumulatorState
Process multiple events and return final state
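The relationship between a single-event step and a multi-event pass can be pictured as a left fold. The sketch below uses hypothetical, heavily simplified `Event` and `State` types in place of streaming.StreamEvent and AccumulatorState, so it shows only the shape of the technique, not the library's implementation.

```gleam
import gleam/list

// Hypothetical, simplified stand-in for streaming.StreamEvent.
pub type Event {
  TextChunk(text: String)
  Usage(output_tokens: Int)
  Stop
}

// Hypothetical, simplified stand-in for AccumulatorState.
pub type State {
  State(text: String, output_tokens: Int, is_complete: Bool)
}

// Empty starting state.
pub fn new() -> State {
  State(text: "", output_tokens: 0, is_complete: False)
}

// One step: take the current state and one event, return the next state.
pub fn process_event(state: State, event: Event) -> State {
  case event {
    TextChunk(text) -> State(..state, text: state.text <> text)
    Usage(n) -> State(..state, output_tokens: n)
    Stop -> State(..state, is_complete: True)
  }
}

// Many steps: fold the single-event step over the list, left to right.
pub fn process_events(events: List(Event)) -> State {
  list.fold(events, new(), process_event)
}
```

Keeping the step function pure means the same code can serve both a fully buffered list of events and events handled one at a time as they arrive.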
pub fn process_events_with_callback(
  events: List(streaming.StreamEvent),
  callback: fn(streaming.StreamEvent, AccumulatorState) -> Nil,
) -> AccumulatorState
Process events with a callback for each event
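A callback variant can be sketched as the same fold with a side-effecting call at each step. In this example the callback receives the state after the event has been applied; the documentation above does not say whether the real function passes the state before or after, so that ordering is an assumption. `Event` and `process_with_callback` are hypothetical, and the state is reduced to a plain String to keep the sketch short.

```gleam
import gleam/list

// Hypothetical, simplified stand-in for streaming.StreamEvent.
pub type Event {
  TextChunk(text: String)
}

// Fold over the events, invoking `callback` once per event with the
// event and the updated state (assumed ordering, see the note above).
pub fn process_with_callback(
  events: List(Event),
  callback: fn(Event, String) -> Nil,
) -> String {
  list.fold(events, "", fn(text, event) {
    let next = case event {
      TextChunk(chunk) -> text <> chunk
    }
    callback(event, next)
    next
  })
}
```

A caller could pass a callback that prints the text accumulated so far to show progress while the stream is still arriving, and then use the returned state once all events are consumed.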