Beeline (Beeline v1.1.6)
A tool for building in-order GenStage topologies for EventStoreDB
Beeline provides a Broadway-like experience for building GenStage
topologies for consuming streams of events from EventStoreDB in-order,
usually one event at a time. Beeline aims to close over the supervision
and basic configuration of the producer(s), as well as some of the
run-of-the-mill procedure done in the GenStage.init/1 callback of
the consumer, such as linking the producer process(es).
The Beeline Topology
Beeline creates a topology of GenStage, GenServer, and Supervisor processes. This topology looks like the following:
Supervisor
├── HealthChecker*
└── StageSupervisor
    ├── Producer*
    └── Consumer
Let's break these down from the bottom up:
- "Consumer" - the GenStage consumer module which invokes Beeline.start_link/2, handles events, and increments stream positions.
- "Producer*" - one or more GenStage producers which feed the consumer. These producers are declared with the :producers key and may be any of the Kelvin.InOrderSubscription, Volley.InOrderSubscription, or Beeline.DummyProducer producer modules.
- "StageSupervisor" - a supervisor for the GenStage pipeline. This supervisor has a :transient restart strategy so that if the GenStage pipeline halts on an event it cannot handle, the StageSupervisor supervision tree is brought down but not the entire supervision tree. This behavior is desirable so that the health-checker process can continue reading the stream positions and so that an operator can perform any necessary manual intervention on the crashed supervision tree (for example, skipping the failure event).
- "HealthChecker*" - a GenServer which periodically polls the stream positions of a producer. There is one health checker process per producer.
- "Supervisor" - a top-level supervisor. This supervisor has a :permanent restart strategy.
See the start_link/2 documentation for a full configuration reference and
examples.
Summary
Functions
Wraps an event in a subscription event packet
Decodes the body of a subscription event
Determines which producer emitted the subscription event
Restarts the supervision tree of GenStages for the given Beeline topology
Starts a Beeline topology
Determines the stream position of the subscription event
Gives a set of events to a topology's dummy producer
Functions
Wraps an event in a subscription event packet
This can be useful for building test events to pass through the dummy producer.
Decodes the body of a subscription event
This function performs JSON decoding if necessary and converts maps with string keys into maps keyed by atoms. This can potentially lead to atom exhaustion, but the allowed atom count is quite high, so in practice the concern is usually only theoretical.
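The key conversion described above can be pictured with a minimal sketch. The module name AtomKeysSketch is hypothetical, and this is not Beeline's implementation; it only illustrates why every distinct string key ends up as a new atom.

```elixir
defmodule AtomKeysSketch do
  # Hypothetical illustration only; Beeline's own decoding may differ.

  # Leave structs untouched (they are maps, but not enumerable).
  def atomize(%_{} = struct), do: struct

  # Convert string keys to atoms, recursing into nested values.
  def atomize(map) when is_map(map) do
    Map.new(map, fn {key, value} -> {to_atom(key), atomize(value)} end)
  end

  def atomize(list) when is_list(list), do: Enum.map(list, &atomize/1)
  def atomize(other), do: other

  # String.to_atom/1 creates a new atom for each previously unseen key,
  # which is the source of the atom exhaustion concern.
  defp to_atom(key) when is_binary(key), do: String.to_atom(key)
  defp to_atom(key), do: key
end

AtomKeysSketch.atomize(%{"id" => 1, "tags" => [%{"name" => "a"}]})
# => %{id: 1, tags: [%{name: "a"}]}
```

Because atoms are never garbage collected, the table only grows when events carry unbounded, attacker-controlled, or generated key names; events with a fixed schema add a fixed number of atoms.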
Examples
@impl GenStage
def handle_events([subscription_event], _from, state) do
  event = Beeline.decode_event(subscription_event)
  # ..
Determines which producer emitted the subscription event
This is useful for saving stream positions when a consumer is
subscribed to multiple producers. It should be used in tandem with
stream_position/1.
@spec restart_stages(GenServer.name()) :: :ok | {:error, term()}
Restarts the supervision tree of GenStages for the given Beeline topology
This can be useful for manual intervention by a human operator in a remote console session, if the GenStage supervision tree crashes and exceeds the retry limits.
Examples
iex> Beeline.restart_stages(MyEventHandler)
:ok
@spec start_link(module :: module(), opts :: Keyword.t()) :: Supervisor.on_start()
Starts a Beeline topology
Options
- :name - The GenServer name for the topology. The topology will build on this name, using it as a prefix.
- :producers (keyword/0) - A list of producers to which the consumer should subscribe. See the "Producer options" section below for the schema.
- :get_stream_position - A function to invoke in order to get the stream position for a producer. This function should be a 1-arity function (anonymous or capture) where the name of the producer is passed as the argument. This option may also be passed as an MFA tuple where the producer name will be prepended to the argument list. If this option is not provided, a default will be fetched with Application.fetch_env!(:beeline, :get_stream_position). This configuration can be used to set a blanket function for all beelines to use.
- :auto_subscribe? - A function to invoke to determine whether each producer should subscribe to events as it starts up. The argument passed is the GenServer name of the producer. If this option is not provided, a default will be fetched with Application.fetch_env!(:beeline, :auto_subscribe?).
- :subscribe_after - A period in milliseconds after initialization when each producer should query the :auto_subscribe? function. The default value is {Enum, :random, [3000..5000]}.
- :spawn_health_checkers? - Controls whether the topology should spawn the HealthChecker children. It can be useful to disable this when Mix.env() in [:dev, :test], as the health checker provides little or no value in those environments and can produce many log lines. If this option is left blank, it is read from the application environment with Application.get_env(:beeline, :spawn_health_checkers?, true), defaulting to true. The default value is nil.
- :health_check_interval - How long the health checker processes should wait between polling the stream positions. Can either be a function (MFA or 0-arity function) or a non-negative integer. The value is treated as milliseconds. The default value is 51000.
- :health_check_drift - Noise to add to the interval specified with :health_check_interval. This can be useful so that not all producers poll their positions at the same time, which can reduce strain on the stream position store and the EventStoreDB. Can either be a function (MFA or 0-arity function) or a non-negative integer. The value is treated as milliseconds. If a function is provided, it is invoked every time the health checker process attempts to schedule the next poll. The default value is {Enum, :random, [0..10000]}.
- :test_mode? - Controls whether the topology should start up in test mode. In test mode, any adapters set in producer specs are switched out with the :dummy adapter. If this option is left blank, it is read from the application environment with Application.get_env(:beeline, :test_mode?, false), defaulting to false.
- :context (term/0) - A user-defined data structure which is used as the initial state of the GenStage consumer process. The default value is nil.
- :acceptable_behind_by (non_neg_integer/0) - The number of events by which a listener may lag behind the stream and still be considered healthy. If on the next check the listener is still behind within this range but has not processed any event since the last check, the listener is considered stale or "behind". The default value is 5.
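Several options above accept a plain value, a function, or an MFA tuple. The following is a minimal sketch of how such values could be resolved, assuming the semantics described in the option list. OptionSketch and PositionStoreSketch are hypothetical modules written for illustration; they are not part of Beeline, whose internals may differ.

```elixir
defmodule PositionStoreSketch do
  # Hypothetical stand-in for a real stream position store.
  @positions %{MyEventHandler.EventListener => 41}

  def get(producer, default), do: Map.get(@positions, producer, default)
end

defmodule OptionSketch do
  # Hypothetical helper; illustrates the documented option shapes only.

  # :get_stream_position as a 1-arity function: pass the producer name.
  def stream_position(fun, producer) when is_function(fun, 1), do: fun.(producer)

  # :get_stream_position as an MFA: the producer name is prepended to the args.
  def stream_position({mod, fun, args}, producer), do: apply(mod, fun, [producer | args])

  # Interval and drift: a non-negative integer, a 0-arity function, or an MFA.
  def resolve(ms) when is_integer(ms) and ms >= 0, do: ms
  def resolve(fun) when is_function(fun, 0), do: fun.()
  def resolve({mod, fun, args}), do: apply(mod, fun, args)

  # Delay before the next health check: interval plus freshly drawn drift.
  def next_poll_delay(interval, drift), do: resolve(interval) + resolve(drift)
end

OptionSketch.stream_position({PositionStoreSketch, :get, [-1]}, MyEventHandler.EventListener)
# => 41

# With the documented defaults, the delay falls between 51000 and 61000 ms.
OptionSketch.next_poll_delay(51_000, {Enum, :random, [0..10_000]})
```

Re-evaluating the drift on every scheduling attempt is what spreads the polls of different producers apart over time.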
Producer options
- :adapter - Required. The adapter module to use for creating the producer. Use :kelvin for EventStoreDB v3-5, :volley for EventStoreDB v20+, and :dummy for test cases.
- :stream_name (String.t/0) - Required. The name of the EventStoreDB stream to which this producer should subscribe for events.
- :connection (atom/0) - Required. The module to use as a connection to form the subscription. When the :adapter option is :kelvin, this should be an Extreme client module. When the adapter is :volley, it should be a Spear.Client module.
- :name (atom/0) - The full GenServer name to use for this producer. When this option is not provided, the name is derived from the name of the consumer and the key in the keyword list of producers. The default value is nil.
- :max_demand (pos_integer/0) - The maximum number of events the consumer is allowed to request from this producer. This option can be configured to allow batch processing. The default value is 1.
- :min_demand (non_neg_integer/0) - The minimum number of events the consumer can request at a time from this producer. The default value is 0.
Examples
defmodule MyEventHandler do
  use Beeline

  def start_link(_opts) do
    Beeline.start_link(MyEventHandler,
      name: MyEventHandler,
      producers: [
        default: [
          name: MyEventHandler.EventListener,
          stream_name: "$ce-BoundedContext.AggregateName",
          connection: MyEventHandler.EventStoreDBConnection,
          adapter: :kelvin
        ]
      ]
    )
  end

  # .. GenStage callbacks

  @impl GenStage
  def handle_events([subscription_event], _from, state) do
    # .. handle the events one-by-one ..
    {:noreply, [], state}
  end
end
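The application-environment fallbacks named in the options could be set once for all topologies in a config file. This is a sketch only: MyApp.StreamPositions and its functions are hypothetical names, and the environment checks are one possible policy rather than a recommendation from Beeline.

```elixir
# config/config.exs
import Config

config :beeline,
  # MFA form: the producer name is prepended to the (empty) argument list.
  # MyApp.StreamPositions is a hypothetical module.
  get_stream_position: {MyApp.StreamPositions, :get, []},
  # Called with the GenServer name of each producer (hypothetical function).
  auto_subscribe?: &MyApp.StreamPositions.auto_subscribe?/1,
  # Health checkers add little value in :dev and :test.
  spawn_health_checkers?: config_env() not in [:dev, :test],
  # Swap producer adapters for the :dummy adapter when running tests.
  test_mode?: config_env() == :test
```

Options passed directly to start_link/2 take precedence over these values, since the application environment is consulted only when an option is not provided.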
Determines the stream position of the subscription event
This function prefers link stream positions if available. This means that if the subscription from which the event is emitted is reading a projected stream such as a category stream, the returned stream position will reflect the position in the projected stream instead of the origin stream.
Examples
@impl GenStage
def handle_events([subscription_event], _from, state) do
  # consume the events
  MyApp.Repo.transaction(fn ->
    # save some state
    producer = Beeline.producer(subscription_event)
    stream_position = Beeline.stream_position(subscription_event)
    MyApp.StreamPosition.persist(producer, stream_position)
  end)

  # a consumer emits no events of its own
  {:noreply, [], state}
end
Gives a set of events to a topology's dummy producer
This function can be used to test running events through a topology. If there are multiple producers, one is picked at random.