Snakepit.Pool (Snakepit v0.9.1)
Pool manager for external workers with concurrent initialization.
Features:
- Concurrent worker startup (all workers start in parallel)
- Simple queue-based request distribution
- Non-blocking async execution
- Automatic request queueing when workers are busy
- Adapter-based support for any external process
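The queueing behaviour in the feature list can be pictured with a small, purely functional sketch. This is not Snakepit's implementation: the module name `QueueDispatchSketch` and its functions are hypothetical, and the sketch only assumes that a request goes to an idle worker when one exists and otherwise waits in a FIFO queue until a worker is checked back in.

```elixir
defmodule QueueDispatchSketch do
  # Hypothetical illustration only; not part of Snakepit.

  # State: a list of idle worker ids plus a FIFO of requests awaiting a worker.
  def new(worker_ids), do: %{idle: worker_ids, pending: :queue.new()}

  # A request arrives: hand it to an idle worker if there is one...
  def checkout(%{idle: [worker | rest]} = state, _request) do
    {{:run, worker}, %{state | idle: rest}}
  end

  # ...or queue it when every worker is busy.
  def checkout(%{idle: []} = state, request) do
    {:queued, %{state | pending: :queue.in(request, state.pending)}}
  end

  # A worker finishes: give it the oldest queued request, or mark it idle.
  def checkin(state, worker) do
    case :queue.out(state.pending) do
      {{:value, request}, pending} ->
        {{:run, worker, request}, %{state | pending: pending}}

      {:empty, _pending} ->
        {:idle, %{state | idle: [worker | state.idle]}}
    end
  end
end

state = QueueDispatchSketch.new([:w1])
{{:run, :w1}, state} = QueueDispatchSketch.checkout(state, :req_a)
{:queued, state} = QueueDispatchSketch.checkout(state, :req_b)
{{:run, :w1, :req_b}, state} = QueueDispatchSketch.checkin(state, :w1)
{:idle, _state} = QueueDispatchSketch.checkin(state, :w1)
```

Keeping the state as plain data makes the dispatch rule easy to test in isolation; a real pool would hold this state inside a process and run the work asynchronously.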
Summary
Functions
Waits for the pool to be fully initialized.
Returns a specification to start this module under a supervisor.
Derives the RPC timeout from opts, considering deadline if present.
Computes effective queue timeout considering deadline.
Executes a command on any available worker.
Execute a streaming command with callback.
Returns the default timeout for a given call type.
Gets pool statistics.
Gets statistics for a specific pool name.
Lists all worker IDs in the pool.
Starts the pool manager.
Functions
@spec await_ready(atom() | pid(), timeout() | nil) :: :ok | {:error, Snakepit.Error.t()}
Waits for the pool to be fully initialized.
Returns :ok when all workers are ready, or {:error, %Snakepit.Error{}} if
the pool doesn't initialize within the given timeout.
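A minimal sketch of gating application code on pool readiness, based only on the spec above. The wrapper module `ReadyGateSketch`, the 5-second timeout, and the assumption that the pool is registered under the name `Snakepit.Pool` are illustrative choices, not documented defaults. The await function is injectable so the sketch can be exercised without a running pool.

```elixir
defmodule ReadyGateSketch do
  # Hypothetical wrapper; only Snakepit.Pool.await_ready/2 comes from the docs.

  # Blocks until the pool reports ready, or returns a tagged error.
  # `await_fun` defaults to the documented function and can be stubbed in tests.
  def ensure_ready(
        pool \\ Snakepit.Pool,
        timeout_ms \\ 5_000,
        await_fun \\ &Snakepit.Pool.await_ready/2
      ) do
    case await_fun.(pool, timeout_ms) do
      :ok ->
        :ok

      # Per the spec, the error term is a %Snakepit.Error{} struct.
      {:error, error} ->
        {:error, {:pool_not_ready, error}}
    end
  end
end

# Stubbed usage (no pool required):
ReadyGateSketch.ensure_ready(:my_pool, 100, fn _pool, _timeout -> :ok end)
# => :ok
```

In an application with Snakepit started, calling `ReadyGateSketch.ensure_ready()` with no arguments would delegate to `Snakepit.Pool.await_ready(Snakepit.Pool, 5_000)`.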
Returns a specification to start this module under a supervisor.
See Supervisor.
Derives the RPC timeout from opts, considering deadline if present.
When a request has been queued, time has already elapsed. This function calculates the remaining time budget for the actual RPC call.
Examples
# Fresh request with 60s budget
iex> Snakepit.Pool.derive_rpc_timeout_from_opts([], 60_000)
58_800 # 60_000 - 1000 - 200 margins
# Request that waited 500ms in queue
iex> now = System.monotonic_time(:millisecond)
iex> opts = [deadline_ms: now + 59_500]
iex> Snakepit.Pool.derive_rpc_timeout_from_opts(opts, 60_000)
# ~= 58_300 (remaining - margins)
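The arithmetic in these examples can be reproduced with a short sketch. It assumes that the budget is the time left until `deadline_ms` (or the default timeout when no deadline is set), that two fixed margins of 1_000 ms and 200 ms are subtracted as in the comment above, and that the result is clamped at zero. The module name, the attribute names, and the clamping are assumptions; what each margin represents inside Snakepit is not stated here.

```elixir
defmodule RpcBudgetSketch do
  # Hypothetical re-derivation of the documented examples; not Snakepit's code.

  # Margin values taken from the "60_000 - 1000 - 200" comment; the names are invented.
  @margin_a_ms 1_000
  @margin_b_ms 200

  # `now_ms` is injectable so the result is deterministic in examples and tests.
  def derive(opts, default_timeout_ms, now_ms \\ System.monotonic_time(:millisecond)) do
    budget_ms =
      case Keyword.get(opts, :deadline_ms) do
        # No deadline: the whole default timeout is available.
        nil -> default_timeout_ms
        # Deadline present: only the time remaining until it is available.
        deadline_ms -> deadline_ms - now_ms
      end

    # Assumed clamp: never return a negative timeout.
    max(budget_ms - @margin_a_ms - @margin_b_ms, 0)
  end
end

# Fresh request with a 60s budget
RpcBudgetSketch.derive([], 60_000)
# => 58_800

# Request whose deadline is 59_500 ms away (it waited 500 ms in the queue)
now = 1_000_000
RpcBudgetSketch.derive([deadline_ms: now + 59_500], 60_000, now)
# => 58_300
```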
@spec effective_queue_timeout_ms(Keyword.t(), timeout()) :: non_neg_integer()
Computes effective queue timeout considering deadline.
If a deadline is set and less time remains than the configured queue timeout, returns the remaining time instead.
Examples
# No deadline - use configured queue timeout
iex> Snakepit.Pool.effective_queue_timeout_ms([], 10_000)
10_000
# Deadline with 5s remaining - use remaining time
iex> now = System.monotonic_time(:millisecond)
iex> opts = [deadline_ms: now + 5_000]
iex> Snakepit.Pool.effective_queue_timeout_ms(opts, 10_000)
# ~= 5_000 (remaining time)
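The rule described above amounts to taking the smaller of the configured queue timeout and the time remaining until the deadline. The sketch below assumes integer millisecond timeouts and clamps an already-expired deadline to zero, in line with the `non_neg_integer()` return type; the module name and the injectable `now_ms` argument are hypothetical.

```elixir
defmodule QueueTimeoutSketch do
  # Hypothetical illustration of the documented rule; not Snakepit's code.

  def effective(opts, configured_ms, now_ms \\ System.monotonic_time(:millisecond)) do
    case Keyword.get(opts, :deadline_ms) do
      # No deadline: use the configured queue timeout unchanged.
      nil ->
        configured_ms

      deadline_ms ->
        # An expired deadline leaves no time at all.
        remaining_ms = max(deadline_ms - now_ms, 0)
        # Wait no longer than the deadline allows.
        min(remaining_ms, configured_ms)
    end
  end
end

now = 1_000_000
QueueTimeoutSketch.effective([], 10_000, now)
# => 10_000
QueueTimeoutSketch.effective([deadline_ms: now + 5_000], 10_000, now)
# => 5_000
QueueTimeoutSketch.effective([deadline_ms: now + 30_000], 10_000, now)
# => 10_000
```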
Executes a command on any available worker.
Execute a streaming command with callback.
Returns the default timeout for a given call type.
Call types
- :execute - Regular execute operations
- :execute_stream - Streaming operations
- :queue - Queue wait operations
Examples
iex> Snakepit.Pool.get_default_timeout_for_call(:execute, %{}, [])
300_000 # from default_timeout()
iex> Snakepit.Pool.get_default_timeout_for_call(:execute, %{}, [timeout: 45_000])
45_000
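The two examples suggest a simple precedence: an explicit `:timeout` option wins, otherwise a per-call-type default applies. The sketch below illustrates that precedence under stated assumptions. The module name and the defaults table are hypothetical; only the 300_000 ms value for `:execute` appears in the examples, and the other two numbers are placeholders. The middle argument of the real function (shown as `%{}` above) is omitted because its role is not described here.

```elixir
defmodule DefaultTimeoutSketch do
  # Hypothetical illustration; not Snakepit's code.

  # Only the :execute value is taken from the examples; the rest are placeholders.
  @defaults %{execute: 300_000, execute_stream: 300_000, queue: 10_000}

  def resolve(call_type, opts, defaults \\ @defaults) do
    case Keyword.fetch(opts, :timeout) do
      # A caller-supplied timeout takes precedence.
      {:ok, timeout} -> timeout
      # Otherwise use the default for this call type (raises on unknown types).
      :error -> Map.fetch!(defaults, call_type)
    end
  end
end

DefaultTimeoutSketch.resolve(:execute, [])
# => 300_000
DefaultTimeoutSketch.resolve(:execute, timeout: 45_000)
# => 45_000
```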
Gets pool statistics.
Gets statistics for a specific pool name.
Lists all worker IDs in the pool.
Can be called with pool process or pool name:
- list_workers() - all workers from all pools
- list_workers(Snakepit.Pool) - all workers from all pools
- list_workers(Snakepit.Pool, :pool_name) - workers from specific pool
Starts the pool manager.