Beeline (Beeline v1.0.0)

A tool for building in-order GenStage topologies for EventStoreDB

Beeline provides a Broadway-like experience for building GenStage topologies for consuming streams of events from EventStoreDB in-order, usually one event at a time. Beeline aims to close over the supervision and basic configuration of the producer(s), as well as some of the run-of-the-mill procedure done in the GenStage.init/1 callback of the consumer such as linking the producer process(es).

The Beeline Topology

Beeline creates a topology of GenStage, GenServer, and Supervisor processes. This topology looks like the following:

Supervisor
├── HealthChecker*
└── StageSupervisor
    ├── Producer*
    └── Consumer

Let's break these down from the bottom up:

  • "Consumer" - the GenStage consumer module which invokes Beeline.start_link/2, handles events, and increments stream positions.
  • "Producer*" - one or more GenStage producers which feed the consumer. These producers are declared with the :producers key and may be Kelvin.InOrderSubscription, Volley.InOrderSubscription, or Beeline.DummyProducer producer modules.
  • "StageSupervisor" - a supervisor for the GenStage pipeline. This supervisor has a :transient restart strategy so that if the GenStage pipeline halts on an event it cannot handle, the StageSupervisor supervision tree is brought down but not the entire supervision tree. This behavior is desirable so that the health-checker process can continue reading the stream positions and so that an operator can perform any necessary manual intervention on the crashed supervision tree (for example, skipping the failure event).
  • "HealthChecker*" - a GenServer which periodically polls the stream positions of a producer. There is one health checker process per producer.
  • "Supervisor" - a top-level supervisor. This supervisor has a :permanent restart strategy.
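The shape of this tree can be sketched with plain OTP supervisors. This is a minimal illustration, not Beeline's implementation: the TopologySketch module, the Agent placeholders, and the :one_for_all strategy of the inner supervisor are all assumptions. The part it demonstrates is the :transient restart setting on the inner supervisor. When an OTP supervisor exceeds its restart limits it exits with reason :shutdown, and a :transient child that exits with :shutdown is not restarted, so the outer supervisor and the health checker stay up.

```elixir
defmodule TopologySketch do
  # Hypothetical stand-in for a real process (health checker, producer, consumer).
  defp placeholder(id) do
    Supervisor.child_spec({Agent, fn -> id end}, id: id)
  end

  def start_link do
    # The inner supervisor is a :transient child of the top-level supervisor.
    # The :one_for_all strategy here is an assumption made for the sketch.
    stage_supervisor = %{
      id: :stage_supervisor,
      type: :supervisor,
      restart: :transient,
      start:
        {Supervisor, :start_link,
         [[placeholder(:producer), placeholder(:consumer)], [strategy: :one_for_all]]}
    }

    Supervisor.start_link(
      [placeholder(:health_checker), stage_supervisor],
      strategy: :one_for_one
    )
  end
end

{:ok, supervisor} = TopologySketch.start_link()
IO.inspect(Supervisor.count_children(supervisor))
```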

See the start_link/2 documentation for a full configuration reference and examples.

Summary

Functions

Wraps an event in a subscription event packet

Decodes the body of a subscription event

Determines which producer emitted the subscription event

Restarts the supervision tree of GenStages for the given Beeline topology

Starts a Beeline topology

Determines the stream position of the subscription event

Gives a set of events to a topology's dummy producer

Functions

as_subscription_event(event, producer)

@spec as_subscription_event(map(), atom()) :: {atom(), map()}

Wraps an event in a subscription event packet

This can be useful for building test events to pass through the dummy producer.

decode_event(subscription_event)

Decodes the body of a subscription event

This function performs JSON decoding if necessary and converts maps with string keys into maps keyed by atoms. This can potentially lead to atom exhaustion, but the allowed atom count is quite high, so in practice the concern is usually only theoretical.
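To make the key conversion concrete, here is a minimal sketch of turning string keys into atom keys recursively. It is not Beeline's implementation: the AtomizeSketch module is hypothetical, and the use of String.to_atom/1 is an assumption inferred from the atom-exhaustion caveat above.

```elixir
defmodule AtomizeSketch do
  # Recursively converts string keys to atom keys in plain maps.
  # Structs are left untouched and fall through to the last clause.
  def atomize(%{} = map) when not is_struct(map) do
    Map.new(map, fn {key, value} -> {to_atom(key), atomize(value)} end)
  end

  def atomize(list) when is_list(list), do: Enum.map(list, &atomize/1)
  def atomize(other), do: other

  # String.to_atom/1 creates new atoms on demand, which is where the
  # theoretical risk of atom exhaustion comes from.
  defp to_atom(key) when is_binary(key), do: String.to_atom(key)
  defp to_atom(key), do: key
end

AtomizeSketch.atomize(%{"id" => 1, "items" => [%{"sku" => "a"}]})
# returns %{id: 1, items: [%{sku: "a"}]}
```

Where event bodies come from an untrusted source, String.to_existing_atom/1 is the standard-library alternative that never creates new atoms.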

Examples

@impl GenStage
def handle_events([subscription_event], _from, state) do
  event = Beeline.decode_event(subscription_event)
  # .. handle the decoded event ..

  {:noreply, [], state}
end

producer(subscription_event)

Determines which producer emitted the subscription event

This can be useful in order to save stream positions when a consumer is subscribed to multiple producers. Should be used in tandem with stream_position/1.

restart_stages(module)

@spec restart_stages(module()) :: :ok | {:error, term()}

Restarts the supervision tree of GenStages for the given Beeline topology

This can be useful for manual intervention by a human operator in a remote console session, if the GenStage supervision tree crashes and exceeds the retry limits.

Examples

iex> Beeline.restart_stages(MyEventHandler)
:ok

start_link(module, opts)

(since 0.1.0)
@spec start_link(module :: module(), opts :: Keyword.t()) :: Supervisor.on_start()

Starts a Beeline topology

Options

  • :name - The GenServer name for the topology. The topology will build on this name, using it as a prefix.

  • :producers - A list of producers to which the consumer should subscribe. See the "producer options" section below for the schema.

  • :get_stream_position - A function to invoke in order to get the stream position for a producer. This function should be a 1-arity function (anonymous or capture) where the name of the producer is passed as the argument. This option may also be passed as an MFA tuple where the producer name will be prepended to the argument list. If this option is not provided, a default will be fetched with Application.fetch_env!(:beeline, :get_stream_position). This configuration can be used to set a blanket function for all beelines to use.

  • :auto_subscribe? - A function to invoke to determine whether each producer should subscribe to events as it starts up. The argument passed is the GenServer name of the producer. If this option is not provided, a default will be fetched with Application.fetch_env!(:beeline, :auto_subscribe?).

  • :subscribe_after - A period in msec after initialization when each producer should query the :auto_subscribe? function. The default value is {Enum, :random, [3000..5000]}.

  • :spawn_health_checkers? - Controls whether the topology should spawn the HealthChecker children. It can be useful to disable this when Mix.env() is :dev or :test, as the health checker provides little or no value in those environments and can produce many log lines. If this option is omitted, the value is read from the application environment with Application.get_env(:beeline, :spawn_health_checkers?, true), which falls back to true. The default value is nil.

  • :health_check_interval - How long the health checker processes should wait between polling the stream positions. Can either be a function (MFA or 0-arity function) or a non-negative integer. The value is treated as milliseconds. The default value is 51000.

  • :health_check_drift - Noise to add to the interval specified with :health_check_interval. This keeps producers from all polling their positions at the same time, which can reduce strain on the stream position store and the EventStoreDB. Can either be a function (MFA or 0-arity function) or a non-negative integer. The value is treated as milliseconds. If a function is provided, it is invoked every time the health checker process attempts to schedule the next poll. The default value is {Enum, :random, [0..10000]}.

  • :test_mode? - Controls whether the topology should start up in test mode. In test mode, any adapters set in producer specs are switched out with the :dummy adapter. If this option is omitted, the value is read from the application environment with Application.get_env(:beeline, :test_mode?, false), which falls back to false.

  • :context - A user-defined data structure which is used as the initial state of the GenStage consumer process. The default value is nil.
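Several of the options above accept a plain value, an anonymous function, or an MFA tuple. The sketch below shows one way such values could be resolved, how an interval and its drift add up to a poll delay, and what "prepending the producer name to the argument list" means for an MFA. The OptionSketch and PositionStoreSketch modules are hypothetical and written only for illustration. They are not part of Beeline.

```elixir
defmodule OptionSketch do
  # Resolves a value given as a non-negative integer, a 0-arity function,
  # or an MFA tuple.
  def resolve(value) when is_integer(value) and value >= 0, do: value
  def resolve(fun) when is_function(fun, 0), do: fun.()
  def resolve({m, f, a}) when is_atom(m) and is_atom(f) and is_list(a), do: apply(m, f, a)

  # The delay before the next poll is the interval plus the drift, in milliseconds.
  def next_poll_delay(interval, drift), do: resolve(interval) + resolve(drift)

  # A stream position getter is either a 1-arity function or an MFA tuple
  # to which the producer name is prepended as the first argument.
  def stream_position(fun, producer) when is_function(fun, 1), do: fun.(producer)
  def stream_position({m, f, a}, producer), do: apply(m, f, [producer | a])
end

defmodule PositionStoreSketch do
  # Hypothetical position store backed by a plain map; -1 means "no position yet".
  def get(producer, table), do: Map.get(table, producer, -1)
end

# With the documented defaults the delay falls between 51000 and 61000 ms.
delay = OptionSketch.next_poll_delay(51_000, {Enum, :random, [0..10_000]})
IO.inspect(delay)

table = %{MyEventHandler.EventListener => 42}
mfa = {PositionStoreSketch, :get, [table]}
OptionSketch.stream_position(mfa, MyEventHandler.EventListener)
# returns 42
```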

Producer options

  • :adapter - Required. The adapter module to use for creating the producer. Use :kelvin for EventStoreDB v3-5, :volley for EventStoreDB v20+, and :dummy for test cases.

  • :stream_name - Required. The name of the EventStoreDB stream to which this producer should subscribe for events.

  • :connection - Required. The module to use as a connection to form the subscription. When the :adapter option is :kelvin, this should be an Extreme client module. When the adapter is :volley, it should be a Spear.Client module.

  • :name - The full GenServer name to use for this producer. When this option is not provided, the name will be a formula of the name of the consumer and the key in the keyword list of producers. The default value is nil.

  • :max_demand - The maximum number of events the consumer is allowed to request from this producer. This option can be configured to allow batch processing. The default value is 1.

  • :min_demand - The minimum number of events the consumer can request at a time from this producer. The default value is 0.
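As an illustration of these options, the following keyword list declares two producers for one consumer, one of which requests events in batches. All stream names and connection modules here are hypothetical placeholders. With a :max_demand greater than 1, the consumer's handle_events/3 callback may receive more than one event per call, so it would need to match on a list of any length rather than on a single-element list.

```elixir
# Hypothetical producer declarations; the names are placeholders only.
producers = [
  orders: [
    adapter: :volley,
    stream_name: "$ce-MyApp.Order",
    connection: MyApp.SpearClient,
    # Request up to 10 events at a time to allow batch processing.
    max_demand: 10,
    min_demand: 5
  ],
  payments: [
    adapter: :kelvin,
    stream_name: "$ce-MyApp.Payment",
    connection: MyApp.ExtremeClient
    # :max_demand and :min_demand are omitted, so the defaults (1 and 0) apply.
  ]
]

IO.inspect(Keyword.keys(producers))
```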

Examples

defmodule MyEventHandler do
  use Beeline

  def start_link(_opts) do
    Beeline.start_link(MyEventHandler,
      name: MyEventHandler,
      producers: [
        default: [
          name: MyEventHandler.EventListener,
          stream_name: "$ce-BoundedContext.AggregateName",
          connection: MyEventHandler.EventStoreDBConnection,
          adapter: :kelvin
        ]
      ]
    )
  end

  # .. GenStage callbacks

  @impl GenStage
  def handle_events([subscription_event], _from, state) do
    # .. handle the events one-by-one ..

    {:noreply, [], state}
  end
end

stream_position(subscription_event)

Determines the stream position of the subscription event

This function prefers link stream positions if available. This means that if the subscription from which the event is emitted is reading a projected stream such as a category stream, the returned stream position will reflect the position in the projected stream instead of the origin stream.
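The preference for link positions can be sketched with pattern matching. The event shape below is a simplified, hypothetical map with :link and :event keys. The real subscription events produced by the adapters are structured differently, and PositionSketch is not Beeline's implementation.

```elixir
defmodule PositionSketch do
  # Hypothetical, simplified event shape:
  #   %{event: %{event_number: n}, link: nil | %{event_number: n}}
  # The first clause only matches when a link is present, so the link
  # position wins whenever the event was read from a projected stream.
  def stream_position(%{link: %{event_number: position}}), do: position
  def stream_position(%{event: %{event_number: position}}), do: position
end

# An event read from a category stream: the link carries the projected position.
PositionSketch.stream_position(%{event: %{event_number: 3}, link: %{event_number: 120}})
# returns 120

# An event read directly from its origin stream: there is no link.
PositionSketch.stream_position(%{event: %{event_number: 3}, link: nil})
# returns 3
```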

Examples

@impl GenStage
def handle_events([subscription_event], _from, state) do
  # consume the events

  MyApp.Repo.transaction(fn ->
    # save some state

    # persist the position in the same transaction as the state change
    producer = Beeline.producer(subscription_event)
    stream_position = Beeline.stream_position(subscription_event)
    MyApp.StreamPosition.persist(producer, stream_position)
  end)

  {:noreply, [], state}
end

test_events(events, beeline)

Gives a set of events to a topology's dummy producer

This function can be used to test running events through a topology. If there are multiple producers, one is picked at random.