Gnat (gnat v1.9.1)

The primary interface for interacting with NATS

Types

connection_settings()

@type connection_settings() :: %{
  optional(:connection_timeout) => non_neg_integer(),
  optional(:host) => binary(),
  optional(:inbox_prefix) => binary(),
  optional(:port) => non_neg_integer(),
  optional(:ssl_opts) => list(),
  optional(:tcp_opts) => list(),
  optional(:tls) => boolean(),
  optional(:no_responders) => boolean()
}

  • connection_timeout - Limits how long establishing a connection to a server may take
  • host - The location of the NATS server
  • port - The port the NATS server is listening on
  • ssl_opts - Options for connecting over SSL
  • tcp_opts - Options for connecting over TCP
  • tls - Whether the connection to the server should use TLS
  • inbox_prefix - Prefix to use for the message inbox of this connection
  • no_responders - Enable the no responders behavior (see Gnat.request/4)

headers()

@type headers() :: [{binary(), iodata()}]
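
Because headers() is a plain list of two-element tuples, ordinary list tools are enough to read it. The sketch below is a hypothetical helper (HeaderLookup is not part of Gnat) that collects every value stored under a given name. It assumes an exact, case-sensitive match on the name, which may not suit every use.

```elixir
defmodule HeaderLookup do
  # Hypothetical helper, not part of Gnat.
  # Returns every value stored under `name`, flattened to a binary.
  # The match on the header name is exact and case-sensitive (an assumption).
  def get(headers, name) do
    for {^name, value} <- headers, do: IO.iodata_to_binary(value)
  end
end
```

A name can appear more than once in the list, which is why the helper returns a list rather than a single value.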

message()

@type message() :: %{
  :gnat => t(),
  :topic => binary(),
  :body => iodata(),
  :sid => non_neg_integer(),
  optional(:reply_to) => binary(),
  optional(:headers) => headers(),
  optional(:status) => String.t(),
  optional(:description) => String.t()
}

A message received from NATS will be delivered to your process in this form. Note that the :reply_to and :headers keys are optional: each is only present if the message arrived from the NATS server with a reply_to topic or with headers, respectively.

  • gnat - The Gnat connection
  • topic - The topic on which the message arrived
  • body - The raw payload of the message
  • sid - The subscription ID corresponding to this message. You generally won't need to use this value directly.
  • reply_to - A topic supplied for expected replies
  • headers - A set of NATS message headers on the message
  • status - Similar to an HTTP status, this is present for messages with headers and can indicate the specific purpose of a message. Example status: "408"
  • description - A string description of the status
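
Since some keys are optional, code that consumes a message should match only on the required keys and read the rest defensively. The following is a minimal sketch; MessageShape and summarize/1 are hypothetical names used for illustration and are not part of Gnat.

```elixir
defmodule MessageShape do
  # Hypothetical helper, not part of Gnat.
  # Matches only the keys that are always present and reads the
  # optional ones with a default.
  def summarize({:msg, %{topic: topic, body: body} = message}) do
    %{
      topic: topic,
      # The body is iodata, so flatten it before treating it as a binary.
      body: IO.iodata_to_binary(body),
      # Map.get/3 returns the default when an optional key is absent.
      reply_to: Map.get(message, :reply_to),
      headers: Map.get(message, :headers, [])
    }
  end
end
```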

sent_message()

@type sent_message() :: {:msg, message()}

server_info()

@type server_info() :: %{
  :client_id => non_neg_integer(),
  :client_ip => binary(),
  optional(:ip) => binary(),
  optional(:cluster) => binary(),
  optional(:cluster_dynamic) => boolean(),
  optional(:connect_urls) => [binary()],
  optional(:ws_connect_urls) => [binary()],
  optional(:git_commit) => binary(),
  :go => binary(),
  :headers => boolean(),
  :host => binary(),
  optional(:jetstream) => binary(),
  :max_payload => integer(),
  :port => non_neg_integer(),
  :proto => integer(),
  :server_id => binary(),
  :server_name => binary(),
  :version => binary(),
  optional(:ldm) => boolean(),
  optional(:tls_verify) => boolean(),
  optional(:tls_available) => boolean(),
  optional(:tls_required) => boolean(),
  optional(:auth_required) => boolean()
}

Info Protocol

  • client_id - An optional unsigned integer (64 bits) representing the internal client identifier in the server. This can be used to filter client connections in monitoring, correlate with error logs, etc...
  • client_ip - The IP address the client is connecting from
  • cluster - The name of the cluster if any
  • cluster_dynamic - If the cluster is dynamic
  • connect_urls - An optional list of server urls that a client can connect to.
  • ws_connect_urls - An optional list of server urls that a websocket client can connect to.
  • git_commit - The git commit associated with this NATS version
  • go - The version of golang the NATS server was built with
  • headers - If messages can have headers in them
  • host - The IP address used to start the NATS server, by default this will be 0.0.0.0 and can be configured with -client_advertise host:port
  • jetstream - If the server is using JetStream features
  • max_payload - Maximum payload size, in bytes, that the server will accept from the client
  • port - The port number the NATS server is configured to listen on
  • proto - An integer indicating the protocol version of the server. The server version 1.2.0 sets this to 1 to indicate that it supports the "Echo" feature.
  • server_id - The unique identifier of the NATS server
  • server_name - A name for the server
  • version - The version of the NATS server
  • ldm - If the server supports Lame Duck Mode notifications, and the current server has transitioned to lame duck, ldm will be set to true.
  • auth_required - If this is set, then the client should try to authenticate upon connect.
  • tls_required - If this is set, then the client must perform the TLS/1.2 handshake. Note, this used to be ssl_required and has been updated along with the protocol from SSL to TLS.
  • tls_verify - If this is set, the client must provide a valid certificate during the TLS handshake.
  • tls_available - If the server can use TLS
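
One practical use of this map is checking a payload against max_payload before publishing. The sketch below assumes a hypothetical PayloadCheck module (not part of Gnat) and only measures the message body; whether headers or other framing count toward the server's limit is not covered here.

```elixir
defmodule PayloadCheck do
  # Hypothetical helper, not part of Gnat.
  # Compares the size of a payload in bytes with the :max_payload value
  # of a server_info() map. Only the body is measured (an assumption).
  def fits?(%{max_payload: max_payload}, payload) do
    IO.iodata_length(payload) <= max_payload
  end
end
```

With a live connection this could be called as PayloadCheck.fits?(Gnat.server_info(gnat), body).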

t()

@type t() :: GenServer.server()

Functions

active_subscriptions(pid)

@spec active_subscriptions(t()) :: {:ok, non_neg_integer()}

Get the number of active subscriptions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
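
If the connection should be registered under a name, one option is to hand the supervisor an explicit child spec map that calls Gnat.start_link/2 with both arguments. This is a sketch only; MyApp.GnatChild and named_connection/2 are hypothetical names, not part of Gnat.

```elixir
defmodule MyApp.GnatChild do
  # Hypothetical helper, not part of Gnat.
  # Builds a child spec map that starts a connection with the given
  # settings and registers it under `name` via the GenServer options.
  def named_connection(settings, name) do
    %{
      id: name,
      start: {Gnat, :start_link, [settings, [name: name]]}
    }
  end
end
```

The resulting map can be placed in the child list given to Supervisor.start_link/2, for example [MyApp.GnatChild.named_connection(%{host: "127.0.0.1", port: 4222}, :gnat_connection)].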

ping(pid)

This function is deprecated. Pinging is handled internally by the connection; this functionality will be removed.

Ping the NATS server

This corresponds to the PING command in the NATS protocol. If the NATS server responds with a PONG message, this function returns :ok.

{:ok, gnat} = Gnat.start_link()
:ok = Gnat.ping(gnat)

pub(pid, topic, message, opts \\ [])

@spec pub(t(), String.t(), binary(), keyword()) :: :ok

Publish a message

{:ok, gnat} = Gnat.start_link()
:ok = Gnat.pub(gnat, "characters", "Ron Swanson")

If you want to provide a reply address to receive a response you can pass it as an option. See request-reply pattern.

{:ok, gnat} = Gnat.start_link()
:ok = Gnat.pub(gnat, "characters", "Star Lord", reply_to: "me")

If you want to publish a message with headers you can pass the :headers key in the opts like this.

{:ok, gnat} = Gnat.start_link()
:ok = Gnat.pub(gnat, "listen", "Yo", headers: [{"foo", "bar"}])

Headers must be passed as a headers() value (a list of tuples). Sending and parsing headers has more overhead than typical NATS messages (see the NATS 2.2 release notes for details), so only use them when they are really valuable.

request(pid, topic, body, opts \\ [])

@spec request(t(), String.t(), binary(), keyword()) ::
  {:ok, message()} | {:error, :timeout} | {:error, :no_responders}

Send a request and listen for a response synchronously

Following the NATS request-reply pattern, this function generates a one-time topic to receive replies and then sends a message to the provided topic.

Supported options:

  • receive_timeout - An integer number of milliseconds to wait for a response. Defaults to 60_000
  • headers - A set of headers you want to send with the request (see Gnat.pub/4)

{:ok, gnat} = Gnat.start_link()
case Gnat.request(gnat, "i_can_haz_cheezburger", "plZZZZ?!?!?") do
  {:ok, %{body: delicious_cheezburger}} -> :yum
  {:error, :timeout} -> :sad_cat
end

No Responders

If you send a request to a topic that has no registered listeners, it is sometimes convenient to find out right away rather than waiting for a timeout. To support this use case, start your Gnat connection with the no_responders: true option; this function will then return very quickly with an {:error, :no_responders} value. This behavior also works with request_multi/4.
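
With no_responders enabled, callers have three result shapes to handle instead of two. A minimal sketch of covering all of them in one place follows; ReplyHandling and its return values are hypothetical and chosen only for illustration.

```elixir
defmodule ReplyHandling do
  # Hypothetical helper, not part of Gnat.
  # One clause per result shape listed in the request/4 spec.
  def handle({:ok, %{body: body}}), do: {:reply, IO.iodata_to_binary(body)}
  def handle({:error, :no_responders}), do: :nobody_listening
  def handle({:error, :timeout}), do: :timed_out
end
```

With a live connection the result of Gnat.request/4 can be piped straight into ReplyHandling.handle/1.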

request_multi(pid, topic, body, opts \\ [])

@spec request_multi(t(), String.t(), binary(), keyword()) ::
  {:ok, [message()]} | {:error, :no_responders}

Send a request and listen for multiple responses synchronously

This function makes it easy to do a scatter-gather operation where you wait for a limited time and optionally a maximum number of replies.

Supported options:

  • receive_timeout - An integer number of milliseconds to wait for responses. Defaults to 60_000
  • max_messages - An integer number of messages to listen for. Defaults to -1 meaning unlimited
  • headers - A set of headers you want to send with the request (see Gnat.pub/4)

{:ok, gnat} = Gnat.start_link()
{:ok, responses} = Gnat.request_multi(gnat, "i_can_haz_fries", "plZZZZZ!?!?", max_messages: 5)
Enum.count(responses) #=> 5 when at least five replies arrive before the timeout

server_info(name)

@spec server_info(t()) :: server_info()

Get information about the NATS server this connection is attached to

start_link(connection_settings \\ %{}, opts \\ [])

@spec start_link(
  connection_settings(),
  keyword()
) :: GenServer.on_start()

Starts a connection to a NATS server

{:ok, gnat} = Gnat.start_link(%{host: "127.0.0.1", port: 4222})
# if the server requires TLS you can start a connection with:
{:ok, gnat} = Gnat.start_link(%{host: "127.0.0.1", port: 4222, tls: true})
# if the server requires TLS and a client certificate you can start a connection with:
{:ok, gnat} = Gnat.start_link(%{tls: true, ssl_opts: [certfile: "client-cert.pem", keyfile: "client-key.pem"]})
# you can customize the default "_INBOX." inbox prefix with:
{:ok, gnat} = Gnat.start_link(%{host: "127.0.0.1", port: 4222, inbox_prefix: "my_prefix._INBOX."})
# you can use IPv6 addresses too
{:ok, gnat} = Gnat.start_link(%{host: "::1", port: 4222, tcp_opts: [:inet6, :binary]})

You can also pass arbitrary TCP or SSL options in the tcp_opts and ssl_opts keys. If you pass custom TCP options, please include :binary, because Gnat uses binary matching to parse messages.

The final opts argument will be passed to the GenServer.start_link call so you can pass things like [name: :gnat_connection].

stop(pid)

@spec stop(t()) :: :ok

Gracefully shuts down a connection

{:ok, gnat} = Gnat.start_link()
:ok = Gnat.stop(gnat)

sub(pid, subscriber, topic, opts \\ [])

@spec sub(t(), pid(), String.t(), keyword()) ::
  {:ok, non_neg_integer()} | {:ok, String.t()} | {:error, String.t()}

Subscribe to a topic

Supported options:

  • queue_group: a string that identifies which queue group you want to join

By default, each subscriber will receive a copy of every message on the topic. When a queue_group is supplied, messages will be spread among the subscribers in the same group (see NATS queueing).

The subscribed process will begin receiving messages in the form of sent_message(), that is {:msg, message()}.

{:ok, gnat} = Gnat.start_link()
{:ok, subscription} = Gnat.sub(gnat, self(), "topic")
receive do
  {:msg, %{topic: "topic", body: body}} ->
    IO.puts "Received: #{body}"
end
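
In a longer-lived process the same messages usually arrive through handle_info/2 rather than a bare receive. The sketch below shows one way to structure such a subscriber. TopicLogger is a hypothetical module, not part of Gnat, and it simply remembers the bodies it has seen.

```elixir
defmodule TopicLogger do
  use GenServer

  # Hypothetical subscriber process, not part of Gnat.
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, [], opts)

  # Returns the bodies received so far, oldest first.
  def bodies(server), do: GenServer.call(server, :bodies)

  @impl true
  def init(bodies), do: {:ok, bodies}

  @impl true
  def handle_info({:msg, %{body: body}}, bodies) do
    # Messages arrive in the sent_message() shape; keep the body as a binary.
    {:noreply, [IO.iodata_to_binary(body) | bodies]}
  end

  # Ignore anything that is not a NATS message.
  def handle_info(_other, bodies), do: {:noreply, bodies}

  @impl true
  def handle_call(:bodies, _from, bodies) do
    {:reply, Enum.reverse(bodies), bodies}
  end
end
```

After {:ok, pid} = TopicLogger.start_link(), the process can be subscribed with Gnat.sub(gnat, pid, "topic").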

unsub(pid, sid, opts \\ [])

@spec unsub(t(), non_neg_integer() | String.t(), keyword()) :: :ok

Unsubscribe from a topic

Supported options:

  • max_messages - Number of messages to receive before the subscription is automatically removed

This corresponds to the UNSUB command in the NATS protocol. By default the unsubscribe takes effect immediately, but an optional max_messages value can be provided, which allows that many messages to be received before the unsubscribe takes effect. This is especially useful for request/reply patterns.

{:ok, gnat} = Gnat.start_link()
{:ok, subscription} = Gnat.sub(gnat, self(), "my_inbox")
:ok = Gnat.unsub(gnat, subscription)
# OR
:ok = Gnat.unsub(gnat, subscription, max_messages: 2)