glats/jetstream/stream

Types

Access method type for a get message request to Jetstream.

See get_message.

pub type AccessMethod {
  SequenceID(Int)
  LastBySubject(String)
}

Constructors

  • SequenceID(Int)

    Used to get a message from a stream by sequence ID.

  • LastBySubject(String)

    Used to get newest message from a stream that matches a subject.

Used to set the discard policy of a stream.

pub type DiscardPolicy {
  DiscardOld
  DiscardNew
}

Constructors

  • DiscardOld

    This policy will delete the oldest messages in order to maintain the limit. For example, if MaxAge is set to one minute, the server will automatically delete messages older than one minute with this policy.

  • DiscardNew

    This policy will reject new messages from being appended to the stream if it would exceed one of the limits. An extension to this policy is DiscardNewPerSubject which will apply this policy on a per-subject basis within the stream.

Used to set the retention policy of a stream.

pub type RetentionPolicy {
  LimitsPolicy
  InterestPolicy
  WorkQueuePolicy
}

Constructors

  • LimitsPolicy

    Retention based on the various limits that are set including: MaxMessages, MaxBytes, MaxAge, and MaxMessagesPerSubject. If any of these limits are set, whichever limit is hit first will cause the automatic deletion of the respective message(s).

  • InterestPolicy

    Retention based on the consumer interest in the stream and messages. The base case is that there are zero consumers defined for a stream. If messages are published to the stream, they will be immediately deleted so there is no interest.

  • WorkQueuePolicy

    Retention with the typical behavior of a FIFO queue. Each message can be consumed only once. This is enforced by only allowing one consumer to be created for a work-queue stream.

Available config options for a single stream.

pub type StreamConfig {
  StreamConfig(
    name: String,
    subjects: List(String),
    retention: Option(String),
    max_consumers: Option(Int),
    max_msgs: Option(Int),
    max_bytes: Option(Int),
    max_age: Option(Int),
    max_msgs_per_subject: Option(Int),
    max_msg_size: Option(Int),
    discard: Option(String),
    storage: Option(String),
    num_replicas: Option(Int),
    duplicate_window: Option(Int),
    allow_direct: Option(Bool),
    mirror_direct: Option(Bool),
    sealed: Option(Bool),
    deny_delete: Option(Bool),
    deny_purge: Option(Bool),
    allow_rollup_hdrs: Option(Bool),
  )
}

Constructors

  • StreamConfig(
      name: String,
      subjects: List(String),
      retention: Option(String),
      max_consumers: Option(Int),
      max_msgs: Option(Int),
      max_bytes: Option(Int),
      max_age: Option(Int),
      max_msgs_per_subject: Option(Int),
      max_msg_size: Option(Int),
      discard: Option(String),
      storage: Option(String),
      num_replicas: Option(Int),
      duplicate_window: Option(Int),
      allow_direct: Option(Bool),
      mirror_direct: Option(Bool),
      sealed: Option(Bool),
      deny_delete: Option(Bool),
      deny_purge: Option(Bool),
      allow_rollup_hdrs: Option(Bool),
    )

Info about stream returned from info function.

pub type StreamInfo {
  StreamInfo(
    created: String,
    config: StreamConfig,
    state: StreamState,
  )
}

Constructors

  • StreamInfo(
      created: String,
      config: StreamConfig,
      state: StreamState,
    )
pub type StreamMessage {
  StreamMessage(sequence: Int, time: String, message: Message)
}

Constructors

  • StreamMessage(sequence: Int, time: String, message: Message)

Available options to set during stream creation and update.

pub type StreamOption {
  Description(String)
  Retention(RetentionPolicy)
  MaxConsumers(Int)
  MaxMessages(Int)
  MaxBytes(Int)
  MaxAge(Int)
  MaxMessagesPerSubject(Int)
  MaxMessageSize(Int)
  Discard(DiscardPolicy)
  Storage(StorageType)
  NumReplicas(Int)
  DuplicateWindow(Int)
  AllowDirect(Bool)
  MirrorDirect(Bool)
  DenyDelete(Bool)
  DenyPurge(Bool)
  AllowRollup(Bool)
  DiscardNewPerSubject(Bool)
}

