glats/jetstream/stream

Types

Access method type for a get message request to Jetstream.

See get_message.

pub type AccessMethod {
  SequenceID(Int)
  LastBySubject(String)
}

Constructors

  • SequenceID(Int)

    Used to get a message from a stream by sequence ID.

  • LastBySubject(String)

    Used to get newest message from a stream that matches a subject.

Available config options for a single stream.

pub type StreamConfig {
  StreamConfig(
    name: String,
    subjects: List(String),
    retention: Option(String),
    max_consumers: Option(Int),
    max_msgs: Option(Int),
    max_bytes: Option(Int),
    max_age: Option(Int),
    max_msgs_per_subject: Option(Int),
    max_msg_size: Option(Int),
    discard: Option(String),
    storage: Option(String),
    num_replicas: Option(Int),
    duplicate_window: Option(Int),
    allow_direct: Option(Bool),
    mirror_direct: Option(Bool),
    sealed: Option(Bool),
    deny_delete: Option(Bool),
    deny_purge: Option(Bool),
    allow_rollup_hdrs: Option(Bool),
  )
}

Constructors

  • StreamConfig(
      name: String,
      subjects: List(String),
      retention: Option(String),
      max_consumers: Option(Int),
      max_msgs: Option(Int),
      max_bytes: Option(Int),
      max_age: Option(Int),
      max_msgs_per_subject: Option(Int),
      max_msg_size: Option(Int),
      discard: Option(String),
      storage: Option(String),
      num_replicas: Option(Int),
      duplicate_window: Option(Int),
      allow_direct: Option(Bool),
      mirror_direct: Option(Bool),
      sealed: Option(Bool),
      deny_delete: Option(Bool),
      deny_purge: Option(Bool),
      allow_rollup_hdrs: Option(Bool),
    )

Info about stream returned from info function.

pub type StreamInfo {
  StreamInfo(
    created: String,
    config: StreamConfig,
    state: StreamState,
  )
}

Constructors

  • StreamInfo(
      created: String,
      config: StreamConfig,
      state: StreamState,
    )
pub type StreamMessage {
  StreamMessage(sequence: Int, time: String, message: Message)
}

Constructors

  • StreamMessage(sequence: Int, time: String, message: Message)

Available options to set during stream creation and update.

pub type StreamOption {
  Description(String)
  Retention(RetentionPolicy)
  MaxConsumers(Int)
  MaxMessages(Int)
  MaxBytes(Int)
  MaxAge(Int)
  MaxMessagesPerSubject(Int)
  MaxMessageSize(Int)
  Discard(DiscardPolicy)
  Storage(StorageType)
  NumReplicas(Int)
  DuplicateWindow(Int)
  AllowDirect(Bool)
  MirrorDirect(Bool)
  DenyDelete(Bool)
  DenyPurge(Bool)
  AllowRollup(Bool)
}

Constructors

  • Description(String)

    A verbose description of the stream.

  • Retention(RetentionPolicy)

    Declares the retention policy for the stream.

    See: https://docs.nats.io/nats-concepts/jetstream/streams#retentionpolicy

  • MaxConsumers(Int)

    How many Consumers can be defined for a given Stream, -1 for unlimited.

  • MaxMessages(Int)

    How many messages may be in a Stream. Adheres to Discard Policy, removing oldest or refusing new messages if the Stream exceeds this number of messages.

  • MaxBytes(Int)

    How many bytes the Stream may contain. Adheres to Discard Policy, removing oldest or refusing new messages if the Stream exceeds this size

  • MaxAge(Int)

    Maximum age of any message in the Stream, expressed in nanoseconds.

  • MaxMessagesPerSubject(Int)

    Limits how many messages in the stream to retain per subject.

  • MaxMessageSize(Int)

    The largest message that will be accepted by the Stream

  • Discard(DiscardPolicy)

    The behavior of discarding messages when any streams’ limits have been reached.

    See: https://docs.nats.io/nats-concepts/jetstream/streams#discardpolicy

  • Storage(StorageType)

    The storage type for stream data.

  • NumReplicas(Int)

    How many replicas to keep for each message in a clustered JetStream, maximum 5.

  • DuplicateWindow(Int)

    The window within which to track duplicate messages, expressed in nanoseconds.

  • AllowDirect(Bool)

    If true, and the stream has more than one replica, each replica will respond to direct get requests for individual messages, not only the leader.

  • MirrorDirect(Bool)

    If true, and the stream is a mirror, the mirror will participate in a serving direct get requests for individual messages from origin stream.

  • DenyDelete(Bool)

    Restricts the ability to delete messages from a stream via the API.

  • DenyPurge(Bool)

    Restricts the ability to purge messages from a stream via the API.

  • AllowRollup(Bool)

    Allows the use of the Nats-Rollup header to replace all contents of a stream, or subject in a stream, with a single new message.

Stream state.

pub type StreamState {
  StreamState(
    messages: Int,
    bytes: Int,
    first_seq: Int,
    first_ts: String,
    last_seq: Int,
    last_ts: String,
    consumer_count: Int,
  )
}

Constructors

  • StreamState(
      messages: Int,
      bytes: Int,
      first_seq: Int,
      first_ts: String,
      last_seq: Int,
      last_ts: String,
      consumer_count: Int,
    )

Functions

pub fn create(conn: Subject(ConnectionMessage), name: String, subjects: List(
    String,
  ), opts: List(StreamOption)) -> Result(
  StreamInfo,
  JetstreamError,
)

Creates a new stream.

Calling this when a stream by the same already exists will run successfully when no mutable field in the config is different.

Example

create(
  conn,
  "samplestream",
  ["sample.subject.>"],
  [
    Storage(MemoryStorage),
    Retention(WorkQueuePolicy),
  ],
)
pub fn delete(conn: Subject(ConnectionMessage), name: String) -> Result(
  Nil,
  JetstreamError,
)

Deletes a stream.

pub fn find_stream_name_by_subject(conn: Subject(
    ConnectionMessage,
  ), subject: String) -> Result(String, JetstreamError)

Tries to find a stream name by subject.

pub fn get_message(conn: Subject(ConnectionMessage), stream: String, method: AccessMethod) -> Result(
  StreamMessage,
  JetstreamError,
)

Directly fetches a message from a stream either by sequence ID (by passing SequenceID(Int)) or by subject (by passing LastBySubject(String)).

A subject can have wildcards (e.g. orders.*.item.>), please refer to Subject-Based messaging docs for more info.

Keep in mind that allow_direct has to be enabled in stream config for this to work.

pub fn info(conn: Subject(ConnectionMessage), name: String) -> Result(
  StreamInfo,
  JetstreamError,
)

Get info about a stream by name.

pub fn purge(conn: Subject(ConnectionMessage), name: String) -> Result(
  Int,
  JetstreamError,
)

Purges all of the data in a Stream, leaves the Stream.

pub fn update(conn: Subject(ConnectionMessage), name: String, opts: List(
    StreamOption,
  )) -> Result(StreamInfo, JetstreamError)

Updates the config of a stream.

Search Document