Gnat.Jetstream.API.Stream (gnat v1.9.1)

A module representing a NATS JetStream Stream.

Learn more about Streams: https://docs.nats.io/nats-concepts/jetstream/streams

The Jetstream.API.Stream struct

The struct's mandatory fields are :name and :subjects. The rest will have the NATS default values set.

Stream struct fields explanation:

  • :allow_direct - allows higher-performance, direct access to individual messages (used, for example, by KeyValue).
  • :allow_rollup_hdrs - allows the use of the Nats-Rollup header to replace all contents of a stream, or subject in a stream, with a single new message.
  • :deny_delete - restricts the ability to delete messages from a stream via the API. Cannot be changed once set to true.
  • :deny_purge - restricts the ability to purge messages from a stream via the API. Cannot be changed once set to true.
  • :description - a short description of the purpose of this stream.
  • :discard - determines what happens when a Stream reaches its limits. It has the following options:
    • :old - the default option. Old messages are deleted.
    • :new - refuses new messages.
  • :discard_new_per_subject - enables discarding new messages per subject when limits are reached. Requires discard: :new and :max_msgs_per_subject to be configured.
  • :domain - JetStream domain, mainly used for leaf nodes. See JetStream on Leaf Nodes.
  • :duplicate_window - the window within which to track duplicate messages, expressed in nanoseconds.
  • :max_age - maximum age of any message in the Stream, expressed in nanoseconds.
  • :max_bytes - how many bytes the Stream may contain. Adheres to :discard, removing oldest or refusing new messages if the Stream exceeds this size.
  • :max_consumers - how many Consumers can be defined for a given Stream, -1 for unlimited.
  • :max_msg_size - the largest message that will be accepted by the Stream.
  • :max_msgs_per_subject - for wildcard streams, keeps at most this many messages for every unique subject (a per-subject retention limit). Only available on nats-server versions greater than 2.3.0.
  • :max_msgs - how many messages may be in a Stream. Adheres to :discard, removing oldest or refusing new messages if the Stream exceeds this number of messages.
  • :mirror - maintains a 1:1 mirror of another stream with name matching this property. When a mirror is configured, subjects and sources must be empty.
  • :mirror_direct - allows higher-performance, unified direct access for mirrors as well.
  • :name - a name for the Stream. See naming.
  • :no_ack - disables acknowledging messages that are received by the Stream.
  • :num_replicas - how many replicas to keep for each message.
  • :placement - placement directives to consider when placing replicas of this stream, random placement when unset. It has the following properties:
    • :cluster - the desired cluster name to place the stream.
    • :tags - tags required on servers hosting this stream.
  • :retention - how messages are retained in the Stream. Once this is exceeded, old messages are removed. It has the following options:
    • :limits - the default policy.
    • :interest
    • :workqueue
  • :sealed - sealed streams do not allow messages to be deleted via limits or the API, and cannot be unsealed via a configuration update. Can only be set on already created streams via the Update API.
  • :sources - list of streams to replicate into this stream; each entry is a source() map.
  • :storage - the type of storage backend. Available options:
    • :file
    • :memory
  • :subjects - a list of subjects to consume, supports wildcards.
  • :template_owner - when the Stream is managed by a Stream Template this identifies the template that manages the Stream.
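
As a minimal sketch of how several of the fields above combine, the struct below configures a stream that refuses new messages on a subject once that subject already holds one. The stream name, subject, description and limits are illustrative assumptions, not values taken from the library.

```elixir
# Hypothetical stream: the name, subject and limits below are illustrative only.
one_hour_ns = 60 * 60 * 1_000_000_000

stream = %Gnat.Jetstream.API.Stream{
  name: "orders",
  subjects: ["orders.*"],
  description: "keeps at most one message per order subject",
  storage: :file,
  # :discard_new_per_subject requires discard: :new and :max_msgs_per_subject.
  discard: :new,
  discard_new_per_subject: true,
  max_msgs_per_subject: 1,
  # :max_age is expressed in nanoseconds.
  max_age: one_hour_ns
}
```

The struct can then be passed to create/2, for example Gnat.Jetstream.API.Stream.create(:gnat, stream), assuming a connection registered as :gnat as in the examples in this module.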

Types

info()

@type info() :: %{
  cluster:
    nil
    | %{
        optional(:name) => binary(),
        optional(:leader) => binary(),
        optional(:replicas) => [
          %{
            :active => nanoseconds(),
            :name => binary(),
            :current => boolean(),
            optional(:offline) => boolean(),
            optional(:lag) => non_neg_integer()
          }
        ]
      },
  config: t(),
  created: DateTime.t(),
  mirror: nil | source_info(),
  sources: nil | [source_info()],
  state: state()
}

message_access_method()

@type message_access_method() :: %{
  optional(:seq) => non_neg_integer(),
  optional(:last_by_subj) => binary()
}
  • seq - Stream sequence number of the message to retrieve, cannot be combined with last_by_subj
  • last_by_subj - Retrieves the last message for a given subject, cannot be combined with seq

message_response()

@type message_response() :: %{
  data: any(),
  seq: non_neg_integer(),
  subject: binary(),
  time: DateTime.t(),
  hdrs: nil | binary()
}
  • data - The decoded message payload
  • subject - The subject the message was originally received on
  • time - The time the message was received
  • seq - The sequence number of the message in the Stream
  • hdrs - The decoded headers for the message

method()

@type method() :: %{filter: String.t()}

nanoseconds()

@type nanoseconds() :: non_neg_integer()
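
Because fields such as :max_age and :duplicate_window take nanoseconds, a small conversion helper can make configurations easier to read. The module below is a hypothetical sketch and is not part of gnat; it relies only on System.convert_time_unit/3 from the Elixir standard library.

```elixir
defmodule DurationNs do
  @moduledoc "Hypothetical helper (not part of gnat): converts durations to nanoseconds."

  def seconds(n) when is_integer(n) and n >= 0,
    do: System.convert_time_unit(n, :second, :nanosecond)

  def minutes(n) when is_integer(n) and n >= 0, do: seconds(n * 60)
  def hours(n) when is_integer(n) and n >= 0, do: minutes(n * 60)
end

# Two minutes, e.g. for :duplicate_window.
two_minutes = DurationNs.minutes(2)
# 120_000_000_000
```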

placement()

@type placement() :: %{:cluster => binary(), optional(:tags) => [binary()]}

response_error()

@type response_error() :: %{
  :code => non_neg_integer(),
  optional(:description) => binary(),
  optional(:err_code) => non_neg_integer()
}
  • code - HTTP-like error code in the 300 to 500 range
  • description - A human-friendly description of the error
  • err_code - The NATS error code unique to each kind of error
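
The type above lists atom keys, while the error examples in this module match on string keys such as "code" and "description". The hypothetical helper below (not part of gnat) is a sketch that formats an error map under the assumption that either key style may appear.

```elixir
defmodule StreamError do
  @moduledoc "Hypothetical helper (not part of gnat) for formatting error maps."

  # Accepts maps keyed by atoms (as in response_error()) or by strings
  # (as in the {:error, %{"code" => 404, ...}} examples in this module).
  def describe(%{} = error) do
    code = get(error, :code)
    description = get(error, :description) || "no description"
    "JetStream error #{code}: #{description}"
  end

  # Looks up the atom key first, then falls back to the string form.
  defp get(map, key), do: Map.get(map, key) || Map.get(map, Atom.to_string(key))
end

message = StreamError.describe(%{"code" => 404, "description" => "stream not found"})
# "JetStream error 404: stream not found"
```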

source()

@type source() :: %{
  :name => binary(),
  optional(:opt_start_seq) => integer(),
  optional(:opt_start_time) => DateTime.t(),
  optional(:filter_subject) => binary(),
  optional(:external) => %{api: binary(), deliver: binary()}
}

Stream source fields explained:

  • :name - stream name.
  • :opt_start_seq - sequence to start replicating from.
  • :opt_start_time - timestamp to start replicating from.
  • :filter_subject - replicate only a subset of messages based on filter.
  • :external - configuration referencing a stream source in another account or JetStream domain. It has the following parameters:
    • :api - the subject prefix that imports the other account/domain $JS.API.CONSUMER.> subjects.
    • :deliver - the delivery subject to use for the push consumer.
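
The source fields above could be used to aggregate two streams into one, as in the sketch below. The stream names and the filter subject are hypothetical, the source streams are assumed to exist already, and whether an empty :subjects list is accepted alongside :sources is an assumption to verify against your server version.

```elixir
# Hypothetical names: "orders_eu" and "orders_us" are assumed to be existing streams.
aggregate = %Gnat.Jetstream.API.Stream{
  name: "orders_all",
  # No subjects of its own; messages arrive only through the sources.
  subjects: [],
  sources: [
    # Replicate everything from the first stream.
    %{name: "orders_eu"},
    # Replicate only matching subjects, starting at stream sequence 100.
    %{name: "orders_us", filter_subject: "orders.us.created", opt_start_seq: 100}
  ]
}
```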

source_info()

@type source_info() :: %{
  :active => nanoseconds(),
  :lag => non_neg_integer(),
  :name => binary(),
  optional(:external) => %{api: binary(), deliver: binary()},
  optional(:error) => response_error()
}

state()

@type state() :: %{
  bytes: non_neg_integer(),
  consumer_count: non_neg_integer(),
  deleted: nil | [non_neg_integer()],
  first_seq: non_neg_integer(),
  first_ts: DateTime.t(),
  last_seq: non_neg_integer(),
  last_ts: DateTime.t(),
  lost: nil | [%{msgs: [non_neg_integer()], bytes: non_neg_integer()}],
  messages: non_neg_integer(),
  num_deleted: nil | integer(),
  num_subjects: nil | integer(),
  subjects: nil | %{} | %{required(binary()) => non_neg_integer()}
}
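
To illustrate how the info() and state() shapes fit together, the hypothetical helper below (not part of gnat) condenses an info map into a few headline numbers. The sample map is hand-written for the sketch and only carries the keys the helper reads.

```elixir
defmodule StreamStats do
  @moduledoc "Hypothetical helper (not part of gnat): condenses an info() map."

  def summarize(%{config: config, state: state}) do
    %{
      name: config.name,
      messages: state.messages,
      bytes: state.bytes,
      consumers: state.consumer_count,
      # :num_deleted may be nil according to state().
      deleted: state.num_deleted || 0
    }
  end
end

# Hand-written sample; a real map would come from info/3.
sample = %{
  config: %{name: "infostream"},
  state: %{messages: 3, bytes: 120, consumer_count: 0, num_deleted: nil}
}

summary = StreamStats.summarize(sample)
```

With a live connection, the same function could be fed the map returned in {:ok, info} by Gnat.Jetstream.API.Stream.info(:gnat, "infostream").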

streams()

@type streams() :: %{
  limit: non_neg_integer(),
  offset: non_neg_integer(),
  streams: [binary()],
  total: non_neg_integer()
}

t()

@type t() :: %Gnat.Jetstream.API.Stream{
  allow_direct: boolean(),
  allow_rollup_hdrs: boolean(),
  deny_delete: boolean(),
  deny_purge: boolean(),
  description: nil | binary(),
  discard: :old | :new,
  discard_new_per_subject: boolean(),
  domain: nil | binary(),
  duplicate_window: nil | nanoseconds(),
  max_age: nanoseconds(),
  max_bytes: integer(),
  max_consumers: integer(),
  max_msg_size: nil | integer(),
  max_msgs: integer(),
  max_msgs_per_subject: integer(),
  mirror: nil | source(),
  mirror_direct: boolean(),
  name: binary(),
  no_ack: nil | boolean(),
  num_replicas: pos_integer(),
  placement: nil | placement(),
  retention: :limits | :workqueue | :interest,
  sealed: boolean(),
  sources: nil | [source()],
  storage: :file | :memory,
  subjects: nil | [binary()],
  template_owner: nil | binary()
}

Functions

create(conn, stream)

@spec create(conn :: Gnat.t(), stream :: t()) :: {:ok, info()} | {:error, any()}

Creates a new Stream.

Examples

iex> {:ok, %{created: _}} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "anewstream", subjects: ["anewsubject"]})

delete(conn, stream_name, domain \\ nil)

@spec delete(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary()) ::
  :ok | {:error, any()}

Deletes a Stream and all its data.

Examples

iex> Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "delstream", subjects: ["delsubject"]})
iex> Gnat.Jetstream.API.Stream.delete(:gnat, "delstream")
:ok

iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Gnat.Jetstream.API.Stream.delete(:gnat, "wrong_stream")

get_message(conn, stream_name, method, domain \\ nil)

@spec get_message(
  conn :: Gnat.t(),
  stream_name :: binary(),
  method :: message_access_method(),
  domain :: nil | binary()
) :: {:ok, message_response()} | {:error, response_error()}

Gets a message from the stream, either by "stream sequence number" or as the "last message for a given subject".
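
The two access methods can be sketched as follows. MessageLookup is a hypothetical wrapper, not part of gnat; it builds a message_access_method() map with exactly one key and reshapes the documented message_response() fields.

```elixir
defmodule MessageLookup do
  @moduledoc "Hypothetical wrapper (not part of gnat) around get_message/4."

  # Each constructor sets exactly one key, since :seq and :last_by_subj
  # cannot be combined.
  def by_seq(seq) when is_integer(seq) and seq >= 0, do: %{seq: seq}
  def last_for(subject) when is_binary(subject), do: %{last_by_subj: subject}

  # Calls get_message/4 with the default domain and keeps three fields.
  def fetch(conn, stream_name, method) do
    case Gnat.Jetstream.API.Stream.get_message(conn, stream_name, method) do
      {:ok, %{seq: seq, subject: subject, data: data}} -> {:ok, {seq, subject, data}}
      {:error, error} -> {:error, error}
    end
  end
end
```

Assuming a connection registered as :gnat and an existing stream named "orders" (both illustrative), a call might look like MessageLookup.fetch(:gnat, "orders", MessageLookup.last_for("orders.created")).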

info(conn, stream_name, domain \\ nil)

@spec info(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary()) ::
  {:ok, info()} | {:error, any()}

Information about config and state of a Stream.

Examples

iex> {:ok, _} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "infostream", subjects: ["infosubject"]})
iex> {:ok, %{created: _}} = Gnat.Jetstream.API.Stream.info(:gnat, "infostream")

iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Gnat.Jetstream.API.Stream.info(:gnat, "wrong_stream")

list(conn, params \\ [])

@spec list(
  conn :: Gnat.t(),
  params :: [
    offset: non_neg_integer(),
    subject: binary(),
    domain: nil | binary()
  ]
) :: {:ok, streams()} | {:error, term()}

Paged list of known Streams including all their current information.

Options

  • :offset - number of records to skip.
  • :subject - a subject the Stream must collect to appear in the list.
  • :domain - the JetStream domain to query (see the :domain field of the struct).

Examples

iex> {:ok, %{total: _, offset: 0, limit: 1024, streams: _}} = Gnat.Jetstream.API.Stream.list(:gnat)
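
Since the result is paged, collecting every stream name means following :offset until :total is reached. The sketch below assumes that advancing the offset by the number of names returned yields the next page; StreamPager is a hypothetical helper, not part of gnat, and it takes the page-fetching function as an argument so it can be exercised without a server.

```elixir
defmodule StreamPager do
  @moduledoc "Hypothetical helper (not part of gnat): walks every page of list/2."

  # fetch_page is a 1-arity function taking an offset and returning
  # {:ok, streams()} or {:error, term()}.
  def all_names(fetch_page, offset \\ 0, acc \\ []) do
    case fetch_page.(offset) do
      {:ok, %{streams: names, total: total}} ->
        names = names || []
        acc = acc ++ names
        next = offset + length(names)

        # Stop on an empty page or once every name has been collected.
        if names == [] or next >= total do
          {:ok, acc}
        else
          all_names(fetch_page, next, acc)
        end

      {:error, reason} ->
        {:error, reason}
    end
  end
end
```

With a connection registered as :gnat, it could be called as StreamPager.all_names(fn offset -> Gnat.Jetstream.API.Stream.list(:gnat, offset: offset) end).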

purge(conn, stream_name, domain \\ nil)

@spec purge(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary()) ::
  :ok | {:error, any()}

Purges all of the data in the stream but doesn't delete the stream.

Examples

iex> Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "purgestream", subjects: ["purgesubject"]})
iex> Gnat.Jetstream.API.Stream.purge(:gnat, "purgestream")
:ok

iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Gnat.Jetstream.API.Stream.purge(:gnat, "wrong_stream")

purge(conn, stream_name, domain, method)

@spec purge(
  conn :: Gnat.t(),
  stream_name :: binary(),
  domain :: nil | binary(),
  method()
) ::
  :ok | {:error, any()}

Purges some of the messages in a stream according to the supplied filter.

Examples

iex> Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "pstream", subjects: ["psub1", "psub2"]})
iex> Gnat.Jetstream.API.Stream.purge(:gnat, "pstream", nil, %{filter: "psub1"})
:ok

update(conn, stream)

@spec update(conn :: Gnat.t(), stream :: t()) :: {:ok, info()} | {:error, any()}

Updates a Stream.

Examples

iex> {:ok, %{created: _}} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "update_test_stream", subjects: ["update_subject"]})
iex> {:ok, _} = Gnat.Jetstream.API.Stream.update(:gnat, %Gnat.Jetstream.API.Stream{name: "update_test_stream", subjects: ["update_subject", "new.update_subject"]})