Fact.EventIndexer behaviour (Fact v0.2.0)

Base behaviour and macro for building event indexers.

Fact.EventIndexer defines the callbacks and GenServer scaffolding shared by all event indexers in the Fact storage system. An indexer listens for new events and extracts zero, one, or many values from each event it processes. For each extracted value, the indexer creates or appends to a file, recording the event's Fact.record_id/0.

Indexers are event projections: they filter the event ledger and produce keyed, ordered sets of events.
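The idea of a keyed, ordered set can be sketched with a plain in-memory map instead of files. This is a toy model only: IndexSketch, the event maps, and the record ids are hypothetical and not part of Fact, and the extract function merely plays the role of index_event/3.

```elixir
defmodule IndexSketch do
  # Toy model (not Fact's implementation): the index is a map of
  # index value => record ids, kept in arrival order.
  # `extract` stands in for index_event/3 and may return nil, one value, or a list.
  def index(index, record_id, event, extract) do
    event
    |> extract.()
    |> List.wrap()
    |> Enum.reduce(index, fn value, acc ->
      Map.update(acc, value, [record_id], &(&1 ++ [record_id]))
    end)
  end
end

# Hypothetical extractor: index events by their "user_id", if they have one.
extract = fn event -> event["user_id"] end

records = [
  {"r1", %{"user_id" => "alice"}},
  {"r2", %{"name" => "no user here"}},
  {"r3", %{"user_id" => "alice"}}
]

index =
  Enum.reduce(records, %{}, fn {record_id, event}, acc ->
    IndexSketch.index(acc, record_id, event, extract)
  end)

IO.inspect(index)
# prints %{"alice" => ["r1", "r3"]}
```

The second record yields no value, so it appears in no index; the other two land under the same key in the order they were processed.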

Safe to Delete

It is safe to delete any of the index files and folders written to the file system while the system is not running. They are recreated the next time the indexer is started.

Behaviour

An indexer must implement the index_event/3 callback, which extracts values from the supplied event.

The __using__/1 macro injects the full GenServer implementation that:

  • initializes and ensures the index exists
  • rebuilds the index from history on startup
  • subscribes to live event notifications
  • updates the index as new events arrive
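The rebuild and live-update steps listed above can be modelled as the same operation applied to different inputs. The sketch below is an assumption about the general shape, not the injected GenServer: LifecycleSketch and its event maps are hypothetical, the subscription step is left out, and the way the checkpoint is used here is a guess based on the checkpoint field in the indexer state.

```elixir
defmodule LifecycleSketch do
  # Hypothetical state: indexed {value, record_id} entries in order, plus the
  # position of the last event that was indexed (the checkpoint).
  def new, do: %{entries: [], checkpoint: 0}

  # Rebuild: replay the history in order through the same step used for live events.
  def rebuild(state, history, extract) do
    Enum.reduce(history, state, fn event, acc -> apply_event(acc, event, extract) end)
  end

  # Events at or before the checkpoint were already indexed, so they are skipped.
  def apply_event(%{checkpoint: checkpoint} = state, %{position: position}, _extract)
      when position <= checkpoint,
      do: state

  # Otherwise record every extracted value and advance the checkpoint.
  def apply_event(state, %{position: position, id: id} = event, extract) do
    new_entries = for value <- List.wrap(extract.(event)), do: {value, id}
    %{state | entries: state.entries ++ new_entries, checkpoint: position}
  end
end

extract = fn event -> event.data["user_id"] end

history = [
  %{position: 1, id: "r1", data: %{"user_id" => "alice"}},
  %{position: 2, id: "r2", data: %{"user_id" => "bob"}}
]

state = LifecycleSketch.rebuild(LifecycleSketch.new(), history, extract)

# Replaying an event that is already covered by the checkpoint changes nothing.
state = LifecycleSketch.apply_event(state, hd(history), extract)

# A newly arrived event is indexed and moves the checkpoint forward.
live = %{position: 3, id: "r3", data: %{"user_id" => "alice"}}
state = LifecycleSketch.apply_event(state, live, extract)

IO.inspect(state.entries)
```

Treating the rebuild as a replay through the live path keeps a single code path for indexing, which is one plausible reason to track a checkpoint at all.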

Custom Indexers

Custom indexers only need to implement the callback.

Examples

This would produce an index for every user, including all the events which define a user_id in the event data.

defmodule YourApp.UserIndexer do
  use Fact.EventIndexer

  @impl true
  def index_event(schema, event, _opts) do
    # Events without a user_id produce nil, so there is nothing to index for them.
    case Map.get(event[schema.event_data], "user_id") do
      nil -> nil
      user_id -> to_string(user_id)
    end
  end
end

This would produce an index for every tenant, including all the events which define a tenant_id in the event metadata.

defmodule YourApp.TenantIndexer do
  use Fact.EventIndexer

  @impl true
  def index_event(schema, event, _opts) do
    # Events without a tenant_id produce nil, so there is nothing to index for them.
    case Map.get(event[schema.event_metadata], "tenant_id") do
      nil -> nil
      tenant_id -> to_string(tenant_id)
    end
  end
end

Summary

Types

index_event_result()

The values that can be returned by the Fact.EventIndexer.index_event/3 callback function.

index_result()

This describes the results of the indexing process.

index_value()

The value produced by a Fact.EventIndexer.indexer_id/0 when indexing a Fact.event_record/0.

indexed_message()

The message that is published immediately after an indexer processes a Fact.record/0.

indexer_custom_option()

Custom option values passed to the Fact.EventIndexer.index_event/3 callback function to control the indexing of records.

indexer_id()

The unique identifier for an indexer.

indexer_key()

This is additional metadata for a specific Fact.EventIndexer.indexer_id/0.

indexer_module()

A module that implements the Fact.EventIndexer behaviour to index records.

indexer_option()

Option values passed to the Fact.EventIndexer.index_event/3 callback function to control the indexing of records.

indexer_options()

Options passed to the Fact.EventIndexer.index_event/3 callback function to control the indexing of records.

option()

Option values used by the start_link/1 functions for indexer modules.

t()

The state structure used by indexers in the GenServer callback functions.

Callbacks

index_event(schema, event, indexer_options)

Called when an event needs to be indexed.

Functions

__using__(opts \\ [])

Injects the GenServer implementation and behaviour for an event indexer.

subscribe(database_id, indexer)

Subscribe to messages published by the specified indexer.

topic(indexer)

Gets the name of the topic where the indexer publishes messages.

Types

index_event_result()

(since 0.1.0)
@type index_event_result() :: index_value() | [index_value()] | nil

The values that can be returned by the Fact.EventIndexer.index_event/3 callback function.
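Because the callback may return nil, a single value, or a list, code that consumes the result usually wants one uniform shape. A minimal sketch using the standard List.wrap/1 function shows one way to get there; whether Fact normalizes results this way internally is an assumption.

```elixir
# The three shapes allowed by index_event_result(): nothing, one value, many values.
results = [nil, "alice", ["red", "blue"]]

# List.wrap/1 maps nil to [], wraps a single value, and leaves lists untouched.
normalized = Enum.map(results, &List.wrap/1)

IO.inspect(normalized)
# prints [[], ["alice"], ["red", "blue"]]
```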

index_result()

(since 0.1.0)
@type index_result() :: %{
  position: Fact.event_position(),
  record_id: Fact.record_id(),
  index_values: [index_value()]
}

This describes the results of the indexing process.

index_value()

(since 0.1.0)
@type index_value() :: String.t()

The value produced by a Fact.EventIndexer.indexer_id/0 when indexing a Fact.event_record/0.

indexed_message()

(since 0.1.0)
@type indexed_message() :: {:indexed, indexer_id(), index_result()}

The message that is published immediately after an indexer processes a Fact.record/0.
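A subscriber can pattern match on this tuple shape. The sketch below simulates the message with send/2 rather than a real subscription, and the indexer id, position, and record id are placeholder values chosen for illustration; their real types are defined by Fact.

```elixir
# Simulated message with made-up values. In a running system it would be
# delivered to a process that subscribed to the indexer.
message =
  {:indexed, :user_indexer_example,
   %{position: 42, record_id: "r42", index_values: ["alice"]}}

send(self(), message)

summary =
  receive do
    # Destructure the indexer id and the fields of index_result().
    {:indexed, indexer_id, %{position: position, index_values: values}} ->
      {indexer_id, position, values}
  after
    1_000 -> :timeout
  end

IO.inspect(summary)
# prints {:user_indexer_example, 42, ["alice"]}
```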

indexer_custom_option()

(since 0.1.0)
@type indexer_custom_option() :: {atom(), term()}

Custom option values passed to the Fact.EventIndexer.index_event/3 callback function to control the indexing of records.

indexer_id()

(since 0.1.2)

The unique identifier for an indexer.

Built-in Indexers

indexer_key()

(since 0.1.0)
@type indexer_key() :: String.t()

This is additional metadata for a specific Fact.EventIndexer.indexer_id/0.

At the time of writing, only Fact.EventDataIndexer uses a Fact.EventIndexer.indexer_key/0, because there can be multiple processes running, each indexing a different key within a Fact.event_data/0.

indexer_module()

(since 0.1.0)
@type indexer_module() :: :atom

A module that implements the Fact.EventIndexer behaviour to index records.

indexer_option()

(since 0.1.0)
@type indexer_option() :: {:indexer_key, indexer_key()} | indexer_custom_option()

Option values passed to the Fact.EventIndexer.index_event/3 callback function to control the indexing of records.

indexer_options()

(since 0.1.0)
@type indexer_options() :: [indexer_option()]

Options passed to the Fact.EventIndexer.index_event/3 callback function to control the indexing of records.

option()

(since 0.1.0)
@type option() ::
  {:database_id, Fact.database_id()}
  | {:id, indexer_id()}
  | {:options, indexer_options()}

Option values used by the start_link/1 functions for indexer modules.

t()

(since 0.1.0)
@type t() :: %{
  database_id: Fact.database_id(),
  indexer: indexer_id(),
  indexer_opts: indexer_options(),
  checkpoint: Fact.event_position(),
  schema: Fact.event_record_schema()
}

The state structure used by indexers in the GenServer callback functions.

Callbacks

index_event(schema, event, indexer_options)

(since 0.1.0)
@callback index_event(
  schema :: Fact.event_record_schema(),
  event :: Fact.event(),
  indexer_options()
) :: index_event_result()

Called when an event needs to be indexed.

Functions

__using__(opts \\ [])

(since 0.1.2) (macro)

Injects the GenServer implementation and behaviour for an event indexer.

This macro provides the complete GenServer scaffolding needed to build an event indexer, including initialization, index rebuilding from history, subscription to live events, and index updates as new events arrive.

Options

  • :name - The filesystem name for the indexer's directory. When not provided, it is derived automatically from the module name by taking the last segment, converting it to snake case and removing the _indexer suffix.
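The derivation rule described for :name can be approximated with standard library functions. NameSketch is a hypothetical helper written for illustration; the macro's actual implementation may differ in its details.

```elixir
defmodule NameSketch do
  # Approximates the documented rule: last module segment, snake case,
  # then drop a trailing "_indexer". Not taken from Fact's source.
  def derive_name(module) do
    module
    |> Module.split()
    |> List.last()
    |> Macro.underscore()
    |> String.replace_suffix("_indexer", "")
  end
end

IO.inspect(NameSketch.derive_name(YourApp.UserIndexer))
# prints "user"

IO.inspect(NameSketch.derive_name(YourApp.EventDataIndexer))
# prints "event_data"
```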

Requires

Modules using this macro must implement the index_event/3 callback to define how events are indexed.

subscribe(database_id, indexer)

(since 0.1.0)
@spec subscribe(Fact.database_id(), indexer_id()) :: :ok

Subscribe to messages published by the specified indexer.

Messages

topic(indexer)

(since 0.1.0)
@spec topic(indexer_id()) :: String.t()

Gets the name of the topic where the indexer publishes messages.