Freddy.Publisher behaviour (freddy v0.17.1)

A behaviour module for implementing AMQP publisher processes.

The Freddy.Publisher module provides a way to create processes that holds, monitors, and restarts a channel in case of failure, exports a function to publish messages to an exchange, and some callbacks to hook into the process lifecycle.

An example Freddy.Publisher process that only sends every other message:

defmodule MyPublisher do
  use Freddy.Publisher

  def start_link(conn, config, opts \ []) do
    Freddy.Publisher.start_link(__MODULE__, conn, config, :ok, opts)
  end

  def publish(publisher, payload, routing_key) do
    Freddy.Publisher.publish(publisher, payload, routing_key)
  end

  def init(:ok) do
    {:ok, %{last_ignored: false}}
  end

  def before_publication(_payload, _routing_key, _opts, %{last_ignored: false}) do
    {:ignore, %{last_ignored: true}}
  end
  def before_publication(_payload, _routing_key, _opts, %{last_ignored: true}) do
    {:ok, %{last_ignored: false}}
  end
end

Channel handling

When the Freddy.Publisher starts with start_link/5 it runs the init/1 callback and responds with {:ok, pid} on success, like a GenServer.

After starting the process it attempts to open a channel on the given connection. It monitors the channel, and in case of failure it tries to reopen again and again on the same connection.

Context setup

The context setup process for a publisher is to declare its exchange.

Every time a channel is opened the context is set up, meaning that the exchange is declared through the new channel based on the given configuration.

The configuration must be a Keyword.t that contains a single key: :exchange whose value is the configuration for the Freddy.Core.Exchange.

Check Freddy.Core.Exchange for more detailed information.

Link to this section Summary

Callbacks

Called before a message will be encoded and published to the exchange.

Called before a message will be published to the exchange.

Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same.

Called when the process receives a cast message sent by cast/2. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.

Called when the Freddy.Publisher process has opened and AMQP channel and declared an exchange.

Called when the Freddy.Publisher process has been disconnected from the AMQP broker.

Called when the process receives a message. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.

Called when the Freddy.Publisher process is first started.

This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.

Link to this section Types

Link to this type

connection()

Specs

connection() :: GenServer.server()
Link to this type

connection_info()

Specs

connection_info() :: %{
  channel: Freddy.Core.Channel.t(),
  exchange: Freddy.Core.Exchange.t()
}
Link to this type

consumer_options()

Specs

consumer_options() :: [{:channel_open_timeout, timeout()}]

Specs

meta() :: map()

Specs

payload() :: term()
Link to this type

routing_key()

Specs

routing_key() :: String.t()

Specs

state() :: term()

Link to this section Functions

Link to this function

call(consumer, message, timeout \\ 5000)

See Connection.call/3.

Link to this function

cast(consumer, message)

See Connection.cast/2.

Link to this function

publish(publisher, payload, routing_key \\ "", opts \\ [])

Specs

publish(
  GenServer.server() | connection_info(),
  payload :: term(),
  routing_key :: String.t(),
  opts :: Keyword.t()
) :: :ok

Publishes a message to an exchange through the Freddy.Publisher process or from Freddy.Publisher process using the connection meta information.

When publishing from within the publisher process, the connection_info can be obtained from handle_connected/2 callback.

Options

  • :mandatory - If set, returns an error if the broker can't route the message to a queue (default false);
  • :immediate - If set, returns an error if the broker can't deliver te message to a consumer immediately (default false);
  • :content_type - MIME Content type;
  • :content_encoding - MIME Content encoding;
  • :headers - Message headers. Can be used with headers Exchanges;
  • :persistent - If set, uses persistent delivery mode. Messages marked as persistent that are delivered to durable queues will be logged to disk;
  • :correlation_id - application correlation identifier;
  • :priority - message priority, ranging from 0 to 9;
  • :reply_to - name of the reply queue;
  • :expiration - how long the message is valid (in milliseconds);
  • :message_id - message identifier;
  • :timestamp - timestamp associated with this message (epoch time);
  • :type - message type as a string;
  • :user_id - creating user ID. RabbitMQ will validate this against the active connection user;
  • :app_id - publishing application ID.
Link to this function

start(mod, connection, config, initial, opts \\ [])

Start a Freddy.Publisher process without linking to the current process.

See start_link/5 for more information.

Link to this function

start_link(mod, connection, config, initial, opts \\ [])

Specs

start_link(
  module(),
  connection(),
  Keyword.t(),
  initial :: term(),
  GenServer.options()
) :: GenServer.on_start()

Start a Freddy.Publisher process linked to the current process.

