Spear.Filter (Spear v1.4.1)
A server-side filter to apply when reading events from an EventStoreDB
Regular expressions
Elixir's built-in Regex module and Kernel.sigil_r/2 use PCRE-compatible
regular expressions. Judging from the EventStoreDB codebase, however,
filtering is done (at the time of writing) with C#'s built-in
System.Text.RegularExpressions. C# regular expressions diverge somewhat in
syntax from PCRE-compatible expressions, so results may vary when passing
Elixir regular expressions.
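To see what a regular expression looks like as a plain pattern string before relying on it in a filter, a small helper like the one below may be useful. This is a minimal sketch using only the standard library: RegexPreview is a hypothetical module name, and the local match results only show how Elixir's PCRE engine reads the pattern, not how the server will.

```elixir
defmodule RegexPreview do
  # Hypothetical helper module, not part of Spear.

  # Returns the raw pattern string, without sigil delimiters or modifiers.
  def pattern(%Regex{} = regex), do: Regex.source(regex)

  # Keeps the names that match locally under Elixir's PCRE engine.
  # The server-side engine may treat the same pattern differently.
  def local_matches(%Regex{} = regex, names) when is_list(names) do
    Enum.filter(names, &Regex.match?(regex, &1))
  end
end

regex = ~r/^[^\$].*/
IO.puts(RegexPreview.pattern(regex))
IO.inspect(RegexPreview.local_matches(regex, ["$et-MyEvent", "my_stream-1"]))
```

If the pattern uses syntax that differs between the two engines, comparing the printed string against the C# regular expression documentation is a reasonable first check.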
As an escape hatch, you may build a Spear.Filter.t/0 manually and use a
String.t/0 as the :by field. This value is left untouched except for
last-minute encoding to go over the wire to the server.
%Spear.Filter{
  on: :stream_name,
  by: "<some-complicated-regex>",
  checkpoint_after: 1024
}
If possible, use a list of prefixes as the :by option, since prefixes are
unambiguous.
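The difference between a prefix and a regular expression can be illustrated locally. The sketch below assumes that a prefix filter behaves like a literal "starts with" check; PrefixSketch is a hypothetical module that only mimics that idea with the standard library and is not how the server implements it.

```elixir
defmodule PrefixSketch do
  # Hypothetical helper, not part of Spear: true when `name` begins with
  # any of the given prefixes, compared literally.
  def matches?(name, prefixes) when is_binary(name) and is_list(prefixes) do
    String.starts_with?(name, prefixes)
  end
end

prefixes = ["My.Aggregate.A-", "My.Aggregate.B-"]

# Literal comparison: the "." in a prefix only matches a ".".
IO.inspect(PrefixSketch.matches?("My.Aggregate.A-123", prefixes))
IO.inspect(PrefixSketch.matches?("MyXAggregate.A-123", prefixes))

# In a regular expression an unescaped "." matches any character,
# so the same text used as a regex also accepts "MyXAggregate".
IO.inspect(Regex.match?(~r/^My.Aggregate/, "MyXAggregate.A-123"))
```

A prefix has no metacharacters to escape, so it means the same thing regardless of which regular expression engine the server uses.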
Checkpoints
The EventStoreDB will emit checkpoint events to a subscriber regularly.
This prevents a possible (and perhaps probable) scenario where
- the EventStoreDB contains many events
- the client is searching for a small number of events relative to the size
  of :all
- (and/or) the target events are sparsely spread throughout :all
Under these conditions, the EventStoreDB may progress a long way through
:all before finding a matching event. If the connection between the client
and server is severed while the EventStoreDB is part-way through a long
drought of targeted events, the server will need to re-seek through the
drought when the client reconnects and passes an old :from option to
Spear.subscribe/4.
These checkpoints will arrive in the mailbox of the subscriber process as
Spear.Filter.Checkpoint.t/0 structs and may be used as a restore point
by passing them to the :from option of Spear.subscribe/4.
For example, say we have a subscriber process which is a GenServer and some
function save_checkpoint/1 which saves checkpoint information durably
(to disk or a database for example). It will handle subscription events
in its GenServer.handle_info/2 callback.
defmodule MySubscriber do
  use GenServer
  alias Spear.{Event, Filter.Checkpoint}

  # ..

  @impl GenServer
  def handle_info(%Event{} = event, state) do
    # .. handle the event
    # an event can also serve as a restore point
    event |> Event.to_checkpoint() |> save_checkpoint()
    {:noreply, state}
  end

  def handle_info(%Checkpoint{} = checkpoint, state) do
    # no matching event arrived, but the server reports its progress
    save_checkpoint(checkpoint)
    {:noreply, state}
  end
end
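The save_checkpoint/1 function above is left undefined. One possible way to persist checkpoints is sketched below, assuming a local file is durable enough. CheckpointStore, its functions, the explicit path argument, and the :start default are all hypothetical choices made for illustration, and a plain map stands in for a real checkpoint struct.

```elixir
defmodule CheckpointStore do
  # Hypothetical file-backed store, not part of Spear.

  # Serializes any term (for example a checkpoint struct) and writes it to `path`.
  def save_checkpoint(checkpoint, path) do
    File.write!(path, :erlang.term_to_binary(checkpoint))
  end

  # Returns the stored term, or `default` when nothing has been saved yet.
  # The :start default is an assumption about a sensible starting position.
  def load_checkpoint(path, default \\ :start) do
    case File.read(path) do
      {:ok, binary} -> :erlang.binary_to_term(binary)
      {:error, :enoent} -> default
    end
  end
end

# A plain map stands in for a checkpoint here.
path = Path.join(System.tmp_dir!(), "my_subscriber.checkpoint")
CheckpointStore.save_checkpoint(%{commit: 42, prepare: 42}, path)
IO.inspect(CheckpointStore.load_checkpoint(path))
```

On reconnect, the loaded value could be passed as the :from option of Spear.subscribe/4. Note that File.write!/2 is not atomic, so a crash during the write could leave a corrupt file; writing to a temporary file and renaming it is a common refinement.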
Checkpoint Interval
The EventStoreDB will send a checkpoint after filtering a configurable number
of events. The :checkpoint_after field can be used to configure this
behavior. Note that the server only allows :checkpoint_after to be a
multiple of 32. Other values will be rounded to the nearest multiple.
The default is set at 1024 (32 * 32) in Spear, but this is tunable per
filter with checkpoint_after/2 or by manually adjusting the
:checkpoint_after field in this struct.
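To avoid surprises from server-side rounding, an interval can be checked before it is used. The helper below is a minimal sketch: IntervalSketch is a hypothetical module, and nearest/1 assumes that ties round up and that the smallest interval is 32. The server's exact rounding rule is not documented here and may differ.

```elixir
defmodule IntervalSketch do
  # Hypothetical helper, not part of Spear.
  @step 32

  # True when the interval is a positive multiple of 32.
  def valid?(n) when is_integer(n), do: n > 0 and rem(n, @step) == 0

  # Assumption: round to the nearest multiple of 32, ties rounding up,
  # and never below one step. The server may round differently.
  def nearest(n) when is_integer(n) and n > 0 do
    max(@step, div(n + div(@step, 2), @step) * @step)
  end
end

IO.inspect(IntervalSketch.valid?(32 * 8))
IO.inspect(IntervalSketch.valid?(1000))
IO.inspect(IntervalSketch.nearest(1000))
```

Passing an interval for which valid?/1 returns true sidesteps the question of how the server rounds.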
Summary
Functions
checkpoint_after/2 - Sets the checkpoint interval
exclude_system_events/0 - Produces a filter which excludes system events
sigil_F/2 - A sigil defining a filter, without escaping
sigil_f/2 - A sigil defining short-hand notation for writing filters
Types
@type t() :: %Spear.Filter{
  by: Regex.t() | String.t() | [String.t()],
  checkpoint_after: pos_integer(),
  on: :event_type | :stream_name
}
A filter which can be applied to a subscription to trigger server-side result filtering
This filter type is intended to be passed as the :filter option to
Spear.subscribe/4.
Examples
iex> import Spear.Filter
iex> ~f/^[^\$].*/rs # exclude system events which start with "$"
%Spear.Filter{by: ~r/^[^\$].*/, checkpoint_after: 1024, on: :stream_name}
Functions
@spec checkpoint_after(t(), pos_integer()) :: t()
Sets the checkpoint interval
Examples
iex> import Spear.Filter
iex> checkpoint_after(~f/^[^\$].*/rs, 32 * 8)
%Spear.Filter{by: ~r/^[^\$].*/, checkpoint_after: 256, on: :stream_name}
@spec exclude_system_events() :: t()
Produces a filter which excludes system events
This is a potentially common filter for subscribers reading from :all.
The sigil_f/2 version is ~f/^[^\$].*/rs
Examples
iex> Spear.Filter.exclude_system_events()
%Spear.Filter{by: ~r/^[^\$].*/, checkpoint_after: 1024, on: :stream_name}
sigil_F/2
A sigil defining a filter, without escaping
Works the same as sigil_f/2 but does not allow interpolation or escape
sequences.
Examples
iex> import Spear.Filter
iex> ~F/^[^\$].*/rs
%Spear.Filter{by: ~r/^[^\$].*/, checkpoint_after: 1024, on: :stream_name}
sigil_f/2
A sigil defining short-hand notation for writing filters
Filters may either filter on EventStoreDB stream name or event type and may either filter by a regular expression or a list of prefix strings.
Modifiers
This f sigil supports the following modifiers. (Note that modifiers are
the suffix of the sigil. E.g. the i in ~r/hello/i is a modifier for
the regex sigil that declares that the match is case-insensitive.)
For the choice between stream-name and event-type filtering:
- s - filter on the stream name
- t - filter on the event type
For the choice between prefixes and regular expressions:
- p - filter by a list of prefixes. If this option is passed the sigil body will be interpreted as a white-space separated list of prefixes similar to sigil_w/2 from the standard library
- r - filter using a regular expression
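The way the modifiers above select the :on and :by fields can be mimicked with a small decoder. This is only a sketch of the described behavior, not Spear's implementation: SigilSketch is a hypothetical module, it returns a plain map rather than a Spear.Filter struct, and its error handling is an assumption.

```elixir
defmodule SigilSketch do
  # Hypothetical decoder, not part of Spear. `modifiers` is a charlist
  # such as ~c"ps", mirroring how sigil modifiers are passed.
  def decode(body, modifiers) when is_binary(body) and is_list(modifiers) do
    %{on: on(modifiers), by: by(body, modifiers)}
  end

  # s selects the stream name, t selects the event type.
  defp on(modifiers) do
    cond do
      ?s in modifiers -> :stream_name
      ?t in modifiers -> :event_type
      true -> raise ArgumentError, "expected an s or t modifier"
    end
  end

  # p splits the body on whitespace, r compiles it as a regular expression.
  defp by(body, modifiers) do
    cond do
      ?p in modifiers -> String.split(body)
      ?r in modifiers -> Regex.compile!(body)
      true -> raise ArgumentError, "expected a p or r modifier"
    end
  end
end

IO.inspect(SigilSketch.decode("My.Aggregate.A- My.Aggregate.B-", ~c"ps"))
IO.inspect(SigilSketch.decode("^[^\\$].*", ~c"rt"))
```

Each filter needs one modifier from each pair: one to choose what to filter on and one to choose how the body is interpreted.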
Examples
iex> import Spear.Filter
iex> ~f/My.Aggregate.A- My.Aggregate.B-/ps
%Spear.Filter{
  by: ["My.Aggregate.A-", "My.Aggregate.B-"],
  checkpoint_after: 1024,
  on: :stream_name
}
iex> ~f/^[^\$].*/rs
%Spear.Filter{by: ~r/^[^\$].*/, checkpoint_after: 1024, on: :stream_name}