Constructors

  • Description(String)

    A verbose description of the stream.

  • Retention(RetentionPolicy)

    Declares the retention policy for the stream.

    See: https://docs.nats.io/nats-concepts/jetstream/streams#retentionpolicy

  • MaxConsumers(Int)

    How many Consumers can be defined for a given Stream, -1 for unlimited.

  • MaxMessages(Int)

    How many messages may be in a Stream. Adheres to Discard Policy, removing oldest or refusing new messages if the Stream exceeds this number of messages.

  • MaxBytes(Int)

    How many bytes the Stream may contain. Adheres to Discard Policy, removing oldest or refusing new messages if the Stream exceeds this size

  • MaxAge(Int)

    Maximum age of any message in the Stream, expressed in nanoseconds.

  • MaxMessagesPerSubject(Int)

    Limits how many messages in the stream to retain per subject.

  • MaxMessageSize(Int)

    The largest message that will be accepted by the Stream

  • Discard(DiscardPolicy)

    The behavior of discarding messages when any streams’ limits have been reached.

    See: https://docs.nats.io/nats-concepts/jetstream/streams#discardpolicy

  • Storage(StorageType)

    The storage type for stream data.

  • NumReplicas(Int)

    How many replicas to keep for each message in a clustered JetStream, maximum 5.

  • DuplicateWindow(Int)

    The window within which to track duplicate messages, expressed in nanoseconds.

  • AllowDirect(Bool)

    If true, and the stream has more than one replica, each replica will respond to direct get requests for individual messages, not only the leader.

  • MirrorDirect(Bool)

    If true, and the stream is a mirror, the mirror will participate in a serving direct get requests for individual messages from origin stream.

  • DenyDelete(Bool)

    Restricts the ability to delete messages from a stream via the API.

  • DenyPurge(Bool)

    Restricts the ability to purge messages from a stream via the API.

  • AllowRollup(Bool)

    Allows the use of the Nats-Rollup header to replace all contents of a stream, or subject in a stream, with a single new message.

  • DiscardNewPerSubject(Bool)

    If True, applies discard new semantics on a per subject basis. Requires DiscardPolicy to be DiscardNew and the MaxMessagesPerSubject to be set.

Stream state.

pub type StreamState {
  StreamState(
    messages: Int,
    bytes: Int,
    first_seq: Int,
    first_ts: String,
    last_seq: Int,
    last_ts: String,
    consumer_count: Int,
  )
}

Constructors

  • StreamState(
      messages: Int,
      bytes: Int,
      first_seq: Int,
      first_ts: String,
      last_seq: Int,
      last_ts: String,
      consumer_count: Int,
    )

Functions

pub fn create(conn: Subject(ConnectionMessage), name: String, subjects: List(
    String,
  ), opts: List(StreamOption)) -> Result(
  StreamInfo,
  JetstreamError,
)

Creates a new stream.

Calling this when a stream by the same already exists will run successfully when no mutable field in the config is different.

Example

create(
  conn,
  "samplestream",
  ["sample.subject.>"],
  [
    Storage(MemoryStorage),
    Retention(WorkQueuePolicy),
  ],
)
pub fn delete(conn: Subject(ConnectionMessage), name: String) -> Result(
  Nil,
  JetstreamError,
)

Deletes a stream.

pub fn find_stream_name_by_subject(conn: Subject(
    ConnectionMessage,
  ), subject: String) -> Result(String, JetstreamError)

Tries to find a stream name by subject.

pub fn get_message(conn: Subject(ConnectionMessage), stream: String, method: AccessMethod) -> Result(
  StreamMessage,
  JetstreamError,
)

Directly fetches a message from a stream either by sequence ID (by passing SequenceID(Int)) or by subject (by passing LastBySubject(String)).

A subject can have wildcards (e.g. orders.*.item.>), please refer to Subject-Based messaging docs for more info.

Keep in mind that allow_direct has to be enabled in stream config for this to work.

pub fn info(conn: Subject(ConnectionMessage), name: String) -> Result(
  StreamInfo,
  JetstreamError,
)

Get info about a stream by name.

pub fn purge(conn: Subject(ConnectionMessage), name: String) -> Result(
  Int,
  JetstreamError,
)

Purges all of the data in a Stream, leaves the Stream.

pub fn update(conn: Subject(ConnectionMessage), name: String, opts: List(
    StreamOption,
  )) -> Result(StreamInfo, JetstreamError)

Updates the config of a stream.

Search Document