View Source AMQP.DirectConsumer (amqp v4.0.0)
AMQP.DirectConsumer
is an example custom consumer. It's argument is a pid
of a process which the channel is meant to forward the messages to.
Usage
iex> AMQP.Channel.open(conn, {AMQP.DirectConsumer, self()})
This will forward all the messages from the channel to the calling process.
This is an Elixir reimplementation of :amqp_direct_consumer
. (https://github.com/rabbitmq/rabbitmq-erlang-client/blob/master/src/amqp_direct_consumer.erl)
For more information see: https://www.rabbitmq.com/erlang-client-user-guide.html#consumers-imlementation
Caveat
You are recommended to use the default consumer
AMQP 2.0 comes with an improved version of the default consumer(AMQP.SelectiveConsumer
).
There is not an obvious reason using DirectConsumer after the version 2.0.
We highly recommend you to use the default consumer which lets you decouple
the consumer concern from the channel.
If you still want to keep using DirectConsumer
, keep reading the caveat below.
Close the channel explicitly
By default, the DirectConsumer detects the user consumer down then...
- DirectConsumer returns :error in handle_info which results the process to exit.
- The channel supervisor will detect the DirectConsumer shutdown but the restart will be failing as the user consumer is still down.
- It ends up the channel process to shut down.
However this is an abnormal shutdown and can cause an unintended race condition.
To avoid it, make sure to close the channel explicitly and shut down your
consumer with a :normal
signal.
Ignore the user consumer shutdown
DirectConsumer follows the Erlang version and ignores only :normal
signals
for the user consumer exit.
You might want to also ignore :shutdown
signals as it can also be called
before the channel is closed.
You can set the additional reasons to ignore with the following options:
iex> opts = [ignore_consumer_down: [:normal, :shutdown]]
iex> AMQP.Channel.open(conn, {AMQP.DirectConsumer, {self(), opts}})
You can also ignore the user consumer down completely by setting true
to the value:
iex> opts = [ignore_consumer_down: true]
iex> AMQP.Channel.open(conn, {AMQP.DirectConsumer, {self(), opts}})