Snakepit.Pool (Snakepit v0.9.1)

View Source

Pool manager for external workers with concurrent initialization.

Features:

  • Concurrent worker startup (all workers start in parallel)
  • Simple queue-based request distribution
  • Non-blocking async execution
  • Automatic request queueing when workers are busy
  • Adapter-based support for any external process

Summary

Functions

Waits for the pool to be fully initialized.

Returns a specification to start this module under a supervisor.

Derives the RPC timeout from opts, considering deadline if present.

Computes effective queue timeout considering deadline.

Executes a command on any available worker.

Execute a streaming command with callback.

Returns the default timeout for a given call type.

Gets pool statistics.

Gets statistics for a specific pool name.

Lists all worker IDs in the pool.

Starts the pool manager.

Functions

await_ready(pool \\ __MODULE__, timeout \\ nil)

@spec await_ready(atom() | pid(), timeout() | nil) ::
  :ok | {:error, Snakepit.Error.t()}

Waits for the pool to be fully initialized.

Returns :ok when all workers are ready, or {:error, %Snakepit.Error{}} if the pool doesn't initialize within the given timeout.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

derive_rpc_timeout_from_opts(opts, default_timeout)

@spec derive_rpc_timeout_from_opts(Keyword.t(), timeout()) :: timeout()

Derives the RPC timeout from opts, considering deadline if present.

When a request has been queued, time has already elapsed. This function calculates the remaining time budget for the actual RPC call.

Examples

# Fresh request with 60s budget
iex> Snakepit.Pool.derive_rpc_timeout_from_opts([], 60_000)
58_800  # 60_000 - 1000 - 200 margins

# Request that waited 500ms in queue
iex> now = System.monotonic_time(:millisecond)
iex> opts = [deadline_ms: now + 59_500]
iex> Snakepit.Pool.derive_rpc_timeout_from_opts(opts, 60_000)
# ~= 58_300 (remaining - margins)

effective_queue_timeout_ms(opts, configured_queue_timeout)

@spec effective_queue_timeout_ms(Keyword.t(), timeout()) :: non_neg_integer()

Computes effective queue timeout considering deadline.

If a deadline is set and less time remains than the configured queue timeout, returns the remaining time instead.

Examples

# No deadline - use configured queue timeout
iex> Snakepit.Pool.effective_queue_timeout_ms([], 10_000)
10_000

# Deadline with 5s remaining - use remaining time
iex> now = System.monotonic_time(:millisecond)
iex> opts = [deadline_ms: now + 5_000]
iex> Snakepit.Pool.effective_queue_timeout_ms(opts, 10_000)
# ~= 5_000 (remaining time)

execute(command, args, opts \\ [])

Executes a command on any available worker.

execute_stream(command, args, callback_fn, opts \\ [])

Execute a streaming command with callback.

get_default_timeout_for_call(call_type, args, opts)

@spec get_default_timeout_for_call(atom(), map(), Keyword.t()) :: timeout()

Returns the default timeout for a given call type.

Call types

  • :execute - Regular execute operations
  • :execute_stream - Streaming operations
  • :queue - Queue wait operations

Examples

iex> Snakepit.Pool.get_default_timeout_for_call(:execute, %{}, [])
300_000  # from default_timeout()

iex> Snakepit.Pool.get_default_timeout_for_call(:execute, %{}, [timeout: 45_000])
45_000

get_stats(pool \\ __MODULE__)

Gets pool statistics.

get_stats(pool, pool_name)

Gets statistics for a specific pool name.

list_workers(pool \\ __MODULE__)

Lists all worker IDs in the pool.

Can be called with pool process or pool name:

  • list_workers() - all workers from all pools
  • list_workers(Snakepit.Pool) - all workers from all pools
  • list_workers(Snakepit.Pool, :pool_name) - workers from specific pool

list_workers(pool, pool_name)

start_link(opts \\ [])

Starts the pool manager.