View Source EventStreamex.Operators.ProcessStatus.ProcessStatusStorageAdapter behaviour (EventStreamex v1.3.1)

Behaviour for the process status storage.

Everytime an event is received we will save its name and timestamp to keep a process status up to date.

The process status allows us to know for each entity the last time it has been processed. Here, "processed" means all of these steps:

  • Event received from WAL
  • Event emitted in pubsub
  • Event processed by all operators listening to it

The adapters keeps a copy of this process status, mostly in case of crash (Except for EventStreamex.Operators.ProcessStatus.NoAdapter which does not saves the process status and should not be used in production)

There are currently 2 process status adapters:

You can create your own adapter and set the config to use it.

Here is the full code of a possible memory adapter:

defmodule EventStreamex.Operators.ProcessStatus.MemAdapter do
  use GenServer
  @behaviour EventStreamex.Operators.ProcessStatus.ProcessStatusStorageAdapter

  @impl true
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl EventStreamex.Operators.ProcessStatus.ProcessStatusStorageAdapter
  def item_processed(item) do
    GenServer.call(__MODULE__, {:save, item})
  end

  @impl EventStreamex.Operators.ProcessStatus.ProcessStatusStorageAdapter
  def load() do
    GenServer.call(__MODULE__, :load)
  end

  @impl EventStreamex.Operators.ProcessStatus.ProcessStatusStorageAdapter
  def reset() do
    GenServer.call(__MODULE__, :reset)
  end

  # Callbacks

  @impl true
  def init(_opts) do
    {:ok, %{}}
  end

  @impl true
  def handle_call({:save, {entity, timestamp}}, _from, process_status) do
    new_process_status = process_status |> Map.put(entity, timestamp)
    {:reply, {:ok, new_process_status}, new_process_status}
  end

  @impl true
  def handle_call(:load, _from, process_status) do
    {:reply, {:ok, process_status |> Map.to_list()}, process_status}
  end

  @impl true
  def handle_call(:reset, _from, _process_status) do
    {:reply, {:ok, []}, []}
  end
end

The adapter must be a GenServer and declare the function start_link/1.

Then, it must provide these functions:

  • item_processed/1: Updates the timestamps of an entity (or adds it if the entity was not known before)
  • load/0: Return the entire process status
  • reset/0: Remove all items from the process status

An item is a tuple containing:

  • The name of the entity (as a string)
  • The timestamp of the last event processed for this entity

Summary

Callbacks

Callback called when an entity has just been processed.

Called at startup or in case of processor crash to return the full list of items.

Called for tests to reset the process status

Called when starting the process status adapter.

Functions

Returns a specification to start this module under a supervisor.

Types

Link to this type

process_status()

View Source (since 1.1.0)
@type process_status() :: {binary(), integer()}

A process status item.

An item is a tuple containing:

  • The entity name
  • The timestamp of the last event processed for this entity
Link to this type

process_status_list()

View Source (since 1.1.0)
@type process_status_list() :: [process_status()]

A list of process_status/0

Callbacks

Link to this callback

item_processed(process_status)

View Source (since 1.1.0)
@callback item_processed(process_status()) :: {:ok, term()} | {:error, term()}

Callback called when an entity has just been processed.

This callback must save it with its timestamps

The item is a process_status/0.

The return value is a result tuple. The value in case of success is not used for the moment.

@callback load() :: {:ok, process_status_list()} | {:error, term()}

Called at startup or in case of processor crash to return the full list of items.

The return value is a result tuple with the list of items process_status/0.

@callback reset() :: {:ok, process_status_list()} | {:error, term()}

Called for tests to reset the process status

The return value is a result tuple with an empty list.

Link to this callback

start_link(any)

View Source (since 1.1.0)
@callback start_link(any()) :: {:ok, pid()}

Called when starting the process status adapter.

The parameter is the one given in the config file when you set the adapter.

The return value is a result with the pid of the adapter processor.

Functions

Link to this function

child_spec(init_arg)

View Source (since 1.1.0)

Returns a specification to start this module under a supervisor.

See Supervisor.