glats/jetstream/consumer

Types

The requirement of client acknowledgements.

pub type AckPolicy {
  AckExplicit
  AckNone
  AckAll
}

Constructors

  • AckExplicit

    The default policy. It means that each individual message must be acknowledged. It is recommended to use this mode, as it provides the most reliability and functionality.

  • AckNone

    You do not have to ack any messages; the server assumes an ack on delivery.

  • AckAll

    If you receive a series of messages, you only have to ack the last one you received. All the previous messages received are automatically acknowledged at the same time.

Available config options for a single consumer.

pub type ConsumerConfig {
  ConsumerConfig(
    durable_name: Option(String),
    description: Option(String),
    filter_subject: Option(String),
    ack_policy: AckPolicy,
    ack_wait: Option(Int),
    deliver_policy: DeliverPolicy,
    inactive_threshold: Option(Int),
    max_ack_pending: Option(Int),
    max_deliver: Option(Int),
    replay_policy: ReplayPolicy,
    num_replicas: Option(Int),
    sample_freq: Option(String),
    deliver_subject: Option(String),
    deliver_group: Option(String),
    headers_only: Option(Bool),
  )
}

Constructors

  • ConsumerConfig(
      durable_name: Option(String),
      description: Option(String),
      filter_subject: Option(String),
      ack_policy: AckPolicy,
      ack_wait: Option(Int),
      deliver_policy: DeliverPolicy,
      inactive_threshold: Option(Int),
      max_ack_pending: Option(Int),
      max_deliver: Option(Int),
      replay_policy: ReplayPolicy,
      num_replicas: Option(Int),
      sample_freq: Option(String),
      deliver_subject: Option(String),
      deliver_group: Option(String),
      headers_only: Option(Bool),
    )

Information about an existing consumer, as returned by create and info.

pub type ConsumerInfo {
  ConsumerInfo(
    stream: String,
    name: String,
    created: String,
    config: ConsumerConfig,
    delivered: SequenceInfo,
    ack_floor: SequenceInfo,
    num_ack_pending: Int,
    num_redelivered: Int,
    num_waiting: Int,
    num_pending: Int,
  )
}

Constructors

  • ConsumerInfo(
      stream: String,
      name: String,
      created: String,
      config: ConsumerConfig,
      delivered: SequenceInfo,
      ack_floor: SequenceInfo,
      num_ack_pending: Int,
      num_redelivered: Int,
      num_waiting: Int,
      num_pending: Int,
    )

Available options to set during consumer creation and update.

pub type ConsumerOption {
  DurableName(String)
  Description(String)
  FilterSubject(String)
  AckPolicy(AckPolicy)
  AckWait(Int)
  DeliverPolicy(DeliverPolicy)
  InactiveThreshold(Int)
  MaxAckPending(Int)
  MaxDeliver(Int)
  ReplayPolicy(ReplayPolicy)
  NumReplicas(Int)
  SampleFrequency(String)
  MaxWaiting(Int)
  MaxRequestExpires(Int)
  MaxRequestBatch(Int)
  MaxRequestMaxBytes(Int)
  DeliverSubject(String)
  DeliverGroup(String)
  HeadersOnly
}

Constructors

  • DurableName(String)

    If set, clients can bind subscriptions to the consumer and resume consuming until the consumer is explicitly deleted. A durable name cannot contain whitespace, ., *, >, path separators (forward or backward slash), or non-printable characters.

  • Description(String)

    A description of the consumer. This can be particularly useful for ephemeral consumers to indicate their purpose since the durable name cannot be provided.

  • FilterSubject(String)

    A subject that overlaps with the subjects bound to the stream and filters the set of messages received by the consumer.

  • AckPolicy(AckPolicy)

    The requirement of client acknowledgements, either AckExplicit, AckNone, or AckAll.

  • AckWait(Int)

    The duration that the server will wait for an ack for any individual message once it has been delivered to a consumer. If an ack is not received in time, the message will be redelivered.

  • DeliverPolicy(DeliverPolicy)

    The point in the stream to receive messages from, either DeliverAll, DeliverLast, DeliverNew, DeliverByStartSequence, DeliverByStartTime, or DeliverLastPerSubject.

  • InactiveThreshold(Int)

    Duration that instructs the server to clean up consumers that have been inactive for that long. Prior to NATS Server 2.9, this only applied to ephemeral consumers.

  • MaxAckPending(Int)

    Defines the maximum number of messages, without an acknowledgement, that can be outstanding. Once this limit is reached, message delivery will be suspended. This limit applies across all of the consumer’s bound subscriptions. A value of -1 means there can be any number of pending acks (i.e. no flow control). This does not apply when the AckNone policy is used.

  • MaxDeliver(Int)

    The maximum number of times a specific message delivery will be attempted. Applies to any message that is re-sent due to ack policy (i.e. due to a negative ack, or no ack sent by the client).

  • ReplayPolicy(ReplayPolicy)

    If the policy is ReplayOriginal, the messages in the stream will be pushed to the client at the same rate that they were originally received, simulating the original timing of messages. If the policy is ReplayInstant (the default), the messages will be pushed to the client as fast as possible while adhering to the Ack Policy, Max Ack Pending and the client’s ability to consume those messages.

  • NumReplicas(Int)

    Sets the number of replicas for the consumer’s state. By default, when the value is set to zero, consumers inherit the number of replicas from the stream.

  • SampleFrequency(String)

    Sets the percentage of acknowledgements that should be sampled for observability, 0-100. This value is a string, so both 30 and 30% are valid values.

  • MaxWaiting(Int)

    The maximum number of waiting pull requests.

  • MaxRequestExpires(Int)

    The maximum duration a single pull request will wait for messages to be available to pull.

  • MaxRequestBatch(Int)

    The maximum batch size a single pull request can make. When set with MaxRequestMaxBytes, the batch size will be constrained by whichever limit is hit first.

  • MaxRequestMaxBytes(Int)

    The maximum total bytes that can be requested in a given batch. When set with MaxRequestBatch, the batch size will be constrained by whichever limit is hit first.

  • DeliverSubject(String)

    The subject to deliver messages to. Note that setting this field implicitly decides whether the consumer is push-based or pull-based. With a deliver subject, the server will push messages to clients subscribed to this subject.

  • DeliverGroup(String)

    The queue group name which, if specified, is then used to distribute the messages between the subscribers to the consumer. This is analogous to a queue group in core NATS.

  • HeadersOnly

    Delivers only the headers of messages in the stream and not the bodies. Additionally adds a Nats-Msg-Size header to indicate the size of the removed payload.
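As a rough illustration of how these constructors combine, the sketch below builds an option list for a durable consumer. The consumer name, description, subject, and limits are made-up values chosen for the example, not recommendations from the library.

```gleam
import glats/jetstream/consumer

// Options for a hypothetical durable consumer named "orders-worker".
// All string and numeric values here are illustrative assumptions.
pub fn worker_options() -> List(consumer.ConsumerOption) {
  [
    consumer.DurableName("orders-worker"),
    consumer.Description("Processes newly created orders"),
    consumer.FilterSubject("orders.created"),
    // Require an ack for every individual message.
    consumer.AckPolicy(consumer.AckExplicit),
    // Suspend delivery once 100 messages are awaiting an ack.
    consumer.MaxAckPending(100),
    // Attempt delivery of any single message at most 5 times.
    consumer.MaxDeliver(5),
    consumer.ReplayPolicy(consumer.ReplayInstant),
  ]
}
```

A list like this can be passed as the opts argument of create, or wrapped item by item in With when subscribing.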

Available delivery policies to select the point in the stream to start consuming from.

pub type DeliverPolicy {
  DeliverAll
  DeliverLast
  DeliverLastPerSubject
  DeliverNew
  DeliverByStartSequence(Int)
  DeliverByStartTime(String)
}

Constructors

  • DeliverAll

    The default policy. The consumer will start receiving from the earliest available message.

  • DeliverLast

    When first consuming messages, the consumer will start receiving messages with the last message added to the stream, or the last message in the stream that matches the consumer’s filter subject if defined.

  • DeliverLastPerSubject

    When first consuming messages, start with the latest one for each filtered subject currently in the stream.

  • DeliverNew

    When first consuming messages, the consumer will only start receiving messages that were created after the consumer was created.

  • DeliverByStartSequence(Int)

    When first consuming messages, start at the first message having the sequence number or the next one available.

  • DeliverByStartTime(String)

    When first consuming messages, start with messages on or after this time.
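One way to pick a delivery policy at runtime might look like the sketch below. The function name start_point and the idea of passing the last processed stream sequence as a Result are assumptions made for the example.

```gleam
import glats/jetstream/consumer

// Choose where a new consumer should start reading.
// `last_seen` is a hypothetical value kept by the application: the stream
// sequence of the last message it processed, or Error(Nil) if there is none.
pub fn start_point(last_seen: Result(Int, Nil)) -> consumer.ConsumerOption {
  case last_seen {
    // Continue with the message after the last one processed.
    Ok(seq) -> consumer.DeliverPolicy(consumer.DeliverByStartSequence(seq + 1))
    // Nothing processed yet: only receive messages created from now on.
    Error(Nil) -> consumer.DeliverPolicy(consumer.DeliverNew)
  }
}
```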

The policy to control how to replay messages from a stream.

pub type ReplayPolicy {
  ReplayInstant
  ReplayOriginal
}

Constructors

  • ReplayInstant

    The default policy. The messages will be pushed to the client as fast as possible while adhering to the Ack Policy, Max Ack Pending and the client’s ability to consume those messages.

  • ReplayOriginal

    The messages in the stream will be pushed to the client at the same rate that they were originally received, simulating the original timing of messages.

Options for request_batch.

pub type RequestBatchOption {
  Batch(Int)
  NoWait
  Expires(Int)
}

Constructors

  • Batch(Int)

    The number of messages to receive. Defaults to 1.

  • NoWait

    Get an empty message immediately if no new ones exist for the consumer.

  • Expires(Int)

    Expiry time for the request in nanoseconds.

pub type SequenceInfo {
  SequenceInfo(consumer_seq: Int, stream_seq: Int)
}

Constructors

  • SequenceInfo(consumer_seq: Int, stream_seq: Int)

An active subscription to a consumer.

pub opaque type Subscription

Available options to set when subscribing to a stream subject.

pub type SubscriptionOption {
  Bind(String, String)
  BindStream(String)
  With(ConsumerOption)
}

Constructors

  • Bind(String, String)

    Used to bind to an existing stream and consumer while subscribing.

  • BindStream(String)

    Used to bind to an existing stream.

  • With(ConsumerOption)

    When not binding to an existing consumer this can be used to add consumer options for the consumer that will be created automatically.

Functions

pub fn create(conn: Subject(ConnectionMessage), stream: String, opts: List(
    ConsumerOption,
  )) -> Result(ConsumerInfo, JetstreamError)

Creates a new consumer.
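A minimal sketch of calling create, assuming conn is an already established glats connection and that a stream named "orders" exists on the server. The stream and consumer names are hypothetical.

```gleam
import glats/jetstream/consumer

// Create a durable consumer on the (assumed) "orders" stream and return
// the name the server reports for it.
pub fn create_worker(conn) {
  let opts = [
    consumer.DurableName("orders-worker"),
    consumer.AckPolicy(consumer.AckExplicit),
  ]

  case consumer.create(conn, "orders", opts) {
    // `info` is a ConsumerInfo record; `name` is one of its fields.
    Ok(info) -> Ok(info.name)
    // Pass the JetstreamError back to the caller unchanged.
    Error(err) -> Error(err)
  }
}
```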

pub fn delete(conn: Subject(ConnectionMessage), stream: String, name: String) -> Result(
  Nil,
  JetstreamError,
)

Deletes a consumer.

pub fn info(conn: Subject(ConnectionMessage), stream: String, name: String) -> Result(
  ConsumerInfo,
  JetstreamError,
)

Get info about a consumer by stream and name.
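The returned ConsumerInfo can be inspected with ordinary record access. The sketch below adds up two of its counters as a rough backlog estimate. It assumes the fields carry the same meaning as in the NATS consumer info response (num_pending as not yet delivered, num_ack_pending as delivered but not yet acknowledged), which this page does not itself state.

```gleam
import glats/jetstream/consumer

// Estimate how much work is still outstanding for a consumer.
// The interpretation of the two counters is an assumption (see above).
pub fn backlog(conn, stream: String, name: String) {
  case consumer.info(conn, stream, name) {
    Ok(info) -> Ok(info.num_pending + info.num_ack_pending)
    Error(err) -> Error(err)
  }
}
```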

pub fn names(conn: Subject(ConnectionMessage), stream: String) -> Result(
  Nil,
  JetstreamError,
)

Get list of consumer names in a stream.

pub fn request_batch(sub: Subscription, opts: List(
    RequestBatchOption,
  )) -> Result(Nil, JetstreamError)

Request a batch of messages for a pull subscription.
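For example, a pull request for up to ten messages that expires after five seconds could be sketched as follows. Here sub is assumed to be a Subscription previously returned by subscribe for a pull-based consumer. Since the function returns Nil on success, the messages themselves presumably arrive on the subscriber subject given to subscribe, but that is an inference from the signatures rather than something this page confirms.

```gleam
import glats/jetstream/consumer

// Ask the server for up to 10 messages on an existing pull subscription.
pub fn pull_ten(sub) {
  consumer.request_batch(sub, [
    consumer.Batch(10),
    // Expires takes nanoseconds: 5 seconds = 5_000_000_000 ns.
    consumer.Expires(5_000_000_000),
  ])
}
```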

pub fn subscribe(conn: Subject(ConnectionMessage), subscriber: Subject(
    SubscriptionMessage,
  ), topic: String, opts: List(SubscriptionOption)) -> Result(
  Subscription,
  JetstreamError,
)

Subscribe to a topic in a stream.

  • If no option is provided it will attempt to look up a stream by the topic and create an ephemeral consumer for the subscription.
  • If Bind("stream", "consumer") is provided it will subscribe to the stream and existing consumer, failing if either does not exist.
  • If BindStream("stream") is provided it will not attempt to look up the stream by topic but will create an ephemeral consumer for the subscription.

In the cases where an ephemeral consumer will be created, With(ConsumerOption) can be provided to configure it.
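The three cases above might be used roughly as in this sketch. It assumes conn is an established connection and subscriber is a Subject(SubscriptionMessage) owned by the calling process. The stream, consumer, and topic names are hypothetical.

```gleam
import glats/jetstream/consumer

// No options: look up the stream by topic and use an ephemeral consumer.
pub fn subscribe_default(conn, subscriber) {
  consumer.subscribe(conn, subscriber, "orders.created", [])
}

// Bind to an existing stream and consumer; fails if either is missing.
pub fn subscribe_existing(conn, subscriber) {
  consumer.subscribe(conn, subscriber, "orders.created", [
    consumer.Bind("orders", "orders-worker"),
  ])
}

// Name the stream explicitly and configure the ephemeral consumer
// that will be created for this subscription.
pub fn subscribe_ephemeral(conn, subscriber) {
  consumer.subscribe(conn, subscriber, "orders.created", [
    consumer.BindStream("orders"),
    consumer.With(consumer.AckPolicy(consumer.AckExplicit)),
    consumer.With(consumer.Description("Ad-hoc reader for order events")),
  ])
}
```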
