Freddy.Consumer behaviour (freddy v0.17.1)

This module allows to consume messages from specified queue bound to specified exchange.

Configuration

  • :exchange - specifies an exchange to declare. See Freddy.Core.Exchange for available options. Optional.
  • :queue - specifies a queue to declare. See Freddy.Core.Queue for available options. Mandatory.
  • :qos - configures channel QoS. See Freddy.Core.QoS for available options.
  • :binds - specifies bindings to create from the declared queue to the declared exchange. Must be a list of keywords or %Freddy.Core.Bind{} structs. See Freddy.Core.Bind for available options.
  • :routing_keys - a short way to declare bindings, for example providing a list ["key1", "key2"] is an equivalent of specifying option [binds: [[routing_key: "key1"], [routing_key: "key2"]]].
  • :consumer - arguments to provide to basic.consume method, see below.

Consumer options

  • :consumer_tag - Specifies the identifier for the consumer. The consumer tag is local to a channel, so two clients can use the same consumer tags. If this field is empty the server will generate a unique tag. Default is empty.
  • :no_local - If the :no_local field is set the server will not send messages to the connection that published them. Default is false.
  • :no_ack - If this field is set the server does not expect acknowledgements for messages. That is, when a message is delivered to the client the server assumes the delivery will succeed and immediately dequeues it. This functionality may increase performance but at the cost of reliability. Messages can get lost if a client dies before they are delivered to the application. Defaults to false.
  • :exclusive - Request exclusive consumer access, meaning only this consumer can access the queue. Default is false.
  • :nowait - If set, the server will not respond to the method and client will not wait for a reply. Default is false.
  • :arguments - A set of arguments for the consume. The syntax and semantics of these arguments depends on the server implementation.

Example

defmodule Notifications.Listener do
  use Freddy.Consumer

  def start_link(conn, initial \ nil, opts \ []) do
    config = [
      exchange: [name: "freddy-topic", type: :topic],
      queue: [name: "notifications-queue", opts: [auto_delete: true]],
      qos: [prefetch_count: 10], # optional
      routing_keys: ["routing_key1", "routing_key2"], # short way to declare binds
      binds: [ # fully customizable bindings
        [routing_key: "routing_key3", no_wait: true]
      ],
      consumer: [exclusive: true] # optional
    ]
    Freddy.Consumer.start_link(__MODULE__, conn, config, initial, opts)
  end

  def init(initial) do
    # do something on init
    {:ok, initial}
  end

  def handle_message(payload, %{routing_key: "visitor.status.disconnect"}, state) do
    {:reply, :ack, state}
  end

  def handle_error(error, message, _meta) do
    # log error?
    {:reply, :nack, state}
  end
end

Link to this section Summary

Callbacks

Called when a message is delivered from the queue before passing it into a handle_message function.

Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same.

Called when the process receives a cast message sent by cast/2. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.

Called when the Freddy.Consumer process has opened and AMQP channel and declared an exchange and a queue.

Called when the Freddy.Consumer process has been disconnected from the AMQP broker.

Called when the process receives a message. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.

Called when a message is delivered from the queue.

Called when the AMQP server has registered the process as a consumer and it will start to receive messages.

Called when the Freddy.Consumer process is first started.

This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.

Link to this section Types

Specs

action() :: :ack | :nack | :reject
Link to this type

connection()

Specs

connection() :: GenServer.server()
Link to this type

connection_info()

Specs

connection_info() :: %{
  channel: Freddy.Core.Channel.t(),
  queue: Freddy.Core.Queue.t(),
  exchange: Freddy.Core.Exchange.t()
}
Link to this type

consumer_options()

Specs

consumer_options() :: [{:channel_open_timeout, timeout()}]

Specs

error() :: term()

Specs

meta() :: map()

Specs

payload() :: term()
Link to this type

routing_key()

Specs

routing_key() :: String.t()

Specs

state() :: term()

Link to this section Functions

Link to this function

ack(meta, opts \\ [])

Specs

ack(meta :: map(), opts :: Keyword.t()) :: :ok

Ack's a message given its meta

Link to this function

call(consumer, message, timeout \\ 5000)

See Connection.call/3.

Link to this function

cast(consumer, message)

See Connection.cast/2.

Link to this function

nack(meta, opts \\ [])

Specs

nack(meta :: map(), opts :: Keyword.t()) :: :ok

Nack's a message given its meta

Link to this function

reject(meta, opts \\ [])

Specs

reject(meta :: map(), opts :: Keyword.t()) :: :ok

Rejects a message given its meta

Link to this function

start(mod, connection, config, initial, opts \\ [])

Start a Freddy.Consumer process without linking to the current process.

See start_link/5 for more information.

Link to this function

start_link(mod, connection, config, initial, opts \\ [])

Specs

start_link(
  module(),
  connection(),
  Keyword.t(),
  initial :: term(),
  GenServer.options()
) :: GenServer.on_start()

Start a Freddy.Consumer process linked to the current process.

Arguments:

  • mod - the module that defines the server callbacks (like GenServer)
  • connection - the pid of a Freddy.Connection process
  • config - the configuration of the consumer
  • initial - the value that will be given to init/1
  • opts - the GenServer options
Link to this function

