EventStreamex.Operators.ProcessStatus.ProcessStatusStorageAdapter behaviour (EventStreamex v1.3.1)
Behaviour for the process status storage.
Every time an event is received, we save its name and timestamp to keep the process status up to date.
The process status tells us, for each entity, the last time it was processed. Here, "processed" means all of these steps:
- Event received from WAL
- Event emitted in pubsub
- Event processed by all operators listening to it
The adapter keeps a copy of this process status, mostly in case of a crash (except for EventStreamex.Operators.ProcessStatus.NoAdapter, which does not save the process status and should not be used in production).
There are currently 2 process status adapters:
- EventStreamex.Operators.ProcessStatus.DbAdapter: A process status adapter that uses the database to store its items (this is the default process status adapter)
- EventStreamex.Operators.ProcessStatus.NoAdapter: A process status adapter that does not store its items
You can create your own adapter and set the config to use it.
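As a rough sketch of what setting the config might look like: the key name `process_status_storage_adapter` and the `{module, opts}` tuple shape are assumptions that this page does not confirm, so check the EventStreamex configuration documentation before relying on them. The second tuple element is assumed to be the value passed to the adapter's start_link/1.

```elixir
# config/config.exs
import Config

# Assumed key name and value shape; verify against the EventStreamex config docs.
# The empty list is assumed to be the argument given to the adapter's start_link/1.
config :event_streamex,
  process_status_storage_adapter: {EventStreamex.Operators.ProcessStatus.MemAdapter, []}
```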
Here is the full code of a possible memory adapter:
defmodule EventStreamex.Operators.ProcessStatus.MemAdapter do
  use GenServer
  @behaviour EventStreamex.Operators.ProcessStatus.ProcessStatusStorageAdapter

  @impl true
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl EventStreamex.Operators.ProcessStatus.ProcessStatusStorageAdapter
  def item_processed(item) do
    GenServer.call(__MODULE__, {:save, item})
  end

  @impl EventStreamex.Operators.ProcessStatus.ProcessStatusStorageAdapter
  def load() do
    GenServer.call(__MODULE__, :load)
  end

  @impl EventStreamex.Operators.ProcessStatus.ProcessStatusStorageAdapter
  def reset() do
    GenServer.call(__MODULE__, :reset)
  end

  # Callbacks

  @impl true
  def init(_opts) do
    # The state is a map of entity name => timestamp of the last processed event
    {:ok, %{}}
  end

  @impl true
  def handle_call({:save, {entity, timestamp}}, _from, process_status) do
    # Add the entity or overwrite its previous timestamp
    new_process_status = process_status |> Map.put(entity, timestamp)
    {:reply, {:ok, new_process_status}, new_process_status}
  end

  @impl true
  def handle_call(:load, _from, process_status) do
    # Return the items as a list of {entity, timestamp} tuples
    {:reply, {:ok, process_status |> Map.to_list()}, process_status}
  end

  @impl true
  def handle_call(:reset, _from, _process_status) do
    # Reply with an empty list, but keep the state a map so later saves still work
    {:reply, {:ok, []}, %{}}
  end
end
The adapter must be a GenServer and declare the function start_link/1.
Then, it must provide these functions:
- item_processed/1: Updates the timestamp of an entity (or adds the entity if it was not known before)
- load/0: Returns the entire process status
- reset/0: Removes all items from the process status
An item is a tuple containing:
- The name of the entity (as a string)
- The timestamp of the last event processed for this entity
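To make the item shape concrete, here is a standalone sketch with no EventStreamex dependency. It assumes the timestamp is a `DateTime` (this page only says "timestamp", so the real type may differ), and the entity names are made up. It mimics how a map-backed adapter would keep only the latest timestamp per entity.

```elixir
# Hypothetical items in the {entity_name, timestamp} shape.
# Using DateTime for the timestamp is an assumption.
items = [
  {"comments", ~U[2024-01-01 10:00:00Z]},
  {"posts", ~U[2024-01-01 10:00:05Z]},
  {"comments", ~U[2024-01-01 10:00:09Z]}
]

# Keep one entry per entity; a later item overwrites the earlier one.
process_status =
  Enum.reduce(items, %{}, fn {entity, timestamp}, acc ->
    Map.put(acc, entity, timestamp)
  end)

# Convert back to a list of items, sorted by entity name for stable output.
process_status_list = process_status |> Map.to_list() |> Enum.sort()

IO.inspect(process_status_list)
```

Only two items remain at the end: the second "comments" item replaced the first one.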
Types
A process status item.
An item is a tuple containing:
- The entity name
- The timestamp of the last event processed for this entity
@type process_status_list() :: [process_status()]
A list of process_status/0
Callbacks
@callback item_processed(process_status()) :: {:ok, term()} | {:error, term()}
Callback called when an entity has just been processed.
This callback must save the item with its timestamp.
The item is a process_status/0.
The return value is a result tuple. The value in case of success is not used for the moment.
@callback load() :: {:ok, process_status_list()} | {:error, term()}
Called at startup or in case of processor crash to return the full list of items.
The return value is a result tuple with the list of process_status/0 items.
@callback reset() :: {:ok, process_status_list()} | {:error, term()}
Called for tests to reset the process status.
The return value is a result tuple with an empty list.
Called when starting the process status adapter (this is the start_link/1 function).
The parameter is the one given in the config file when you set the adapter.
The return value is a result tuple with the pid of the adapter process.
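The callback contract described above can be exercised with a small helper. This is a sketch, not part of EventStreamex: `AdapterContractCheck` and `run/2` are hypothetical names, the integer timestamps are placeholders, and the helper assumes the adapter is not already running. Because it calls reset/0, it should only be pointed at an adapter whose data you can afford to lose.

```elixir
defmodule AdapterContractCheck do
  @moduledoc "Hypothetical helper: checks an adapter module against the documented callbacks."

  # `adapter` is any module implementing the behaviour; `opts` is passed to start_link/1.
  def run(adapter, opts \\ []) do
    {:ok, pid} = adapter.start_link(opts)
    true = is_pid(pid)

    # Start from an empty process status.
    {:ok, []} = adapter.reset()

    # Saving twice for the same entity should keep only the latest timestamp.
    {:ok, _} = adapter.item_processed({"comments", 1})
    {:ok, _} = adapter.item_processed({"comments", 2})
    {:ok, _} = adapter.item_processed({"posts", 3})

    {:ok, items} = adapter.load()
    [{"comments", 2}, {"posts", 3}] = Enum.sort(items)

    # reset/0 empties the process status again.
    {:ok, []} = adapter.reset()
    {:ok, []} = adapter.load()
    :ok
  end
end
```

Calling `AdapterContractCheck.run(MyAdapter)`, where `MyAdapter` stands for your own module, returns `:ok` for a conforming adapter and fails at the first step that deviates.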
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
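For context, `use GenServer` generates a default `child_spec/1` that points at the module's `start_link/1`. The sketch below uses a hypothetical `DemoAdapter` module (not part of EventStreamex) to show what that specification looks like and how a supervisor consumes it. Whether you ever need to supervise an adapter yourself depends on how EventStreamex starts it, which this page does not say.

```elixir
defmodule DemoAdapter do
  # Hypothetical module used only to illustrate the generated child_spec/1.
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts), do: {:ok, %{}}
end

# The generated spec uses the module as id and start_link/1 as the start function.
spec = DemoAdapter.child_spec([])
IO.inspect(spec)

# A supervisor accepts the {module, arg} shorthand and calls child_spec/1 itself.
{:ok, supervisor} = Supervisor.start_link([{DemoAdapter, []}], strategy: :one_for_one)
IO.inspect(Supervisor.which_children(supervisor))
```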