hare v0.1.9 Hare.RPC.Client behaviour
A behaviour module for implementing AMQP RPC client processes.
The Hare.RPC.Client module provides a way to create processes that hold, monitor, and restart a channel in case of failure. It exports a function to perform an RPC request to an exchange and receive the response, and it defines callbacks to hook into the process lifecycle.
The following example Hare.RPC.Client process performs an RPC request and returns the response, but returns "already_requested" when a request with the same payload has already been performed.
defmodule MyRPCClient do
  use Hare.RPC.Client

  def start_link(conn, config, opts \\ []) do
    Hare.RPC.Client.start_link(__MODULE__, conn, config, :ok, opts)
  end

  # The state is the set of payloads that have already been requested.
  def init(:ok) do
    {:ok, MapSet.new()}
  end

  def handle_request(payload, _routing_key, _opts, state) do
    # MapSet.member?/2 returns a boolean, so branch on it directly.
    if MapSet.member?(state, payload) do
      {:reply, "already_requested", state}
    else
      {:ok, MapSet.put(state, payload)}
    end
  end
end
Channel handling
When the Hare.RPC.Client starts with start_link/5 it runs the init/1 callback and responds with {:ok, pid} on success, like a GenServer.
After starting the process it attempts to open a channel on the given connection. It monitors the channel and, if the channel fails, keeps trying to reopen it on the same connection.
Context setup
The context setup process for an RPC client is to declare the exchange to perform requests to, declare an exclusive server-named queue to receive responses, and consume that server-named queue.
Every time a channel is opened the context is set up: the exchange and a new server-named queue are declared, and the queue is consumed through the new channel, based on the given configuration.
The configuration must be a Keyword.t that contains a single key: :exchange, whose value is the configuration for Hare.Context.Action.DeclareExchange. Check that module for more detailed information.
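As a minimal sketch of the shape described above, the configuration can be built as a keyword list with a single :exchange key. The helper module RPCClientConfig, the exchange name "rpc.requests", and the inner keys (:name, :type, :opts) are assumptions made for illustration; the authoritative list of exchange options is in the Hare.Context.Action.DeclareExchange documentation.

```elixir
defmodule RPCClientConfig do
  # Builds the keyword list expected as the `config` argument.
  # Only the top-level :exchange key comes from the text above; the inner
  # keys (:name, :type, :opts) and their values are assumed for illustration.
  def build(exchange_name) when is_binary(exchange_name) do
    [
      exchange: [
        name: exchange_name,
        type: :direct,
        opts: [durable: true]
      ]
    ]
  end
end

config = RPCClientConfig.build("rpc.requests")
```

The resulting value is what would be passed as the third argument to start_link/5.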
Summary
Functions
Performs an RPC request and blocks until the response arrives
Starts a Hare.RPC.Client process linked to the current process
Callbacks
Called every time the channel has been opened, the exchange and the server-named queue declared, and the queue consumed
Called when the process receives a message
Called when the AMQP server has registered the process as a consumer of the server-named queue and it will start to receive messages
Called before a request will be performed to the exchange
Called when the RPC client process is first started. start_link/5 will block until it returns
This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with
Types
Functions
request(pid, payload, routing_key, opts, timeout) :: {:ok, response :: binary} | {:error, reason :: term}
Performs an RPC request and blocks until the response arrives.
A timeout bound to the same rules as the GenServer timeout may be specified (5 seconds by default).
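A caller typically pattern-matches on the two documented result shapes. The sketch below wraps the call in a hypothetical helper module, RPCCall. The request function is passed in as an argument (defaulting to Hare.RPC.Client.request/5) only so the sketch can be exercised without a running client; the empty options list and the 10-second timeout are illustrative assumptions.

```elixir
defmodule RPCCall do
  # Performs a request through `client` (the pid of a running
  # Hare.RPC.Client process) and normalises the documented results.
  # `request_fun` is a hypothetical seam for testing; by default it is
  # the library's request/5.
  def call(client, payload, routing_key, request_fun \\ &Hare.RPC.Client.request/5) do
    case request_fun.(client, payload, routing_key, [], 10_000) do
      {:ok, response} when is_binary(response) ->
        {:ok, response}

      {:error, reason} ->
        # Tag the failure so callers can tell it came from the RPC layer.
        {:error, {:rpc_failed, reason}}
    end
  end
end
```

In application code the default would be used, for example RPCCall.call(client, "ping", "echo"), where "ping" and "echo" are placeholder values.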
start_link(module, pid, config, initial :: term, GenServer.options) :: GenServer.on_start
Starts a Hare.RPC.Client process linked to the current process.
This function is used to start a Hare.RPC.Client process in a supervision tree. The process will be started by calling init/1 with the given initial value.
Arguments:
mod - the module that defines the server callbacks (like GenServer)
conn - the pid of a Hare.Core.Conn process
config - the configuration of the RPC client (describing the exchange to declare)
initial - the value that will be given to init/1
opts - the GenServer options
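Because start_link/5 follows the GenServer conventions, a client module can be described with a child specification and placed in a supervision tree. The sketch below only builds the specification map. RPCClientSpec is a hypothetical helper, the client module is assumed to expose start_link(conn, config, opts) as MyRPCClient does in the example at the top of this page, and how the connection pid is obtained is deliberately left out.

```elixir
defmodule RPCClientSpec do
  # Returns a child specification map for a client module that exposes
  # start_link(conn, config, opts), as MyRPCClient does.
  # Registering the process under the module name is an illustrative choice.
  def child_spec(client_module, conn, config) do
    %{
      id: client_module,
      start: {client_module, :start_link, [conn, config, [name: client_module]]},
      type: :worker,
      restart: :permanent
    }
  end
end
```

The specification stores the connection pid it was built with, so it is only as valid as that connection process.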
Callbacks
Called every time the channel has been opened, the exchange and the server-named queue declared, and the queue consumed.
It is called with two arguments: some metadata and the process’ internal state.
The metadata is a map with the following fields:
:req_exchange - the Hare.Core.Exchange to issue requests to
:resp_queue - the server-named Hare.Core.Queue to consume the responses from
Returning {:noreply, state} will cause the process to enter the main loop with state as its internal state.
Returning {:stop, reason, state} will terminate the loop and call terminate(reason, state) before the process exits with reason reason.
Called when the process receives a message.
Returning {:noreply, state} will cause the process to enter the main loop with the given state.
Returning {:stop, reason, state} will not send the message, terminate the main loop and call terminate(reason, state) before the process exits with reason reason.
Called when the AMQP server has registered the process as a consumer of the server-named queue and it will start to receive messages.
Returning {:noreply, state} will cause the process to enter the main loop with the given state.
Returning {:stop, reason, state} will not send the message, terminate the main loop and call terminate(reason, state) before the process exits with reason reason.
handle_request(payload, routing_key, opts :: term, state) :: {:ok, state} | {:ok, payload, routing_key, opts :: term, state} | {:reply, response :: term, state} | {:stop, reason :: term, response :: binary, state}
Called before a request is performed to the exchange.
It receives as arguments the message payload, the routing key, the options for that publication, and the internal state.
Returning {:ok, state} will cause the request to be performed with no modification, block the client until the response is received, and enter the main loop with the given state.
Returning {:ok, payload, routing_key, opts, state} will cause the given payload, routing key and options to be used instead of the original ones, block the client until the response is received, and enter the main loop with the given state.
Returning {:reply, response, state} will respond to the client immediately with the given response, without performing the request, and enter the main loop again with the given state.
Returning {:stop, reason, state} will not send the message, terminate the main loop and call terminate(reason, state) before the process exits with reason reason.
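The three non-stopping return shapes can be sketched as separate function clauses. To keep the sketch runnable without the library, the hypothetical module RequestHooks below omits use Hare.RPC.Client; in a real client the same clauses would be the handle_request/4 callback. The "legacy." routing-key prefix, the "empty_payload" response, and the integer counter used as state are all assumptions made for illustration.

```elixir
defmodule RequestHooks do
  # Reply immediately, without performing the request, on an empty payload.
  def handle_request("", _routing_key, _opts, state) do
    {:reply, "empty_payload", state}
  end

  # Rewrite the routing key by stripping a "legacy." prefix; payload and
  # options are passed through unchanged.
  def handle_request(payload, "legacy." <> routing_key, opts, state) do
    {:ok, payload, routing_key, opts, state}
  end

  # Perform the request unmodified; the state counts such requests.
  def handle_request(_payload, _routing_key, _opts, state) when is_integer(state) do
    {:ok, state + 1}
  end
end
```

Clause order matters here: the empty-payload clause is checked first, so an empty payload never reaches the exchange even when sent with a "legacy." routing key.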
Called when the RPC client process is first started. start_link/5 will block until it returns.
It receives as its argument the fourth argument given to start_link/5.
Returning {:ok, state} will cause start_link/5 to return {:ok, pid} and attempt to open a channel on the given connection, declare the exchange, declare a server-named queue, and consume it. After that it will enter the main loop with state as its internal state.
Returning :ignore will cause start_link/5 to return :ignore and the process will exit normally without entering the loop, opening a channel, or calling terminate/2.
Returning {:stop, reason} will cause start_link/5 to return {:error, reason} and the process will exit with reason reason without entering the loop, opening a channel, or calling terminate/2.
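The three return values can be sketched with one clause each. As a standalone illustration, the hypothetical module InitSketch below omits use Hare.RPC.Client; in a real client these clauses would be the init/1 callback. The initial values :ok and :disabled and the error tuple are assumptions made for illustration.

```elixir
defmodule InitSketch do
  # Start normally, keeping a set of seen payloads as the internal state.
  def init(:ok), do: {:ok, MapSet.new()}

  # Opt out of starting, e.g. when the client is disabled by configuration.
  def init(:disabled), do: :ignore

  # Refuse to start on any other initial value.
  def init(other), do: {:stop, {:invalid_initial_value, other}}
end
```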