Freddy.RPC.Server behaviour (freddy v0.17.2)
A behaviour module for implementing AMQP RPC server processes.
The Freddy.RPC.Server module provides a way to create processes that hold, monitor, and restart a channel in case of failure, and have some callbacks to hook into the process lifecycle and handle messages.
An example Freddy.RPC.Server process that responds to messages with "ping" as payload with a "pong" response. For any other payload it does not reply directly; instead it asynchronously calls a given handler function with the payload and a callback function, so the handler can respond once the message is processed:
    defmodule MyRPC.Server do
      use Freddy.RPC.Server

      def start_link(conn, config, handler) do
        Freddy.RPC.Server.start_link(__MODULE__, conn, config, handler)
      end

      def init(handler) do
        {:ok, %{handler: handler}}
      end

      def handle_request("ping", _meta, state) do
        {:reply, "pong", state}
      end

      def handle_request(payload, meta, %{handler: handler} = state) do
        callback = &Freddy.RPC.Server.reply(meta, &1)
        Task.start_link(fn -> handler.(payload, callback) end)
        {:noreply, state}
      end
    end
Channel handling
When the Freddy.RPC.Server starts with start_link/5 it runs the init/1 callback and responds with {:ok, pid} on success, like a GenServer.
After starting the process it attempts to open a channel on the given connection. It monitors the channel, and in case of failure it keeps trying to reopen the channel on the same connection.
Context setup
The context setup process for an RPC server is to declare an exchange, then declare a queue to consume, and then bind the queue to the exchange. It also sets up the default exchange, which is used to publish responses to the reply-to queue.
Every time a channel is opened the context is set up, meaning that the queue and the exchange are declared and bound through the new channel based on the given configuration.
The configuration must be a Keyword.t that contains the same keys as Freddy.Consumer. Check out the Freddy.Consumer documentation for the list of available configuration keys.
Acknowledgement mode
By default, the RPC server starts in automatic acknowledgement mode. This means that all incoming requests are acknowledged automatically by the RabbitMQ server once they are delivered to a client (the RPC server process).
If your logic requires manual acknowledgements, you should start the server with the configuration option [consumer: [no_ack: false]] and acknowledge messages manually using the ack/2 function.
Below is an example of how to start a server in manual acknowledgement mode:
    defmodule MyRPC.Server do
      alias Freddy.RPC.Server
      use Server

      def start_link(conn, handler) do
        config = [
          queue: [name: "rpc-queue"],
          consumer: [no_ack: false]
        ]

        Server.start_link(__MODULE__, conn, config, handler)
      end

      def handle_request(payload, meta, handler) do
        result = handler.handle_request(payload)
        Server.ack(meta)
        {:reply, result, handler}
      end
    end
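Because the two acknowledgement modes differ only in the consumer: [no_ack: ...] option, the configuration can be produced by a small helper. The following is a minimal sketch, not part of Freddy: the module MyRPC.Config, its build/2 function and the :manual_ack option are hypothetical names, and only the :queue and :consumer keys shown above are used. It assumes that setting no_ack: true explicitly is equivalent to the default automatic mode.

```elixir
defmodule MyRPC.Config do
  # Hypothetical helper, not part of Freddy.
  # Builds a server configuration from a queue name.
  # Pass `manual_ack: true` to request manual acknowledgement mode.
  def build(queue_name, opts \\ []) when is_binary(queue_name) do
    manual_ack? = Keyword.get(opts, :manual_ack, false)

    [
      queue: [name: queue_name],
      # no_ack: false means messages must be acknowledged with ack/2
      consumer: [no_ack: not manual_ack?]
    ]
  end
end
```

The resulting keyword list can be passed as the config argument of start_link/5, for example MyRPC.Config.build("rpc-queue", manual_ack: true).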
Summary
Functions
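reply(request_meta, response, opts \\ [])
Responds to a request given its meta.
start(mod, connection, config, initial, opts \\ [])
Start a Freddy.RPC.Server process without linking to the current process.
start_link(mod, connection, config, initial, opts \\ [])
Start a Freddy.RPC.Server process linked to the current process.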
Callbacks
decode_request(payload, meta, state)
Called when a request message is delivered from the queue before passing it into a handle_request function.
encode_response(response, opts, state)
Called before a response message will be published to the default exchange.
handle_connected(arg1, state)
Called when the RPC server process has opened an AMQP channel, before registering itself as a consumer.
handle_disconnected(reason, state)
Called when the RPC server has been disconnected from the AMQP broker.
handle_info(message, state)
Called when the process receives a message. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.
handle_ready(meta, state)
Called when the AMQP server has registered the process as a consumer of the server-named queue and it will start to receive messages.
handle_request(request, meta, state)
Called when a request message has been successfully decoded by the decode_request/3 function.
init(initial)
Called when the RPC server process is first started. start_link/5 will block until it returns.
terminate(reason, state)
This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.
Types
meta()
Specs
meta() :: map()
opts()
Specs
opts() :: Keyword.t()
payload()
Specs
payload() :: String.t()
request()
Specs
request() :: term()
response()
Specs
response() :: term()
routing_key()
Specs
routing_key() :: String.t()
state()
Specs
state() :: term()
Functions
ack(meta, opts \\ [])
call(server, message, timeout \\ 5000)
cast(server, message)
reply(request_meta, response, opts \\ [])
Specs
Responds to a request given its meta.
start(mod, connection, config, initial, opts \\ [])
Specs
start(module(), Freddy.Consumer.connection(), Keyword.t(), initial :: term(), GenServer.options()) :: GenServer.on_start()
Start a Freddy.RPC.Server process without linking to the current process.
See start_link/5 for more information.
start_link(mod, connection, config, initial, opts \\ [])
Specs
start_link(module(), Freddy.Consumer.connection(), Keyword.t(), initial :: term(), GenServer.options()) :: GenServer.on_start()
Start a Freddy.RPC.Server process linked to the current process.
Arguments:
mod - the module that defines the server callbacks (like GenServer)
connection - the pid of a Freddy.Connection process
config - the configuration of the RPC server
initial - the value that will be given to init/1
opts - the GenServer options
stop(server, reason \\ :normal)
See GenServer.stop/2.
Callbacks
decode_request(payload, meta, state)
Specs
decode_request(payload(), meta(), state()) :: {:ok, request(), state()} | {:reply, response(), state()} | {:reply, response(), opts(), state()} | {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:stop, reason :: term(), state()}
Called when a request message is delivered from the queue, before passing it into the handle_request/3 function.
The arguments are the message's payload, some metadata and the internal state. The metadata is a map containing all metadata given by the AMQP client when receiving the message plus the :exchange and :queue values.
Returning {:ok, request, state} will pass the returned request term into the handle_request/3 function.
Returning {:ok, request, meta, state} will pass the returned request term and the meta into the handle_request/3 function.
Returning {:reply, response, state} will publish the response message without calling the handle_request/3 function. The encode_response/3 function will be called before publishing the response.
Returning {:reply, response, opts, state} will publish the response message with the returned options without calling the handle_request/3 function. The encode_response/3 function will be called before publishing the response.
Returning {:noreply, state} will ignore that message and enter the main loop again with the given state.
Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.
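As an illustration of the return values above, here is a minimal sketch of a decode_request/3 implementation that parses integer payloads and answers malformed ones directly, so handle_request/3 is never called for them. The module name MyRPC.IntegerDecoder and the error text are hypothetical. The use line is commented out so the sketch compiles without the freddy dependency.

```elixir
defmodule MyRPC.IntegerDecoder do
  # use Freddy.RPC.Server  # uncomment in a project that depends on :freddy

  # Hypothetical example: turns payloads such as "42" into integers.
  def decode_request(payload, _meta, state) do
    case Integer.parse(payload) do
      # The whole payload is a valid integer: hand it to handle_request/3.
      {number, ""} ->
        {:ok, number, state}

      # Trailing garbage or no digits at all: reply right away.
      _other ->
        {:reply, "error: expected an integer", state}
    end
  end
end
```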
encode_response(response, opts, state)
Specs
encode_response(response(), opts(), state()) :: {:reply, payload(), state()} | {:reply, payload(), opts(), state()} | {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:stop, reason :: term(), state()}
Called before a response message will be published to the default exchange.
It receives as arguments the response to be published, the options for that publication and the internal state.
Returning {:reply, string, state} will cause the returned string to be published to the exchange, and the process to enter the main loop with the given state.
Returning {:reply, string, opts, state} will cause the returned string to be published to the exchange with the returned options, and the process to enter the main loop with the given state.
Returning {:noreply, state} will ignore that message and enter the main loop again with the given state.
Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.
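A minimal sketch of an encode_response/3 implementation follows. It assumes responses are either strings, integers or arbitrary terms and converts each to a string payload. The module name MyRPC.TextEncoder is hypothetical, and the use line is commented out so the sketch compiles without the freddy dependency.

```elixir
defmodule MyRPC.TextEncoder do
  # use Freddy.RPC.Server  # uncomment in a project that depends on :freddy

  # Strings are published unchanged.
  def encode_response(response, _opts, state) when is_binary(response) do
    {:reply, response, state}
  end

  # Integers are published in their decimal representation.
  def encode_response(response, _opts, state) when is_integer(response) do
    {:reply, Integer.to_string(response), state}
  end

  # Any other term is published in its inspected form.
  def encode_response(response, _opts, state) do
    {:reply, inspect(response), state}
  end
end
```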
handle_call(request, arg2, state)
Specs
handle_call(request(), GenServer.from(), state()) :: {:reply, reply :: term(), state()} | {:reply, reply :: term(), state(), timeout() | :hibernate} | {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:stop, reason :: term(), state()} | {:stop, reason :: term(), reply :: term(), state()}
Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same.
handle_cast(request, state)
Specs
handle_cast(request(), state()) :: {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:stop, reason :: term(), state()}
Called when the process receives a cast message sent by cast/2. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.
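The following minimal sketch shows how handle_call/3 and handle_cast/2 might be used to inspect and reset a counter kept in the server state. The module name MyRPC.Stats, the :handled key and the :handled_count and :reset messages are hypothetical. The use line is commented out so the sketch compiles without the freddy dependency.

```elixir
defmodule MyRPC.Stats do
  # use Freddy.RPC.Server  # uncomment in a project that depends on :freddy

  def init(_initial), do: {:ok, %{handled: 0}}

  # Answers a call/3 with the :handled_count message.
  def handle_call(:handled_count, _from, state) do
    {:reply, state.handled, state}
  end

  # Handles a cast/2 with the :reset message.
  def handle_cast(:reset, state) do
    {:noreply, %{state | handled: 0}}
  end
end
```

With a running server, the counter would then be read with Freddy.RPC.Server.call(server, :handled_count) and cleared with Freddy.RPC.Server.cast(server, :reset).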
handle_connected(arg1, state)
Specs
handle_connected(Freddy.Consumer.connection_info(), state()) :: {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:error, state()} | {:stop, reason :: term(), state()}
Called when the RPC server process has opened an AMQP channel, before registering itself as a consumer.
The first argument is a map containing the :channel, :exchange and :queue structures.
Returning {:noreply, state} will cause the process to enter the main loop with the given state.
Returning {:error, state} will cause the process to reconnect (i.e. open a new channel, declare the exchange and queue, etc).
Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.
handle_disconnected(reason, state)
Specs
handle_disconnected(reason :: term(), state()) :: {:noreply, state()} | {:stop, reason :: term(), state()}
Called when the RPC server has been disconnected from the AMQP broker.
Returning {:noreply, state} will cause the process to enter the main loop with the given state. The server will not consume any new messages until the connection to the AMQP broker is restored.
Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.
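As a minimal sketch of the two connection callbacks, the module below records the connection status in the state and stops the server after a number of disconnects. The module name MyRPC.Lifecycle, the :status and :disconnects keys, the limit of 3 and the :too_many_disconnects reason are all hypothetical. The use line is commented out so the sketch compiles without the freddy dependency.

```elixir
defmodule MyRPC.Lifecycle do
  # use Freddy.RPC.Server  # uncomment in a project that depends on :freddy

  # Hypothetical limit chosen for illustration only.
  @max_disconnects 3

  # Marks the server as connected once the channel is open.
  def handle_connected(_info, state) do
    {:noreply, Map.put(state, :status, :connected)}
  end

  # Counts disconnects and gives up once the limit is reached.
  def handle_disconnected(reason, state) do
    count = Map.get(state, :disconnects, 0) + 1
    state = Map.merge(state, %{status: {:disconnected, reason}, disconnects: count})

    if count >= @max_disconnects do
      {:stop, :too_many_disconnects, state}
    else
      {:noreply, state}
    end
  end
end
```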
handle_info(message, state)
Specs
handle_info(message :: term(), state()) :: {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:stop, reason :: term(), state()}
Called when the process receives a message. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.
handle_ready(meta, state)
Specs
handle_ready(meta(), state()) :: {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:stop, reason :: term(), state()}
Called when the AMQP server has registered the process as a consumer of the server-named queue and it will start to receive messages.
Returning {:noreply, state} will cause the process to enter the main loop with the given state.
Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.
handle_request(request, meta, state)
Specs
handle_request(request(), meta(), state()) :: {:reply, response(), state()} | {:reply, response(), opts(), state()} | {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:stop, reason :: term(), state()}
Called when a request message has been successfully decoded by the decode_request/3 function.
The arguments are the message's decoded payload, some metadata and the internal state. The metadata is a map containing all metadata given by the AMQP client when receiving the message plus the :exchange and :queue values.
Returning {:reply, response, state} will publish the response message. The encode_response/3 function will be called before publishing the response.
Returning {:reply, response, opts, state} will publish the response message with the returned options. The encode_response/3 function will be called before publishing the response.
Returning {:noreply, state} will ignore that message and enter the main loop again with the given state.
Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.
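A minimal sketch of handle_request/3 that replies to integer requests and ignores anything else, updating the state on each reply. The module name MyRPC.Doubler and the :handled key are hypothetical, and the sketch assumes requests have already been decoded to integers by decode_request/3. The use line is commented out so the sketch compiles without the freddy dependency.

```elixir
defmodule MyRPC.Doubler do
  # use Freddy.RPC.Server  # uncomment in a project that depends on :freddy

  def init(_initial), do: {:ok, %{handled: 0}}

  # Replies with twice the requested number and counts the request.
  def handle_request(number, _meta, state) when is_integer(number) do
    {:reply, number * 2, %{state | handled: state.handled + 1}}
  end

  # Any other request is ignored, so no response is published for it.
  def handle_request(_other, _meta, state) do
    {:noreply, state}
  end
end
```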
init(initial)
Specs
Called when the RPC server process is first started. start_link/5 will block until it returns.
It receives as argument the fourth argument given to start_link/5.
Returning {:ok, state} will cause start_link/5 to return {:ok, pid} and attempt to open a channel on the given connection, declare the exchange, declare a queue, and start consumption. After that it will enter the main loop with state as its internal state.
Returning :ignore will cause start_link/5 to return :ignore and the process will exit normally without entering the loop, opening a channel or calling terminate/2.
Returning {:stop, reason} will cause start_link/5 to return {:error, reason} and the process will exit with reason reason without entering the loop, opening a channel, or calling terminate/2.
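The three return values can be sketched in a single init/1 implementation. In this minimal example the module name MyRPC.Init, the :enabled and :handler keys and the :bad_initial reason are hypothetical. The use line is commented out so the sketch compiles without the freddy dependency.

```elixir
defmodule MyRPC.Init do
  # use Freddy.RPC.Server  # uncomment in a project that depends on :freddy

  # A disabled server is not started at all: start_link/5 returns :ignore.
  def init(%{enabled: false}), do: :ignore

  # A valid two-argument handler becomes part of the internal state.
  def init(%{handler: handler}) when is_function(handler, 2) do
    {:ok, %{handler: handler}}
  end

  # Anything else makes start_link/5 return {:error, {:bad_initial, other}}.
  def init(other), do: {:stop, {:bad_initial, other}}
end
```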
terminate(reason, state)
Specs
This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.