glats/jetstream/stream
Types
Access method type for a get message request to JetStream. See get_message.
pub type AccessMethod {
  SequenceID(Int)
  LastBySubject(String)
}
Constructors
-
SequenceID(Int)
Used to get a message from a stream by sequence ID.
-
LastBySubject(String)
Used to get the newest message from a stream that matches a subject.
Used to set the discard policy of a stream.
pub type DiscardPolicy {
  DiscardOld
  DiscardNew
}
Constructors
-
DiscardOld
This policy will delete the oldest messages in order to maintain the limit. For example, if MaxAge is set to one minute, the server will automatically delete messages older than one minute with this policy.
-
DiscardNew
This policy will reject new messages from being appended to the stream if doing so would exceed one of the limits. An extension to this policy is DiscardNewPerSubject, which applies this policy on a per-subject basis within the stream.
Used to set the retention policy of a stream.
pub type RetentionPolicy {
  LimitsPolicy
  InterestPolicy
  WorkQueuePolicy
}
Constructors
-
LimitsPolicy
Retention based on the various limits that are set, including MaxMessages, MaxBytes, MaxAge, and MaxMessagesPerSubject. If any of these limits are set, whichever limit is hit first will cause the automatic deletion of the respective message(s).
-
InterestPolicy
Retention based on the consumer interest in the stream and messages. The base case is that there are zero consumers defined for a stream. If messages are published to the stream in that case, they are deleted immediately because there is no interest.
-
WorkQueuePolicy
Retention with the typical behavior of a FIFO queue. Each message can be consumed only once. This is enforced by only allowing one consumer to be created for a work-queue stream.
Available config options for a single stream.
pub type StreamConfig {
  StreamConfig(
    name: String,
    subjects: List(String),
    retention: Option(String),
    max_consumers: Option(Int),
    max_msgs: Option(Int),
    max_bytes: Option(Int),
    max_age: Option(Int),
    max_msgs_per_subject: Option(Int),
    max_msg_size: Option(Int),
    discard: Option(String),
    storage: Option(String),
    num_replicas: Option(Int),
    duplicate_window: Option(Int),
    allow_direct: Option(Bool),
    mirror_direct: Option(Bool),
    sealed: Option(Bool),
    deny_delete: Option(Bool),
    deny_purge: Option(Bool),
    allow_rollup_hdrs: Option(Bool),
  )
}
Constructors
-
StreamConfig(name: String, subjects: List(String), retention: Option(String), max_consumers: Option(Int), max_msgs: Option(Int), max_bytes: Option(Int), max_age: Option(Int), max_msgs_per_subject: Option(Int), max_msg_size: Option(Int), discard: Option(String), storage: Option(String), num_replicas: Option(Int), duplicate_window: Option(Int), allow_direct: Option(Bool), mirror_direct: Option(Bool), sealed: Option(Bool), deny_delete: Option(Bool), deny_purge: Option(Bool), allow_rollup_hdrs: Option(Bool))
Info about a stream, as returned from the info function.
pub type StreamInfo {
  StreamInfo(
    created: String,
    config: StreamConfig,
    state: StreamState,
  )
}
Constructors
-
StreamInfo(created: String, config: StreamConfig, state: StreamState)
pub type StreamMessage {
  StreamMessage(sequence: Int, time: String, message: Message)
}
Constructors
-
StreamMessage(sequence: Int, time: String, message: Message)
Available options to set during stream creation and update.
pub type StreamOption {
  Description(String)
  Retention(RetentionPolicy)
  MaxConsumers(Int)
  MaxMessages(Int)
  MaxBytes(Int)
  MaxAge(Int)
  MaxMessagesPerSubject(Int)
  MaxMessageSize(Int)
  Discard(DiscardPolicy)
  Storage(StorageType)
  NumReplicas(Int)
  DuplicateWindow(Int)
  AllowDirect(Bool)
  MirrorDirect(Bool)
  DenyDelete(Bool)
  DenyPurge(Bool)
  AllowRollup(Bool)
  DiscardNewPerSubject(Bool)
}
Constructors
-
Description(String)
A verbose description of the stream.
-
Retention(RetentionPolicy)
Declares the retention policy for the stream.
See: https://docs.nats.io/nats-concepts/jetstream/streams#retentionpolicy
-
MaxConsumers(Int)
How many Consumers can be defined for a given Stream; use -1 for unlimited.
-
MaxMessages(Int)
How many messages may be in a Stream. Adheres to Discard Policy, removing oldest or refusing new messages if the Stream exceeds this number of messages.
-
MaxBytes(Int)
How many bytes the Stream may contain. Adheres to Discard Policy, removing oldest or refusing new messages if the Stream exceeds this size.
-
MaxAge(Int)
Maximum age of any message in the Stream, expressed in nanoseconds.
-
MaxMessagesPerSubject(Int)
Limits how many messages in the stream to retain per subject.
-
MaxMessageSize(Int)
The largest message that will be accepted by the Stream.
-
Discard(DiscardPolicy)
The behavior of discarding messages when any of the stream's limits have been reached.
See: https://docs.nats.io/nats-concepts/jetstream/streams#discardpolicy
-
Storage(StorageType)
The storage type for stream data.
-
NumReplicas(Int)
How many replicas to keep for each message in a clustered JetStream, maximum 5.
-
DuplicateWindow(Int)
The window within which to track duplicate messages, expressed in nanoseconds.
-
AllowDirect(Bool)
If true, and the stream has more than one replica, each replica will respond to direct get requests for individual messages, not only the leader.
-
MirrorDirect(Bool)
If true, and the stream is a mirror, the mirror will participate in serving direct get requests for individual messages from the origin stream.
-
DenyDelete(Bool)
Restricts the ability to delete messages from a stream via the API.
-
DenyPurge(Bool)
Restricts the ability to purge messages from a stream via the API.
-
AllowRollup(Bool)
Allows the use of the Nats-Rollup header to replace all contents of a stream, or subject in a stream, with a single new message.
-
DiscardNewPerSubject(Bool)
If True, applies discard new semantics on a per-subject basis. Requires DiscardPolicy to be DiscardNew and MaxMessagesPerSubject to be set.
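As a rough illustration of how these options combine, the sketch below creates a stream that caps message age at one minute and applies discard new semantics per subject. The function name create_limited_stream, the stream name, the subject, and the limit of 10 are illustrative assumptions. conn is assumed to be an already established connection, and its type is left to inference.

```gleam
import glats/jetstream/stream.{
  Discard, DiscardNew, DiscardNewPerSubject, MaxAge, MaxMessagesPerSubject,
}

// One minute in nanoseconds, the unit MaxAge expects.
const one_minute_ns = 60_000_000_000

// Hypothetical helper: creates a stream whose messages expire after one
// minute and that rejects new messages once a subject holds 10 of them.
pub fn create_limited_stream(conn) {
  stream.create(conn, "samplestream", ["sample.subject.>"], [
    MaxAge(one_minute_ns),
    // DiscardNewPerSubject needs both of the following options to be set.
    MaxMessagesPerSubject(10),
    Discard(DiscardNew),
    DiscardNewPerSubject(True),
  ])
}
```

The constant keeps the nanosecond conversion in one place, which helps avoid passing seconds or milliseconds to MaxAge or DuplicateWindow by mistake.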
Stream state.
pub type StreamState {
  StreamState(
    messages: Int,
    bytes: Int,
    first_seq: Int,
    first_ts: String,
    last_seq: Int,
    last_ts: String,
    consumer_count: Int,
  )
}
Constructors
-
StreamState(messages: Int, bytes: Int, first_seq: Int, first_ts: String, last_seq: Int, last_ts: String, consumer_count: Int)
Functions
pub fn create(
  conn: Subject(ConnectionMessage),
  name: String,
  subjects: List(String),
  opts: List(StreamOption),
) -> Result(StreamInfo, JetstreamError)
Creates a new stream.
Calling this when a stream with the same name already exists succeeds, provided no mutable field in the config differs.
Example
create(
  conn,
  "samplestream",
  ["sample.subject.>"],
  [
    Storage(MemoryStorage),
    Retention(WorkQueuePolicy),
  ],
)
pub fn delete(
  conn: Subject(ConnectionMessage),
  name: String,
) -> Result(Nil, JetstreamError)
Deletes a stream.
pub fn find_stream_name_by_subject(
  conn: Subject(ConnectionMessage),
  subject: String,
) -> Result(String, JetstreamError)
Tries to find a stream name by subject.
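Because this function and info share the same error type, the two can be chained. The following is a minimal sketch, assuming conn is an open connection. The function name info_for_subject is hypothetical and not part of the module.

```gleam
import glats/jetstream/stream
import gleam/result

// Hypothetical helper: resolves the stream that captures `subject`, then
// fetches that stream's info. The first error encountered is returned as-is.
pub fn info_for_subject(conn, subject: String) {
  use name <- result.try(stream.find_stream_name_by_subject(conn, subject))
  stream.info(conn, name)
}
```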
pub fn get_message(
  conn: Subject(ConnectionMessage),
  stream: String,
  method: AccessMethod,
) -> Result(StreamMessage, JetstreamError)
Directly fetches a message from a stream either by sequence ID (by passing SequenceID(Int)) or by subject (by passing LastBySubject(String)).
A subject can have wildcards (e.g. orders.*.item.>); please refer to the Subject-Based Messaging docs for more info.
Keep in mind that allow_direct has to be enabled in the stream config for this to work.
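The two access methods might be used as in the sketch below. The function name fetch_examples, the stream name, and the subject are placeholders, and conn is assumed to be an open connection whose type is left to inference.

```gleam
import glats/jetstream/stream.{LastBySubject, SequenceID}

// Hypothetical helper: fetches the message stored at sequence 1 and the
// newest message on a subject, returning both results as a tuple.
pub fn fetch_examples(conn) {
  let first = stream.get_message(conn, "samplestream", SequenceID(1))
  let newest =
    stream.get_message(conn, "samplestream", LastBySubject("sample.subject.a"))
  #(first, newest)
}
```

The sketch assumes the stream was created with AllowDirect(True) in its option list. Each element of the returned tuple is a Result, so a failed lookup on one method does not hide the outcome of the other.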
pub fn info(
  conn: Subject(ConnectionMessage),
  name: String,
) -> Result(StreamInfo, JetstreamError)
Get info about a stream by name.
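As a small sketch of reading the returned StreamInfo, the example below prints two fields of its state record. The function name print_stream_size and the output wording are illustrative assumptions, and conn is assumed to be an open connection.

```gleam
import glats/jetstream/stream
import gleam/int
import gleam/io

// Hypothetical helper: prints how many messages and consumers a stream has.
pub fn print_stream_size(conn, name: String) {
  case stream.info(conn, name) {
    Ok(info) -> {
      io.println("messages: " <> int.to_string(info.state.messages))
      io.println("consumers: " <> int.to_string(info.state.consumer_count))
    }
    // The error is ignored here; real code would likely inspect it.
    Error(_) -> io.println("could not fetch stream info")
  }
}
```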
pub fn purge(
  conn: Subject(ConnectionMessage),
  name: String,
) -> Result(Int, JetstreamError)
Purges all of the data in a stream but leaves the stream itself in place.