Streaming

Snakepit supports streaming execution for real-time progress updates, large data transfers, and continuous data streams.

When to Use Streaming

  • Progress updates: Long-running ML operations reporting incremental progress
  • Large data transfers: Datasets too large for single responses
  • Real-time data: Continuous sensor data, log streams, or live predictions
  • Batch processing: Results delivered as items complete

For simple request-response patterns, use Snakepit.execute/3 instead.

Snakepit.execute_stream/4

@spec execute_stream(command(), args(), callback_fn(), keyword()) ::
        :ok | {:error, Snakepit.Error.t()}

| Parameter | Type | Description |
| --- | --- | --- |
| command | String.t() | The streaming command |
| args | map() | Command arguments |
| callback_fn | (term() -> any()) | Called for each chunk |
| opts | keyword() | Execution options |

| Option | Default | Description |
| --- | --- | --- |
| :pool | Snakepit.Pool | Pool to use |
| :timeout | 300000 | Timeout in ms |
| :session_id | nil | Session affinity |

Basic Example

:ok = Snakepit.execute_stream("process_items", %{items: list}, fn chunk ->
  IO.puts("Received: #{inspect(chunk)}")
end)

Callback Function

The callback is invoked for each chunk, synchronously and in order.

# Logging
callback = fn chunk -> IO.inspect(chunk) end

# Sending to another process
callback = fn chunk -> send(consumer_pid, {:chunk, chunk}) end

Python Streaming Tools

Enabling Streaming

Use supports_streaming=True in the @tool decorator:

from snakepit_bridge.base_adapter import BaseAdapter, tool

class MyAdapter(BaseAdapter):

    @tool(description="Stream progress", supports_streaming=True)
    def stream_progress(self, steps: int = 10):
        for i in range(steps):
            yield {"step": i + 1, "total": steps, "progress": (i + 1) / steps * 100}

Using Generators

Each yield becomes a chunk sent to Elixir:

@tool(description="Stream numbers", supports_streaming=True)
def stream_numbers(self, count: int = 10):
    for i in range(count):
        yield {"number": i, "is_last": i == count - 1}
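
Because a streaming tool body is an ordinary Python generator, its chunk sequence can be checked locally before it is wired into an adapter. The sketch below repeats the generator above as a plain function (the @tool decorator and self are dropped so it runs without snakepit_bridge) and shows that chunks are produced lazily, one per request.

```python
def stream_numbers(count: int = 10):
    """Plain-function copy of the tool body above (no @tool, no self)."""
    for i in range(count):
        yield {"number": i, "is_last": i == count - 1}


gen = stream_numbers(3)

# Nothing runs until a chunk is requested; each next() resumes the loop
# and stops again at the following yield.
first = next(gen)

# Draining the generator returns the remaining chunks in order.
rest = list(gen)
```

This only exercises the Python side; how the bridge serializes each chunk is not covered here.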

StreamChunk Class

For explicit control over the final marker:

from snakepit_bridge.adapters.showcase.tool import StreamChunk

@tool(description="Stream with metadata", supports_streaming=True)
def stream_items(self, count: int = 5):
    for i in range(count):
        yield StreamChunk(
            data={"index": i + 1, "value": f"Item {i + 1}"},
            is_final=(i == count - 1)
        )
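
The example above can compute is_final because count is known up front. When the source is an iterator of unknown length, one option is a one-item lookahead, sketched here. with_final_flag is a hypothetical helper, not part of snakepit_bridge, and it yields plain (item, is_final) tuples so that it runs standalone; inside a tool each pair could be wrapped as StreamChunk(data=item, is_final=is_final).

```python
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def with_final_flag(items: Iterable[T]) -> Iterator[Tuple[T, bool]]:
    """Yield (item, is_final) pairs; is_final is True only for the last item."""
    iterator = iter(items)
    try:
        previous = next(iterator)
    except StopIteration:
        return  # empty input: nothing to yield, so no final marker either
    for current in iterator:
        # Another item exists, so the held-back one cannot be the last.
        yield previous, False
        previous = current
    yield previous, True


# Works on a generator whose length is not known in advance.
pairs = list(with_final_flag(x * x for x in range(3)))
```

The helper holds back exactly one item, so it adds one item of latency in exchange for a reliable final marker.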

Progress Update Patterns

Training Progress

@tool(description="Train with progress", supports_streaming=True)
def train_model(self, epochs: int = 10):
    # Initialize so the final chunk is well-defined even when epochs is 0
    loss, accuracy = 1.0, 0.5
    for epoch in range(epochs):
        loss = 1.0 / (epoch + 1)
        accuracy = min(0.95, 0.5 + epoch * 0.05)
        yield {"type": "progress", "epoch": epoch + 1, "loss": loss, "accuracy": accuracy}

    yield {"type": "complete", "final_loss": loss, "final_accuracy": accuracy}

Elixir consumer:

:ok = Snakepit.execute_stream("train_model", %{epochs: 10}, fn chunk ->
  case chunk["type"] do
    "progress" -> IO.puts("Epoch #{chunk["epoch"]}: loss=#{chunk["loss"]}")
    "complete" -> IO.puts("Training complete!")
    # Ignore unrecognized chunk types instead of raising CaseClauseError
    _ -> :ok
  end
end)

Large Data Transfer Patterns

Chunked Dataset

@tool(description="Generate dataset", supports_streaming=True)
def generate_dataset(self, rows: int = 10000, chunk_size: int = 1000):
    import numpy as np
    total_sent = 0
    while total_sent < rows:
        batch = min(chunk_size, rows - total_sent)
        data = np.random.randn(batch, 10)
        total_sent += batch
        yield StreamChunk(
            data={"rows": batch, "total": total_sent, "data": data.tolist()},
            is_final=(total_sent >= rows)
        )
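
The same chunking idea applies to any iterable, not only to generated arrays. A minimal sketch using itertools.islice follows; iter_batches is a hypothetical helper introduced for illustration, and it needs no NumPy.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def iter_batches(rows: Iterable, chunk_size: int) -> Iterator[List]:
    """Yield consecutive lists of at most chunk_size items from rows."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    iterator = iter(rows)
    while True:
        # islice pulls up to chunk_size items without materializing the rest.
        batch = list(islice(iterator, chunk_size))
        if not batch:
            return
        yield batch


batches = list(iter_batches(range(7), chunk_size=3))
```

Only one batch is held in memory at a time, which is the property the dataset example relies on.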

Error Handling in Streams

Python Errors

Exceptions raised inside a Python tool propagate to Elixir, where execute_stream returns {:error, error} instead of :ok:

case Snakepit.execute_stream("may_fail", %{}, callback) do
  :ok -> IO.puts("Success")
  {:error, error} -> IO.puts("Failed: #{error.message}")
end
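
If the consumer should also learn which item failed, one possible pattern is to catch the exception inside the generator, yield a terminal error chunk, and then re-raise. This is a sketch rather than Snakepit's prescribed approach: the chunk fields ("type", "index", "message") and the body of may_fail are assumptions, the decorator is omitted so the snippet runs on its own, and whether chunks yielded before the exception reach the Elixir callback is not confirmed here.

```python
def may_fail(items):
    """Yield one result chunk per item; report a failure as a last chunk."""
    for index, item in enumerate(items):
        try:
            value = 100 / item
        except ZeroDivisionError as exc:
            # Describe the failure in-band, then let the exception propagate.
            yield {"type": "error", "index": index, "message": str(exc)}
            raise
        yield {"type": "result", "index": index, "value": value}


# Local stand-in for the consumer: collect chunks until the stream fails.
received = []
try:
    for chunk in may_fail([4, 0, 5]):
        received.append(chunk)
except ZeroDivisionError:
    failed = True
else:
    failed = False
```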

Stream Cancellation

To stop consuming a stream early, run it in a Task and shut the task down:

task = Task.async(fn ->
  Snakepit.execute_stream("infinite_stream", %{}, fn chunk ->
    IO.puts("Received: #{inspect(chunk)}")
  end)
end)

Process.sleep(5000)
Task.shutdown(task, :brutal_kill)  # Cancel stream
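
On the Python side, a long-running generator can guard its resources with try/finally so they are released if the generator is closed early. Whether the bridge closes the worker's generator when the Elixir task is killed is not stated here, so treat that as an assumption; the sketch only demonstrates standard Python generator semantics, with close() standing in for cancellation.

```python
import itertools

events = []  # records lifecycle events so the cleanup is observable


def infinite_stream():
    events.append("opened")  # stand-in for acquiring a resource
    try:
        for tick in itertools.count():
            yield {"tick": tick}
    finally:
        # Runs when the generator is closed, even though the loop never ends.
        events.append("closed")


gen = infinite_stream()
first = next(gen)
second = next(gen)

# close() raises GeneratorExit at the paused yield, which triggers finally.
gen.close()
```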

Complete Streaming Example

Python Adapter

from snakepit_bridge.base_adapter import BaseAdapter, tool
from snakepit_bridge.adapters.showcase.tool import StreamChunk
import time

class MLStreamingAdapter(BaseAdapter):

    @tool(description="Train with streaming progress", supports_streaming=True)
    def train_with_progress(self, epochs: int = 10):
        loss, accuracy = 1.0, 0.5
        for epoch in range(epochs):
            time.sleep(0.5)
            loss *= 0.8
            accuracy = min(0.99, accuracy + 0.05)
            yield StreamChunk(
                data={"epoch": epoch + 1, "loss": round(loss, 4), "accuracy": round(accuracy, 4)},
                is_final=False
            )
        yield StreamChunk(
            data={"status": "complete", "model_id": f"model_{int(time.time())}"},
            is_final=True
        )

Elixir Consumer

defmodule MyApp.MLClient do
  require Logger

  def train_model(epochs) do
    Logger.info("Starting training")

    result = Snakepit.execute_stream("train_with_progress", %{epochs: epochs}, fn chunk ->
      if chunk["status"] == "complete" do
        Logger.info("Complete! Model: #{chunk["model_id"]}")
      else
        Logger.info("Epoch #{chunk["epoch"]}: loss=#{chunk["loss"]}, acc=#{chunk["accuracy"]}")
      end
    end)

    case result do
      :ok -> {:ok, "Training completed"}
      {:error, e} -> {:error, e}
    end
  end
end

Performance Considerations

  1. Chunk size: Smaller chunks improve responsiveness; larger chunks reduce per-chunk overhead (1,000-10,000 items per chunk is typical)
  2. Callback overhead: Keep callbacks lightweight and offload heavy processing to another process
  3. Timeouts: The default is 5 minutes (300000 ms); adjust it with the :timeout option
  4. Memory: Streaming reduces peak memory by processing data incrementally
  5. gRPC required: Streaming only works with gRPC adapters
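
For point 2, one way to keep callback work low is to emit fewer progress chunks at the source. The sketch below reports every report_every steps plus the last step; throttled_progress and report_every are illustrative names, not Snakepit options.

```python
def throttled_progress(steps: int, report_every: int = 100):
    """Yield a progress chunk every report_every steps and on the last step."""
    for i in range(1, steps + 1):
        # ... one unit of real work would happen here ...
        if i % report_every == 0 or i == steps:
            yield {"step": i, "total": steps, "progress": i / steps * 100}


chunks = list(throttled_progress(250, report_every=100))
reported_steps = [chunk["step"] for chunk in chunks]
```

With 250 steps this sends three chunks instead of 250, so the Elixir callback runs far less often while still seeing the final state.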

Server-Side Streaming Implementation (v0.8.5+)

Starting in v0.8.5, Snakepit's BridgeServer fully implements ExecuteStreamingTool, enabling end-to-end gRPC streaming from external clients through to Python workers.

Requirements for Streaming Tools

  1. Tool must be a remote (Python) tool
  2. Tool must have supports_streaming: true in metadata
  3. Python adapter must implement the streaming tool as a generator

Enabling a Tool for Streaming

In your Python adapter:

@tool(description="Stream results", supports_streaming=True)
def my_streaming_tool(self, param: str):
    for i in range(10):
        yield {"step": i, "result": f"Processing {param}"}

The tool will be registered with streaming support automatically.

Stream Chunk Metadata

Snakepit automatically adds the following metadata to the final chunk:

  • execution_time_ms: Total execution time in milliseconds
  • tool_type: The type of tool (always "remote" for streaming)
  • worker_id: The ID of the worker that executed the tool

If the worker doesn't send a final chunk, Snakepit injects a synthetic final chunk with synthetic_final: "true" in metadata.