Freddy.RPC.Server behaviour (freddy v0.17.1)

A behaviour module for implementing AMQP RPC server processes.

The Freddy.RPC.Server module lets you create processes that hold and monitor an AMQP channel, reopen it in case of failure, and provide callbacks to hook into the process lifecycle and handle messages.

The following example Freddy.RPC.Server process replies with "pong" to any message whose payload is "ping". For any other payload it does not reply directly. Instead, it asynchronously calls the given handler function with the payload and a callback function, so the handler can respond once the message has been processed:

defmodule MyRPC.Server do
  use Freddy.RPC.Server

  def start_link(conn, config, handler) do
    Freddy.RPC.Server.start_link(__MODULE__, conn, config, handler)
  end

  def init(handler) do
    {:ok, %{handler: handler}}
  end

  def handle_request("ping", _meta, state) do
    {:reply, "pong", state}
  end

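  def handle_request(payload, meta, %{handler: handler} = state) do
    # Build a callback that publishes the response for this request when invoked.
    callback = &Freddy.RPC.Server.reply(meta, &1)
    # Run the handler in a separate process so the server is not blocked.
    Task.start_link(fn -> handler.(payload, callback) end)

    {:noreply, state}
  end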
end

Channel handling

When the Freddy.RPC.Server starts with start_link/5, it runs the init/1 callback and returns {:ok, pid} on success, like a GenServer.

After starting, the process attempts to open a channel on the given connection. It monitors the channel and, in case of failure, keeps trying to reopen it on the same connection.

Context setup

The context setup process for an RPC server is to declare an exchange, then declare a queue to consume from, and then bind the queue to the exchange. It also uses the default exchange to publish responses to the reply-to queue.

Every time a channel is opened, the context is set up again: the queue and the exchange are declared and bound through the new channel based on the given configuration.

The configuration must be a Keyword.t that contains the same keys as the Freddy.Consumer configuration. See the Freddy.Consumer documentation for the list of available configuration keys.
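As a rough illustration, a configuration might look like the sketch below. The :queue key is used elsewhere in this document. The :exchange and :routing_keys keys, their option names, and the names "rpc-exchange" and "rpc.requests" are assumptions made for illustration and should be verified against the Freddy.Consumer documentation.

```elixir
# Hypothetical configuration; verify every key against Freddy.Consumer.
config = [
  # Assumed key: the exchange to declare and bind the queue to.
  exchange: [name: "rpc-exchange", type: :direct],
  # The queue the server consumes requests from.
  queue: [name: "rpc-queue"],
  # Assumed key: routing keys used to bind the queue to the exchange.
  routing_keys: ["rpc.requests"]
]
```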

Acknowledgement mode

By default, the RPC server starts in automatic acknowledgement mode. This means that all incoming requests are acknowledged automatically by the RabbitMQ server once they are delivered to a client (the RPC server process).

If your logic requires manual acknowledgements, start the server with the configuration option [consumer: [no_ack: false]] and acknowledge messages manually using the ack/2 function.

Below is an example of how to start a server in manual acknowledgement mode:

defmodule MyRPC.Server do
  alias Freddy.RPC.Server
  use Server

  def start_link(conn, handler) do
    config = [
      queue: [name: "rpc-queue"],
      consumer: [no_ack: false]
    ]

    Server.start_link(__MODULE__, conn, config, handler)
  end

  def handle_request(payload, meta, handler) do
    result = handler.handle_request(payload)
    # Acknowledge the message manually once it has been processed.
    Server.ack(meta)
    {:reply, result, handler}
  end
end

Summary

Callbacks

decode_request(payload, meta, state)

Called when a request message is delivered from the queue before passing it into a handle_request function.

encode_response(response, opts, state)

Called before a response message will be published to the default exchange.

handle_call(request, arg2, state)

Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same.

handle_cast(request, state)

Called when the process receives a cast message sent by cast/2. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.

handle_connected(arg1, state)

Called when the RPC server process has opened an AMQP channel, before registering itself as a consumer.

handle_disconnected(reason, state)

Called when the RPC server has been disconnected from the AMQP broker.

handle_info(message, state)

Called when the process receives a message. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.

handle_ready(meta, state)

Called when the AMQP server has registered the process as a consumer of the server-named queue and it will start to receive messages.

handle_request(request, meta, state)

Called when a request message has been successfully decoded by decode_request/3 function.

init(initial)

Called when the RPC server process is first started. start_link/5 will block until it returns.

terminate(reason, state)

This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.

Types

Specs

meta() :: map()

Specs

opts() :: Keyword.t()

Specs

payload() :: String.t()

Specs

request() :: term()

Specs

response() :: term()

Specs

routing_key() :: String.t()

Specs

state() :: term()

Functions

ack(meta, opts \\ [])

See Freddy.Consumer.ack/2.

call(server, message, timeout \\ 5000)

See Freddy.Consumer.call/3.

cast(server, message)

See Freddy.Consumer.cast/2.

reply(request_meta, response, opts \\ [])

Specs

reply(meta(), response(), Keyword.t()) :: :ok

Responds to a request, given its metadata.

start(mod, connection, config, initial, opts \\ [])

Start a Freddy.RPC.Server process without linking to the current process.

See start_link/5 for more information.

start_link(mod, connection, config, initial, opts \\ [])

Start a Freddy.RPC.Server process linked to the current process.

Arguments:

  • mod - the module that defines the server callbacks (like GenServer)
  • connection - the pid of a Freddy.Connection process
  • config - the configuration of the RPC server
  • initial - the value that will be given to init/1
  • opts - the GenServer options

stop(server, reason \\ :normal)

See GenServer.stop/2.

Callbacks

decode_request(payload, meta, state)

Specs

decode_request(payload(), meta(), state()) ::
  {:ok, request(), state()}
  | {:reply, response(), state()}
  | {:reply, response(), opts(), state()}
  | {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:stop, reason :: term(), state()}

Called when a request message is delivered from the queue before passing it into a handle_request function.

The arguments are the message's payload, some metadata and the internal state.

The metadata is a map containing all metadata given by the AMQP client when receiving the message plus the :exchange and :queue values.

Returning {:ok, request, state} will pass the returned request term into the handle_request/3 function.

Returning {:ok, request, meta, state} will pass the returned request term and the meta into the handle_request/3 function.

Returning {:reply, response, state} will publish the response message without calling the handle_request/3 function. The encode_response/3 function will be called before publishing the response.

Returning {:reply, response, opts, state} will publish the response message with the returned options without calling the handle_request/3 function. The encode_response/3 function will be called before publishing the response.

Returning {:noreply, state} will ignore that message and enter the main loop again with the given state.

Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.
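The return values above can be sketched as follows. This is a minimal example, not the library's default decoder: it assumes that clients encode requests with :erlang.term_to_binary/1, and the module name MyRPC.TermServer and the %{error: "bad_request"} response are hypothetical. The error response is still passed through encode_response/3 before it is published.

```elixir
defmodule MyRPC.TermServer do
  use Freddy.RPC.Server

  @impl true
  def decode_request(payload, _meta, state) do
    # The :safe option refuses payloads that would create new atoms.
    request = :erlang.binary_to_term(payload, [:safe])
    {:ok, request, state}
  rescue
    # binary_to_term/2 raises ArgumentError on malformed input, so reply
    # with an error response without invoking handle_request/3.
    ArgumentError -> {:reply, %{error: "bad_request"}, state}
  end

  @impl true
  def handle_request(request, _meta, state) do
    # Echo the decoded request back to the caller.
    {:reply, %{echo: request}, state}
  end
end
```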

encode_response(response, opts, state)

Specs

encode_response(response(), opts(), state()) ::
  {:reply, payload(), state()}
  | {:reply, payload(), opts(), state()}
  | {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:stop, reason :: term(), state()}

Called before a response message will be published to the default exchange.

It receives as arguments the response to encode, the options for that publication, and the internal state.

Returning {:reply, string, state} will cause the returned string to be published to the exchange, and the process to enter the main loop with the given state.

Returning {:reply, string, opts, state} will cause the returned string to be published to the exchange with the returned options, and enter the main loop with the given state.

Returning {:noreply, state} will ignore that message and enter the main loop again with the given state.

Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.
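A minimal sketch of a custom encoder is shown below. It assumes that clients expect responses in the Erlang external term format and that the returned options are forwarded as AMQP publish options, so that :content_type is honoured. The module name MyRPC.TermEncoder and the content type string are hypothetical.

```elixir
defmodule MyRPC.TermEncoder do
  use Freddy.RPC.Server

  @impl true
  def encode_response(response, opts, state) do
    # Serialize the response term into a binary payload.
    payload = :erlang.term_to_binary(response)

    # Assumption: opts are passed on as publish options, so a content
    # type can be attached to the outgoing message.
    opts = Keyword.put(opts, :content_type, "application/x-erlang-binary")

    {:reply, payload, opts, state}
  end
end
```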

handle_call(request, arg2, state)

Specs

handle_call(request(), GenServer.from(), state()) ::
  {:reply, reply :: term(), state()}
  | {:reply, reply :: term(), state(), timeout() | :hibernate}
  | {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:stop, reason :: term(), state()}
  | {:stop, reason :: term(), reply :: term(), state()}

Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same.

handle_cast(request, state)

Specs

handle_cast(request(), state()) ::
  {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:stop, reason :: term(), state()}

Called when the process receives a cast message sent by cast/2. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.
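As with a GenServer, handle_call/3 and handle_cast/2 can be used to inspect or change the server state from other processes. The sketch below counts handled requests; the module name MyRPC.CountingServer and the :count and :reset messages are hypothetical.

```elixir
defmodule MyRPC.CountingServer do
  use Freddy.RPC.Server

  @impl true
  def init(_initial), do: {:ok, %{count: 0}}

  # Count every request and echo it back to the caller.
  @impl true
  def handle_request(request, _meta, state) do
    {:reply, request, %{state | count: state.count + 1}}
  end

  # Synchronous query, e.g. Freddy.RPC.Server.call(server, :count)
  @impl true
  def handle_call(:count, _from, state) do
    {:reply, state.count, state}
  end

  # Asynchronous command, e.g. Freddy.RPC.Server.cast(server, :reset)
  @impl true
  def handle_cast(:reset, state) do
    {:noreply, %{state | count: 0}}
  end
end
```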

handle_connected(arg1, state)

Specs

handle_connected(Freddy.Consumer.connection_info(), state()) ::
  {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:error, state()}
  | {:stop, reason :: term(), state()}

Called when the RPC server process has opened an AMQP channel, before registering itself as a consumer.

The first argument is a map containing the :channel, :exchange and :queue structures.

Returning {:noreply, state} will cause the process to enter the main loop with the given state.

Returning {:error, state} will cause the process to reconnect (i.e. open a new channel, declare the exchange and queue, etc.).

Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.

handle_disconnected(reason, state)

Specs

handle_disconnected(reason :: term(), state()) ::
  {:noreply, state()} | {:stop, reason :: term(), state()}

Called when the RPC server has been disconnected from the AMQP broker.

Returning {:noreply, state} will cause the process to enter the main loop with the given state. The server will not consume any new messages until the connection to the AMQP broker is restored.

Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.
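One possible use of handle_connected/2 and handle_disconnected/2 is to track whether the server currently has a channel. The following sketch keeps a flag in the state and logs both events; the module name MyRPC.ConnectionAwareServer and the :connected? key are hypothetical.

```elixir
defmodule MyRPC.ConnectionAwareServer do
  use Freddy.RPC.Server
  require Logger

  @impl true
  def init(_initial), do: {:ok, %{connected?: false}}

  # The first argument is a map with the :channel, :exchange and :queue
  # structures; only the queue is inspected here.
  @impl true
  def handle_connected(%{queue: queue}, state) do
    Logger.info("channel opened, queue: #{inspect(queue)}")
    {:noreply, %{state | connected?: true}}
  end

  # Keep running and wait for the connection to be restored.
  @impl true
  def handle_disconnected(reason, state) do
    Logger.error("disconnected from the broker: #{inspect(reason)}")
    {:noreply, %{state | connected?: false}}
  end
end
```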

handle_info(message, state)

Specs

handle_info(message :: term(), state()) ::
  {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:stop, reason :: term(), state()}

Called when the process receives a message. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.
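For instance, handle_info/2 can process messages the server sends to itself. The sketch below schedules a periodic :tick message with Process.send_after/3; the module name MyRPC.TickingServer, the :tick message and the 30-second interval are hypothetical.

```elixir
defmodule MyRPC.TickingServer do
  use Freddy.RPC.Server

  # Interval between ticks, in milliseconds.
  @interval :timer.seconds(30)

  @impl true
  def init(_initial) do
    schedule_tick()
    {:ok, %{ticks: 0}}
  end

  # Count the tick and schedule the next one.
  @impl true
  def handle_info(:tick, state) do
    schedule_tick()
    {:noreply, %{state | ticks: state.ticks + 1}}
  end

  # Ignore any other message instead of failing on an unmatched clause.
  def handle_info(_message, state), do: {:noreply, state}

  defp schedule_tick, do: Process.send_after(self(), :tick, @interval)
end
```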

handle_ready(meta, state)

Specs

handle_ready(meta(), state()) ::
  {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:stop, reason :: term(), state()}

Called when the AMQP server has registered the process as a consumer of the server-named queue and it will start to receive messages.

Returning {:noreply, state} will cause the process to enter the main loop with the given state.

Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.

handle_request(request, meta, state)

Specs

handle_request(request(), meta(), state()) ::
  {:reply, response(), state()}
  | {:reply, response(), opts(), state()}
  | {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:stop, reason :: term(), state()}

Called when a request message has been successfully decoded by decode_request/3 function.

The arguments are the message's decoded payload, some metadata and the internal state.

The metadata is a map containing all metadata given by the AMQP client when receiving the message plus the :exchange and :queue values.

Returning {:reply, response, state} will publish the response message. The encode_response/3 function will be called before publishing the response.

Returning {:reply, response, opts, state} will publish the response message with returned options. Function encode_response/3 will be called before publishing the response.

Returning {:noreply, state} will ignore that message and enter the main loop again with the given state.

Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.

init(initial)

Specs

init(initial :: term()) :: {:ok, state()} | :ignore | {:stop, reason :: term()}

Called when the RPC server process is first started. start_link/5 will block until it returns.

It receives as argument the fourth argument given to start_link/5.

Returning {:ok, state} will cause start_link/5 to return {:ok, pid} and attempt to open a channel on the given connection, declare the exchange, declare a queue, and start consumption. After that it will enter the main loop with state as its internal state.

Returning :ignore will cause start_link/5 to return :ignore and the process will exit normally without entering the loop, opening a channel or calling terminate/2.

Returning {:stop, reason} will cause start_link/5 to return {:error, reason} and the process will exit with reason reason without entering the loop, opening a channel, or calling terminate/2.
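The three return values can be sketched as follows, using init/1 to validate the value passed as the fourth argument of start_link/5. The module name MyRPC.ValidatingServer and the {:invalid_handler, other} reason are hypothetical.

```elixir
defmodule MyRPC.ValidatingServer do
  use Freddy.RPC.Server

  # No handler given: start_link/5 returns :ignore and no channel is opened.
  @impl true
  def init(nil), do: :ignore

  # A two-argument function is accepted and stored in the state.
  def init(handler) when is_function(handler, 2), do: {:ok, %{handler: handler}}

  # Anything else makes start_link/5 return {:error, reason}.
  def init(other), do: {:stop, {:invalid_handler, other}}
end
```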

terminate(reason, state)

Specs

terminate(reason :: term(), state()) :: any()

This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.