glats/jetstream/stream
Types
Access method type for a get message request to JetStream.
See get_message.
pub type AccessMethod {
SequenceID(Int)
LastBySubject(String)
}
Constructors
-
SequenceID(Int)
Used to get a message from a stream by sequence ID.
-
LastBySubject(String)
Used to get the newest message from a stream that matches a subject.
Available config options for a single stream.
pub type StreamConfig {
StreamConfig(
name: String,
subjects: List(String),
retention: Option(String),
max_consumers: Option(Int),
max_msgs: Option(Int),
max_bytes: Option(Int),
max_age: Option(Int),
max_msgs_per_subject: Option(Int),
max_msg_size: Option(Int),
discard: Option(String),
storage: Option(String),
num_replicas: Option(Int),
duplicate_window: Option(Int),
allow_direct: Option(Bool),
mirror_direct: Option(Bool),
sealed: Option(Bool),
deny_delete: Option(Bool),
deny_purge: Option(Bool),
allow_rollup_hdrs: Option(Bool),
)
}
Constructors
-
StreamConfig( name: String, subjects: List(String), retention: Option(String), max_consumers: Option(Int), max_msgs: Option(Int), max_bytes: Option(Int), max_age: Option(Int), max_msgs_per_subject: Option(Int), max_msg_size: Option(Int), discard: Option(String), storage: Option(String), num_replicas: Option(Int), duplicate_window: Option(Int), allow_direct: Option(Bool), mirror_direct: Option(Bool), sealed: Option(Bool), deny_delete: Option(Bool), deny_purge: Option(Bool), allow_rollup_hdrs: Option(Bool), )
Info about a stream, as returned by the info function.
pub type StreamInfo {
StreamInfo(
created: String,
config: StreamConfig,
state: StreamState,
)
}
Constructors
-
StreamInfo( created: String, config: StreamConfig, state: StreamState, )
pub type StreamMessage {
StreamMessage(sequence: Int, time: String, message: Message)
}
Constructors
-
StreamMessage(sequence: Int, time: String, message: Message)
Available options to set during stream creation and update.
pub type StreamOption {
Description(String)
Retention(RetentionPolicy)
MaxConsumers(Int)
MaxMessages(Int)
MaxBytes(Int)
MaxAge(Int)
MaxMessagesPerSubject(Int)
MaxMessageSize(Int)
Discard(DiscardPolicy)
Storage(StorageType)
NumReplicas(Int)
DuplicateWindow(Int)
AllowDirect(Bool)
MirrorDirect(Bool)
DenyDelete(Bool)
DenyPurge(Bool)
AllowRollup(Bool)
}
Constructors
-
Description(String)
A verbose description of the stream.
-
Retention(RetentionPolicy)
Declares the retention policy for the stream.
See: https://docs.nats.io/nats-concepts/jetstream/streams#retentionpolicy
-
MaxConsumers(Int)
How many consumers can be defined for a given stream; -1 for unlimited.
-
MaxMessages(Int)
How many messages may be in a Stream. Adheres to Discard Policy, removing oldest or refusing new messages if the Stream exceeds this number of messages.
-
MaxBytes(Int)
How many bytes the Stream may contain. Adheres to Discard Policy, removing oldest or refusing new messages if the Stream exceeds this size.
-
MaxAge(Int)
Maximum age of any message in the Stream, expressed in nanoseconds.
-
MaxMessagesPerSubject(Int)
Limits how many messages in the stream to retain per subject.
-
MaxMessageSize(Int)
The largest message that will be accepted by the Stream.
-
Discard(DiscardPolicy)
The behavior of discarding messages when any of the stream's limits have been reached.
See: https://docs.nats.io/nats-concepts/jetstream/streams#discardpolicy
-
Storage(StorageType)
The storage type for stream data.
-
NumReplicas(Int)
How many replicas to keep for each message in a clustered JetStream, maximum 5.
-
DuplicateWindow(Int)
The window within which to track duplicate messages, expressed in nanoseconds.
-
AllowDirect(Bool)
If true, and the stream has more than one replica, each replica will respond to direct get requests for individual messages, not only the leader.
-
MirrorDirect(Bool)
If true, and the stream is a mirror, the mirror will participate in serving direct get requests for individual messages from the origin stream.
-
DenyDelete(Bool)
Restricts the ability to delete messages from a stream via the API.
-
DenyPurge(Bool)
Restricts the ability to purge messages from a stream via the API.
-
AllowRollup(Bool)
Allows the use of the Nats-Rollup header to replace all contents of a stream, or subject in a stream, with a single new message.
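MaxAge and DuplicateWindow both take nanoseconds, which are easy to get wrong by a few zeros. The helpers below are a minimal sketch of converting from friendlier units; seconds_to_nanos and hours_to_nanos are hypothetical names and are not part of glats.

```gleam
// Hypothetical helpers, not part of glats: convert common units to the
// nanosecond values expected by MaxAge(Int) and DuplicateWindow(Int).
const nanos_per_second = 1_000_000_000

pub fn seconds_to_nanos(seconds: Int) -> Int {
  seconds * nanos_per_second
}

pub fn hours_to_nanos(hours: Int) -> Int {
  // 3600 seconds per hour
  seconds_to_nanos(hours * 3600)
}
```

With these in scope, a 24 hour retention limit and a two minute duplicate window could be written as MaxAge(hours_to_nanos(24)) and DuplicateWindow(seconds_to_nanos(120)).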
Stream state.
pub type StreamState {
StreamState(
messages: Int,
bytes: Int,
first_seq: Int,
first_ts: String,
last_seq: Int,
last_ts: String,
consumer_count: Int,
)
}
Constructors
-
StreamState( messages: Int, bytes: Int, first_seq: Int, first_ts: String, last_seq: Int, last_ts: String, consumer_count: Int, )
Functions
pub fn create(conn: Subject(ConnectionMessage), name: String, subjects: List(String), opts: List(StreamOption)) -> Result(StreamInfo, JetstreamError)
Creates a new stream.
Calling this when a stream with the same name already exists succeeds as long as no mutable field in the config differs.
Example
create(
  conn,
  "samplestream",
  ["sample.subject.>"],
  [
    Storage(MemoryStorage),
    Retention(WorkQueuePolicy),
  ],
)
pub fn delete(conn: Subject(ConnectionMessage), name: String) -> Result(Nil, JetstreamError)
Deletes a stream.
pub fn find_stream_name_by_subject(conn: Subject(ConnectionMessage), subject: String) -> Result(String, JetstreamError)
Tries to find a stream name by subject.
pub fn get_message(conn: Subject(ConnectionMessage), stream: String, method: AccessMethod) -> Result(StreamMessage, JetstreamError)
Directly fetches a message from a stream either by sequence ID (by passing SequenceID(Int)) or by subject (by passing LastBySubject(String)).
A subject can contain wildcards (e.g. orders.*.item.>); please refer to the Subject-Based Messaging docs for more info.
Keep in mind that allow_direct has to be enabled in the stream config for this to work.
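The two access methods and the allow_direct requirement can be sketched together as follows. This is an illustration only: it assumes conn is an already established connection, direct_get_sketch is a hypothetical name, and the stream name and subject are placeholders borrowed from the create example.

```gleam
import glats/jetstream/stream.{AllowDirect, LastBySubject, SequenceID}

// Hypothetical helper: `conn` is assumed to be an open glats connection.
pub fn direct_get_sketch(conn) {
  // Direct gets need allow_direct, so enable it when creating the stream.
  let created =
    stream.create(conn, "samplestream", ["sample.subject.>"], [
      AllowDirect(True),
    ])

  // Fetch the message stored at sequence ID 1.
  let by_sequence = stream.get_message(conn, "samplestream", SequenceID(1))

  // Fetch the newest message matching a (wildcard) subject.
  let newest =
    stream.get_message(conn, "samplestream", LastBySubject("sample.subject.>"))

  // Each element is a Result, so the caller decides how to handle errors.
  #(created, by_sequence, newest)
}
```

Returning the three results as a tuple keeps the sketch free of error-handling policy; real code would more likely pattern match on each Result in turn.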
pub fn info(conn: Subject(ConnectionMessage), name: String) -> Result(StreamInfo, JetstreamError)
Get info about a stream by name.
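As a rough sketch of reading the returned StreamInfo, the helper below pulls the stored message count out of the nested StreamState. The name message_count is hypothetical, and conn is assumed to be an already established connection.

```gleam
import glats/jetstream/stream

// Hypothetical helper: returns how many messages the named stream holds.
pub fn message_count(conn, name: String) {
  case stream.info(conn, name) {
    // StreamInfo.state is a StreamState, which carries the `messages` field.
    Ok(info) -> Ok(info.state.messages)
    // Pass the JetstreamError through unchanged.
    Error(err) -> Error(err)
  }
}
```

The same pattern applies to the other StreamState fields, such as info.state.bytes or info.state.consumer_count.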
pub fn purge(conn: Subject(ConnectionMessage), name: String) -> Result(Int, JetstreamError)
Purges all of the data in a stream but leaves the stream itself in place.