Snakepit (Snakepit v0.6.10)

Snakepit - A generalized high-performance pooler and session manager.

Extracted from the DSPex V3 pool implementation, Snakepit provides:

  • Concurrent worker initialization and management
  • Stateless pool system with session affinity
  • Generalized adapter pattern for any external process
  • High-performance OTP-based process management

Basic Usage

# Configure in config/config.exs
config :snakepit,
  pooling_enabled: true,
  adapter_module: YourAdapter

# Execute commands on any available worker
{:ok, result} = Snakepit.execute("ping", %{test: true})

# Session-based execution with worker affinity
{:ok, result} = Snakepit.execute_in_session("my_session", "command", %{})

Domain-Specific Helpers

For ML/DSP workflows with program management, see Snakepit.SessionHelpers:

# ML program creation and execution
{:ok, result} = Snakepit.SessionHelpers.execute_program_command(
  "session_id", "create_program", %{signature: "input -> output"}
)

Summary

Functions

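execute(command, args, opts \\ [])

Convenience function to execute commands on the pool.

execute_in_session(session_id, command, args, opts \\ [])

Executes a command in session context with worker affinity.

execute_in_session_stream(session_id, command, args \\ %{}, callback_fn, opts \\ [])

Executes a command in a session with a callback function.

execute_stream(command, args \\ %{}, callback_fn, opts \\ [])

Executes a streaming command with a callback function.

get_stats(pool \\ Snakepit.Pool)

Get pool statistics.

list_workers(pool \\ Snakepit.Pool)

List workers from the pool.

run_as_script(fun, opts \\ [])

Starts the Snakepit application, executes a given function, and ensures graceful shutdown.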

Types

args()

@type args() :: map()

callback_fn()

@type callback_fn() :: (term() -> any())

command()

@type command() :: String.t()

pool_name()

@type pool_name() :: atom() | pid()

result()

@type result() :: term()

session_id()

@type session_id() :: String.t()

Functions

execute(command, args, opts \\ [])

@spec execute(command(), args(), keyword()) ::
  {:ok, result()} | {:error, Snakepit.Error.t()}

Convenience function to execute commands on the pool.

Examples

{:ok, result} = Snakepit.execute("ping", %{test: true})

Options

  • :pool - The pool to use (default: Snakepit.Pool)
  • :timeout - Request timeout in ms (default: 60000)
  • :session_id - Execute with session affinity
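The options above are passed as a keyword list in the third argument. The following is a minimal sketch of combining them and matching on both return shapes. `PingExample` and the `{:ping_failed, error}` tuple are hypothetical names introduced for illustration, the timeout value is arbitrary, and the executor is injectable only so the snippet can be exercised without a running pool.

```elixir
defmodule PingExample do
  # Hypothetical helper, not part of Snakepit.
  # `executor` defaults to Snakepit.execute/3; pass a stub to try it without a pool.
  def ping(executor \\ &Snakepit.execute/3) do
    # :timeout and :session_id are the options documented above;
    # 5_000 ms and "my_session" are arbitrary example values.
    opts = [timeout: 5_000, session_id: "my_session"]

    case executor.("ping", %{test: true}, opts) do
      {:ok, result} -> {:ok, result}
      # Tag the error so callers can tell which call failed (illustrative only).
      {:error, error} -> {:error, {:ping_failed, error}}
    end
  end
end
```

With a running pool, `PingExample.ping()` would call `Snakepit.execute/3` directly. In tests, a three-argument function can stand in for it.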

execute_in_session(session_id, command, args, opts \\ [])

@spec execute_in_session(session_id(), command(), args(), keyword()) ::
  {:ok, result()} | {:error, Snakepit.Error.t()}

Executes a command in session context with worker affinity.

This function executes commands with session-based worker affinity: subsequent calls with the same session_id prefer the same worker when possible, which helps preserve state between calls.

Args are passed through unchanged - no domain-specific enhancement. For ML/DSP program workflows, use Snakepit.SessionHelpers.execute_program_command/4.
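As a rough illustration of why affinity matters, the sketch below issues two calls under the same session_id, where the second relies on state left by the first. The command names "set_value" and "get_value" are assumptions (they would have to be implemented by your adapter), and `SessionExample` is a hypothetical module. Since affinity is a preference rather than a guarantee, treat this as a pattern, not a correctness proof.

```elixir
defmodule SessionExample do
  # Hypothetical helper, not part of Snakepit.
  # `run` defaults to Snakepit.execute_in_session/3 (opts left at their default).
  def store_and_fetch(session_id, run \\ &Snakepit.execute_in_session/3) do
    # Both calls share `session_id`, so they prefer the same worker.
    # "set_value" / "get_value" are made-up adapter commands.
    with {:ok, _ack} <- run.(session_id, "set_value", %{key: "x", value: 1}),
         {:ok, value} <- run.(session_id, "get_value", %{key: "x"}) do
      {:ok, value}
    end
  end
end
```

If either call returns `{:error, error}`, `with` stops and returns that tuple unchanged.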

execute_in_session_stream(session_id, command, args \\ %{}, callback_fn, opts \\ [])

@spec execute_in_session_stream(
  session_id(),
  command(),
  args(),
  callback_fn(),
  keyword()
) ::
  :ok | {:error, Snakepit.Error.t()}

Executes a streaming command in a session, passing each received chunk to a callback function.
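A minimal sketch of one way to use the callback: forward each chunk to the calling process and gather the chunks once the call returns. `SessionStreamExample` and the command name "stream_progress" are hypothetical. The sketch also assumes that every callback invocation has completed by the time the function returns, which this page does not state.

```elixir
defmodule SessionStreamExample do
  # Hypothetical helper, not part of Snakepit.
  # `stream` defaults to Snakepit.execute_in_session_stream/4
  # (session_id, command, args, callback_fn).
  def collect(session_id, stream \\ &Snakepit.execute_in_session_stream/4) do
    parent = self()
    ref = make_ref()

    # The callback only forwards chunks; the unique ref avoids mixing them
    # up with unrelated messages in the caller's mailbox.
    callback = fn chunk -> send(parent, {ref, chunk}) end

    # "stream_progress" and %{steps: 3} are made-up example values.
    result = stream.(session_id, "stream_progress", %{steps: 3}, callback)

    # Assumption: all chunks have been delivered before the call returned.
    chunks = drain(ref, [])

    case result do
      :ok -> {:ok, chunks}
      {:error, error} -> {:error, error}
    end
  end

  # Pull every already-delivered chunk out of the mailbox, in arrival order.
  defp drain(ref, acc) do
    receive do
      {^ref, chunk} -> drain(ref, [chunk | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end
```

For long-running streams, processing each chunk inside the callback is usually preferable to buffering everything in memory.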

execute_stream(command, args \\ %{}, callback_fn, opts \\ [])

@spec execute_stream(command(), args(), callback_fn(), keyword()) ::
  :ok | {:error, Snakepit.Error.t()}

Executes a streaming command with a callback function.

Examples

Snakepit.execute_stream("batch_inference", %{items: [...]}, fn chunk ->
  IO.puts("Received: #{inspect(chunk)}")
end)

Options

  • :pool - The pool to use (default: Snakepit.Pool)
  • :timeout - Request timeout in ms (default: 300000)
  • :session_id - Run in a specific session

Returns

Returns :ok on success or {:error, %Snakepit.Error{}} on failure.

Note: Streaming is only supported with gRPC adapters.

get_stats(pool \\ Snakepit.Pool)

@spec get_stats(pool_name()) :: map()

Get pool statistics.

Returns aggregate stats across all pools or stats for a specific pool.

list_workers(pool \\ Snakepit.Pool)

@spec list_workers(pool_name()) :: [String.t()]

List workers from the pool.

Returns a list of worker IDs.
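The two introspection functions can be combined into a quick health report. The sketch below makes no assumption about which keys the stats map contains. `PoolReport` is a hypothetical module, and both functions are injectable so the snippet can be tried without a running pool.

```elixir
defmodule PoolReport do
  # Hypothetical helper, not part of Snakepit.
  # Defaults call list_workers/0 and get_stats/0, i.e. the default pool.
  def summary(list_workers \\ &Snakepit.list_workers/0, get_stats \\ &Snakepit.get_stats/0) do
    workers = list_workers.()

    %{
      worker_count: length(workers),
      workers: workers,
      # Passed through as-is; the shape of the stats map is not documented here.
      stats: get_stats.()
    }
  end
end
```

Calling `PoolReport.summary() |> IO.inspect()` from IEx would print the report for the default pool.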

run_as_script(fun, opts \\ [])

@spec run_as_script(
  (-> any()),
  keyword()
) :: any() | {:error, term()}

Starts the Snakepit application, executes a given function, and ensures graceful shutdown.

This is the recommended way to use Snakepit in short-lived scripts or Mix tasks, as it helps prevent orphaned processes.

It handles the full OTP application lifecycle (start, run, stop) automatically.

Examples

# In a Mix task
Snakepit.run_as_script(fn ->
  {:ok, result} = Snakepit.execute("my_command", %{data: "value"})
  IO.inspect(result)
end)

# For demos or scripts
Snakepit.run_as_script(fn ->
  MyApp.run_load_test()
end)

Options

  • :timeout - Maximum time to wait for pool initialization in ms (default: 15000)

Returns

Returns the result of the provided function, or {:error, reason} if the pool fails to initialize.
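Because the return value is either the function's own result or `{:error, reason}`, a function that itself returns an error tuple is indistinguishable from a pool initialization failure. One possible workaround, sketched below, is to tag the function's result before returning it. `ScriptExample`, the `:done` tag, and `:pool_init_failed` are hypothetical names, and the timeout value is arbitrary.

```elixir
defmodule ScriptExample do
  # Hypothetical wrapper, not part of Snakepit.
  # `runner` defaults to Snakepit.run_as_script/2.
  def run(work, runner \\ &Snakepit.run_as_script/2) do
    # Wrap the result in {:done, _} so it cannot be mistaken for the
    # {:error, reason} returned when the pool fails to initialize.
    case runner.(fn -> {:done, work.()} end, timeout: 30_000) do
      {:done, value} -> {:ok, value}
      {:error, reason} -> {:error, {:pool_init_failed, reason}}
    end
  end
end
```

For example, `ScriptExample.run(fn -> Snakepit.execute("my_command", %{data: "value"}) end)` would yield `{:ok, {:ok, result}}` or `{:ok, {:error, error}}` when the pool started, and `{:error, {:pool_init_failed, reason}}` when it did not.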