Gnat.Jetstream.Pager (gnat v1.13.0)

Page through all the messages in a stream

This module provides a synchronous API for inspecting the messages in a stream. Use the reduce function to apply a simple function to each message individually, much like Enum.reduce. To handle messages in batches instead, use the init and page functions.

Types

message()

@type message() :: Gnat.message()

opt()

@type opt() ::
  {:batch, non_neg_integer()}
  | {:domain, String.t()}
  | {:from_datetime, DateTime.t()}
  | {:from_seq, non_neg_integer()}
  | {:headers_only, boolean()}

Options you can pass to the pager

  • batch The maximum number of messages to pull in each page/batch (default 10)
  • domain The JetStream domain to use, if needed
  • from_datetime Only page through messages recorded on or after this datetime
  • from_seq Only page through messages with a sequence number greater than or equal to this value
  • headers_only Pass true to receive only the headers of each message. Useful for reading metadata without having to receive large body payloads.
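
As an illustration of how these options combine, here is a minimal sketch that collects the headers of every message starting at a given sequence number. The module name PagerOptsExample and the function headers_from/3 are hypothetical, and the sketch assumes a message may lack a :headers key when it was published without headers.

```elixir
defmodule PagerOptsExample do
  alias Gnat.Jetstream.Pager

  # Hypothetical helper: returns {:ok, [headers]} in stream order,
  # or {:error, reason} if paging fails.
  def headers_from(conn, stream_name, first_seq) do
    # Pull up to 50 messages per page, start at `first_seq`,
    # and skip the message bodies entirely.
    opts = [batch: 50, from_seq: first_seq, headers_only: true]

    result =
      Pager.reduce(conn, stream_name, opts, [], fn message, acc ->
        # Assumption: :headers may be absent, so fall back to an empty list.
        [Map.get(message, :headers, []) | acc]
      end)

    with {:ok, collected} <- result do
      # The accumulator was built by prepending, so restore stream order.
      {:ok, Enum.reverse(collected)}
    end
  end
end
```

A call might look like PagerOptsExample.headers_from(:gnat, "NUMBERS_STREAM", 1), where :gnat is the name of an already running connection.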

opts()

@type opts() :: [opt()]

pager()

@opaque pager()

Functions

cleanup(state)

init(conn, stream_name, opts)

@spec init(Gnat.t(), String.t(), opts()) :: {:ok, pager()} | {:error, term()}

page(state)

@spec page(pager()) :: {:page, [message()]} | {:done, [message()]} | {:error, term()}
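
The batch-oriented workflow built on init and page can be sketched as follows. This is an illustration rather than the library's own implementation: the module PagerBatchExample and the function each_batch/4 are hypothetical, and calling cleanup once paging has finished is an assumption, since cleanup has no documented spec here.

```elixir
defmodule PagerBatchExample do
  alias Gnat.Jetstream.Pager

  # Hypothetical helper: calls `handle_batch` with each list of messages.
  # Returns :ok when the stream is exhausted, or {:error, reason}.
  def each_batch(conn, stream_name, opts, handle_batch) do
    with {:ok, pager} <- Pager.init(conn, stream_name, opts) do
      loop(pager, handle_batch)
    end
  end

  defp loop(pager, handle_batch) do
    case Pager.page(pager) do
      # More pages may follow: process this batch and keep going.
      {:page, messages} ->
        handle_batch.(messages)
        loop(pager, handle_batch)

      # Last page: it can still carry messages, so process it too.
      {:done, messages} ->
        handle_batch.(messages)
        # Assumption: cleanup/1 releases whatever the pager set up.
        Pager.cleanup(pager)
        :ok

      {:error, reason} ->
        Pager.cleanup(pager)
        {:error, reason}
    end
  end
end
```

For example, PagerBatchExample.each_batch(:gnat, "NUMBERS_STREAM", [batch: 5], fn messages -> IO.inspect(length(messages)) end) would print the size of each batch as it arrives.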

reduce(conn, stream_name, opts, initial_state, fun)

@spec reduce(
  Gnat.t(),
  String.t(),
  opts(),
  Enum.acc(),
  (Gnat.message(), Enum.acc() -> Enum.acc())
) :: {:ok, Enum.acc()} | {:error, term()}

Works like Enum.reduce, but iterates over every message in a stream.

# Assume we have a stream with messages like "1", "2", ... "10"
Gnat.Jetstream.Pager.reduce(:gnat, "NUMBERS_STREAM", [batch: 5], 0, fn message, total ->
  num = String.to_integer(message.body)
  total + num
end)

# => {:ok, 55}