Snakepit.GRPCWorker (Snakepit v0.6.10)

View Source

A GenServer that manages gRPC connections to external processes.

This worker can handle both traditional request/response and streaming operations via gRPC instead of stdin/stdout communication.

## Features

  • Automatic gRPC connection management
  • Health check monitoring
  • Streaming support with callback-based API
  • Session affinity for stateful operations
  • Graceful fallback to traditional workers if gRPC unavailable

Usage

# Start a gRPC worker
{:ok, worker} = Snakepit.GRPCWorker.start_link(adapter: Snakepit.Adapters.GRPCPython)

# Simple execution
{:ok, result} = Snakepit.GRPCWorker.execute(worker, "ping", %{})

# Streaming execution
Snakepit.GRPCWorker.execute_stream(worker, "batch_inference", %{
  batch_items: ["img1.jpg", "img2.jpg"]
}, fn chunk ->
  IO.puts("Processed: #{chunk["item"]}")
end)

Summary

Functions

Returns a specification to start this module under a supervisor.

Execute a command and return the result.

Get the gRPC channel for direct client usage.

Get worker health and statistics.

Get worker information and capabilities.

Get the session ID for this worker.

Start a gRPC worker with the given adapter.

Types

worker_state()

@type worker_state() :: %{
  adapter: module(),
  connection: map() | nil,
  port: integer(),
  process_pid: integer() | nil,
  server_port: port() | nil,
  id: String.t(),
  pool_name: atom() | pid(),
  health_check_ref: reference() | nil,
  heartbeat_monitor: pid() | nil,
  heartbeat_config: map(),
  stats: map(),
  session_id: String.t(),
  worker_config: map()
}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

execute(worker, command, args, timeout \\ 30000)

Execute a command and return the result.

execute_in_session(worker, session_id, command, args, timeout \\ 30000)

Execute a command in a specific session.

execute_stream(worker, command, args, callback_fn, timeout \\ 300_000)

Execute a streaming command with callback.

get_channel(worker)

Get the gRPC channel for direct client usage.

get_health(worker)

Get worker health and statistics.

get_info(worker)

Get worker information and capabilities.

get_session_id(worker)

Get the session ID for this worker.

start_link(opts)

Start a gRPC worker with the given adapter.