Gnat.Jetstream.PullConsumer behaviour (gnat v1.15.0)

A behaviour which provides the NATS JetStream Pull Consumer functionalities.

When a Consumer is pull-based, it means that the messages will be delivered when the server is asked for them.

Example

Declare a module which uses Gnat.Jetstream.PullConsumer and implements the init/1 and handle_message/2 callbacks.

defmodule MyApp.PullConsumer do
  use Gnat.Jetstream.PullConsumer

  def start_link(arg) do
    Gnat.Jetstream.PullConsumer.start_link(__MODULE__, arg)
  end

  @impl true
  def init(_arg) do
    {:ok, nil,
      connection_name: :gnat,
      stream_name: "TEST_STREAM",
      consumer_name: "TEST_CONSUMER"}
  end

  @impl true
  def handle_message(message, state) do
    # Do some processing with the message.
    {:ack, state}
  end
end

You can then place your Pull Consumer in a supervision tree. Remember that you need to have the Gnat.ConnectionSupervisor set up.

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Create NATS connection
      {Gnat.ConnectionSupervisor, ...},
      # Start NATS Jetstream Pull Consumer
      MyApp.PullConsumer,
    ]
    opts = [strategy: :one_for_one]
    Supervisor.start_link(children, opts)
  end
end

Connection Options

To establish the consumer's connection with NATS, pass the connection options as a keyword list in the third element of the tuple returned from the init/1 callback.

The following options must be provided. Omitting them will cause the process to raise an error upon initialization:

For durable consumers, provide:

  • :stream_name - name of an existing stream the consumer will consume messages from.
  • :consumer_name - name of an existing consumer pointing at the stream.

For ephemeral consumers, provide:

  • :consumer - a Gnat.Jetstream.API.Consumer struct for creating an ephemeral consumer. The consumer struct must have durable_name: nil OR inactive_threshold set to ensure that the server will clean it up. The stream_name field must also be set.
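The two accepted shapes can be checked before they are returned from init/1. The following is a minimal sketch, not the library's own validation: the MyApp.ConsumerOptions module and its classify/1 function are hypothetical names, and the :consumer value is read with Map.get/3 so the check works for any struct or map carrying the fields described above.

```elixir
defmodule MyApp.ConsumerOptions do
  # Hypothetical helper, not part of gnat. It mirrors the rules listed above.

  # Returns {:ok, :existing} when an existing consumer is referenced by name,
  # {:ok, :created} when a consumer struct that the server can clean up is
  # given, and {:error, reason} otherwise.
  def classify(opts) do
    cond do
      is_binary(opts[:stream_name]) and is_binary(opts[:consumer_name]) ->
        {:ok, :existing}

      is_map(opts[:consumer]) ->
        check_consumer(opts[:consumer])

      true ->
        {:error, :missing_consumer_options}
    end
  end

  defp check_consumer(consumer) do
    cond do
      # The stream_name field must be set on the struct.
      not is_binary(Map.get(consumer, :stream_name)) ->
        {:error, :missing_stream_name}

      # Either durable_name is nil or inactive_threshold is set.
      is_nil(Map.get(consumer, :durable_name)) or
          not is_nil(Map.get(consumer, :inactive_threshold)) ->
        {:ok, :created}

      true ->
        {:error, :consumer_never_cleaned_up}
    end
  end
end
```

Calling MyApp.ConsumerOptions.classify/1 on the keyword list inside init/1 lets a misconfigured consumer fail with a descriptive reason of your own choosing rather than the error raised during initialization.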

Time-unit gotcha

Server-side JetStream options (:request_expires, :idle_heartbeat) take nanoseconds because that's what the JetStream wire protocol uses. Client-side options (:connection_retry_timeout, :heartbeat_check_interval) take milliseconds. The unit is called out per option below, so double-check before tuning.

You can also pass the following optional options:

  • :connection_retry_timeout - a duration in milliseconds after which the PullConsumer which failed to establish NATS connection retries, defaults to 1000
  • :connection_retries - a number of attempts the PullConsumer will make to establish the NATS connection. When this value is exceeded, the pull consumer stops with the :timeout reason, defaults to 10
  • :inbox_prefix - allows the default _INBOX. prefix to be customized. Should end with a dot.
  • :domain - use a JetStream domain, this is mostly used on leaf nodes.
  • :batch_size - when set to a value greater than 1, enables batch mode. Messages are buffered and delivered to handle_message/2 in batches. Only the last message per batch is acknowledged, so the underlying consumer should use ack_policy: :all for correctness. This dramatically improves throughput for consumers that need to catch up on large backlogs. In batch mode, :nack and :term returns from handle_message/2 are treated as :ack since ack_policy: :all cannot selectively reject messages. Defaults to 1 (single-message mode).
  • :request_expires - duration in nanoseconds that a long-poll pull request will linger on the server before the server replies with a 408 terminator and the consumer issues a fresh pull. Defaults to 5_000_000_000 (5 seconds).
  • :idle_heartbeat - duration in nanoseconds at which the server is asked to emit 100-status idle heartbeat messages while a long-poll pull request is outstanding but no real messages are available. The PullConsumer also runs a local watchdog: if no traffic at all (data, status, or heartbeat) is observed within 2 * idle_heartbeat the consumer assumes the pull request was lost (e.g. dropped during a JetStream leadership change without killing the TCP connection) and forces a reconnect. Must be at most :request_expires / 2 — the server rejects pull requests that violate this. Defaults to half of :request_expires (2.5 seconds with default settings, watchdog fires at 5s).
  • :heartbeat_check_interval - cadence in milliseconds at which the local watchdog checks for missed heartbeats. Independent of (and finer-grained than) the missed-heartbeat threshold itself. Defaults to 1_000 (1 second).
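Because the timing options mix nanoseconds and milliseconds, it can help to derive them from a single value. The sketch below is one way to do that under stated assumptions: MyApp.PullTiming, timing_opts/1 and watchdog_threshold_ms/1 are hypothetical names, and the only library facts relied on are the units and the idle_heartbeat <= request_expires / 2 constraint described above.

```elixir
defmodule MyApp.PullTiming do
  # Hypothetical helper, not part of gnat.

  # Builds the timing options from a request expiry given in milliseconds.
  def timing_opts(request_expires_ms)
      when is_integer(request_expires_ms) and request_expires_ms > 0 do
    # Server-side options are expressed in nanoseconds.
    request_expires =
      System.convert_time_unit(request_expires_ms, :millisecond, :nanosecond)

    [
      request_expires: request_expires,
      # Largest value the server accepts: request_expires / 2.
      idle_heartbeat: div(request_expires, 2),
      # Client-side option, expressed in milliseconds.
      heartbeat_check_interval: 1_000
    ]
  end

  # Returns the local watchdog threshold (2 * idle_heartbeat) in milliseconds.
  def watchdog_threshold_ms(opts) do
    idle_heartbeat = Keyword.fetch!(opts, :idle_heartbeat)
    System.convert_time_unit(2 * idle_heartbeat, :nanosecond, :millisecond)
  end
end
```

With an input of 5_000 this reproduces the defaults listed above: a 5_000_000_000 ns request expiry, a 2_500_000_000 ns idle heartbeat, and a watchdog threshold of 5_000 ms. The resulting keyword list can be appended to the other connection options returned from init/1.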

Telemetry

The PullConsumer emits the following telemetry events:

  • [:gnat, :jetstream, :pull_consumer, :heartbeat_expired] — emitted when the local heartbeat watchdog observes that no inbound message has arrived within 2 * idle_heartbeat and the consumer is about to force a reconnect. Measurements: %{gap_ms, threshold_ms}. Metadata: %{module, stream_name, consumer_name, connection_name}.
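A handler for this event could be attached as sketched below. This assumes the :telemetry package is available in your application. The module name, the handler id, and the log message format are illustrative choices, not something gnat prescribes.

```elixir
defmodule MyApp.PullConsumerTelemetry do
  require Logger

  @event [:gnat, :jetstream, :pull_consumer, :heartbeat_expired]

  # Call once, for example from Application.start/2.
  # The handler id "my-app-heartbeat-expired" is an arbitrary unique name.
  def attach do
    :telemetry.attach("my-app-heartbeat-expired", @event, &__MODULE__.handle_event/4, nil)
  end

  # Invoked by :telemetry each time the watchdog is about to force a reconnect.
  def handle_event(@event, measurements, metadata, _config) do
    Logger.warning(format(measurements, metadata))
  end

  # Pure formatting step, kept separate so it can be tested in isolation.
  def format(%{gap_ms: gap, threshold_ms: threshold}, %{stream_name: stream, consumer_name: consumer}) do
    "heartbeat expired on #{stream}/#{consumer}: " <>
      "#{gap}ms without traffic (threshold #{threshold}ms)"
  end
end
```

Passing the handler as a remote capture (&__MODULE__.handle_event/4) rather than an anonymous function follows the recommendation in the :telemetry documentation.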

Dynamic Connection Options

Some options may need to be determined dynamically. In that case, derive their values from the init argument passed to the pull consumer:

defmodule MyApp.PullConsumer do
  use Gnat.Jetstream.PullConsumer

  def start_link(counter) do
    Gnat.Jetstream.PullConsumer.start_link(__MODULE__, %{counter: counter})
  end

  @impl true
  def init(%{counter: counter}) do
    {:ok, nil,
      connection_name: :gnat,
      stream_name: "TEST_STREAM_#{counter}",
      consumer_name: "TEST_CONSUMER_#{counter}"}
  end

  ...
end

Ephemeral Consumer Example

You can create ephemeral consumers by providing a :consumer struct with durable_name: nil. These are automatically created and cleaned up with the connection lifecycle:

defmodule MyApp.EphemeralPullConsumer do
  use Gnat.Jetstream.PullConsumer

  def start_link(arg) do
    Gnat.Jetstream.PullConsumer.start_link(__MODULE__, arg)
  end

  @impl true
  def init(_arg) do
    consumer = %Gnat.Jetstream.API.Consumer{
      stream_name: "TEST_STREAM",
      durable_name: nil,  # Must be nil for ephemeral consumers
      filter_subject: "orders.*"
    }

    {:ok, nil,
      connection_name: :gnat,
      consumer: consumer}
  end

  @impl true
  def handle_message(message, state) do
    # Do some processing with the message.
    {:ack, state}
  end
end

Auto-Cleanup Durable Consumer Example

For scenarios where you want persistence across reconnections but automatic cleanup (e.g., Kubernetes pods), you can create durable consumers with inactive_threshold:

defmodule MyApp.AutoCleanupPullConsumer do
  use Gnat.Jetstream.PullConsumer

  def start_link(pod_name) do
    Gnat.Jetstream.PullConsumer.start_link(__MODULE__, pod_name)
  end

  @impl true
  def init(pod_name) do
    consumer = %Gnat.Jetstream.API.Consumer{
      stream_name: "ORDERS_STREAM",
      durable_name: "orders-consumer-#{pod_name}",  # Named after pod
      inactive_threshold: 300_000_000_000,  # 5 minutes in nanoseconds
      filter_subject: "orders.*"
    }

    {:ok, nil,
      connection_name: :gnat,
      consumer: consumer}
  end

  @impl true
  def handle_message(message, state) do
    # Process order message with state persistence
    {:ack, state}
  end
end

How to supervise

A PullConsumer is most commonly started under a supervision tree. When we invoke use Gnat.Jetstream.PullConsumer, it automatically defines a child_spec/1 function that allows us to start the pull consumer directly under a supervisor. To start a pull consumer under a supervisor with an initial argument of :example, one may do:

children = [
  {MyPullConsumer, :example}
]
Supervisor.start_link(children, strategy: :one_for_all)

One could also simply pass MyPullConsumer as a child to the supervisor:

children = [
  MyPullConsumer # Same as {MyPullConsumer, []}
]
Supervisor.start_link(children, strategy: :one_for_all)

A common approach is to use a keyword list, which allows setting init argument and server options, for example:

def start_link(opts) do
  {initial_state, opts} = Keyword.pop(opts, :initial_state, nil)
  Gnat.Jetstream.PullConsumer.start_link(__MODULE__, initial_state, opts)
end

and then you can use MyPullConsumer, {MyPullConsumer, name: :my_consumer} or even {MyPullConsumer, initial_state: :example, name: :my_consumer} as a child specification.
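To make the split concrete, the sketch below isolates the Keyword.pop/3 step from the start_link/1 shown above. MyApp.StartOpts is a hypothetical module used only for illustration.

```elixir
defmodule MyApp.StartOpts do
  # Hypothetical helper that mirrors the Keyword.pop/3 call in start_link/1.
  # Returns {initial_state, remaining_server_options}.
  def split(opts), do: Keyword.pop(opts, :initial_state, nil)
end

# Both keys given: the state is extracted and :name is left for the server.
MyApp.StartOpts.split(initial_state: :example, name: :my_consumer)
# => {:example, [name: :my_consumer]}

# Bare module as child spec (argument []): the default state nil is used.
MyApp.StartOpts.split([])
# => {nil, []}
```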

use Gnat.Jetstream.PullConsumer also accepts a list of options which configures the child specification and therefore how it runs under a supervisor. The generated child_spec/1 can be customized with the following options:

  • :id - the child specification identifier, defaults to the current module
  • :restart - when the child should be restarted, defaults to :permanent
  • :shutdown - how to shut down the child, either immediately or by giving it time to shut down

For example:

use Gnat.Jetstream.PullConsumer, restart: :transient, shutdown: 10_000

See the "Child specification" section in the Supervisor module for more detailed information. The @doc annotation immediately preceding use Gnat.Jetstream.PullConsumer will be attached to the generated child_spec/1 function.

Name registration

A pull consumer is bound to the same name registration rules as GenServers. Read more about it in the GenServer documentation.

Summary

Types

  • connection_option() - Connection option values used to connect the consumer to NATS server.
  • connection_options() - Connection options used to connect the consumer to NATS server.
  • consumer() - The pull consumer reference.

Callbacks

  • handle_connected(consumer_info, state) - Invoked after the consumer has been created or verified on the NATS server.
  • handle_message(message, state) - Invoked to synchronously process a message pulled by the consumer. Depending on the value it returns, the acknowledgement is or is not sent.
  • handle_status(message, state) - Invoked when the consumer receives an informational JetStream status message instead of a real stream message.
  • init(init_arg) - Invoked when the server is started. start_link/3 or start/3 will block until it returns.

Functions

  • close(consumer) - Closes the pull consumer and stops the underlying process.
  • start(module, init_arg, options \\ []) - Starts a Gnat.Jetstream.PullConsumer process without links (outside of a supervision tree).
  • start_link(module, init_arg, options \\ []) - Starts a pull consumer linked to the current process with the given function.

Types

connection_option()

@type connection_option() ::
  {:connection_name, GenServer.server()}
  | {:stream_name, String.t()}
  | {:consumer_name, String.t()}
  | {:consumer, Gnat.Jetstream.API.Consumer.t()}
  | {:connection_retry_timeout, non_neg_integer()}
  | {:connection_retries, non_neg_integer()}
  | {:domain, String.t()}
  | {:batch_size, pos_integer()}
  | {:request_expires, non_neg_integer()}
  | {:idle_heartbeat, non_neg_integer()}
  | {:heartbeat_check_interval, non_neg_integer()}

Connection option values used to connect the consumer to NATS server.

connection_options()

@type connection_options() :: [connection_option()]

Connection options used to connect the consumer to NATS server.

consumer()

@type consumer() :: GenServer.server()

The pull consumer reference.

Callbacks

handle_connected(consumer_info, state)

(optional)
@callback handle_connected(
  consumer_info :: Gnat.Jetstream.API.Consumer.info(),
  state :: term()
) :: {:ok, new_state :: term()}

Invoked after the consumer has been created or verified on the NATS server.

This callback is called during connection (and reconnection) after the JetStream consumer has been successfully created or confirmed to exist. It receives the full consumer info map returned by the server, which includes fields like num_pending (the number of messages waiting to be delivered).

This is useful for detecting the initial state of the consumer. For example, if num_pending is 0, you know there are no existing messages to replay and can mark the consumer as caught up immediately.

Returning {:ok, state} allows you to update the consumer's state based on the consumer info.

This callback is optional. If not implemented, the state is passed through unchanged.

Example

@impl true
def handle_connected(consumer_info, state) do
  if consumer_info.num_pending == 0 do
    {:ok, mark_as_loaded(state)}
  else
    {:ok, state}
  end
end

handle_message(message, state)

@callback handle_message(message :: Gnat.message(), state :: term()) ::
  {ack_action, new_state}
when ack_action: :ack | :nack | :term | :noreply, new_state: term()

Invoked to synchronously process a message pulled by the consumer. Depending on the value it returns, the acknowledgement is or is not sent.

Only real stream messages reach this callback. JetStream informational status messages (e.g. 100 heartbeat, 404/408 pull terminator, 409 leadership change) are intercepted by the consumer and never passed here. See handle_status/2 if you want to observe them.

ACK actions

The possible ACK action values are:

  • :ack - acknowledges the message was handled and requests delivery of the next message to the reply subject.
  • :nack - signals that the message will not be processed now, so processing can move on to the next message. The NAK'd message will be redelivered.
  • :term - instructs the server to stop redelivery of a message without acknowledging it as successfully processed.
  • :noreply - nothing is sent. You may send an ACK or NACK later, asynchronously, using Gnat.Jetstream.ack/1, Gnat.Jetstream.nack/1, or similar functions from the Gnat.Jetstream module.
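One way to choose between these actions is to map the outcome of your own processing step onto them. The sketch below assumes a processing function that returns :ok, {:error, :transient} or another {:error, reason}. That convention, the MyApp.AckDecision module, and the :process key in the state are all hypothetical. Only the handle_message/2 clause is shown; in a real consumer it would live in a module that uses Gnat.Jetstream.PullConsumer.

```elixir
defmodule MyApp.AckDecision do
  # Hypothetical mapping from a processing result to an ACK action.

  # Handled successfully: acknowledge.
  def to_ack_action(:ok), do: :ack
  # Temporary failure: ask for redelivery.
  def to_ack_action({:error, :transient}), do: :nack
  # Any other failure: stop redelivery without acknowledging success.
  def to_ack_action({:error, _reason}), do: :term

  # Same shape as the handle_message/2 callback. The processing function is
  # carried in the state here only to keep the example self-contained.
  def handle_message(message, %{process: process} = state) do
    {to_ack_action(process.(message)), state}
  end
end
```

Note that in batch mode (:batch_size greater than 1) the :nack and :term results are treated as :ack, as described under Connection Options, so this mapping only has its full effect in single-message mode.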

Example

def handle_message(message, state) do
  IO.inspect(message)
  {:ack, state}
end

handle_status(message, state)

(optional)
@callback handle_status(message :: Gnat.message(), state :: term()) ::
  {:ok, new_state :: term()}

Invoked when the consumer receives an informational JetStream status message instead of a real stream message.

JetStream delivers status messages on the same subscription as regular messages — for example a 100 idle heartbeat, a 404/408 pull request terminator, or a 409 leadership change. These are not stream records and cannot be acked, so the PullConsumer never forwards them to handle_message/2.

By default they are silently dropped and the consumer continues fetching the next message. Implement this callback if you want to observe them — for example to log a warning on leadership changes, or to track heartbeat arrival.

The callback receives the raw Gnat.message() (which includes :status and optionally :description) and the current state. Returning {:ok, new_state} updates the state; the consumer then proceeds the same way it would have if the callback had not been defined.

This callback is optional.

Example

@impl true
def handle_status(%{status: "409", description: description}, state) do
  Logger.warning("JetStream 409 from consumer: #{description}")
  {:ok, state}
end

def handle_status(_message, state), do: {:ok, state}

init(init_arg)

@callback init(init_arg :: term()) ::
  {:ok, state :: term(), connection_options()}
  | :ignore
  | {:stop, reason :: any()}

Invoked when the server is started. start_link/3 or start/3 will block until it returns.

init_arg is the argument term (second argument) passed to start_link/3.

See Connection.init/1 for more details.
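The three return shapes in the callback spec can be combined with pattern matching on init_arg. The sketch below assumes that :ignore and {:stop, reason} behave as they do for GenServer.init/1, which this page does not spell out. The module name, the :enabled flag, and the :invalid_stream_name reason are hypothetical. Only the init/1 clauses are shown; a real consumer would also use Gnat.Jetstream.PullConsumer and define handle_message/2.

```elixir
defmodule MyApp.ConditionalInit do
  # Consumer disabled by configuration: do not start the process.
  def init(%{enabled: false}), do: :ignore

  # Refuse to start when the stream name is not a string.
  def init(%{stream_name: stream}) when not is_binary(stream),
    do: {:stop, :invalid_stream_name}

  # Normal start: initial state plus the connection options.
  def init(%{stream_name: stream, consumer_name: consumer}) do
    {:ok, %{handled: 0},
     connection_name: :gnat, stream_name: stream, consumer_name: consumer}
  end
end
```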

Functions

close(consumer)

@spec close(consumer :: consumer()) :: :ok

Closes the pull consumer and stops the underlying process.

Example

{:ok, consumer} =
  PullConsumer.start_link(ExamplePullConsumer,
    connection_name: :gnat,
    stream_name: "TEST_STREAM",
    consumer_name: "TEST_CONSUMER"
  )

:ok = PullConsumer.close(consumer)

start(module, init_arg, options \\ [])

@spec start(module(), init_arg :: term(), options :: GenServer.options()) ::
  GenServer.on_start()

Starts a Gnat.Jetstream.PullConsumer process without links (outside of a supervision tree).

See start_link/3 for more information.

start_link(module, init_arg, options \\ [])

@spec start_link(module(), init_arg :: term(), options :: GenServer.options()) ::
  GenServer.on_start()

Starts a pull consumer linked to the current process with the given function.

This is often used to start the pull consumer as part of a supervision tree.

Once the server is started, the init/1 function of the given module is called with init_arg as its argument to initialize the server. To ensure a synchronized start-up procedure, this function does not return until init/1 has returned.

See GenServer.start_link/3 for more details.