Gnat (gnat v1.3.0)

Summary

Functions

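active_subscriptions(pid)
Get the number of active subscriptions

child_spec(init_arg)
Returns a specification to start this module under a supervisor.

ping(pid) deprecated
Ping the NATS server

pub(pid, topic, message, opts \\ [])
Publish a message

request(pid, topic, body, opts \\ [])
Send a request and listen for a response synchronously

start_link(connection_settings \\ %{}, opts \\ [])
Starts a connection to a NATS broker

stop(pid)
Gracefully shuts down a connection

sub(pid, subscriber, topic, opts \\ [])
Subscribe to a topic

unsub(pid, sid, opts \\ [])
Unsubscribe from a topic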

Types

Specs

headers() :: [{binary(), iodata()}]

Specs

message() :: %{
  gnat: t(),
  topic: String.t(),
  body: String.t(),
  sid: non_neg_integer(),
  reply_to: String.t(),
  headers: headers()
}

Specs

sent_message() :: {:msg, message()}

Specs

t() :: GenServer.server()
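
As a rough illustration of how the message() and sent_message() shapes above can be consumed, the sketch below pattern-matches on a hand-built tuple. MessageShapeSketch and describe/1 are hypothetical names introduced here, not part of Gnat, and the sample map is constructed by hand rather than received from a server.

```elixir
defmodule MessageShapeSketch do
  # Hypothetical helper, not part of Gnat: turns a sent_message() tuple
  # into a short human-readable description.
  def describe({:msg, %{topic: topic, body: body} = message}) do
    # Map.get/2 is used so the sketch does not assume :reply_to is always set.
    case Map.get(message, :reply_to) do
      nil -> "#{topic}: #{body}"
      reply_to -> "#{topic}: #{body} (reply to #{reply_to})"
    end
  end
end

# Hand-built message for illustration; a real one would arrive via a subscription.
sample = {:msg, %{topic: "characters", body: "Ron Swanson", reply_to: nil}}
IO.puts(MessageShapeSketch.describe(sample))
```

Matching only on the keys a handler needs (here :topic and :body) keeps it tolerant of the other fields in message().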

Functions

active_subscriptions(pid)

Specs

active_subscriptions(t()) :: {:ok, non_neg_integer()}

Get the number of active subscriptions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

ping(pid)

This function is deprecated. Pinging is handled internally by the connection; this functionality will be removed.

Ping the NATS server

This corresponds to the PING command in the NATS protocol. If the NATS server responds with a PONG message, this function returns :ok.

{:ok, gnat} = Gnat.start_link()
:ok = Gnat.ping(gnat)

pub(pid, topic, message, opts \\ [])

Specs

pub(t(), String.t(), binary(), keyword()) :: :ok

Publish a message

{:ok, gnat} = Gnat.start_link()
:ok = Gnat.pub(gnat, "characters", "Ron Swanson")

To provide a reply address for receiving a response, pass it as the :reply_to option. See the NATS request-response pattern.

{:ok, gnat} = Gnat.start_link()
:ok = Gnat.pub(gnat, "characters", "Star Lord", reply_to: "me")

To publish a message with headers, pass the :headers key in opts:

{:ok, gnat} = Gnat.start_link()
:ok = Gnat.pub(gnat, "listen", "Yo", headers: [{"foo", "bar"}])

Headers must be passed as a headers() value (a list of tuples). Sending and parsing headers has more overhead than typical NATS messages (see the NATS 2.2 release notes for details), so use them only when they are really valuable.
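
If header values start out in a map, one possible way to produce a headers() list is sketched below. HeadersSketch and from_map/1 are hypothetical names introduced for illustration, not part of Gnat; the sketch only builds the list, and the commented-out publish call would additionally need a running NATS server.

```elixir
defmodule HeadersSketch do
  # Hypothetical helper, not part of Gnat: converts a map into the
  # list-of-tuples shape described by the headers() type.
  def from_map(map) when is_map(map) do
    map
    |> Enum.map(fn {key, value} -> {to_string(key), to_string(value)} end)
    # Sort so the resulting order does not depend on map iteration order.
    |> Enum.sort()
  end
end

headers = HeadersSketch.from_map(%{"trace-id" => "abc123", attempt: 2})
IO.inspect(headers)

# With a connection available, the list can be passed as shown above:
# :ok = Gnat.pub(gnat, "listen", "Yo", headers: headers)
```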

request(pid, topic, body, opts \\ [])

Specs

request(t(), String.t(), binary(), keyword()) ::
  {:ok, message()} | {:error, :timeout}

Send a request and listen for a response synchronously

Following the NATS request-response pattern, this function generates a one-time topic to receive replies and then sends a message to the provided topic.

Supported options:

  • receive_timeout: the number of milliseconds (an integer) to wait for a response. Defaults to 60_000
  • headers: the headers to send with the request, as a headers() list (see Gnat.pub/4)

{:ok, gnat} = Gnat.start_link()
case Gnat.request(gnat, "i_can_haz_cheezburger", "plZZZZ?!?!?") do
  {:ok, %{body: _delicious_cheezburger}} -> :yum
  {:error, :timeout} -> :sad_cat
end

start_link(connection_settings \\ %{}, opts \\ [])

Specs

start_link(map(), keyword()) :: GenServer.on_start()

Starts a connection to a NATS broker

{:ok, gnat} = Gnat.start_link(%{host: '127.0.0.1', port: 4222})
# if the server requires TLS you can start a connection with:
{:ok, gnat} = Gnat.start_link(%{host: '127.0.0.1', port: 4222, tls: true})
# if the server requires TLS and a client certificate you can start a connection with:
{:ok, gnat} = Gnat.start_link(%{tls: true, ssl_opts: [certfile: "client-cert.pem", keyfile: "client-key.pem"]})

You can also pass arbitrary TCP or SSL options in the tcp_opts and ssl_opts keys. If you pass custom TCP options, include :binary, because Gnat uses binary matching to parse messages.

The final opts argument is passed to the GenServer.start_link call, so you can pass options such as [name: :gnat_connection].
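
One way to combine the two arguments is to start a named connection under a supervisor. The sketch below only builds a plain Supervisor child spec map and does not start anything. The host, port, and :gnat_connection name are example values, and the commented-out calls assume gnat is installed as a dependency and a NATS server is reachable.

```elixir
# Connection settings in the same shape start_link/2 accepts above.
# ~c"127.0.0.1" is a charlist, equivalent to '127.0.0.1'.
settings = %{host: ~c"127.0.0.1", port: 4222}

# A plain Supervisor child spec map; the :name value is only an example.
child = %{
  id: Gnat,
  start: {Gnat, :start_link, [settings, [name: :gnat_connection]]}
}

IO.inspect(child.start)

# In an application with gnat as a dependency and a reachable NATS server:
# {:ok, _sup} = Supervisor.start_link([child], strategy: :one_for_one)
# :ok = Gnat.pub(:gnat_connection, "characters", "Ron Swanson")
```

Because t() is GenServer.server(), a registered name can stand in for the pid in later calls, which avoids threading the pid through the application.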

stop(pid)

Specs

stop(t()) :: :ok

Gracefully shuts down a connection

{:ok, gnat} = Gnat.start_link()
:ok = Gnat.stop(gnat)

sub(pid, subscriber, topic, opts \\ [])

Specs

sub(t(), pid(), String.t(), keyword()) ::
  {:ok, non_neg_integer()} | {:ok, String.t()} | {:error, String.t()}

Subscribe to a topic

Supported options:

  • queue_group: a string that identifies which queue group you want to join

By default, each subscriber receives a copy of every message on the topic. When a queue_group is supplied, messages are distributed among the subscribers in the same group (see NATS queueing).

The subscribed process will begin receiving messages shaped like sent_message().

{:ok, gnat} = Gnat.start_link()
{:ok, subscription} = Gnat.sub(gnat, self(), "topic")
receive do
  {:msg, %{topic: "topic", body: body}} ->
    IO.puts "Received: #{body}"
end
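
A subscriber usually handles more than one message. The sketch below shows one possible receive loop that collects a fixed number of bodies and gives up after a period of silence. ReceiveLoopSketch and collect/3 are hypothetical names, not part of Gnat, and the messages are simulated with send/2 so the example runs without a server.

```elixir
defmodule ReceiveLoopSketch do
  # Hypothetical helper, not part of Gnat: collects up to `count` message
  # bodies for `topic`, stopping early after `timeout` ms without a message.
  def collect(topic, count, timeout \\ 1_000) do
    do_collect(topic, count, timeout, [])
  end

  defp do_collect(_topic, 0, _timeout, acc), do: Enum.reverse(acc)

  defp do_collect(topic, count, timeout, acc) do
    receive do
      # Only messages for the requested topic are consumed.
      {:msg, %{topic: ^topic, body: body}} ->
        do_collect(topic, count - 1, timeout, [body | acc])
    after
      timeout -> Enum.reverse(acc)
    end
  end
end

# Simulated deliveries; with a real subscription these would come from Gnat.
send(self(), {:msg, %{topic: "topic", body: "one"}})
send(self(), {:msg, %{topic: "topic", body: "two"}})

bodies = ReceiveLoopSketch.collect("topic", 2)
IO.inspect(bodies)
```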

unsub(pid, sid, opts \\ [])

Specs

unsub(t(), non_neg_integer() | String.t(), keyword()) :: :ok

Unsubscribe from a topic

Supported options:

  • max_messages: the number of messages to receive before automatically unsubscribing

This corresponds to the UNSUB command in the NATS protocol. By default the unsubscribe takes effect immediately, but an optional max_messages value can be provided, which allows that many messages to be received before the unsubscribe takes effect. This is especially useful for request-response patterns.

{:ok, gnat} = Gnat.start_link()
{:ok, subscription} = Gnat.sub(gnat, self(), "my_inbox")
:ok = Gnat.unsub(gnat, subscription)
# OR
:ok = Gnat.unsub(gnat, subscription, max_messages: 2)