Snakepit.GRPCWorker (Snakepit v0.8.7)

View Source

A GenServer that manages gRPC connections to external processes.

This worker can handle both traditional request/response and streaming operations via gRPC instead of stdin/stdout communication.

## Features

  • Automatic gRPC connection management
  • Health check monitoring
  • Streaming support with callback-based API
  • Session affinity for stateful operations
  • Graceful fallback to traditional workers if gRPC unavailable

Usage

# Start a gRPC worker
{:ok, worker} = Snakepit.GRPCWorker.start_link(adapter: Snakepit.Adapters.GRPCPython)

# Simple execution
{:ok, result} = Snakepit.GRPCWorker.execute(worker, "ping", %{})

# Streaming execution
Snakepit.GRPCWorker.execute_stream(worker, "batch_inference", %{
  batch_items: ["img1.jpg", "img2.jpg"]
}, fn chunk ->
  handle_chunk(chunk)
end)

Summary

Functions

Returns a specification to start this module under a supervisor.

Execute a command and return the result.

Get the gRPC channel for direct client usage.

Get worker health and statistics.

Get worker information and capabilities.

Get the session ID for this worker.

Start a gRPC worker with the given adapter.

Returns the recommended supervisor shutdown timeout.

Types

worker_state()

@type worker_state() :: %{
  adapter: module(),
  connection: map() | nil,
  port: integer(),
  process_pid: integer() | nil,
  pgid: integer() | nil,
  process_group?: boolean(),
  server_port: port() | nil,
  id: String.t(),
  pool_name: atom() | pid(),
  health_check_ref: reference() | nil,
  heartbeat_monitor: pid() | nil,
  heartbeat_config: map(),
  ready_file: String.t(),
  stats: map(),
  session_id: String.t(),
  worker_config: map(),
  shutting_down: boolean()
}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

execute(worker, command, args, timeout \\ 30000)

Execute a command and return the result.

execute_in_session(worker, session_id, command, args, timeout \\ 30000)

Execute a command in a specific session.

execute_stream(worker, command, args, callback_fn, timeout \\ 300_000)

Execute a streaming command with callback.

get_channel(worker)

Get the gRPC channel for direct client usage.

get_health(worker)

Get worker health and statistics.

get_info(worker)

Get worker information and capabilities.

get_session_id(worker)

Get the session ID for this worker.

start_link(opts)

Start a gRPC worker with the given adapter.

supervisor_shutdown_timeout()

Returns the recommended supervisor shutdown timeout.

This is graceful_shutdown_timeout + margin to ensure supervisors give workers enough time to complete their terminate/2 callback (which includes graceful Python process termination).

Use this value for:

  • shutdown: in child_spec
  • shutdown: in Worker.Starter
  • Any other supervisor that manages GRPCWorker processes

Example

children = [
  %{
    id: MyWorker,
    start: {Snakepit.GRPCWorker, :start_link, [opts]},
    shutdown: Snakepit.GRPCWorker.supervisor_shutdown_timeout()
  }
]