Gnat.Jetstream.API.Consumer (gnat v1.8.3)

A module representing a NATS JetStream Consumer.

Learn more about consumers: https://docs.nats.io/nats-concepts/jetstream/consumers

The Gnat.Jetstream.API.Consumer struct

The only mandatory field is :stream_name. All other fields fall back to the NATS default values.

Note that consumers are ephemeral by default. Set the :durable_name to make it durable.

Consumer struct fields explanation:

  • :stream_name - name of a stream the consumer is pointing at.
  • :domain - JetStream domain the stream is on.
  • :ack_policy - how the messages should be acknowledged. It has the following options:
    • :explicit - the default policy. It means that each individual message must be acknowledged. It is the only allowed option for pull consumers.
    • :none - no need to ack messages, the server will assume ack on delivery.
    • :all - only the last received message needs to be acked, all the previous messages received are automatically acknowledged.
  • :ack_wait - time in nanoseconds that the server will wait for an ack for any individual message. If an ack is not received in time, the message will be redelivered.
  • :backoff - list of durations that represents a retry timescale for NAK'd messages or those being normally retried.
  • :deliver_group - when set, will only deliver messages to subscriptions matching that group.
  • :deliver_policy - specifies where in the stream it wants to start receiving messages. It has the following options:
    • :all - the default policy. The consumer will start receiving from the earliest available message.
    • :last - the consumer will start receiving messages with the last message added to the stream.
    • :new - the consumer will only start receiving messages that were created after the consumer was created.
    • :by_start_sequence - the consumer is required to specify :opt_start_seq, the sequence number to start on. It will receive the closest available message moving forward in the sequence should the message specified have been removed based on the stream limit policy.
    • :by_start_time - the consumer will start with messages on or after this time. The consumer is required to specify :opt_start_time, the time in the stream to start at.
    • :last_per_subject - the consumer will start with the latest one for each filtered subject currently in the stream.
  • :deliver_subject - the subject to deliver observed messages. Not allowed for pull subscriptions. A delivery subject is required for queue subscribing as it configures a subject that all the queue consumers should listen on.
  • :description - a short description of the purpose of this consumer.
  • :durable_name - the name of the consumer, which the server will track, allowing consumption to resume where it left off. By default, a consumer is ephemeral. To make the consumer durable, set the name. See naming.
  • :filter_subject - when consuming from a stream with a wildcard subject, this allows you to select a subset of the full wildcard subject to receive messages from.
  • :flow_control - when set to true, an empty message with Status header 100 and a reply subject will be sent. Consumers must reply to these messages to control the rate of message delivery.
  • :headers_only - delivers only the headers of messages in the stream and not the bodies. Additionally adds the Nats-Msg-Size header to indicate the size of the removed payload.
  • :idle_heartbeat - if set, the server will regularly send a status message to the client while there are no new messages to send. This lets the client know that the JetStream service is still up and running, even when there is no activity on the stream. The message status header will have a code of 100. Unlike :flow_control, it will have no reply to address. It may have a description like "Idle Heartbeat".
  • :inactive_threshold - duration that instructs the server to clean up ephemeral consumers that are inactive for that long.
  • :max_ack_pending - sets the maximum number of messages without an acknowledgement that can be outstanding. Once this limit is reached, message delivery will be suspended. It cannot be used with the :none ack policy. This maximum number of pending acks applies to all of the consumer's subscriber processes. A value of -1 means there can be any number of pending acks (i.e. no flow control).
  • :max_batch - the largest batch property that may be specified when doing a pull on a Pull consumer.
  • :max_deliver - the maximum number of times a specific message will be delivered. Applies to any message that is re-sent due to ack policy.
  • :max_expires - the maximum expires value that may be set when doing a pull on a Pull consumer.
  • :max_waiting - the number of pulls that can be outstanding on a pull consumer. Pulls received after this limit is reached are ignored.
  • :opt_start_seq - use with :deliver_policy set to :by_start_sequence. It represents the sequence number to start consuming on.
  • :opt_start_time - use with :deliver_policy set to :by_start_time. It represents the time to start consuming at.
  • :rate_limit_bps - used to throttle the delivery of messages to the consumer, in bits per second.
  • :replay_policy - it applies when the :deliver_policy is set to :all, :by_start_sequence or :by_start_time. It has the following options:
    • :instant - the default policy. The messages will be pushed to the client as fast as possible.
    • :original - the messages in the stream will be pushed to the client at the same rate that they were originally received.
  • :sample_freq - sets the percentage of acknowledgements that should be sampled for observability, 0-100. This value is a binary, so both "30" and "30%" are valid values.
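Because :ack_wait is expressed in nanoseconds, it can help to build such values from more readable units. The following is a minimal sketch, not part of gnat: the ConsumerDurations module and its function names are hypothetical, and it assumes that :backoff entries use the same unit as :ack_wait, which this page does not state explicitly.

```elixir
defmodule ConsumerDurations do
  @moduledoc "Hypothetical helper (not part of gnat) for building nanosecond values."

  # Converts whole seconds to nanoseconds.
  def seconds(n) when is_integer(n) and n >= 0 do
    System.convert_time_unit(n, :second, :nanosecond)
  end

  # Converts whole milliseconds to nanoseconds.
  def milliseconds(n) when is_integer(n) and n >= 0 do
    System.convert_time_unit(n, :millisecond, :nanosecond)
  end
end

# Field values for a durable consumer, kept as a plain keyword list so the
# snippet runs without the gnat dependency.
consumer_fields = [
  stream_name: "astream",
  durable_name: "consumer",
  ack_policy: :explicit,
  ack_wait: ConsumerDurations.seconds(30),
  # Assumption: :backoff entries use the same unit as :ack_wait.
  backoff: Enum.map([1, 5, 30], &ConsumerDurations.seconds/1)
]
```

With gnat loaded, a keyword list like this could be turned into the struct with struct!(Gnat.Jetstream.API.Consumer, consumer_fields).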

Summary

Functions

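create(conn, consumer)

Creates a consumer. When the consumer's :durable_name field is not set, the function creates an ephemeral consumer. Otherwise, it creates a durable consumer.

delete(conn, stream_name, consumer_name, domain \\ nil)

Deletes a consumer.

info(conn, stream_name, consumer_name, domain \\ nil)

Information about the consumer.

list(conn, stream_name, params \\ [])

Paged list of known consumers, including their current info.

request_next_message(conn, stream_name, consumer_name, reply_to, domain \\ nil, opts \\ [])

Requests the next message from a stream to be consumed. The response (the consumed message) will be sent on the subject given as the reply_to parameter.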

Types

Specs

config() :: %{
  ack_policy: :none | :all | :explicit,
  ack_wait: nil | non_neg_integer(),
  backoff: nil | [non_neg_integer()],
  deliver_group: nil | binary(),
  deliver_policy:
    :all
    | :last
    | :new
    | :by_start_sequence
    | :by_start_time
    | :last_per_subject,
  deliver_subject: nil | binary(),
  description: nil | binary(),
  durable_name: nil | binary(),
  filter_subject: nil | binary(),
  flow_control: nil | boolean(),
  headers_only: nil | boolean(),
  idle_heartbeat: nil | non_neg_integer(),
  inactive_threshold: nil | non_neg_integer(),
  max_ack_pending: nil | integer(),
  max_batch: nil | integer(),
  max_deliver: nil | integer(),
  max_expires: nil | non_neg_integer(),
  max_waiting: nil | integer(),
  opt_start_seq: nil | non_neg_integer(),
  opt_start_time: nil | DateTime.t(),
  rate_limit_bps: nil | non_neg_integer(),
  replay_policy: :instant | :original,
  sample_freq: nil | binary()
}

Specs

consumers() :: %{
  consumers: [binary()],
  limit: non_neg_integer(),
  offset: non_neg_integer(),
  total: non_neg_integer()
}

Specs

info() :: %{
  ack_floor: %{consumer_seq: non_neg_integer(), stream_seq: non_neg_integer()},
  cluster:
    nil
    | %{
        optional(:name) => binary(),
        optional(:leader) => binary(),
        optional(:replicas) => [
          %{
            :active => non_neg_integer(),
            :current => boolean(),
            :name => binary(),
            optional(:lag) => non_neg_integer(),
            optional(:offline) => boolean()
          }
        ]
      },
  config: config(),
  created: DateTime.t(),
  delivered: %{consumer_seq: non_neg_integer(), stream_seq: non_neg_integer()},
  name: binary(),
  num_ack_pending: non_neg_integer(),
  num_pending: non_neg_integer(),
  num_redelivered: non_neg_integer(),
  num_waiting: non_neg_integer(),
  push_bound: nil | boolean(),
  stream_name: binary()
}

Specs

t() :: %Gnat.Jetstream.API.Consumer{
  ack_policy: :none | :all | :explicit,
  ack_wait: nil | non_neg_integer(),
  backoff: nil | [non_neg_integer()],
  deliver_group: nil | binary(),
  deliver_policy:
    :all
    | :last
    | :new
    | :by_start_sequence
    | :by_start_time
    | :last_per_subject,
  deliver_subject: nil | binary(),
  description: nil | binary(),
  domain: nil | binary(),
  durable_name: nil | binary(),
  filter_subject: nil | binary(),
  flow_control: nil | boolean(),
  headers_only: nil | boolean(),
  idle_heartbeat: nil | non_neg_integer(),
  inactive_threshold: nil | non_neg_integer(),
  max_ack_pending: nil | integer(),
  max_batch: nil | integer(),
  max_deliver: nil | integer(),
  max_expires: nil | non_neg_integer(),
  max_waiting: nil | integer(),
  opt_start_seq: nil | non_neg_integer(),
  opt_start_time: nil | DateTime.t(),
  rate_limit_bps: nil | non_neg_integer(),
  replay_policy: :instant | :original,
  sample_freq: nil | binary(),
  stream_name: binary()
}

Functions

create(conn, consumer)

Specs

create(conn :: Gnat.t(), consumer :: t()) :: {:ok, info()} | {:error, term()}

Creates a consumer. When the consumer's :durable_name field is not set, the function creates an ephemeral consumer. Otherwise, it creates a durable consumer.

Examples

iex> {:ok, _response} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "astream", subjects: ["subject"]})
iex> {:ok, %{name: "consumer", stream_name: "astream"}} = Gnat.Jetstream.API.Consumer.create(:gnat, %Gnat.Jetstream.API.Consumer{durable_name: "consumer", stream_name: "astream"})

iex> {:ok, _response} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "astream", subjects: ["subject"]})
iex> {:error, %{"description" => "consumer delivery policy is deliver by start sequence, but optional start sequence is not set"}} = Gnat.Jetstream.API.Consumer.create(:gnat, %Gnat.Jetstream.API.Consumer{durable_name: "consumer", stream_name: "astream", deliver_policy: :by_start_sequence})
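The second example shows the server rejecting a :by_start_sequence consumer that has no :opt_start_seq. A client-side check can catch the same mistake before the request is sent. This is a minimal sketch under stated assumptions: ConsumerPrecheck is a hypothetical module, not part of gnat, and its error tuple is an invented shape rather than what the server returns.

```elixir
defmodule ConsumerPrecheck do
  @moduledoc "Hypothetical client-side check; not part of gnat."

  # Accepts a map or struct holding consumer fields and reports a missing
  # start option for the deliver policies that require one.
  def check(%{deliver_policy: :by_start_sequence} = consumer) do
    require_field(consumer, :opt_start_seq)
  end

  def check(%{deliver_policy: :by_start_time} = consumer) do
    require_field(consumer, :opt_start_time)
  end

  # Any other deliver policy needs no companion field.
  def check(_consumer), do: :ok

  defp require_field(consumer, field) do
    case Map.get(consumer, field) do
      nil -> {:error, {:missing, field}}
      _value -> :ok
    end
  end
end
```

Since the function only pattern matches on field names, it should accept a %Gnat.Jetstream.API.Consumer{} struct as well as a plain map.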

delete(conn, stream_name, consumer_name, domain \\ nil)

Specs

delete(
  conn :: Gnat.t(),
  stream_name :: binary(),
  consumer_name :: binary(),
  domain :: nil | binary()
) :: :ok | {:error, any()}

Deletes a consumer.

Examples

iex> {:ok, _response} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "astream", subjects: ["subject"]})
iex> {:ok, _response} = Gnat.Jetstream.API.Consumer.create(:gnat, %Gnat.Jetstream.API.Consumer{durable_name: "consumer", stream_name: "astream"})
iex> Gnat.Jetstream.API.Consumer.delete(:gnat, "astream", "consumer")
:ok

iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Gnat.Jetstream.API.Consumer.delete(:gnat, "wrong_stream", "consumer")

info(conn, stream_name, consumer_name, domain \\ nil)

Specs

info(
  conn :: Gnat.t(),
  stream_name :: binary(),
  consumer_name :: binary(),
  domain :: nil | binary()
) :: {:ok, info()} | {:error, any()}

Returns information about the consumer.

Examples

iex> {:ok, _response} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "astream", subjects: ["subject"]})
iex> {:ok, _response} = Gnat.Jetstream.API.Consumer.create(:gnat, %Gnat.Jetstream.API.Consumer{durable_name: "consumer", stream_name: "astream"})
iex> {:ok, %{created: _}} = Gnat.Jetstream.API.Consumer.info(:gnat, "astream", "consumer")

iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Gnat.Jetstream.API.Consumer.info(:gnat, "wrong_stream", "consumer")
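The map returned by info/4 can be used to estimate how much work a consumer still has in front of it. The sketch below is hypothetical (ConsumerBacklog is not part of gnat) and assumes that :num_pending counts messages not yet delivered and :num_ack_pending counts delivered messages still awaiting acknowledgement; this page lists the fields but does not describe them.

```elixir
defmodule ConsumerBacklog do
  @moduledoc "Hypothetical helper; expects a map shaped like the info() type."

  # Messages not yet delivered plus messages delivered but not yet acked.
  def outstanding(%{num_pending: pending, num_ack_pending: ack_pending}) do
    pending + ack_pending
  end

  # True when nothing is waiting to be delivered or acknowledged.
  def idle?(info), do: outstanding(info) == 0
end
```

A possible use is to match {:ok, info} from Gnat.Jetstream.API.Consumer.info(:gnat, "astream", "consumer") and pass info to ConsumerBacklog.outstanding/1.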

list(conn, stream_name, params \\ [])

Specs

list(
  conn :: Gnat.t(),
  stream_name :: binary(),
  params :: [offset: non_neg_integer(), domain: nil | binary()]
) :: {:ok, consumers()} | {:error, term()}

Paged list of known consumers, including their current info.

Examples

iex> {:ok, _response} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "astream", subjects: ["subject"]})
iex> {:ok, %{consumers: _, limit: 1024, offset: 0, total: _}} = Gnat.Jetstream.API.Consumer.list(:gnat, "astream")

iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Gnat.Jetstream.API.Consumer.list(:gnat, "wrong_stream")
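Since the result is paged, collecting every consumer may take several calls with an increasing :offset. The following sketch shows one way to walk the pages. ConsumerPages is a hypothetical module, not part of gnat; it takes the fetching step as a function so the paging logic can run without a server.

```elixir
defmodule ConsumerPages do
  @moduledoc "Hypothetical paging helper; not part of gnat."

  # `fetch` is any 1-arity function that takes an offset and returns
  # {:ok, page} or {:error, reason}, where page matches the consumers() type.
  def all(fetch), do: collect(fetch, 0, [])

  defp collect(fetch, offset, acc) do
    case fetch.(offset) do
      {:ok, %{consumers: names, total: total}} ->
        acc = acc ++ names
        next = offset + length(names)

        # Stop once everything has been seen or the page came back empty.
        if names == [] or next >= total do
          {:ok, acc}
        else
          collect(fetch, next, acc)
        end

      {:error, _reason} = error ->
        error
    end
  end
end
```

Against a live connection, the fetch function could be fn offset -> Gnat.Jetstream.API.Consumer.list(:gnat, "astream", offset: offset) end, using the :offset parameter from the spec above.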

request_next_message(conn, stream_name, consumer_name, reply_to, domain \\ nil, opts \\ [])

Specs

request_next_message(
  conn :: Gnat.t(),
  stream_name :: binary(),
  consumer_name :: binary(),
  reply_to :: String.t(),
  domain :: nil | binary(),
  opts :: keyword()
) :: :ok

Requests the next message from a stream to be consumed. The response (the consumed message) will be sent on the subject given as the reply_to parameter.

Options

  • batch - How many messages to receive. Messages will be sent to the reply_to subject separately. Defaults to 1.

  • expires - Time in nanoseconds the request will be kept in the server. Once this time passes a message with empty body and topic set to reply_to subject is sent. Useful when polling the server frequently and not wanting the pull requests to accumulate. By default, the pull request stays in the server until a message comes.

  • no_wait - Boolean value which indicates whether the pull request should be accumulated on the server. When set to true and no message is present to be consumed, a message with empty body and topic value set to reply_to is sent. Defaults to false.

Example

iex> {:ok, _response} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "astream", subjects: ["subject"]})
iex> {:ok, _response} = Gnat.Jetstream.API.Consumer.create(:gnat, %Gnat.Jetstream.API.Consumer{durable_name: "consumer", stream_name: "astream"})
iex> {:ok, _sid} = Gnat.sub(:gnat, self(), "reply_subject")
iex> :ok = Gnat.Jetstream.API.Consumer.request_next_message(:gnat, "astream", "consumer", "reply_subject")
iex> :ok = Gnat.pub(:gnat, "subject", "message1")
iex> assert_receive {:msg, %{body: "message1", topic: "subject"}}
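When expires or no_wait is used, the reply subject may receive a message with an empty body instead of a stream message, as described under Options. The sketch below builds the options and tells the two cases apart. PullHelpers is a hypothetical module, not part of gnat, and treating every empty body as "no message" is an assumption: a genuine stream message with an empty payload would be classified the same way.

```elixir
defmodule PullHelpers do
  @moduledoc "Hypothetical helpers for pull requests; not part of gnat."

  # Builds the opts keyword list, converting the expiry from milliseconds
  # to the nanoseconds that the expires option is documented to use.
  def opts(batch, expires_ms) when is_integer(batch) and batch > 0 do
    [batch: batch, expires: System.convert_time_unit(expires_ms, :millisecond, :nanosecond)]
  end

  # Classifies the message map delivered to the reply_to subject.
  # Assumption: an empty body means the pull expired or found nothing.
  def classify(%{body: ""}), do: :no_message
  def classify(%{body: body}), do: {:message, body}
end
```

The options could then be passed as the last argument, for example Gnat.Jetstream.API.Consumer.request_next_message(:gnat, "astream", "consumer", "reply_subject", nil, PullHelpers.opts(10, 5_000)), where nil is the domain.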