Getting Started

In this guide, we're going to learn how to install Jetstream in your project and start consuming messages from your streams.

Starting Jetstream

We need a running NATS server with Jetstream enabled (that is what the -js flag does). The following Docker Compose file will do the job:

version: "3"
services:
  nats:
    image: nats:latest
    command:
      - -js
    ports:
      - 4222:4222

Save this snippet as docker-compose.yml and run the following command:

docker compose up -d

Let's also create a Jetstream stream to which we will publish our hello world messages:

nats stream add HELLO --subjects="greetings"

Tip

You can also manage Jetstream streams and consumers via Elixir. You can see more details in this guide.

Adding Jetstream and Gnat to an application

To start off with, we'll generate a new Elixir application by running this command:

mix new hello_jetstream --sup

We need a supervision tree up and running in our app, and the --sup option ensures that.

To add Jetstream to this application, we need to add the Jetstream and Gnat libraries to the deps definition in our mix.exs file. Fill in the exact version requirements from each package's Hex.pm page.

defp deps do
  [
    {:gnat, ...},
    {:jetstream, ...}
  ]
end

To install these dependencies, we will run this command:

mix deps.get

Now let's connect to our NATS server. To do this, we need to start Gnat.ConnectionSupervisor under our application's supervision tree. Add the following to lib/hello_jetstream/application.ex:

def start(_type, _args) do
  children = [
    ...

    # Create NATS connection
    {Gnat.ConnectionSupervisor,
      %{
        name: :gnat,
        connection_settings: [
          %{host: "localhost", port: 4222}
        ]
      }},
  ]

  ...

This piece of configuration will start Gnat processes that connect to the NATS server and allow publishing and subscribing to any subjects. Jetstream operates using plain NATS subjects which follow specific naming and message format conventions.
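
To make the naming convention concrete, here is a minimal sketch that builds two of the subjects the Jetstream API uses under the hood. The module name SubjectSketch is hypothetical and not part of Gnat or Jetstream. The subject patterns follow the NATS JetStream API conventions as we understand them, and the library builds them for you, so you would not normally write this yourself.

```elixir
# Hypothetical helper, for illustration only; not part of Gnat or Jetstream.
defmodule SubjectSketch do
  # Subject used to request information about a stream.
  def stream_info(stream), do: "$JS.API.STREAM.INFO.#{stream}"

  # Subject a pull consumer publishes to when asking for more messages.
  def next_message(stream, consumer) do
    "$JS.API.CONSUMER.MSG.NEXT.#{stream}.#{consumer}"
  end
end

IO.puts(SubjectSketch.stream_info("HELLO"))
IO.puts(SubjectSketch.next_message("HELLO", "LOGGER"))
```

Because these are ordinary subjects, a plain Gnat connection is all that is needed to talk to Jetstream.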

Let's now create a pull consumer which will subscribe to a specific Jetstream stream and print incoming messages to standard output.

Creating a pull consumer

Jetstream requires us to allocate a view/cursor of the stream that our consumer will operate on. In Jetstream terminology, this view is called a consumer (funnily enough, we are about to implement a consumer in our code, too). The Jetstream documentation offers great insights on the benefits of having this separate concept, so we won't duplicate that work here.

Jetstream offers two stream consuming modes: push and pull.

In push mode, Jetstream will simply send messages to selected consumers immediately when they are received. This approach does not offer congestion control, so it is not recommended for high-volume and/or reliability-sensitive streams. You do not really need this library to implement a push consumer because all the building blocks are in the Gnat library. You can read more about push consumers in this guide.

On the other hand, in pull mode consumers ask Jetstream for more messages when they are ready to process them. This is the recommended approach for most use cases and we will proceed with it in this guide.

This is just a brief outline

For more details about the differences between consumer modes, consult the Jetstream documentation.

Let's create a pull consumer module within our application at lib/hello_jetstream/logger_pull_consumer.ex:

defmodule HelloJetstream.LoggerPullConsumer do
  use Jetstream.PullConsumer

  def start_link([]) do
    Jetstream.PullConsumer.start_link(__MODULE__, [])
  end

  @impl true
  def init([]) do
    {:ok, nil, connection_name: :gnat, stream_name: "HELLO", consumer_name: "LOGGER"}
  end

  @impl true
  def handle_message(message, state) do
    IO.inspect(message)
    {:ack, state}
  end
end

A pull consumer is a regular GenServer. It needs a reference to Gnat.ConnectionSupervisor along with the names of the Jetstream stream and consumer. These options are passed as a keyword list in the third element of the tuple returned from the Jetstream.PullConsumer.init/1 callback.

The only required callbacks are the familiar GenServer-style Jetstream.PullConsumer.init/1 and Jetstream.PullConsumer.handle_message/2. The latter takes a new message as its first argument and is expected to return an ACK action instructing the underlying process loop what to do with this message. Here we are asking it to automatically send an ACK message back to Jetstream for us.
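
The return value of handle_message/2 decides what happens to each message. The sketch below shows that shape in a plain module so it can be tried in isolation; in a real consumer the function would live in the module that calls use Jetstream.PullConsumer. It assumes the message is a map with a :body key (as Gnat delivers it) and that the library accepts a :term action next to the :ack shown above. The module name AckSketch is hypothetical, so verify the accepted actions against the Jetstream.PullConsumer documentation.

```elixir
# Hypothetical module, for illustration only.
defmodule AckSketch do
  # Same shape as the handle_message/2 callback: message in, {action, state} out.
  # The state counts the messages that were acknowledged.
  def handle_message(%{body: ""}, count) do
    # An empty greeting is useless to us: ask Jetstream not to redeliver it
    # (assumed :term action).
    {:term, count}
  end

  def handle_message(%{body: body}, count) when is_binary(body) do
    IO.puts("received: " <> body)
    {:ack, count + 1}
  end
end

IO.inspect(AckSketch.handle_message(%{body: "Hello World"}, 0))
```

Keeping the decision in pattern-matched function heads makes each outcome easy to read and to test without a running NATS server.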

Let's now create a consumer in our NATS server. We will call it LOGGER as we plan to let it simply log everything published to the stream.

nats consumer add --pull --deliver=all HELLO LOGGER

Now, let's start our pull consumer under our application's supervision tree.

def start(_type, _args) do
  children = [
    ...

    # Jetstream Pull Consumer
    HelloJetstream.LoggerPullConsumer,
  ]

  ...
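
Putting both fragments together, the whole lib/hello_jetstream/application.ex might look like the sketch below. The Supervisor options are the defaults that mix new --sup generates; everything else comes from the snippets above. The connection supervisor is listed first because children are started in order, so the :gnat connection process exists before the consumer that refers to it.

```elixir
defmodule HelloJetstream.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Create NATS connection
      {Gnat.ConnectionSupervisor,
       %{
         name: :gnat,
         connection_settings: [
           %{host: "localhost", port: 4222}
         ]
       }},
      # Jetstream Pull Consumer
      HelloJetstream.LoggerPullConsumer
    ]

    # Default options generated by `mix new --sup`.
    opts = [strategy: :one_for_one, name: HelloJetstream.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
```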

Let's now publish some messages to our HELLO stream, so that something is waiting to be read when our application starts.

Publishing messages to streams

Jetstream listens on regular NATS subjects, so publishing messages is dead simple with Gnat.pub/3:

Gnat.pub(:gnat, "greetings", "Hello World")

Or via NATS CLI:

nats pub greetings "Hello World"
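
If you want several messages waiting in the stream, the publishing call can be wrapped in a small helper. The sketch below is hypothetical (GreetingPublisher is not part of Gnat or Jetstream) and takes the publishing function as an argument so it can be tried without a running server; in the application you would pass a function that calls Gnat.pub/3.

```elixir
# Hypothetical helper, for illustration only.
defmodule GreetingPublisher do
  # Publishes one greeting per name to the "greetings" subject using the given
  # two-argument function (subject, payload) and returns the list of results.
  def publish_all(names, publish_fun) when is_function(publish_fun, 2) do
    Enum.map(names, fn name -> publish_fun.("greetings", "Hello " <> name) end)
  end
end

# Stand-in publisher that only prints. In the app you would pass instead:
#   fn subject, payload -> Gnat.pub(:gnat, subject, payload) end
printer = fn subject, payload -> IO.puts(subject <> ": " <> payload) end
GreetingPublisher.publish_all(["World", "Jetstream"], printer)
```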

That's it! When you run your app, you should see your messages being read by your application.