Spear (Spear v1.4.1)
A sharp EventStoreDB 20+ client backed by mint
Streams
Spear uses the term stream across different contexts. There are four possible contexts for the term stream in Spear:
- HTTP2 streams of data
- gRPC stream requests, responses, and bidirectional communication
- EventStoreDB streams of events
- Elixir Streams
Descriptions of each are given in the Streams guide.
Connections
Spear needs a connection to interact with an EventStoreDB. Spear provides the Spear.Connection GenServer for this purpose. Connections are referred to as "conn" in the documentation.
Like an Ecto.Repo, it can be handy to have a module which itself represents a connection to an EventStoreDB. For this, Spear provides Spear.Client, which allows one to call any function in Spear on the client module without the conn argument.
defmodule MyApp.MyClient do
  use Spear.Client,
    otp_app: :my_app
end
iex> MyApp.MyClient.start_link(connection_string: "esdb://localhost:2113")
iex> MyApp.MyClient.stream!("my_stream") |> Enum.to_list()
[
  %Spear.Event{},
  %Spear.Event{},
  ..
]
See the Spear.Client module for more information.
Record interfaces
The Spear.Records.* modules provide macro interfaces for matching and creating messages sent and received from the EventStoreDB. These are mostly intended for internal uses such as the mapping between a Spear.Records.Streams.read_resp/0 and a Spear.Event.t/0, but they can also be used to extract values from any raw response records (e.g. those returned from functions where the raw?: true option is passed).
iex> import Spear.Records.Streams, only: [read_resp: 0, read_resp: 1]
iex> event = Spear.stream!(conn, "my_stream", raw?: true) |> Enum.take(1) |> List.first()
{:"event_store.client.streams.ReadResp", {:checkpoint, ..}}
iex> match?(read_resp(), event)
true
iex> match?(read_resp(content: {:checkpoint, _}), event)
true
Macros in these modules are generated with Record.defrecord/2 with the contents extracted from the protobuf messages (indirectly via :gpb).
Summary
Utility Functions
Cancels a subscription
Determines the metadata stream for any given stream
Returns the parked events stream for a persistent subscription stream and group.
Parses an EventStoreDB timestamp into a DateTime.t() in UTC time.
Pings a connection
Performs a generic request synchronously
Produces the scavenge stream for a scavenge ID
Sets the global stream ACL
Streams
Appends an enumeration of events to an EventStoreDB stream
Appends an enumeration of events to an EventStoreDB stream
A convenience wrapper around append_batch/5 for transforming a stream into a batch append operation.
Deletes an EventStoreDB stream
Queries the metadata for a stream
Reads a chunk of events from an EventStoreDB stream into an enumerable
Sets a stream's metadata
Collects an EventStoreDB stream into an enumerable
Subscribes a process to an EventStoreDB stream
Users
Changes a user's password by providing the current password
Creates an EventStoreDB user
Deletes a user from the EventStoreDB
Disables a user's ability to make requests against the EventStoreDB
Enables a user to make requests against the EventStoreDB
Resets a user's password
Updates an existing EventStoreDB user
Fetches details about an EventStoreDB user
Operations
Requests that the indices be merged
Requests that the currently connected node resign its leadership role
Restarts all persistent subscriptions
Sets the node priority number
Shuts down the connected EventStoreDB
Requests that a scavenge be started
Stops a running scavenge
Persistent Subscriptions
Acknowledges that an event received as part of a persistent subscription was successfully handled
Subscribes a process to an existing persistent subscription
Creates a persistent subscription
Deletes a persistent subscription from the EventStoreDB
Gets information pertaining to a persistent subscription.
Lists the currently existing persistent subscriptions
Negatively acknowledges a persistent subscription event
Requests that the server replays any messages parked for a persistent subscription stream and group.
Restarts the persistent subscription subsystem on the EventStoreDB server.
Updates an existing persistent subscription
Gossip
Reads the cluster information from the connected EventStoreDB
Monitoring
Subscribes a process to stats updates from the EventStoreDB
Server Features
Determines the current version of the connected server
Requests the available server RPCs
Utility Functions
cancel_subscription(conn, subscription_reference, timeout \\ 5000)
(since 0.1.0)
@spec cancel_subscription(connection :: Spear.Connection.t(), subscription_reference :: reference(), timeout()) :: :ok | {:error, any()}
Cancels a subscription
This function will cancel a subscription if the provided subscription_reference exists, but is idempotent: if the subscription_reference is not an active subscription reference, :ok will be returned.
Subscriptions are automatically cancelled when the subscriber process exits.
Examples
iex> {:ok, subscription} = Spear.subscribe(conn, self(), "my_stream")
{:ok, #Reference<0.4293953740.2750676995.30541>}
iex> Spear.cancel_subscription(conn, subscription)
:ok
iex> Spear.cancel_subscription(conn, subscription)
:ok
Determines the metadata stream for any given stream
Meta streams are used by the EventStoreDB to store some internal information about a stream, and to configure features such as time-to-live values for events or streams.
Examples
iex> Spear.meta_stream("es_supported_clients")
"$$es_supported_clients"
Returns the parked events stream for a persistent subscription stream and group.
If an event is negatively acknowledged and parked, the persistent subscription will add it to the park stream for the given stream+group combination. It can be useful to read this stream to determine if there are any parked messages.
Examples
iex> Spear.park_stream("MyStream", "MyGroup")
"$persistentsubscription-MyStream::MyGroup-parked"
@spec parse_stamp(stamp :: pos_integer()) :: {:ok, DateTime.t()} | {:error, atom()}
Parses an EventStoreDB timestamp into a DateTime.t() in UTC time.
Examples
iex> Spear.parse_stamp(16187636458580612)
{:ok, ~U[2021-04-18 16:34:05.858061Z]}
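The stamp in the example above is consistent with a count of 100-nanosecond ticks since the Unix epoch. A minimal sketch under that assumption follows; StampSketch is a hypothetical module for illustration and is not how Spear necessarily implements parse_stamp/1.

```elixir
defmodule StampSketch do
  # Hypothetical helper, not part of Spear. Assumes the stamp counts
  # 100-nanosecond ticks since the Unix epoch.
  @ticks_per_microsecond 10

  @spec parse(pos_integer()) :: {:ok, DateTime.t()} | {:error, atom()}
  def parse(ticks) when is_integer(ticks) and ticks > 0 do
    ticks
    # Drop the sub-microsecond digit: DateTime holds microseconds at most.
    |> div(@ticks_per_microsecond)
    |> DateTime.from_unix(:microsecond)
  end
end

IO.inspect(StampSketch.parse(16187636458580612))
#=> {:ok, ~U[2021-04-18 16:34:05.858061Z]}
```

Integer division truncates the final tick digit, which matches the microsecond precision shown in the documented result.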
@spec ping(connection :: Spear.Connection.t(), timeout()) :: :pong | {:error, any()}
Pings a connection
This can be used to ensure that the connection process is alive, or to roughly measure the latency between the connection process and EventStoreDB.
Examples
iex> Spear.ping(conn)
:pong
@spec request( connection :: Spear.Connection.t(), api :: module(), rpc :: atom(), messages :: Enumerable.t(), opts :: Keyword.t() ) :: {:ok, tuple() | Enumerable.t()} | {:error, any()}
Performs a generic request synchronously
This is appropriate for many operations across the Users, Streams, and Operations APIs but not suitable for Spear.subscribe/4 or the Persistent Subscriptions API.
messages must be an enumeration of records as created by the Record Interfaces. Lazy stream enumerations are allowed and are not run until each element is serialized over the wire.
This function is mostly used under-the-hood to implement functions in Spear such as Spear.create_user/5, but may be used generically.
Options
- :timeout - (default: 5_000ms - 5s) the GenServer timeout: the maximum time allowed to wait for this request to complete.
- :credentials - (default: nil) the username and password to use to make the request. Overrides the connection-level credentials if provided. Connection-level credentials are used as the default if not provided.
Examples
iex> alias Spear.Records.Users
iex> require Users
iex> message = Users.enable_req(options: Users.enable_req_options(login_name: "my_user"))
iex> Spear.request(conn, Users, :Enable, [message], credentials: {"admin", "changeit"})
{:ok, Users.enable_resp()}
@spec scavenge_stream(scavenge :: String.t() | Spear.Scavenge.t()) :: String.t()
Produces the scavenge stream for a scavenge ID
start_scavenge/2 begins an asynchronous scavenge operation since scavenges may be time consuming. In order to check the progress of a running scavenge, one may read the scavenge stream with read_stream/3 or stream!/3 or subscribe to updates on the scavenge with subscribe/4.
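The stream-name helpers in this section (meta_stream, park_stream, and scavenge_stream) appear to follow fixed naming conventions, judging by their documented examples. The sketch below reproduces those conventions in a hypothetical StreamNamesSketch module; it only accepts a scavenge ID string, whereas scavenge_stream/1 also accepts a Spear.Scavenge.t().

```elixir
defmodule StreamNamesSketch do
  # Hypothetical module, not part of Spear. Each function mirrors the
  # naming convention visible in the documented examples.

  # Metadata stream: the stream name prefixed with "$$".
  def meta_stream(stream) when is_binary(stream), do: "$$" <> stream

  # Parked-events stream for a persistent subscription stream and group.
  def park_stream(stream, group) when is_binary(stream) and is_binary(group) do
    "$persistentsubscription-#{stream}::#{group}-parked"
  end

  # Scavenge stream for a scavenge ID.
  def scavenge_stream(scavenge_id) when is_binary(scavenge_id) do
    "$scavenges-" <> scavenge_id
  end
end
```

Because these are ordinary stream names, the results can be passed to read_stream/3, stream!/3, or subscribe/4 like any other stream.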
Examples
iex> {:ok, scavenge} = Spear.start_scavenge(conn)
{:ok,
%Spear.Scavenge{id: "d2897ba8-2f0c-4fc4-bb25-798ba75f3562", result: :Started}}
iex> Spear.scavenge_stream(scavenge)
"$scavenges-d2897ba8-2f0c-4fc4-bb25-798ba75f3562"
set_global_acl(conn, user_acl, system_acl, opts \\ [])
(since 0.1.3)
@spec set_global_acl(connection :: Spear.Connection.t(), user_acl :: Spear.Acl.t(), system_acl :: Spear.Acl.t(), opts :: Keyword.t()) :: :ok | {:error, any()}
Sets the global stream ACL
This function appends metadata to the $streams EventStoreDB stream detailing how the EventStoreDB should allow access to user and system streams (with the user_acl and system_acl arguments, respectively).
See the security guide for more information.
Options
- :json_encode! - (default: Jason.encode!/1) a 1-arity JSON encoding function used to serialize the event. This event must be JSON encoded in order for the EventStoreDB to consider it valid.
Remaining options are passed to Spear.append/4. The :expect option will be applied to the $streams system stream, so one could attempt to set the initial ACL by passing expect: :empty.
Examples
This recreates the default ACL:
iex> Spear.set_global_acl(conn, Spear.Acl.allow_all(), Spear.Acl.admins_only())
:ok
Streams
@spec append( event_stream :: Enumerable.t(), connection :: Spear.Connection.t(), stream_name :: String.t(), opts :: Keyword.t() ) :: :ok | {:ok, AppendResp.t()} | {:error, reason :: Spear.ExpectationViolation.t() | any()}
Appends an enumeration of events to an EventStoreDB stream
event_stream is an enumerable which may either be a collection of Spear.Event.t/0 structs or more low-level Spear.Records.Streams.append_req/0 records. In cases where the enumerable produces Spear.Event.t/0 structs, they will be lazily mapped to Spear.Records.Streams.append_req/0 records before being encoded to wire data.
See the Writing Events guide for more information about writing events.
Options
- :expect - (default: :any) the expectation to set on the status of the stream. The write will fail if the expectation fails. See Spear.ExpectationViolation for more information about expectations.
- :timeout - (default: 5_000 - 5s) the GenServer timeout for calling the RPC.
- :raw? - (default: false) a boolean which controls whether the return signature should be a simple :ok | {:error, any()} or {:ok, AppendResp.t()} | {:error, any()}. This can be used to extract metadata and information from the append response which is not available through the simplified return API, such as the stream's revision number after writing the events.
- :credentials - (default: nil) a two-tuple {username, password} to use as credentials for the request. This option overrides any credentials set in the connection configuration, if present. See the Security guide for more details.
Examples
iex> [Spear.Event.new("es_supported_clients", %{})]
...> |> Spear.append(conn, "my_stream", expect: :exists)
:ok
iex> [Spear.Event.new("es_supported_clients", %{})]
...> |> Spear.append(conn, "my_stream", expect: :empty)
{:error, %Spear.ExpectationViolation{current: 1, expected: :empty}}
append_batch(event_stream, conn, request_id, stream_name, opts \\ [])
(since 0.10.0)
@spec append_batch(event_stream :: Enumerable.t(), connection :: Spear.Connection.t(), request_id :: reference() | :new, stream_name :: String.t(), opts :: Keyword.t()) :: {:ok, batch_id :: String.t(), request_id :: reference()} | {:error, term()}
Appends an enumeration of events to an EventStoreDB stream
BatchAppend is a feature added in EventStoreDB version 21.6.0 which aims to optimize append throughput. It works like a persistent subscription in reverse: the client sends chunks of events to write and once the events have been committed, the client receives an acknowledgement message.
BatchAppends have a life cycle: a new batch append request is created by passing the :new atom as the request_id argument and a request is concluded by calling Spear.cancel_subscription/2 on the request_id.
See the Writing Events guide for more information about batching and when to prefer the append_batch/5 function over append/4.
Fragmentation
By default, each invocation of append_batch/5 sends a single protobuf message across the wire for all events in the event_stream argument. If the event_stream argument is very large (> ~1MB), this may be undesirable from a networking perspective.
The :done? flag can be set to false to mark a batch as a fragment, which prevents the EventStoreDB from attempting to commit any events sent for the batch until a fragment with the :done? flag set to true is sent. Each batch of events after the first fragment must pass the initial fragment's batch_id in the :batch_id option. A set of fragments can be concluded by sending a final append_batch/5 with the :done? option set to true.
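The fragmentation rules above can be sketched as a small planner that splits a large list of events into fragments, marks only the last one as done, and threads the first fragment's batch_id through the later calls. FragmentSketch is hypothetical and not part of Spear. It sizes fragments by event count rather than bytes, and it takes the append call as a function argument (append_fun, standing in for a call to append_batch/5) so the logic can be followed without a running EventStoreDB.

```elixir
defmodule FragmentSketch do
  # Hypothetical helper, not part of Spear.

  # Splits `events` into fragments of at most `fragment_size` events and
  # pairs each fragment with its :done? option. Only the last is done.
  def plan(events, fragment_size) when is_integer(fragment_size) and fragment_size > 0 do
    chunks = Enum.chunk_every(events, fragment_size)
    last_index = length(chunks) - 1

    chunks
    |> Enum.with_index()
    |> Enum.map(fn {chunk, index} -> {chunk, [done?: index == last_index]} end)
  end

  # Sends every fragment through `append_fun`, a 3-arity function
  # (chunk, request_id, opts) that must return {:ok, batch_id, request_id},
  # e.g. a closure around Spear.append_batch/5. The first call uses :new
  # as the request_id; later calls reuse the returned request_id and pass
  # the first fragment's batch_id in the :batch_id option.
  def send_all(fragments, append_fun) do
    {_request_id, batch_id} =
      Enum.reduce(fragments, {:new, nil}, fn {chunk, opts}, {request_id, batch_id} ->
        opts = if batch_id, do: Keyword.put(opts, :batch_id, batch_id), else: opts
        {:ok, returned_batch_id, returned_request_id} = append_fun.(chunk, request_id, opts)
        {returned_request_id, batch_id || returned_batch_id}
      end)

    batch_id
  end
end
```

With a real connection, append_fun might be written as fn chunk, request_id, opts -> Spear.append_batch(chunk, conn, request_id, stream_name, opts) end, where conn and stream_name are whatever the caller already has in scope.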
Options
- :done? - (default: true) controls whether the current chunk of events being written is complete. See the "Fragmentation" section above for more details.
- :batch_id - (default: Spear.Uuid.uuid_v4()) the unique ID of the batch of events being appended. When fragmenting, every fragment after the first must pass the batch_id returned by the first request (the one which set :done? to false), up to and including the final fragment (done?: true). See the "Fragmentation" section above for more details.
- :expect - (default: :any) the expectation to set on the status of the stream. The write will fail if the expectation fails. See Spear.ExpectationViolation for more information about expectations.
- :send_ack_to - (default: self()) a process or process name which should receive acknowledgement messages detailing whether a batch has succeeded or failed to be committed by the deadline.
- :raw? - (default: false) a boolean which controls whether messages emitted to the :send_ack_to process are decoded from Spear.Records.Streams.batch_append_resp/0 records. Spear preserves most of the information when decoding the record into the Spear.BatchAppendResult.t/0 struct, but it discards duplicated information such as stream name and expected revision. Setting the :raw? flag allows one to decode the record however they wish.
- :credentials - (default: nil) a two-tuple {username, password} to use as credentials for the request. This option overrides any credentials set in the connection configuration, if present. See the Security guide for more details.
- :deadline - (default: nil) the deadline for the batch to be appended. This is like :timeout but is interpreted on the EventStoreDB server. This may be a DateTime or a tuple of {seconds, nanos} since 1970 (smeared), or a tuple {:duration, seconds, nanos} when using EventStoreDB version 21.10.5 or higher.
- :timeout - (default: 5_000) the timeout for the initial call to open the batch request.
Examples
iex> {:ok, first_batch_id, request_id} =
...> Spear.append_batch(first_batch, conn, :new, first_stream_name)
{:ok, "496ba076-098f-4108-a2ca-c73f7b94c06f", #Reference<0.1691282361.2492989441.105913>}
iex> receive do
...> %Spear.BatchAppendResult{request_id: ^request_id, batch_id: ^first_batch_id, result: result} ->
...> result
...> end
:ok
iex> {:ok, second_batch_id, ^request_id} =
...> Spear.append_batch(second_batch, conn, request_id, second_stream_name)
{:ok, "3a1ed972-716c-4679-9cc7-c23c3544e538", #Reference<0.1691282361.2492989441.105913>}
iex> receive(do: (%Spear.BatchAppendResult{request_id: ^request_id, batch_id: ^second_batch_id, result: result} -> result))
:ok
iex> {:ok, third_batch_id, ^request_id} =
...> Spear.append_batch(third_batch, conn, request_id, third_stream_name)
{:ok, "b4eb1330-8c59-48d0-8a0b-2df33672cc0b", #Reference<0.1691282361.2492989441.105913>}
iex> receive(do: (%Spear.BatchAppendResult{request_id: ^request_id, batch_id: ^third_batch_id, result: result} -> result))
:ok
iex> Spear.cancel_subscription(conn, request_id)
:ok
@spec append_batch_stream( batch_stream :: Enumerable.t(), connection :: Spear.Connection.t() ) :: Enumerable.t()
A convenience wrapper around append_batch/5 for transforming a stream into a batch append operation.
The append_batch/5 function provides fine-grained control over the batch append feature. This function transforms an input stream of batches by applying append_batch/5 to each of them, making sure to clean up the request once the batch is finished.
The expected batch_stream is an enumerable in which each element follows one of these formats:
{stream_name :: String.t(), events :: [Spear.Event.t()]}
# or
{stream_name :: String.t(), events :: [Spear.Event.t()], opts :: Keyword.t()}
Where opts can be any option below. The options below are applied to each call to append_batch/5 when provided, except :credentials which is only applied when specified on the first batch.
The resulting stream must be run (with an Enum function or Stream.run/1). Each element is mapped to the acknowledgement Spear.BatchAppendResult.t/0 responses for each batch attempting to be appended.
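One way to produce a batch_stream in the tuple formats described above is to chunk a larger enumerable of events lazily. The sketch below is an assumption-laden illustration: BatchStreamSketch and its build/4 function are hypothetical, and any term stands in for a Spear.Event.t() so the shape can be checked without Spear loaded.

```elixir
defmodule BatchStreamSketch do
  # Hypothetical helper, not part of Spear. Lazily groups `events` into
  # batches of `batch_size` and wraps each batch in the two- or
  # three-element tuple format expected for batch_stream elements.
  def build(events, stream_name, batch_size, opts \\ [])
      when is_binary(stream_name) and is_integer(batch_size) and batch_size > 0 do
    events
    |> Stream.chunk_every(batch_size)
    |> Stream.map(fn batch ->
      # Use the three-element form only when per-batch options were given.
      if opts == [], do: {stream_name, batch}, else: {stream_name, batch, opts}
    end)
  end
end
```

The result is itself lazy, so nothing is chunked until the stream is consumed, for example by piping it into Spear.append_batch_stream/2 and then into an Enum function.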
Options
- :expect - (default: :any) the expectation to set on the status of the stream. The write will fail if the expectation fails. See Spear.ExpectationViolation for more information about expectations.
- :raw? - (default: false) a boolean which controls whether messages emitted to the :send_ack_to process are decoded from Spear.Records.Streams.batch_append_resp/0 records. Spear preserves most of the information when decoding the record into the Spear.BatchAppendResult.t/0 struct, but it discards duplicated information such as stream name and expected revision. Setting the :raw? flag allows one to decode the record however they wish.
- :credentials - (default: nil) a two-tuple {username, password} to use as credentials for the request. This option overrides any credentials set in the connection configuration, if present. See the Security guide for more details.
- :timeout - (default: 5_000) the timeout for the initial call to open the batch request. After the first batch, this argument instead controls how long each append_batch/5 operation will await an acknowledgement.
Examples
batch_stream
|> Spear.append_batch_stream(conn)
|> Enum.reduce_while(:ok, fn ack, _acc ->
  case ack.result do
    :ok -> {:cont, :ok}
    {:error, reason} -> {:halt, {:error, reason}}
  end
end)
#=> :ok
@spec delete_stream( connection :: Spear.Connection.t(), stream_name :: String.t(), opts :: Keyword.t() ) :: :ok | {:error, any()}
Deletes an EventStoreDB stream
EventStoreDB supports two kinds of stream deletions: soft-deletes and tombstones. By default this function will perform a soft-delete. Pass the tombstone?: true option to tombstone the stream.
Soft-deletes make the events in the specified stream no longer accessible through reads. A scavenge operation will reclaim the disk space taken by any soft-deleted events. New events may be written to a soft-deleted stream. When reading soft-deleted streams, :from options of :start and :end will behave as expected, but all events in the stream will have revision numbers offset by the number of deleted events.
Tombstoned streams may not be written to ever again. Attempting to write to a tombstoned stream will fail with a gRPC :failed_precondition error:
iex> [Spear.Event.new("delete_test", %{})] |> Spear.append(conn, "delete_test_0")
:ok
iex> Spear.delete_stream(conn, "delete_test_0", tombstone?: true)
:ok
iex> [Spear.Event.new("delete_test", %{})] |> Spear.append(conn, "delete_test_0")
{:error,
 %Spear.Grpc.Response{
   data: "",
   message: "Event stream 'delete_test_0' is deleted.",
   status: :failed_precondition,
   status_code: 9
 }}
Options
- :tombstone? - (default: false) controls whether the stream is soft-deleted or tombstoned.
- :timeout - (default: 5_000 - 5s) the time allowed to block while waiting for the EventStoreDB to delete the stream.
- :expect - (default: :any) the expected state of the stream when performing the deletion. See append/4 and Spear.ExpectationViolation for more information.
- :credentials - (default: nil) a two-tuple {username, password} to use as credentials for the request. This option overrides any credentials set in the connection configuration, if present. See the Security guide for more details.
Examples
iex> Spear.append(events, conn, "my_stream")
:ok
iex> Spear.delete_stream(conn, "my_stream")
:ok
iex> Spear.stream!(conn, "my_stream") |> Enum.to_list()
[]
@spec get_stream_metadata( connection :: Spear.Connection.t(), stream :: String.t(), opts :: Keyword.t() ) :: {:ok, Spear.StreamMetadata.t()} | {:error, any()}
Queries the metadata for a stream
Note that the stream argument is passed through meta_stream/1 before being read. It is not necessary to call that function on the stream name before passing it as stream.
If no metadata has been set on a stream {:error, :unset} is returned.
Options
Under the hood, get_stream_metadata/3 uses read_stream/3 and all options are passed directly to that function. These options are overridden, however, and cannot be changed:
- :direction
- :from
- :max_count
- :raw?
Examples
iex> Spear.get_stream_metadata(conn, "my_stream")
{:error, :unset}
iex> Spear.get_stream_metadata(conn, "some_stream_with_max_count")
{:ok, %Spear.StreamMetadata{max_count: 50_000, ..}}
@spec read_stream(Spear.Connection.t(), String.t() | :all, Keyword.t()) :: {:ok, event_stream :: Enumerable.t()} | {:error, any()}
Reads a chunk of events from an EventStoreDB stream into an enumerable
Unlike stream!/3, this function will only read one chunk of events at a time specified by the :max_count option. This function also does not raise in cases of error, instead returning an ok- or error-tuple.
If the stream_name EventStoreDB stream does not exist (is empty) and the gRPC request succeeds for this function, {:ok, []} will be returned.
Options
- :from - (default: :start) the EventStoreDB stream revision from which to read. Valid values include :start, :end, any non-negative integer representing the event number revision in the stream, and events. Event numbers are inclusive (e.g. reading from 0 will first return the event with revision 0 in the stream, if one exists). :start and :end are treated as inclusive (e.g. :start will return the first event in the stream). Events (either Spear.Event or ReadResp records) can also be supplied and will be treated as inclusive.
- :direction - (default: :forwards) the direction in which to read the EventStoreDB stream. Valid values include :forwards and :backwards. Reading the EventStoreDB stream forwards will return events in the order in which they were written to the EventStoreDB; reading backwards will return events in the opposite order.
- :filter - (default: nil) the server-side filter to apply. This option is only valid if the stream_name is :all. See Spear.Filter for more information. This feature requires EventStoreDB vTODO+.
- :resolve_links? - (default: true) whether or not to request that link references be resolved. See the moduledocs for more information about link resolution.
- :max_count - (default: 42) the maximum number of events to read from the EventStoreDB stream. Any positive integer is valid. Even if the stream is longer than this :max_count option, only :max_count events will be returned from this function. :infinity is not a valid value for :max_count. Use stream!/3 for an enumerable which reads an EventStoreDB stream in its entirety in chunked network requests.
- :timeout - (default: 5_000 - 5s) the time allowed for the read of the single chunk of events in the EventStoreDB stream. Note that the gRPC request which reads events from the EventStoreDB is front-loaded in this function: the :timeout covers the time it takes to read the events. The timeout may be exceeded when reading very large numbers of events or events with very large bodies (see the Timing and Timeouts section below).
- :raw? - (default: false) controls whether or not the enumerable event_stream is decoded to Spear.Event structs from their raw ReadResp output. Setting raw?: true prevents this transformation and leaves each event as a Spear.Records.Streams.read_resp/0 record. See Spear.Event.from_read_response/2 for more information.
- :credentials - (default: nil) a two-tuple {username, password} to use as credentials for the request. This option overrides any credentials set in the connection configuration, if present. See the Security guide for more details.
Timing and Timeouts
The gRPC request which reads events from the EventStoreDB is front-loaded in this function: this function returns immediately after receiving all data off the wire from the network request. This means that the :timeout option covers the gRPC request and response time but not any time spent decoding the response (see the Enumeration Characteristics section below for more details on how the enumerable decodes messages).
The default timeout of 5s may not be enough time in cases of either reading very large numbers of events or reading events with very large bodies.
Note that up to :max_count events are returned from this call, depending on how many events are in the EventStoreDB stream being read. When tuning the :timeout option, make sure to test against a stream which is at least :max_count events long.
Enumeration Characteristics
The event_stream Enumerable.t/0 returned in the success case of this function is a wrapper around the bytes received from the gRPC response. Note that when the {:ok, event_stream} is returned, the gRPC request has already concluded.
This offers only marginal performance improvement: an enumerable is returned mostly for consistency in the Spear API.
Examples
# say we have 5 events in the stream "es_supported_clients"
iex> {:ok, events} = Spear.read_stream(conn, "es_supported_clients", max_count: 2)
iex> events |> Enum.count()
2
iex> {:ok, events} = Spear.read_stream(conn, "es_supported_clients", max_count: 10)
iex> events |> Enum.count()
5
iex> events |> Enum.take(1)
[
  %Spear.Event{
    body: %{"languages" => ["typescript", "javascript"], "runtime" => "NodeJS"},
    id: "1fc908c1-af32-4d06-a9bd-3bf86a833fdf",
    metadata: %{..},
    type: "grpc-client"
  }
]
set_stream_metadata(conn, stream, metadata, opts \\ [])
(since 0.1.3)
@spec set_stream_metadata(connection :: Spear.Connection.t(), stream :: String.t(), metadata :: Spear.StreamMetadata.t(), opts :: Keyword.t()) :: :ok | {:error, any()}
Sets a stream's metadata
Note that the stream argument is passed through meta_stream/1 before being written to. It is not necessary to call that function on the stream name before passing it as stream.
Options
This function uses append/4 under the hood. All options are passed to the opts argument of append/4.
Examples
# only allow admins to read, write, and delete the stream (or stream metadata)
iex> metadata = %Spear.StreamMetadata{acl: Spear.Acl.admins_only()}
iex> Spear.set_stream_metadata(conn, stream, metadata)
:ok
@spec stream!( connection :: Spear.Connection.t(), stream_name :: String.t() | :all, opts :: Keyword.t() ) :: event_stream :: Enumerable.t()
Collects an EventStoreDB stream into an enumerable
This function may raise in cases where the gRPC requests fail to read events from the EventStoreDB (in cases of timeout or unavailability).
This function does not raise if a stream does not exist (is empty), instead returning an empty enumerable [].
connection may be any valid GenServer name (including PIDs) for a process running the Spear.Connection GenServer.
stream_name can be any stream, existing or not, including projected streams such as category streams or event-type streams. The :all atom may be passed as stream_name to read all events in the EventStoreDB.
Options
- :from - (default: :start) the EventStoreDB stream revision from which to read. Valid values include :start, :end, any non-negative integer representing the event number revision in the stream, and events. Event numbers are inclusive (e.g. reading from 0 will first return the event with revision 0 in the stream, if one exists). :start and :end are treated as inclusive (e.g. :start will return the first event in the stream). Events (either Spear.Event or ReadResp records) can also be supplied and will be treated as inclusive.
- :direction - (default: :forwards) the direction in which to read the EventStoreDB stream. Valid values include :forwards and :backwards. Reading the EventStoreDB stream forwards will return events in the order in which they were written to the EventStoreDB; reading backwards will return events in the opposite order.
- :filter - (default: nil) the server-side filter to apply. This option is only valid if the stream_name is :all. See Spear.Filter for more information. This feature requires EventStoreDB vTODO+.
- :resolve_links? - (default: true) whether or not to request that link references be resolved. See the moduledocs for more information about link resolution.
- :chunk_size - (default: 128) the number of events to read from the EventStoreDB at a time. Any positive integer is valid. See the enumeration characteristics section below for more information about how :chunk_size works and how to tune it.
- :timeout - (default: 5_000 - 5s) the time allowed for the read of a single chunk of events in the EventStoreDB stream. This time is not cumulative: an EventStoreDB stream 100 events long which takes 5s to read each chunk may be read in chunks of 20 events cumulatively in 25s. A timeout of 5_001ms would not raise a timeout error in that scenario (assuming the chunk read consistently takes <= 5_000ms).
- :raw? - (default: false) controls whether or not the enumerable event_stream is decoded to Spear.Event structs from their raw Spear.Records.Streams.read_resp/0 output. Setting raw?: true prevents this transformation and leaves each event as a ReadResp record. See Spear.Event.from_read_response/2 for more information.
- :credentials - (default: nil) a two-tuple {username, password} to use as credentials for the request. This option overrides any credentials set in the connection configuration, if present. See the Security guide for more details.
Enumeration Characteristics
The event_stream Enumerable.t/0 returned by this function initially contains a buffer of bytes from the first read of the stream stream_name. This buffer potentially contains up to :chunk_size messages when run. The enumerable is written as a formula which abstracts away the chunking nature of the gRPC requests, however, so even though the EventStoreDB stream is read in chunks (per the :chunk_size option), the entire EventStoreDB stream can be read by running the enumeration (e.g. with Enum.to_list/1). Note that the stream will make a gRPC request to read more events whenever the buffer runs dry with up to :chunk_size messages filling the buffer on each request.
:chunk_size is difficult to tune as it causes a tradeoff between (gRPC) request duration and number of messages added to the buffer. A higher :chunk_size may hydrate the buffer with more events and reduce the number of gRPC requests needed to read an entire stream, but it also increases the number of messages that will be sent over the network per request which could decrease reliability. Generally speaking, a lower :chunk_size is appropriate for streams in which the events are large and a higher :chunk_size is appropriate for streams with many small events. Manual tuning and trial-and-error can be used to find a performant :chunk_size setting for any individual environment.
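The buffer-and-refill behaviour described above can be approximated with Stream.resource/3. This is only a sketch of the general technique, not Spear's implementation: ChunkedReadSketch is hypothetical, read_chunk is an assumed 2-arity function standing in for one gRPC read, and unlike stream!/3 the sketch performs no read until the enumerable is run.

```elixir
defmodule ChunkedReadSketch do
  # Hypothetical stand-in for a chunked reader; not Spear's implementation.
  # `read_chunk` is a function (from_revision, chunk_size) -> [event] that
  # plays the role of a single network request.
  def stream(read_chunk, chunk_size) when is_integer(chunk_size) and chunk_size > 0 do
    Stream.resource(
      # Start reading at revision 0.
      fn -> 0 end,
      fn
        # A short chunk was seen earlier, so the source is exhausted.
        :done ->
          {:halt, :done}

        # The buffer ran dry: request the next chunk.
        from ->
          case read_chunk.(from, chunk_size) do
            [] -> {:halt, :done}
            events when length(events) < chunk_size -> {events, :done}
            events -> {events, from + chunk_size}
          end
      end,
      # Nothing to clean up in this in-memory sketch.
      fn _acc -> :ok end
    )
  end
end

# Five in-memory events stand in for an EventStoreDB stream.
events = Enum.map(0..4, fn revision -> "event-#{revision}" end)
read_chunk = fn from, count -> Enum.slice(events, from, count) end

ChunkedReadSketch.stream(read_chunk, 3) |> Enum.count()
#=> 5
```

With five events and a chunk size of 3, the sketch issues two reads: a full chunk of three, then a short chunk of two that marks the end. A larger chunk size would mean fewer, bigger reads, which is the tradeoff discussed above.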
Examples
iex> Spear.stream!(MyConnection, "es_supported_clients", chunk_size: 1) |> Enum.take(1)
[
  %Spear.Event{
    body: %{"languages" => ["typescript", "javascript"], "runtime" => "NodeJS"},
    id: "1fc908c1-af32-4d06-a9bd-3bf86a833fdf",
    metadata: %{..},
    type: "grpc-client"
  }
]
# say we have 5 events in the "es_supported_clients" stream
iex> Spear.stream!(MyConnection, "es_supported_clients", chunk_size: 3) |> Enum.count()
5
subscribe(conn, subscriber, stream_name, opts \\ [])
(since 0.1.0)
@spec subscribe(connection :: Spear.Connection.t(), subscriber :: pid() | GenServer.name(), stream_name :: String.t() | :all, opts :: Keyword.t()) :: {:ok, subscription_reference :: reference()} | {:error, any()}
Subscribes a process to an EventStoreDB stream
Unlike read_stream/3 or stream!/3, this function does not return an enumerable. Instead the subscriber process is signed up to receive messages for subscription events. Events are emitted in order as info messages with the signature
Spear.Event.t() | Spear.Filter.Checkpoint.t()
or if the raw?: true option is provided, Spear.Records.Streams.read_resp/0 records will be returned in the shape of
{subscription :: reference(), Spear.Records.Streams.read_resp()}
This function will block the caller until the subscription has been confirmed by the EventStoreDB.
When the subscription is terminated, the subscriber process will receive a message in the form of {:eos, subscription, reason}. {:eos, subscription, :closed} is emitted when the connection between EventStoreDB and subscriber is severed and {:eos, subscription, :dropped} is emitted when the EventStoreDB explicitly drops a subscription. If this message is received, the subscription is considered to be concluded and the subscriber process must re-subscribe from the last received event or checkpoint to resume the subscription. Here, subscription is the reference returned by this function.
Events can be correlated to their subscription via the subscription
reference returned by this function. The subscription reference is included
in Spear.Event.metadata.subscription
,
Spear.Filter.Checkpoint.subscription
, and in the
{:eos, subscription, reason}
tuples as noted above.
Subscriptions can be gracefully shut down with Spear.cancel_subscription/3
.
The subscription will be cancelled by the connection process if the
subscriber process exits.
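A minimal sketch of a subscriber that follows these rules is shown here. ResubscribingSubscriber, its option names (:conn, :stream, :retry_after), and the :subscribe injection point are assumptions made for illustration; only Spear.subscribe/4, the :from option, and the {:eos, subscription, reason} message come from the documentation above. The process remembers the last event or checkpoint it received and passes it as :from when it re-subscribes.

```elixir
defmodule ResubscribingSubscriber do
  @moduledoc false
  use GenServer

  # :conn and :stream are required. :subscribe is a hypothetical hook that
  # defaults to Spear.subscribe/4 so the sketch can run without a server.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      conn: Keyword.fetch!(opts, :conn),
      stream: Keyword.fetch!(opts, :stream),
      subscribe: Keyword.get(opts, :subscribe, &Spear.subscribe/4),
      retry_after: Keyword.get(opts, :retry_after, 1_000),
      from: :start,
      sub: nil
    }

    {:ok, state, {:continue, :subscribe}}
  end

  @impl true
  def handle_continue(:subscribe, state), do: {:noreply, subscribe(state)}

  @impl true
  def handle_info(:retry_subscribe, state), do: {:noreply, subscribe(state)}

  # The subscription has concluded: resume from the last message seen.
  def handle_info({:eos, sub, _reason}, %{sub: sub} = state) do
    {:noreply, subscribe(%{state | sub: nil})}
  end

  # Events and checkpoints are both accepted as (exclusive) :from values.
  def handle_info(message, state)
      when is_struct(message, Spear.Event) or is_struct(message, Spear.Filter.Checkpoint) do
    # Application-specific handling of `message` would go here.
    {:noreply, %{state | from: message}}
  end

  def handle_info(_other, state), do: {:noreply, state}

  defp subscribe(state) do
    case state.subscribe.(state.conn, self(), state.stream, from: state.from) do
      {:ok, sub} ->
        %{state | sub: sub}

      {:error, _reason} ->
        # For example the connection is still down: try again later.
        Process.send_after(self(), :retry_subscribe, state.retry_after)
        state
    end
  end
end
```

Because the position is only held in memory, a restart of this process starts again from :start; persisting the last position is left to the application.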
Options
- :from - (default: :start) the EventStoreDB stream revision from which to read. Valid values include :start, :end, any non-negative integer representing the event number revision in the stream, and events. Event numbers are exclusive (e.g. reading from 0 will first return the event numbered 1 in the stream, if one exists). :start and :end are treated as inclusive (e.g. :start will return the first event in the stream). Events and checkpoints (Spear.Event.t/0, ReadResp records, or Spear.Filter.Checkpoint.t/0) can also be supplied and will be treated as exclusive.
- :filter - (default: nil) the server-side filter to apply. This option is only valid if the stream_name is :all. See Spear.Filter for more information.
- :resolve_links? - (default: true) whether or not to request that link references be resolved. See the moduledocs for more information about link resolution.
- :timeout - (default: 5_000) the time to wait for the EventStoreDB to confirm the subscription request.
- :raw? - (default: false) controls whether the events are sent as raw ReadResp records or decoded into Spear.Event.t/0s
- :credentials - (default: nil) a two-tuple {username, password} to use as credentials for the request. This option overrides any credentials set in the connection configuration, if present. See the Security guide for more details.
Examples
# say there are 3 events in the EventStoreDB stream "my_stream"
iex> {:ok, sub} = Spear.subscribe(conn, self(), "my_stream", from: 0)
{:ok, #Reference<0.1160763861.3015180291.51238>}
iex> flush
%Spear.Event{} # second event
%Spear.Event{} # third event
:ok
iex> Spear.cancel_subscription(conn, sub)
:ok
iex> {:ok, sub} = Spear.subscribe(conn, self(), :all, filter: Spear.Filter.exclude_system_events())
iex> flush
%Spear.Filter.Checkpoint{}
%Spear.Filter.Checkpoint{}
%Spear.Event{}
%Spear.Event{}
%Spear.Filter.Checkpoint{}
%Spear.Event{}
%Spear.Filter.Checkpoint{}
:ok
iex> GenServer.call(conn, :close)
{:ok, :closed}
iex> flush
{:eos, #Reference<0.1160763861.3015180291.51238>, :closed}
Users
change_user_password(conn, login_name, current_password, new_password, opts \\ [])
@spec change_user_password( connection :: Spear.Connection.t(), login_name :: String.t(), current_password :: String.t(), new_password :: String.t(), opts :: Keyword.t() ) :: :ok | {:error, any()}
Changes a user's password by providing the current password
This can be accomplished regardless of the current credentials since the user's current password is provided.
Options
All options are passed to Spear.request/5
.
Examples
iex> Spear.create_user(conn, "Aladdin", "aladdin", "changeit", ["$ops"])
:ok
iex> Spear.change_user_password(conn, "aladdin", "changeit", "open sesame")
:ok
create_user(conn, full_name, login_name, password, groups, opts \\ [])
(since 0.3.0)
@spec create_user( connection :: Spear.Connection.t(), full_name :: String.t(), login_name :: String.t(), password :: String.t(), groups :: [String.t()], opts :: Keyword.t() ) :: :ok | {:error, any()}
Creates an EventStoreDB user
Options
All options are passed to Spear.request/5
.
Examples
iex> Spear.create_user(conn, "Aladdin", "aladdin", "open sesame", ["$ops"], credentials: {"admin", "changeit"})
:ok
@spec delete_user( connection :: Spear.Connection.t(), login_name :: String.t(), opts :: Keyword.t() ) :: :ok | {:error, any()}
Deletes a user from the EventStoreDB
EventStoreDB users are deleted by the login_name
parameter as passed
to Spear.create_user/6
.
Options
All options are passed to Spear.request/5
.
Examples
iex> Spear.create_user(conn, "Aladdin", "aladdin", "open sesame", ["$ops"], credentials: {"admin", "changeit"})
:ok
iex> Spear.delete_user(conn, "aladdin", credentials: {"admin", "changeit"})
:ok
@spec disable_user( connection :: Spear.Connection.t(), login_name :: String.t(), opts :: Keyword.t() ) :: :ok | {:error, any()}
Disables a user's ability to make requests against the EventStoreDB
This can be used in conjunction with Spear.enable_user/3
to temporarily
deny access to a user as an alternative to deleting and creating the user.
Enabling and disabling users does not require the password of the user:
just that the requestor be in the $admins
group.
Options
All options are passed to Spear.request/5
.
Examples
iex> Spear.enable_user(conn, "aladdin")
:ok
iex> Spear.disable_user(conn, "aladdin")
:ok
@spec enable_user( connection :: Spear.Connection.t(), login_name :: String.t(), opts :: Keyword.t() ) :: :ok | {:error, any()}
Enables a user to make requests against the EventStoreDB
Disabling and enabling users is an alternative to repeatedly creating and deleting users and is suitable for when a user needs to be temporarily denied access.
Options
All options are passed to Spear.request/5
.
Examples
iex> Spear.disable_user(conn, "aladdin")
:ok
iex> Spear.enable_user(conn, "aladdin")
:ok
reset_user_password(conn, login_name, new_password, opts \\ [])
(since 0.3.0)
@spec reset_user_password( connection :: Spear.Connection.t(), login_name :: String.t(), new_password :: String.t(), opts :: Keyword.t() ) :: :ok | {:error, any()}
Resets a user's password
This can only be requested by a user in the $admins
group. The current
password is not passed in this request, so this function is suitable for
setting a new password when the current password is lost.
Options
All options are passed to Spear.request/5
.
Examples
iex> Spear.create_user(conn, "Aladdin", "aladdin", "changeit", ["$ops"])
:ok
iex> Spear.reset_user_password(conn, "aladdin", "open sesame", credentials: {"admin", "changeit"})
:ok
update_user(conn, full_name, login_name, password, groups, opts \\ [])
(since 0.3.0)
@spec update_user( connection :: Spear.Connection.t(), full_name :: String.t(), login_name :: String.t(), password :: String.t(), groups :: [String.t()], opts :: Keyword.t() ) :: :ok | {:error, any()}
Updates an existing EventStoreDB user
Options
All options are passed to Spear.request/5
.
Examples
iex> Spear.create_user(conn, "Aladdin", "aladdin", "open sesame", ["$ops"], credentials: {"admin", "changeit"})
:ok
iex> Spear.update_user(conn, "Aladdin", "aladdin", "open sesame", ["$admins"], credentials: {"admin", "changeit"})
:ok
@spec user_details( connection :: Spear.Connection.t(), login_name :: String.t(), opts :: Keyword.t() ) :: {:ok, Spear.User.t()} | {:error, any()}
Fetches details about an EventStoreDB user
Options
All options are passed to Spear.request/5
.
Examples
iex> Spear.create_user(conn, "Aladdin", "aladdin", "open sesame", ["$ops"])
:ok
iex> Spear.user_details(conn, "aladdin")
{:ok,
%Spear.User{
enabled?: true,
full_name: "Aladdin",
groups: ["$ops"],
last_updated: ~U[2021-04-18 16:48:38.583313Z],
login_name: "aladdin"
}}
Operations
@spec merge_indexes(connection :: Spear.Connection.t(), opts :: Keyword.t()) :: :ok | {:error, any()}
Requests that the indices be merged
See the EventStoreDB documentation for more information.
A user does not need to be in $ops
or any group to initiate this request.
Options
Options are passed to request/5
.
Examples
iex> Spear.merge_indexes(conn)
:ok
@spec resign_node(connection :: Spear.Connection.t(), opts :: Keyword.t()) :: :ok | {:error, any()}
Requests that the currently connected node resign its leadership role
See the EventStoreDB documentation for more information.
A user does not need to be in $ops
or any group to initiate this request.
Options
Options are passed to request/5
.
Examples
iex> Spear.resign_node(conn)
:ok
@spec restart_persistent_subscriptions( connection :: Spear.Connection.t(), opts :: Keyword.t() ) :: :ok | {:error, any()}
Restarts all persistent subscriptions
See the EventStoreDB documentation for more information.
A user does not need to be in $ops
or any group to initiate this request.
Options
Options are passed to request/5
.
Examples
iex> Spear.restart_persistent_subscriptions(conn)
:ok
@spec set_node_priority( connection :: Spear.Connection.t(), priority :: integer(), opts :: Keyword.t() ) :: :ok | {:error, any()}
Sets the node priority number
See the EventStoreDB documentation for more information.
A user does not need to be in $ops
or any group to initiate this request.
Options
Options are passed to request/5
.
Examples
iex> Spear.set_node_priority(conn, 1)
:ok
@spec shutdown(connection :: Spear.Connection.t(), opts :: Keyword.t()) :: :ok | {:error, any()}
Shuts down the connected EventStoreDB
The user performing the shutdown (either the connection credentials or
credentials passed by the :credentials
option) must at least be in the
$ops
group. $admins
permissions are a superset of $ops
.
Options
Options are passed to request/5
.
Examples
iex> Spear.shutdown(conn)
:ok
iex> Spear.ping(conn)
{:error, :closed}
iex> Spear.shutdown(conn, credentials: {"some_non_ops_user", "changeit"})
{:error,
%Spear.Grpc.Response{
data: "",
message: "Access Denied",
status: :permission_denied,
status_code: 7
}}
iex> Spear.ping(conn)
:pong
@spec start_scavenge(connection :: Spear.Connection.t(), opts :: Keyword.t()) :: {:ok, Spear.Scavenge.t()} | {:error, any()}
Requests that a scavenge be started
Scavenges are disk-space reclaiming operations run on the EventStoreDB server.
Options
- :thread_count - (default: 1) the number of threads to use for the scavenge process. Scavenging can be resource intensive. Setting this to a low thread count can lower the impact on the server's resources.
- :start_from_chunk - (default: 0) the chunk number to start the scavenge from. Generally this is only useful if a prior scavenge has failed on a certain chunk.
Remaining options are passed to request/5
.
Examples
iex> Spear.start_scavenge(conn)
{:ok,
%Spear.Scavenge{id: "d2897ba8-2f0c-4fc4-bb25-798ba75f3562", result: :Started}}
@spec stop_scavenge( connection :: Spear.Connection.t(), scavenge_id :: String.t(), opts :: Keyword.t() ) :: {:ok, Spear.Scavenge.t()} | {:error, any()}
Stops a running scavenge
Options
All options are passed to request/5
.
Examples
iex> {:ok, scavenge} = Spear.start_scavenge(conn)
iex> Spear.stop_scavenge(conn, scavenge.id)
{:ok,
%Spear.Scavenge{id: "d2897ba8-2f0c-4fc4-bb25-798ba75f3562", result: :Stopped}}
Persistent Subscriptions
@spec ack( connection :: Spear.Connection.t(), subscription :: reference(), event_or_ids :: Spear.Event.t() | [String.t()] ) :: :ok
Acknowledges that an event received as part of a persistent subscription was successfully handled
Although ack/3
can accept a Spear.Event.t/0
alone, the underlying
gRPC call acknowledges a batch of event IDs.
Spear.ack(conn, subscription, events |> Enum.map(&Spear.Event.id/1))
should be preferred over
Enum.each(events, &Spear.ack(conn, subscription, Spear.Event.id(&1)))
as the acknowledgements will be batched.
This function (and nack/4
) are asynchronous casts to the connection
process.
Examples
# some stream with 3 events
stream_name = "my_stream"
group_name = "spear_iex"
settings = %Spear.PersistentSubscription.Settings{}
get_event_and_ack = fn conn, sub ->
receive do
%Spear.Event{} = event ->
:ok = Spear.ack(conn, sub, event)
event
after
3_000 -> :no_events
end
end
iex> Spear.create_persistent_subscription(conn, stream_name, group_name, settings)
:ok
iex> {:ok, sub} = Spear.connect_to_persistent_subscription(conn, self(), stream_name, group_name)
iex> get_event_and_ack.(conn, sub)
%Spear.Event{..}
iex> get_event_and_ack.(conn, sub)
%Spear.Event{..}
iex> get_event_and_ack.(conn, sub)
%Spear.Event{..}
iex> get_event_and_ack.(conn, sub)
:no_events
iex> Spear.cancel_subscription(conn, sub)
:ok
connect_to_persistent_subscription(conn, subscriber, stream_name, group_name, opts \\ [])
(since 0.6.0)
@spec connect_to_persistent_subscription( connection :: Spear.Connection.t(), subscriber :: pid() | GenServer.name(), stream_name :: String.t() | :all, group_name :: String.t(), opts :: Keyword.t() ) :: {:ok, subscription :: reference()} | {:error, any()}
Subscribes a process to an existing persistent subscription
Persistent subscriptions can be gracefully closed with
cancel_subscription/3
just like subscriptions started with subscribe/4
.
The subscription will be cancelled by the connection process if the
subscriber process exits.
Persistent subscriptions are an alternative to standard subscriptions
(via subscribe/4
) which use ack/3
and nack/4
to exert backpressure
and allow out-of-order and batch processing within a single consumer
and allow multiple connected consumers at once.
In standard subscriptions (via subscribe/4
), if a client wishes to
handle events in order without reprocessing, the client must keep track
of its own position in a stream, either in memory or using some sort of
persistence such as PostgreSQL or mnesia for durability.
In contrast, persistent subscriptions are stateful on the server-side: the EventStoreDB will keep track of which events have been positively and negatively acknowledged and will only emit events which have not yet been processed to any connected consumers.
This allows one to connect multiple subscriber processes to a persistent subscription stream-group combination in a strategy called Competing Consumers.
Note that persistent subscription events are not guaranteed to be processed
in order like the standard subscriptions because of the ability to nack/4
and reprocess or park a message. While this requires special considerations
when authoring a consumer, it allows one to easily write a consumer which
does not head-of-line block in failure cases.
The subscriber will receive a message {:eos, subscription, reason}
when the
subscription is closed by the server. :closed
denotes that the EventStoreDB
connection has been severed and :dropped
denotes that the EventStoreDB
has explicitly told the subscriber that the subscription is terminated.
This can occur for persistent subscriptions in the case where the
subscription is deleted (e.g. via Spear.delete_persistent_subscription/4
).
subscription
is the reference returned by this function.
iex> Spear.create_persistent_subscription(conn, "asdf", "asdf", %Spear.PersistentSubscription.Settings{})
:ok
iex> Spear.connect_to_persistent_subscription(conn, self(), "asdf", "asdf")
{:ok, #Reference<0.515780924.2297430020.166204>}
iex> flush
:ok
iex> Spear.delete_persistent_subscription(conn, "asdf", "asdf")
:ok
iex> flush
{:eos, #Reference<0.515780924.2297430020.166204>, :dropped}
:ok
Like subscriptions from subscribe/4
, events can be correlated to their
subscription by the :subscription
key in each Spear.Event.metadata
map.
Note that persistent subscriptions to the :all
stream with server-side
filtering are a feature introduced in EventStoreDB v21.6.0. Attempting
to use the :all
stream on older EventStoreDB versions will fail.
Backpressure
Persistent subscriptions allow the subscriber process to exert backpressure
on the EventStoreDB so that the message queue is not flooded. This is
implemented with a buffer of events which are considered by the EventStoreDB
to be in-flight when they are sent to the client. Events remain in-flight
until they are ack/3
-ed, nack/4
-ed, or until the :message_timeout
duration is exceeded. If a client ack/3
s a message, the EventStoreDB
will send a new message if any are available.
The in-flight buffer size is controllable per subscriber through
:buffer_size
. Note that :message_timeout
applies to each event: if
the :buffer_size
is 5 and five events arrive simultaneously, the client
has the duration :message_timeout
to acknowledge all five events before
they are considered stale and are automatically considered nack-ed by
the EventStoreDB.
The :buffer_size
should align with the consumer's ability to batch
process events.
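One way a consumer might line up its batch size with :buffer_size is sketched below. BatchConsumerSketch, its function names, and the :ack and :event_id options are hypothetical; the options default to Spear.ack/3 and Spear.Event.id/1 and exist only so the sketch can be exercised without a server. The consumer drains up to a fixed number of events from its mailbox, handles them, and acknowledges all of their IDs in one call.

```elixir
defmodule BatchConsumerSketch do
  @moduledoc false

  # Collects up to `max` events from the calling process's mailbox, giving up
  # once `timeout` milliseconds pass without a new event arriving.
  def collect(max, timeout \\ 100) when is_integer(max) and max >= 0 do
    do_collect(max, timeout, [])
  end

  defp do_collect(0, _timeout, acc), do: Enum.reverse(acc)

  defp do_collect(remaining, timeout, acc) do
    receive do
      event when is_struct(event, Spear.Event) ->
        do_collect(remaining - 1, timeout, [event | acc])
    after
      timeout -> Enum.reverse(acc)
    end
  end

  # Nothing to handle, so nothing to acknowledge.
  def handle_batch(_conn, _sub, [], _handler, _opts), do: :ok

  # Handles every event, then acknowledges the whole batch in a single call.
  def handle_batch(conn, sub, events, handler, opts) do
    ack = Keyword.get(opts, :ack, &Spear.ack/3)
    event_id = Keyword.get(opts, :event_id, &Spear.Event.id/1)

    Enum.each(events, handler)
    ack.(conn, sub, Enum.map(events, event_id))
  end
end
```

Passing the same number to collect/2 as the subscription's :buffer_size keeps the batch no larger than what the server allows in flight; the whole batch still has to be handled within :message_timeout.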
Delivery guarantees
Persistent subscriptions provide at-least once delivery. Messages may be re-delivered under a few circumstances:
- Negatively acknowledging a message with nack/4 with the :action set to :retry will queue the message for re-delivery.
- A message may be handled successfully but not within the configured :message_timeout. If the server does not receive an ack/3 within the timeout, the message may be re-delivered.
- The acknowledgement from ack/3 may be lost. This can happen in a few cases:
  - If the acknowledgement is sent and then a subscription is immediately cancelled (either explicitly with Spear.cancel_subscription/2 or if the subscription process terminates immediately after sending an ack), the EventStoreDB may discard the ack. This is because of a limitation in the protocol which conflates the closing of a subscription with the corruption of data being sent to the server. Subscription processes may wish to sleep between the last acknowledgement and exiting to reduce the chances of this happening.
  - An unreliable network may drop the packet containing the acknowledgement.
Re-delivered messages may arrive out-of-order. Cases of repeated delivery or out-of-order delivery should be handled at the application level.
Also see the EventStoreDB Server docs on Persistent Subscriptions.
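Handling repeated delivery at the application level can be as simple as remembering which event IDs have already been processed. The following is a sketch under that assumption, not part of Spear: IdempotentHandlerSketch is a hypothetical module, the set of seen IDs lives only in memory, and events are matched on the :id field shown in the event examples.

```elixir
defmodule IdempotentHandlerSketch do
  @moduledoc false

  # Starts with no event IDs seen.
  def new, do: MapSet.new()

  # Runs `fun` only for events whose :id has not been seen before.
  # Returns {:handled | :duplicate, updated_seen}.
  def handle(seen, %{id: id} = event, fun) when is_function(fun, 1) do
    if MapSet.member?(seen, id) do
      {:duplicate, seen}
    else
      fun.(event)
      {:handled, MapSet.put(seen, id)}
    end
  end
end
```

Both outcomes would normally be followed by an ack/3 so the server stops re-delivering the event. A durable store in place of the in-memory set would be needed for the deduplication to survive restarts.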
Options
- :timeout - (default: 5_000ms - 5s) the time to await a subscription confirmation from the EventStoreDB.
- :raw? - (default: false) controls whether events are translated from low-level Spear.Records.Persistent.read_resp/0 records to Spear.Event.t/0s. By default Spear.Event.t/0s are sent to the subscriber.
- :credentials - (default: nil) the credentials to use to connect to the subscription. When not specified, the connection-level credentials are used. Credentials must be a two-tuple {username, password}.
- :buffer_size - (default: 1) the number of events allowed to be sent to the client at a time. These events are considered in-flight. See the backpressure section above for more information.
Examples
iex> Spear.connect_to_persistent_subscription(conn, self(), "my_stream", "my_group")
iex> flush
%Spear.Event{}
:ok
create_persistent_subscription(conn, stream_name, group_name, settings, opts \\ [])
(since 0.6.0)
@spec create_persistent_subscription( connection :: Spear.Connection.t(), stream_name :: String.t() | :all, group_name :: String.t(), settings :: Spear.PersistentSubscription.Settings.t(), opts :: Keyword.t() ) :: :ok | {:error, any()}
Creates a persistent subscription
See Spear.PersistentSubscription.Settings.t/0
for more information.
Note that persistent subscriptions to the :all
stream with server-side
filtering are a feature introduced in EventStoreDB v21.6.0. Attempting
to use the :all
stream on older EventStoreDB versions will fail.
Options
- :from - the position or revision in the stream where the persistent subscription should start. This option may be :start or :end describing the beginning or end of the stream. When the stream_name is :all, this parameter describes the prepare and commit positions in the :all stream which can be found on any event emitted from a subscription to the :all stream. When the stream_name is not the :all stream, this option may be an integer representing the event number in the stream from which the subscription should start. This may be found on any Spear.Event.t/0. This option may be passed a Spear.Event.t/0, from which either the revision or position will be determined based on the stream name. This option overwrites the :revision field on the Spear.PersistentSubscription.Settings.t/0 type which is now deprecated.
- :filter - a filter to apply while reading from the :all stream. This option only applies when reading the :all stream. The same data structure works for regular and persistent subscriptions to the :all stream. See the Spear.Filter.t/0 documentation for more information.
Remaining options are passed to request/5
.
Examples
iex> Spear.create_persistent_subscription(conn, "my_stream", "my_group", %Spear.PersistentSubscription.Settings{})
:ok
iex> import Spear.Filter
iex> filter = ~f/My.Aggregate.A- My.Aggregate.B-/ps
iex> Spear.create_persistent_subscription(conn, :all, "my_all_group", %Spear.PersistentSubscription.Settings{}, filter: filter)
:ok
delete_persistent_subscription(conn, stream_name, group_name, opts \\ [])
(since 0.6.0)
@spec delete_persistent_subscription( connection :: Spear.Connection.t(), stream_name :: String.t(), group_name :: String.t(), opts :: Keyword.t() ) :: :ok | {:error, any()}
Deletes a persistent subscription from the EventStoreDB
Persistent subscriptions are considered unique by their stream and group names together: you may define separate persistent subscriptions for the same stream with multiple groups or use the same group name for persistent subscriptions to multiple streams. A combination of stream name and group name together is considered unique though.
Options
Options are passed to request/5
.
Examples
iex> Spear.delete_persistent_subscription(conn, "my_stream", "MyGroup")
:ok
get_persistent_subscription_info(conn, stream_name, group_name, opts \\ [])
(since 1.2.0)
@spec get_persistent_subscription_info( connection :: Spear.Connection.t(), stream_name :: String.t() | :all, group_name :: String.t(), opts :: Keyword.t() ) :: {:ok, Spear.PersistentSubscription.Info.t()} | {:error, any()}
Gets information pertaining to a persistent subscription.
Requires server version 22.6.0 or above.
opts
are passed to the underlying request.
Examples
iex> Spear.get_persistent_subscription_info(conn, "accounts", "subscription-group")
{:ok,
%Spear.PersistentSubscription.Info{
event_source: "accounts",
group_name: "subscription-group",
status: "Live",
average_per_second: 0,
total_items: 78,
count_since_last_measurement: 0,
last_checkpointed_event_position: "31",
last_known_event_position: "36",
start_from: "0",
message_timeout_milliseconds: 30000,
max_retry_count: 10,
live_buffer_size: 500,
buffer_size: 500,
read_batch_size: 20,
check_point_after_milliseconds: 2000,
min_check_point_count: 10,
max_check_point_count: 1000,
read_buffer_count: 0,
live_buffer_count: 36,
retry_buffer_count: 0,
total_in_flight_messages: 0,
outstanding_messages_count: 0,
named_consumer_strategy: :RoundRobin,
max_subscriber_count: 0,
parked_message_count: 1,
connections: [],
extra_statistics?: false,
resolve_link_tos?: false
}}
@spec list_persistent_subscriptions( connection :: Spear.Connection.t(), opts :: Keyword.t() ) :: {:ok, Enumerable.t()} | {:error, any()}
Lists the currently existing persistent subscriptions
Results are returned in an Enumerable.t/0
of
Spear.PersistentSubscription.t/0
.
Note that the :extra_statistics?
field of settings is not determined by
this function: :extra_statistics?
will always be returned as nil
in this
function.
This function works by reading the built-in $persistentSubscriptionConfig
stream. This stream can be read normally to obtain additional information
such as a timestamp for the last time the persistent subscription config was
updated.
Options
Options are passed to read_stream/3
. :direction
, :from
, and
:max_count
are fixed and cannot be overridden.
Examples
iex> Spear.create_persistent_subscription(conn, "my_stream", "my_group", %Spear.PersistentSubscription.Settings{})
:ok
iex> {:ok, subscriptions} = Spear.list_persistent_subscriptions(conn)
iex> subscriptions |> Enum.to_list()
[
%Spear.PersistentSubscription{
group_name: "my_group",
settings: %Spear.PersistentSubscription.Settings{
checkpoint_after: 3000,
extra_statistics?: nil,
history_buffer_size: 300,
live_buffer_size: 100,
max_checkpoint_count: 100,
max_retry_count: 10,
max_subscriber_count: 1,
message_timeout: 5000,
min_checkpoint_count: 1,
named_consumer_strategy: :RoundRobin,
read_batch_size: 100,
resolve_links?: true,
revision: 0
},
stream_name: "my_stream"
}
]
@spec nack( connection :: Spear.Connection.t(), subscription :: reference(), event_or_ids :: Spear.Event.t() | [String.t()], opts :: Keyword.t() ) :: :ok
Negatively acknowledges a persistent subscription event
Nacking is the opposite of ack/3
ing: it tells the EventStoreDB that the
event should not be considered processed.
Options
- :action - (default: :retry) controls the action the EventStoreDB should take about the event. See Spear.PersistentSubscription.nack_action/0 for a full description.
- :reason - (default: "") a description of why the event is being nacked
Examples
# some stream with 3 events
stream_name = "my_stream"
group_name = "spear_iex"
settings = %Spear.PersistentSubscription.Settings{}
get_event_and_nack = fn conn, sub, action ->
receive do
%Spear.Event{} = event ->
:ok = Spear.nack(conn, sub, event, action: action)
event
after
3_000 -> :no_events
end
end
iex> Spear.create_persistent_subscription(conn, stream_name, group_name, settings)
:ok
iex> {:ok, sub} = Spear.connect_to_persistent_subscription(conn, self(), stream_name, group_name)
iex> get_event_and_nack.(conn, sub, :retry)
%Spear.Event{..} # event 0
iex> get_event_and_nack.(conn, sub, :retry)
%Spear.Event{..} # event 0
iex> get_event_and_nack.(conn, sub, :park) # park event 0 and move on
%Spear.Event{..} # event 0
iex> get_event_and_nack.(conn, sub, :skip) # skip event 1
%Spear.Event{..} # event 1
iex> get_event_and_nack.(conn, sub, :skip) # skip event 2
%Spear.Event{..} # event 2
iex> get_event_and_nack.(conn, sub, :skip)
:no_events
iex> Spear.cancel_subscription(conn, sub)
:ok
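A consumer usually chooses the :action from the outcome of its own handling rather than hard-coding it. The sketch below shows one possible policy, assuming the application tracks how many times it has attempted an event: retry up to a limit, then park. NackPolicySketch and the attempt counter are hypothetical; :retry and :park are the actions used in the examples above.

```elixir
defmodule NackPolicySketch do
  @moduledoc false

  # Maps a handler result and the number of attempts so far to either :ack
  # or {:nack, action}.
  def decide(:ok, _attempts, _max_attempts), do: :ack

  def decide({:error, _reason}, attempts, max_attempts) when attempts < max_attempts,
    do: {:nack, :retry}

  def decide({:error, _reason}, _attempts, _max_attempts), do: {:nack, :park}
end
```

A caller would then translate :ack into Spear.ack(conn, sub, event) and {:nack, action} into Spear.nack(conn, sub, event, action: action).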
replay_parked_messages(conn, stream_name, group_name, opts \\ [])
(since 1.2.0)
@spec replay_parked_messages( connection :: Spear.Connection.t(), stream_name :: String.t() | :all, group_name :: String.t(), opts :: Keyword.t() ) :: :ok | {:error, any()}
Requests that the server replays any messages parked for a persistent subscription stream and group.
Options
:stop_at
- the number of messages to request be replayed. If not specified, the number of messages is not limited.
Remaining options are passed to Spear.request/5
.
Examples
iex> Spear.create_persistent_subscription(conn, "my_stream", "my_group", %Spear.PersistentSubscription.Settings{})
# ... nack some events with the `:park` action ...
iex> Spear.replay_parked_messages(conn, "my_stream", "my_group")
:ok
# ... parked messages are re-delivered ...
restart_persistent_subscription_subsystem(conn, opts \\ [])
(since 1.2.0)
@spec restart_persistent_subscription_subsystem( conn :: Spear.Connection.t(), opts :: Keyword.t() ) :: :ok | {:error, any()}
Restarts the persistent subscription subsystem on the EventStoreDB server.
update_persistent_subscription(conn, stream_name, group_name, settings, opts \\ [])
(since 0.6.0)
@spec update_persistent_subscription( connection :: Spear.Connection.t(), stream_name :: String.t(), group_name :: String.t(), settings :: Spear.PersistentSubscription.Settings.t(), opts :: Keyword.t() ) :: :ok | {:error, any()}
Updates an existing persistent subscription
See Spear.PersistentSubscription.Settings.t/0
for more information.
Note that persistent subscriptions to the :all
stream with server-side
filtering are a feature introduced in EventStoreDB v21.6.0. Attempting
to use the :all
stream on older EventStoreDB versions will fail.
Options
Options are passed to request/5
.
Examples
iex> Spear.update_persistent_subscription(conn, "my_stream", "my_group", %Spear.PersistentSubscription.Settings{})
:ok
Gossip
@spec cluster_info(connection :: Spear.Connection.t(), opts :: Keyword.t()) :: {:ok, [Spear.ClusterMember.t()]} | {:error, any()}
Reads the cluster information from the connected EventStoreDB
Returns a list of members which are clustered to the currently connected EventStoreDB.
Options
Options are passed to request/5
.
Examples
iex> Spear.cluster_info(conn)
{:ok,
[
%Spear.ClusterMember{
address: "127.0.0.1",
alive?: true,
instance_id: "eba4c27f-e443-4b21-8756-00845bc5cda1",
port: 2113,
state: :Leader,
timestamp: ~U[2021-04-19 17:25:17.875824Z]
}
]}
Monitoring
@spec subscribe_to_stats( connection :: Spear.Connection.t(), subscriber :: pid() | GenServer.name(), opts :: Keyword.t() ) :: {:ok, reference()} | {:error, any()}
Subscribes a process to stats updates from the EventStoreDB
This function subscribes a process in the same way as subscribe/4
: the
function will return a reference representing the subscription and the
stats messages will be sent to the subscriber process with send/2
.
This subscription can be cancelled with cancel_subscription/3
.
This functionality was added to EventStoreDB in release v21.6.0. Prior EventStoreDB versions will throw a gRPC error when attempting to use this function.
Options
- :interval - (default: 5_000 - 5 seconds) the interval after which a new stats message should be sent. By default, stats messages arrive every five seconds.
- :use_metadata? - (default: true) undocumented option. See the EventStoreDB implementation for more information.
- :timeout - (default: 5_000 - 5 seconds) the GenServer timeout to use when requesting a subscription to stats
- :raw? - (default: false) whether to emit the stats messages as 'raw' Spear.Records.Monitoring.stats_resp/0 records in a tuple of {subscription :: reference(), Spear.Records.Monitoring.stats_resp()}. By default, stats messages are returned as maps.
- :credentials - (default: nil) credentials to use to perform this subscription request.
Examples
iex> Spear.subscribe_to_stats(conn, self())
{:ok, #Reference<0.359109646.3547594759.216222>}
iex> flush()
%{
"es-queue-Projection Core #2-length" => "0",
"es-queue-Worker #1-lengthLifetimePeak" => "0",
"es-queue-Worker #3-lengthCurrentTryPeak" => "0",
"es-queue-StorageReaderQueue #9-avgProcessingTime" => "0",
"es-queue-StorageReaderQueue #6-idleTimePercent" => "100",
..
}
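The stats map shown above carries every value as a string, so a subscriber will often want to pick out a few keys and convert them. A minimal sketch of that step follows; StatsSketch is a hypothetical helper, and the key prefixes are taken from the sample output rather than from any documented list of stat names.

```elixir
defmodule StatsSketch do
  @moduledoc false

  # Keeps the entries whose key starts with `prefix` and converts values that
  # are entirely numeric into integers or floats. Other values are left as-is.
  def select(stats, prefix) when is_map(stats) and is_binary(prefix) do
    for {key, value} <- stats, String.starts_with?(key, prefix), into: %{} do
      {key, parse(value)}
    end
  end

  defp parse(value) when is_binary(value) do
    case {Integer.parse(value), Float.parse(value)} do
      {{int, ""}, _} -> int
      {_, {float, ""}} -> float
      _ -> value
    end
  end

  defp parse(value), do: value
end
```

For example, selecting the prefix "es-queue-StorageReaderQueue" from the sample map would keep only the two StorageReaderQueue entries, with "0" and "100" converted to integers.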
Server Features
@spec get_server_version(connection :: Spear.Connection.t(), opts :: Keyword.t()) :: {:ok, String.t()} | {:error, any()}
Determines the current version of the connected server
This function is compatible with server version v21.10.0 and later.
Options
Options are passed to request/5
.
Examples
iex> Spear.get_server_version(conn)
{:ok, "21.10.0"}
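The returned version string can be used to guard calls that need a newer server, such as get_persistent_subscription_info/4, which requires 22.6.0 or above. The sketch below assumes the server reports a three-part version like the "21.10.0" shown above, which Elixir's Version module can parse; ServerFeatureGate is a hypothetical helper, and anything that fails to parse is treated as unsupported.

```elixir
defmodule ServerFeatureGate do
  @moduledoc false

  # Returns true when `server_version` is at least `minimum`.
  # Unparseable versions are conservatively treated as too old.
  def at_least?(server_version, minimum) do
    with {:ok, current} <- Version.parse(server_version),
         {:ok, required} <- Version.parse(minimum) do
      Version.compare(current, required) in [:gt, :eq]
    else
      :error -> false
    end
  end
end
```

A caller might fetch the version once after connecting and check ServerFeatureGate.at_least?(version, "22.6.0") before requesting persistent subscription info.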
@spec get_supported_rpcs(connection :: Spear.Connection.t(), opts :: Keyword.t()) :: {:ok, [Spear.SupportedRpc.t()]} | {:error, any()}
Requests the available server RPCs
This function is compatible with server version v21.10.0 and later.
Options
Options are passed to request/5
.
Examples
iex> Spear.get_supported_rpcs(conn)
{:ok,
[
%Spear.SupportedRpc{
features: ["stream", "all"],
rpc: "create",
service: "event_store.client.persistent_subscriptions.persistentsubscriptions"
},
%Spear.SupportedRpc{
features: ["stream", "all"],
rpc: "update",
service: "event_store.client.persistent_subscriptions.persistentsubscriptions"
},
..
]}