PoolSup (pool_sup v0.6.1)

PoolSup: Yet another process pool library in Elixir

PoolSup defines a supervisor specialized to manage a pool of worker processes.

Features

  • A process defined by this module behaves as a :simple_one_for_one supervisor.
  • Worker processes are spawned using a callback module that implements PoolSup.Worker behaviour.
  • PoolSup process manages which worker processes are in use and which are not.
  • PoolSup automatically restarts crashed workers.
  • Functions to request the pid of an available worker process: checkout/2, checkout_nonblocking/2.
  • Run-time reconfiguration of pool size: change_capacity/3.
  • Automatic cleanup of workers hanging around too long without checkin, as a safeguard against process leaks.
  • Load-balancing using multiple pools: PoolSup.Multi.

Example

Suppose we have a module that implements both the GenServer and PoolSup.Worker behaviours (the PoolSup.Worker behaviour requires only one callback, start_link/1).

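defmodule MyWorker do
  @behaviour PoolSup.Worker
  use GenServer

  # start_link/1 is the only callback required by PoolSup.Worker.
  @impl PoolSup.Worker
  def start_link(arg) do
    GenServer.start_link(__MODULE__, arg)
  end

  @impl GenServer
  def init(arg), do: {:ok, arg}

  # definitions of other gen_server callbacks...
end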

To start a pool of 3 worker processes that run the MyWorker server:

{:ok, pool_sup_pid} = PoolSup.start_link(MyWorker, {:worker, :arg}, 3, 0, [name: :my_pool])

Each worker process is started using MyWorker.start_link({:worker, :arg}). We can then check out the pid of a worker that is currently not in use:

worker_pid = PoolSup.checkout(:my_pool)
do_something(worker_pid)
PoolSup.checkin(:my_pool, worker_pid)

Don’t forget to check worker_pid back in when finished; for simple use cases PoolSup.transaction/3 comes in handy.

Reserved and on-demand worker processes

PoolSup defines the following two parameters to control the capacity of a pool:

  • reserved (3rd argument of start_link/5): Number of workers to keep alive.
  • ondemand (4th argument of start_link/5): Maximum number of workers that are spawned on-demand.

In short:

{:ok, pool_sup_pid} = PoolSup.start_link(MyWorker, {:worker, :arg}, 2, 1)
w1  = PoolSup.checkout_nonblocking(pool_sup_pid) # Returns a pre-spawned worker pid
w2  = PoolSup.checkout_nonblocking(pool_sup_pid) # Returns the other pre-spawned worker pid
w3  = PoolSup.checkout_nonblocking(pool_sup_pid) # Returns a newly-spawned worker pid
nil = PoolSup.checkout_nonblocking(pool_sup_pid) # Returns `nil`, no available process
PoolSup.checkin(pool_sup_pid, w1)                # `w1` is terminated
PoolSup.checkin(pool_sup_pid, w2)                # `w2` is kept alive for the subsequent checkout
PoolSup.checkin(pool_sup_pid, w3)                # `w3` is kept alive for the subsequent checkout
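
The sequence above can be reproduced with a small, purely functional toy model. This is a sketch under stated assumptions, not PoolSup's implementation: PoolModel is a hypothetical module, workers are plain integers rather than pids, nothing is actually spawned, and the checkin rule (terminate the returned worker while the pool holds more than reserved children, otherwise keep it idle) is inferred from the example.

```elixir
defmodule PoolModel do
  # Hypothetical toy model of reserved/ondemand accounting (not PoolSup's code).
  # Workers are integers; `idle` are available, `working` are checked out.
  defstruct reserved: 0, ondemand: 0, idle: [], working: [], next_id: 1

  # Start with `reserved` pre-spawned idle workers (:lists.seq(1, 0) == []).
  def new(reserved, ondemand) do
    %__MODULE__{
      reserved: reserved,
      ondemand: ondemand,
      idle: :lists.seq(1, reserved),
      next_id: reserved + 1
    }
  end

  def children(%__MODULE__{idle: idle, working: working}), do: length(idle) + length(working)

  # Reuse an idle worker if there is one.
  def checkout_nonblocking(%__MODULE__{idle: [worker | rest], working: working} = pool) do
    {worker, %{pool | idle: rest, working: [worker | working]}}
  end

  # Otherwise spawn an on-demand worker while below reserved + ondemand, else nil.
  def checkout_nonblocking(%__MODULE__{} = pool) do
    if children(pool) < pool.reserved + pool.ondemand do
      worker = pool.next_id
      {worker, %{pool | working: [worker | pool.working], next_id: worker + 1}}
    else
      {nil, pool}
    end
  end

  # Terminate the returned worker if the pool holds more than `reserved`
  # children; otherwise keep it idle for a subsequent checkout.
  def checkin(%__MODULE__{} = pool, worker) do
    pool = %{pool | working: List.delete(pool.working, worker)}

    # children(pool) no longer counts `worker`, hence the + 1.
    if children(pool) + 1 > pool.reserved do
      {:terminated, pool}
    else
      {:kept, %{pool | idle: pool.idle ++ [worker]}}
    end
  end
end
```

Running PoolModel.new(2, 1) through three checkouts, a fourth checkout, and three checkins yields the same nil, terminated, kept, kept pattern as the PoolSup example.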

Usage within supervision tree

The following code snippet spawns a supervisor that has a PoolSup process as one of its children:

children = [
  ...
  Supervisor.Spec.supervisor(PoolSup, [MyWorker, {:worker, :arg}, 5, 3]),
  ...
]
Supervisor.start_link(children, [strategy: :one_for_one])

The PoolSup process initially has 5 workers and can temporarily have up to 8. All workers are started by MyWorker.start_link({:worker, :arg}).

You can of course define a wrapper function around PoolSup.start_link/4 and use it in your supervisor spec.
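
Supervisor.Spec is deprecated in newer Elixir versions; a wrapper module that defines its own child_spec/1 can instead be listed directly in a children list. The sketch below assumes PoolSup.start_link/5 behaves as documented on this page; MyApp.MyPool and :my_pool are hypothetical names, and MyWorker is the worker module from the example above.

```elixir
defmodule MyApp.MyPool do
  # Hypothetical wrapper; :my_pool and the worker arguments are illustrative.
  @pool_name :my_pool

  # Wraps PoolSup.start_link/5 with fixed worker settings and a registered name.
  def start_link(reserved \\ 5, ondemand \\ 3) do
    PoolSup.start_link(MyWorker, {:worker, :arg}, reserved, ondemand, name: @pool_name)
  end

  # Called by the supervisor as child_spec([]) when MyApp.MyPool itself is
  # listed in the children; type: :supervisor mirrors the Supervisor.Spec.supervisor call above.
  def child_spec(_arg) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, []},
      type: :supervisor
    }
  end
end
```

With this module, the supervisor above can be started as Supervisor.start_link([MyApp.MyPool], strategy: :one_for_one).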

Summary

Functions

  • change_capacity/3: Changes the capacity (number of worker processes) of a pool.
  • change_checkout_max_duration/2: Changes the :checkout_max_duration option of the pool.
  • checkin/2: Checks in an in-use worker pid and makes it available to others.
  • checkout/2: Checks out a worker pid that is currently not in use.
  • checkout_nonblocking/2: Checks out a worker pid in a nonblocking manner, i.e. returns nil if no worker is available.
  • child_spec/1: Returns a child specification to be used when it’s not fully specified by the parent supervisor.
  • format_status/2: Invoked in some cases to retrieve a formatted version of the GenServer status.
  • init/1: Invoked when the server is started. GenServer.start_link/3 or GenServer.start/3 will block until it returns.
  • start_link/5: Starts a PoolSup process linked to the calling process.
  • status/1: Queries the current status of a pool.
  • transaction/3: Checks out a worker pid, links to the worker, executes the given function with the pid, and finally checks the worker in and unlinks it.

Types

option()
option() :: {:name, GenServer.name()} | {:checkout_max_duration, pos_integer()}

Functions

change_capacity(pool, new_reserved, new_ondemand)
change_capacity(pool(), nil | non_neg_integer(), nil | non_neg_integer()) :: :ok

Changes the capacity (number of worker processes) of a pool.

Either new_reserved or new_ondemand (or both) can be nil, in which case the current value is kept unchanged; e.g. PoolSup.change_capacity(pool, 10, nil) replaces only the reserved value of pool.

On receiving a change_capacity message, the pool adjusts the number of children to the new configuration as follows:

  • If the current number of workers is less than reserved, the pool spawns new workers so that reserved workers are available. As everywhere in OTP, spawning processes under a supervisor is a synchronous operation, so increasing reserved by a large amount at once may make the pool unresponsive for a while.
  • When the maximum capacity (reserved + ondemand) is increased while client processes are blocked in a checkout, the newly spawned workers are returned to those waiting clients.
  • When capacity is decreased, the pool tries to shut down extra workers that are not in use. Workers currently in use are never interrupted: if the number of in-use workers exceeds the desired capacity, further terminations are deferred until a worker is checked in.
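
The nil-keeps-the-current-value rule and the spawn count for a larger reserved can be sketched as two pure functions. CapacityModel is a hypothetical helper, not part of PoolSup; it assumes that making reserved workers available means bringing the total number of children up to reserved, and it does not model termination on decrease.

```elixir
defmodule CapacityModel do
  # Hypothetical toy model of two documented change_capacity/3 rules.

  # A nil argument keeps the current value; 0 is a real value, not "keep".
  def merge(%{reserved: reserved, ondemand: ondemand}, new_reserved, new_ondemand) do
    %{
      reserved: if(is_nil(new_reserved), do: reserved, else: new_reserved),
      ondemand: if(is_nil(new_ondemand), do: ondemand, else: new_ondemand)
    }
  end

  # Workers to spawn so that the pool has at least `reserved` children.
  def to_spawn(%{reserved: reserved}, children), do: max(reserved - children, 0)
end
```

For example, merging %{reserved: 2, ondemand: 1} with (10, nil) keeps ondemand at 1, and a pool with 2 children would then need 8 more workers.
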

change_checkout_max_duration(pool, new_duration)
change_checkout_max_duration(pool(), nil | pos_integer()) :: :ok

Changes the :checkout_max_duration option of the pool.

See start_link/5 for a detailed explanation of the :checkout_max_duration option.

checkin(pool, pid)
checkin(pool(), pid()) :: :ok

Checks in an in-use worker pid and makes it available to others.

checkout(pool, timeout \\ 5000)
checkout(pool(), timeout()) :: pid()

Checks out a worker pid that is currently not used.

If no available worker process exists, the caller is blocked until either

  • any process becomes available, or
  • timeout is reached.

Note that a checked-out pid must eventually be checked in (or die) so that the pool can correctly keep track of working processes and avoid process leaks. To ensure this, it’s advisable to either

  • link the checked-out worker to the process that is going to check it in, or
  • implement your worker so that it checks itself in at the end of each job.
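
The first option relies on ordinary Erlang link semantics: when a process exits with a non-normal reason, every linked process that does not trap exits is terminated too. The sketch below demonstrates this with plain spawned processes standing in for a checked-out worker and its user; LinkDemo is a hypothetical module, no pool is involved, and it assumes the worker does not trap exits.

```elixir
defmodule LinkDemo do
  # Returns the exit reason of the stand-in worker after its linked user
  # crashes, or :worker_still_alive if the worker survived.
  def run do
    # Stand-in for a checked-out worker; it does not trap exits.
    worker =
      spawn(fn ->
        receive do
          :stop -> :ok
        end
      end)

    ref = Process.monitor(worker)

    # Stand-in for the process that checked the worker out: it links to the
    # worker and then dies before it gets to check it in.
    spawn(fn ->
      Process.link(worker)
      exit(:crashed)
    end)

    receive do
      {:DOWN, ^ref, :process, ^worker, reason} -> reason
    after
      1_000 -> :worker_still_alive
    end
  end
end
```

LinkDemo.run() returns :crashed: the worker died with its user instead of lingering as a leaked, never-checked-in process.
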

checkout_nonblocking(pool, timeout \\ 5000)
checkout_nonblocking(pool(), timeout()) :: nil | pid()

Checks out a worker pid in a nonblocking manner, i.e. if no worker is available, this returns nil instead of blocking.

child_spec/1

Returns a child specification to be used when it’s not fully specified by the parent supervisor.

format_status(opt, list)

Invoked in some cases to retrieve a formatted version of the GenServer status.

This callback can be useful to control the appearance of the status of the GenServer. For example, it can be used to return a compact representation of the GenServer’s state to avoid having large state terms printed.

The second argument (named pdict_and_state in the GenServer documentation, list in the signature above) is a two-element list [pdict, state], where pdict is a list of {key, value} tuples representing the current process dictionary of the GenServer and state is the current state of the GenServer.

Callback implementation for GenServer.format_status/2.

init/1

Invoked when the server is started. GenServer.start_link/3 or GenServer.start/3 will block until it returns.

args is the argument term (second argument) passed to GenServer.start_link/3.

Returning {:ok, state} will cause start_link/3 to return {:ok, pid} and the process to enter its loop.

Returning {:ok, state, timeout} is similar to {:ok, state} except handle_info(:timeout, state) will be called after timeout milliseconds if no messages are received within the timeout.

Returning {:ok, state, :hibernate} is similar to {:ok, state} except the process is hibernated before entering the loop. See the GenServer.handle_call/3 callback for more information on hibernation.

Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop or calling terminate/2. If used as part of a supervision tree, the parent supervisor will neither fail to start nor immediately try to restart the GenServer. The remainder of the supervision tree will be (re)started, so the GenServer should not be required by other processes. It can be started later with Supervisor.restart_child/2, as the child specification is saved in the parent supervisor. The main use cases for this are:

  • The GenServer is disabled by configuration but might be enabled later.
  • An error occurred and it will be handled by a different mechanism than the Supervisor. Likely this approach involves calling Supervisor.restart_child/2 after a delay to attempt a restart.

Returning {:stop, reason} will cause start_link/3 to return {:error, reason} and the process to exit with reason reason without entering the loop or calling terminate/2.

Callback implementation for GenServer.init/1.

start_link(worker_module, worker_init_arg, reserved, ondemand, options \\ [])

Starts a PoolSup process linked to the calling process.

Arguments

  • worker_module is a callback module implementing the PoolSup.Worker behaviour.
  • worker_init_arg is the value passed to worker_module.start_link/1 callback function.
  • reserved is the number of workers this PoolSup process keeps alive.
  • ondemand is the maximum number of workers that are spawned on checkouts when all reserved processes are in use.
  • options is a keyword list of the following options:

    • :name: used for name registration for the pool process.
    • :checkout_max_duration: a threshold (in seconds) on how long a worker may stay checked out (see below). Defaults to nil (workers are never killed).

Terminating non-returning workers

Sometimes it’s difficult to guarantee that a checked-out worker pid will eventually be checked in. For example, a process calling checkout/2 may be killed while waiting for the reply (a worker pid) from the pool process, which leaks the worker.

For this purpose PoolSup provides the :checkout_max_duration option as a safeguard against process leaks. If a checked-out worker has not been checked in for longer than :checkout_max_duration seconds, the pool regards the worker process as leaked and kills it.

If :checkout_max_duration is nil, this cleanup is disabled. You can change the :checkout_max_duration option of a running pool with change_checkout_max_duration/2.
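
The cleanup rule can be expressed as a pure function over checkout timestamps. LeakCheck is a hypothetical sketch of the rule as described here (a worker counts as leaked once it has been checked out for longer than :checkout_max_duration seconds, and nil disables the check), not PoolSup's internal code. In a running process the current time would typically come from System.monotonic_time(:second).

```elixir
defmodule LeakCheck do
  # Hypothetical model of the :checkout_max_duration safeguard.
  # `checkouts` maps a worker to the time (in seconds) it was checked out.

  # nil disables the cleanup entirely.
  def leaked(_checkouts, nil, _now), do: []

  # Workers checked out for strictly longer than `max_duration` seconds.
  def leaked(checkouts, max_duration, now)
      when is_integer(max_duration) and max_duration > 0 do
    for {worker, checked_out_at} <- checkouts,
        now - checked_out_at > max_duration,
        do: worker
  end
end
```
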

status(pool)
status(pool()) :: %{
  reserved: nni,
  ondemand: nni,
  children: nni,
  available: nni,
  working: nni,
  checkout_max_duration: nil | pos_integer()
}
when nni: non_neg_integer()

Queries the current status of a pool.
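
Since status/1 returns a plain map, callers can derive simple signals from it. The hypothetical helper below reports whether a nonblocking checkout would currently return nil; it assumes that available counts idle workers and children counts all live workers, which the field names suggest but this page does not spell out.

```elixir
defmodule PoolStatusHelpers do
  # Hypothetical helper over a map shaped like the result of PoolSup.status/1.
  # Assumption: `available` = idle workers, `children` = all live workers.

  # True when no worker is idle and no on-demand worker can be spawned.
  def exhausted?(%{available: 0, children: children, reserved: reserved, ondemand: ondemand}) do
    children >= reserved + ondemand
  end

  def exhausted?(%{available: available}) when available > 0, do: false
end
```
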

transaction(pool, f, timeout \\ 5000)
transaction(pool(), (pid() -> a), timeout()) :: a when a: term()

Checks out a worker pid, creates a link to the worker, executes the given function using the pid, and finally checks the worker in and unlinks it.

The timeout parameter is used only in the checkout step; time elapsed during the other steps is not counted.
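
The documented steps of transaction/3 can be sketched as follows. TxSketch is hypothetical and is not PoolSup's implementation: it takes the checkout and checkin steps as functions so that it runs without a pool, and it uses try/after so that the worker is checked in and unlinked even if the function raises. With PoolSup, the two functions would be fn t -> PoolSup.checkout(pool, t) end and &PoolSup.checkin(pool, &1).

```elixir
defmodule TxSketch do
  # Hypothetical sketch: checkout -> link -> run f -> checkin -> unlink.
  def transaction(checkout_fun, checkin_fun, f, timeout \\ 5000) do
    # Only the checkout step uses the timeout.
    pid = checkout_fun.(timeout)
    Process.link(pid)

    try do
      f.(pid)
    after
      # Runs whether f returns normally or raises.
      checkin_fun.(pid)
      Process.unlink(pid)
    end
  end
end
```
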