View Source Gnat (gnat v1.9.1)
The primary interface for interacting with NATS
Summary
Types
connection_timeout
- limits how long it can take to establish a connection to a serverhost
- The location of the NATS serverport
- The port the NATS server is listening onssl_opts
- Options for connecting over SSLtcp_opts
- Options for connecting over TCPtls
- If the server should use a TLS connectioninbox_prefix
- Prefix to use for the message inbox of this connectionno_responders
- Enable the no responders behavior (seeGnat.request/4
)
A message received from NATS will be delivered to your process in this form.
Please note that the :reply_to
and :headers
keys are optional.
They will only be present if the message was received from the NATS server with
headers or a reply_to
topic
Functions
Get the number of active subscriptions
Returns a specification to start this module under a supervisor.
Ping the NATS server
Publish a message
Send a request and listen for a response synchronously
Send a request and listen for multiple responses synchronously
Get information about the NATS server the connection is for
Starts a connection to a nats broker
Gracefully shuts down a connection
Subscribe to a topic
Unsubscribe from a topic
Types
@type connection_settings() :: %{ optional(:connection_timeout) => non_neg_integer(), optional(:host) => binary(), optional(:inbox_prefix) => binary(), optional(:port) => non_neg_integer(), optional(:ssl_opts) => list(), optional(:tcp_opts) => list(), optional(:tls) => boolean(), optional(:no_responders) => boolean() }
connection_timeout
- limits how long it can take to establish a connection to a serverhost
- The location of the NATS serverport
- The port the NATS server is listening onssl_opts
- Options for connecting over SSLtcp_opts
- Options for connecting over TCPtls
- If the server should use a TLS connectioninbox_prefix
- Prefix to use for the message inbox of this connectionno_responders
- Enable the no responders behavior (seeGnat.request/4
)
@type message() :: %{ :gnat => t(), :topic => binary(), :body => iodata(), :sid => non_neg_integer(), optional(:reply_to) => binary(), optional(:headers) => headers(), optional(:status) => String.t(), optional(:description) => String.t() }
A message received from NATS will be delivered to your process in this form.
Please note that the :reply_to
and :headers
keys are optional.
They will only be present if the message was received from the NATS server with
headers or a reply_to
topic
gnat
- The Gnat connectiontopic
- The topic on which the message arrivedbody
- The raw payload of the messagesid
- The subscription ID corresponding to this message. You generally won't need to use this value directly.reply_to
- A topic supplied for expected repliesheaders
- A set of NATS message headers on the messagestatus
- Similar to an HTTP status, this is present for messages with headers and can indicate the specific purpose of a message. Examplestatus: "408"
description
- A string description of thestatus
@type sent_message() :: {:msg, message()}
@type server_info() :: %{ :client_id => non_neg_integer(), :client_ip => binary(), optional(:ip) => binary(), optional(:cluster) => binary(), optional(:cluster_dynamic) => boolean(), optional(:connect_urls) => [binary()], optional(:ws_connect_urls) => [binary()], optional(:git_commit) => binary(), :go => binary(), :headers => boolean(), :host => binary(), optional(:jetstream) => binary(), :max_payload => integer(), :port => non_neg_integer(), :proto => integer(), :server_id => binary(), :server_name => binary(), :version => binary(), optional(:ldm) => boolean(), optional(:tls_verify) => boolean(), optional(:tls_available) => boolean(), optional(:tls_required) => boolean(), optional(:auth_required) => boolean() }
client_id
- An optional unsigned integer (64 bits) representing the internal client identifier in the server. This can be used to filter client connections in monitoring, correlate with error logs, etc...client_ip
- The IP address the client is connecting fromcluster
- The name of the cluster if anycluster_dynamic
- If the cluster is dynamicconnect_urls
- An optional list of server urls that a client can connect to.ws_connect_urls
- An optional list of server urls that a websocket client can connect to.git_commit
- The git commit associated with this NATS versiongo
- The version of golang the NATS server was built withheaders
- If messages can have headers in themhost
- The IP address used to start the NATS server, by default this will be 0.0.0.0 and can be configured with -client_advertise host:portjetstream
- If the server is using JetStream featuresmax_payload
- Maximum payload size, in bytes, that the server will accept from the clientport
- The port number the NATS server is configured to listen onproto
- An integer indicating the protocol version of the server. The server version 1.2.0 sets this to 1 to indicate that it supports the "Echo" feature.server_id
- The unique identifier of the NATS serverserver_name
- A name for the serverversion
- The version of the NATS serverldm
- If the server supports Lame Duck Mode notifications, and the current server has transitioned to lame duck, ldm will be set to true.auth_required
- If this is set, then the client should try to authenticate upon connect.tls_required
- If this is set, then the client must perform the TLS/1.2 handshake. Note, this used to be ssl_required and has been updated along with the protocol from SSL to TLS.tls_verify
- If this is set, the client must provide a valid certificate during the TLS handshake.tls_available
- If the server can use TLS
@type t() :: GenServer.server()
Functions
@spec active_subscriptions(t()) :: {:ok, non_neg_integer()}
Get the number of active subscriptions
Returns a specification to start this module under a supervisor.
See Supervisor
.
Ping the NATS server
This correlates to the PING command in the NATS protocol.
If the NATS server responds with a PONG message this function will return :ok
{:ok, gnat} = Gnat.start_link()
:ok = Gnat.ping(gnat)
Publish a message
{:ok, gnat} = Gnat.start_link()
:ok = Gnat.pub(gnat, "characters", "Ron Swanson")
If you want to provide a reply address to receive a response you can pass it as an option. See request-reply pattern.
{:ok, gnat} = Gnat.start_link()
:ok = Gnat.pub(gnat, "characters", "Star Lord", reply_to: "me")
If you want to publish a message with headers you can pass the :headers
key in the opts
like this.
{:ok, gnat} = Gnat.start_link()
:ok = Gnat.pub(gnat, "listen", "Yo", headers: [{"foo", "bar"}])
Headers must be passed as a t:headers()
value (a list of tuples).
Sending and parsing headers has more overhead than typical nats messages
(see the Nats 2.2 release notes for details),
so only use them when they are really valuable.
@spec request(t(), String.t(), binary(), keyword()) :: {:ok, message()} | {:error, :timeout} | {:error, :no_responders}
Send a request and listen for a response synchronously
Following the nats request-reply pattern this function generates a one-time topic to receive replies and then sends a message to the provided topic.
Supported options:
receive_timeout
- An integer number of milliseconds to wait for a response. Defaults to 60_000headers
- A set of headers you want to send with the request (seeGnat.pub/4
)
{:ok, gnat} = Gnat.start_link()
case Gnat.request(gnat, "i_can_haz_cheezburger", "plZZZZ?!?!?") do
{:ok, %{body: delicious_cheezburger}} -> :yum
{:error, :timeout} -> :sad_cat
end
No Responders
If you send a request to a topic that has no registered listeners, it is sometimes convenient to find out
right away, rather than waiting for a timeout to occur. In order to support this use-case, you can start
your Gnat connection with the no_responders: true
option and this function will return very quickly with
an {:error, :no_responders}
value. This behavior also works with request_multi/4
@spec request_multi(t(), String.t(), binary(), keyword()) :: {:ok, [message()]} | {:error, :no_responders}
Send a request and listen for multiple responses synchronously
This function makes it easy to do a scatter-gather operation where you wait for a limited time and optionally a maximum number of replies.
Supported options:
receive_timeout
- An integer number of milliseconds to wait for responses. Defaults to 60_000max_messages
- An integer number of messages to listen for. Defaults to -1 meaning unlimitedheaders
- A set of headers you want to send with the request (seeGnat.pub/4
)
{:ok, gnat} = Gnat.start_link()
{:ok, responses} = Gnat.request_multi(gnat, "i_can_haz_fries", "plZZZZZ!?!?", max_messages: 5)
Enum.count(responses) #=> 5
@spec server_info(t()) :: server_info()
Get information about the NATS server the connection is for
@spec start_link( connection_settings(), keyword() ) :: GenServer.on_start()
Starts a connection to a nats broker
{:ok, gnat} = Gnat.start_link(%{host: "127.0.0.1", port: 4222})
# if the server requires TLS you can start a connection with:
{:ok, gnat} = Gnat.start_link(%{host: "127.0.0.1", port: 4222, tls: true})
# if the server requires TLS and a client certificate you can start a connection with:
{:ok, gnat} = Gnat.start_link(%{tls: true, ssl_opts: [certfile: "client-cert.pem", keyfile: "client-key.pem"]})
# you can customize default "_INBOX." inbox prefix with:
{:ok, gnat} = Gnat.start_link(%{host: "127.0.0.1", port: 4222, inbox_prefix: "my_prefix._INBOX."})
# you can use IPv6 addresses too
{:ok, gnat} = Gnat.start_link(%{host: "::1", port: 4222, tcp_opts: [:inet6, :binary]})
You can also pass arbitrary SSL or TCP options in the tcp_opts
and ssl_opts
keys.
If you pass custom TCP options please include :binary
. Gnat uses binary matching to parse messages.
The final opts
argument will be passed to the GenServer.start_link
call so you can pass things like [name: :gnat_connection]
.
@spec stop(t()) :: :ok
Gracefully shuts down a connection
{:ok, gnat} = Gnat.start_link()
:ok = Gnat.stop(gnat)
@spec sub(t(), pid(), String.t(), keyword()) :: {:ok, non_neg_integer()} | {:ok, String.t()} | {:error, String.t()}
Subscribe to a topic
Supported options:
- queue_group: a string that identifies which queue group you want to join
By default each subscriber will receive a copy of every message on the topic. When a queue_group is supplied messages will be spread among the subscribers in the same group. (see nats queueing)
The subscribed process will begin receiving messages with a structure of sent_message/0
{:ok, gnat} = Gnat.start_link()
{:ok, subscription} = Gnat.sub(gnat, self(), "topic")
receive do
{:msg, %{topic: "topic", body: body}} ->
IO.puts "Received: #{body}"
end
@spec unsub(t(), non_neg_integer() | String.t(), keyword()) :: :ok
Unsubscribe from a topic
Supported options:
max_messages
- Number of messages to be received before automatically unsubscribed
This correlates to the UNSUB command in the nats protocol.
By default the unsubscribe is affected immediately, but an optional max_messages
value can be provided which will allow
max_messages
to be received before affecting the unsubscribe.
This is especially useful for request/reply patterns.
{:ok, gnat} = Gnat.start_link()
{:ok, subscription} = Gnat.sub(gnat, self(), "my_inbox")
:ok = Gnat.unsub(gnat, subscription)
# OR
:ok = Gnat.unsub(gnat, subscription, max_messages: 2)