wpool (worker_pool v6.2.0)
Worker pool main interface.
Use functions provided by this module to manage your pools of workers.
Starting the application
Worker Pool is an Erlang application that can be started using the functions in the application module. For convenience, wpool:start/0 and wpool:stop/0 are also provided.
Starting a Pool
To start a new worker pool, you can either:
- Use wpool:child_spec/2 if you want to add the pool to a supervision tree when that tree is initialised;
- Use wpool:start_pool/1 or wpool:start_pool/2 if you want to supervise it yourself;
- Use wpool:start_sup_pool/1 or wpool:start_sup_pool/2 if you want the pool to live under wpool's supervision tree.
Stopping a Pool
To stop a pool, just use wpool:stop_pool/1 or wpool:stop_sup_pool/1 according to how you started the pool.
Using the Workers
Since the workers are gen_servers, messages can be called or cast to them. To do that you can use wpool:call and wpool:cast as you would use the equivalent functions on gen_server.
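The flow described so far (start the application, start a pool, call and cast to its workers, stop the pool) can be sketched as follows. This is a minimal illustration, not the library's reference usage: the module name wpool_usage_sketch, the pool name sketch_pool, the option values and the {echo, _} / {log, _} messages are all hypothetical, and it assumes the worker_pool application is available in the code path.

```erlang
-module(wpool_usage_sketch).
-behaviour(gen_server).

-export([pool_options/0, demo/0]).
-export([init/1, handle_call/3, handle_cast/2]).

%% Hypothetical pool options: 4 workers, each one running this module
%% as its gen_server implementation with [] as the init argument.
pool_options() ->
    [{workers, 4}, {worker, {?MODULE, []}}].

%% Starts the application and a pool under wpool's supervision tree,
%% sends one call and one cast using the default strategy, then stops
%% the pool and returns the reply to the call.
demo() ->
    {ok, _Apps} = application:ensure_all_started(worker_pool),
    {ok, _Pid} = wpool:start_sup_pool(sketch_pool, pool_options()),
    Reply = wpool:call(sketch_pool, {echo, hello}),
    ok = wpool:cast(sketch_pool, {log, hello}),
    ok = wpool:stop_sup_pool(sketch_pool),
    Reply.

%% gen_server callbacks of the hypothetical worker.
init(_InitArg) ->
    {ok, #{}}.

%% Replies with whatever term was sent.
handle_call({echo, Term}, _From, State) ->
    {reply, Term, State}.

%% Accepts the message and does nothing with it.
handle_cast({log, _Term}, State) ->
    {noreply, State}.
```

With the application available, wpool_usage_sketch:demo() is expected to return the echoed term. Because the pool was started with start_sup_pool/2, it is stopped with stop_sup_pool/1.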
Choosing a Strategy
Beyond the regular parameters for gen_server, wpool also provides an extra optional parameter, Strategy: the strategy used to pick the worker that will perform the task. If not provided, the result of wpool:default_strategy/0 is used.
The available strategies are defined in the wpool:strategy() type.
Watching a Pool
Wpool provides a way to get live statistics about a pool. To do that, you can use wpool:stats/1.
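As a rough illustration of consuming those statistics, the sketch below condenses a stats() list (see the stats() type in this module) into a small map. The module name, the summary/1 function and the overloaded heuristic are hypothetical; only the pool, size and total_message_queue_len keys come from the stats() type.

```erlang
-module(wpool_stats_sketch).
-export([summary/1]).

%% Condenses a stats() proplist into a map.
%% `overloaded` is a made-up heuristic: more queued messages than workers.
summary(Stats) ->
    Size = proplists:get_value(size, Stats, 0),
    Queued = proplists:get_value(total_message_queue_len, Stats, 0),
    #{pool => proplists:get_value(pool, Stats),
      size => Size,
      queued => Queued,
      overloaded => Queued > Size}.
```

Assuming a running pool named my_pool, it could be used as wpool_stats_sketch:summary(wpool:stats(my_pool)).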
Types
-type callbacks() :: [module()].
Initial list of callback modules implementing wpool_process_callbacks to be called on certain worker events.
This option only takes effect if enable_callbacks() is set to true. Callbacks can be added and removed later by wpool_pool:add_callback_module/2 and wpool_pool:remove_callback_module/2.
-type custom_strategy() :: fun(([atom()]) -> Atom :: atom()).
-type enable_callbacks() :: boolean().
A boolean value determining if event_manager should be started for callback modules.
Defaults to false.
-type max_overrun_warnings() :: infinity | pos_integer().
The maximum number of overrun warnings emitted before killing the worker with a delayed task.
If this parameter is set to a value other than infinity, the rounds of warnings become equally timed (e.g. with overrun_warning = 1000 and max_overrun_warnings = 5 the task would be killed after 5 seconds of execution).
The default value for this setting is infinity, i.e., delayed tasks are not killed.
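NOTE: As the worker is being killed, its pending messages might be lost if you are using a worker strategy other than available_worker (see worker strategy() below).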
-type name() :: atom().
-type option() :: {workers, workers()} | {worker, worker()} | {worker_opt, [worker_opt()]} | {strategy, supervisor_strategy()} | {worker_shutdown, worker_shutdown()} | {overrun_handler, overrun_handler()} | {overrun_warning, overrun_warning()} | {max_overrun_warnings, max_overrun_warnings()} | {pool_sup_intensity, pool_sup_intensity()} | {pool_sup_shutdown, pool_sup_shutdown()} | {pool_sup_period, pool_sup_period()} | {queue_type, queue_type()} | {enable_callbacks, enable_callbacks()} | {callbacks, callbacks()}.
Options that can be provided to a new pool.
child_spec/2, start_pool/2 and start_sup_pool/2 are the functions that take a list of these options as a parameter.
-type overrun_handler() :: {Module :: module(), Fun :: atom()}.
The module and function to call when a task is overrun.
The default value for this setting is {error_logger, warning_report}. The function must be of arity 1, and it will be called as Module:Fun(Args) where Args is a proplist with the following reported values:
- {alert, AlertType}: Where AlertType is overrun on regular warnings, or max_overrun_limit when the worker is about to be killed.
- {pool, Pool}: The pool name.
- {worker, Pid}: Pid of the worker.
- {task, Task}: A description of the task.
- {runtime, Runtime}: The runtime of the current round.
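A custom handler only needs an arity-1 function that reads the proplist described above. The following is a minimal sketch under that assumption; the module name my_overrun_handler and the message wording are hypothetical, and it would be configured with the option {overrun_handler, {my_overrun_handler, report}}.

```erlang
-module(my_overrun_handler).
-export([report/1, describe/1]).

%% Entry point called as Module:Fun(Args); logs one line per warning.
report(Args) ->
    logger:warning("~s", [describe(Args)]).

%% Builds a flat string from the reported values in the proplist.
describe(Args) ->
    Alert = proplists:get_value(alert, Args),
    Pool = proplists:get_value(pool, Args),
    Runtime = proplists:get_value(runtime, Args),
    Action = case Alert of
                 max_overrun_limit -> "about to be killed";
                 _ -> "still running"
             end,
    lists:flatten(
      io_lib:format("pool ~p: task ~s after ~p (alert: ~p)",
                    [Pool, Action, Runtime, Alert])).
```

Keeping the formatting in describe/1 separate from the logging call makes the handler easy to check without a running pool.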
-type overrun_warning() :: infinity | pos_integer().
The number of milliseconds after which a task is considered overrun, i.e., delayed.
A warning is emitted using overrun_handler().
The task is monitored until it is finished, thus more than one warning might be emitted for a single task.
The rounds of warnings are not equally timed; an exponential backoff algorithm is used instead: after each warning the overrun time is doubled (e.g. with overrun_warning = 1000, warnings would be emitted after 1000, 2000, 4000, 8000 ...).
The default value for this setting is infinity, i.e., no warnings are emitted.
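The arithmetic behind both timing modes can be sketched in plain Erlang. This only reproduces the numbers given in the text, it is not wpool's internal code, and the module and function names are hypothetical: warning_times/2 lists the doubling thresholds used when max_overrun_warnings is infinity, and kill_after/2 gives the kill time for the equally timed case described under max_overrun_warnings().

```erlang
-module(overrun_schedule_sketch).
-export([warning_times/2, kill_after/2]).

%% First N warning thresholds in milliseconds when the overrun time
%% doubles after each warning (exponential backoff).
warning_times(OverrunWarning, N)
  when is_integer(OverrunWarning), OverrunWarning > 0,
       is_integer(N), N >= 0 ->
    [OverrunWarning bsl I || I <- lists:seq(0, N - 1)].

%% Milliseconds until the task is killed when warnings are equally
%% timed, i.e. when max_overrun_warnings is not infinity.
kill_after(OverrunWarning, MaxOverrunWarnings)
  when is_integer(OverrunWarning), is_integer(MaxOverrunWarnings) ->
    OverrunWarning * MaxOverrunWarnings.
```

For example, warning_times(1000, 4) yields [1000, 2000, 4000, 8000], and kill_after(1000, 5) yields 5000, matching the 5 seconds mentioned for max_overrun_warnings = 5.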
-type pool_sup_intensity() :: non_neg_integer().
The supervision intensity to use over the supervisor that supervises the workers.
Defaults to 5. See wpool_pool for more details.
-type pool_sup_period() :: non_neg_integer().
The supervision period to use over the supervisor that supervises the workers.
Defaults to 60. See wpool_pool for more details.
-type pool_sup_shutdown() :: brutal_kill | timeout().
The shutdown option to be used over the supervisor that supervises the workers.
Defaults to brutal_kill. See wpool_process_sup for more details.
-type queue_type() :: wpool_queue_manager:queue_type().
Order in which requests will be stored and handled by workers.
This option can take values lifo or fifo. Defaults to fifo.
-type stats() :: [{pool, name()} | {supervisor, pid()} | {options, [option()]} | {size, non_neg_integer()} | {next_worker, pos_integer()} | {total_message_queue_len, non_neg_integer()} | {workers, [{pos_integer(), worker_stats()}]}].
-type strategy() :: best_worker | random_worker | next_worker | available_worker | next_available_worker | {hash_worker, term()} | custom_strategy().
Strategy to use when choosing a worker.
best_worker
Picks the worker with the shortest queue of messages. Loosely based on this article: https://lethain.com/load-balancing-across-erlang-process-groups/.
This strategy is usually useful when your workers always perform the same task, or tasks with expectedly similar runtimes.
random_worker
Just picks a random worker. This strategy is the fastest one to select a worker. It's ideal if your workers will perform many short tasks.
next_worker
Picks the next worker in a round-robin fashion. This ensures an even distribution of tasks.
available_worker
Instead of just picking one of the workers in the queue and sending the request to it, this strategy queues the request and waits until a worker is available to perform it. That may make the worker selection part of the process much slower (thus generating the need for an additional parameter, Worker_Timeout, that controls how many milliseconds the client is willing to spend on that, regardless of the global Timeout for the call).
This strategy ensures that, if a worker crashes, no messages are lost in its message queue. It also ensures that a task that takes too long doesn't block other tasks, since as soon as another worker is free it can pick up the next task in the list.
next_available_worker
In a way, this strategy behaves like available_worker in the sense that it will pick the first worker that it can find which is not running any task at the moment, but the difference is that it will fail if all workers are busy.
{hash_worker, Key}
This strategy takes a Key and selects a worker using erlang:phash2/2. This ensures that tasks classified under the same key will be delivered to the same worker, which is useful to classify events by key and work on them sequentially on the worker, distributing different keys across different workers.
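To see why the same key always reaches the same worker, here is a minimal sketch of key-based selection with erlang:phash2/2. It assumes workers are addressed by a 1-based index, which is an assumption for illustration rather than a statement about wpool's internals; the module and function names are hypothetical.

```erlang
-module(hash_worker_sketch).
-export([worker_index/2]).

%% erlang:phash2(Term, Range) returns an integer in 0..Range-1,
%% so adding 1 gives a worker index in 1..PoolSize.
%% The result depends only on Key and PoolSize, so it is stable.
worker_index(Key, PoolSize) when is_integer(PoolSize), PoolSize > 0 ->
    1 + erlang:phash2(Key, PoolSize).
```

Calling worker_index({user, 42}, 8) repeatedly returns the same index every time, while different keys spread over the 8 workers. With wpool itself, the strategy is passed as {hash_worker, Key} to the call or cast.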
custom_strategy()
A callback that gets the pool name and returns a worker's name.
-type supervisor_strategy() :: supervisor:sup_flags().
Supervision strategy to use over the individual workers.
Defaults to {one_for_one, 5, 60}. See wpool_process_sup for more details.
-type worker() :: {Module :: module(), InitArg :: term()}.
The gen_server module and the arguments to pass to the init callback.
This is the module that each worker will run and the InitArg to use on the corresponding start_link call used to initiate it.
The default value for this setting is {wpool_worker, undefined}. That means that if you don't provide a worker implementation, the pool will be generated with this default one. See wpool_worker for details.
-type worker_opt() :: gen_server:start_opt().
Server options that will be passed to each gen_server worker.
For the available options, see the gen_server documentation.
-type worker_shutdown() :: worker_shutdown().
The shutdown option to be used over the individual workers.
Defaults to 5000. See wpool_process_sup for more details.
-type worker_stats() :: [{messsage_queue_len, non_neg_integer()} | {memory, pos_integer()}].
-type workers() :: pos_integer().
The number of workers in the pool.
The default value for this setting is 100.
Functions
-spec broadcall(wpool:name(), term(), timeout()) -> {[Replies :: term()], [Errors :: term()]}.
Calls all the workers within the given pool async and waits for the responses synchronously.
If one worker times out, the entire call is considered timed-out.
-spec broadcast(wpool:name(), term()) -> ok.
Casts a message to all the workers within the given pool.
NOTE: These messages don't get queued; they go straight to the workers' message queues. If you're using the available_worker strategy to balance the load and you have some tasks queued up waiting for the next available worker, the broadcast will reach all the workers before the queued up tasks.
-spec call(name(), term()) -> term().
Equivalent to call(Sup, Call, default_strategy()).
Equivalent to call(Sup, Call, Strategy, 5000).
-spec cast(name(), term()) -> ok.
Equivalent to cast(Sup, Cast, default_strategy()).
-spec child_spec(name(), [option()]) -> supervisor:child_spec().
-spec default_strategy() -> strategy().
-spec get_workers(name()) -> [atom()].
-spec send_request(name(), term()) -> noproc | timeout | gen_server:request_id().
Equivalent to send_request(Sup, Call, default_strategy(), 5000).
-spec send_request(name(), term(), strategy()) -> noproc | timeout | gen_server:request_id().
Equivalent to send_request(Sup, Call, Strategy, 5000).
-spec send_request(name(), term(), strategy(), timeout()) -> noproc | timeout | gen_server:request_id().
-spec start() -> ok | {error, {already_started, wpool}}.
-spec start_pool(name()) -> {ok, pid()}.
Equivalent to start_pool(Name, []).
-spec start_sup_pool(name()) -> {ok, pid()} | {error, {already_started, pid()} | term()}.
Equivalent to start_sup_pool(Name, []).
-spec start_sup_pool(name(), [option()]) -> {ok, pid()} | {error, {already_started, pid()} | term()}.
Starts a pool supervised by wpool_sup.
-spec stats() -> [stats()].
Retrieves a snapshot of statistics for all pools.
See the stats() type for details on the return type.
Retrieves a snapshot of statistics for a given pool.
See the stats() type for details on the return type.
-spec stop() -> ok.
-spec stop_pool(name()) -> true.
Stops a pool that is not supervised by wpool_sup.
-spec stop_sup_pool(name()) -> ok.
Stops a pool that lives under the wpool_sup supervision tree.