pool_sup v0.6.0 PoolSup
PoolSup: Yet another process pool library in Elixir
PoolSup defines a supervisor specialized to manage a pool of worker processes.
Features
- Process defined by this module behaves as a :simple_one_for_one supervisor.
- Worker processes are spawned using a callback module that implements PoolSup.Worker behaviour.
- PoolSup process manages which worker processes are in use and which are not.
- PoolSup automatically restarts crashed workers.
- Functions to request pid of an available worker process: checkout/2, checkout_nonblocking/2.
- Run-time reconfiguration of pool size: change_capacity/3.
- Automatic cleanup of workers hanging around too long without checkin, as a safeguard against process leaks.
- Load-balancing using multiple pools: PoolSup.Multi.
Example
Suppose we have a module that implements both GenServer and PoolSup.Worker behaviours
(PoolSup.Worker behaviour requires only 1 callback to implement, start_link/1).
defmodule MyWorker do
  @behaviour PoolSup.Worker
  use GenServer

  def start_link(arg) do
    GenServer.start_link(__MODULE__, arg)
  end

  # definitions of gen_server callbacks...
end
When we want to have 3 worker processes that run MyWorker server:
{:ok, pool_sup_pid} = PoolSup.start_link(MyWorker, {:worker, :arg}, 3, 0, [name: :my_pool])
Each worker process is started using MyWorker.start_link({:worker, :arg}).
Then we can get a pid of a child currently not in use:
worker_pid = PoolSup.checkout(:my_pool)
do_something(worker_pid)
PoolSup.checkin(:my_pool, worker_pid)
Don’t forget to return the worker_pid when finished; for simple use cases PoolSup.transaction/3 comes in handy.
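As a rough sketch of the transaction-style usage mentioned above, the helper below wraps a single request to a pooled worker. The module name TransactionSketch, the :ping message, and the argument order (pool, function, timeout) are assumptions made for illustration; MyWorker would need a matching handle_call/3 clause for the call to succeed.

```elixir
defmodule TransactionSketch do
  # Hypothetical helper, not part of PoolSup.
  # Assumes PoolSup.transaction/3 takes (pool, fun, timeout) and that
  # the worker answers a :ping call (an illustrative message).
  def ping(pool, timeout \\ 5_000) do
    PoolSup.transaction(
      pool,
      fn worker_pid -> GenServer.call(worker_pid, :ping) end,
      timeout
    )
  end
end
```

With this shape the caller never handles the worker pid outside the function, so there is no separate checkin call to forget.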
Reserved and on-demand worker processes
PoolSup defines the following two parameters to control capacity of a pool:
- reserved (3rd argument of start_link/5): Number of workers to keep alive.
- ondemand (4th argument of start_link/5): Maximum number of workers that are spawned on-demand.
In short:
{:ok, pool_sup_pid} = PoolSup.start_link(MyWorker, {:worker, :arg}, 2, 1)
w1 = PoolSup.checkout_nonblocking(pool_sup_pid) # => pre-spawned worker pid
w2 = PoolSup.checkout_nonblocking(pool_sup_pid) # => pre-spawned worker pid
w3 = PoolSup.checkout_nonblocking(pool_sup_pid) # => newly-spawned worker pid
nil = PoolSup.checkout_nonblocking(pool_sup_pid)
PoolSup.checkin(pool_sup_pid, w1) # `w1` is terminated
PoolSup.checkin(pool_sup_pid, w2) # `w2` is kept alive for the subsequent checkout
PoolSup.checkin(pool_sup_pid, w3) # `w3` is kept alive for the subsequent checkout
Usage within supervision tree
The following code snippet spawns a supervisor that has PoolSup process as one of its children:
children = [
  ...
  Supervisor.Spec.supervisor(PoolSup, [MyWorker, {:worker, :arg}, 5, 3]),
  ...
]
Supervisor.start_link(children, [strategy: :one_for_one])
The PoolSup process initially has 5 workers and can temporarily have up to 8.
All workers are started by MyWorker.start_link({:worker, :arg}).
You can of course define a wrapper function around PoolSup.start_link/4 and use it in your supervisor spec.
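One possible shape for such a wrapper is sketched here. The module name MyApp.WorkerPool, the pool sizes, and the registered name are hypothetical choices; the child_spec/1 function uses the map-style child specification available in Elixir 1.5 and later rather than Supervisor.Spec.

```elixir
defmodule MyApp.WorkerPool do
  # Hypothetical wrapper module; the name, pool sizes and registered name
  # are illustrative choices, not part of PoolSup.
  @reserved 5
  @ondemand 3

  # Zero-arity entry point that hides the PoolSup arguments.
  def start_link do
    PoolSup.start_link(MyWorker, {:worker, :arg}, @reserved, @ondemand, name: __MODULE__)
  end

  # Map-style child spec; the pool is itself a supervisor.
  def child_spec(_arg) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, []},
      type: :supervisor
    }
  end
end
```

A parent supervisor could then list the module directly, for example Supervisor.start_link([MyApp.WorkerPool], strategy: :one_for_one), which keeps the pool configuration in one place.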
Summary
Functions
change_capacity/3: Changes capacity (number of worker processes) of a pool
change_checkout_max_duration/2: Changes :checkout_max_duration option of the pool
checkin/2: Checks in an in-use worker pid and makes it available to others
checkout/2: Checks out a worker pid that is currently not used
checkout_nonblocking/2: Checks out a worker pid in a nonblocking manner, i.e. if no available worker is found this returns nil
child_spec/1: Returns a child specification to be used when it’s not fully specified by the parent supervisor
format_status/2: Invoked in some cases to retrieve a formatted version of the GenServer status
init/1: Invoked when the server is started. start_link/3 or start/3 will block until it returns
start_link/5: Starts a PoolSup process linked to the calling process
status/1: Queries the current status of a pool
transaction/3: Checks out a worker pid, creates a link to the worker, executes the given function using the pid, and finally checks in and unlinks the worker
Types
option() :: {:name, GenServer.name()} | {:checkout_max_duration, pos_integer()}
Functions
change_capacity(pool(), nil | non_neg_integer(), nil | non_neg_integer()) :: :ok
Changes capacity (number of worker processes) of a pool.
new_reserved and/or new_ondemand parameters can be nil; in that case the original value is kept unchanged
(e.g. PoolSup.change_capacity(pool, 10, nil) replaces only the reserved value of the pool).
On receipt of a change_capacity message, the pool adjusts the number of children according to the new configuration as follows:
- If the current number of workers is less than reserved, the pool spawns new workers to ensure that reserved workers are available. Note that, as is the case throughout the OTP framework, spawning processes under a supervisor is a synchronous operation. Therefore increasing reserved by too much at once may make the pool unresponsive for a while.
- When increasing maximum capacity (reserved + ondemand), if any client process is waiting on a blocking checkout, the newly-spawned process is returned to that client.
- When decreasing capacity, the pool tries to shut down extra workers that are not in use. Processes currently in use are never interrupted. If the number of in-use workers is more than the desired capacity, further termination is delayed until a worker process is checked in.
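Since spawning many workers in one call may stall the pool, one option is to raise reserved in smaller increments. The sketch below is only an illustration under assumptions: the CapacitySketch module, the grow_reserved and steps helpers, and the default step of 5 are all hypothetical, and whether stepping helps in practice depends on how the pool interleaves other requests between calls.

```elixir
defmodule CapacitySketch do
  # Hypothetical helper, not part of PoolSup: raises `reserved` toward
  # `target` in increments of `step`, leaving `ondemand` unchanged (nil).
  def grow_reserved(pool, target, step \\ 5) when step > 0 do
    %{reserved: current} = PoolSup.status(pool)

    current
    |> steps(target, step)
    |> Enum.each(fn n -> PoolSup.change_capacity(pool, n, nil) end)
  end

  # Pure helper: intermediate reserved values from `current` up to `target`.
  def steps(current, target, _step) when current >= target, do: []

  def steps(current, target, step) do
    next = min(current + step, target)
    [next | steps(next, target, step)]
  end
end
```

For instance, CapacitySketch.steps(2, 15, 5) yields the intermediate values 7, 12 and 15, each of which would be passed to change_capacity/3 in turn.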
change_checkout_max_duration(pool(), nil | pos_integer()) :: :ok
Changes :checkout_max_duration option of the pool.
See start_link/5 for detailed explanation of :checkout_max_duration option.
Checks in an in-use worker pid and makes it available to others.
Checks out a worker pid that is currently not used.
If no available worker process exists, the caller is blocked until either
- any process becomes available, or
- timeout is reached.
Note that when a pid is checked out it must eventually be checked in or die, so that the pool can correctly keep track of working processes and avoid process leaks. For this purpose it’s advisable to either
- link the checked-out process and the process that is going to check it in, or
- implement your worker to check itself in at the end of each job.
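The first option can be sketched with a link plus a try/after block, so that the worker is checked in even when the job raises. The CheckoutSketch module and with_worker function are hypothetical, and the code assumes that the second argument of checkout/2 is a timeout in milliseconds.

```elixir
defmodule CheckoutSketch do
  # Hypothetical helper, not part of PoolSup.
  # Assumes the second argument of PoolSup.checkout/2 is a timeout.
  def with_worker(pool, fun, timeout \\ 5_000) do
    worker_pid = PoolSup.checkout(pool, timeout)
    # If the caller dies mid-job, the linked worker dies with it, so the
    # pool can notice the exit instead of waiting for a checkin.
    Process.link(worker_pid)

    try do
      fun.(worker_pid)
    after
      # Runs on both normal return and raised errors.
      Process.unlink(worker_pid)
      PoolSup.checkin(pool, worker_pid)
    end
  end
end
```

This mirrors what the description of transaction/3 says it does; in most cases calling transaction/3 directly is likely the simpler choice.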
Checks out a worker pid in a nonblocking manner, i.e. if no available worker is found, this returns nil.
Returns a child specification to be used when it’s not fully specified by the parent supervisor.
Invoked in some cases to retrieve a formatted version of the GenServer status.
This callback can be useful to control the appearance of the status of the
GenServer. For example, it can be used to return a compact representation of
the GenServer’s state to avoid having large state terms printed.
This callback is invoked in one of the following cases:
- one of :sys.get_status/1 or :sys.get_status/2 is invoked to get the status of the GenServer; in such cases, reason is :normal
- the GenServer terminates abnormally and logs an error; in such cases, reason is :terminate
pdict_and_state is a two-elements list [pdict, state] where pdict is a
list of {key, value} tuples representing the current process dictionary of
the GenServer and state is the current state of the GenServer.
Callback implementation for GenServer.format_status/2.
Invoked when the server is started. start_link/3 or start/3 will
block until it returns.
args is the argument term (second argument) passed to start_link/3.
Returning {:ok, state} will cause start_link/3 to return
{:ok, pid} and the process to enter its loop.
Returning {:ok, state, timeout} is similar to {:ok, state}
except handle_info(:timeout, state) will be called after timeout
milliseconds if no messages are received within the timeout.
Returning {:ok, state, :hibernate} is similar to
{:ok, state} except the process is hibernated before entering the loop. See
handle_call/3 for more information on hibernation.
Returning :ignore will cause start_link/3 to return :ignore and the
process will exit normally without entering the loop or calling terminate/2.
If used when part of a supervision tree the parent supervisor will not fail
to start nor immediately try to restart the GenServer. The remainder of the
supervision tree will be (re)started and so the GenServer should not be
required by other processes. It can be started later with
Supervisor.restart_child/2 as the child specification is saved in the parent
supervisor. The main use cases for this are:
- The GenServer is disabled by configuration but might be enabled later.
- An error occurred and it will be handled by a different mechanism than the Supervisor. Likely this approach involves calling Supervisor.restart_child/2 after a delay to attempt a restart.
Returning {:stop, reason} will cause start_link/3 to return
{:error, reason} and the process to exit with reason reason without
entering the loop or calling terminate/2.
Callback implementation for GenServer.init/1.
start_link(module(), term(), non_neg_integer(), non_neg_integer(), [option()]) :: GenServer.on_start()
Starts a PoolSup process linked to the calling process.
Arguments
- worker_module is the callback module of PoolSup.Worker.
- worker_init_arg is the value passed to worker_module.start_link/1 callback function.
- reserved is the number of workers this PoolSup process holds.
- ondemand is the maximum number of workers that are spawned on checkouts when all reserved processes are in use.
- options is a keyword list of the following options:
  - :name: used for name registration for the pool process.
  - :checkout_max_duration: a threshold (in seconds) of worker’s checkout duration (see below). Defaults to nil (workers won’t be killed).
Terminating non-returning workers
Sometimes it’s difficult to guarantee that a checked-out worker pid will eventually be checked in.
For example, a caller process of checkout/2 may be killed while waiting for a reply (a worker pid)
from a pool process, resulting in a process leak of the worker pid.
To handle such cases PoolSup provides the :checkout_max_duration option as a safeguard against process leaks.
If a checked-out worker has not been checked in for longer than :checkout_max_duration seconds,
the pool regards the worker process as leaked and kills it.
If :checkout_max_duration is nil this cleanup functionality is disabled.
You can dynamically change the :checkout_max_duration option of a pool with change_checkout_max_duration/2.
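A minimal sketch of enabling and later adjusting this safeguard follows. The LeakGuardSketch module, the :guarded_pool name, the pool sizes, and the 60-second limit are hypothetical values chosen for illustration.

```elixir
defmodule LeakGuardSketch do
  # Hypothetical pool name and sizes, chosen for illustration only.
  @pool :guarded_pool

  # Starts a pool that kills workers held for longer than 60 seconds.
  def start_link do
    PoolSup.start_link(MyWorker, {:worker, :arg}, 3, 0,
      name: @pool,
      checkout_max_duration: 60
    )
  end

  # Changes the limit at run time; passing nil disables the cleanup.
  def set_limit(seconds) when is_nil(seconds) or (is_integer(seconds) and seconds > 0) do
    PoolSup.change_checkout_max_duration(@pool, seconds)
  end
end
```

The guard on set_limit/1 follows the nil | pos_integer() type given for change_checkout_max_duration/2, so zero or negative values are rejected before reaching the pool.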
status(pool()) :: %{ reserved: nni, ondemand: nni, children: nni, available: nni, working: nni, checkout_max_duration: nil | pos_integer() } when nni: non_neg_integer()
Queries the current status of a pool.
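As an illustration of reading the returned map, the hypothetical helper below estimates how many more workers could be checked out before the pool reaches its maximum capacity. It assumes that working counts the workers currently checked out, which the type signature suggests but does not state.

```elixir
defmodule StatusSketch do
  # Hypothetical pure helper, not part of PoolSup.
  # Takes a map shaped like the return value of PoolSup.status/1 and
  # returns the remaining headroom, clamped at zero because `working`
  # may temporarily exceed capacity after a capacity decrease.
  def free_slots(%{reserved: reserved, ondemand: ondemand, working: working}) do
    max(reserved + ondemand - working, 0)
  end
end
```

It could be used as StatusSketch.free_slots(PoolSup.status(pool)), for example when deciding whether to call change_capacity/3.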
Checks out a worker pid, creates a link to the worker, executes the given function using the pid, and finally checks in and unlinks the worker.
The timeout parameter is used only in the checkout step; time elapsed during other steps is not counted.