Arguments:

  • mod - the module that defines the server callbacks (like GenServer)
  • connection - the pid of a Freddy.Connection process
  • config - the configuration of the consumer
  • initial - the value that will be given to init/1
  • opts - the GenServer options
Link to this function

stop(consumer, reason \\ :normal)

See GenServer.stop/2.

Link to this section Callbacks

Link to this callback

before_publication(payload, routing_key, opts, state)

Specs

before_publication(payload(), routing_key(), opts :: Keyword.t(), state()) ::
  {:ok, state()}
  | {:ok, payload(), routing_key(), opts :: Keyword.t(), state()}
  | {:ignore, state()}
  | {:stop, reason :: term(), state()}

Called before a message will be encoded and published to the exchange.

It receives as argument the message payload, the routing key, the options for that publication and the internal state.

Returning {:ok, state} will cause the message to be sent with no modification, and enter the main loop with the given state.

Returning {:ok, payload, routing_key, opts, state} will cause the given payload, routing key and options to be used instead of the original ones, and enter the main loop with the given state.

Returning {:ignore, state} will ignore that message and enter the main loop again with the given state.

Returning {:stop, reason, state} will not send the message, terminate the main loop and call terminate(reason, state) before the process exits with reason reason.

Link to this callback

encode_message(payload, routing_key, opts, state)

Specs

encode_message(payload(), routing_key(), opts :: Keyword.t(), state()) ::
  {:ok, String.t(), state()}
  | {:ok, String.t(), routing_key(), opts :: Keyword.t(), state()}
  | {:ignore, state()}
  | {:stop, reason :: term(), state()}

Called before a message will be published to the exchange.

It receives as argument the message payload, the routing key, the options for that publication and the internal state.

Returning {:ok, string, state} will cause the returned string to be published to the exchange, and the process to enter the main loop with the given state.

Returning {:ok, string, routing_key, opts, state} will cause the given string, routing key and options to be used instead of the original ones, and enter the main loop with the given state.

Returning {:ignore, state} will ignore that message and enter the main loop again with the given state.

Returning {:stop, reason, state} will not send the message, terminate the main loop and call terminate(reason, state) before the process exits with reason reason.

Link to this callback

handle_call(request, arg2, state)

Specs

handle_call(request :: term(), GenServer.from(), state()) ::
  {:reply, reply :: term(), state()}
  | {:reply, reply :: term(), state(), timeout() | :hibernate}
  | {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:stop, reason :: term(), state()}
  | {:stop, reason :: term(), reply :: term(), state()}

Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same.

Link to this callback

handle_cast(request, state)

Specs

handle_cast(request :: term(), state()) ::
  {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:stop, reason :: term(), state()}

Called when the process receives a cast message sent by cast/2. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.

Link to this callback

handle_connected(meta, state)

Specs

handle_connected(meta :: connection_info(), state()) ::
  {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:error, state()}
  | {:stop, reason :: term(), state()}

Called when the Freddy.Publisher process has opened and AMQP channel and declared an exchange.

First argument is a map, containing :channel and :exchange structures.

Returning {:noreply, state} will cause the process to enter the main loop with the given state.

Returning {:error, state} will indicate that process failed to perform some critical actions and must reconnect.

Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.

Link to this callback

handle_disconnected(reason, state)

Specs

handle_disconnected(reason :: term(), state()) ::
  {:noreply, state()} | {:stop, reason :: term(), state()}

Called when the Freddy.Publisher process has been disconnected from the AMQP broker.

Returning {:noreply, state} causes the process to enter the main loop with the given state. The process will not consume any new messages until connection to AMQP broker is established again.

Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.

Link to this callback

handle_info(term, state)

Specs

handle_info(term(), state()) ::
  {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:stop, reason :: term(), state()}

Called when the process receives a message. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.

Specs

init(state()) ::
  {:ok, state()}
  | {:ok, state(), consumer_options()}
  | :ignore
  | {:stop, reason :: term()}

Called when the Freddy.Publisher process is first started.

Returning {:ok, state} will cause start_link/3 to return {:ok, pid} and attempt to open a channel on the given connection and initialize an actor (it depends on the actor's nature, consumer will, for example, declare an exchange, a queue and a bindings).

After that it will enter the main loop with state as its internal state.

Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop, opening a channel or calling terminate/2.

Returning {:stop, reason} will cause start_link/3 to return {:error, reason} and the process will exit with reason reason without entering the loop, opening a channel, or calling terminate/2.

Link to this callback

terminate(reason, state)

Specs

terminate(reason :: term(), state()) :: any()

This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.