Freddy.Consumer behaviour (freddy v0.17.2)
This module consumes messages from a specified queue bound to a specified exchange.
Configuration
:exchange - specifies an exchange to declare. See Freddy.Core.Exchange for available options. Optional.
:queue - specifies a queue to declare. See Freddy.Core.Queue for available options. Mandatory.
:qos - configures channel QoS. See Freddy.Core.QoS for available options.
:binds - specifies bindings to create from the declared queue to the declared exchange. Must be a list of keyword lists or %Freddy.Core.Bind{} structs. See Freddy.Core.Bind for available options.
:routing_keys - a short way to declare bindings. For example, providing the list ["key1", "key2"] is equivalent to specifying the option [binds: [[routing_key: "key1"], [routing_key: "key2"]]].
:consumer - arguments to pass to the basic.consume method; see below.
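The equivalence between :routing_keys and :binds described above can be sketched in plain Elixir. The module name BindsExample and the function expand/1 are hypothetical and are not part of Freddy; the snippet only illustrates the shape of the two options.

```elixir
defmodule BindsExample do
  # Hypothetical helper (not part of Freddy): expands a list of routing keys
  # into the keyword-list form accepted by the :binds option.
  def expand(routing_keys) when is_list(routing_keys) do
    Enum.map(routing_keys, fn key -> [routing_key: key] end)
  end
end

# Two configurations that, per the description above, declare the same bindings.
short_config = [queue: [name: "example-queue"], routing_keys: ["key1", "key2"]]
long_config = [queue: [name: "example-queue"], binds: BindsExample.expand(["key1", "key2"])]
```

The short form is convenient when bindings differ only by routing key; the long form is needed when a binding carries additional options.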
Consumer options
:consumer_tag - Specifies the identifier for the consumer. The consumer tag is local to a channel, so two clients can use the same consumer tags. If this field is empty, the server will generate a unique tag. Default is empty.
:no_local - If the :no_local field is set, the server will not send messages to the connection that published them. Default is false.
:no_ack - If this field is set, the server does not expect acknowledgements for messages. That is, when a message is delivered to the client, the server assumes the delivery will succeed and immediately dequeues it. This may increase performance, but at the cost of reliability: messages can get lost if a client dies before they are delivered to the application. Default is false.
:exclusive - Request exclusive consumer access, meaning only this consumer can access the queue. Default is false.
:nowait - If set, the server will not respond to the method and the client will not wait for a reply. Default is false.
:arguments - A set of arguments for the consume method. The syntax and semantics of these arguments depend on the server implementation.
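As a rough illustration, the consumer options listed above would be passed under the :consumer key of the configuration. The queue name and consumer tag below are made-up values, and the remaining options simply restate the documented defaults.

```elixir
# A sketch of a consumer configuration; names and values are illustrative only.
config = [
  queue: [name: "example-queue"],
  consumer: [
    consumer_tag: "example-consumer-1", # hypothetical tag; leave unset to let the server generate one
    no_ack: false,                      # documented default: the server expects acknowledgements
    exclusive: true,                    # only this consumer may access the queue
    nowait: false                       # documented default: wait for the server's reply
  ]
]
```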
Example
defmodule Notifications.Listener do
  use Freddy.Consumer

  def start_link(conn, initial \\ nil, opts \\ []) do
    config = [
      exchange: [name: "freddy-topic", type: :topic],
      queue: [name: "notifications-queue", opts: [auto_delete: true]],
      qos: [prefetch_count: 10], # optional
      routing_keys: ["routing_key1", "routing_key2"], # short way to declare binds
      binds: [ # fully customizable bindings
        [routing_key: "routing_key3", no_wait: true]
      ],
      consumer: [exclusive: true] # optional
    ]

    Freddy.Consumer.start_link(__MODULE__, conn, config, initial, opts)
  end

  def init(initial) do
    # do something on init
    {:ok, initial}
  end

  def handle_message(_payload, %{routing_key: "visitor.status.disconnect"}, state) do
    {:reply, :ack, state}
  end

  def handle_error(_error, _message, state) do
    # log error?
    {:reply, :nack, state}
  end
end
Summary
Functions
ack(meta, opts \\ []) - Acks a message given its meta.
nack(meta, opts \\ []) - Nacks a message given its meta.
reject(meta, opts \\ []) - Rejects a message given its meta.
start(mod, connection, config, initial, opts \\ []) - Starts a Freddy.Consumer process without linking to the current process.
start_link(mod, connection, config, initial, opts \\ []) - Starts a Freddy.Consumer process linked to the current process.
Callbacks
decode_message(payload, meta, state) - Called when a message is delivered from the queue, before it is passed to the handle_message function.
handle_connected(meta, state) - Called when the Freddy.Consumer process has opened an AMQP channel and declared an exchange and a queue.
handle_disconnected(reason, state) - Called when the Freddy.Consumer process has been disconnected from the AMQP broker.
handle_info(term, state) - Called when the process receives a message. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.
handle_message(payload, meta, state) - Called when a message is delivered from the queue.
handle_ready(meta, state) - Called when the AMQP server has registered the process as a consumer and it will start to receive messages.
init(state) - Called when the Freddy.Consumer process is first started.
terminate(reason, state) - This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.
Types
action()
Specs
action() :: :ack | :nack | :reject
connection()
Specs
connection() :: GenServer.server()
connection_info()
Specs
connection_info() :: %{ channel: Freddy.Core.Channel.t(), queue: Freddy.Core.Queue.t(), exchange: Freddy.Core.Exchange.t() }
consumer_options()
Specs
consumer_options() :: [{:channel_open_timeout, timeout()}]
error()
Specs
error() :: term()
meta()
Specs
meta() :: map()
payload()
Specs
payload() :: term()
routing_key()
Specs
routing_key() :: String.t()
state()
Specs
state() :: term()
Functions
ack(meta, opts \\ [])
Acks a message given its meta.
call(consumer, message, timeout \\ 5000)
See Connection.call/3.
cast(consumer, message)
See Connection.cast/2.
nack(meta, opts \\ [])
Nacks a message given its meta.
reject(meta, opts \\ [])
Rejects a message given its meta.
start(mod, connection, config, initial, opts \\ [])
Starts a Freddy.Consumer process without linking to the current process.
See start_link/5 for more information.
start_link(mod, connection, config, initial, opts \\ [])
Specs
start_link( module(), connection(), Keyword.t(), initial :: term(), GenServer.options() ) :: GenServer.on_start()
Starts a Freddy.Consumer process linked to the current process.
Arguments:
mod - the module that defines the server callbacks (like GenServer)
connection - the pid of a Freddy.Connection process
config - the configuration of the consumer
initial - the value that will be given to init/1
opts - the GenServer options
stop(consumer, reason \\ :normal)
See GenServer.stop/2.
Callbacks
decode_message(payload, meta, state)
Specs
decode_message(payload :: String.t(), meta(), state()) :: {:ok, payload(), state()} | {:ok, payload(), meta(), state()} | {:reply, action(), opts :: Keyword.t(), state()} | {:reply, action(), state()} | {:noreply, state()} | {:stop, reason :: term(), state()}
Called when a message is delivered from the queue, before it is passed to the handle_message function.
The arguments are the message's raw payload, some metadata and the internal state. The metadata is a map containing all metadata given by the AMQP client when receiving the message, plus the :exchange and :queue values.
Returning {:ok, payload, state} or {:ok, payload, meta, state} will pass the decoded payload and meta into the handle_message/3 function.
Returning {:reply, action, opts, state} or {:reply, action, state} will immediately ack, nack or reject the message.
Returning {:noreply, state} will do nothing, and therefore the message should be acknowledged by using Freddy.Consumer.ack/2, Freddy.Consumer.nack/2 or Freddy.Consumer.reject/2.
Returning {:stop, reason, state} will terminate the main loop and call terminate(reason, state) before the process exits with reason reason.
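A minimal sketch of a custom decode_message/3, assuming payloads are JSON and that the Jason library is available as a dependency (both are assumptions, not something this page states). The module name Example.JsonConsumer is hypothetical. Malformed payloads are rejected immediately via the {:reply, action, state} return, so they never reach handle_message/3.

```elixir
defmodule Example.JsonConsumer do
  use Freddy.Consumer

  @impl true
  def decode_message(payload, _meta, state) do
    # Assumption: messages are JSON-encoded and Jason is installed.
    case Jason.decode(payload) do
      {:ok, decoded} -> {:ok, decoded, state}
      # Undecodable payloads are rejected without calling handle_message/3.
      {:error, _reason} -> {:reply, :reject, state}
    end
  end

  @impl true
  def handle_message(decoded, _meta, state) do
    # `decoded` is whatever decode_message/3 returned in its :ok tuple.
    IO.inspect(decoded, label: "decoded payload")
    {:reply, :ack, state}
  end
end
```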
handle_call(request, arg2, state)
Specs
handle_call(request :: term(), GenServer.from(), state()) :: {:reply, reply :: term(), state()} | {:reply, reply :: term(), state(), timeout() | :hibernate} | {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:stop, reason :: term(), state()} | {:stop, reason :: term(), reply :: term(), state()}
Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same.
handle_cast(request, state)
Specs
handle_cast(request :: term(), state()) :: {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:stop, reason :: term(), state()}
Called when the process receives a cast message sent by cast/2. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.
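The following sketch shows how handle_call/3 and handle_cast/2 might be used together with call/3 and cast/2 to inspect and reset a counter kept in the consumer state. The module name Example.CountingConsumer and the :count and :reset messages are hypothetical.

```elixir
defmodule Example.CountingConsumer do
  use Freddy.Consumer

  @impl true
  def init(_initial), do: {:ok, %{count: 0}}

  @impl true
  def handle_message(_payload, _meta, state) do
    # Count every delivered message and acknowledge it.
    {:reply, :ack, %{state | count: state.count + 1}}
  end

  @impl true
  def handle_call(:count, _from, state) do
    # Same shape as a GenServer handle_call/3 reply.
    {:reply, state.count, state}
  end

  @impl true
  def handle_cast(:reset, state) do
    {:noreply, %{state | count: 0}}
  end
end

# With a running consumer process `consumer`:
#   Freddy.Consumer.call(consumer, :count)
#   Freddy.Consumer.cast(consumer, :reset)
```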
handle_connected(meta, state)
Specs
handle_connected(meta :: connection_info(), state()) :: {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:error, state()} | {:stop, reason :: term(), state()}
Called when the Freddy.Consumer process has opened an AMQP channel and declared an exchange and a queue.
The first argument is a map containing the :channel, :exchange and :queue structures.
Returning {:noreply, state} will cause the process to enter the main loop with the given state.
Returning {:error, state} indicates that the process failed to perform some critical actions and must reconnect.
Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.
handle_disconnected(reason, state)
Specs
handle_disconnected(reason :: term(), state()) :: {:noreply, state()} | {:stop, reason :: term(), state()}
Called when the Freddy.Consumer process has been disconnected from the AMQP broker.
Returning {:noreply, state} causes the process to enter the main loop with the given state. The process will not consume any new messages until the connection to the AMQP broker is established again.
Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.
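One possible use of handle_connected/2 and handle_disconnected/2 is to track connectivity in the state and log transitions. This is only a sketch: the module name Example.LifecycleConsumer and the connected? flag are hypothetical, and the queue and exchange structs are logged with inspect/1 because their fields are not described on this page.

```elixir
defmodule Example.LifecycleConsumer do
  use Freddy.Consumer
  require Logger

  @impl true
  def init(_initial), do: {:ok, %{connected?: false}}

  @impl true
  def handle_connected(%{queue: queue, exchange: exchange}, state) do
    # The channel is open and the exchange and queue have been declared.
    Logger.info("connected: queue=#{inspect(queue)} exchange=#{inspect(exchange)}")
    {:noreply, %{state | connected?: true}}
  end

  @impl true
  def handle_disconnected(reason, state) do
    # No new messages arrive until the connection is re-established.
    Logger.warning("disconnected from broker: #{inspect(reason)}")
    {:noreply, %{state | connected?: false}}
  end
end
```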
handle_info(term, state)
Specs
handle_info(term(), state()) :: {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:stop, reason :: term(), state()}
Called when the process receives a message. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.
handle_message(payload, meta, state)
Specs
handle_message(payload(), meta(), state()) :: {:reply, action(), state()} | {:reply, action(), opts :: Keyword.t(), state()} | {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:stop, reason :: term(), state()}
Called when a message is delivered from the queue.
The arguments are the message's decoded payload, some metadata and the internal state. The metadata is a map containing all metadata given by the adapter when receiving the message, plus the :exchange and :queue values received at the handle_connected/2 callback.
Returning {:reply, :ack | :nack | :reject, state} will ack, nack or reject the message.
Returning {:reply, :ack | :nack | :reject, opts, state} will ack, nack or reject the message with the given opts.
Returning {:noreply, state} will do nothing, and therefore the message should be acknowledged by using Freddy.Consumer.ack/2, Freddy.Consumer.nack/2 or Freddy.Consumer.reject/2.
Returning {:stop, reason, state} will terminate the main loop and call terminate(reason, state) before the process exits with reason reason.
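The {:noreply, state} return leaves acknowledgement to the application. A minimal sketch of that pattern follows, in which the message is handed to handle_info/2 and acknowledged there with Freddy.Consumer.ack/2 or Freddy.Consumer.nack/2. The module name Example.DeferredAckConsumer, the {:process, payload, meta} message and the process/1 helper are hypothetical.

```elixir
defmodule Example.DeferredAckConsumer do
  use Freddy.Consumer

  @impl true
  def handle_message(payload, meta, state) do
    # Defer the work; nothing is acked yet because we return :noreply.
    send(self(), {:process, payload, meta})
    {:noreply, state}
  end

  @impl true
  def handle_info({:process, payload, meta}, state) do
    # Acknowledge explicitly, using the meta that came with the message.
    case process(payload) do
      :ok -> Freddy.Consumer.ack(meta)
      :error -> Freddy.Consumer.nack(meta)
    end

    {:noreply, state}
  end

  # Hypothetical business logic: accept maps, refuse anything else.
  defp process(%{}), do: :ok
  defp process(_other), do: :error
end
```

Keeping the meta alongside the payload matters here, since ack/2, nack/2 and reject/2 all identify the message by its meta.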
handle_ready(meta, state)
Specs
handle_ready(meta(), state()) :: {:noreply, state()} | {:noreply, state(), timeout() | :hibernate} | {:stop, reason :: term(), state()}
Called when the AMQP server has registered the process as a consumer and it will start to receive messages.
Returning {:noreply, state} will cause the process to enter the main loop with the given state.
Returning {:stop, reason, state} will terminate the main loop and call terminate(reason, state) before the process exits with reason reason.
init(state)
Specs
init(state()) :: {:ok, state()} | {:ok, state(), consumer_options()} | :ignore | {:stop, reason :: term()}
Called when the Freddy.Consumer process is first started.
Returning {:ok, state} will cause start_link/5 to return {:ok, pid} and attempt to open a channel on the given connection and initialize an actor (what this involves depends on the actor's nature; a consumer will, for example, declare an exchange, a queue and bindings). After that it will enter the main loop with state as its internal state.
Returning :ignore will cause start_link/5 to return :ignore and the process will exit normally without entering the loop, opening a channel or calling terminate/2.
Returning {:stop, reason} will cause start_link/5 to return {:error, reason} and the process will exit with reason reason without entering the loop, opening a channel, or calling terminate/2.
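The return values of init/1 can be sketched as follows. The third element of the {:ok, state, consumer_options()} tuple uses the :channel_open_timeout option from the consumer_options() type above; the 10-second value, the :skip marker and the module name Example.PatientConsumer are arbitrary choices for illustration.

```elixir
defmodule Example.PatientConsumer do
  use Freddy.Consumer

  @impl true
  # Hypothetical marker value: do not start a consumer at all.
  def init(:skip), do: :ignore

  # Allow up to 10 seconds (an arbitrary value) for the channel to open.
  def init(initial), do: {:ok, initial, [channel_open_timeout: 10_000]}
end
```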
terminate(reason, state)
This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.