glats/jetstream/handler

A convenience handler that will handle a consumer subscription for you. For every message it receives it will call the provided SubscriptionHandler(a) function and take action depending on its return value.

It will also keep the state for you of type a.

Pull consumer example

import gleam/io
import gleam/int
import gleam/string
import gleam/result
import gleam/function
import gleam/erlang/process
import glats.{Connection, Message}
import glats/jetstream/stream.{Retention, WorkQueuePolicy}
import glats/jetstream/consumer.{
  AckExplicit, AckPolicy, BindStream, Description, With,
}
import glats/jetstream/handler.{Ack}

pub fn main() {
  use conn <- result.then(glats.connect("localhost", 4222, []))

  // Create a stream
  let assert Ok(stream) =
    stream.create(conn, "wqstream", ["ticket.>"], [Retention(WorkQueuePolicy)])

  // Run pull handler
  let assert Ok(_actor) =
    handler.handle_pull_consumer(
      conn,
      0,            // Initial state
      "ticket.*",   // Topic
      100,          // Batch size
      pull_handler, // Handler function
      [
        // Bind to stream created above
        BindStream(stream.config.name),
        // Set description for the ephemeral consumer
        With(Description("An ephemeral consumer for subscription")),
        // Set ack policy for the consumer
        With(AckPolicy(AckExplicit)),
      ],
    )

  // Run a loop that publishes a message every 100ms
  publish_loop(conn, 0)

  Ok(Nil)
}

// Publishes a new message every 100ms
fn publish_loop(conn: Connection, counter: Int) {
  let assert Ok(Nil) =
    glats.publish(
      conn,
      "ticket." <> int.to_string(counter),
      "ticket body",
      [],
    )

  process.sleep(100)

  publish_loop(conn, counter + 1)
}

// Handler function for the pull consumer handler
pub fn pull_handler(message: Message, state) {
  // Increment state counter, print message and instruct
  // pull handler to ack the message.
  state + 1
  |> function.tap(print_message(_, message.topic, message.body))
  |> Ack
}

fn print_message(num: Int, topic: String, body: String) {
  "message " <> int.to_string(num) <> " (" <> topic <> "): " <> body
  |> io.println
}

Will output:

message 1 (ticket.0): ticket body
message 2 (ticket.1): ticket body
message 3 (ticket.2): ticket body
message 4 (ticket.3): ticket body
message 5 (ticket.4): ticket body
...

Types

Used to instruct the consumer handler what to do with the processed message. Also used to return new state.

pub type Outcome(a) {
  Ack(a)
  Nack(a)
  Term(a)
  NoReply(a)
}

Constructors

  • Ack(a)

    Acknowledge message and save state.

  • Nack(a)

    Negatively acknowledge message and save state.

  • Term(a)

    Terminate message and save state.

  • NoReply(a)

    Do nothing with message and save state.

The handler func that should be passed to the consumer handler.

pub type SubscriptionHandler(a) =
  fn(glats.Message, a) -> Outcome(a)

Functions

pub fn handle_pull_consumer(
  conn: Subject(ConnectionMessage),
  initial_state: a,
  topic: String,
  batch_size: Int,
  handler: fn(Message, a) -> Outcome(a),
  opts: List(SubscriptionOption),
) -> Result(Subject(SubscriptionMessage), StartError)

Start a pull consumer handler actor.

Search Document