View Source PoolSup (pool_sup v0.6.2)

PoolSup: Yet another process pool library in Elixir

PoolSup defines a supervisor specialized to manage pool of worker processes.

Hex.pm Build Status Coverage Status

features

Features

  • Process defined by this module behaves as a :simple_one_for_one supervisor.
  • Worker processes are spawned using a callback module that implements PoolSup.Worker behaviour.
  • PoolSup process manages which worker processes are in use and which are not.
  • PoolSup automatically restarts crashed workers.
  • Functions to request pid of an available worker process: checkout/2, checkout_nonblocking/2.
  • Run-time reconfiguration of pool size: change_capacity/3.
  • Automatic cleanup of workers hanging around too long without checkin, as a safeguard against process leaks.
  • Load-balancing using multiple pools: PoolSup.Multi.

example

Example

Suppose we have a module that implements both GenServer and PoolSup.Worker behaviours (PoolSup.Worker behaviour requires only 1 callback to implement, start_link/1).

defmodule MyWorker do
  @behaviour PoolSup.Worker
  use GenServer
  def start_link(arg) do
    GenServer.start_link(__MODULE__, arg)
  end
  # definitions of gen_server callbacks...
end

When we want to have 3 worker processes that run MyWorker server:

{:ok, pool_sup_pid} = PoolSup.start_link(MyWorker, {:worker, :arg}, 3, 0, [name: :my_pool])

Each worker process is started using MyWorker.start_link({:worker, :arg}). Then we can get a pid of a child currently not in use:

worker_pid = PoolSup.checkout(:my_pool)
do_something(worker_pid)
PoolSup.checkin(:my_pool, worker_pid)

Don't forget to return the worker_pid when finished; for simple use cases PoolSup.transaction/3 comes in handy.

reserved-and-on-demand-worker-processes

Reserved and on-demand worker processes

PoolSup defines the following two parameters to control capacity of a pool:

  • reserved (3rd argument of start_link/5): Number of workers to keep alive.
  • ondemand (4th argument of start_link/5): Maximum number of workers that are spawned on-demand.

In short:

{:ok, pool_sup_pid} = PoolSup.start_link(MyWorker, {:worker, :arg}, 2, 1)
w1  = PoolSup.checkout_nonblocking(pool_sup_pid) # Returns a pre-spawned worker pid
w2  = PoolSup.checkout_nonblocking(pool_sup_pid) # Returns the other pre-spawned worker pid
w3  = PoolSup.checkout_nonblocking(pool_sup_pid) # Returns a newly-spawned worker pid
nil = PoolSup.checkout_nonblocking(pool_sup_pid) # Returns `nil`, no available process
PoolSup.checkin(pool_sup_pid, w1)                # `w1` is terminated
PoolSup.checkin(pool_sup_pid, w2)                # `w2` is kept alive for the subsequent checkout
PoolSup.checkin(pool_sup_pid, w3)                # `w3` is kept alive for the subsequent checkout

usage-within-supervision-tree

Usage within supervision tree

The following code snippet spawns a supervisor that has PoolSup process as one of its children:

chilldren = [
  ...
  Supervisor.child_spec({PoolSup, [MyWorker, {:worker, :arg}, 5, 3]}, []),
  ...
]
Supervisor.start_link(children, [strategy: :one_for_one])

The PoolSup process initially has 5 workers and can temporarily have up to 8. All workers are started by MyWorker.start_link({:worker, :arg}).

You can of course define a wrapper function of PoolSup.start_link/4 and use it in your supervisor spec.

Link to this section Summary

Functions

Changes capacity (number of worker processes) of a pool.

Changes :checkout_max_duration option of the pool.

Checks in an in-use worker pid and make it available to others.

Checks out a worker pid that is currently not used.

Checks out a worker pid in a nonblocking manner, i.e. if no available worker found this returns nil.

Returns a child specification to be used when it's not fully specified by the parent supervisor.

Callback implementation for GenServer.init/1.

Query current status of a pool.

Checks out a worker pid, creates a link to the worker, executes the given function using the pid, and finally checks-in and unlink.

Link to this section Types

@type option() :: {:name, GenServer.name()} | {:checkout_max_duration, pos_integer()}
@type pool() :: pid() | GenServer.name()

Link to this section Functions

Link to this function

change_capacity(pool, new_reserved, new_ondemand)

View Source
@spec change_capacity(pool(), nil | non_neg_integer(), nil | non_neg_integer()) :: :ok

