Snakepit gRPC Streaming Guide

Overview

This guide covers Snakepit's gRPC streaming implementation - a modern replacement for stdin/stdout communication that enables real-time progress updates, progressive results, and superior performance for ML and data processing workflows.

Quick Start

1. Install Dependencies

# Install gRPC dependencies
make install-grpc

# Generate protocol buffer code
make proto-python

# Verify installation
python -c "import grpc, snakepit_bridge.grpc.snakepit_pb2; print('✅ gRPC ready')"

2. Basic Configuration

# Configure Snakepit with gRPC adapter
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)
Application.put_env(:snakepit, :grpc_config, %{
  base_port: 50051,
  port_range: 100  # Uses ports 50051-50151
})

{:ok, _} = Application.ensure_all_started(:snakepit)

3. Run Your First Stream

# Traditional (still works)
{:ok, result} = Snakepit.execute("ping", %{})

# NEW: Streaming execution
Snakepit.execute_stream("ping_stream", %{count: 5}, fn chunk ->
  IO.puts("Received: #{chunk["message"]}")
end)

Installation

Prerequisites

  • Elixir: 1.18+ with OTP 27+
  • Python: 3.8+
  • System: protoc compiler (for development)

Elixir Dependencies

# mix.exs
def deps do
  [
    {:grpc, "~> 0.8"},
    {:protobuf, "~> 0.12"},
    # ... existing deps
  ]
end

Python Dependencies

# Option 1: Install with gRPC support
cd priv/python && pip install -e ".[grpc]"

# Option 2: Install manually
pip install grpcio>=1.50.0 protobuf>=4.0.0 grpcio-tools>=1.50.0

# Option 3: All features
pip install -e ".[all]"  # Includes gRPC + MessagePack

Development Setup

# Complete development environment
make dev-setup

# Or step by step:
make install-grpc      # Install Python gRPC deps
make proto-python      # Generate protobuf code
make test             # Verify everything works

Core Concepts

Communication Protocols Comparison

Protocol       Streaming   Multiplexing   Binary Data   Setup Complexity
stdin/stdout   —           —              Base64        Simple
MessagePack    —           —              ✅ Native      Simple
gRPC           ✅ Native    ✅ HTTP/2       ✅ Native      Moderate

Protocol Selection

# Snakepit automatically chooses the best available protocol:

# 1. If gRPC adapter configured and available → gRPC
# 2. If MessagePack adapter configured → MessagePack  
# 3. Fallback → JSON over stdin/stdout

# Force specific protocol:
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)        # gRPC
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericPythonMsgpack)  # MessagePack
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericPythonV2)     # JSON

Worker Architecture

        
Elixir App (callback handler)
        │  execute_stream
        ▼
gRPC Worker (GenServer, connection management)
        │  HTTP/2 connection
        ▼
Python Process (gRPC server, stream handlers)

Connection lifecycle & channel reuse

  • Workers store the OS-negotiated port after GRPC_READY, so registry lookups always return the live endpoint even when the adapter requested port 0.
  • BridgeServer first asks the worker for its already-open GRPC.Stub and only dials a new channel as a fallback—cutting handshake latency and ensuring sockets are torn down after use.
  • Tests: test/unit/grpc/grpc_worker_ephemeral_port_test.exs (port persistence) and test/snakepit/grpc/bridge_server_test.exs (channel reuse) guard the behaviour.

Session Management

# Sessions work seamlessly with gRPC
session_id = "ml_training_#{user_id}"

# Session-based execution (maintains state)
{:ok, result} = Snakepit.execute_in_session(session_id, "initialize_model", model_config)

# Session-based streaming
Snakepit.execute_in_session_stream(session_id, "train_model", training_data, fn chunk ->
  IO.puts("Epoch #{chunk["epoch"]}: loss=#{chunk["loss"]}")
end)

State protection & quotas

  • SessionStore, ToolRegistry, and ProcessRegistry expose their state through :protected ETS tables and keep DETS handles private, so only the owning processes can mutate core metadata.
  • Configurable session/program quotas return tagged errors such as {:error, :session_quota_exceeded} or {:error, {:program_quota_exceeded, session_id}}, making it easy to surface actionable messages to callers (see the sketch after this list).
  • Regression coverage: test/unit/bridge/session_store_test.exs exercises quota paths, while test/unit/pool/process_registry_security_test.exs ensures ETS/DETS access stays locked down.
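
A minimal sketch of surfacing those quota errors to callers. The module and function names here are hypothetical; only the tagged error shapes are documented above.

# Hypothetical call site; the tagged error shapes are the documented part.
defp start_session(session_id) do
  case SessionStore.create_session(session_id) do
    {:ok, session} ->
      {:ok, session}

    {:error, :session_quota_exceeded} ->
      {:error, "Too many active sessions; close one and retry."}

    {:error, {:program_quota_exceeded, session_id}} ->
      {:error, "Session #{session_id} has reached its program quota."}
  end
end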

Logging redaction & diagnostics

  • Snakepit.Logger.Redaction (internal helper) now collapses sensitive payloads into short summaries before anything hits the log pipeline, preventing credential or large blob leaks during gRPC debugging.
  • Bridge telemetry ties the redaction summaries to execution spans so you still get useful context without sacrificing safety.
  • Guarded by test/unit/logger/redaction_test.exs, which asserts both redaction coverage and fallback behaviour.

API Reference

Basic Execution

# Snakepit.execute/3 - Same as always
{:ok, result} = Snakepit.execute(command, args, timeout \\ 30_000)

# Examples
{:ok, _} = Snakepit.execute("ping", %{})
{:ok, result} = Snakepit.execute("compute", %{operation: "add", a: 5, b: 3})

Streaming Execution

# Snakepit.execute_stream/4 - NEW!
:ok = Snakepit.execute_stream(command, args, callback_fn, timeout \\ 300_000)

# Callback function receives each chunk
callback_fn = fn chunk ->
  if chunk["is_final"] do
    IO.puts("Stream complete!")
  else
    IO.puts("Progress: #{chunk["progress"]}%")
  end
end

Streaming chunk envelope

  • Each callback receives a map built from ToolChunk: the decoded JSON payload merged with "is_final" and optional _metadata (see the handler sketch after this list).
  • Binary responses degrade gracefully to "raw_data_base64" so consumers can still inspect results without guessing encodings.
  • Regression coverage: test/snakepit/streaming_regression_test.exs asserts ordering/finality while Python fixtures cover metadata propagation.
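
A defensive handler for this envelope might look like the following sketch; it assumes "raw_data_base64" carries standard base64.

# Sketch: handle final, binary, and regular JSON chunks.
handle_chunk = fn chunk ->
  cond do
    chunk["is_final"] ->
      IO.puts("Stream finished: #{inspect(chunk["_metadata"])}")

    raw = chunk["raw_data_base64"] ->
      # Binary payloads arrive base64-encoded; decode before use.
      {:ok, bytes} = Base.decode64(raw)
      IO.puts("Received #{byte_size(bytes)} raw bytes")

    true ->
      IO.inspect(chunk, label: "chunk")
  end
end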

Session-based APIs

# Session execution (existing)
{:ok, result} = Snakepit.execute_in_session(session_id, command, args, timeout \\ 30_000)

# Session streaming (NEW!)
:ok = Snakepit.execute_in_session_stream(session_id, command, args, callback_fn, timeout \\ 300_000)

Error Handling

case Snakepit.execute_stream("long_process", %{data: large_data}, callback) do
  :ok ->
    IO.puts("Stream completed successfully")
    
  {:error, :worker_timeout} ->
    IO.puts("Operation timed out")
    
  {:error, :grpc_unavailable} ->
    IO.puts("gRPC not available, check setup")
    
  {:error, reason} ->
    IO.puts("Stream failed: #{inspect(reason)}")
end

Invalid payloads: When a tool parameter is encoded as malformed JSON, the bridge returns {:error, {:invalid_parameter, key, message}} before contacting the worker. See test/snakepit/grpc/bridge_server_test.exs for real examples.
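
A sketch of matching that error shape before falling back to generic handling, assuming the error propagates through Snakepit.execute:

defp run_tool(args) do
  case Snakepit.execute("compute", args) do
    {:ok, result} ->
      {:ok, result}

    {:error, {:invalid_parameter, key, message}} ->
      # Rejected by the bridge before any worker was contacted.
      {:error, "parameter #{key} rejected: #{message}"}

    {:error, reason} ->
      {:error, reason}
  end
end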

Streaming Examples

Quick Demo

Run the end-to-end streaming example to watch five progress updates stream from Python back into Elixir:

MIX_ENV=dev mix run examples/stream_progress_demo.exs

Internally it calls the stream_progress tool with a configurable delay_ms so you can watch each chunk arrive roughly every 750 ms. Chunks look like:

%{
  "is_final" => boolean(),
  "message" => String.t(),
  "progress" => float(),
  "step" => pos_integer(),
  "total" => pos_integer()
}

"is_final" is set on the last chunk so you can detect completion without relying on timeouts.

1. ML Batch Inference

Stream inference results as each item completes:

batch_items = ["image_001.jpg", "image_002.jpg", "image_003.jpg"]

Snakepit.execute_stream("batch_inference", %{
  model_path: "/models/resnet50.pkl",
  batch_items: batch_items
}, fn chunk ->
  if chunk["is_final"] do
    IO.puts("🎉 Batch inference complete!")
  else
    item = chunk["item"]
    prediction = chunk["prediction"] 
    confidence = chunk["confidence"]
    IO.puts("🧠 #{item}: #{prediction} (#{confidence}% confidence)")
  end
end)

# Output:
# 🧠 image_001.jpg: cat (94% confidence)
# 🧠 image_002.jpg: dog (87% confidence)  
# 🧠 image_003.jpg: bird (91% confidence)
# 🎉 Batch inference complete!

2. Large Dataset Processing

Process huge datasets with real-time progress:

Snakepit.execute_stream("process_large_dataset", %{
  file_path: "/data/sales_data_10gb.csv",
  chunk_size: 10_000,
  operations: ["clean", "transform", "aggregate"]
}, fn chunk ->
  if chunk["is_final"] do
    stats = chunk["final_stats"]
    IO.puts("✅ Processing complete!")
    IO.puts("📊 Final stats: #{inspect(stats)}")
  else
    progress = chunk["progress_percent"]
    processed = chunk["processed_rows"]
    total = chunk["total_rows"]
    memory_mb = chunk["memory_usage_mb"]
    
    IO.puts("📊 Progress: #{progress}% (#{processed}/#{total} rows) - Memory: #{memory_mb}MB")
  end
end)

# Output:
# 📊 Progress: 10.0% (100000/1000000 rows) - Memory: 45MB
# 📊 Progress: 20.0% (200000/1000000 rows) - Memory: 47MB
# 📊 Progress: 30.0% (300000/1000000 rows) - Memory: 46MB
# ...
# ✅ Processing complete!
# 📊 Final stats: %{errors: 12, processed: 988000, skipped: 12}

3. Real-time Log Analysis

Analyze logs in real-time as new entries arrive:

Snakepit.execute_stream("tail_and_analyze", %{
  log_path: "/var/log/app.log",
  patterns: ["ERROR", "FATAL", "OutOfMemoryError"],
  context_lines: 3
}, fn chunk ->
  if chunk["is_final"] do
    IO.puts("📊 Log analysis session ended")
  else
    severity = chunk["severity"]
    timestamp = chunk["timestamp"]
    line = chunk["log_line"]
    pattern = chunk["pattern_matched"]
    
    emoji = case severity do
      "FATAL" -> "💀"
      "ERROR" -> "🚨" 
      "WARN" -> "⚠️"
      _ -> "ℹ️"
    end
    
    IO.puts("#{emoji} [#{timestamp}] #{pattern}: #{String.slice(line, 0, 80)}...")
  end
end)

# Output:
# 🚨 [2025-01-20T15:30:45] ERROR: Database connection failed: timeout after 30s...
# ⚠️ [2025-01-20T15:30:50] WARN: High memory usage detected: 85% of heap used...
# 💀 [2025-01-20T15:31:00] FATAL: OutOfMemoryError: Java heap space exceeded...

4. Distributed Training Monitoring

Monitor ML training progress across multiple nodes:

Snakepit.execute_stream("distributed_training", %{
  model_config: %{
    architecture: "transformer",
    layers: 24,
    hidden_size: 1024
  },
  training_config: %{
    epochs: 100,
    batch_size: 32,
    learning_rate: 0.001
  },
  dataset_path: "/data/training_set"
}, fn chunk ->
  if chunk["is_final"] do
    model_path = chunk["final_model_path"] 
    best_acc = chunk["best_val_accuracy"]
    IO.puts("🎯 Training complete! Best accuracy: #{best_acc}")
    IO.puts("💾 Model saved: #{model_path}")
  else
    epoch = chunk["epoch"]
    train_loss = chunk["train_loss"]
    val_loss = chunk["val_loss"] 
    train_acc = chunk["train_acc"]
    val_acc = chunk["val_acc"]
    lr = chunk["learning_rate"]
    
    IO.puts("📈 Epoch #{epoch}/100:")
    IO.puts("   Train: loss=#{train_loss}, acc=#{train_acc}")
    IO.puts("   Val:   loss=#{val_loss}, acc=#{val_acc}")
    IO.puts("   LR: #{lr}")
  end
end)

# Output:
# 📈 Epoch 1/100:
#    Train: loss=2.45, acc=0.12
#    Val:   loss=2.38, acc=0.15
#    LR: 0.001
# 📈 Epoch 2/100:
#    Train: loss=2.12, acc=0.23
#    Val:   loss=2.05, acc=0.28
#    LR: 0.001
# ...
# 🎯 Training complete! Best accuracy: 0.94
# 💾 Model saved: /models/transformer_best_20250120.pkl

5. Financial Data Pipeline

Process real-time financial data with technical analysis:

Snakepit.execute_stream("realtime_stock_analysis", %{
  symbols: ["AAPL", "GOOGL", "MSFT", "TSLA"],
  indicators: ["RSI", "MACD", "SMA_20", "SMA_50"],
  alert_thresholds: %{
    rsi_oversold: 30,
    rsi_overbought: 70,
    volume_spike: 2.0
  }
}, fn chunk ->
  if chunk["is_final"] do
    IO.puts("📊 Market session ended")
  else
    symbol = chunk["symbol"]
    price = chunk["price"]
    volume = chunk["volume"] 
    rsi = chunk["rsi"]
    signal = chunk["trading_signal"]
    
    case signal do
      "BUY" -> IO.puts("🟢 #{symbol}: $#{price} - BUY signal (RSI: #{rsi})")
      "SELL" -> IO.puts("🔴 #{symbol}: $#{price} - SELL signal (RSI: #{rsi})")
      _ -> IO.puts("⚪ #{symbol}: $#{price} - HOLD (RSI: #{rsi})")
    end
  end
end)

# Output:
# ⚪ AAPL: $175.50 - HOLD (RSI: 45.2)
# 🟢 GOOGL: $142.80 - BUY signal (RSI: 28.5)
# ⚪ MSFT: $380.20 - HOLD (RSI: 52.1) 
# 🔴 TSLA: $210.15 - SELL signal (RSI: 73.8)

Performance

Streaming vs Traditional Comparison

Metric                    stdin/stdout        MessagePack         gRPC Streaming
Latency (first result)    5-10s               3-7s                0.1-0.5s
Memory usage              Grows with result   Grows with result   Constant
Progress visibility       None                None                Real-time
Cancellation              Kill process        Kill process        Graceful
Error granularity         End only            End only            Per chunk
User experience           "Is it working?"    "Is it working?"    Live updates

Binary Serialization Performance

gRPC mode includes automatic binary serialization for large data:

Data Size   JSON Encoding   Binary Encoding   Speedup
1KB         0.1ms           0.1ms             1x (uses JSON)
10KB        2ms             2ms               1x (threshold)
100KB       25ms            3ms               8x faster
1MB         300ms           30ms              10x faster
10MB        3500ms          250ms             14x faster

The 10KB threshold ensures optimal performance (see the sketch after this list):

  • Small data remains human-readable (JSON)
  • Large data gets maximum performance (binary)
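
The selection rule is simple enough to sketch. This is an illustration of the 10KB rule, not Snakepit's actual encoder; Jason and :erlang.term_to_binary stand in for whatever codecs the bridge uses.

# Illustration only: pick binary encoding above a 10KB threshold.
defmodule ThresholdEncoding do
  @threshold 10 * 1024

  def encode(term) do
    json = Jason.encode!(term)

    if byte_size(json) > @threshold do
      # Large payloads skip JSON for a compact binary representation.
      {:binary, :erlang.term_to_binary(term)}
    else
      # Small payloads stay human-readable.
      {:json, json}
    end
  end
end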

Binary Tool Parameters

ExecuteToolRequest also exposes a binary_parameters map for payloads that should never be JSON encoded (for example, pickled tensors or zipped archives). When a client sends both maps:

  • The Elixir bridge decodes the regular parameters map as before.
  • Binary entries show up in local Elixir tools as {:binary, payload} so handlers can pattern-match without guessing.
  • Remote executions forward the original binary map to Python workers via gRPC. The helper Snakepit.GRPC.Client.execute_tool/5 accepts binary_parameters: %{ "blob" => <<...>> } in the options list to make this explicit.
  • Supplying a non-binary value (such as an integer or map) will raise an invalid_binary_parameter error before the request reaches any tool, preventing silent truncation.

Keep binary keys distinct from JSON keys to avoid confusion, and continue to prefer JSON for human-readable inputs.
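
A usage sketch of the documented option; channel, session_id, and the "load_tensor" tool are placeholders:

# Read a pickled blob and forward it untouched via binary_parameters.
blob = File.read!("/tmp/tensor.pkl")

{:ok, result} =
  Snakepit.GRPC.Client.execute_tool(channel, session_id, "load_tensor",
    %{"label" => "weights"},
    binary_parameters: %{"blob" => blob}
  )

# A local Elixir tool would see the entry as {:binary, payload},
# so handlers can pattern-match on the tuple directly.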

Heartbeat Modes

Every Snakepit.GRPCWorker starts a Snakepit.HeartbeatMonitor unless you disable it in worker_config. Heartbeat settings flow to Python via the SNAKEPIT_HEARTBEAT_CONFIG environment variable so both sides agree on intervals, timeout, and whether the worker should die on failures.

  • Authority: The Elixir monitor is always the source of truth for recycling. Python's optional heartbeat client treats the Elixir process as a liveness hint and will only stop the Python runtime when it can no longer reach BEAM for an extended period. Never rely on Python heartbeats to keep an unhealthy worker alive—set dependent: false on the Elixir side instead.
  • dependent: true (default) → missed heartbeats or ping failures terminate the worker and the supervisor restarts a fresh capsule. Use this for production pools.
  • dependent: false → the monitor logs failures and keeps retrying without killing the worker. This is useful when debugging flaky networks or when you want Python to stay alive even if the BEAM cannot reach it temporarily.

You can also override the heartbeat ping_fun in tests or custom pools; otherwise the monitor falls back to Snakepit.GRPC.Client.heartbeat/3 for push-style pings.
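
For reference, a hypothetical worker_config sketch: the dependent flag and the existence of heartbeat settings are documented above, but the other key names are assumptions and may differ in your Snakepit version.

# Hypothetical heartbeat configuration; key names other than
# :dependent are assumptions, not confirmed API.
Application.put_env(:snakepit, :worker_config, %{
  heartbeat: %{
    dependent: false,    # log and retry instead of recycling the worker
    interval_ms: 5_000,  # assumed key name
    timeout_ms: 15_000   # assumed key name
  }
})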

Benchmarks

# Large dataset processing (1GB CSV file)
Traditional: Submit → Wait 10 minutes → Get result or timeout
gRPC Stream: Submit → Progress every 30s → Complete with stats

# ML batch inference (1000 images)  
Traditional: Submit → Wait 5 minutes → Get all predictions
gRPC Stream: Submit → Get prediction 1 → Get prediction 2 → ...

# Real-time monitoring
Traditional: Not possible
gRPC Stream: Live tail of logs/metrics with analysis

Memory Usage Patterns

# Traditional: Memory grows with result size
results = []  # Starts empty
# ... processing 1GB of data ...
results = [huge_dataset_results]  # Now using 1GB+ memory

# gRPC Streaming: Constant memory usage
Snakepit.execute_stream("process_1gb_dataset", %{}, fn chunk ->
  process_chunk(chunk)  # Handle immediately
  # chunk gets garbage collected
end)
# Memory usage stays constant regardless of total data size

Migration Guide

From stdin/stdout to gRPC

Step 1: Install gRPC

make install-grpc
make proto-python

Step 2: Update Configuration

# Before
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericPythonV2)

# After  
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)
Application.put_env(:snakepit, :grpc_config, %{base_port: 50051, port_range: 100})

Step 3: Keep Existing Code

# This still works exactly the same
{:ok, result} = Snakepit.execute("compute", %{operation: "add", a: 5, b: 3})

Step 4: Add Streaming Where Beneficial

# Replace long-running operations with streaming versions
# Before: 
{:ok, results} = Snakepit.execute("batch_process", large_dataset)  # Blocks for minutes

# After:
Snakepit.execute_stream("batch_process", large_dataset, fn chunk ->
  update_progress_bar(chunk["progress"])
  save_partial_result(chunk["data"])
end)

From MessagePack to gRPC

# MessagePack config (keep as fallback)
Application.put_env(:snakepit, :adapters, [
  Snakepit.Adapters.GRPCPython,        # Try gRPC first
  Snakepit.Adapters.GenericPythonMsgpack  # Fallback to MessagePack
])

Gradual Migration Strategy

  1. Phase 1: Install gRPC alongside existing adapter
  2. Phase 2: Test gRPC with non-critical workloads
  3. Phase 3: Move long-running operations to streaming
  4. Phase 4: Default to gRPC, keep MessagePack as fallback
  5. Phase 5: Full gRPC adoption

Creating Streaming Commands in Python

This section shows how to implement custom streaming commands in your Python adapters.

Basic Streaming Command

from snakepit_bridge.base_adapter import BaseAdapter, tool

class MyAdapter(BaseAdapter):
    @tool(description="Stream progress updates", supports_streaming=True)
    def progress_stream(self, count: int = 10, delay: float = 0.5):
        """
        Stream progress updates to the client.

        For streaming tools, return a generator that yields dictionaries.
        Each yielded dict becomes a chunk sent to the Elixir callback.
        """
        import time

        for i in range(count):
            # Yield progress chunk
            yield {
                "progress": i + 1,
                "total": count,
                "percent": ((i + 1) / count) * 100,
                "message": f"Processing item {i + 1} of {count}",
                "is_final": False
            }
            time.sleep(delay)

        # Final chunk
        yield {
            "is_final": True,
            "total_processed": count,
            "message": "Stream complete"
        }

ML Batch Inference Streaming

from snakepit_bridge.base_adapter import BaseAdapter, tool
import time

class MLAdapter(BaseAdapter):
    def __init__(self):
        super().__init__()
        # Load your model here
        self.model = self._load_model()

    @tool(description="Batch inference with streaming results", supports_streaming=True)
    def batch_inference(self, items: list, model_path: str = None):
        """
        Process items in a batch, streaming each result as it completes.
        """
        total = len(items)

        for idx, item in enumerate(items):
            # Perform inference
            prediction, confidence = self._predict(item)

            # Yield result for this item
            yield {
                "item": item,
                "index": idx,
                "prediction": prediction,
                "confidence": confidence,
                "progress": idx + 1,
                "total": total,
                "is_final": False
            }

        # Final summary
        yield {
            "is_final": True,
            "total_processed": total,
            "message": f"Processed {total} items successfully"
        }

    def _predict(self, item):
        # Your ML inference logic here
        # This is a placeholder
        time.sleep(0.1)
        return "predicted_class", 0.95

    def _load_model(self):
        # Load your model
        return None

Large Dataset Processing

from snakepit_bridge.base_adapter import BaseAdapter, tool
import pandas as pd

class DataAdapter(BaseAdapter):
    @tool(description="Process large datasets with progress", supports_streaming=True)
    def process_large_dataset(self, file_path: str, chunk_size: int = 10000):
        """
        Process a large CSV file in chunks, streaming progress.
        """
        # Get total rows (for progress calculation)
        with open(file_path) as f:
            total_rows = sum(1 for _ in f) - 1  # -1 for header

        processed = 0
        errors = 0

        # Process in chunks
        for chunk in pd.read_csv(file_path, chunksize=chunk_size):
            # Process this chunk
            try:
                results = self._process_chunk(chunk)
                processed += len(chunk)

                # Yield progress
                yield {
                    "processed_rows": processed,
                    "total_rows": total_rows,
                    "progress_percent": (processed / total_rows) * 100,
                    "memory_usage_mb": chunk.memory_usage(deep=True).sum() / 1024 / 1024,
                    "is_final": False
                }
            except Exception as e:
                errors += 1
                yield {
                    "error": str(e),
                    "chunk_start": processed,
                    "is_final": False
                }

        # Final statistics
        yield {
            "is_final": True,
            "final_stats": {
                "total_rows": total_rows,
                "processed": processed,
                "errors": errors,
                "skipped": total_rows - processed
            }
        }

    def _process_chunk(self, chunk):
        # Your processing logic
        return chunk.apply(lambda row: row, axis=1)

Real-time Log Analysis

from snakepit_bridge.base_adapter import BaseAdapter, tool
import re
import time

class LogAdapter(BaseAdapter):
    @tool(description="Tail and analyze logs in real-time", supports_streaming=True)
    def tail_and_analyze(self, log_path: str, patterns: list, context_lines: int = 3):
        """
        Stream log analysis results in real-time.
        """
        pattern_regexes = [re.compile(p) for p in patterns]

        # Simulate tailing (in production, use actual file watching)
        with open(log_path, 'r') as f:
            for line_num, line in enumerate(f, start=1):
                for pattern_idx, regex in enumerate(pattern_regexes):
                    if regex.search(line):
                        # Found a match
                        severity = self._determine_severity(line)

                        yield {
                            "line_number": line_num,
                            "log_line": line.strip(),
                            "pattern_matched": patterns[pattern_idx],
                            "severity": severity,
                            "timestamp": self._extract_timestamp(line),
                            "is_final": False
                        }

                # Simulate real-time (remove for actual file watching)
                time.sleep(0.01)

        yield {
            "is_final": True,
            "message": "Log analysis complete"
        }

    def _determine_severity(self, line):
        if "FATAL" in line or "CRITICAL" in line:
            return "FATAL"
        elif "ERROR" in line:
            return "ERROR"
        elif "WARN" in line:
            return "WARN"
        return "INFO"

    def _extract_timestamp(self, line):
        # Extract timestamp from log line
        # This is a placeholder
        return "2025-01-20T15:30:45"

ML Training with Epoch Updates

from snakepit_bridge.base_adapter import BaseAdapter, tool
import time

class TrainingAdapter(BaseAdapter):
    @tool(description="Train model with streaming epoch updates", supports_streaming=True)
    def distributed_training(self, model_config: dict, training_config: dict, dataset_path: str):
        """
        Train a model and stream updates for each epoch.
        """
        epochs = training_config.get("epochs", 100)
        best_val_acc = 0.0

        for epoch in range(1, epochs + 1):
            # Simulate training epoch
            train_loss, train_acc = self._train_epoch(epoch)
            val_loss, val_acc = self._validate_epoch(epoch)

            # Track best accuracy
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                model_path = f"/models/model_epoch_{epoch}.pkl"
                self._save_checkpoint(model_path)

            # Yield epoch results
            yield {
                "epoch": epoch,
                "total_epochs": epochs,
                "train_loss": train_loss,
                "train_acc": train_acc,
                "val_loss": val_loss,
                "val_acc": val_acc,
                "learning_rate": self._get_current_lr(epoch),
                "is_final": False
            }

        # Final result
        yield {
            "is_final": True,
            "final_model_path": model_path,
            "best_val_accuracy": best_val_acc,
            "total_epochs": epochs,
            "message": "Training complete"
        }

    def _train_epoch(self, epoch):
        # Training logic (placeholder)
        time.sleep(0.1)
        return 2.5 - (epoch * 0.02), 0.1 + (epoch * 0.008)

    def _validate_epoch(self, epoch):
        # Validation logic (placeholder)
        time.sleep(0.05)
        return 2.3 - (epoch * 0.018), 0.15 + (epoch * 0.007)

    def _get_current_lr(self, epoch):
        # Learning rate schedule
        return 0.001 * (0.95 ** epoch)

    def _save_checkpoint(self, path):
        # Save model checkpoint
        pass

Key Patterns for Streaming Commands

  1. Use Generator Functions: Return a generator (using yield) instead of returning a value directly
  2. Mark with supports_streaming: Add supports_streaming=True to the @tool decorator
  3. Include is_final: Always include "is_final": False in intermediate chunks and "is_final": True in the final chunk
  4. Yield Dictionaries: Each yielded value should be a dictionary that will be sent to the Elixir callback
  5. Progress Information: Include progress indicators (progress, total, percent, etc.) to help users track completion
  6. Error Handling: Yield error information as chunks rather than raising exceptions

Calling Streaming Commands from Elixir

# Call your custom streaming command
Snakepit.execute_stream("progress_stream", %{count: 10, delay: 0.5}, fn chunk ->
  if chunk["is_final"] do
    IO.puts("✅ #{chunk["message"]}")
  else
    IO.puts("📊 Progress: #{chunk["percent"]}% - #{chunk["message"]}")
  end
end)

# With session affinity
Snakepit.execute_in_session_stream("ml_session", "distributed_training", %{
  model_config: %{layers: 12},
  training_config: %{epochs: 100},
  dataset_path: "/data/train.csv"
}, fn chunk ->
  if chunk["is_final"] do
    IO.puts("🎯 Training complete! Model: #{chunk["final_model_path"]}")
  else
    IO.puts("📈 Epoch #{chunk["epoch"]}: loss=#{chunk["train_loss"]}, acc=#{chunk["train_acc"]}")
  end
end)

Troubleshooting

Common Issues

1. "Generated gRPC code not found"

# Solution: Generate protobuf code
make proto-python

# Verify:
ls priv/python/snakepit_bridge/grpc/
# Should show: snakepit_pb2.py, snakepit_pb2_grpc.py

2. "gRPC dependencies not available"

# Check Python dependencies
python -c "import grpc; print('gRPC:', grpc.__version__)"
python -c "import google.protobuf; print('Protobuf OK')"

# Install if missing
pip install 'snakepit-bridge[grpc]'

3. "Port already in use"

# Solution: Configure different port range
Application.put_env(:snakepit, :grpc_config, %{
  base_port: 50100,  # Try different base port
  port_range: 50     # Smaller range
})

4. Worker initialization timeout

# Check Python process
ps aux | grep grpc_bridge

# Check Python errors
python priv/python/grpc_bridge.py --help

# Increase timeout
Application.put_env(:snakepit, :worker_init_timeout, 30_000)

5. Streaming callback errors

# Bad: Callback crashes on unexpected data
callback = fn chunk ->
  result = chunk["result"]  # May be nil
  process_result(result)    # Crashes if nil
end

# Good: Defensive callback
callback = fn chunk ->
  case chunk do
    %{"is_final" => true} ->
      IO.puts("Stream complete")
    %{"result" => result} when not is_nil(result) ->
      process_result(result)
    %{"error" => error} ->
      IO.puts("Stream error: #{error}")
    other ->
      IO.puts("Unexpected chunk: #{inspect(other)}")
  end
end

Debug Mode

# Enable debug logging
Logger.configure(level: :debug)

# Trace gRPC worker
:sys.trace(worker_pid, true)

# Check gRPC connection health
Snakepit.GRPCWorker.get_health(worker)
Snakepit.GRPCWorker.get_info(worker)

Performance Tuning

# Adjust gRPC settings
Application.put_env(:snakepit, :grpc_config, %{
  base_port: 50051,
  port_range: 100,
  # Increase concurrent streams
  max_concurrent_streams: 10,
  # Tune timeouts
  connection_timeout: 10_000,
  stream_timeout: 300_000
})

# Pool optimization
Application.put_env(:snakepit, :pool_config, %{
  pool_size: 16,  # More workers for concurrent streams
  worker_init_timeout: 30_000
})

Fallback Strategy

# Graceful degradation if gRPC unavailable
defmodule MyApp.SnakepitClient do
  def execute_with_fallback(command, args, opts \\ []) do
    case Snakepit.execute(command, args, opts) do
      {:ok, result} -> 
        {:ok, result}
      {:error, :grpc_unavailable} ->
        # Fallback to MessagePack
        with_adapter(Snakepit.Adapters.GenericPythonMsgpack, fn ->
          Snakepit.execute(command, args, opts)
        end)
      {:error, reason} ->
        {:error, reason}
    end
  end
  
  defp with_adapter(adapter, fun) do
    old_adapter = Application.get_env(:snakepit, :adapter_module)
    Application.put_env(:snakepit, :adapter_module, adapter)
    
    try do
      fun.()
    after
      Application.put_env(:snakepit, :adapter_module, old_adapter)
    end
  end
end