hare v0.1.9 Hare.Consumer behaviour

A behaviour module for implementing AMQP consumer processes.

The Hare.Consumer module provides a way to create processes that hold, monitor, and restart a channel in case of failure, and have some callbacks to hook into the process lifecycle and handle messages.

An example Hare.Consumer process that ignores and acks messages with the payload "hello", otherwise it does not ack but calls a given handler function with the payload and a callback function, so the handler can ack the message when finished processing it:

defmodule MyConsumer do
  use Hare.Consumer

  def start_link(conn, config, handler) do
    Hare.Consumer.start_link(__MODULE__, conn, config, handler)
  end

  def init(handler) do
    {:ok, %{handler: handler}}
  end

  def handle_message("hello", _meta, state) do
    {:reply, :ack, state}
  end
  def handle_message(payload, meta, %{handler: handler} = state) do
    ack_callback = fn ->
      Hare.Consumer.ack(meta)
    end

    handler.(payload, ack_callback)
    {:noreply, state}
  end
end

Channel handling

When the Hare.Consumer starts with start_link/5 it runs the init/1 callback and responds with {:ok, pid} on success, like a GenServer.

After starting the process it attempts to open a channel on the given connection. It monitors the channel, and in case of failure it tries to reopen again and again on the same connection.

Context setup

The context setup process for a consumer is to declare an exchange, then declare a queue to consume, and then bind the queue to the exchange.

Every time a channel is open the context is set up, meaning that the queue and the exchange are declared and binded through the new channel based on the given configuration.

The configuration must be a Keyword.t that contains the following keys:

Summary

Functions

Ack’s a message given its meta

Nack’s a message given its meta

Rejects a message given its meta

Starts a Hare.Consumer process linked to the current process

Callbacks

Called every time the channel has been opened and the queue, exchange, and binding has been declared

Called when the process receives a message

Called when a message is delivered from the queue

Called when the AMQP server has registered the process as a consumer and it will start to receive messages

Called when the consumer process is first started. start_link/5 will block until it returns

This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with

Types

action()
action() :: :ack | :nack | :reject
meta()
meta() :: map
opts()
opts() :: Hare.Adapter.opts
payload()
payload() :: Hare.Adapter.payload
state()
state() :: term

Functions

ack(meta, opts \\ [])
ack(meta, opts) :: :ok

Ack’s a message given its meta

nack(meta, opts \\ [])
nack(meta, opts) :: :ok

Nack’s a message given its meta

reject(meta, opts \\ [])
reject(meta, opts) :: :ok

Rejects a message given its meta

start_link(mod, conn, config, initial, opts \\ [])
start_link(module, pid, config, initial :: term, GenServer.options) :: GenServer.on_start

Starts a Hare.Consumer process linked to the current process.

This function is used to start a Hare.Consumer process in a supervision tree. The process will be started by calling init with the given initial value.

Arguments:

  • mod - the module that defines the server callbacks (like GenServer)
  • conn - the pid of a Hare.Core.Conn process
  • config - the configuration of the publisher (describing the exchange to declare)
  • initial - the value that will be given to init/1
  • opts - the GenServer options

Callbacks

connected(meta, state)
connected(meta, state) ::
  {:noreply, state} |
  {:stop, reason :: term, state}

Called every time the channel has been opened and the queue, exchange, and binding has been declared.

It is called with two arguments: some metadata and the process’ internal state.

The metadata is a map with a two fields:

Returning {:noreply, state} will cause the process to enter the main loop with state as its internal state.

Returning {:stop, reason, state} will terminate the loop and call terminate(reason, state) before the process exists with reason reason.

handle_info(meta, state)
handle_info(meta, state) ::
  {:noreply, state} |
  {:stop, reason :: term, state}

Called when the process receives a message.

Returning {:noreply, state} will causes the process to enter the main loop with the given state.

Returning {:stop, reason, state} will not send the message, terminate the main loop and call terminate(reason, state) before the process exists with reason reason.

handle_message(payload, meta, state)
handle_message(payload, meta, state) ::
  {:reply, action, state} |
  {:reply, action, opts :: Keyword.t, state} |
  {:noreply, state} |
  {:stop, reason :: term, state}

Called when a message is delivered from the queue.

The arguments are the message’s payload, some metadata and the internal state. The metadata is a map containing all metadata given by the adapter when receiving the message plus the :exchange and :queue values received at the connect/2 callback.

Returning {:reply, :ack | :nack | :reject, state} will ack, nack or reject the message.

Returning {:reply, :ack | :nack | :reject, opts, state} will ack, nack or reject the message with the given opts.

Returning {:noreply, state} will do nothing, and therefore the message should be acknowledged by using Hare.Consumer.ack/2, Hare.Consumer.nack/2 or Hare.Consumer.reject/2.

Returning {:stop, reason, state} will terminate the main loop and call terminate(reason, state) before the process exists with reason reason.

handle_ready(meta, state)
handle_ready(meta, state) ::
  {:noreply, state} |
  {:stop, reason :: term, state}

Called when the AMQP server has registered the process as a consumer and it will start to receive messages.

Returning {:noreply, state} will causes the process to enter the main loop with the given state.

Returning {:stop, reason, state} will terminate the main loop and call terminate(reason, state) before the process exists with reason reason.

init(initial)
init(initial :: term) ::
  {:ok, state} |
  :ignore |
  {:stop, reason :: term}

Called when the consumer process is first started. start_link/5 will block until it returns.

It receives as argument the fourth argument given to start_link/5.

Returning {:ok, state} will cause start_link/5 to return {:ok, pid} and attempt to open a channel on the given connection and declare the queue, the exchange, and the binding. After that it will enter the main loop with state as its internal state.

Returning :ignore will cause start_link/5 to return :ignore and the process will exit normally without entering the loop, opening a channel or calling terminate/2.

Returning {:stop, reason} will cause start_link/5 to return {:error, reason} and the process will exit with reason reason without entering the loop, opening a channel, or calling terminate/2.

terminate(reason, state)
terminate(reason :: term, state) :: any

This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.