Snakepit.GRPCWorker (Snakepit v0.8.7)
A GenServer that manages gRPC connections to external processes.
This worker can handle both traditional request/response and streaming operations via gRPC instead of stdin/stdout communication.
Features
- Automatic gRPC connection management
- Health check monitoring
- Streaming support with callback-based API
- Session affinity for stateful operations
- Graceful fallback to traditional workers if gRPC unavailable
Usage
    # Start a gRPC worker
    {:ok, worker} = Snakepit.GRPCWorker.start_link(adapter: Snakepit.Adapters.GRPCPython)

    # Simple execution
    {:ok, result} = Snakepit.GRPCWorker.execute(worker, "ping", %{})

    # Streaming execution
    Snakepit.GRPCWorker.execute_stream(worker, "batch_inference", %{
      batch_items: ["img1.jpg", "img2.jpg"]
    }, fn chunk ->
      handle_chunk(chunk)
    end)
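The streaming call above delivers chunks one at a time to a callback. When the caller wants all chunks as a single list, one option is to accumulate them in a short-lived Agent. The sketch below is self-contained and does not call Snakepit: `StreamSketch.fake_execute_stream/4` is a hypothetical stand-in that only mimics the callback-per-chunk behaviour, and both its `:ok` return value and the chunk shape are assumptions, not the documented behaviour of `execute_stream`.

```elixir
defmodule StreamSketch do
  # Hypothetical stand-in for Snakepit.GRPCWorker.execute_stream/4.
  # It calls the callback once per item, in order, then returns :ok.
  # The real return value and chunk format may differ.
  def fake_execute_stream(_worker, _command, %{batch_items: items}, callback) do
    Enum.each(items, fn item -> callback.(%{"item" => item}) end)
    :ok
  end

  # Runs the streaming call and gathers every chunk passed to the callback.
  def collect(worker, command, args) do
    {:ok, acc} = Agent.start_link(fn -> [] end)

    result =
      fake_execute_stream(worker, command, args, fn chunk ->
        # Prepend for O(1) updates; the list is reversed once at the end.
        Agent.update(acc, &[chunk | &1])
      end)

    chunks = acc |> Agent.get(& &1) |> Enum.reverse()
    Agent.stop(acc)
    {result, chunks}
  end
end
```

Calling `StreamSketch.collect(nil, "batch_inference", %{batch_items: ["img1.jpg", "img2.jpg"]})` returns the stand-in's result together with the chunks in arrival order. With a real worker, the same callback shape could be passed to `Snakepit.GRPCWorker.execute_stream/4` instead of the stand-in.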
Types
    @type worker_state() :: %{
      adapter: module(),
      connection: map() | nil,
      port: integer(),
      process_pid: integer() | nil,
      pgid: integer() | nil,
      process_group?: boolean(),
      server_port: port() | nil,
      id: String.t(),
      pool_name: atom() | pid(),
      health_check_ref: reference() | nil,
      heartbeat_monitor: pid() | nil,
      heartbeat_config: map(),
      ready_file: String.t(),
      stats: map(),
      session_id: String.t(),
      worker_config: map(),
      shutting_down: boolean()
    }
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
Execute a command and return the result.
Execute a command in a specific session.
Execute a streaming command with callback.
Get the gRPC channel for direct client usage.
Get worker health and statistics.
Get worker information and capabilities.
Get the session ID for this worker.
Start a gRPC worker with the given adapter.
Returns the recommended supervisor shutdown timeout.
The value is graceful_shutdown_timeout plus a margin, so that supervisors give
workers enough time to complete their terminate/2 callback (which includes
graceful Python process termination).
Use this value for:
- shutdown: in child_spec
- shutdown: in Worker.Starter
- Any other supervisor that manages GRPCWorker processes
Example
    children = [
      %{
        id: MyWorker,
        start: {Snakepit.GRPCWorker, :start_link, [opts]},
        shutdown: Snakepit.GRPCWorker.supervisor_shutdown_timeout()
      }
    ]
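To make the "graceful_shutdown_timeout + margin" relationship concrete, here is a minimal, self-contained sketch of how such a value could be derived and placed in a child spec. The module name `ShutdownSketch` and both numbers are hypothetical, chosen only for illustration; the real timeout and margin are defined inside Snakepit and may differ.

```elixir
defmodule ShutdownSketch do
  # Hypothetical values in milliseconds; not Snakepit's actual settings.
  @graceful_shutdown_timeout 6_000
  @margin 2_000

  # The supervisor must wait longer than the worker's own graceful
  # shutdown, otherwise it would kill the worker mid-terminate/2.
  def supervisor_shutdown_timeout, do: @graceful_shutdown_timeout + @margin

  # Builds a child spec map whose :shutdown uses the derived timeout.
  def child_spec(id, start) do
    %{id: id, start: start, shutdown: supervisor_shutdown_timeout()}
  end
end
```

The point of the margin is ordering: the worker's internal timeout expires first, and the supervisor's `:shutdown` value acts only as a last resort after that.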