Freddy.Publisher behaviour (freddy v0.17.2)
A behaviour module for implementing AMQP publisher processes.
The Freddy.Publisher module provides a way to create processes that hold, monitor, and restart a channel in case of failure. It exports a function to publish messages to an exchange and defines callbacks to hook into the process lifecycle.
An example Freddy.Publisher process that only sends every other message:
    defmodule MyPublisher do
      use Freddy.Publisher

      def start_link(conn, config, opts \\ []) do
        Freddy.Publisher.start_link(__MODULE__, conn, config, :ok, opts)
      end

      def publish(publisher, payload, routing_key) do
        Freddy.Publisher.publish(publisher, payload, routing_key)
      end

      # The state only tracks whether the previous message was ignored.
      def init(:ok) do
        {:ok, %{last_ignored: false}}
      end

      # The previous message (if any) was sent, so ignore this one.
      def before_publication(_payload, _routing_key, _opts, %{last_ignored: false}) do
        {:ignore, %{last_ignored: true}}
      end

      # The previous message was ignored, so send this one unchanged.
      def before_publication(_payload, _routing_key, _opts, %{last_ignored: true}) do
        {:ok, %{last_ignored: false}}
      end
    end
Channel handling
When the Freddy.Publisher starts with start_link/5 it runs the init/1 callback and responds with {:ok, pid} on success, like a GenServer.
After starting the process it attempts to open a channel on the given connection. It monitors the channel and, in case of failure, keeps trying to reopen it on the same connection.
Context setup
The context setup process for a publisher is to declare its exchange.
Every time a channel is opened the context is set up, meaning that the exchange is declared through the new channel based on the given configuration.
The configuration must be a Keyword.t that contains a single key, :exchange, whose value is the configuration for the Freddy.Core.Exchange. Check Freddy.Core.Exchange for more detailed information.
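As an illustration, a publisher configuration might look like the sketch below. The exchange name "notifications" is hypothetical, and the :name, :type and :opts keys are an assumption about what Freddy.Core.Exchange accepts; that module remains the authoritative reference.

```elixir
# Hypothetical publisher configuration: a keyword list with a single
# :exchange key. The inner keys (:name, :type, :opts) are assumed here
# and should be checked against Freddy.Core.Exchange.
config = [
  exchange: [name: "notifications", type: :topic, opts: [durable: true]]
]

# The exchange described here would be declared again every time
# the publisher opens a new channel.
IO.inspect(Keyword.keys(config))
```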
Types
connection()
Specs
connection() :: GenServer.server()
connection_info()
Specs
connection_info() :: %{ channel: Freddy.Core.Channel.t(), exchange: Freddy.Core.Exchange.t() }
consumer_options()
Specs
consumer_options() :: [{:channel_open_timeout, timeout()}]
meta()
Specs
meta() :: map()
payload()
Specs
payload() :: term()
routing_key()
Specs
routing_key() :: String.t()
state()
Specs
state() :: term()
Functions
call(consumer, message, timeout \\ 5000)
See Connection.call/3.
cast(consumer, message)
See Connection.cast/2.
publish(publisher, payload, routing_key \\ "", opts \\ [])
Specs
publish( GenServer.server() | connection_info(), payload :: term(), routing_key :: String.t(), opts :: Keyword.t() ) :: :ok
Publishes a message to an exchange, either through the Freddy.Publisher process or from within the Freddy.Publisher process using the connection meta information.
When publishing from within the publisher process, the connection_info can be obtained from the handle_connected/2 callback.
Options
:mandatory - If set, returns an error if the broker can't route the message to a queue (default false);
:immediate - If set, returns an error if the broker can't deliver the message to a consumer immediately (default false);
:content_type - MIME content type;
:content_encoding - MIME content encoding;
:headers - Message headers. Can be used with headers exchanges;
:persistent - If set, uses persistent delivery mode. Messages marked as persistent that are delivered to durable queues will be logged to disk;
:correlation_id - application correlation identifier;
:priority - message priority, ranging from 0 to 9;
:reply_to - name of the reply queue;
:expiration - how long the message is valid (in milliseconds);
:message_id - message identifier;
:timestamp - timestamp associated with this message (epoch time);
:type - message type as a string;
:user_id - creating user ID. RabbitMQ will validate this against the active connection user;
:app_id - publishing application ID.
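A minimal sketch of how these options might be combined for one publication. The module name PublishSketch, the routing key "users.signup" and the payload shape are hypothetical; only the option keys and publish/4 itself come from the documentation above.

```elixir
defmodule PublishSketch do
  # Builds options for a persistent, mandatory message.
  # All values are illustrative, not defaults of the library.
  def event_opts(correlation_id) do
    [
      mandatory: true,
      persistent: true,
      content_type: "application/json",
      correlation_id: correlation_id,
      # Epoch time in seconds, as described for :timestamp.
      timestamp: System.os_time(:second)
    ]
  end

  # Publishes through a running Freddy.Publisher process.
  # `publisher` is assumed to be the pid or registered name of that process.
  def publish_signup(publisher, user_id) do
    opts = event_opts("signup-#{user_id}")
    Freddy.Publisher.publish(publisher, %{user_id: user_id}, "users.signup", opts)
  end
end
```

Keeping the options in a separate function makes them easy to inspect without a broker; only publish_signup/2 needs a running publisher.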
start(mod, connection, config, initial, opts \\ [])
Start a Freddy.Publisher process without linking to the current process.
See start_link/5 for more information.
start_link(mod, connection, config, initial, opts \\ [])
Specs
start_link( module(), connection(), Keyword.t(), initial :: term(), GenServer.options() ) :: GenServer.on_start()
Start a Freddy.Publisher process linked to the current process.
Arguments:
mod - the module that defines the server callbacks (like GenServer)
connection - the pid of a Freddy.Connection process
config - the configuration of the publisher
initial - the value that will be given to init/1
opts - the GenServer options
stop(consumer, reason \\ :normal)
See GenServer.stop/2.
Callbacks
before_publication(payload, routing_key, opts, state)
Specs
before_publication(payload(), routing_key(), opts :: Keyword.t(), state()) :: {:ok, state()} | {:ok, payload(), routing_key(), opts :: Keyword.t(), state()} | {:ignore, state()} | {:stop, reason :: term(), state()}
Called before a message will be encoded and published to the exchange.
It receives as arguments the message payload, the routing key, the options for that publication and the internal state.
Returning {:ok, state} will cause the message to be sent with no modification, and enter the main loop with the given state.
Returning {:ok, payload, routing_key, opts, state} will cause the given payload, routing key and options to be used instead of the original ones, and enter the main loop with the given state.
Returning {:ignore, state} will ignore that message and enter the main loop again with the given state.
Returning {:stop, reason, state} will not send the message, terminate the main loop and call terminate(reason, state) before the process exits with reason reason.
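The return values above can be sketched with a callback that drops messages without a routing key and rewrites the rest. The module name, the :prefix state key and the routing policy are hypothetical. In a real publisher these clauses would live in a module that calls use Freddy.Publisher; that line is left out here so the functions can be tried in isolation.

```elixir
defmodule PrefixingCallbacks do
  # Messages with an empty routing key are dropped.
  def before_publication(_payload, "", _opts, state) do
    {:ignore, state}
  end

  # Every other message gets a prefixed routing key and is marked
  # persistent unless the caller already chose a delivery mode.
  def before_publication(payload, routing_key, opts, %{prefix: prefix} = state) do
    new_key = prefix <> "." <> routing_key
    new_opts = Keyword.put_new(opts, :persistent, true)
    {:ok, payload, new_key, new_opts, state}
  end
end
```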
encode_message(payload, routing_key, opts, state)
Specs
encode_message(payload(), routing_key(), opts :: Keyword.t(), state()) :: {:ok, String.t(), state()} | {:ok, String.t(), routing_key(), opts :: Keyword.t(), state()} | {:ignore, state()} | {:stop, reason :: term(), state()}
Called before a message will be published to the exchange.
It receives as arguments the message payload, the routing key, the options for that publication and the internal state.
Returning {:ok, string, state} will cause the returned string to be published to the exchange, and the process to enter the main loop with the given state.
Returning {:ok, string, routing_key, opts, state} will cause the given string, routing key and options to be used instead of the original ones, and enter the main loop with the given state.
Returning {:ignore, state} will ignore that message and enter the main loop again with the given state.
Returning {:stop, reason, state} will not send the message, terminate the main loop and call terminate(reason, state) before the process exits with reason reason.
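A sketch of a custom encoder using the five-element return tuple. The choice of Erlang term serialization with Base64 is purely illustrative and is not the library's own encoding; the module name is hypothetical, and in a real publisher the function would be defined in the module that calls use Freddy.Publisher.

```elixir
defmodule TermEncoding do
  # Serializes any term to a Base64 string and records the encoding
  # in the publication options.
  def encode_message(payload, routing_key, opts, state) do
    encoded =
      payload
      |> :erlang.term_to_binary()
      |> Base.encode64()

    opts = Keyword.put(opts, :content_encoding, "base64")
    {:ok, encoded, routing_key, opts, state}
  end
end
```

A consumer using this scheme would reverse it with Base.decode64!/1 followed by :erlang.binary_to_term/1.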
handle_call(request, arg2, state)
Specs
handle_call(request :: term(), GenServer.from(), state()) :: {:reply, reply :: term(), state()} | {:reply, reply :: term(), state(), timeout() | :hibernate} | {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:stop, reason :: term(), state()} | {:stop, reason :: term(), reply :: term(), state()}
Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same.
handle_cast(request, state)
Specs
handle_cast(request :: term(), state()) :: {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:stop, reason :: term(), state()}
Called when the process receives a cast message sent by cast/2. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.
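Since handle_call/3 and handle_cast/2 follow the GenServer conventions, a publisher can expose small helper requests alongside publishing. The sketch below assumes a hypothetical state map with a :sent counter; the request names :sent_count and :reset are invented for illustration.

```elixir
defmodule StatsCallbacks do
  # Synchronous request: reply with the number of messages sent so far.
  def handle_call(:sent_count, _from, %{sent: sent} = state) do
    {:reply, sent, state}
  end

  # Asynchronous request: reset the counter without replying.
  def handle_cast(:reset, state) do
    {:noreply, %{state | sent: 0}}
  end
end
```

A client would reach these clauses through call/3 and cast/2 on the publisher process.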
handle_connected(meta, state)
Specs
handle_connected(meta :: connection_info(), state()) :: {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:error, state()} | {:stop, reason :: term(), state()}
Called when the Freddy.Publisher process has opened an AMQP channel and declared an exchange.
The first argument is a map containing the :channel and :exchange structures.
Returning {:noreply, state} will cause the process to enter the main loop with the given state.
Returning {:error, state} indicates that the process failed to perform some critical actions and must reconnect.
Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.
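One possible use of this callback is to keep the connection information in the state so that it is available for publishing from within the process later. This is a sketch: the module name and the :meta state key are hypothetical, and the state is assumed to be a map.

```elixir
defmodule ConnectedSketch do
  # Stores the map with :channel and :exchange under a :meta key
  # (an assumed key name) in the process state.
  def handle_connected(%{channel: _channel, exchange: _exchange} = meta, state) do
    {:noreply, Map.put(state, :meta, meta)}
  end
end
```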
handle_disconnected(reason, state)
Specs
handle_disconnected(reason :: term(), state()) :: {:noreply, state()} | {:stop, reason :: term(), state()}
Called when the Freddy.Publisher process has been disconnected from the AMQP broker.
Returning {:noreply, state} causes the process to enter the main loop with the given state. The process will not consume any new messages until the connection to the AMQP broker is established again.
Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.
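Both return values can be combined into a simple policy that tolerates a few disconnections and then gives up. This is only a sketch: the limit of 3, the :disconnects state key, the module name and the stop reason are all hypothetical.

```elixir
defmodule DisconnectPolicy do
  # Hypothetical limit on how many disconnections are tolerated.
  @max_disconnects 3

  # Reaching the limit stops the process; terminate/2 runs afterwards.
  def handle_disconnected(_reason, %{disconnects: n} = state)
      when n + 1 >= @max_disconnects do
    {:stop, :too_many_disconnects, %{state | disconnects: n + 1}}
  end

  # Below the limit, count the disconnection and wait for a reconnect.
  def handle_disconnected(_reason, %{disconnects: n} = state) do
    {:noreply, %{state | disconnects: n + 1}}
  end
end
```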
handle_info(term, state)
Specs
handle_info(term(), state()) :: {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:stop, reason :: term(), state()}
Called when the process receives a message. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.
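A sketch of publishing from within the publisher process in response to a plain message. It assumes the state is a map whose :meta key holds the connection_info() received in handle_connected/2 (or nil while disconnected); the :heartbeat message, the payload and the routing key are hypothetical.

```elixir
defmodule HeartbeatCallbacks do
  # No connection information yet, so there is nothing to publish with.
  def handle_info(:heartbeat, %{meta: nil} = state) do
    {:noreply, state}
  end

  # publish/4 accepts connection_info() as its first argument, which
  # allows publishing directly from inside the process.
  def handle_info(:heartbeat, %{meta: meta} = state) do
    Freddy.Publisher.publish(meta, %{type: "heartbeat"}, "heartbeat")
    {:noreply, state}
  end

  # Any other message is ignored.
  def handle_info(_other, state) do
    {:noreply, state}
  end
end
```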
init(state)
Specs
init(state()) :: {:ok, state()} | {:ok, state(), consumer_options()} | :ignore | {:stop, reason :: term()}
Called when the Freddy.Publisher process is first started.
Returning {:ok, state} will cause start_link/5 to return {:ok, pid} and attempt to open a channel on the given connection and initialize an actor (what this involves depends on the actor's nature; a consumer will, for example, declare an exchange, a queue and bindings). After that it will enter the main loop with state as its internal state.
Returning :ignore will cause start_link/5 to return :ignore and the process will exit normally without entering the loop, opening a channel or calling terminate/2.
Returning {:stop, reason} will cause start_link/5 to return {:error, reason} and the process will exit with reason reason without entering the loop, opening a channel, or calling terminate/2.
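The return values listed in the spec can be sketched with one clause each. The shape of the initial argument (the :enabled and :tenant keys) and the 10-second timeout are hypothetical; the :channel_open_timeout option comes from the consumer_options() type above.

```elixir
defmodule InitSketch do
  # A disabled publisher never starts; start_link returns :ignore.
  def init(%{enabled: false}), do: :ignore

  # Missing required data aborts startup with {:error, :missing_tenant}.
  def init(%{tenant: nil}), do: {:stop, :missing_tenant}

  # Normal startup, with a custom timeout for opening the channel.
  def init(%{tenant: tenant}) do
    {:ok, %{tenant: tenant, sent: 0}, [channel_open_timeout: 10_000]}
  end
end
```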
terminate(reason, state)
Specs
This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.