stop(consumer, reason \\ :normal)

See GenServer.stop/2.

Link to this section Callbacks

Link to this callback

decode_message(payload, meta, state)

Specs

decode_message(payload :: String.t(), meta(), state()) ::
  {:ok, payload(), state()}
  | {:ok, payload(), meta(), state()}
  | {:reply, action(), opts :: Keyword.t(), state()}
  | {:reply, action(), state()}
  | {:noreply, state()}
  | {:stop, reason :: term(), state()}

Called when a message is delivered from the queue before passing it into a handle_message function.

The arguments are the message's raw payload, some metatdata and the internal state. The metadata is a map containing all metadata given by the AMQP client when receiving the message plus the :exchange and :queue values.

Returning {:ok, payload, state} or {:ok, payload, meta, state} will pass the decoded payload and meta into handle_message/3 function.

Returning {:reply, action, opts, state} or {:reply, action, state} will immediately ack, nack or reject the message.

Returning {:noreply, state} will do nothing, and therefore the message should be acknowledged by using Freddy.Consumer.ack/2, Freddy.Consumer.nack/2 or Freddy.Consumer.reject/2.

Returning {:stop, reason, state} will terminate the main loop and call terminate(reason, state) before the process exits with reason reason.

Link to this callback

handle_call(request, arg2, state)

Specs

handle_call(request :: term(), GenServer.from(), state()) ::
  {:reply, reply :: term(), state()}
  | {:reply, reply :: term(), state(), timeout() | :hibernate}
  | {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:stop, reason :: term(), state()}
  | {:stop, reason :: term(), reply :: term(), state()}

Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same.

Link to this callback

handle_cast(request, state)

Specs

handle_cast(request :: term(), state()) ::
  {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:stop, reason :: term(), state()}

Called when the process receives a cast message sent by cast/2. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.

Link to this callback

handle_connected(meta, state)

Specs

handle_connected(meta :: connection_info(), state()) ::
  {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:error, state()}
  | {:stop, reason :: term(), state()}

Called when the Freddy.Consumer process has opened and AMQP channel and declared an exchange and a queue.

First argument is a map, containing :channel, :exchange and :queue structures.

Returning {:noreply, state} will cause the process to enter the main loop with the given state.

Returning {:error, state} will indicate that process failed to perform some critical actions and must reconnect.

Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.

Link to this callback

handle_disconnected(reason, state)

Specs

handle_disconnected(reason :: term(), state()) ::
  {:noreply, state()} | {:stop, reason :: term(), state()}

Called when the Freddy.Consumer process has been disconnected from the AMQP broker.

Returning {:noreply, state} causes the process to enter the main loop with the given state. The process will not consume any new messages until connection to AMQP broker is established again.

Returning {:stop, reason, state} will terminate the main loop and call terminate/2 before the process exits with reason reason.

Link to this callback

handle_info(term, state)

Specs

handle_info(term(), state()) ::
  {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:stop, reason :: term(), state()}

Called when the process receives a message. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same.

Link to this callback

handle_message(payload, meta, state)

Specs

handle_message(payload(), meta(), state()) ::
  {:reply, action(), state()}
  | {:reply, action(), opts :: Keyword.t(), state()}
  | {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:stop, reason :: term(), state()}

Called when a message is delivered from the queue.

The arguments are the message's decoded payload, some metadata and the internal state. The metadata is a map containing all metadata given by the adapter when receiving the message plus the :exchange and :queue values received at the connect/2 callback.

Returning {:reply, :ack | :nack | :reject, state} will ack, nack or reject the message.

Returning {:reply, :ack | :nack | :reject, opts, state} will ack, nack or reject the message with the given opts.

Returning {:noreply, state} will do nothing, and therefore the message should be acknowledged by using Freddy.Consumer.ack/2, Freddy.Consumer.nack/2 or Freddy.Consumer.reject/2.

Returning {:stop, reason, state} will terminate the main loop and call terminate(reason, state) before the process exits with reason reason.

Link to this callback

handle_ready(meta, state)

Specs

handle_ready(meta(), state()) ::
  {:noreply, state()}
  | {:noreply, state(), timeout() | :hibernate}
  | {:stop, reason :: term(), state()}

Called when the AMQP server has registered the process as a consumer and it will start to receive messages.

Returning {:noreply, state} will causes the process to enter the main loop with the given state.

Returning {:stop, reason, state} will terminate the main loop and call terminate(reason, state) before the process exits with reason reason.

Specs

init(state()) ::
  {:ok, state()}
  | {:ok, state(), consumer_options()}
  | :ignore
  | {:stop, reason :: term()}

Called when the Freddy.Consumer process is first started.

Returning {:ok, state} will cause start_link/3 to return {:ok, pid} and attempt to open a channel on the given connection and initialize an actor (it depends on the actor's nature, consumer will, for example, declare an exchange, a queue and a bindings).

After that it will enter the main loop with state as its internal state.

Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop, opening a channel or calling terminate/2.

Returning {:stop, reason} will cause start_link/3 to return {:error, reason} and the process will exit with reason reason without entering the loop, opening a channel, or calling terminate/2.

Link to this callback

terminate(reason, state)

Specs

terminate(reason :: term(), state()) :: any()

This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.