View Source Spear.Filter (Spear v1.0.0)

A server-side filter to apply when reading events from an EventStoreDB

regular-expressions

Regular expressions

Elixir's built-in Regex module and Kernel.sigil_r/2 use PCRE-compatible regular expressions, but from the EventStoreDB codebase it seems that at time of writing, filtering is done with C-sharp's System.Text.RegularExpressions built-in. C-sharp regular expressions diverge in syntax a bit from PCRE-compatible expressions, so mileage may vary when passing Elixir regular expressions.

As an escape hatch, you may build a Spear.Filter.t/0 manually and use a String.t/0 as the :by field. This value will be untouched except for last-minute encoding to go over-the-wire to the server.

%Spear.Filter{
  on: :stream_name,
  by: "<some-complicated-regex>",
  checkpoint_after: 1024
}

If possible, the list of prefixes should be used as the :by option as they are unambiguous.

checkpoints

Checkpoints

The EventStoreDB will emit checkpoint events to a subscriber regularly.

This prevents a possible (and perhaps probable) scenario where

  • the EventStoreDB contains many events
  • the client is searching for a small number of events relative to the size of :all
  • (and/or) the target events are sparsely spread throughout :all

Under these conditions, the EventStoreDB may progress through :all quite a ways before finding an event. If the connection is severed between the client and server while the EventStoreDB is part-way through a large drought of targeted events, the server will need to re-seek through the drought when the client re-connects and passes an old :from option to Spear.subscribe/4.

These checkpoints will arrive in the mailbox of the subscriber process as Spear.Filter.Checkpoint.t/0 structs and may be used as a restore point by passing them to the :from option of Spear.subscribe/4.

For example, say we have a subscriber process which is a GenServer and some function save_checkpoint/1 which saves checkpoint information durably (to disk or a database for example). It will handle subscription events in its GenServer.handle_info/2 callback.

defmodule MySubscriber do
  use GenServer

  alias Spear.{Event, Filter.Checkpoint}

  ..

  @impl GenServer
  def handle_info(%Event{} = event, state) do
    # .. handle the event

    event |> Event.to_checkpoint() |> save_checkpoint()

    {:noreply, state}
  end

  def handle_info(%Checkpoint{} = checkpoint, state) do
    save_checkpoint(checkpoint)

    {:noreply, state}
  end
end

checkpoint-interval

Checkpoint Interval

The EventStoreDB will send a checkpoint after filtering a configurable number of events. The :checkpoint_after field can be used to configure this behavior. Note that the :checkpoint_after is only allowed by the server to be a multiple of 32. Other values will be rounded to the nearest multiple.

The default is set at 1024 (32 * 32) in Spear but this is tunable per filter with checkpoint_after/2, or by manually adjusting the :checkpoint_after field in this struct.

Link to this section Summary

Types

t()

A filter which can be applied to a subscription to trigger server-side result filtering

Functions

Sets the checkpoint interval

Produces a filter which excludes system events

A sigil defining a filter, without escaping

A sigil defining short-hand notation for writing filters

Link to this section Types

@type t() :: %Spear.Filter{
  by: Regex.t() | String.t() | [String.t()],
  checkpoint_after: pos_integer(),
  on: :event_type | :stream_name
}

A filter which can be applied to a subscription to trigger server-side result filtering

This filter type is intended to be passed as the :filter option to Spear.subscribe/4.

examples

Examples

iex> import Spear.Filter
iex> ~f/^[^\$].*/rs # exclude system events which start with "$"
%Spear.Filter{by: ~r/^[^\$].*/, checkpoint_after: 1024, on: :stream_name}

Link to this section Functions

Link to this function

checkpoint_after(filter, interval)

View Source (since 0.1.0)
@spec checkpoint_after(t(), pos_integer()) :: t()

Sets the checkpoint interval

examples

Examples

iex> import Spear.Filter
iex> checkpoint_after(~f/^[^\$].*/rs, 32 * 8)
%Spear.Filter{by: ~r/^[^\$].*/, checkpoint_after: 256, on: :stream_name}
Link to this function

exclude_system_events()

View Source (since 0.1.0)
@spec exclude_system_events() :: t()

Produces a filter which excludes system events

This is a potentially common filter for subscribers reading from :all.

The sigil_f/2 version is

~f/^[^\$].*/rs

examples

Examples

iex> Spear.Filter.exclude_system_events()
%Spear.Filter{by: ~r/^[^\$].*/, checkpoint_after: 1024, on: :stream_name}
Link to this function

sigil_F(source, mods)

View Source (since 0.1.0)
@spec sigil_F(binary(), charlist()) :: t()

A sigil defining a filter, without escaping

Works the same as sigil_f/2 but does not allow interpolation or escape sequences.

examples

Examples

iex> import Spear.Filter
iex> ~F/^[^\$].*/rs
%Spear.Filter{by: ~r/^[^\$].*/, checkpoint_after: 1024, on: :stream_name}
Link to this function

sigil_f(source, mods)

View Source (since 0.1.0)
@spec sigil_f(binary(), charlist()) :: t()

A sigil defining short-hand notation for writing filters

Filters may either filter on EventStoreDB stream name or event type and may either filter by a regular expression or a list of prefix strings.

modifiers

Modifiers

This f sigil supports the following modifiers. (Note that modifiers are the suffix of the sigil. E.g. the i in ~r/hello/i is a modifier for the regex sigil that declares that the match is case-insensitive.)

For the choice between stream-name and event-type filtering:

  • s - filter on the stream name
  • t - filter on the event type

For the choice between prefixes and regular expressions:

  • p - filter by a list of prefixes. If this option is passed the sigil body will be interpreted as a white-space separated list of prefixes similar to sigil_w/2 from the standard library
  • r - filter using a regular expression

examples

Examples

iex> import Spear.Filter
iex> ~f/My.Aggregate.A- My.Aggregate.B-/ps
%Spear.Filter{
  by: ["My.Aggregate.A-", "My.Aggregate.B-"],
  checkpoint_after: 1024,
  on: :stream_name
}
iex> ~f/^[^\$].*/rs
%Spear.Filter{by: ~r/^[^\$].*/, checkpoint_after: 1024, on: :stream_name}