wpool (worker_pool v6.4.0)

Worker pool main interface.

Use functions provided by this module to manage your pools of workers.

Starting the application

Worker Pool is an Erlang application that can be started using the functions in the application module. For convenience, wpool:start/0 and wpool:stop/0 are also provided.
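As a minimal sketch, either of the following could be used from the shell to start the application. The application name worker_pool is taken from the page title, and application:ensure_all_started/1 is the standard OTP call.

```erlang
%% Option 1: the standard OTP way, which also starts any dependencies.
{ok, _Started} = application:ensure_all_started(worker_pool).

%% Option 2: the convenience function documented on this page.
%% It returns ok, or {error, {already_started, wpool}} if already running.
_ = wpool:start().
```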

Starting a Pool

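To start a new worker pool, you can either:

  • Use wpool:child_spec/2 to build a child specification and add the pool to your own supervision tree.
  • Use wpool:start_pool/1 or wpool:start_pool/2 to start a pool linked to the calling process and supervise it yourself.
  • Use wpool:start_sup_pool/1 or wpool:start_sup_pool/2 to start a pool under wpool's own supervision tree (wpool_sup).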
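For the supervision-tree case, the following is a sketch of a supervisor that embeds a pool through wpool:child_spec/2. The module name my_app_sup, the pool name my_pool, and the worker count are hypothetical, and the sketch assumes the worker_pool application is already running.

```erlang
-module(my_app_sup).  %% hypothetical module name
-behaviour(supervisor).

-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    %% Build the child specification for a pool of 10 default workers.
    PoolSpec = wpool:child_spec(my_pool, #{workers => 10}),
    SupFlags = #{strategy => one_for_one, intensity => 5, period => 60},
    {ok, {SupFlags, [PoolSpec]}}.
```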

Stopping a Pool

To stop a pool, just use wpool:stop_pool/1 or wpool:stop_sup_pool/1 according to how you started the pool.
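The pairing between start and stop functions might look like the sketch below. The pool names are hypothetical, and the return values are the ones given by the specs further down this page.

```erlang
%% A pool started under wpool's own supervision tree...
{ok, _SupPid} = wpool:start_sup_pool(my_sup_pool, #{workers => 5}),
%% ...is stopped with stop_sup_pool/1.
ok = wpool:stop_sup_pool(my_sup_pool),

%% A pool started (and linked) with start_pool/2...
{ok, _PoolPid} = wpool:start_pool(my_linked_pool, #{workers => 5}),
%% ...is stopped with stop_pool/1.
true = wpool:stop_pool(my_linked_pool).
```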

Using the Workers

Since the workers are gen_servers, you can send calls and casts to them. To do that, use wpool:call and wpool:cast as you would use the equivalent gen_server functions.
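To illustrate, here is a small sketch of a worker module together with a function that exercises it through a pool. The module name echo_worker, the pool name echo_pool, and the {echo, Term} message are all hypothetical.

```erlang
-module(echo_worker).  %% hypothetical worker module
-behaviour(gen_server).

-export([demo/0]).
-export([init/1, handle_call/3, handle_cast/2]).

init(InitArg) ->
    {ok, InitArg}.

%% Replies with whatever term it was asked to echo.
handle_call({echo, Term}, _From, State) ->
    {reply, Term, State}.

%% Casts are accepted and ignored in this sketch.
handle_cast(_Msg, State) ->
    {noreply, State}.

demo() ->
    {ok, _} = application:ensure_all_started(worker_pool),
    Opts = #{workers => 4, worker => {?MODULE, undefined}},
    {ok, _Sup} = wpool:start_sup_pool(echo_pool, Opts),
    hello = wpool:call(echo_pool, {echo, hello}),
    ok = wpool:cast(echo_pool, any_message),
    ok = wpool:stop_sup_pool(echo_pool).
```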

Choosing a Strategy

Beyond the regular gen_server parameters, wpool also accepts an extra optional parameter, Strategy: the strategy used to pick the worker that performs the task. If not provided, the result of wpool:default_strategy/0 is used.

The available strategies are defined in the wpool:strategy/0 type.

Watching a Pool

Wpool provides a way to get live statistics about a pool. To do that, you can use wpool:stats/1.
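Since the result is a list of tagged tuples (see the stats() type), individual values can be read with proplists. A sketch, assuming a running pool with the hypothetical name my_pool:

```erlang
Stats = wpool:stats(my_pool),
%% Keys below come from the stats() type documented on this page.
Size = proplists:get_value(size, Stats),
QueueLen = proplists:get_value(total_message_queue_len, Stats),
io:format("pool has ~p workers and ~p queued messages~n", [Size, QueueLen]).
```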

Types

-type callbacks() :: [module()].

Initial list of callback modules implementing wpool_process_callbacks to be called on certain worker events.

This option only takes effect if enable_callbacks() is set to true. Callbacks can be added and removed later with wpool_pool:add_callback_module/2 and wpool_pool:remove_callback_module/2.

-type custom_strategy() :: fun((atom()) -> Atom :: atom()).

A callback that gets the pool name and returns a worker's name.

-type enable_callbacks() :: boolean().

A boolean value determining if event_manager should be started for callback modules.

Defaults to false.

max_overrun_warnings/0

-type max_overrun_warnings() :: infinity | pos_integer().

The maximum number of overrun warnings emitted before killing the worker with a delayed task.

If this parameter is set to a value other than infinity, the rounds of warnings become equally timed (e.g., with overrun_warning = 1000 and max_overrun_warnings = 5, the task would be killed after 5 seconds of execution).

The default value for this setting is infinity, i.e., delayed tasks are not killed.

NOTE: Because the worker is killed, messages waiting in its queue might be lost if you are using a strategy other than available_worker (see strategy() below).
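The timing in the example above can be expressed as pool options. This is only a sketch: the pool name timed_pool and the worker count are hypothetical, and the worker_pool application is assumed to be running.

```erlang
%% With max_overrun_warnings set, warnings are equally timed, so a task is
%% killed after about overrun_warning * max_overrun_warnings milliseconds.
Opts = #{workers => 4,
         overrun_warning => 1000,     %% one warning every 1000 ms
         max_overrun_warnings => 5},  %% 5 warnings, i.e. about 5000 ms
{ok, _Pid} = wpool:start_sup_pool(timed_pool, Opts).
```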

-type name() :: atom().

Name of the pool

-type option() ::
          {workers, workers()} |
          {worker, worker()} |
          {worker_opt, [worker_opt()]} |
          {strategy, supervisor_strategy()} |
          {worker_shutdown, worker_shutdown()} |
          {overrun_handler, overrun_handler() | [overrun_handler()]} |
          {overrun_warning, overrun_warning()} |
          {max_overrun_warnings, max_overrun_warnings()} |
          {pool_sup_intensity, pool_sup_intensity()} |
          {pool_sup_shutdown, pool_sup_shutdown()} |
          {pool_sup_period, pool_sup_period()} |
          {queue_type, queue_type()} |
          {enable_callbacks, enable_callbacks()} |
          {callbacks, callbacks()}.

Options that can be provided to a new pool.

child_spec/2, start_pool/2 and start_sup_pool/2 are the functions that take a list of these options as a parameter.

-type options() ::
          #{workers => workers(),
            worker => worker(),
            worker_opt => [worker_opt()],
            strategy => supervisor_strategy(),
            worker_shutdown => worker_shutdown(),
            overrun_handler => overrun_handler() | [overrun_handler()],
            overrun_warning => overrun_warning(),
            max_overrun_warnings => max_overrun_warnings(),
            pool_sup_intensity => pool_sup_intensity(),
            pool_sup_shutdown => pool_sup_shutdown(),
            pool_sup_period => pool_sup_period(),
            queue_type => queue_type(),
            enable_callbacks => enable_callbacks(),
            callbacks => callbacks(),
            _ => _}.

Options that can be provided to a new pool.

child_spec/2, start_pool/2 and start_sup_pool/2 are the functions that take a map of these options as a parameter.

-type overrun_handler() :: {Module :: module(), Fun :: atom()}.

The module and function to call when a task is overrun.

The default value for this setting is {logger, warning}. The function must be of arity 1, and it will be called as Module:Fun(Args), where Args is a proplist with the following reported values:

  • {alert, AlertType}: Where AlertType is overrun on regular warnings, or max_overrun_limit when the worker is about to be killed.
  • {pool, Pool}: The pool name.
  • {worker, Pid}: Pid of the worker.
  • {task, Task}: A description of the task.
  • {runtime, Runtime}: The runtime of the current round.

-type overrun_warning() :: infinity | pos_integer().

The number of milliseconds after which a task is considered overrun i.e., delayed.

A warning is emitted using overrun_handler().
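A custom handler could look like the sketch below. The module name my_overrun_handler and the function name report are hypothetical; the proplist keys are the ones listed for overrun_handler(). It would be configured with overrun_handler => {my_overrun_handler, report}.

```erlang
-module(my_overrun_handler).  %% hypothetical module name
-export([report/1]).

%% Called as my_overrun_handler:report(Args), where Args is a proplist.
report(Args) ->
    Alert = proplists:get_value(alert, Args),
    Pool = proplists:get_value(pool, Args),
    Runtime = proplists:get_value(runtime, Args),
    logger:warning("wpool alert ~p on pool ~p, runtime ~p",
                   [Alert, Pool, Runtime]).
```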

The task is monitored until it is finished, thus more than one warning might be emitted for a single task.

The rounds of warnings are not equally timed (unless max_overrun_warnings() is set to a value other than infinity); an exponential backoff algorithm is used instead: after each warning the overrun time is doubled (e.g., with overrun_warning = 1000, warnings would be emitted after 1000, 2000, 4000, 8000 ...).
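The doubling can be reproduced with a few lines of Erlang. This is only an illustration of the sequence described above, not the library's implementation; the module and function names are hypothetical.

```erlang
-module(overrun_schedule).  %% hypothetical module name
-export([warnings/2]).

%% Returns the first Count warning thresholds in milliseconds,
%% starting at OverrunWarning and doubling after each warning.
warnings(OverrunWarning, Count) ->
    [OverrunWarning * (1 bsl N) || N <- lists:seq(0, Count - 1)].
```

For instance, overrun_schedule:warnings(1000, 4) evaluates to [1000, 2000, 4000, 8000].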

The default value for this setting is infinity, i.e., no warnings are emitted.

-type pool_sup_intensity() :: non_neg_integer().

The supervision intensity to use over the supervisor that supervises the workers.

Defaults to 5. See wpool_pool for more details.

-type pool_sup_period() :: non_neg_integer().

The supervision period to use over the supervisor that supervises the workers.

Defaults to 60. See wpool_pool for more details.

-type pool_sup_shutdown() :: brutal_kill | timeout().

The shutdown option to be used over the supervisor that supervises the workers.

Defaults to brutal_kill. See wpool_process_sup for more details.

-type queue_type() :: fifo | lifo.

Order in which requests will be stored and handled by workers.

This option can take values lifo or fifo. Defaults to fifo.

-type run(Result) :: fun((name() | pid(), timeout()) -> Result).

A function to run with a given worker.

It can be used with workers whose API hides the gen_server behind more complex logic that might, for example, curate parameters or run side effects. The supervisor module is one such case.

For example:

    Opts =
        #{workers => 3,
          worker_shutdown => infinity,
          %% Note that the supervisor's `init/1' callback takes such a 3-tuple.
          worker => {supervisor, {Name, ModuleCallback, Args}}},
    {ok, Pid} = wpool:start_sup_pool(pool_of_supervisors, Opts),

    ...

    %% Use a different variable here: Pid is already bound to the pool.
    Run = fun(Sup, _) -> supervisor:start_child(Sup, Params) end,
    {ok, ChildPid} = wpool:run(pool_of_supervisors, Run, next_worker),

-type stats() ::
          [{pool, name()} |
           {supervisor, pid()} |
           {options, [option()] | options()} |
           {size, non_neg_integer()} |
           {next_worker, pos_integer()} |
           {total_message_queue_len, non_neg_integer()} |
           {workers, [{pos_integer(), worker_stats()}]}].

Statistics about a given live pool.

-type strategy() ::
          best_worker | random_worker | next_worker | available_worker | next_available_worker |
          {hash_worker, term()} |
          custom_strategy().

Strategy to use when choosing a worker.

best_worker

Picks the worker with the shortest queue of messages. Loosely based on this article: https://lethain.com/load-balancing-across-erlang-process-groups/.

This strategy is usually useful when your workers always perform the same task, or tasks with expectedly similar runtimes.

random_worker

Just picks a random worker. This strategy is the fastest one to select a worker. It's ideal if your workers will perform many short tasks.

next_worker

Picks the next worker in a round-robin fashion. This ensures an even distribution of tasks.

available_worker

Instead of just picking one of the workers and sending the request to it, this strategy queues the request and waits until a worker is available to perform it. That may make the worker selection step much slower, hence the additional parameter Worker_Timeout, which controls how many milliseconds the client is willing to spend waiting for a worker, regardless of the global Timeout for the call.

This strategy ensures that, if a worker crashes, no messages are lost in its message queue. It also ensures that a task that takes too long doesn't block other tasks since, as soon as another worker is free, it can pick up the next task in the list.

next_available_worker

In a way, this strategy behaves like available_worker in the sense that it will pick the first worker that it can find which is not running any task at the moment, but the difference is that it will fail if all workers are busy.

{hash_worker, Key}

This strategy takes a Key and selects a worker using erlang:phash2/2. This ensures that tasks classified under the same key will be delivered to the same worker, which is useful to classify events by key and work on them sequentially on the worker, distributing different keys across different workers.
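A sketch of key-based routing, assuming a running pool named my_pool whose workers understand the hypothetical {update, Key, Step} message:

```erlang
%% Both calls use the same key, so they are delivered to the same worker
%% and handled sequentially there.
R1 = wpool:call(my_pool, {update, user_42, step_1}, {hash_worker, user_42}),
R2 = wpool:call(my_pool, {update, user_42, step_2}, {hash_worker, user_42}).
```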

custom_strategy()

A callback that gets the pool name and returns a worker's name.
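As an illustration only, a custom strategy could always pick the first registered worker, using wpool:get_workers/1 from this module. The pool name my_pool and the ping message are hypothetical.

```erlang
%% A fun that receives the pool name and returns a worker's registered name.
FirstWorker = fun(Pool) -> hd(wpool:get_workers(Pool)) end,
Reply = wpool:call(my_pool, ping, FirstWorker).
```

A strategy like this sends every request to one worker, so it is useful mainly as a demonstration of the callback's shape.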

-type supervisor_strategy() :: supervisor:sup_flags().

Supervision strategy to use over the individual workers.

Defaults to {one_for_one, 5, 60}. See wpool_process_sup for more details.

-type worker() :: {Module :: module(), InitArg :: term()}.

The gen_server module and the arguments to pass to the init callback.

This is the module that each worker will run and the InitArg to pass to the corresponding start_link call used to start it.

The default value for this setting is {wpool_worker, undefined}. That means that if you don't provide a worker implementation, the pool will be generated with this default one. See wpool_worker for details.

-type worker_opt() :: gen_server:start_opt().

Server options that will be passed to each gen_server worker.

These are the same as described at the gen_server documentation.

-type worker_shutdown() :: brutal_kill | timeout().

The shutdown option to be used over the individual workers.

Defaults to 5000. See wpool_process_sup for more details.

-type worker_stats() :: [{messsage_queue_len, non_neg_integer()} | {memory, pos_integer()}].

Statistics about a worker in a pool.

-type workers() :: pos_integer().

The number of workers in the pool.

The default value for this setting is 100.

Functions

broadcall(Sup, Call, Timeout)

-spec broadcall(wpool:name(), term(), timeout()) -> {[Replies :: term()], [Errors :: term()]}.

Calls all the workers within the given pool asynchronously and waits for the responses synchronously.

If one worker times out, the entire call is considered timed-out.
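A usage sketch, assuming a running pool named my_pool whose workers handle a hypothetical get_state call:

```erlang
%% The result shape follows the spec above: a list of replies
%% and a list of errors.
{Replies, Errors} = wpool:broadcall(my_pool, get_state, 5000),
io:format("~p replies, ~p errors~n", [length(Replies), length(Errors)]).
```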

-spec broadcast(wpool:name(), term()) -> ok.

Casts a message to all the workers within the given pool.

NOTE: These messages don't get queued; they go straight to the workers' message queues. So if you're using the available_worker strategy to balance the load and you have some tasks queued up waiting for the next available worker, the broadcast will reach all the workers before the queued-up tasks.

-spec call(name(), term()) -> term().

Equivalent to call(Sup, Call, default_strategy()).

call(Sup, Call, Strategy)

-spec call(name(), term(), strategy()) -> term().

Equivalent to call(Sup, Call, Strategy, 5000).

call(Sup, Call, Fun, Timeout)

-spec call(name(), term(), strategy(), timeout()) -> term().

Picks a server and issues the call to it.

For all strategies except available_worker, Timeout applies only to the time spent on the actual call to the worker, because the time spent finding a worker with the other strategies is negligible. For available_worker, the time spent choosing a worker also counts toward Timeout.

-spec cast(name(), term()) -> ok.

Equivalent to cast(Sup, Cast, default_strategy()).

-spec cast(name(), term(), strategy()) -> ok.

Picks a server and issues the cast to it.

child_spec(Name, Options)

-spec child_spec(name(), [option()] | options()) -> supervisor:child_spec().

Builds a child specification to pass to a supervisor.

-spec default_strategy() -> strategy().

Default strategy

-spec get_workers(name()) -> [atom()].

Retrieves the list of worker registered names.

This can be useful to manually inspect the workers or do custom work on them.

-spec run(name(), run(Result)) -> Result.

Equivalent to run(Sup, Run, default_strategy()).

-spec run(name(), run(Result), strategy()) -> Result.

Equivalent to run(Sup, Run, Strategy, 5000).

run(Sup, Run, Fun, Timeout)

-spec run(name(), run(Result), strategy(), timeout()) -> Result.

Picks a server and issues the run to it.

For all strategies except available_worker, Timeout applies only to the time spent on the actual run on the worker, because the time spent finding a worker with the other strategies is negligible. For available_worker, the time spent choosing a worker also counts toward Timeout.

-spec send_request(name(), term()) -> noproc | timeout | gen_server:request_id().

Equivalent to send_request(Sup, Call, default_strategy(), 5000).

send_request(Sup, Call, Strategy)

-spec send_request(name(), term(), strategy()) -> noproc | timeout | gen_server:request_id().

Equivalent to send_request(Sup, Call, Strategy, 5000).

send_request(Sup, Call, Fun, Timeout)

-spec send_request(name(), term(), strategy(), timeout()) -> noproc | timeout | gen_server:request_id().

Picks a server and issues the call to it.

Timeout applies only to the time spent choosing a worker in the available_worker strategy.
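Because a successful send_request returns a gen_server request id, the reply can be collected later with gen_server:wait_response/2 from OTP. The sketch below shows one way to handle all three return values; the module and function names are hypothetical, and the 5000 ms wait is an arbitrary choice.

```erlang
-module(async_request_demo).  %% hypothetical module name
-export([request/2]).

request(Pool, Msg) ->
    case wpool:send_request(Pool, Msg) of
        noproc ->
            {error, noproc};
        timeout ->
            {error, timeout};
        ReqId ->
            %% Other work could be done here before collecting the reply.
            case gen_server:wait_response(ReqId, 5000) of
                {reply, Reply} -> {ok, Reply};
                timeout -> {error, timeout};
                {error, Reason} -> {error, Reason}
            end
    end.
```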

-spec start() -> ok | {error, {already_started, wpool}}.

Starts the application

-spec start_pool(name()) -> supervisor:startlink_ret().

Equivalent to start_pool(Name, []).

start_pool(Name, Options)

-spec start_pool(name(), [option()] | options()) -> supervisor:startlink_ret().

Starts (and links) a pool of N wpool_processes. The resulting pid belongs to a supervisor, in case you want to add it to a supervision tree.

-spec start_sup_pool(name()) -> supervisor:startchild_ret().

Equivalent to start_sup_pool(Name, []).

start_sup_pool(Name, Options)

-spec start_sup_pool(name(), [option()] | options()) -> supervisor:startchild_ret().

Starts a pool of N wpool_processes supervised by wpool_sup.

-spec stats() -> [stats()].

Retrieves a snapshot of statistics for all pools.

See the stats() type for details on the return type.

-spec stats(name()) -> stats().

Retrieves a snapshot of statistics for a given pool.

See the stats() type for details on the return type.

-spec stop() -> ok.

Stops the application

-spec stop_pool(name()) -> true.

Stops a pool that doesn't belong to wpool_sup.

-spec stop_sup_pool(name()) -> ok.

Stops a pool supervised by wpool_sup supervision tree.