PoolSup (pool_sup v0.6.2)
PoolSup: Yet another process pool library in Elixir
PoolSup defines a supervisor specialized to manage a pool of worker processes.
Features
- A process defined by this module behaves as a :simple_one_for_one supervisor.
- Worker processes are spawned using a callback module that implements the PoolSup.Worker behaviour.
- The PoolSup process manages which worker processes are in use and which are not.
- PoolSup automatically restarts crashed workers.
- Functions to request the pid of an available worker process: checkout/2, checkout_nonblocking/2.
- Run-time reconfiguration of pool size: change_capacity/3.
- Automatic cleanup of workers hanging around too long without checkin, as a safeguard against process leaks.
- Load-balancing using multiple pools: PoolSup.Multi.
Example
Suppose we have a module that implements both the GenServer and PoolSup.Worker behaviours (the PoolSup.Worker behaviour requires only 1 callback to implement, start_link/1).
defmodule MyWorker do
  @behaviour PoolSup.Worker
  use GenServer

  def start_link(arg) do
    GenServer.start_link(__MODULE__, arg)
  end

  # definitions of gen_server callbacks...
end
When we want to have 3 worker processes that run the MyWorker server:
{:ok, pool_sup_pid} = PoolSup.start_link(MyWorker, {:worker, :arg}, 3, 0, [name: :my_pool])
Each worker process is started using MyWorker.start_link({:worker, :arg}).
Then we can get a pid of a child currently not in use:
worker_pid = PoolSup.checkout(:my_pool)
do_something(worker_pid)
PoolSup.checkin(:my_pool, worker_pid)
Don't forget to return the worker_pid when finished; for simple use cases PoolSup.transaction/3 comes in handy.
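As a rough sketch of the transaction/3 shortcut mentioned above: the example assumes the argument order (pool, function, timeout) and that the function's return value is handed back to the caller, neither of which is spelled out here. EchoWorker and :echo_pool are hypothetical names, and Mix.install assumes the package is published on Hex as :pool_sup.

```elixir
# Assumption: the library is available on Hex under the name :pool_sup.
Mix.install([{:pool_sup, "~> 0.6"}])

# Hypothetical worker that echoes back whatever it is asked.
defmodule EchoWorker do
  @behaviour PoolSup.Worker
  use GenServer

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  def init(arg), do: {:ok, arg}

  def handle_call({:echo, msg}, _from, state), do: {:reply, {:echo, msg}, state}
end

{:ok, _pool} = PoolSup.start_link(EchoWorker, :no_arg, 3, 0, name: :echo_pool)

# Checkout, link, run the function, then checkin and unlink, all in one call.
# Assumption: the anonymous function receives the worker pid as its only argument.
reply =
  PoolSup.transaction(:echo_pool, fn worker_pid ->
    GenServer.call(worker_pid, {:echo, "hello"})
  end)

IO.inspect(reply)
```

Compared with a manual checkout/checkin pair, this leaves no code path on which the caller can forget to return the worker.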
Reserved and on-demand worker processes
PoolSup defines the following two parameters to control capacity of a pool:
- reserved (3rd argument of start_link/5): Number of workers to keep alive.
- ondemand (4th argument of start_link/5): Maximum number of workers that are spawned on-demand.
In short:
{:ok, pool_sup_pid} = PoolSup.start_link(MyWorker, {:worker, :arg}, 2, 1)
w1 = PoolSup.checkout_nonblocking(pool_sup_pid) # Returns a pre-spawned worker pid
w2 = PoolSup.checkout_nonblocking(pool_sup_pid) # Returns the other pre-spawned worker pid
w3 = PoolSup.checkout_nonblocking(pool_sup_pid) # Returns a newly-spawned worker pid
nil = PoolSup.checkout_nonblocking(pool_sup_pid) # Returns `nil`, no available process
PoolSup.checkin(pool_sup_pid, w1) # `w1` is terminated
PoolSup.checkin(pool_sup_pid, w2) # `w2` is kept alive for the subsequent checkout
PoolSup.checkin(pool_sup_pid, w3) # `w3` is kept alive for the subsequent checkout
Usage within supervision tree
The following code snippet spawns a supervisor that has a PoolSup process as one of its children:
children = [
  ...
  Supervisor.child_spec({PoolSup, [MyWorker, {:worker, :arg}, 5, 3]}, []),
  ...
]
Supervisor.start_link(children, [strategy: :one_for_one])
The PoolSup process initially has 5 workers and can temporarily have up to 8.
All workers are started by MyWorker.start_link({:worker, :arg}).
You can of course define a wrapper function of PoolSup.start_link/4 and use it in your supervisor spec.
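One possible shape for such a wrapper is sketched below. MyApp.WorkerPool and StateWorker are hypothetical names introduced only for illustration, the hand-written child_spec/1 is one option among several, and Mix.install assumes the package is published on Hex as :pool_sup.

```elixir
# Assumption: the library is available on Hex under the name :pool_sup.
Mix.install([{:pool_sup, "~> 0.6"}])

# Hypothetical worker; an Agent is enough here, it just holds the init argument.
defmodule StateWorker do
  @behaviour PoolSup.Worker
  def start_link(arg), do: Agent.start_link(fn -> arg end)
end

# Hypothetical wrapper module that fixes the pool configuration in one place.
defmodule MyApp.WorkerPool do
  def start_link(_opts) do
    PoolSup.start_link(StateWorker, {:worker, :arg}, 5, 3, name: __MODULE__)
  end

  # Hand-written child spec so the module can be listed directly as a child.
  def child_spec(opts) do
    %{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}, type: :supervisor}
  end
end

{:ok, sup} = Supervisor.start_link([MyApp.WorkerPool], strategy: :one_for_one)
children = Supervisor.which_children(sup)
IO.inspect(children)
```

Keeping the pool size and worker arguments inside the wrapper means the parent supervisor only needs to list the module name.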
Summary
Functions
change_capacity/3: Changes capacity (number of worker processes) of a pool.
change_checkout_max_duration/2: Changes the :checkout_max_duration option of the pool.
checkin/2: Checks in an in-use worker pid and makes it available to others.
checkout/2: Checks out a worker pid that is currently not used.
checkout_nonblocking/2: Checks out a worker pid in a nonblocking manner, i.e. if no available worker is found this returns nil.
child_spec/1: Returns a child specification to be used when it's not fully specified by the parent supervisor.
format_status/2: Callback implementation for GenServer.format_status/2.
init/1: Callback implementation for GenServer.init/1.
start_link/5: Starts a PoolSup process linked to the calling process.
status/1: Queries the current status of a pool.
transaction/3: Checks out a worker pid, creates a link to the worker, executes the given function using the pid, and finally checks in and unlinks.
Types
@type option() :: {:name, GenServer.name()} | {:checkout_max_duration, pos_integer()}
@type pool() :: pid() | GenServer.name()
Functions
@spec change_capacity(pool(), nil | non_neg_integer(), nil | non_neg_integer()) :: :ok
Changes capacity (number of worker processes) of a pool.
The new_reserved and/or new_ondemand parameters can be nil; in that case the original value is kept unchanged (e.g. PoolSup.change_capacity(pool, 10, nil) replaces only the reserved value of pool).
On receipt of a change_capacity message, the pool adjusts the number of children according to the new configuration as follows:
- If the current number of workers is less than reserved, the pool spawns new workers to ensure reserved workers are available. Note that, as throughout the OTP framework, spawning processes under a supervisor is a synchronous operation. Therefore increasing reserved by too much at once may make the pool unresponsive for a while.
- When increasing maximum capacity (reserved + ondemand), if any client process is waiting on a blocking checkout, the newly-spawned process is returned to that client.
- When decreasing capacity, the pool tries to shut down extra workers that are not in use. Processes currently in use are never interrupted. If the number of in-use workers exceeds the desired capacity, further termination is delayed until a worker process is checked in.
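The nil handling described above can be observed through status/1, as in the following minimal sketch. IdleWorker is a hypothetical worker module, and Mix.install assumes the package is published on Hex as :pool_sup; only the reserved and ondemand fields are inspected, since the other counts depend on timing inside the pool.

```elixir
# Assumption: the library is available on Hex under the name :pool_sup.
Mix.install([{:pool_sup, "~> 0.6"}])

# Hypothetical worker that does nothing but hold its init argument.
defmodule IdleWorker do
  @behaviour PoolSup.Worker
  def start_link(arg), do: Agent.start_link(fn -> arg end)
end

{:ok, pool} = PoolSup.start_link(IdleWorker, :no_arg, 2, 1)

# Raise only `reserved`; passing nil leaves `ondemand` as it was.
:ok = PoolSup.change_capacity(pool, 4, nil)
after_grow = PoolSup.status(pool)

# Lower only `ondemand`; `reserved` stays at 4.
:ok = PoolSup.change_capacity(pool, nil, 0)
after_shrink = PoolSup.status(pool)

IO.inspect({after_grow, after_shrink})
```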
@spec change_checkout_max_duration(pool(), nil | pos_integer()) :: :ok
Changes the :checkout_max_duration option of the pool.
See start_link/5 for a detailed explanation of the :checkout_max_duration option.
Checks in an in-use worker pid and makes it available to others.
Checks out a worker pid that is currently not used.
If no available worker process exists, the caller is blocked until either
- any process becomes available, or
- timeout is reached.
Note that when a pid is checked out it must eventually be checked in or die, in order to correctly keep track of working processes and avoid process leaks. For this purpose it's advisable to either
- link the checked-out process and the process that is going to check in that process, or
- implement your worker to check itself in at the end of each job.
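The second option might look like the sketch below, in which the worker returns itself to the pool once its job is done. SelfReturningWorker and the {:double, pool, reply_to, n} message are hypothetical, invented for this illustration; Mix.install assumes the package is published on Hex as :pool_sup.

```elixir
# Assumption: the library is available on Hex under the name :pool_sup.
Mix.install([{:pool_sup, "~> 0.6"}])

defmodule SelfReturningWorker do
  @behaviour PoolSup.Worker
  use GenServer

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  def init(arg), do: {:ok, arg}

  # Hypothetical job message: double `n`, report to `reply_to`,
  # then check this worker back in to the pool it came from.
  def handle_cast({:double, pool, reply_to, n}, state) do
    send(reply_to, {:done, n * 2})
    PoolSup.checkin(pool, self())
    {:noreply, state}
  end
end

{:ok, pool} = PoolSup.start_link(SelfReturningWorker, :no_arg, 1, 0)

worker = PoolSup.checkout(pool)
GenServer.cast(worker, {:double, pool, self(), 21})

# Wait for the worker's report; the caller never calls checkin itself.
result =
  receive do
    {:done, value} -> value
  after
    1_000 -> :timeout
  end

IO.inspect(result)
```

If the job crashes before reaching the checkin, the worker dies instead, which also satisfies the "checked in or die" requirement.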
Checks out a worker pid in a nonblocking manner, i.e. if no available worker is found this returns nil.
@spec child_spec(list()) :: Supervisor.child_spec()
Returns a child specification to be used when it's not fully specified by the parent supervisor.
Callback implementation for GenServer.format_status/2.
Callback implementation for GenServer.init/1.
start_link(worker_module, worker_init_arg, reserved, ondemand, options \\ [])
@spec start_link(module(), term(), non_neg_integer(), non_neg_integer(), [option()]) :: GenServer.on_start()
Starts a PoolSup process linked to the calling process.
Arguments
- worker_module is the callback module of PoolSup.Worker.
- worker_init_arg is the value passed to the worker_module.start_link/1 callback function.
- reserved is the number of workers this PoolSup process holds.
- ondemand is the maximum number of workers that are spawned on checkouts when all reserved processes are in use.
- options is a keyword list of the following options:
  - :name : used for name registration for the pool process.
  - :checkout_max_duration : a threshold (in seconds) of a worker's checkout duration (see below). Defaults to nil (workers won't be killed).
Terminating non-returning workers
Sometimes it's difficult to guarantee that a checked-out worker pid will eventually be checked in.
For example, a caller process of checkout/2 may be killed while waiting for a reply (a worker pid) from the pool process, resulting in a leak of that worker.
For such cases PoolSup provides the :checkout_max_duration option as a safeguard against process leaks.
If a checked-out worker has not been checked in for longer than :checkout_max_duration seconds, the pool regards the worker process as leaked and kills it.
If :checkout_max_duration is nil this cleanup functionality is disabled.
You can dynamically change the :checkout_max_duration option of a pool with change_checkout_max_duration/2.
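A minimal sketch of setting and later disabling the safeguard, assuming the current value is reported under the checkout_max_duration key of status/1 as the spec below suggests. TimedWorker is a hypothetical worker module, the 30-second threshold is arbitrary, and Mix.install assumes the package is published on Hex as :pool_sup.

```elixir
# Assumption: the library is available on Hex under the name :pool_sup.
Mix.install([{:pool_sup, "~> 0.6"}])

# Hypothetical worker that does nothing but hold its init argument.
defmodule TimedWorker do
  @behaviour PoolSup.Worker
  def start_link(arg), do: Agent.start_link(fn -> arg end)
end

# Workers checked out for longer than 30 seconds are treated as leaked.
{:ok, pool} = PoolSup.start_link(TimedWorker, :no_arg, 2, 0, checkout_max_duration: 30)
before_change = PoolSup.status(pool)

# Passing nil turns the cleanup off again at run time.
:ok = PoolSup.change_checkout_max_duration(pool, nil)
after_change = PoolSup.status(pool)

IO.inspect({before_change.checkout_max_duration, after_change.checkout_max_duration})
```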
@spec status(pool()) :: %{
        reserved: nni,
        ondemand: nni,
        children: nni,
        available: nni,
        working: nni,
        checkout_max_duration: nil | pos_integer()
      }
      when nni: non_neg_integer()
Queries the current status of a pool.
Checks out a worker pid, creates a link to the worker, executes the given function using the pid, and finally checks in and unlinks.
The timeout parameter is used only in the checkout step; time elapsed during other steps is not counted.