View Source EventStreamex.Operators.Queue.QueueStorageAdapter behaviour (EventStreamex v1.3.1)
Behaviour for the queue storage.
Every event received is dispatched to operators (EventStreamex.Operators.Operator
and executed sequentially to keep ordenancing.
Every time an event is received, a task is added to the queue of events with each operator listening for this event. And everytime a an operator task is finished, its completion status is updated in the task. When all operators have finished their task for this event, it is removed from the queue.
The adapters keeps a copy of this queue, mostly in case of crash (Except for EventStreamex.Operators.Queue.NoAdapter
which does not save the queue and should not be used in production)
There are currently 2 queue adapters:
: A queue that uses the database tu store its items (This is the default queue adapter)EventStreamex.Operators.Queue.NoAdapter
: A queue that does not store its items
You can create your own adapter and set the config to use it.
Here is the full code of a possible memory adapter:
defmodule EventStreamex.Operators.Queue.MemAdapter do
use GenServer
@behaviour EventStreamex.Operators.Queue.QueueStorageAdapter
@impl true
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
@impl EventStreamex.Operators.Queue.QueueStorageAdapter
def add_item(item) do, {:save, item})
@impl EventStreamex.Operators.Queue.QueueStorageAdapter
def delete_item(item) do, {:delete, item})
@impl EventStreamex.Operators.Queue.QueueStorageAdapter
def update_processors_status(item) do, {:update, item})
@impl EventStreamex.Operators.Queue.QueueStorageAdapter
def load_queue() do, :load)
@impl EventStreamex.Operators.Queue.QueueStorageAdapter
def reset_queue() do, :reset)
# Callbacks
@impl true
def init(_opts) do
{:ok, []}
@impl true
def handle_call({:save, item}, _from, queue) do
new_queue = queue ++ [item]
{:reply, {:ok, new_queue}, new_queue}
@impl true
def handle_call({:delete, {id, _item}}, _from, queue) do
new_queue = queue |> Enum.reject(fn {i, _t} -> i == id end)
{:reply, {:ok, new_queue}, new_queue}
@impl true
def handle_call({:update, {id, item}}, _from, queue) do
new_queue = queue |> {i, t} ->
if i == id do
{:reply, {:ok, new_queue}, new_queue}
@impl true
def handle_call({_action, nil}, _from, queue) do
{:reply, {:ok, queue}, queue}
@impl true
def handle_call(:load, _from, queue) do
{:reply, {:ok, queue}, queue}
@impl true
def handle_call(:reset, _from, _queue) do
{:reply, {:ok, []}, []}
The adapter must be a GenServer
and declare the function start_link/1
Then, it must provide these functions:
: Add a new item to the queuedelete_item/1
: Delete an item from the queueupdate_processors_status/1
: Updates the specified itemload_queue/0
: Return the entire queuereset_queue/0
: Remove all items from the queue
An item is a tuple containing:
- The ID of the queue item
- The operator modules that process the event. It's a map with the module of the operator as key and a boolean as value. The boolean indicates the completion status for this operator
- The WalEx event (more on this structure in
Callback called when a new item must be stored in the queue.
Callback called when an item has been processed and must be deleted from the queue.
Called at startup or in case of processor crash to return the full list of items.
Called for tests to reset the queue
Called when starting the queue adapter.
Updates the first item in the queue.
Returns a specification to start this module under a supervisor.
@type queue() :: [queue_item()]
A list of queue_item/0
@type queue_item() :: {binary(), {map(), WalEx.Event.t()}} | nil
A queue item to store.
An item is a tuple containing:
- The ID of the queue item
- The operator modules that process the event. It's a map with the module of the operator as key and a boolean as value. The boolean indicates the completion status for this operator
- The WalEx event (more on this structure in
@callback add_item(queue_item()) :: {:ok, term()} | {:error, term()}
Callback called when a new item must be stored in the queue.
The item is a queue_item/0
The return value is a result tuple. The value in case of success is not used for the moment.
@callback delete_item(queue_item()) :: {:ok, term()} | {:error, term()}
Callback called when an item has been processed and must be deleted from the queue.
The item is a queue_item/0
The return value is a result tuple. The value in case of success is not used for the moment.
Called at startup or in case of processor crash to return the full list of items.
The return value is a result tuple with the list of items queue/0
Called for tests to reset the queue
The return value is a result tuple with an empty list.
Called when starting the queue adapter.
The parameter is the one given in the config file when you set the adapter.
The return value is a result with the pid
of the adapter processor.
@callback update_processors_status(queue_item()) :: {:ok, term()} | {:error, term()}
Updates the first item in the queue.
This is used to update the processors status from the current task
Returns a specification to start this module under a supervisor.
See Supervisor