Snakepit (snakepit v0.4.1)
Snakepit - A generalized high-performance pooler and session manager.
Extracted from DSPex V3 pool implementation, Snakepit provides:
- Concurrent worker initialization and management
- Stateless pool system with session affinity
- Generalized adapter pattern for any external process
- High-performance OTP-based process management
Basic Usage
# Configure in config/config.exs
config :snakepit,
pooling_enabled: true,
adapter_module: YourAdapter
# Execute commands on any available worker
{:ok, result} = Snakepit.execute("ping", %{test: true})
# Session-based execution with worker affinity
{:ok, result} = Snakepit.execute_in_session("my_session", "command", %{})Domain-Specific Helpers
For ML/DSP workflows with program management, see Snakepit.SessionHelpers:
# ML program creation and execution
{:ok, result} = Snakepit.SessionHelpers.execute_program_command(
"session_id", "create_program", %{signature: "input -> output"}
)
Summary
Functions
Convenience function to execute commands on the pool.
Executes a command in session context with worker affinity.
Executes a command in a session with a callback function.
Executes a streaming command with a callback function.
Get pool statistics.
List workers from the pool.
Starts the Snakepit application, executes a given function, and ensures graceful shutdown.
Functions
Convenience function to execute commands on the pool.
Executes a command in session context with worker affinity.
This function executes commands with session-based worker affinity, ensuring that subsequent calls with the same session_id prefer the same worker when possible for state continuity.
Args are passed through unchanged - no domain-specific enhancement.
For ML/DSP program workflows, use Snakepit.SessionHelpers.execute_program_command/4.
@spec execute_in_session_stream(String.t(), String.t(), map(), function(), keyword()) :: :ok | {:error, term()}
Executes a command in a session with a callback function.
Executes a streaming command with a callback function.
Examples
Snakepit.execute_stream("batch_inference", %{items: [...]}, fn chunk ->
IO.puts("Received: #{inspect(chunk)}")
end)Options
:pool- The pool to use (default:Snakepit.Pool):timeout- Request timeout in ms (default: 300000):session_id- Run in a specific session
Returns
Returns :ok on success or {:error, reason} on failure.
Note: Streaming is only supported with gRPC adapters.
Get pool statistics.
List workers from the pool.
Starts the Snakepit application, executes a given function, and ensures graceful shutdown.
This is the recommended way to use Snakepit for short-lived scripts or Mix tasks to prevent orphaned processes.
It handles the full OTP application lifecycle (start, run, stop) automatically.
Examples
# In a Mix task
Snakepit.run_as_script(fn ->
{:ok, result} = Snakepit.execute("my_command", %{data: "value"})
IO.inspect(result)
end)
# For demos or scripts
Snakepit.run_as_script(fn ->
MyApp.run_load_test()
end)Options
:timeout- Maximum time to wait for pool initialization (default: 15000ms)
Returns
Returns the result of the provided function, or {:error, reason} if
the pool fails to initialize.