Streaming Guide
View SourceThis guide walks through enabling streaming output in pipeline_ex pipelines.
When to Use Streaming
- Long running prompts where you want partial progress updates.
- Tooling scenarios that push incremental tokens to a UI.
- Debugging runs that need to expose intermediate state as it is produced.
If your pipeline already emits small responses, synchronous execution is fine. Streaming makes the biggest difference when a step may take tens of seconds or more.
Quick Start
- Add the
streamflag to the step that invokes your LLM provider. - Supply a streaming handler to capture events in real time.
defmodule Pipeline.StreamingExample do
use Pipeline.Step
def run(ctx) do
Pipeline.LLM.invoke!(
ctx,
prompt: ctx.prompt,
stream: true,
on_event: &handle_event/1
)
end
defp handle_event({:delta, chunk}) do
IO.write(chunk.content)
end
endStore handlers under lib/pipeline/streaming/ so other pipelines can reuse them.
YAML Configuration
Streaming can be toggled directly in pipeline YAML workflows.
steps:
- id: research_summary
type: llm
provider: claude
prompt: >
Summarize the latest findings on autonomous research pipelines.
stream: true
stream_handler: Pipeline.Streaming.ConsoleHandlerHandler Requirements
- Accept events shaped as
{:delta, chunk}and{:done, response}. - Avoid blocking calls; use async tasks if you need to buffer or persist output.
- Keep handlers idempotent—retries can trigger duplicate events.
Testing Streaming Pipelines
- Run
mix test test/streamingto execute streaming regression suites. - For ad-hoc verification, execute
mix pipeline.run pipelines/demo.yaml --stream. Capture output with
teeif you need to diff streamed tokens:mix pipeline.run ... | tee log.txt.
Troubleshooting
- Nothing appears on screen: confirm your handler writes to stdout and that the provider supports streaming for the selected model.
- Out-of-order chunks: accumulate within a GenServer before printing to guarantee ordering.
- State bleed between runs: reset any ETS tables or Agent state in your handler's
init/1.
For deeper design notes, continue with docs/ASYNC_STREAMING_MIGRATION_GUIDE.md.