glats

Package Version Hex Docs

A NATS client for Gleam. This wraps Elixir’s client, Gnat.

Publish

import gleam/result
import glats

pub fn main() {
  use conn <- result.then(glats.connect("localhost", 4222, []))

  // Publish a single message to "some.topic".
  let assert Ok(Nil) = glats.publish(conn, "some.topic", "hello world!", [])

  Ok(Nil)
}

Subscribe

import gleam/result
import gleam/erlang/process
import glats

pub fn main() {
  use conn <- result.then(glats.connect("localhost", 4222, []))

  let subject = process.new_subject()

  // Subscribe to "some.topic".
  // Messages will be delivered to the erlang subject passed in.
  let assert Ok(sid) = glats.subscribe(conn, subject, "some.topic", [])

  // Publish a single message to "some.topic".
  let assert Ok(Nil) = glats.publish(conn, "some.topic", "hello world!", [])

  // Receive from erlang subject.
  let assert Ok(glats.ReceivedMessage(
    conn: _conn,
    sid: _sid,
    status: _status,
    message: glats.Message(
      topic: _topic,
      headers: _headers,
      reply_to: _reply_to,
      body: _body,
    ),
  )) = process.receive(subject, 1000)

  // Unsubscribe from the subscription.
  let assert Ok(Nil) = glats.unsubscribe(conn, sid)

  Ok(Nil)
}

Request handler

import gleam/io
import gleam/option.{None}
import gleam/result
import gleam/erlang/process
import glats
import glats/handler.{Reply, Request, Response}

pub fn main() {
  use conn <- result.then(glats.connect("localhost", 4222, []))

  // Start a request handler actor that will call `ping_pong_handler`
  // for every request received from NATS topic "do.ping".
  let assert Ok(_actor) =
    handler.handle_request(conn, [], "do.ping", [], ping_pong_handler)

  process.sleep_forever()

  Ok(Nil)
}

pub fn ping_pong_handler(req: Request, state) {
  // Got message: Hello
  io.println("Got message: " <> req.body)

  // Reply with a message with the same headers and append to body.
  Reply(
    Response(
      headers: req.headers,
      reply_to: None,
      body: req.body <> " from glats!",
    ),
    state,
  )
}

Then in shell with natscli.

$ nats req do.ping 'Hello'
12:16:47 Sending request on "do.ping"
12:16:47 Received with rtt 427.64µs
Hello from glats!

Jetstream

Pull subscription

Pull consumers allow clients to request batches of messages on demand. For best performance set the batch size to as high as possible, but keep in mind that the client should be able to process (and ack) the entire batch in the AckWait time window of the consumer.

In the example below we request a batch of 50 messages with NoWait set. This instructs NATS to immediately return a response, event when no pending messages exist in the stream (in which case we get a message with status 404). That way we can exit the program when we’re caught up.

Normally you would request a batch with an expiry time (option Expires(Int)), in which case the request lasts for the time requested (and a message with status 408 is received when it expires).

This allows the client to control the flow of messages instead of NATS flooding it when there is high load as is possible with push consumers.

More info on pull consumers

All the logic in this example is handled for you in glats/jetstream/handler.{handle_pull_consumer}, see docs.

import gleam/io
import gleam/string
import gleam/result
import gleam/option.{Some}
import gleam/otp/actor.{InitFailed}
import gleam/erlang/process.{Abnormal}
import glats.{ReceivedMessage}
import glats/jetstream
import glats/jetstream/stream
import glats/jetstream/consumer.{
  AckExplicit, AckPolicy, Batch, BindStream, Description, InactiveThreshold,
  NoWait, Subscription, With,
}

const batch_size = 50

type State {
  State(sub: Subscription, remaining: Int)
}

pub fn main() {
  use conn <- result.then(glats.connect("localhost", 4222, []))

  // Create a stream
  let assert Ok(stream) =
    stream.create(conn, "mystream", ["orders.>", "items.>"], [])

  // Publish 3 messages to subjects contained in the stream
  let assert Ok(Nil) = glats.publish(conn, "orders.1", "order_data", [])
  let assert Ok(Nil) = glats.publish(conn, "orders.2", "order_data", [])
  let assert Ok(Nil) = glats.publish(conn, "items.1", "item_data", [])

  // Subscribe to subject in the stream using an ephemeral consumer
  let subject = process.new_subject()
  let assert Ok(sub) =
    consumer.subscribe(
      conn,
      subject,
      "orders.*",
      [
        // Bind to stream created above
        BindStream(stream.config.name),
        // Set description for the ephemeral consumer
        With(Description("An ephemeral consumer for subscription")),
        // Set ack policy for the consumer
        With(AckPolicy(AckExplicit)),
        // Sets the inactive threshold of the ephemeral consumer
        With(InactiveThreshold(60_000_000_000)),
      ],
    )
    |> io.debug

  // Start loop
  case request_and_loop(subject, State(sub: sub, remaining: 0)) {
    Ok(Nil) -> Ok(Nil)
    Error(err) -> {
      io.println("got error: " <> string.inspect(err))

      Error(InitFailed(Abnormal(string.inspect(err))))
    }
  }
}

