Streaming Responses in Ragex
This document explains the streaming response functionality.
Overview
Ragex now supports streaming responses from all four AI providers:
- OpenAI: GPT-4, GPT-4-turbo, GPT-3.5-turbo (SSE format)
- Anthropic: Claude 3 Opus/Sonnet/Haiku (SSE format with event types)
- DeepSeek: deepseek-chat, deepseek-reasoner (SSE format, OpenAI-compatible)
- Ollama: Local LLMs (NDJSON format)
Streaming provides real-time response generation, allowing for:
- Progressive UI updates as content arrives
- Lower perceived latency for long responses
- Better user experience for interactive applications
- Token usage tracking in real-time
Architecture
Provider Level (lib/ragex/ai/provider/)
Each provider implements the stream_generate/3 callback defined in Ragex.AI.Behaviour:
```elixir
@callback stream_generate(prompt :: String.t(), context :: map() | nil, opts :: keyword()) ::
            {:ok, Enumerable.t(chunk())} | {:error, term()}
```
Chunk format:
```elixir
%{
  content: String.t(),   # Incremental content
  done: boolean(),       # true for the final chunk
  metadata: map()        # Provider info, usage stats (on the final chunk)
}
```
Implementation pattern (a minimal sketch follows the error-handling notes below):
- Initiate the HTTP streaming request with `Req.post(..., into: fn {:data, data}, {req, resp} -> ... end)`
- Use `Task.async` to handle streaming in a separate process
- Use `Stream.resource` to turn the HTTP chunks into an Elixir stream
- Parse SSE/NDJSON events and extract content deltas
- Track token usage and include it in the final chunk metadata
Error handling:
- HTTP errors: `{:error, {:api_error, status, body}}`
- Network errors: `{:error, {:http_error, reason}}`
- Timeouts: 30-second receive timeout per provider
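To make this concrete, here is a minimal sketch of how the pattern and error handling above could be wired together, assuming Req's functional `into:` option. The module name, message shapes, and inline OpenAI request are illustrative, not Ragex's actual code; SSE parsing, authentication, and translation of HTTP failures into the error tuples above are omitted for brevity.

```elixir
defmodule ProviderStreamSketch do
  @receive_timeout 30_000

  def stream_generate(prompt, _context, _opts) do
    owner = self()

    stream =
      Stream.resource(
        # start: run the HTTP request in a Task, forwarding each raw chunk as a message
        fn ->
          task =
            Task.async(fn ->
              Req.post!("https://api.openai.com/v1/chat/completions",
                json: %{
                  model: "gpt-4-turbo",
                  stream: true,
                  messages: [%{role: "user", content: prompt}]
                },
                into: fn {:data, data}, {req, resp} ->
                  send(owner, {:stream_data, data})
                  {:cont, {req, resp}}
                end
              )

              send(owner, :stream_done)
            end)

          {task, :running}
        end,
        # next: translate messages into chunk maps; a real provider parses SSE here
        fn
          {task, :done} ->
            {:halt, {task, :done}}

          {task, :running} ->
            receive do
              {:stream_data, data} ->
                {[%{content: data, done: false, metadata: %{}}], {task, :running}}

              :stream_done ->
                {[%{content: "", done: true, metadata: %{provider: :openai}}], {task, :done}}
            after
              @receive_timeout ->
                {[{:error, :timeout}], {task, :done}}
            end
        end,
        # cleanup: shut the Task down if the consumer halts the stream early
        fn {task, _state} -> Task.shutdown(task, :brutal_kill) end
      )

    {:ok, stream}
  end
end
```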
Pipeline Level (lib/ragex/rag/pipeline.ex)
Three new streaming functions:
```elixir
# Query with streaming
Pipeline.stream_query(user_query, opts)

# Explain with streaming
Pipeline.stream_explain(target, aspect, opts)

# Suggest with streaming
Pipeline.stream_suggest(target, focus, opts)
```
Features:
- Automatic usage tracking (records tokens on final chunk)
- Source attribution (added to final chunk metadata)
- Rate limiting (checked before starting stream)
- Retrieval context injection (same as non-streaming)
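A rough sketch of how the usage tracking and source attribution above could be layered onto a provider stream; `EnrichmentSketch` and `record_usage/1` are illustrative stand-ins (the latter for the Usage module), not Ragex's actual internals:

```elixir
defmodule EnrichmentSketch do
  # Wraps a provider stream: content chunks pass through untouched, while the
  # final chunk gets the retrieval sources attached and its usage recorded.
  def attach_sources(stream, sources) do
    Stream.map(stream, fn
      %{done: true, metadata: meta} = chunk ->
        record_usage(Map.get(meta, :usage))
        %{chunk | metadata: Map.put(meta, :sources, sources)}

      other ->
        other
    end)
  end

  # Stand-in for the Usage module's cost tracking.
  defp record_usage(nil), do: :ok
  defp record_usage(usage), do: IO.inspect(usage, label: "usage")
end
```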
Options:
- `:stream_metadata` - Include sources in every chunk (default: false)
- All standard RAG options (`:limit`, `:threshold`, `:provider`, etc.)
MCP Tools Level (lib/ragex/mcp/handlers/tools.ex)
Three new MCP tools:
- `rag_query_stream` - Streaming version of `rag_query`
- `rag_explain_stream` - Streaming version of `rag_explain`
- `rag_suggest_stream` - Streaming version of `rag_suggest`
Usage Examples
Direct Provider Usage
```elixir
alias Ragex.AI.Provider.OpenAI

# Start streaming
{:ok, stream} = OpenAI.stream_generate(
  "Explain this code",
  %{context: "def foo, do: :bar"},
  temperature: 0.7
)

# Consume the stream
Enum.each(stream, fn
  %{done: false, content: chunk} ->
    IO.write(chunk)  # Print as it arrives

  %{done: true, metadata: meta} ->
    IO.puts("\n\nUsage: #{inspect(meta.usage)}")

  {:error, reason} ->
    IO.puts("Error: #{inspect(reason)}")
end)
```
RAG Pipeline Usage
```elixir
alias Ragex.RAG.Pipeline

# Stream a query
{:ok, stream} = Pipeline.stream_query("How does auth work?", limit: 5)

# Consume the stream once, accumulating content and keeping the final chunk
# (re-enumerating the stream would restart the underlying request)
{content, final_chunk} =
  Enum.reduce(stream, {"", nil}, fn
    %{done: false, content: c}, {acc, final} -> {acc <> c, final}
    %{done: true} = chunk, {acc, _} -> {acc, chunk}
  end)

IO.puts("Response: #{content}")
IO.puts("Sources: #{length(final_chunk.metadata.sources)}")
```
MCP Tool Usage
Via MCP client:
```json
{
  "jsonrpc": "2.0",
  "method": "tools/call",
  "params": {
    "name": "rag_query_stream",
    "arguments": {
      "query": "Explain the authentication flow",
      "limit": 5,
      "provider": "openai",
      "show_chunks": true
    }
  }
}
```
Response:
```json
{
  "status": "success",
  "query": "Explain the authentication flow",
  "response": "The authentication flow consists of...",
  "sources_count": 3,
  "model_used": "gpt-4-turbo",
  "streaming": true,
  "chunks_count": 12,
  "chunks": [...]  // Only if show_chunks: true
}
```
Protocol Details
OpenAI SSE Format
```text
data: {"choices":[{"delta":{"content":"Hello"},"finish_reason":null}]}
data: {"choices":[{"delta":{"content":" world"},"finish_reason":null}]}
data: {"choices":[{"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":10,"completion_tokens":5}}
data: [DONE]
```
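Each `data:` payload carries a content delta (DeepSeek uses the same OpenAI-compatible shape). A minimal parsing sketch, assuming Jason for JSON decoding; not Ragex's actual parser:

```elixir
defmodule SSESketch do
  # Turns a buffer of OpenAI-style SSE lines into Ragex-style chunk maps.
  def parse_events(buffer) do
    buffer
    |> String.split("\n")
    |> Enum.filter(&String.starts_with?(&1, "data: "))
    |> Enum.map(&String.trim_leading(&1, "data: "))
    |> Enum.reject(&(&1 == "[DONE]"))
    |> Enum.map(&Jason.decode!/1)
    |> Enum.map(fn event ->
      delta = get_in(event, ["choices", Access.at(0), "delta", "content"]) || ""
      done? = get_in(event, ["choices", Access.at(0), "finish_reason"]) == "stop"
      %{content: delta, done: done?, metadata: Map.take(event, ["usage"])}
    end)
  end
end
```

A real parser must also buffer events that arrive split across HTTP chunks (see the Memory notes under Performance Characteristics).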
Anthropic SSE Format
```text
event: message_start
data: {"type":"message_start","message":{"usage":{"input_tokens":10}}}
event: content_block_delta
data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"Hello"}}
event: content_block_delta
data: {"type":"content_block_delta","delta":{"type":"text_delta","text":" world"}}
event: message_delta
data: {"type":"message_delta","usage":{"output_tokens":5}}
event: message_stop
data: {"type":"message_stop"}
```
{"model":"codellama","response":"Hello","done":false}
{"model":"codellama","response":" world","done":false}
{"model":"codellama","response":"","done":true}Performance Characteristics
Performance Characteristics
Latency:
- First chunk: ~200-500ms (same as non-streaming)
- Subsequent chunks: ~50-100ms intervals
- Total time: Same as non-streaming (no overhead)
Token Usage:
- Tracked identically to non-streaming
- Reported in final chunk metadata
- Recorded via Usage module for cost tracking
Memory:
- Constant memory per stream (buffering only incomplete events)
- No accumulation until explicitly collected
Cancellation:
- Streams can be stopped early by halting enumeration
- Task cleanup via Stream.resource cleanup function
- 30-second receive timeout prevents hanging
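As an example of early cancellation, taking a fixed number of chunks halts the enumeration and triggers the cleanup step (a sketch reusing the pipeline query from earlier):

```elixir
alias Ragex.RAG.Pipeline

{:ok, stream} = Pipeline.stream_query("How does auth work?", limit: 5)

# Enum.take/2 halts enumeration after 20 chunks, which runs the
# Stream.resource cleanup function and shuts down the request task.
preview =
  stream
  |> Stream.filter(&match?(%{done: false}, &1))
  |> Stream.map(& &1.content)
  |> Enum.take(20)
  |> Enum.join()
```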
Error Scenarios
| Error | When | Handling |
|---|---|---|
| API Error (4xx/5xx) | HTTP status != 200 | {:error, {:api_error, status, body}} |
| Network Error | Connection lost | {:error, {:http_error, reason}} |
| Timeout | No data for 30s | {:error, :timeout} in stream |
| Rate Limit | Before request | {:error, {:rate_limited, reason}} |
| Invalid JSON | SSE/NDJSON parse | Skip chunk, continue stream |
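A sketch of how a consumer might branch on these shapes; it assumes the rate-limit tuple is returned by the pipeline before a stream is handed back, per the rate-limiting note above:

```elixir
alias Ragex.RAG.Pipeline

case Pipeline.stream_query("Explain the authentication flow") do
  {:ok, stream} ->
    Enum.each(stream, fn
      %{done: false, content: c} -> IO.write(c)
      %{done: true} -> IO.puts("")
      {:error, {:api_error, status, _body}} -> IO.puts("API error: #{status}")
      {:error, {:http_error, reason}} -> IO.puts("Network error: #{inspect(reason)}")
      {:error, :timeout} -> IO.puts("Stream timed out")
    end)

  # Rate limiting is checked before the stream starts
  {:error, {:rate_limited, reason}} ->
    IO.puts("Rate limited: #{inspect(reason)}")
end
```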
Configuration
No additional configuration required. Streaming uses the same provider settings as non-streaming:
```elixir
config :ragex, :ai_providers,
  openai: [
    endpoint: "https://api.openai.com/v1",
    model: "gpt-4-turbo",
    options: [
      temperature: 0.7,
      max_tokens: 2048
    ]
  ]
```
What’s there
Full MCP Streaming Protocol
- Emit JSON-RPC notifications for each chunk
- Cancellation support via MCP protocol
- Progress indicators
Advanced Features
- Stream caching (cache reconstructed from chunks)
- Concurrent multi-provider streaming (race/merge strategies)
- Stream transformations (filtering, augmentation)
Performance Optimizations
- Adaptive buffering based on chunk size
- Connection pooling for multiple streams
- Predictive prefetching
Limitations
Protocol:
- OpenAI: Requires `stream_options: %{include_usage: true}` for token counts
- Anthropic: Usage split across message_start and message_delta events
- Ollama: Token counts are estimated (not provided by the API)
Troubleshooting
Stream hangs or times out:
- Check network connectivity
- Verify API key is valid
- Increase timeout if needed (modify receive after clause)
Chunks arrive slowly:
- Normal behavior (depends on model response time)
- Larger prompts take longer to process
- Use faster models (gpt-3.5-turbo vs gpt-4)
Missing final chunk:
- Check for errors in stream
- Ensure stream is fully consumed
- Look for `:stream_done` or `:stream_error` messages
Token counts are zero:
- OpenAI: Ensure API supports stream_options
- Anthropic: Check for message_delta event
- Ollama: Counts are estimated, may be rough
See Also
- `lib/ragex/ai/behaviour.ex` - Streaming callback definition
- `lib/ragex/ai/provider/*` - Provider implementations
- `lib/ragex/rag/pipeline.ex` - Pipeline streaming functions
- `lib/ragex/mcp/handlers/tools.ex` - MCP streaming tools