Changes capacity (number of worker processes) of a pool.

new_reserved and/or new_ondemand parameters can be nil; in that case the original value is kept unchanged (i.e. PoolSup.change_capacity(pool, 10, nil) replaces only reserved value of pool).

On receipt of change_capacity message, the pool adjusts number of children according to the new configuration as follows:

  • If current number of workers are less than reserved, the pool spawns new workers to ensure reserved workers are available. Note that, as is the same throughout the OTP framework, spawning processes under a supervisor is synchronous operation. Therefore increasing reserved too many at once may make the pool unresponsive for a while.
  • When increasing maximum capacity (reserved + ondemand) and if any client process is being checking-out in a blocking manner, then the newly-spawned process is returned to the client.
  • When decreasing capacity, the pool tries to shutdown extra workers that are not in use. Processes currently in use are never interrupted. If number of in-use workers is more than the desired capacity, terminating further is delayed until any worker process is checked in.
Link to this function

change_checkout_max_duration(pool, new_duration)

View Source
@spec change_checkout_max_duration(pool(), nil | pos_integer()) :: :ok

Changes :checkout_max_duration option of the pool.

See start_link/5 for detailed explanation of :checkout_max_duration option.

@spec checkin(pool(), pid()) :: :ok

Checks in an in-use worker pid and make it available to others.

Link to this function

checkout(pool, timeout \\ 5000)

View Source
@spec checkout(pool(), timeout()) :: pid()

Checks out a worker pid that is currently not used.

If no available worker process exists, the caller is blocked until either

  • any process becomes available, or
  • timeout is reached.

Note that when a pid is checked-out it must eventually be checked-in or die, in order to correctly keep track of working processes and avoid process leaks. For this purpose it's advisable to either

  • link the checked-out process and the process who is going to check-in that process, or
  • implement your worker to check-in itself at the end of each job.
Link to this function

checkout_nonblocking(pool, timeout \\ 5000)

View Source
@spec checkout_nonblocking(pool(), timeout()) :: nil | pid()

Checks out a worker pid in a nonblocking manner, i.e. if no available worker found this returns nil.

@spec child_spec(list()) :: Supervisor.child_spec()

Returns a child specification to be used when it's not fully specified by the parent supervisor.

Link to this function

format_status(opt, list)

View Source

Callback implementation for GenServer.format_status/2.

Callback implementation for GenServer.init/1.

Link to this function

start_link(worker_module, worker_init_arg, reserved, ondemand, options \\ [])

View Source
@spec start_link(module(), term(), non_neg_integer(), non_neg_integer(), [option()]) ::
  GenServer.on_start()

Starts a PoolSup process linked to the calling process.

arguments

Arguments

  • worker_module is the callback module of PoolSup.Worker.
  • worker_init_arg is the value passed to worker_module.start_link/1 callback function.
  • reserved is the number of workers this PoolSup process holds.
  • ondemand is the maximum number of workers that are spawned on checkouts when all reserved processes are in use.
  • options is a keyword list of the following options:
    • :name: used for name registration for the pool process.
    • :checkout_max_duration: a threshold (in seconds) of worker's checkout duration (see below). Defaults to nil (workers won't be killed).

terminating-non-returning-workers

Terminating non-returning workers

Sometimes it's difficult to guarantee that a checked-out worker pid will eventually be checked-in. For example there are cases where a caller process of checkout/2 is killed during waiting for a reply (a worker pid) from a pool process, resulting in a process leak of the worker pid.

For this purpose PoolSup provides :checkout_max_duration option as a safeguard against process leaks. If a checked-out worker has not been checked-in for longer than :checkout_max_duration seconds, the pool regards the worker process as leaked and kill it.

If :checkout_max_duration is nil this cleanup functionality is disabled. You can dynamically change :checkout_max_duration option of a pool by change_checkout_max_duration/2.

@spec status(pool()) :: %{
  reserved: nni,
  ondemand: nni,
  children: nni,
  available: nni,
  working: nni,
  checkout_max_duration: nil | pos_integer()
}
when nni: non_neg_integer()

Query current status of a pool.

Link to this function

transaction(pool, f, timeout \\ 5000)

View Source
@spec transaction(pool(), (pid() -> a), timeout()) :: a when a: term()

Checks out a worker pid, creates a link to the worker, executes the given function using the pid, and finally checks-in and unlink.

The timeout parameter is used only in the checkout step; time elapsed during other steps are not counted.