Gnat.Jetstream.API.KV.Entry (gnat v1.15.0)
View SourceA parsed view of a single message from a Key/Value bucket's underlying stream.
Messages delivered from a KV bucket's stream encode three different operations
(put, delete, purge) using a combination of the kv-operation header, the
nats-marker-reason header, and the absence of any headers. Recovering the
original key also requires stripping the $KV.<bucket>. subject prefix.
This module captures that convention in one place so both the built-in
Gnat.Jetstream.API.KV.Watcher (push consumer) and user-supplied
Gnat.Jetstream.PullConsumer implementations can share it.
Using with a custom PullConsumer
A common use case is hydrating a local cache from a KV bucket by driving a
Gnat.Jetstream.PullConsumer. Inside c:handle_message/2, convert the raw
message into an Entry and branch on the operation:
defmodule MyApp.KVCache do
use Gnat.Jetstream.PullConsumer
alias Gnat.Jetstream.API.KV
@bucket "my_bucket"
@impl true
def handle_message(message, state) do
case KV.Entry.from_message(message, @bucket) do
{:ok, %KV.Entry{operation: :put, key: key, value: value}} ->
{:ack, put_in(state.cache[key], value)}
{:ok, %KV.Entry{operation: op, key: key}} when op in [:delete, :purge] ->
{:ack, update_in(state.cache, &Map.delete(&1, key))}
:ignore ->
{:ack, state}
end
end
endThe returned struct also carries the JetStream revision (stream sequence),
created timestamp, and delta (num_pending) when the message includes
JetStream metadata, which is useful for detecting when the consumer has
caught up with the tail of the stream (delta == 0).
Messages that are not KV records
from_message/2 returns :ignore when the input is not a KV record — for
example a JetStream status message (100 heartbeat, 404/408 pull
terminator, 409 leadership change) or a message whose subject does not
belong to the given bucket. In normal operation the Watcher and
PullConsumer layers filter status messages out before they reach user
code, so this is a defensive fallback rather than something consumers are
expected to rely on.
Summary
Functions
Parse a NATS message delivered from a KV bucket's underlying stream into an
Entry.
Types
@type operation() :: :put | :delete | :purge
@type t() :: %Gnat.Jetstream.API.KV.Entry{ bucket: String.t(), created: DateTime.t() | nil, delta: non_neg_integer() | nil, key: String.t(), operation: operation(), revision: non_neg_integer() | nil, value: binary() }
Functions
@spec from_message(Gnat.message(), bucket_name :: String.t()) :: {:ok, t()} | :ignore
Parse a NATS message delivered from a KV bucket's underlying stream into an
Entry.
bucket_name must match the bucket the message was published to; it is used
to strip the $KV.<bucket>. subject prefix and recover the key.
Returns :ignore if the message is not a KV record for the given bucket
(JetStream status message, wrong subject, etc.).
The :revision, :created, and :delta fields are populated when the
message carries a JetStream $JS.ACK... reply subject. For messages without
one (e.g. direct get responses), those fields are nil.