Streaming
Snakepit supports streaming execution for real-time progress updates, large data transfers, and continuous data streams.
When to Use Streaming
- Progress updates: Long-running ML operations reporting incremental progress
- Large data transfers: Datasets too large for single responses
- Real-time data: Continuous sensor data, log streams, or live predictions
- Batch processing: Results delivered as items complete
For simple request-response patterns, use Snakepit.execute/3 instead.
Snakepit.execute_stream/4
@spec execute_stream(command(), args(), callback_fn(), keyword()) ::
        :ok | {:error, Snakepit.Error.t()}

| Parameter | Type | Description |
|---|---|---|
| command | String.t() | The streaming command |
| args | map() | Command arguments |
| callback_fn | (term() -> any()) | Called for each chunk |
| opts | keyword() | Execution options |

| Option | Default | Description |
|---|---|---|
| :pool | Snakepit.Pool | Pool to use |
| :timeout | 300000 | Timeout in ms |
| :session_id | nil | Session affinity |
Basic Example
:ok = Snakepit.execute_stream("process_items", %{items: list}, fn chunk ->
  IO.puts("Received: #{inspect(chunk)}")
end)

Callback Function
The callback is invoked for each chunk, synchronously and in order.
# Logging
callback = fn chunk -> IO.inspect(chunk) end
# Sending to another process
callback = fn chunk -> send(consumer_pid, {:chunk, chunk}) end

Python Streaming Tools
Enabling Streaming
Use supports_streaming=True in the @tool decorator:
from snakepit_bridge.base_adapter import BaseAdapter, tool

class MyAdapter(BaseAdapter):
    @tool(description="Stream progress", supports_streaming=True)
    def stream_progress(self, steps: int = 10):
        for i in range(steps):
            yield {"step": i + 1, "total": steps, "progress": (i + 1) / steps * 100}

Using Generators
Each yield becomes a chunk sent to Elixir:
@tool(description="Stream numbers", supports_streaming=True)
def stream_numbers(self, count: int = 10):
    for i in range(count):
        yield {"number": i, "is_last": i == count - 1}

StreamChunk Class
For explicit control over the final marker:
from snakepit_bridge.adapters.showcase.tool import StreamChunk

@tool(description="Stream with metadata", supports_streaming=True)
def stream_items(self, count: int = 5):
    for i in range(count):
        yield StreamChunk(
            data={"index": i + 1, "value": f"Item {i + 1}"},
            is_final=(i == count - 1)
        )

Progress Update Patterns
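Emitting a chunk on every iteration can flood the consumer when the loop is tight. As a minimal sketch in plain Python (no Snakepit imports), the generator below reports progress only every few steps and always on the last one. The name `throttled_progress` and the `every` parameter are hypothetical; in an adapter, the same body would sit inside a method decorated with `@tool(..., supports_streaming=True)`.

```python
from typing import Any, Dict, Iterator


def throttled_progress(total: int, every: int = 10) -> Iterator[Dict[str, Any]]:
    """Yield a progress chunk every `every` steps, plus one for the last step."""
    if total <= 0 or every <= 0:
        raise ValueError("total and every must be positive")
    for step in range(1, total + 1):
        # One unit of real work would run here.
        if step % every == 0 or step == total:
            yield {
                "type": "progress",
                "step": step,
                "total": total,
                "percent": round(step / total * 100, 1),
            }
    # Hypothetical completion chunk, mirroring the "complete" type used in this guide.
    yield {"type": "complete", "total": total}
```

With `total=25` and `every=10`, this produces progress chunks for steps 10, 20, and 25, followed by one `complete` chunk.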
Training Progress
@tool(description="Train with progress", supports_streaming=True)
def train_model(self, epochs: int = 10):
    for epoch in range(epochs):
        loss = 1.0 / (epoch + 1)
        accuracy = min(0.95, 0.5 + epoch * 0.05)
        yield {"type": "progress", "epoch": epoch + 1, "loss": loss, "accuracy": accuracy}
    yield {"type": "complete", "final_loss": loss, "final_accuracy": accuracy}

Elixir consumer:
:ok = Snakepit.execute_stream("train_model", %{epochs: 10}, fn chunk ->
  case chunk["type"] do
    "progress" -> IO.puts("Epoch #{chunk["epoch"]}: loss=#{chunk["loss"]}")
    "complete" -> IO.puts("Training complete!")
  end
end)

Large Data Transfer Patterns
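The core of a large transfer is slicing a source into bounded batches so that no single chunk holds the whole dataset. The sketch below shows that slicing for any iterable, using only the standard library. `iter_batches` is a hypothetical helper, not part of Snakepit, and the chunk keys simply mirror the ones used in this guide's dataset example.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


def iter_batches(items: Iterable[Any], chunk_size: int) -> Iterator[Dict[str, Any]]:
    """Yield dict chunks of at most `chunk_size` items with a running total."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    iterator = iter(items)
    total_sent = 0
    while True:
        # islice pulls at most chunk_size items without materializing the rest.
        batch: List[Any] = list(islice(iterator, chunk_size))
        if not batch:
            return
        total_sent += len(batch)
        yield {"rows": len(batch), "total": total_sent, "data": batch}
```

Because the helper only ever holds one batch, it also works when the source is itself a generator of unknown length.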
Chunked Dataset
@tool(description="Generate dataset", supports_streaming=True)
def generate_dataset(self, rows: int = 10000, chunk_size: int = 1000):
    import numpy as np
    total_sent = 0
    while total_sent < rows:
        batch = min(chunk_size, rows - total_sent)
        data = np.random.randn(batch, 10)
        total_sent += batch
        yield StreamChunk(
            data={"rows": batch, "total": total_sent, "data": data.tolist()},
            is_final=(total_sent >= rows)
        )

Error Handling in Streams
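On the Python side, a streaming tool is an ordinary generator, so an exception raised partway through ends the stream after the earlier chunks have already been yielded. The sketch below shows that generator behavior in isolation. `drain` is a hypothetical local stand-in for a consumer, not Snakepit's transport, and whether the Elixir callback sees every earlier chunk before the error is returned is an assumption worth verifying against your version.

```python
from typing import Any, Dict, Iterator, List, Optional, Tuple


def may_fail(fail_at: int = 3, count: int = 5) -> Iterator[Dict[str, Any]]:
    """Yield `count` chunks, raising just before the chunk at index `fail_at`."""
    for i in range(count):
        if i == fail_at:
            raise RuntimeError(f"failed at item {i}")
        yield {"index": i}


def drain(chunks: Iterator[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Collect chunks until the stream ends; return them with any error message."""
    received: List[Dict[str, Any]] = []
    try:
        for chunk in chunks:
            received.append(chunk)
    except Exception as exc:  # a real consumer would likely be more selective
        return received, str(exc)
    return received, None
```

A consumer that accumulates state per chunk should therefore be prepared to discard or roll back partial results when the stream ends in an error.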
Python Errors
Exceptions propagate to Elixir:
case Snakepit.execute_stream("may_fail", %{}, callback) do
  :ok -> IO.puts("Success")
  {:error, error} -> IO.puts("Failed: #{error.message}")
end

Stream Cancellation
task = Task.async(fn ->
  Snakepit.execute_stream("infinite_stream", %{}, fn chunk ->
    IO.puts("Received: #{inspect(chunk)}")
  end)
end)

Process.sleep(5000)
Task.shutdown(task, :brutal_kill)  # Cancel stream

Complete Streaming Example
Python Adapter
from snakepit_bridge.base_adapter import BaseAdapter, tool
from snakepit_bridge.adapters.showcase.tool import StreamChunk
import time

class MLStreamingAdapter(BaseAdapter):
    @tool(description="Train with streaming progress", supports_streaming=True)
    def train_with_progress(self, epochs: int = 10):
        loss, accuracy = 1.0, 0.5
        for epoch in range(epochs):
            time.sleep(0.5)
            loss *= 0.8
            accuracy = min(0.99, accuracy + 0.05)
            yield StreamChunk(
                data={"epoch": epoch + 1, "loss": round(loss, 4), "accuracy": round(accuracy, 4)},
                is_final=False
            )
        yield StreamChunk(
            data={"status": "complete", "model_id": f"model_{int(time.time())}"},
            is_final=True
        )

Elixir Consumer
defmodule MyApp.MLClient do
  require Logger

  def train_model(epochs) do
    Logger.info("Starting training")

    result = Snakepit.execute_stream("train_with_progress", %{epochs: epochs}, fn chunk ->
      if chunk["status"] == "complete" do
        Logger.info("Complete! Model: #{chunk["model_id"]}")
      else
        Logger.info("Epoch #{chunk["epoch"]}: loss=#{chunk["loss"]}, acc=#{chunk["accuracy"]}")
      end
    end)

    case result do
      :ok -> {:ok, "Training completed"}
      {:error, e} -> {:error, e}
    end
  end
end

Performance Considerations
- Chunk size: Balance responsiveness vs efficiency (1000-10000 items typical)
- Callback overhead: Keep callbacks lightweight; offload heavy processing
- Timeouts: Default is 5 minutes; adjust with the :timeout option
- Memory: Streaming reduces peak memory by processing incrementally
- gRPC required: Streaming only works with gRPC adapters
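To make the chunk-size trade-off concrete, a rough estimate of chunk count and payload size can be computed before choosing a value. The sketch below is an assumption-laden illustration: `chunk_plan` is a hypothetical helper, and JSON length is used only as a proxy for payload size, since the encoding Snakepit actually uses on the wire may differ.

```python
import json
import math
from typing import Any, Dict


def chunk_plan(rows: int, chunk_size: int, sample_row: Any) -> Dict[str, int]:
    """Estimate how many chunks a transfer needs and roughly how big each one is."""
    if rows <= 0 or chunk_size <= 0:
        raise ValueError("rows and chunk_size must be positive")
    # JSON size of one representative row, used as a rough per-row cost.
    row_bytes = len(json.dumps(sample_row).encode("utf-8"))
    return {
        "chunks": math.ceil(rows / chunk_size),
        "approx_bytes_per_chunk": row_bytes * min(chunk_size, rows),
    }
```

Fewer, larger chunks reduce per-chunk overhead but delay the first result; more, smaller chunks do the opposite.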
Server-Side Streaming Implementation (v0.8.5+)
Starting in v0.8.5, Snakepit's BridgeServer fully implements ExecuteStreamingTool,
enabling end-to-end gRPC streaming from external clients through to Python workers.
Requirements for Streaming Tools
- Tool must be a remote (Python) tool
- Tool must have supports_streaming: true in metadata
- Python adapter must implement the streaming tool as a generator
Enabling a Tool for Streaming
In your Python adapter:
@tool(description="Stream results", supports_streaming=True)
def my_streaming_tool(self, param: str):
    for i in range(10):
        yield {"step": i, "result": f"Processing {param}"}

The tool will be registered with streaming support automatically.
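Before wiring a tool through Snakepit, it can help to confirm locally that the method really is a generator and produces the chunks you expect. The following is a minimal sketch using only the standard library. `LocalAdapter` and `is_generator_tool` are hypothetical names, and the class deliberately omits `BaseAdapter` and `@tool` so it runs without the bridge installed; whether the check still holds on a decorated method depends on how the decorator wraps it, which is not confirmed here.

```python
import inspect
from typing import Any, Callable, Dict, Iterator


class LocalAdapter:
    """Stand-in for an adapter; a real one would subclass BaseAdapter and use @tool."""

    def my_streaming_tool(self, param: str) -> Iterator[Dict[str, Any]]:
        for i in range(10):
            yield {"step": i, "result": f"Processing {param}"}


def is_generator_tool(func: Callable[..., Any]) -> bool:
    """Return True if `func` is written as a generator function (uses yield)."""
    return inspect.isgeneratorfunction(func)


# Iterating the method directly shows the chunks in the order they would be yielded.
chunks = list(LocalAdapter().my_streaming_tool("demo"))
```

A method that builds a list and returns it would fail this check, which is a quick way to catch a tool that does not actually stream.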
Stream Chunk Metadata
Final chunks include automatic metadata decoration:
- execution_time_ms: Total execution time in milliseconds
- tool_type: The type of tool (always "remote" for streaming)
- worker_id: The ID of the worker that executed the tool
If the worker doesn't send a final chunk, Snakepit injects a synthetic final chunk
with synthetic_final: "true" in metadata.
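The behavior described above can be modeled in a few lines. This is an illustrative Python sketch, not Snakepit's actual code (the BridgeServer is Elixir). The chunk shape (a dict with `data`, `is_final`, and `metadata` keys), the function names, and the integer type of `execution_time_ms` are all assumptions made for the example; only the metadata key names and the `"true"` string come from this guide.

```python
import time
from typing import Any, Dict, Iterable, Iterator


def _final_metadata(started: float, worker_id: str) -> Dict[str, Any]:
    """Build the metadata keys this guide lists for final chunks."""
    return {
        "execution_time_ms": int((time.monotonic() - started) * 1000),
        "tool_type": "remote",
        "worker_id": worker_id,
    }


def with_final_marker(
    chunks: Iterable[Dict[str, Any]], worker_id: str
) -> Iterator[Dict[str, Any]]:
    """Decorate the final chunk, or append a synthetic one if none arrives."""
    started = time.monotonic()
    saw_final = False
    for chunk in chunks:
        out = dict(chunk)  # copy so the caller's chunk is not mutated
        if out.get("is_final"):
            saw_final = True
            out["metadata"] = {**out.get("metadata", {}), **_final_metadata(started, worker_id)}
        yield out
    if not saw_final:
        metadata = _final_metadata(started, worker_id)
        metadata["synthetic_final"] = "true"
        yield {"data": None, "is_final": True, "metadata": metadata}
```

A consumer can use the presence of `synthetic_final` to tell a worker-signaled completion apart from one that was filled in on its behalf.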