Gnat.Jetstream.API.KV.Entry (gnat v1.15.0)


A parsed view of a single message from a Key/Value bucket's underlying stream.

Messages delivered from a KV bucket's stream encode three different operations (put, delete, purge) using a combination of the kv-operation header, the nats-marker-reason header, and the absence of any headers. Recovering the original key also requires stripping the $KV.<bucket>. subject prefix.

This module captures that convention in one place so both the built-in Gnat.Jetstream.API.KV.Watcher (push consumer) and user-supplied Gnat.Jetstream.PullConsumer implementations can share it.
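The convention described above can be sketched in a few lines. This is only an illustration, not the library's implementation. The module name `KVConventionSketch` is hypothetical. It assumes headers arrive as a list of `{name, value}` tuples with lower-cased names, and that the `kv-operation` values are `"DEL"` and `"PURGE"`, as in the NATS KV convention. The `nats-marker-reason` header is deliberately left out of this sketch.

```elixir
defmodule KVConventionSketch do
  # Hypothetical helper module; not part of Gnat.

  # Recover the key by stripping the "$KV.<bucket>." subject prefix.
  # Returns :error when the subject does not belong to the bucket.
  def key_from_subject(subject, bucket) do
    prefix = "$KV." <> bucket <> "."
    prefix_size = byte_size(prefix)

    if String.starts_with?(subject, prefix) and byte_size(subject) > prefix_size do
      {:ok, binary_part(subject, prefix_size, byte_size(subject) - prefix_size)}
    else
      :error
    end
  end

  # Classify the operation from the kv-operation header.
  # Assumption: a message without that header is treated as a put.
  def operation(headers) do
    case List.keyfind(headers, "kv-operation", 0) do
      {_name, "DEL"} -> :delete
      {_name, "PURGE"} -> :purge
      _missing_or_other -> :put
    end
  end
end
```

In application code, prefer from_message/2, which also accounts for marker headers and JetStream status messages.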

Using with a custom PullConsumer

A common use case is hydrating a local cache from a KV bucket by driving a Gnat.Jetstream.PullConsumer. Inside the handle_message/2 callback, convert the raw message into an Entry and branch on the operation:

defmodule MyApp.KVCache do
  use Gnat.Jetstream.PullConsumer

  alias Gnat.Jetstream.API.KV

  @bucket "my_bucket"

  @impl true
  def handle_message(message, state) do
    case KV.Entry.from_message(message, @bucket) do
      {:ok, %KV.Entry{operation: :put, key: key, value: value}} ->
        {:ack, put_in(state.cache[key], value)}

      {:ok, %KV.Entry{operation: op, key: key}} when op in [:delete, :purge] ->
        {:ack, update_in(state.cache, &Map.delete(&1, key))}

      :ignore ->
        {:ack, state}
    end
  end
end

When the message includes JetStream metadata, the returned struct also carries the revision (the stream sequence), the created timestamp, and the delta (num_pending). The delta is useful for detecting when the consumer has caught up with the tail of the stream: at that point delta == 0.
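As a minimal sketch of the caught-up check, the function below folds entries into a cache and sets a flag once an entry reports delta == 0. The module name `CaughtUpSketch` and the `caught_up?` state field are hypothetical. The clauses match on plain maps so the example runs without the library; a `%KV.Entry{}` struct would match the same patterns.

```elixir
defmodule CaughtUpSketch do
  # Hypothetical helper; state is a plain map: %{cache: map, caught_up?: boolean}.

  # A put stores the value under its key.
  def apply_entry(state, %{operation: :put, key: key, value: value} = entry) do
    state
    |> put_in([:cache, key], value)
    |> mark_caught_up(entry)
  end

  # Deletes and purges both remove the key from the local cache.
  def apply_entry(state, %{operation: op, key: key} = entry) when op in [:delete, :purge] do
    state
    |> update_in([:cache], &Map.delete(&1, key))
    |> mark_caught_up(entry)
  end

  # delta == 0 means no messages are pending behind this one.
  defp mark_caught_up(state, %{delta: 0}), do: %{state | caught_up?: true}

  # A nil or positive delta leaves the flag unchanged.
  defp mark_caught_up(state, _entry), do: state
end
```

A handle_message/2 implementation could call `apply_entry/2` on the parsed entry and return `{:ack, new_state}`, using the flag to decide when the cache is ready to serve reads.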

Messages that are not KV records

from_message/2 returns :ignore when the input is not a KV record, for example a JetStream status message (100 heartbeat, 404/408 pull terminator, 409 leadership change) or a message whose subject does not belong to the given bucket. In normal operation the Watcher and PullConsumer layers filter status messages out before they reach user code, so this is a defensive fallback rather than something consumers are expected to rely on.

Types

operation()

@type operation() :: :put | :delete | :purge

t()

@type t() :: %Gnat.Jetstream.API.KV.Entry{
  bucket: String.t(),
  created: DateTime.t() | nil,
  delta: non_neg_integer() | nil,
  key: String.t(),
  operation: operation(),
  revision: non_neg_integer() | nil,
  value: binary()
}

Functions

from_message(message, bucket_name)

@spec from_message(Gnat.message(), bucket_name :: String.t()) :: {:ok, t()} | :ignore

Parse a NATS message delivered from a KV bucket's underlying stream into an Entry.

bucket_name must match the bucket the message was published to; it is used to strip the $KV.<bucket>. subject prefix and recover the key.

Returns :ignore if the message is not a KV record for the given bucket (JetStream status message, wrong subject, etc.).

The :revision, :created, and :delta fields are populated when the message carries a JetStream $JS.ACK... reply subject. For messages without one (e.g. direct get responses), those fields are nil.
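To illustrate where these three fields can come from, here is a rough sketch that reads them from a reply subject. It is not the library's parser. The module name `AckSubjectSketch` is hypothetical, and it assumes the nine-token form `$JS.ACK.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<timestamp_ns>.<pending>`. Any other shape, including the longer form that carries a domain and account hash, yields nil fields here.

```elixir
defmodule AckSubjectSketch do
  # Hypothetical helper; not part of Gnat.
  @empty %{revision: nil, created: nil, delta: nil}

  # Extract revision, created, and delta from a "$JS.ACK..." reply subject.
  def metadata("$JS.ACK." <> rest) do
    case String.split(rest, ".") do
      [_stream, _consumer, _delivered, stream_seq, _consumer_seq, timestamp_ns, pending] ->
        # The timestamp token is nanoseconds since the Unix epoch.
        {:ok, created} = DateTime.from_unix(String.to_integer(timestamp_ns), :nanosecond)

        %{
          revision: String.to_integer(stream_seq),
          created: created,
          delta: String.to_integer(pending)
        }

      _other_shape ->
        @empty
    end
  end

  # No reply subject, or one that is not a JetStream ack subject.
  def metadata(_reply_to), do: @empty
end
```

Code that consumes entries from mixed sources should therefore treat nil as "metadata unavailable" rather than as revision 0 or delta 0.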