ExRabbitMQ RPC v1.0.0 ExRabbitMQ.RPC.Client behaviour View Source
A behavior module for implementing a RabbitMQ RPC client with a GenServer.
It uses the ExRabbitMQ.Consumer, which in-turn uses the AMQP library to configure and consume messages from a
queue that are actually the response messages of the requests. This queue is set as the reply_to header in the AMQP
message so that the RPC server knows where to reply with a response message. Additionally, all request messages are
“tagged” with a correlation_id which the RPC server also includes in the response message, so that the RPC client
can be track and relate it.
A typical implementation of this behavior is to call the setup_client/2 on GenServer.init/1 and then call the
request/4 for sending request messages. When the response message is received the handle_response/3 will be
invoked.
Make sure that before starting a ExRabbitMQ.RPC.Client to already run in your supervision tree the
ExRabbitMQ.Connection.Supervisor.
Example
defmodule MyClient do
use GenServer
use ExRabbitMQ.RPC.Client
def start_link(args) do
GenServer.start_link(__MODULE__, args, [])
end
def init(_) do
{:ok, state} = setup_client(:connection, %{})
{:ok, state}
end
def request_something(client, queue, value) do
GenServer.cast(client, {:request_something, queue, value})
end
def handle_cast({:request_something, queue, value}, state) do
payload = Poison.encode!(value)
{:ok, _correlation_id} = request(payload, "", queue)
{:noreply, state}line
end
def handle_response({:ok, payload}, correlation_id, state) do
# Do some processing here...
{:noreply, state}
end
end
Link to this section Summary
Callbacks
Invoked when a message has been received from RabbitMQ which is a response message from the RPC server for a request we previously did
Publishes a request message with payload to specified exchange and queue
Opens a RabbitMQ connection & channel and configures a default queue for receiving responses
Opens a RabbitMQ connection & channel and configures the queue for receiving responses
Link to this section Callbacks
Invoked when a message has been received from RabbitMQ which is a response message from the RPC server for a request we previously did.
Parameters
The parameter response has the result of the request and the can take the following values:
{:ok, payload}- the RPC server has replied with a response message for our request.{:error, reason}- when there was an error with the response of the request. If thereasonhas the value:expired, then the:expirationvalue in the request message has been exceeded, meaning that the RPC server didn’t respond within this time.
The parameter correlation_id is the id of the request that this response is related to. This value was set
previously with the call of the request/4 function and the RPC server returned it back with the response message.
The parameter state is the state of the GenServer process.
This callback should return a value, as in GenServer.handle_info/2.
Publishes a request message with payload to specified exchange and queue.
This function will publish a message on a queue that a RPC server is consuming, which we will receive the response
message through the handle_response/3 callback. This function must be called from the ExRabbitMQ.RPC.Client
process, as it needs the process’s dictionary which contain the connection & channel information.
Parameters
The parameter payload is the payload of the request message to be sent to the RPC server.
The parameter exchange is the RabbitMQ exchange to use for routing this message.
The parameter queue is the RabbitMQ queue to deliver this message. This queue must be the queue that an RPC server
is consuming.
The parameter opts is a keyword list with the publishing options. The publish options are the same as in
AMQP.Basic.publish/5 but with a few changes:
:correlation_id- if not specified, will be set to an auto-generated one (usingUUID.uuid4/0),:reply_to- cannot be overrided and will be always set as the queue name as configured withsetup_client/2orsetup_client/3,:timestamp- if not specified, will be set to the current time,:expiration- if not specified, will be set to 5000ms. For no expiration, it needs to be set to a value that is less or equal than zero.
The return value can be:
{:ok, correlation_id}- the request has been published successfully. Thecorrelation_idis an id for this request, that the RPC server will include in the response message, and this process can relate it when receives this response,{:error, reason}- the request has failed to be published with the returnedreason.
setup_client(connection_config :: connection, state :: term()) :: {:ok, new_state} | {:error, reason :: term(), new_state} when new_state: term(), connection: atom() | %ExRabbitMQ.Connection.Config{ heartbeat: term(), host: term(), password: term(), port: term(), reconnect_after: term(), username: term(), vhost: term() }
Opens a RabbitMQ connection & channel and configures a default queue for receiving responses.
Almost same as the setup_client/3 but without any parameters for configuring the queue. Instead it will configure
a temporary queue on RabbitMQ just for receiving message, that will be deleted when the channel is down.
The configuration of the queue will be:
%QueueConfig{
queue: "rpc.gen-" <> UUID.uuid4(),
queue_opts: [exclusive: true, auto_delete: true],
consume_opts: [no_ack: false]
}
For more information about the usage, also check the documentation of the function setup_client/3.
setup_client( connection_config :: connection, state :: term(), opts :: keyword() ) :: {:ok, new_state} | {:error, reason :: term(), new_state} when new_state: term(), connection: atom() | %ExRabbitMQ.Connection.Config{ heartbeat: term(), host: term(), password: term(), port: term(), reconnect_after: term(), username: term(), vhost: term() }
Opens a RabbitMQ connection & channel and configures the queue for receiving responses.
This function calls the function ExRabbitMQ.Consumer.xrmq_init/3 for creating a new RabbitMQ connection &
channel and configure the exchange & queue for consuming incoming response messages. This queue will be set in the
reply_to header of the AMQP message and will be used by the RPC server to reply back with a response message.
This function is usually called on GenServer.init/1 callback.
Parameters
The parameter connection_config specifies the configuration of the RabbitMQ connection. If set to an atom,
the configuration will be loaded from the application’s config.exs under the app key :exrabbitmq,
eg. if the value is set to :default_connection, then the config.exs should have configuration like the following:
config :exrabbitmq, :default_connection,
username: "guest",
password: "guest",
host: "localhost",
port: 5672
The parameter connection_config can also be set to the struct ExRabbitMQ.Connection.Config which allows
to programatically configure the connection without config.exs.
The parameter state is the state of the GenServer process.
The optional parameter opts provides additional options for setting up the RabbitMQ client.
The available options are:
:queue- specifies a custom Queue configuration. If set to an atom, the configuration will be loaded from the application’s config.exs under the app key :exrabbitmq, eg. if the value is set to:default_queue, then the config.exs should have configuration like the following:
config :exrabbitmq, :default_queue,
queue: "test_queue",
queue_opts: [durable: true],
consume_opts: [no_ack: true]
If not set, then a temporary queue on RabbitMQ just for receiving message, that will be deleted when the channel is down. The configuration of the queue will be:
%QueueConfig{
queue: "rpc.gen-" <> UUID.uuid4(),
queue_opts: [exclusive: true, auto_delete: true],
consume_opts: [no_ack: false]
}
:queue_prefix- allows to specify the prefix of the generated queue name, which by default isrpc.gen-. If the:queueoption is set, this setting will be ignored.
The return of the function can be {:ok, state} when the consumer has been successfully registered or on error the
tuple {:error, reason, state}.
For more information about the connection & queue configuration, please check the documentation of the function
ExRabbitMQ.Consumer.xrmq_init/3.