wpool (worker_pool v6.4.0)
Worker pool main interface.
Use functions provided by this module to manage your pools of workers.
Starting the application
Worker Pool is an Erlang application that can be started using the functions in the application module. For convenience, wpool:start/0 and wpool:stop/0 are also provided.
Starting a Pool
To start a new worker pool, you can either:
- Use wpool:child_spec/2 if you want to add the pool to a supervision tree during its initialisation;
- Use wpool:start_pool/1 or wpool:start_pool/2 if you want to supervise it yourself;
- Use wpool:start_sup_pool/1 or wpool:start_sup_pool/2 if you want the pool to live under wpool's supervision tree.
Stopping a Pool
To stop a pool, just use wpool:stop_pool/1 or wpool:stop_sup_pool/1 according to how you started the pool.
Using the Workers
Since the workers are gen_servers, messages can be called or casted to them. To do that you can use wpool:call and wpool:cast as you would use the equivalent functions on gen_server.
Choosing a Strategy
Beyond the regular parameters for gen_server, wpool also provides an extra optional parameter, Strategy: the strategy used to pick the worker that performs the task. If not provided, the result of wpool:default_strategy/0 is used.
The available strategies are defined in the wpool:strategy/0 type.
Watching a Pool
Wpool provides a way to get live statistics about a pool. To do that, you can use wpool:stats/1.
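The overview above (start a pool, talk to its workers, inspect it, stop it) can be sketched end to end as follows. This is a minimal illustration, not the library's reference usage: the echo_worker module, the echo_pool name and the message shapes are all hypothetical, and demo/0 assumes the worker_pool library is available on the code path.

```erlang
-module(echo_worker).
-behaviour(gen_server).

%% Hypothetical worker module, written only for this illustration.
-export([demo/0]).
-export([init/1, handle_call/3, handle_cast/2]).

%% The state is simply whatever InitArgs the pool passes in.
init(InitArgs) ->
    {ok, InitArgs}.

%% Reply to {echo, Term} calls with a tagged copy of the term.
handle_call({echo, Term}, _From, State) ->
    {reply, {echoed, Term}, State}.

%% Casts are accepted and ignored.
handle_cast(_Msg, State) ->
    {noreply, State}.

%% Assumes worker_pool is installed; echo_pool is a made-up pool name.
demo() ->
    _ = wpool:start(),                       % ok, or already started
    {ok, _Sup} = wpool:start_sup_pool(echo_pool,
                                      [{workers, 3},
                                       {worker, {echo_worker, []}}]),
    {echoed, hello} = wpool:call(echo_pool, {echo, hello}),
    ok = wpool:cast(echo_pool, {log, hello}, random_worker),
    Stats = wpool:stats(echo_pool),          % snapshot of the live pool
    ok = wpool:stop_sup_pool(echo_pool),
    Stats.
```

Because the pool was started with start_sup_pool/2, it is stopped with stop_sup_pool/1; a pool started with start_pool/2 would be stopped with stop_pool/1 instead.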
Types
-type callbacks() :: [module()].
Initial list of callback modules implementing wpool_process_callbacks to be called on certain worker events.
This option only works if enable_callbacks() is set to true. Callbacks can be added and removed later with wpool_pool:add_callback_module/2 and wpool_pool:remove_callback_module/2.
A callback that gets the pool name and returns a worker's name.
-type enable_callbacks() :: boolean().
A boolean value determining if event_manager should be started for callback modules.
Defaults to false.
-type max_overrun_warnings() :: infinity | pos_integer().
The maximum number of overrun warnings emitted before killing the worker with a delayed task.
If this parameter is set to a value other than infinity, the rounds of warnings become equally timed (e.g. with overrun_warning = 1000 and max_overrun_warnings = 5 the task would be killed after 5 seconds of execution).
The default value for this setting is infinity, i.e., delayed tasks are not killed.
NOTE: Because the worker itself is killed, messages waiting in its queue may be lost if you are using a worker strategy other than available_worker (see strategy() below).
-type name() :: atom().
Name of the pool
-type option() :: {workers, workers()} | {worker, worker()} | {worker_opt, [worker_opt()]} | {strategy, supervisor_strategy()} | {worker_shutdown, worker_shutdown()} | {overrun_handler, overrun_handler() | [overrun_handler()]} | {overrun_warning, overrun_warning()} | {max_overrun_warnings, max_overrun_warnings()} | {pool_sup_intensity, pool_sup_intensity()} | {pool_sup_shutdown, pool_sup_shutdown()} | {pool_sup_period, pool_sup_period()} | {queue_type, queue_type()} | {enable_callbacks, enable_callbacks()} | {callbacks, callbacks()}.
Options that can be provided to a new pool.
child_spec/2, start_pool/2 and start_sup_pool/2 are the functions that take a list of these options as a parameter.
-type options() :: #{workers => workers(), worker => worker(), worker_opt => [worker_opt()], strategy => supervisor_strategy(), worker_shutdown => worker_shutdown(), overrun_handler => overrun_handler() | [overrun_handler()], overrun_warning => overrun_warning(), max_overrun_warnings => max_overrun_warnings(), pool_sup_intensity => pool_sup_intensity(), pool_sup_shutdown => pool_sup_shutdown(), pool_sup_period => pool_sup_period(), queue_type => queue_type(), enable_callbacks => enable_callbacks(), callbacks => callbacks(), _ => _}.
Options that can be provided to a new pool.
child_spec/2, start_pool/2 and start_sup_pool/2 are the functions that take these options, here in map form, as a parameter.
The module and function to call when a task is overrun.
The default value for this setting is {logger, warning}. The function must be of arity 1, and it will be called as Module:Fun(Args) where Args is a proplist with the following reported values:
- {alert, AlertType}: Where AlertType is overrun on regular warnings, or max_overrun_limit when the worker is about to be killed.
- {pool, Pool}: The pool name.
- {worker, Pid}: Pid of the worker.
- {task, Task}: A description of the task.
- {runtime, Runtime}: The runtime of the current round.
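A custom handler for the proplist described above could look like the following sketch. The module name my_overrun_handler and the function report/1 are hypothetical; the keys read from the proplist are the ones listed above. Returning the chosen severity is an assumption made here so the function can be checked in isolation, not something the documentation requires.

```erlang
-module(my_overrun_handler).

%% Hypothetical handler; configure it with
%% {overrun_handler, {my_overrun_handler, report}}.
-export([report/1]).

%% Args is the proplist of reported values for an overrun task.
report(Args) ->
    Alert = proplists:get_value(alert, Args),
    Pool = proplists:get_value(pool, Args),
    Runtime = proplists:get_value(runtime, Args),
    %% Escalate when the worker is about to be killed.
    Severity = case Alert of
                   max_overrun_limit -> error;
                   _ -> warning
               end,
    logger:log(Severity, "pool ~p: ~p after ~p", [Pool, Alert, Runtime]),
    Severity.
```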
-type overrun_warning() :: infinity | pos_integer().
The number of milliseconds after which a task is considered overrun, i.e., delayed.
A warning is emitted using overrun_handler().
The task is monitored until it is finished, thus more than one warning might be emitted for a single task.
The rounds of warnings are not equally timed; an exponential backoff algorithm is used instead: after each warning the overrun time is doubled (e.g. with overrun_warning = 1000, warnings would be emitted after 1000, 2000, 4000, 8000 ...).
The default value for this setting is infinity, i.e., no warnings are emitted.
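The doubling described for overrun_warning() can be reproduced with a small helper. This is only an illustration of the arithmetic; overrun_schedule and warning_times/2 are hypothetical names and are not part of wpool.

```erlang
-module(overrun_schedule).

%% Hypothetical helper, not part of wpool.
-export([warning_times/2]).

%% Returns the first Count values of the doubling sequence that starts
%% at OverrunWarning (in milliseconds).
warning_times(OverrunWarning, Count) ->
    [OverrunWarning * (1 bsl N) || N <- lists:seq(0, Count - 1)].
```

With overrun_warning = 1000, warning_times(1000, 4) yields [1000, 2000, 4000, 8000], the sequence quoted in the description.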
-type pool_sup_intensity() :: non_neg_integer().
The supervision intensity to use over the supervisor that supervises the workers.
Defaults to 5. See wpool_pool for more details.
-type pool_sup_period() :: non_neg_integer().
The supervision period to use over the supervisor that supervises the workers.
Defaults to 60. See wpool_pool for more details.
-type pool_sup_shutdown() :: brutal_kill | timeout().
The shutdown option to be used over the supervisor that supervises the workers.
Defaults to brutal_kill. See wpool_process_sup for more details.
-type queue_type() :: fifo | lifo.
Order in which requests will be stored and handled by workers.
This option can take values lifo or fifo. Defaults to fifo.
A function to run with a given worker.
It can be used to enable APIs that hide the gen_server behind more complex logic that might, for example, curate parameters or run side effects. One example is running supervisor processes as workers.
For example:
    Opts =
        #{workers => 3,
          worker_shutdown => infinity,
          worker => {supervisor, {Name, ModuleCallback, Args}}},
    %% Note that the supervisor's `init/1' callback takes such a 3-tuple.
    {ok, PoolPid} = wpool:start_sup_pool(pool_of_supervisors, Opts),
    ...
    Run = fun(Sup, _) -> supervisor:start_child(Sup, Params) end,
    {ok, ChildPid} = wpool:run(pool_of_supervisors, Run, next_worker),
-type stats() :: [{pool, name()} | {supervisor, pid()} | {options, [option()] | options()} | {size, non_neg_integer()} | {next_worker, pos_integer()} | {total_message_queue_len, non_neg_integer()} | {workers, [{pos_integer(), worker_stats()}]}].
Statistics about a given live pool.
-type strategy() :: best_worker | random_worker | next_worker | available_worker | next_available_worker | {hash_worker, term()} | custom_strategy().
Strategy to use when choosing a worker.
best_worker
Picks the worker with the shortest queue of messages. Loosely based on this article: https://lethain.com/load-balancing-across-erlang-process-groups/
This strategy is usually useful when your workers always perform the same task, or tasks with expectedly similar runtimes.
random_worker
Just picks a random worker. This strategy is the fastest one to select a worker. It's ideal if your workers will perform many short tasks.
next_worker
Picks the next worker in a round-robin fashion. This ensures an even distribution of tasks.
available_worker
Instead of just picking one of the workers in the queue and sending the request to it, this strategy queues the request and waits until a worker is available to perform it. That may render the worker selection part of the process much slower (thus generating the need for an additional parameter, Worker_Timeout, that controls how many milliseconds the client is willing to spend on it, regardless of the global Timeout for the call).
This strategy ensures that, if a worker crashes, no messages are lost in its message queue. It also ensures that a task that takes too long doesn't block other tasks since, as soon as another worker is free, it can pick up the next task in the list.
next_available_worker
In a way, this strategy behaves like available_worker in the sense that it will pick the first worker that it can find which is not running any task at the moment, but the difference is that it will fail if all workers are busy.
{hash_worker, Key}
This strategy takes a Key and selects a worker using erlang:phash2/2. This ensures that tasks classified under the same key will be delivered to the same worker, which is useful to classify events by key and work on them sequentially on the worker, distributing different keys across different workers.
custom_strategy()
A callback that gets the pool name and returns a worker's name.
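The key-to-worker idea behind {hash_worker, Key} can be illustrated with erlang:phash2/2 directly. The sketch below is an assumption about how such a mapping can work, not a copy of wpool's internals; hash_pick and worker_index/2 are hypothetical names.

```erlang
-module(hash_pick).

%% Hypothetical helper, not part of wpool.
-export([worker_index/2]).

%% Maps Key to a worker position in the range 1..PoolSize.
%% erlang:phash2(Key, PoolSize) returns a value in 0..PoolSize-1.
worker_index(Key, PoolSize) when is_integer(PoolSize), PoolSize > 0 ->
    erlang:phash2(Key, PoolSize) + 1.
```

The same key always maps to the same position for a given pool size, which is what lets a call such as wpool:call(my_pool, Msg, {hash_worker, UserId}) process all messages for one UserId sequentially (my_pool, Msg and UserId are placeholders).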
-type supervisor_strategy() :: supervisor:sup_flags().
Supervision strategy to use over the individual workers.
Defaults to {one_for_one, 5, 60}. See wpool_process_sup for more details.
The gen_server module and the arguments to pass to the init callback.
This is the module that each worker will run and the InitArgs to use on the corresponding start_link call used to initiate it.
The default value for this setting is {wpool_worker, undefined}. That means that if you don't provide a worker implementation, the pool will be generated with this default one. See wpool_worker for details.
-type worker_opt() :: gen_server:start_opt().
Server options that will be passed to each gen_server worker.
These are the same as described in the gen_server documentation.
-type worker_shutdown() :: brutal_kill | timeout().
The shutdown option to be used over the individual workers.
Defaults to 5000. See wpool_process_sup for more details.
-type worker_stats() :: [{messsage_queue_len, non_neg_integer()} | {memory, pos_integer()}].
Statistics about a worker in a pool.
-type workers() :: pos_integer().
The number of workers in the pool.
The default value for this setting is 100.
Functions
-spec broadcall(wpool:name(), term(), timeout()) -> {[Replies :: term()], [Errors :: term()]}.
Calls all the workers within the given pool async and waits for the responses synchronously.
If one worker times out, the entire call is considered timed-out.
-spec broadcast(wpool:name(), term()) -> ok.
Casts a message to all the workers within the given pool.
NOTE: These messages don't get queued by the pool; they go straight to the workers' message queues. So if you're using the available_worker strategy to balance the load and you have some tasks queued up waiting for the next available worker, the broadcast will reach all the workers before the queued-up tasks.
Equivalent to call(Sup, Call, default_strategy()).
Equivalent to call(Sup, Call, Strategy, 5000).
Picks a server and issues the call to it.
For all strategies except available_worker, Timeout applies only to the time spent on the actual call to the worker, because the time spent finding the worker in other strategies is negligible. For available_worker, the time spent choosing a worker is also counted.
Equivalent to cast(Sup, Cast, default_strategy()).
Picks a server and issues the cast to it
-spec child_spec(name(), [option()] | options()) -> supervisor:child_spec().
Builds a child specification to pass to a supervisor.
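As a sketch of how child_spec/2 might be used from your own supervisor: the module my_pools_sup, the pool name my_pool and the option values are all hypothetical, and the example assumes the worker_pool application is already running (for instance because your application depends on it).

```erlang
-module(my_pools_sup).
-behaviour(supervisor).

%% Hypothetical supervisor that owns one wpool pool.
-export([start_link/0, init/1, pool_options/0]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%% Illustrative option values only; see the options() type.
pool_options() ->
    #{workers => 10,
      overrun_warning => 1000,
      max_overrun_warnings => 5,
      queue_type => fifo}.

init([]) ->
    %% wpool builds the child specification; this supervisor just adds it.
    PoolSpec = wpool:child_spec(my_pool, pool_options()),
    SupFlags = #{strategy => one_for_one, intensity => 5, period => 60},
    {ok, {SupFlags, [PoolSpec]}}.
```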
-spec default_strategy() -> strategy().
Default strategy
Retrieves the list of worker registered names.
This can be useful to manually inspect the workers or do custom work on them.
Equivalent to run(Sup, Run, default_strategy()).
Equivalent to run(Sup, Run, Strategy, 5000).
Picks a server and issues the run to it.
For all strategies except available_worker, Timeout applies only to the time spent on the actual run on the worker, because the time spent finding the worker in other strategies is negligible. For available_worker, the time spent choosing a worker is also counted.
-spec send_request(name(), term()) -> noproc | timeout | gen_server:request_id().
Equivalent to send_request(Sup, Call, default_strategy(), 5000).
-spec send_request(name(), term(), strategy()) -> noproc | timeout | gen_server:request_id().
Equivalent to send_request(Sup, Call, Strategy, 5000).
-spec send_request(name(), term(), strategy(), timeout()) -> noproc | timeout | gen_server:request_id().
Picks a server and issues the call to it.
Timeout applies only to the time spent choosing a worker under the available_worker strategy.
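Since send_request/4 may return noproc or timeout instead of a request id, callers need to handle all three outcomes before waiting for the reply. A minimal sketch, in which async_call, request/3 and await/2 are hypothetical names and the reply is collected with gen_server:wait_response/2 from OTP:

```erlang
-module(async_call).

%% Hypothetical wrapper around wpool:send_request/4.
-export([request/3, await/2]).

%% PickTimeout bounds only the time spent choosing a worker.
request(Pool, Msg, PickTimeout) ->
    wpool:send_request(Pool, Msg, available_worker, PickTimeout).

%% Normalises the three possible results of send_request/4.
await(noproc, _ReplyTimeout) ->
    {error, noproc};
await(timeout, _ReplyTimeout) ->
    {error, timeout};
await(RequestId, ReplyTimeout) ->
    case gen_server:wait_response(RequestId, ReplyTimeout) of
        {reply, Reply} -> {ok, Reply};
        timeout -> {error, timeout};
        {error, {Reason, _ServerRef}} -> {error, Reason}
    end.
```

A caller would typically write async_call:await(async_call:request(Pool, Msg, 1000), 5000), keeping the worker-selection timeout separate from the reply timeout.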
-spec start() -> ok | {error, {already_started, wpool}}.
Starts the application
-spec start_pool(name()) -> supervisor:startlink_ret().
Equivalent to start_pool(Name, []).
-spec start_pool(name(), [option()] | options()) -> supervisor:startlink_ret().
Starts (and links) a pool of N wpool_processes. The resulting pid belongs to a supervisor (in case you want to add it to a supervision tree).
-spec start_sup_pool(name()) -> supervisor:startchild_ret().
Equivalent to start_sup_pool(Name, []).
-spec start_sup_pool(name(), [option()] | options()) -> supervisor:startchild_ret().
Starts a pool of N wpool_processes supervised by wpool_sup
-spec stats() -> [stats()].
Retrieves a snapshot of statistics for all pools.
See stats/0 for details on the return type.
Retrieves a snapshot of statistics for a given pool.
See stats/0 for details on the return type.
-spec stop() -> ok.
Stops the application
-spec stop_pool(name()) -> true.
Stops a pool that doesn't belong to wpool_sup.
-spec stop_sup_pool(name()) -> ok.
Stops a pool supervised by the wpool_sup supervision tree.