ExESDBGater.API (ex_esdb_gater v0.1.19)

The ExESDBGater.API GenServer acts as a simple high-availability proxy and load balancer for the GaterWorker processes in the cluster.

Summary

Functions

Acknowledge receipt of an event by a subscriber to a persistent subscription.

Append events to a stream.

Returns a specification to start this module under a supervisor.

Gets a list of all gateway worker pids.

Get events from a stream, starting from a given version, in a given direction.

Get all streams from the store.

Get the subscriptions for a store.

Get the version of a stream.

List all managed stores in the cluster.

Gets a random pid of a gateway worker in the cluster.

Get events from a stream, starting from a given version, backward.

Get events from a stream, starting from a given version, forward.

Types

error()

@type error() :: term()

selector_type()

@type selector_type() :: String.t() | map()

store()

@type store() :: atom()

stream()

@type stream() :: String.t()

subscription_name()

@type subscription_name() :: String.t()

subscription_type()

@type subscription_type() ::
  :by_stream | :by_event_type | :by_event_pattern | :by_event_payload

Functions

ack_event(store, subscription_name, subscriber_pid, event)

@spec ack_event(
  store :: atom(),
  subscription_name :: String.t(),
  subscriber_pid :: pid(),
  event :: map()
) :: :ok | {:error, term()}

Acknowledge receipt of an event by a subscriber to a persistent subscription.

append_events(store, stream_id, events)

@spec append_events(
  store :: atom(),
  stream_id :: stream(),
  events :: list()
) :: {:ok, integer()} | {:error, term()}

Append events to a stream.

## Parameters

  • store: the id of the store
  • stream_id: the id of the stream
  • events: the events to append

## Returns

  • {:ok, new_version} where new_version is the new version of the stream
  • {:error, reason} if there was an error

append_events(store, stream_id, expected_version, events)

@spec append_events(
  store :: atom(),
  stream_id :: stream(),
  expected_version :: integer(),
  events :: list()
) ::
  {:ok, integer()}
  | {:error, term()}
  | {:error, {:wrong_expected_version, integer()}}

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

delete_snapshot(store, source_uuid, stream_uuid, version)

@spec delete_snapshot(
  store :: atom(),
  source_uuid :: binary(),
  stream_uuid :: binary(),
  version :: non_neg_integer()
) :: :ok

gater_api_name()

gateway_worker_pids()

@spec gateway_worker_pids() :: list()

Gets a list of all gateway worker pids.

gateway_worker_pids_for_store(store_id)

@spec gateway_worker_pids_for_store(store_id :: atom()) :: list()

get_events(store, stream_id, start_version, count, direction \\ :forward)

@spec get_events(
  store :: atom(),
  stream_id :: stream(),
  start_version :: integer(),
  count :: integer(),
  direction :: :forward | :backward
) :: {:ok, list()} | {:error, term()}

Get events from a stream, starting from a given version, in a given direction.

get_gater_api_pids()

get_streams(store)

@spec get_streams(store :: atom()) :: {:ok, list()} | {:error, term()}

Get all streams from the store.

## Parameters

- store: the id of the store

## Returns

- a list of all streams in the store

get_subscriptions(store)

@spec get_subscriptions(store :: atom()) :: {:ok, list()} | {:error, term()}

Get the subscriptions for a store.

get_version(store, stream)

@spec get_version(
  store :: atom(),
  stream :: stream()
) :: {:ok, integer()} | {:error, term()}

Get the version of a stream.

list_snapshots(store, source_uuid \\ :any, stream_uuid \\ :any)

@spec list_snapshots(
  store :: atom(),
  source_uuid :: binary() | :any,
  stream_uuid :: binary() | :any
) :: {:ok, [map()]} | {:error, term()}

list_stores()

@spec list_stores() :: {:ok, list()} | {:error, term()}

List all managed stores in the cluster.

## Returns

  • {:ok, stores_map} containing store information
  • {:error, reason} if failed

random_gater_api()

random_gateway_worker()

@spec random_gateway_worker() :: pid()

Gets a random pid of a gateway worker in the cluster.

random_gateway_worker_pid_for_store(store_id)

@spec random_gateway_worker_pid_for_store(store_id :: atom()) :: pid()

read_snapshot(store, source_uuid, stream_uuid, version)

@spec read_snapshot(
  store :: atom(),
  source_uuid :: binary(),
  stream_uuid :: binary(),
  version :: non_neg_integer()
) :: {:ok, map()} | {:error, term()}

record_snapshot(store, source_uuid, stream_uuid, version, snapshot_record)

@spec record_snapshot(
  store :: atom(),
  source_uuid :: binary(),
  stream_uuid :: binary(),
  version :: non_neg_integer(),
  snapshot_record :: map()
) :: :ok

remove_subscription(store, type, selector, subscription_name \\ "transient")

@spec remove_subscription(
  store :: any(),
  type :: subscription_type(),
  selector :: selector_type(),
  subscription_name :: subscription_name()
) :: :ok | {:error, error()}

Remove a permanent or transient subscription.

save_subscription(store, type, selector, subscription_name \\ "transient", start_from \\ 0, subscriber \\ nil)

@spec save_subscription(
  store :: atom(),
  type :: atom(),
  selector :: String.t() | map(),
  subscription_name :: String.t(),
  start_from :: non_neg_integer(),
  subscriber :: pid() | nil
) :: :ok

Description

Add a permanent or transient subscription.

Parameters

- store: the id of the store

- type: the type of subscription (:by_stream, :by_event_type, :by_event_pattern, :by_event_payload)

- selector: the selector for the subscription

($all, $<stream-id>, event-type (a string), or event-pattern)

- subscription_name: the name of the subscription, "transient" for a transient subscription, except for subscriptions :by_event_pattern or :by_event_payload

- start_from: the version to start from

- subscriber: the pid of the subscriber

start_link(opts)

stream_backward(store, stream_id, start_version, count)

@spec stream_backward(
  store :: atom(),
  stream_id :: stream(),
  start_version :: integer(),
  count :: non_neg_integer()
) :: {:ok, stream()} | {:error, term()}

Get events from a stream, starting from a given version, backward.

## Parameters

- store: the id of the store

- stream_id: the id of the stream

- start_version: the version to start from

- count: the number of events to return

## Returns

- a stream of events

stream_forward(store, stream_id, start_version, count)

@spec stream_forward(
  store :: atom(),
  stream_id :: stream(),
  start_version :: integer(),
  count :: integer()
) :: {:ok, stream()} | {:error, term()}

Get events from a stream, starting from a given version, forward.

## Parameters

  • store: the id of the store
  • stream_id: the id of the stream
  • start_version: the version to start from
  • count: the number of events to return

## Returns

  • a stream of events