Workex

A behaviour which separates message receiving and aggregating from message processing.

Example:

defmodule Consumer do
  use Workex

  # Interface functions are invoked inside client processes

  def start_link do
    Workex.start_link(__MODULE__, nil)
  end

  def push(pid, item) do
    Workex.push(pid, item)
  end


  # Callback functions run in the worker process

  def init(_), do: {:ok, nil}

  def handle(data, state) do
    Processor.long_op(data)
    {:ok, state}
  end
end

The callback module must export following functions:

The Workex starts two processes. The one returned by Workex.start_link/4 is the "facade" process which can be used as the target for messages. This is also the process which aggregates messages.

Callback functions will run in the worker process, which is started by the "main" process. Thus, consuming is done concurrently to message aggregation.

Both processes are linked, and the main process traps exits. Termination of the worker process will cause the main process to terminate with the same exit reason.

Source

Summary

push(server, message)

Pushes a new message, returns immediately

push_ack(server, message, timeout \\ 5000)

Pushes a new message and returns as soon as the message is queued (or rejected)

push_block(server, message, timeout \\ 5000)

Pushes a new message and returns after the message is processed (or rejected)

start(callback, arg, opts \\ [], gen_server_opts \\ [])

Starts aggregator and worker processes

start_link(callback, arg, opts \\ [], gen_server_opts \\ [])

Starts aggregator and worker processes

Types

workex_options :: [aggregate: Workex.Aggregate.t, max_size: pos_integer, replace_oldest: boolean]

Functions

push(server, message)

Specs:

Pushes a new message, returns immediately.

Source
push_ack(server, message, timeout \\ 5000)

Specs:

  • push_ack(GenServer.server, any, non_neg_integer | :infinity) :: :ok | {:error, reason :: any}

Pushes a new message and returns as soon as the message is queued (or rejected).

Source
push_block(server, message, timeout \\ 5000)

Specs:

  • push_block(GenServer.server, any, non_neg_integer | :infinity) :: :ok | {:error, reason :: any}

Pushes a new message and returns after the message is processed (or rejected).

Source
start(callback, arg, opts \\ [], gen_server_opts \\ [])

Specs:

Starts aggregator and worker processes.

See start_link/4 for detailed description.

Source
start_link(callback, arg, opts \\ [], gen_server_opts \\ [])

Specs:

Starts aggregator and worker processes.

Possible options are:

  • aggregate - Aggregation instance. Defaults to %Workex.Queue{}. Must implement Workex.Aggregated.
  • max_size - Maximum number of messages in the buffer after which new messages are discarded.
  • replace_oldest - Alters behavior of max_size. When the buffer is full, new message replaces the oldest one.
Source