Gnat.Jetstream.API.Stream (gnat v1.9.1)
A module representing a NATS JetStream Stream.
Learn more about Streams: https://docs.nats.io/nats-concepts/jetstream/streams
The Jetstream.API.Stream struct
The struct's mandatory fields are :name and :subjects. The rest will have the NATS default values set.
Stream struct fields explanation:
:allow_direct - allows higher-performance, direct access to get individual messages, e.g. for KeyValue.
:allow_rollup_hdrs - allows the use of the Nats-Rollup header to replace all contents of a stream, or of a subject in a stream, with a single new message.
:deny_delete - restricts the ability to delete messages from a stream via the API. Cannot be changed once set to true.
:deny_purge - restricts the ability to purge messages from a stream via the API. Cannot be changed once set to true.
:description - a short description of the purpose of this stream.
:discard - determines what happens when a Stream reaches its limits. It has the following options:
  :old - the default option. Old messages are deleted.
  :new - refuses new messages.
:discard_new_per_subject - enables discarding new messages per subject when limits are reached. Requires discard: :new and :max_msgs_per_subject to be configured.
:domain - JetStream domain, mainly used for leaf nodes. See JetStream on Leaf Nodes.
:duplicate_window - the window within which to track duplicate messages, expressed in nanoseconds.
:max_age - maximum age of any message in the Stream, expressed in nanoseconds.
:max_bytes - how many bytes the Stream may contain. Adheres to :discard, removing the oldest or refusing new messages if the Stream exceeds this size.
:max_consumers - how many Consumers can be defined for a given Stream, -1 for unlimited.
:max_msg_size - the largest message that will be accepted by the Stream.
:max_msgs_per_subject - for wildcard streams, ensures that this many messages are kept for every unique subject (a per-subject retention limit). Only available on nats-server versions greater than 2.3.0.
:max_msgs - how many messages may be in a Stream. Adheres to :discard, removing the oldest or refusing new messages if the Stream exceeds this number of messages.
:mirror - maintains a 1:1 mirror of another stream with a name matching this property. When a mirror is configured, subjects and sources must be empty.
:mirror_direct - allows higher-performance and unified direct access for mirrors as well.
:name - a name for the Stream. See naming.
:no_ack - disables acknowledging messages that are received by the Stream.
:num_replicas - how many replicas to keep for each message.
:placement - placement directives to consider when placing replicas of this stream; random placement when unset. It has the following properties:
  :cluster - the desired cluster name to place the stream.
  :tags - tags required on servers hosting this stream.
:retention - how messages are retained in the Stream. Once this is exceeded, old messages are removed. It has the following options:
  :limits - the default policy.
  :interest
  :workqueue
:sealed - sealed streams do not allow messages to be deleted via limits or the API, and cannot be unsealed via a configuration update. Can only be set on already created streams via the Update API.
:sources - list of stream names to replicate into this stream.
:storage - the type of storage backend. Available options:
  :file
  :memory
:subjects - a list of subjects to consume; supports wildcards.
:template_owner - when the Stream is managed by a Stream Template, this identifies the template that manages the Stream.
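Because :max_age and :duplicate_window are expressed in nanoseconds, it is easy to be off by a few orders of magnitude when writing them by hand. The following is a minimal sketch of a conversion helper; the StreamDurations module and its function names are hypothetical and not part of gnat.

```elixir
defmodule StreamDurations do
  @moduledoc "Hypothetical helper: converts whole durations to nanoseconds."

  @ns_per_second 1_000_000_000

  # Whole seconds to nanoseconds.
  def seconds(n) when is_integer(n) and n >= 0, do: n * @ns_per_second

  # Whole minutes to nanoseconds.
  def minutes(n) when is_integer(n) and n >= 0, do: seconds(n * 60)

  # Whole hours to nanoseconds.
  def hours(n) when is_integer(n) and n >= 0, do: minutes(n * 60)
end

# Illustrative limits: keep messages for 24 hours and track
# duplicates over a 2-minute window.
limits = [
  max_age: StreamDurations.hours(24),
  duplicate_window: StreamDurations.minutes(2)
]

IO.inspect(limits)
```

The resulting values could then be supplied as the :max_age and :duplicate_window fields when building the struct.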
Types
@type info() :: %{
  cluster:
    nil
    | %{
        optional(:name) => binary(),
        optional(:leader) => binary(),
        optional(:replicas) => [
          %{
            :active => nanoseconds(),
            :name => binary(),
            :current => boolean(),
            optional(:offline) => boolean(),
            optional(:lag) => non_neg_integer()
          }
        ]
      },
  config: t(),
  created: DateTime.t(),
  mirror: nil | source_info(),
  sources: nil | [source_info()],
  state: state()
}
@type message_access_method() :: %{
  optional(:seq) => non_neg_integer(),
  optional(:last_by_subj) => binary()
}
seq - Stream sequence number of the message to retrieve; cannot be combined with last_by_subj.
last_by_subj - Retrieves the last message for a given subject; cannot be combined with seq.
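Since seq and last_by_subj cannot be combined, a caller may want to check an access-method map before sending it. This is a sketch only: the AccessMethod module is hypothetical, and treating an empty map as invalid is an assumption rather than documented library behaviour.

```elixir
defmodule AccessMethod do
  @moduledoc "Hypothetical validator for message_access_method() maps."

  # Both keys present: the two lookups cannot be combined.
  def validate(%{seq: _, last_by_subj: _}),
    do: {:error, :seq_and_last_by_subj_combined}

  # Lookup by stream sequence number.
  def validate(%{seq: seq} = method) when is_integer(seq) and seq >= 0,
    do: {:ok, method}

  # Lookup of the last message on a subject.
  def validate(%{last_by_subj: subject} = method) when is_binary(subject),
    do: {:ok, method}

  # Anything else, including an empty map (assumption), is rejected.
  def validate(_other), do: {:error, :invalid_access_method}
end

IO.inspect(AccessMethod.validate(%{seq: 1}))
IO.inspect(AccessMethod.validate(%{seq: 1, last_by_subj: "orders.new"}))
```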
@type message_response() :: %{
  data: any(),
  seq: non_neg_integer(),
  subject: binary(),
  time: DateTime.t(),
  hdrs: nil | binary()
}
data - The decoded message payload.
subject - The subject the message was originally received on.
time - The time the message was received.
seq - The sequence number of the message in the Stream.
hdrs - The decoded headers for the message.
@type method() :: %{filter: String.t()}
@type nanoseconds() :: non_neg_integer()
@type response_error() :: %{
  :code => non_neg_integer(),
  optional(:description) => binary(),
  optional(:err_code) => non_neg_integer()
}
code - HTTP-like error code in the 300 to 500 range.
description - A human-friendly description of the error.
err_code - The NATS error code unique to each kind of error.
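The response_error() type uses atom keys, while the function examples on this page match errors with string keys such as "code" and "description". A defensive sketch that renders either shape as a log line might look like this; the ErrorFormat module is hypothetical, and the err_code value used below is purely illustrative.

```elixir
defmodule ErrorFormat do
  @moduledoc "Hypothetical helper that renders an error map as a string."

  # Atom-keyed shape, as described by response_error().
  def describe(%{code: code} = error) do
    format(code, Map.get(error, :description), Map.get(error, :err_code))
  end

  # String-keyed shape, as matched in the iex examples on this page.
  def describe(%{"code" => code} = error) do
    format(code, Map.get(error, "description"), Map.get(error, "err_code"))
  end

  # description and err_code are optional, so both may be nil here.
  defp format(code, description, err_code) do
    text = description || "no description"
    base = "#{code}: #{text}"

    if err_code do
      base <> " (err_code #{err_code})"
    else
      base
    end
  end
end

IO.puts(ErrorFormat.describe(%{"code" => 404, "description" => "stream not found"}))
```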
@type source() :: %{
  :name => binary(),
  optional(:opt_start_seq) => integer(),
  optional(:opt_start_time) => DateTime.t(),
  optional(:filter_subject) => binary(),
  optional(:external) => %{api: binary(), deliver: binary()}
}
Stream source fields explained:
:name - stream name.
:opt_start_seq - sequence to start replicating from.
:opt_start_time - timestamp to start replicating from.
:filter_subject - replicate only a subset of messages based on the filter.
:external - configuration referencing a stream source in another account or JetStream domain. It has the following parameters:
  :api - the subject prefix that imports the other account/domain $JS.API.CONSUMER.> subjects.
  :deliver - the delivery subject to use for the push consumer.
@type source_info() :: %{
  :active => nanoseconds(),
  :lag => non_neg_integer(),
  :name => binary(),
  optional(:external) => %{api: binary(), deliver: binary()},
  optional(:error) => response_error()
}
@type state() :: %{
  bytes: non_neg_integer(),
  consumer_count: non_neg_integer(),
  deleted: nil | [non_neg_integer()],
  first_seq: non_neg_integer(),
  first_ts: DateTime.t(),
  last_seq: non_neg_integer(),
  last_ts: DateTime.t(),
  lost: nil | [%{msgs: [non_neg_integer()], bytes: non_neg_integer()}],
  messages: non_neg_integer(),
  num_deleted: nil | integer(),
  num_subjects: nil | integer(),
  subjects: nil | %{} | %{required(binary()) => non_neg_integer()}
}
@type streams() :: %{
  limit: non_neg_integer(),
  offset: non_neg_integer(),
  streams: [binary()],
  total: non_neg_integer()
}
@type t() :: %Gnat.Jetstream.API.Stream{
  allow_direct: boolean(),
  allow_rollup_hdrs: boolean(),
  deny_delete: boolean(),
  deny_purge: boolean(),
  description: nil | binary(),
  discard: :old | :new,
  discard_new_per_subject: boolean(),
  domain: nil | binary(),
  duplicate_window: nil | nanoseconds(),
  max_age: nanoseconds(),
  max_bytes: integer(),
  max_consumers: integer(),
  max_msg_size: nil | integer(),
  max_msgs: integer(),
  max_msgs_per_subject: integer(),
  mirror: nil | source(),
  mirror_direct: boolean(),
  name: binary(),
  no_ack: nil | boolean(),
  num_replicas: pos_integer(),
  placement: nil | placement(),
  retention: :limits | :workqueue | :interest,
  sealed: boolean(),
  sources: nil | [source()],
  storage: :file | :memory,
  subjects: nil | [binary()],
  template_owner: nil | binary()
}
Functions
Creates a new Stream.
Examples
iex> {:ok, %{created: _}} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "anewstream", subjects: ["anewsubject"]})
@spec delete(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary()) :: :ok | {:error, any()}
Deletes a Stream and all its data.
Examples
iex> Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "delstream", subjects: ["delsubject"]})
iex> Gnat.Jetstream.API.Stream.delete(:gnat, "delstream")
:ok
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Gnat.Jetstream.API.Stream.delete(:gnat, "wrong_stream")
@spec get_message(
  conn :: Gnat.t(),
  stream_name :: binary(),
  method :: message_access_method(),
  domain :: nil | binary()
) :: {:ok, message_response()} | {:error, response_error()}
Gets a message from the stream, either by stream sequence number or as the last message for a given subject.
@spec info(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary()) :: {:ok, info()} | {:error, any()}
Information about config and state of a Stream.
Examples
iex> {:ok, _} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "infostream", subjects: ["infosubject"]})
iex> {:ok, %{created: _}} = Gnat.Jetstream.API.Stream.info(:gnat, "infostream")
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Gnat.Jetstream.API.Stream.info(:gnat, "wrong_stream")
@spec list(
  conn :: Gnat.t(),
  params :: [offset: non_neg_integer(), subject: binary(), domain: nil | binary()]
) :: {:ok, streams()} | {:error, term()}
Paged list of known Streams including all their current information.
Options
:offset - number of records to skip.
:subject - a subject the Stream must collect to appear in the list.
Examples
iex> {:ok, %{total: _, offset: 0, limit: 1024, streams: _}} = Gnat.Jetstream.API.Stream.list(:gnat)
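Since the response carries total, offset and limit, a caller can work out whether another page is needed and which :offset to request next. The sketch below operates on a plain map shaped like streams(); the StreamPaging module is hypothetical, and treating a missing streams list as empty is an assumption.

```elixir
defmodule StreamPaging do
  @moduledoc "Hypothetical helper for walking a paged streams() response."

  # Returns {:more, next_offset} while records remain, or :done otherwise.
  def next_offset(%{total: total, offset: offset, streams: streams}) do
    # Assumption: a nil list means the page is empty.
    page = streams || []
    next = offset + length(page)

    # An empty page also ends the walk, so a caller cannot loop forever.
    if page != [] and next < total do
      {:more, next}
    else
      :done
    end
  end
end

# Illustrative first page of three streams, fetched two at a time.
page = %{total: 3, offset: 0, limit: 2, streams: ["a", "b"]}
IO.inspect(StreamPaging.next_offset(page))
```

The returned offset would be passed back as the :offset option on the next call to list.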
@spec purge(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary()) :: :ok | {:error, any()}
Purges all of the data in the stream but doesn't delete the stream.
Examples
iex> Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "purgestream", subjects: ["purgesubject"]})
iex> Gnat.Jetstream.API.Stream.purge(:gnat, "purgestream")
:ok
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Gnat.Jetstream.API.Stream.purge(:gnat, "wrong_stream")
@spec purge(
  conn :: Gnat.t(),
  stream_name :: binary(),
  domain :: nil | binary(),
  method()
) :: :ok | {:error, any()}
Purges some of the messages in a stream according to the supplied filter.
Examples
iex> Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "pstream", subjects: ["psub1", "psub2"]})
iex> Gnat.Jetstream.API.Stream.purge(:gnat, "pstream", nil, %{filter: "psub1"})
:ok
Updates a Stream.
Examples
iex> {:ok, %{created: _}} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "update_test_stream", subjects: ["update_subject"]})
iex> {:ok, _} = Gnat.Jetstream.API.Stream.update(:gnat, %Gnat.Jetstream.API.Stream{name: "update_test_stream", subjects: ["update_subject", "new.update_subject"]})