View Source Writing Events

This guide covers specifics about the Spear.append/4 and Spear.append_batch/5 functions and general information about the event-writing functionality.

Enumeration Characteristics

event_stream is an Enumerable.t/0 which will be lazily written to the EventStoreDB as elements of the stream are computed and serialized on to the wire.

This means a few things:

First, you can efficiently emit events from a stream over a large source such as a CSV file with many lines:

File.stream!("large.csv", read_ahead: 100_000)
|> MyCsvParser.parse_stream()
|> Stream.map(&MyCsvParser.turn_csv_line_into_spear_event/1)
|> Spear.append(conn, "ChargesFromCsvs", timeout: :infinity)
# => :ok

The stream is only fully run after the last bytes have been written to the gRPC network request: the stream is never computed entirely in memory.

Second, you may (but are not encouraged to) write events via an infinite stream. A trivial counter mechanism could be implemented like so

iex> Stream.iterate(0, &(&1 + 1))
...> |> Stream.map(fn n -> Spear.Event.new("incremented", n) end)
...> |> Spear.append(conn, "InfiniteCounter", timeout: :infinity, expect: :empty)
{:error,
 %Spear.Grpc.Response{
   data: "",
   message: "Maximum Append Size of 1048576 Exceeded.",
   status: :invalid_argument,
   status_code: 3
 }}

Note that while EventStoreDB streams can in theory store infinitely long streams, they are not practically able to do so. EventStoreDB limits the size of a single write to 1_048_576 cumulative bytes. This budget can be spent on one very large event or, as shown above, many tiny events in a single call to Spear.append/4. Attempting to write more than the budget will fail the request with the above signature and no events in the request will be written to the EventStoreDB. This value is configurable in the EventStoreDB server configuration.

Blocking

While Spear.append/4 blocks the caller for the duration of the request, it does not fully block the connection. The connection will write chunks of data over the wire as allowed by HTTP2 window sizes.

HTTP2 includes a back-pressure mechanism for clients sending large amounts of data to the server faster than the server can handle. Servers negotiate a maximum number of bytes which the client is allowed to send called a window. When the window has been exhausted by streaming data to the server, the client must wait until the server replenishes the window. During the downtime between exhausting a window and waiting for the server to replenish, Spear suspends the exhausted request stream and handles incoming messages from the server as normal. Since HTTP2 window sizes are relatively small (usually somewhere around the range of 10 and 100 KB), Spear takes conceptual breaks somewhat often during large requests. This allows Spear to efficiently multiplex large writes with large reads and subscriptions.

Increasing append throughput with Spear.append_batch/5

EventStoreDB v21.6.0+'s BatchAppend feature is designed to improve performance of appending large numbers of events. The protobuf and gRPC service definitions give BatchAppend an advantage over the standard Append RPC both in terms of encoded message size (and therefore bytes over the network) and number of HTTP/2 requests.

Spear.append/4 is a simpler interface for appends and is a reasonable default. Spear.append_batch/5 is more cumbersome to use but is also more powerful. In general, Spear.append_batch/5 should be preferred when writing large numbers of events and when append throughput is critical.

Example: use Streams and Tasks with Spear.append_batch/5

Say we have an Enumerable.t/0 of event batches where each element is a tuple of the stream name to append to and a batch of events to append: {stream_name, [%Spear.Event{}, ...]}. We can use Task.async_stream/3 to parallelize appends like so:

alias Spear.BatchAppendResult, as: Result

[{stream_name, event_batch}] = Enum.take(event_batch_stream, 1)

{:ok, batch_id, request_id} =
  Spear.append_batch(event_batch, conn, :new, stream_name)

receive(do: (%Result{batch_id: ^batch_id, request_id: ^request_id, result: :ok} -> :ok))

We start by opening up a new Spear.append_batch/5 request using the :new atom as the request_id and the first element of the stream as input.

event_batch_stream
|> Stream.drop(1)
|> Task.async_stream(fn {stream_name, event_batch} ->
  {:ok, batch_id} =
    Spear.append_batch(event_batch, conn, request_id, stream_name)

  receive do
    %Result{batch_id: ^batch_id, request_id: ^request_id, result: :ok} -> :ok
  end
end)
|> Stream.run()

Now we drop the first element of our stream (which we appended to open the request) and pass the rest of the stream through Task.async_stream/2. Task.async_stream/2 will apply the anonymous function to each element of the stream, so we append a batch for each {stream_name, event_batch} element and await the result. Task.async_stream/2 will spawn processes for each element, keeping at most System.schedulers_online/0 processes running at a time.

Finally we'll close the request using Spear.cancel_subscription/2

:ok = Spear.cancel_subscription(conn, request_id)