Workex
A behaviour which separates message receiving and aggregating from message processing.
Example:
    defmodule Consumer do
      use Workex

      # Interface functions are invoked inside client processes
      def start_link do
        Workex.start_link(__MODULE__, nil)
      end

      def push(pid, item) do
        Workex.push(pid, item)
      end

      # Callback functions run in the worker process
      def init(_), do: {:ok, nil}

      def handle(data, state) do
        Processor.long_op(data)
        {:ok, state}
      end
    end
The callback module must export the following functions:
- init/1 - receives arg and should return {:ok, initial_state} or {:stop, reason}.
- handle/2 - receives aggregated messages and the state, and should return {:ok, new_state} or {:stop, reason}.
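The two return shapes can be illustrated with a small callback module. This is only a sketch: BatchCounter and its limit argument are hypothetical names, and it assumes that the default aggregate hands handle/2 the pushed messages as a list, which is not stated above.

```elixir
defmodule BatchCounter do
  use Workex

  # arg is the second argument given to Workex.start_link/4.
  # Hypothetical validation: refuse to start without a positive limit.
  def init(limit) when is_integer(limit) and limit > 0, do: {:ok, {0, limit}}
  def init(other), do: {:stop, {:bad_limit, other}}

  # Assumption: messages is a list of the items pushed since the last call.
  def handle(messages, {count, limit}) do
    new_count = count + length(messages)

    if new_count > limit do
      # Stop once more than limit messages have been seen.
      {:stop, :limit_exceeded}
    else
      {:ok, {new_count, limit}}
    end
  end
end
```

Starting it with Workex.start_link(BatchCounter, 1000) would pass 1000 to init/1 as arg.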
Workex starts two processes. The one returned by Workex.start_link/4 is the "facade" (or "main") process, which can be used as the target for messages. This is also the process which aggregates messages.
Callback functions run in the worker process, which is started by the facade process. Thus, messages are consumed concurrently with message aggregation.
Both processes are linked, and the main process traps exits. Termination of the worker process will cause the main process to terminate with the same exit reason.
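The two-process arrangement described above can be sketched with plain processes. This is not Workex's implementation, only a minimal illustration of the pattern; TwoProcessSketch, its message shapes, and the handler function are all hypothetical.

```elixir
defmodule TwoProcessSketch do
  # Starts the facade process, which in turn starts and links the worker.
  def start(handler) do
    spawn(fn ->
      Process.flag(:trap_exit, true)
      facade = self()
      worker = spawn_link(fn -> worker_loop(facade, handler) end)
      facade_loop(worker, [], true)
    end)
  end

  def push(facade, item), do: send(facade, {:push, item})

  # Facade: buffers items while the worker is busy and hands over the
  # whole buffer as soon as the worker reports that it is done.
  defp facade_loop(worker, buffer, worker_idle?) do
    receive do
      {:push, item} when worker_idle? ->
        send(worker, {:handle, [item]})
        facade_loop(worker, [], false)

      {:push, item} ->
        facade_loop(worker, [item | buffer], false)

      :worker_done when buffer == [] ->
        facade_loop(worker, [], true)

      :worker_done ->
        send(worker, {:handle, Enum.reverse(buffer)})
        facade_loop(worker, [], false)

      # The worker terminated: terminate with the same exit reason.
      {:EXIT, ^worker, reason} ->
        exit(reason)
    end
  end

  # Worker: runs the handler on each batch, then reports back.
  defp worker_loop(facade, handler) do
    receive do
      {:handle, items} ->
        handler.(items)
        send(facade, :worker_done)
        worker_loop(facade, handler)
    end
  end
end

parent = self()

facade =
  TwoProcessSketch.start(fn items ->
    send(parent, {:handled, items})
    # Simulate a long-running operation.
    Process.sleep(50)
  end)

# Item 1 is handled alone; 2 and 3 arrive while the worker is busy
# and are handed over together as one batch.
Enum.each(1..3, &TwoProcessSketch.push(facade, &1))
```

Because the facade never runs the handler itself, it stays free to accept new messages while the worker is busy.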
Summary
- push(server, message) - Pushes a new message and returns immediately.
- push_ack(server, message, timeout \\ 5000) - Pushes a new message and returns as soon as the message is queued (or rejected).
- push_block(server, message, timeout \\ 5000) - Pushes a new message and returns after the message is processed (or rejected).
- start(callback, arg, opts \\ [], gen_server_opts \\ []) - Starts aggregator and worker processes.
- start_link(callback, arg, opts \\ [], gen_server_opts \\ []) - Starts aggregator and worker processes.
Types
workex_options :: [aggregate: Workex.Aggregate.t, max_size: pos_integer, replace_oldest: boolean]
Functions
push_ack(server, message, timeout \\ 5000)
Specs:
- push_ack(GenServer.server, any, non_neg_integer | :infinity) :: :ok | {:error, reason :: any}
Pushes a new message and returns as soon as the message is queued (or rejected).
push_block(server, message, timeout \\ 5000)
Specs:
- push_block(GenServer.server, any, non_neg_integer | :infinity) :: :ok | {:error, reason :: any}
Pushes a new message and returns after the message is processed (or rejected).
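A short usage sketch may help contrast the three push variants. It assumes Workex is available as a dependency; the Printer module is a hypothetical callback module defined only for this example.

```elixir
defmodule Printer do
  use Workex

  def init(_), do: {:ok, nil}

  def handle(messages, state) do
    IO.inspect(messages, label: "handling")
    {:ok, state}
  end
end

{:ok, pid} = Workex.start_link(Printer, nil)

# Fire and forget: returns immediately.
Workex.push(pid, :a)

# Returns as soon as the message is queued (or rejected), default timeout.
case Workex.push_ack(pid, :b) do
  :ok -> IO.puts("queued")
  {:error, reason} -> IO.inspect(reason, label: "rejected")
end

# Returns after the message is processed (or rejected), explicit timeout.
case Workex.push_block(pid, :c, 10_000) do
  :ok -> IO.puts("processed")
  {:error, reason} -> IO.inspect(reason, label: "rejected")
end
```

push_ack/3 fits callers that need to know whether a message was accepted, while push_block/3 fits callers that must wait until the work itself is done.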
start(callback, arg, opts \\ [], gen_server_opts \\ [])
Specs:
- start(module, any, workex_options, GenServer.options) :: GenServer.on_start
Starts aggregator and worker processes.
See start_link/4 for a detailed description.
start_link(callback, arg, opts \\ [], gen_server_opts \\ [])
Specs:
- start_link(module, any, workex_options, GenServer.options) :: GenServer.on_start
Starts aggregator and worker processes.
Possible options are:
- aggregate - Aggregation instance. Defaults to %Workex.Queue{}. Must implement Workex.Aggregate.
- max_size - Maximum number of messages in the buffer, after which new messages are discarded.
- replace_oldest - Alters the behavior of max_size. When the buffer is full, a new message replaces the oldest one.
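The options above could be passed as in the following sketch. It assumes Workex is available as a dependency; Sink and the registered name :sink are hypothetical, and the sleep in handle/2 only simulates slow processing so that the buffer has a chance to fill up.

```elixir
defmodule Sink do
  use Workex

  def init(_), do: {:ok, nil}

  def handle(messages, state) do
    # Simulate slow processing.
    Process.sleep(100)
    IO.inspect(messages, label: "handled")
    {:ok, state}
  end
end

# Third argument: Workex options. Buffer at most 2 messages and, when the
# buffer is full, let a new message replace the oldest one.
{:ok, bounded} = Workex.start_link(Sink, nil, max_size: 2, replace_oldest: true)

# Fourth argument: ordinary GenServer options, such as a registered name.
# The aggregate is spelled out here, although it is the documented default.
{:ok, _named} =
  Workex.start_link(Sink, nil, [aggregate: %Workex.Queue{}], name: :sink)

# Push more messages than the bounded buffer can hold while the worker is busy.
Enum.each(1..10, &Workex.push(bounded, &1))

# The named instance can be addressed by its registered name.
Workex.push(:sink, :hello)
```

Without replace_oldest, a full buffer discards the new messages instead, so the choice depends on whether older or newer items matter more to the consumer.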