Gnat.Jetstream.PullConsumer behaviour (gnat v1.9.1)

A behaviour that provides NATS JetStream Pull Consumer functionality.

When a Consumer is pull-based, messages are delivered only when the server is asked for them.

Example

Declare a module which uses Gnat.Jetstream.PullConsumer and implements init/1 and handle_message/2 callbacks.

defmodule MyApp.PullConsumer do
  use Gnat.Jetstream.PullConsumer

  def start_link(arg) do
    Gnat.Jetstream.PullConsumer.start_link(__MODULE__, arg)
  end

  @impl true
  def init(_arg) do
    {:ok, nil,
      connection_name: :gnat,
      stream_name: "TEST_STREAM",
      consumer_name: "TEST_CONSUMER"}
  end

  @impl true
  def handle_message(message, state) do
    # Do some processing with the message.
    {:ack, state}
  end
end

You can then place your Pull Consumer in a supervision tree. Remember that you need to have the Gnat.ConnectionSupervisor set up.

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Create NATS connection
      {Gnat.ConnectionSupervisor, ...},
      # Start NATS Jetstream Pull Consumer
      MyApp.PullConsumer,
    ]
    opts = [strategy: :one_for_one]
    Supervisor.start_link(children, opts)
  end
end

Connection Options

To connect the consumer to NATS, pass the connection options as a keyword list in the third element of the tuple returned from the init/1 callback.

The following options are required. Omitting any of them causes the process to raise an error on initialization:

  • :connection_name - Gnat connection or Gnat.ConnectionSupervisor name/PID.
  • :stream_name - name of an existing stream the consumer will consume messages from.
  • :consumer_name - name of an existing consumer pointing at the stream.

You can also pass the optional ones:

  • :connection_retry_timeout - a duration in milliseconds after which the PullConsumer which failed to establish NATS connection retries, defaults to 1000
  • :connection_retries - a number of attempts the PullConsumer will make to establish the NATS connection. When this value is exceeded, the pull consumer stops with the :timeout reason, defaults to 10
  • :inbox_prefix - allows the default _INBOX. prefix to be customized. Should end with a dot.
  • :domain - use a JetStream domain, this is mostly used on leaf nodes.
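
The rules above (three required options, documented defaults for the retry settings, and an inbox prefix that ends with a dot) can be sketched as a small validation helper. This is only an illustration: MyApp.PullConsumerOptions and build/1 are hypothetical names, not part of gnat, and the library performs its own checks when the consumer initializes.

```elixir
defmodule MyApp.PullConsumerOptions do
  # Hypothetical helper for illustration only; it is not part of gnat.

  # Options the documentation above lists as required.
  @required [:connection_name, :stream_name, :consumer_name]

  # Defaults as stated in the option descriptions above.
  @defaults [connection_retry_timeout: 1_000, connection_retries: 10]

  # Returns the options with documented defaults filled in,
  # or raises ArgumentError when the input breaks a documented rule.
  def build(opts) when is_list(opts) do
    missing = Enum.reject(@required, &Keyword.has_key?(opts, &1))

    if missing != [] do
      raise ArgumentError, "missing required connection options: #{inspect(missing)}"
    end

    case Keyword.fetch(opts, :inbox_prefix) do
      {:ok, prefix} ->
        if not String.ends_with?(prefix, ".") do
          raise ArgumentError, ":inbox_prefix should end with a dot, got: #{inspect(prefix)}"
        end

      :error ->
        :ok
    end

    # Values given by the caller override the defaults.
    Keyword.merge(@defaults, opts)
  end
end
```

The keyword list returned by such a helper could be used as the third element of the tuple returned from init/1.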

Dynamic Connection Options

Sometimes the options have to be determined dynamically. In that case, derive their values from the pull consumer's init argument:

defmodule MyApp.PullConsumer do
  use Gnat.Jetstream.PullConsumer

  def start_link(counter) do
    Gnat.Jetstream.PullConsumer.start_link(__MODULE__, %{counter: counter})
  end

  @impl true
  def init(%{counter: counter}) do
    {:ok, nil,
      connection_name: :gnat,
      stream_name: "TEST_STREAM_#{counter}",
      consumer_name: "TEST_CONSUMER_#{counter}"}
  end

  ...
end

How to supervise

A PullConsumer is most commonly started under a supervision tree. When we invoke use Gnat.Jetstream.PullConsumer, it automatically defines a child_spec/1 function that allows us to start the pull consumer directly under a supervisor. To start a pull consumer under a supervisor with an initial argument of :example, one may do:

children = [
  {MyPullConsumer, :example}
]
Supervisor.start_link(children, strategy: :one_for_all)

One could also simply pass MyPullConsumer as a child to the supervisor:

children = [
  MyPullConsumer # Same as {MyPullConsumer, []}
]
Supervisor.start_link(children, strategy: :one_for_all)

A common approach is to use a keyword list, which allows setting init argument and server options, for example:

def start_link(opts) do
  {initial_state, opts} = Keyword.pop(opts, :initial_state, nil)
  Gnat.Jetstream.PullConsumer.start_link(__MODULE__, initial_state, opts)
end

and then you can use MyPullConsumer, {MyPullConsumer, name: :my_consumer} or even {MyPullConsumer, initial_state: :example, name: :my_consumer} as a child specification.
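
To make the splitting step concrete, the sketch below isolates the Keyword.pop/3 call from the start_link/1 above and shows what each of the three child specifications would hand to it. MyApp.StartOptions is a hypothetical module used only for this illustration.

```elixir
defmodule MyApp.StartOptions do
  # Hypothetical helper mirroring the Keyword.pop/3 call in start_link/1 above.
  # Returns {initial_state, remaining_server_options}.
  def split(opts) when is_list(opts) do
    Keyword.pop(opts, :initial_state, nil)
  end
end

# A bare MyPullConsumer child passes [] as the argument.
IO.inspect(MyApp.StartOptions.split([]))

# {MyPullConsumer, name: :my_consumer}
IO.inspect(MyApp.StartOptions.split(name: :my_consumer))

# {MyPullConsumer, initial_state: :example, name: :my_consumer}
IO.inspect(MyApp.StartOptions.split(initial_state: :example, name: :my_consumer))
```

Whatever remains after :initial_state is removed is passed on as the server options, which is why :name can travel in the same keyword list.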

use Gnat.Jetstream.PullConsumer also accepts a list of options which configures the child specification and therefore how it runs under a supervisor. The generated child_spec/1 can be customized with the following options:

  • :id - the child specification identifier, defaults to the current module
  • :restart - when the child should be restarted, defaults to :permanent
  • :shutdown - how to shut down the child, either immediately or by giving it time to shut down

For example:

use Gnat.Jetstream.PullConsumer, restart: :transient, shutdown: 10_000

See the "Child specification" section in the Supervisor module for more detailed information. The @doc annotation immediately preceding use Gnat.Jetstream.PullConsumer will be attached to the generated child_spec/1 function.
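
As a rough picture of what these options do, the sketch below applies the same overrides to a plain child specification map with Supervisor.child_spec/2. It assumes the generated child_spec/1 starts from a default map of this shape, which is an assumption about the library's internals rather than something stated above. MyPullConsumer is used only as a name here and does not need to be defined.

```elixir
# Assumed default shape of the child specification (not confirmed by the docs above).
defaults = %{
  id: MyPullConsumer,
  start: {MyPullConsumer, :start_link, [:example]}
}

# Equivalent in effect to: use Gnat.Jetstream.PullConsumer, restart: :transient, shutdown: 10_000
spec = Supervisor.child_spec(defaults, restart: :transient, shutdown: 10_000)
IO.inspect(spec)

# Overriding :id lets two children built from the same module live under one supervisor.
second = Supervisor.child_spec(defaults, id: {MyPullConsumer, 2})
IO.inspect(second.id)
```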

Name registration

A pull consumer is bound to the same name registration rules as GenServers. Read more about it in the GenServer documentation.

Summary

Types

connection_option()
Connection option values used to connect the consumer to the NATS server.

connection_options()
Connection options used to connect the consumer to the NATS server.

consumer()
The pull consumer reference.

Callbacks

handle_message(message, state)
Invoked to synchronously process a message pulled by the consumer. Depending on the value it returns, the acknowledgement is or is not sent.

init(init_arg)
Invoked when the server is started. start_link/3 or start/3 will block until it returns.

Functions

close(consumer)
Closes the pull consumer and stops the underlying process.

start(module, init_arg, options \\ [])
Starts a Gnat.Jetstream.PullConsumer process without links (outside of a supervision tree).

start_link(module, init_arg, options \\ [])
Starts a pull consumer linked to the current process, using the given callback module.

Types

connection_option()

@type connection_option() ::
  {:connection_name, GenServer.server()}
  | {:stream_name, String.t()}
  | {:consumer_name, String.t()}
  | {:connection_retry_timeout, non_neg_integer()}
  | {:connection_retries, non_neg_integer()}
  | {:domain, String.t()}

Connection option values used to connect the consumer to the NATS server.

connection_options()

@type connection_options() :: [connection_option()]

Connection options used to connect the consumer to the NATS server.

consumer()

@type consumer() :: GenServer.server()

The pull consumer reference.

Callbacks

handle_message(message, state)

@callback handle_message(message :: Gnat.message(), state :: term()) ::
  {ack_action, new_state}
when ack_action: :ack | :nack | :term | :noreply, new_state: term()

Invoked to synchronously process a message pulled by the consumer. Depending on the value it returns, the acknowledgement is or is not sent.

ACK actions

Possible ACK action values:

  • :ack - acknowledges the message was handled and requests delivery of the next message to the reply subject.
  • :nack - signals that the message will not be processed now and processing can move on to the next message. The NAK'd message will be retried.
  • :term - instructs the server to stop redelivery of a message without acknowledging it as successfully processed.
  • :noreply - nothing is sent. You may send an ACK or NACK later, asynchronously, using Gnat.Jetstream.ack/1, Gnat.Jetstream.nack/1 and similar functions from the Gnat.Jetstream module.

Example

def handle_message(message, state) do
  IO.inspect(message)
  {:ack, state}
end
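
Extending the example above, the following sketch picks a different ACK action depending on how processing went. It is written as a plain module so it can be run without a NATS connection; in a real consumer the same clauses would live in the module that uses Gnat.Jetstream.PullConsumer and be marked with @impl true. The module name, the :ready? and :total state keys, and the rule that the body holds an integer are all hypothetical. It also assumes the message is a map with a :body key.

```elixir
defmodule MyApp.AckDecision do
  # Hypothetical handler logic, shown outside the behaviour for illustration.

  # Not ready to process yet: ask for the message to be retried later.
  def handle_message(_message, %{ready?: false} = state) do
    {:nack, state}
  end

  def handle_message(%{body: body}, state) do
    case Integer.parse(body) do
      # Well-formed payload: process it and acknowledge.
      {n, ""} ->
        {:ack, %{state | total: state.total + n}}

      # Malformed payload will never succeed, so stop redelivery.
      _ ->
        {:term, state}
    end
  end
end

state = %{ready?: true, total: 0}
IO.inspect(MyApp.AckDecision.handle_message(%{body: "5"}, state))
IO.inspect(MyApp.AckDecision.handle_message(%{body: "oops"}, state))
IO.inspect(MyApp.AckDecision.handle_message(%{body: "5"}, %{state | ready?: false}))
```

Choosing :term for payloads that can never be parsed avoids redelivering a message that would fail the same way every time, while :nack is kept for conditions that may clear up.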

init(init_arg)

@callback init(init_arg :: term()) ::
  {:ok, state :: term(), connection_options()}
  | :ignore
  | {:stop, reason :: any()}

Invoked when the server is started. start_link/3 or start/3 will block until it returns.

init_arg is the argument term (second argument) passed to start_link/3.

See Connection.init/1 for more details.
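
The three return shapes in the callback specification can be sketched as follows. The module is a plain one so the snippet runs on its own; in practice init/1 would sit in the module that uses Gnat.Jetstream.PullConsumer. The :enabled flag, the :missing_stream_name reason, and the module name are hypothetical. How the start functions report :ignore and {:stop, reason} is presumably the same as for a GenServer, but that is an assumption here.

```elixir
defmodule MyApp.ConditionalInit do
  # Hypothetical init/1 logic, shown outside the behaviour for illustration.
  def init(opts) when is_list(opts) do
    cond do
      # Consumer switched off by the caller: do not start at all.
      Keyword.get(opts, :enabled, true) == false ->
        :ignore

      # Nothing to consume from: refuse to start with a reason.
      not Keyword.has_key?(opts, :stream_name) ->
        {:stop, :missing_stream_name}

      # Normal case: initial state plus the connection options.
      true ->
        {:ok, nil,
         connection_name: :gnat,
         stream_name: Keyword.fetch!(opts, :stream_name),
         consumer_name: Keyword.get(opts, :consumer_name, "TEST_CONSUMER")}
    end
  end
end

IO.inspect(MyApp.ConditionalInit.init(enabled: false))
IO.inspect(MyApp.ConditionalInit.init([]))
IO.inspect(MyApp.ConditionalInit.init(stream_name: "TEST_STREAM"))
```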

Functions

close(consumer)

@spec close(consumer :: consumer()) :: :ok

Closes the pull consumer and stops the underlying process.

Example

{:ok, consumer} =
  PullConsumer.start_link(ExamplePullConsumer,
    connection_name: :gnat,
    stream_name: "TEST_STREAM",
    consumer_name: "TEST_CONSUMER"
  )

:ok = PullConsumer.close(consumer)

start(module, init_arg, options \\ [])

@spec start(module(), init_arg :: term(), options :: GenServer.options()) ::
  GenServer.on_start()

Starts a Gnat.Jetstream.PullConsumer process without links (outside of a supervision tree).

See start_link/3 for more information.

start_link(module, init_arg, options \\ [])

@spec start_link(module(), init_arg :: term(), options :: GenServer.options()) ::
  GenServer.on_start()

Starts a pull consumer linked to the current process, using the given callback module.

This is often used to start the pull consumer as part of a supervision tree.

Once the server is started, the init/1 function of the given module is called with init_arg as its argument to initialize the server. To ensure a synchronized start-up procedure, this function does not return until init/1 has returned.

See GenServer.start_link/3 for more details.