gRPC Integration Quick Reference
View SourceKeep this page handy when working on Snakepit's gRPC bridge. It summarizes the current implementation, the critical entry points, and the fastest ways to test streaming behaviour end-to-end.
Current State
- ✅
Snakepit.execute_stream/4andexecute_in_session_stream/5stream results through the gRPC adapter with full session awareness. - ✅
Snakepit.Adapters.GRPCPythonhandles request/response and streaming calls viaSnakepit.GRPC.Clientwith per-worker session IDs. - ✅ Workers persist the OS-assigned port and expose their live
GRPC.Stubso BridgeServer can reuse an existing channel (test/unit/grpc/grpc_worker_ephemeral_port_test.exs,test/snakepit/grpc/bridge_server_test.exs). - ✅ BridgeServer validates JSON payloads up front and emits descriptive
{:invalid_parameter, key, reason}tuples on malformed input (test/snakepit/grpc/bridge_server_test.exs). - ✅ SessionStore and bridge registries keep state in
:protectedETS tables with private DETS handles while enforcing configurable quotas that fail fast with tagged errors (test/unit/bridge/session_store_test.exs,test/unit/pool/process_registry_security_test.exs). - ✅ The logger redaction helper summarises sensitive payloads so gRPC logs never leak credentials or large blobs (
test/unit/logger/redaction_test.exs). - ✅ Python bridge servers (
priv/python/grpc_server.pyandpriv/python/grpc_server_threaded.py) bridge async generators and synchronous iterators into gRPC streams using anasyncio.Queue. - ✅ Regression coverage lives in
test/snakepit/streaming_regression_test.exswith supplemental unit tests for quotas, channel reuse, and logging redaction. - ✅ A runnable showcase lives in
examples/stream_progress_demo.exs.
Adapter Entry Points
def grpc_execute(connection, session_id, command, args, timeout \ 30_000) do
Snakepit.GRPC.Client.execute_tool(
connection.channel,
session_id,
command,
args,
timeout: timeout
)
end
def grpc_execute_stream(connection, session_id, command, args, callback, timeout \ 300_000)
when is_function(callback, 1) do
connection.channel
|> Snakepit.GRPC.Client.execute_streaming_tool(session_id, command, args, timeout: timeout)
|> consume_stream(callback)
endconsume_stream/2 decodes each ToolChunk into a plain map and
invokes the callback. Payloads always include an "is_final" flag and preserve
metadata under _metadata when present.
Python Stream Bridge (Process Mode)
queue: asyncio.Queue = asyncio.Queue()
sentinel = object()
async def produce_chunks():
... # Drain async or sync iterators
await queue.put(pb2.ToolChunk(...))
await queue.put(sentinel)
asyncio.create_task(produce_chunks())
while True:
item = await queue.get()
if item is sentinel:
break
if isinstance(item, Exception):
await context.abort(grpc.StatusCode.INTERNAL, str(item))
return
yield itemThe threaded server mirrors this logic but supports adapters that can safely run
inside a ThreadPoolExecutor.
Pool/Worker Flow
Snakepit.execute_stream/4validates the adapter supports gRPC and delegates toSnakepit.Pool.execute_stream/4.- The pool checks out a worker (respecting session affinity) and forwards the
request to
Snakepit.GRPCWorker.execute_stream/5. - The worker ensures a session is initialised, calls the adapter's
grpc_execute_stream/6, and tracks success/error telemetry.
Testing & Diagnostics
Run the regression suite:
mix test test/snakepit/streaming_regression_test.exsVerify port/channel reuse:
mix test test/unit/grpc/grpc_worker_ephemeral_port_test.exs test/snakepit/grpc/bridge_server_test.exsExercise quota and state protections:
mix test test/unit/bridge/session_store_test.exs test/unit/pool/process_registry_security_test.exsConfirm logging redaction summaries:
mix test test/unit/logger/redaction_test.exsRun Python bridge tests (handles venv, PYTHONPATH, and proto generation):
./test_python.shExercise everything interactively:
MIX_ENV=dev mix run examples/stream_progress_demo.exsInspect decoded payloads quickly:
{:ok, agent} = Agent.start(fn -> [] end) Snakepit.execute_stream("stream_progress", %{"steps" => 3}, fn chunk -> Agent.update(agent, &[chunk | &1]) end) Agent.get(agent, &Enum.reverse/1)
Common Payload Shape
%{
"step" => 1,
"total" => 5,
"message" => "Processing step 1/5",
"progress" => 20.0,
"is_final" => false,
"_metadata" => %{}
}Use the "is_final" flag to trigger completion handlers without relying on the
stream returning an empty chunk. When the bridge cannot decode ToolChunk.data
as JSON it exposes a "raw_data_base64" field instead, so you can still inspect
the payload safely.