gRPC Integration Quick Reference

View Source

Keep this page handy when working on Snakepit's gRPC bridge. It summarizes the current implementation, the critical entry points, and the fastest ways to test streaming behaviour end-to-end.

Current State

  • Snakepit.execute_stream/4 and execute_in_session_stream/5 stream results through the gRPC adapter with full session awareness.
  • Snakepit.Adapters.GRPCPython handles request/response and streaming calls via Snakepit.GRPC.Client with per-worker session IDs.
  • ✅ Workers persist the OS-assigned port and expose their live GRPC.Stub so BridgeServer can reuse an existing channel (test/unit/grpc/grpc_worker_ephemeral_port_test.exs, test/snakepit/grpc/bridge_server_test.exs).
  • ✅ BridgeServer validates JSON payloads up front and emits descriptive {:invalid_parameter, key, reason} tuples on malformed input (test/snakepit/grpc/bridge_server_test.exs).
  • ✅ SessionStore and bridge registries keep state in :protected ETS tables with private DETS handles while enforcing configurable quotas that fail fast with tagged errors (test/unit/bridge/session_store_test.exs, test/unit/pool/process_registry_security_test.exs).
  • ✅ The logger redaction helper summarises sensitive payloads so gRPC logs never leak credentials or large blobs (test/unit/logger/redaction_test.exs).
  • ✅ Python bridge servers (priv/python/grpc_server.py and priv/python/grpc_server_threaded.py) bridge async generators and synchronous iterators into gRPC streams using an asyncio.Queue.
  • ✅ Regression coverage lives in test/snakepit/streaming_regression_test.exs with supplemental unit tests for quotas, channel reuse, and logging redaction.
  • ✅ A runnable showcase lives in examples/stream_progress_demo.exs.

Adapter Entry Points

def grpc_execute(connection, session_id, command, args, timeout \ 30_000) do
  Snakepit.GRPC.Client.execute_tool(
    connection.channel,
    session_id,
    command,
    args,
    timeout: timeout
  )
end

def grpc_execute_stream(connection, session_id, command, args, callback, timeout \ 300_000)
      when is_function(callback, 1) do
  connection.channel
  |> Snakepit.GRPC.Client.execute_streaming_tool(session_id, command, args, timeout: timeout)
  |> consume_stream(callback)
end

consume_stream/2 decodes each ToolChunk into a plain map and invokes the callback. Payloads always include an "is_final" flag and preserve metadata under _metadata when present.

Python Stream Bridge (Process Mode)

queue: asyncio.Queue = asyncio.Queue()
sentinel = object()

async def produce_chunks():
    ...  # Drain async or sync iterators
    await queue.put(pb2.ToolChunk(...))
    await queue.put(sentinel)

asyncio.create_task(produce_chunks())

while True:
    item = await queue.get()
    if item is sentinel:
        break
    if isinstance(item, Exception):
        await context.abort(grpc.StatusCode.INTERNAL, str(item))
        return
    yield item

The threaded server mirrors this logic but supports adapters that can safely run inside a ThreadPoolExecutor.

Pool/Worker Flow

  1. Snakepit.execute_stream/4 validates the adapter supports gRPC and delegates to Snakepit.Pool.execute_stream/4.
  2. The pool checks out a worker (respecting session affinity) and forwards the request to Snakepit.GRPCWorker.execute_stream/5.
  3. The worker ensures a session is initialised, calls the adapter's grpc_execute_stream/6, and tracks success/error telemetry.

Testing & Diagnostics

  • Run the regression suite: mix test test/snakepit/streaming_regression_test.exs

  • Verify port/channel reuse: mix test test/unit/grpc/grpc_worker_ephemeral_port_test.exs test/snakepit/grpc/bridge_server_test.exs

  • Exercise quota and state protections: mix test test/unit/bridge/session_store_test.exs test/unit/pool/process_registry_security_test.exs

  • Confirm logging redaction summaries: mix test test/unit/logger/redaction_test.exs

  • Run Python bridge tests (handles venv, PYTHONPATH, and proto generation): ./test_python.sh

  • Exercise everything interactively:

    MIX_ENV=dev mix run examples/stream_progress_demo.exs
    
  • Inspect decoded payloads quickly:

    {:ok, agent} = Agent.start(fn -> [] end)
    
    Snakepit.execute_stream("stream_progress", %{"steps" => 3}, fn chunk ->
      Agent.update(agent, &[chunk | &1])
    end)
    
    Agent.get(agent, &Enum.reverse/1)

Common Payload Shape

%{
  "step" => 1,
  "total" => 5,
  "message" => "Processing step 1/5",
  "progress" => 20.0,
  "is_final" => false,
  "_metadata" => %{}
}

Use the "is_final" flag to trigger completion handlers without relying on the stream returning an empty chunk. When the bridge cannot decode ToolChunk.data as JSON it exposes a "raw_data_base64" field instead, so you can still inspect the payload safely.