Gnat.Jetstream.Pager (gnat v1.13.0)
View SourcePage through all the messages in a stream
This module provides a synchronous API to inspect the messages in a stream.
You can use the reduce module to write a simple function that works like Enum.reduce across each message individually.
If you want to handle messages in batches, you can use the init + page functions to accomplish that.
Summary
Functions
Similar to Enum.reduce but you can iterate through all messages in a stream
Types
@type message() :: Gnat.message()
@type opt() :: {:batch, non_neg_integer()} | {:domain, String.t()} | {:from_datetime, DateTime.t()} | {:from_seq, non_neg_integer()} | {:headers_only, boolean()}
Options you can pass to the pager
batchcontrols the maximum number of messages we'll pull in each page/batch (default 10)domainYou can specify a jetstream domain if neededfrom_datetimeOnly page through messages recorded on or after this datetimefrom_seqOnly page through messages with a sequence number equal or above this optionheaders_onlyYou can passtrueto this if you only want to see the headers from each message. Can be useful to get metadata without having to receieve large body payloads.
@type opts() :: [opt()]
@opaque pager()
Functions
@spec reduce( Gnat.t(), String.t(), opts(), Enum.acc(), (Gnat.message(), Enum.acc() -> Enum.acc()) ) :: {:ok, Enum.acc()} | {:error, term()}
Similar to Enum.reduce but you can iterate through all messages in a stream
# Assume we have a stream with messages like "1", "2", ... "10"
Gnat.Jetstream.Pager.reduce(:gnat, "NUMBERS_STREAM", [batch_size: 5], 0, fn(message, total) ->
num = String.to_integer(message.body)
total + num
end)
# => {:ok, 55}