Snakepit (snakepit v0.4.1)

Snakepit - A generalized high-performance pooler and session manager.

Extracted from DSPex V3 pool implementation, Snakepit provides:

  • Concurrent worker initialization and management
  • Stateless pool system with session affinity
  • Generalized adapter pattern for any external process
  • High-performance OTP-based process management

Basic Usage

# Configure in config/config.exs
config :snakepit,
  pooling_enabled: true,
  adapter_module: YourAdapter

# Execute commands on any available worker
{:ok, result} = Snakepit.execute("ping", %{test: true})

# Session-based execution with worker affinity
{:ok, result} = Snakepit.execute_in_session("my_session", "command", %{})

Domain-Specific Helpers

For ML/DSP workflows with program management, see Snakepit.SessionHelpers:

# ML program creation and execution
{:ok, result} = Snakepit.SessionHelpers.execute_program_command(
  "session_id", "create_program", %{signature: "input -> output"}
)

Summary

Functions

Convenience function to execute commands on the pool.

Executes a command in session context with worker affinity.

Executes a command in a session with a callback function.

Executes a streaming command with a callback function.

Get pool statistics.

List workers from the pool.

Starts the Snakepit application, executes a given function, and ensures graceful shutdown.

Functions

execute(command, args, opts \\ [])

Convenience function to execute commands on the pool.

execute_in_session(session_id, command, args, opts \\ [])

Executes a command in session context with worker affinity.

This function executes commands with session-based worker affinity, ensuring that subsequent calls with the same session_id prefer the same worker when possible for state continuity.

Args are passed through unchanged - no domain-specific enhancement. For ML/DSP program workflows, use Snakepit.SessionHelpers.execute_program_command/4.

execute_in_session_stream(session_id, command, args \\ %{}, callback_fn, opts \\ [])

@spec execute_in_session_stream(String.t(), String.t(), map(), function(), keyword()) ::
  :ok | {:error, term()}

Executes a command in a session with a callback function.

execute_stream(command, args \\ %{}, callback_fn, opts \\ [])

@spec execute_stream(String.t(), map(), function(), keyword()) ::
  :ok | {:error, term()}

Executes a streaming command with a callback function.

Examples

Snakepit.execute_stream("batch_inference", %{items: [...]}, fn chunk ->
  IO.puts("Received: #{inspect(chunk)}")
end)

Options

  • :pool - The pool to use (default: Snakepit.Pool)
  • :timeout - Request timeout in ms (default: 300000)
  • :session_id - Run in a specific session

Returns

Returns :ok on success or {:error, reason} on failure.

Note: Streaming is only supported with gRPC adapters.

get_stats(pool \\ Snakepit.Pool)

Get pool statistics.

list_workers(pool \\ Snakepit.Pool)

List workers from the pool.

run_as_script(fun, opts \\ [])

@spec run_as_script(
  (-> any()),
  keyword()
) :: any() | {:error, term()}

Starts the Snakepit application, executes a given function, and ensures graceful shutdown.

This is the recommended way to use Snakepit for short-lived scripts or Mix tasks to prevent orphaned processes.

It handles the full OTP application lifecycle (start, run, stop) automatically.

Examples

# In a Mix task
Snakepit.run_as_script(fn ->
  {:ok, result} = Snakepit.execute("my_command", %{data: "value"})
  IO.inspect(result)
end)

# For demos or scripts
Snakepit.run_as_script(fn ->
  MyApp.run_load_test()
end)

Options

  • :timeout - Maximum time to wait for pool initialization (default: 15000ms)

Returns

Returns the result of the provided function, or {:error, reason} if the pool fails to initialize.