View Source Jetstream.API.Stream (jetstream v0.0.1)
A module representing a NATS JetStream Stream.
Learn more about Streams: https://docs.nats.io/nats-concepts/jetstream/streams
the-jetstream-api-stream-struct
The Jetstream.API.Stream struct
The struct's mandatory fields are :name and :subjects. The rest will have the NATS
default values set.
Stream struct fields explanation:
:allow_rollup_hdrs- allows the use of the Nats-Rollup header to replace all contents of a stream, or subject in a stream, with a single new message.:deny_delete- restricts the ability to delete messages from a stream via the API. Cannot be changed once set to true.:deny_purge- restricts the ability to purge messages from a stream via the API. Cannot be change once set to true.:description- a short description of the purpose of this stream.:discard- determines what happens when a Stream reaches its limits. It has the following options::old- the default option. Old messages are deleted.:new- refuses new messages.
:duplicate_window- the window within which to track duplicate messages, expressed in nanoseconds.:max_age- maximum age of any message in the Stream, expressed in nanoseconds.:max_bytes- how many bytes the Stream may contain. Adheres to:discard, removing oldest or refusing new messages if the Stream exceeds this size.:max_consumers- how many Consumers can be defined for a given Stream, -1 for unlimited.:max_msg_size- the largest message that will be accepted by the Stream.:max_msgs_per_subject- For wildcard streams ensure that for every unique subject this many messages are kept - a per subject retention limit. Only available on nats-server versions greater than 2.3.0:max_msgs- how many messages may be in a Stream. Adheres to:discard, removing oldest or refusing new messages if the Stream exceeds this number of messages:mirror- maintains a 1:1 mirror of another stream with name matching this property. When a mirror is configured subjects and sources must be empty.:name- a name for the Stream. See naming.:no_ack- disables acknowledging messages that are received by the Stream.:num_replicas- how many replicas to keep for each message.:placement- placement directives to consider when placing replicas of this stream, random placement when unset. It has the following properties::cluster- the desired cluster name to place the stream.:tags- tags required on servers hosting this stream.
:retention- how messages are retained in the Stream. Once this is exceeded, old messages are removed. It has the following options::limits- the default policy.:interest:workqueue
:sealed- sealed streams do not allow messages to be deleted via limits or API, sealed streams can not be unsealed via configuration update. Can only be set on already created streams via the Update API.:sources- list of stream names to replicate into this stream.:storage- the type of storage backend. Available options::file:memory
:subjects- a list of subjects to consume, supports wildcards.:template_owner- when the Stream is managed by a Stream Template this identifies the template that manages the Stream.
Link to this section Summary
Types
seq- Stream sequence number of the message to retrieve, cannot be combined withlast_by_subjlast_by_subj- Retrieves the last message for a given subject, cannot be combined withseq
data- The decoded message payloadsubject- The subject the message was originally received ontime- The time the message was receivedseq- The sequence number of the message in the Streamhdrs- The decoded headers for the message
code- HTTP like error code in the 300 to 500 rangedescription- A human friendly description of the errorerr_code- The NATS error code unique to each kind of error
Stream source fields explained
Functions
Creates a new Stream.
Deletes a Stream and all its data.
Get a message from the stream either by "stream sequence number" or the "last message for a given subject"
Information about config and state of a Stream.
Paged list of known Streams including all their current information.
Purges all of data in the stream but doesn't delete the stream.
Updates a Stream.
Link to this section Types
@type info() :: %{ cluster: nil | %{ optional(:name) => binary(), optional(:leader) => binary(), optional(:replicas) => [ %{ :active => nanoseconds(), :name => binary(), :current => boolean(), optional(:offline) => boolean(), optional(:lag) => non_neg_integer() } ] }, config: t(), created: DateTime.t(), mirror: nil | source_info(), sources: nil | [source_info()], state: state() }
@type message_access_method() :: %{ optional(:seq) => non_neg_integer(), optional(:last_by_subj) => binary() }
seq- Stream sequence number of the message to retrieve, cannot be combined withlast_by_subjlast_by_subj- Retrieves the last message for a given subject, cannot be combined withseq
@type message_response() :: %{ data: any(), seq: non_neg_integer(), subject: binary(), time: DateTime.t(), hdrs: nil | binary() }
data- The decoded message payloadsubject- The subject the message was originally received ontime- The time the message was receivedseq- The sequence number of the message in the Streamhdrs- The decoded headers for the message
@type nanoseconds() :: non_neg_integer()
@type response_error() :: %{ :code => non_neg_integer(), optional(:description) => binary(), optional(:err_code) => non_neg_integer() }
code- HTTP like error code in the 300 to 500 rangedescription- A human friendly description of the errorerr_code- The NATS error code unique to each kind of error
@type source() :: %{ :name => binary(), optional(:opt_start_seq) => integer(), optional(:opt_start_time) => DateTime.t(), optional(:filter_subject) => binary(), optional(:external) => %{api: binary(), deliver: binary()} }
Stream source fields explained:
:name- stream name.:opt_start_seq- sequence to start replicating from.:opt_start_time- timestamp to start replicating from.:filter_subject- replicate only a subset of messages based on filter.:external- configuration referencing a stream source in another account or JetStream domain. It has the following parameters::api- the subject prefix that imports other account/domain$JS.API.CONSUMER.>subjects:deliver- the delivery subject to use for push consumer
@type source_info() :: %{ :active => nanoseconds(), :lag => non_neg_integer(), :name => binary(), optional(:external) => %{api: binary(), deliver: binary()}, optional(:error) => response_error() }
@type state() :: %{ bytes: non_neg_integer(), consumer_count: non_neg_integer(), deleted: nil | [non_neg_integer()], first_seq: non_neg_integer(), first_ts: DateTime.t(), last_seq: non_neg_integer(), last_ts: DateTime.t(), lost: nil | [%{msgs: [non_neg_integer()], bytes: non_neg_integer()}], messages: non_neg_integer(), num_deleted: nil | integer(), num_subjects: nil | integer(), subjects: nil | %{} | %{required(binary()) => non_neg_integer()} }
@type streams() :: %{ limit: non_neg_integer(), offset: non_neg_integer(), streams: [binary()], total: non_neg_integer() }
@type t() :: %Jetstream.API.Stream{ allow_rollup_hdrs: boolean(), deny_delete: boolean(), deny_purge: boolean(), description: nil | binary(), discard: :old | :new, duplicate_window: nil | nanoseconds(), max_age: nanoseconds(), max_bytes: integer(), max_consumers: integer(), max_msg_size: nil | integer(), max_msgs: integer(), max_msgs_per_subject: integer(), mirror: nil | source(), name: binary(), no_ack: nil | boolean(), num_replicas: pos_integer(), placement: nil | placement(), retention: :limits | :workqueue | :interest, sealed: boolean(), sources: nil | [source()], storage: :file | :memory, subjects: nil | [binary()], template_owner: nil | binary() }
Link to this section Functions
Creates a new Stream.
examples
Examples
iex> {:ok, %{created: _}} = Jetstream.API.Stream.create(:gnat, %Jetstream.API.Stream{name: "stream", subjects: ["subject"]})
Deletes a Stream and all its data.
examples
Examples
iex> Jetstream.API.Stream.create(:gnat, %Jetstream.API.Stream{name: "stream", subjects: ["subject"]})
iex> Jetstream.API.Stream.delete(:gnat, "stream")
:ok
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Jetstream.API.Stream.delete(:gnat, "wrong_stream")
@spec get_message( conn :: Gnat.t(), stream_name :: binary(), method :: message_access_method() ) :: {:ok, message_response()} | {:error, response_error()}
Get a message from the stream either by "stream sequence number" or the "last message for a given subject"
Information about config and state of a Stream.
examples
Examples
iex> Jetstream.API.Stream.create(:gnat, %Jetstream.API.Stream{name: "stream", subjects: ["subject"]})
iex> {:ok, %{created: _}} = Jetstream.API.Stream.info(:gnat, "stream")
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Jetstream.API.Stream.info(:gnat, "wrong_stream")
@spec list(conn :: Gnat.t(), params :: [{:offset, non_neg_integer()}]) :: {:ok, streams()} | {:error, term()}
Paged list of known Streams including all their current information.
examples
Examples
iex> {:ok, %{total: _, offset: 0, limit: 1024, streams: _}} = Jetstream.API.Stream.list(:gnat)
Purges all of data in the stream but doesn't delete the stream.
examples
Examples
iex> Jetstream.API.Stream.create(:gnat, %Jetstream.API.Stream{name: "stream", subjects: ["subject"]})
iex> Jetstream.API.Stream.purge(:gnat, "stream")
:ok
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Jetstream.API.Stream.purge(:gnat, "wrong_stream")
Updates a Stream.
examples
Examples
iex> {:ok, %{created: _}} = Jetstream.API.Stream.create(:gnat, %Jetstream.API.Stream{name: "update_test_stream", subjects: ["update_subject"]})
iex> {:ok, _} = Jetstream.API.Stream.update(:gnat, %Jetstream.API.Stream{name: "update_test_stream", subjects: ["update_subject", "new.update_subject"]})