// Requests a batch of messages for the pull consumer and updates state
fn request_and_loop(subject, state: State) {
  case consumer.request_batch(state.sub, [Batch(batch_size), NoWait]) {
    Ok(Nil) -> loop(subject, State(..state, remaining: batch_size))
    Error(_) -> Error(Nil)
  }
}

// Receives messages
fn loop(subject, state: State) {
  case process.receive(subject, 2000) {
    // When request expires we get a message with status of `408`.
    Ok(ReceivedMessage(status: Some(408), ..)) -> {
      io.println("request expired, requesting more")

      request_and_loop(subject, state)
    }

    // When request is made with `NoWait` we get a message with status
    // of `404` when no new messages exist.
    Ok(ReceivedMessage(status: Some(404), ..)) -> {
      io.println("no new messages")

      Ok(Nil)
    }

    // We got a new message!
    Ok(ReceivedMessage(conn: conn, message: msg, ..)) -> {
      // Print message
      io.debug(msg)

      // Acknowledge message
      let assert Ok(_) = jetstream.ack(conn, msg)

      // Keep track of remaining messages in the request
      case state.remaining <= 1 {
        // Request more
        True -> request_and_loop(subject, state)
        // Decrement the counter
        False -> loop(subject, State(..state, remaining: state.remaining - 1))
      }
    }

    // An error!
    Error(err) -> Error(err)
  }
}

Push subscription

With push consumers messages are automatically published on a specified subject.

This example is considerably simpler than the pull consumer above but keep in mind that with push consumers the flow of messages is harder to control.

More info on push consumers

import gleam/io
import gleam/result
import gleam/erlang/process.{Subject}
import glats.{ReceivedMessage, SubscriptionMessage}
import glats/jetstream
import glats/jetstream/stream
import glats/jetstream/consumer.{
  AckExplicit, AckPolicy, BindStream, DeliverSubject, Description,
  InactiveThreshold, With,
}

pub fn main() {
  use conn <- result.then(glats.connect("localhost", 4222, []))

  // Create a stream
  let assert Ok(stream) =
    stream.create(conn, "mystream", ["orders.>", "items.>"], [])

  // Publish 3 messages to subjects contained in the stream
  let assert Ok(Nil) = glats.publish(conn, "orders.1", "order_data", [])
  let assert Ok(Nil) = glats.publish(conn, "orders.2", "order_data", [])
  let assert Ok(Nil) = glats.publish(conn, "items.1", "item_data", [])

  // Generate a random inbox topic for the push consumer's delivery topic
  let inbox = glats.new_inbox()

  // Subscribe to subject in the stream using an ephemeral consumer
  let subject = process.new_subject()
  let assert Ok(_sub) =
    consumer.subscribe(
      conn,
      subject,
      "orders.*",
      [
        // Bind to stream created above
        BindStream(stream.config.name),
        // Make it a push consumer
        With(DeliverSubject(inbox)),
        // Set description for the ephemeral consumer
        With(Description("An ephemeral consumer for subscription")),
        // Set ack policy for the consumer
        With(AckPolicy(AckExplicit)),
        // Sets the inactive threshold of the ephemeral consumer
        With(InactiveThreshold(60_000_000_000)),
      ],
    )
    |> io.debug

  // Start loop
  loop(subject)
}

fn loop(subject: Subject(SubscriptionMessage)) {
  case process.receive(subject, 2000) {
    // New message received
    Ok(ReceivedMessage(conn: conn, message: msg, ..)) -> {
      // Print message
      io.debug(msg)

      // Acknowledge message
      let assert Ok(Nil) = jetstream.ack(conn, msg)

      // Run loop again
      loop(subject)
    }

    // Error!
    Error(Nil) -> {
      io.println("no new message in 2 seconds")

      Ok(Nil)
    }
  }
}

Installation

gleam add glats

and its documentation can be found at https://hexdocs.pm/glats.

Search Document