AMQP.DirectConsumer (amqp v2.1.1)

AMQP.DirectConsumer is an example custom consumer. Its argument is the pid of the process to which the channel forwards messages.

Usage

iex> AMQP.Channel.open(conn, {AMQP.DirectConsumer, self()})

This will forward all the messages from the channel to the calling process.

This is an Elixir reimplementation of :amqp_direct_consumer (https://github.com/rabbitmq/rabbitmq-erlang-client/blob/master/src/amqp_direct_consumer.erl).

For more information see: https://www.rabbitmq.com/erlang-client-user-guide.html#consumers-imlementation

Caveat

AMQP 2.0 comes with an improved version of the default consumer (AMQP.SelectiveConsumer). There is no obvious reason to use DirectConsumer from version 2.0 onward. We highly recommend using the default consumer, which lets you decouple the consumer concern from the channel. If you still want to keep using DirectConsumer, read the caveats below.

Close the channel explicitly

By default, when DirectConsumer detects that the user consumer is down:

  • DirectConsumer returns :error in handle_info, which causes the DirectConsumer process to exit.
  • The channel supervisor detects the DirectConsumer shutdown, but the restart keeps failing because the user consumer is still down.
  • As a result, the channel process shuts down.

However, this is an abnormal shutdown and can cause an unintended race condition.

To avoid it, make sure to close the channel explicitly and shut down your consumer with a :normal signal.

Ignore the user consumer shutdown

DirectConsumer follows the Erlang version and ignores only the :normal reason when the user consumer exits.

You might also want to ignore :shutdown signals, since a :shutdown can also occur before the channel is closed.
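To make the difference between exit reasons concrete, here is a minimal sketch in plain Elixir that needs no broker. It assumes DirectConsumer learns about the user consumer exit the way any monitoring process would, through a :DOWN message carrying the exit reason. The helper exit_reason_seen is hypothetical and only stands in for that mechanism.

```elixir
# Hypothetical helper (not part of the amqp library): spawns a stand-in
# "user consumer" that exits with the given reason, and returns the reason
# a monitoring process observes in the :DOWN message.
exit_reason_seen = fn reason ->
  {pid, ref} = spawn_monitor(fn -> exit(reason) end)

  receive do
    {:DOWN, ^ref, :process, ^pid, down_reason} -> down_reason
  after
    1_000 -> :timeout
  end
end

# A consumer that stops cleanly is seen as :normal ...
normal_reason = exit_reason_seen.(:normal)

# ... while one stopped by its supervisor is typically seen as :shutdown.
shutdown_reason = exit_reason_seen.(:shutdown)

IO.inspect({normal_reason, shutdown_reason})
```

Only the first of these two reasons is ignored by default, which is why a supervised consumer that is shut down before its channel is closed can trigger the abnormal shutdown path.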

You can set the additional reasons to ignore with the following options:

iex> opts = [ignore_consumer_down: [:normal, :shutdown]]
iex> AMQP.Channel.open(conn, {AMQP.DirectConsumer, {self(), opts}})

You can also ignore the user consumer going down completely by setting the value to true:

iex> opts = [ignore_consumer_down: true]
iex> AMQP.Channel.open(conn, {AMQP.DirectConsumer, {self(), opts}})
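The two forms of the option can be summarised with a small sketch. The module below is hypothetical and is not the library's implementation; it only illustrates, under the assumption that the option is either a list of reasons or true, how an exit reason would be matched against ignore_consumer_down.

```elixir
defmodule IgnoreDownSketch do
  @moduledoc false

  # Hypothetical helper, not part of the amqp library.
  # Returns true when a consumer exit with `reason` would be ignored
  # for the given value of the :ignore_consumer_down option.
  def ignore?(_reason, true), do: true
  def ignore?(reason, reasons) when is_list(reasons), do: reason in reasons
  def ignore?(_reason, _other), do: false
end

# Default described above: only :normal is ignored.
default_normal = IgnoreDownSketch.ignore?(:normal, [:normal])
default_shutdown = IgnoreDownSketch.ignore?(:shutdown, [:normal])

# With the extended list, :shutdown is ignored as well.
extended_shutdown = IgnoreDownSketch.ignore?(:shutdown, [:normal, :shutdown])

# With true, every exit reason is ignored.
any_reason = IgnoreDownSketch.ignore?(:killed, true)

IO.inspect({default_normal, default_shutdown, extended_shutdown, any_reason})
```

Listing the reasons explicitly keeps unexpected crashes visible, whereas true hides every consumer exit from the channel.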