BroadwayRabbitMQ.Producer (BroadwayRabbitMQ v0.8.2)
A RabbitMQ producer for Broadway.
Features
- Automatically acknowledges/rejects messages.
- Handles connection outages using backoff for retries.
For a quick start on using Broadway with RabbitMQ, see the RabbitMQ Guide.
Options
- :buffer_size (non_neg_integer/0) - Optional, but required if :prefetch_count under :qos is set to 0. Defines the size of the buffer that stores events without demand. Can be :infinity to signal no limit on the buffer size. This is used to configure the GenStage producer; see the GenStage docs for more details. Defaults to :prefetch_count * 5.
- :buffer_keep - Optional. Used in the GenStage producer configuration. Defines whether the :first or :last entries should be kept in the buffer when the buffer size is exceeded. Defaults to :last.
- :on_success - Configures the acking behaviour for successful messages. See the "Acking" section below for all the possible values. This option can also be changed for each message through Broadway.Message.configure_ack/2. The default value is :ack.
- :on_failure - Configures the acking behaviour for failed messages. See the "Acking" section below for all the possible values. This option can also be changed for each message through Broadway.Message.configure_ack/2. The default value is :reject_and_requeue.
- :backoff_min (non_neg_integer/0) - The minimum backoff interval, in milliseconds. The default value is 1_000.
- :backoff_max (non_neg_integer/0) - The maximum backoff interval, in milliseconds. The default value is 30_000.
- :backoff_type - The backoff strategy: :stop for no backoff (stop instead of retrying), :exp for exponential, :rand for random, and :rand_exp for random exponential. The default value is :rand_exp.
The following options apply to the underlying AMQP connection:
- :queue (String.t/0) - Required. The name of the queue. If "", the queue name will be autogenerated by the server, but for this to work you have to declare the queue through the :declare option.
- :connection - Defines an AMQP URI (a string), a custom pool, or a set of options used by the RabbitMQ client to open the connection with the RabbitMQ broker. To use a custom pool, pass a {:custom_pool, module, args} tuple; see BroadwayRabbitMQ.ChannelPool for more information. If you pass an AMQP URI or a list of options, this producer manages the AMQP connection instead. See AMQP.Connection.open/1 for the full list of connection options. The default value is [].
- :qos (keyword/0) - Defines a set of prefetch options used by the RabbitMQ client. See AMQP.Basic.qos/2 for the full list of options. Note that the :global option is not supported by Broadway since each producer holds only one channel per connection. The default value is []. Supported options:
  - :prefetch_size (non_neg_integer/0)
  - :prefetch_count (non_neg_integer/0) - The default value is 50.
- :name - The name of the AMQP connection to use. The default value is :undefined.
- :metadata (list of atom/0) - The list of AMQP metadata fields to copy. Note that every Broadway.Message contains an :amqp_channel in its metadata field. See the "Metadata" section. The default value is [].
- :declare (keyword/0) - A list of options used to declare the :queue. The queue is only declared (and possibly created if not already there) if this option is present and not nil. Note that if you use "" as the queue name (which means that the queue name will be autogenerated on the server), then every producer stage will declare a different queue. If you want all producer stages to consume from the same queue, use a specific queue name. You can still declare the same queue as many times as you want because queue creation is idempotent (as long as you don't use the passive: true option). For the available options, see AMQP.Queue.declare/3; :nowait is not supported.
- :bindings - A list of bindings for the :queue. This option allows you to bind the queue to one or more exchanges. Each binding is a tuple {exchange_name, binding_options}: the queue is bound to exchange_name through AMQP.Queue.bind/4, using binding_options as the options. Bindings are idempotent, so you can bind the same queue to the same exchange multiple times. The default value is [].
- :merge_options (function of arity 1) - A function that takes the index of the producer in the Broadway topology and returns a keyword list of options. The returned options are merged with the other options given to the producer. This option is useful to dynamically change options based on the index of the producer. For example, you can use it to "shard" load between a few queues, where a subset of the producer stages is connected to each queue, or to connect producers to different RabbitMQ nodes (for example through partitioning). Note that the options are evaluated every time a connection is established (for example, after a disconnection). This means that you can also use this option to choose different options on every reconnection. This is particularly useful if you have multiple RabbitMQ URLs: you can connect to a different URL every time you reconnect to RabbitMQ, which avoids the case where the producer keeps trying to reconnect to a URL that is down.
- :after_connect (function of arity 1) - A function that takes the AMQP channel that the producer is connected to and can run arbitrary setup. This is useful for declaring complex RabbitMQ topologies with possibly multiple queues, bindings, or exchanges. RabbitMQ declarations are generally idempotent, so running this function from all producer stages every time they connect is likely fine. This function can return :ok if everything went well or {:error, reason}. In the error case, the producer considers the connection failed and tries to reconnect later (the same behavior as when the connection drops, for example). This function runs before the queue is declared and bound according to the :declare and :bindings options (described above).
- :consume_options (keyword/0) - Options passed down to AMQP.Basic.consume/4. Not all options supported by AMQP.Basic.consume/4 are available here, as some options would conflict with the internal implementation of this producer. The default value is []. Supported options:
  - :consumer_tag (String.t/0)
  - :no_local (boolean/0)
  - :no_ack (boolean/0)
  - :exclusive (boolean/0)
  - :arguments (term/0)
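As an illustration of :merge_options, the sketch below shards producers across two queues by their index and picks a broker URL at random each time it runs, which, per the description above, happens on every (re)connection. The module name, queue names, and URLs are hypothetical placeholders.

```elixir
defmodule MyApp.ProducerOptions do
  # Hypothetical queue names and broker URLs; replace with your own.
  @queues ["events.0", "events.1"]
  @urls ["amqp://rabbit-a:5672", "amqp://rabbit-b:5672"]

  # Receives the producer index. Producers are spread over @queues by index,
  # and a random URL is chosen on each call, i.e. on every (re)connection.
  def merge_options(index) when is_integer(index) and index >= 0 do
    [
      queue: Enum.at(@queues, rem(index, length(@queues))),
      connection: Enum.random(@urls)
    ]
  end
end
```

You would pass it to the producer as merge_options: &MyApp.ProducerOptions.merge_options/1. Using rem/2 keeps the mapping valid however many producers you start.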
Note: AMQP provides the possibility to define the AMQP connection globally. This is not supported by Broadway. You must configure the connection directly in the Broadway pipeline, as shown in the next section.
Example
```elixir
@processor_concurrency 50
@max_demand 2

Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module:
      {BroadwayRabbitMQ.Producer,
       queue: "my_queue",
       connection: [
         username: "user",
         password: "password",
         host: "192.168.0.10"
       ],
       qos: [
         # See "Back-pressure and `:prefetch_count`" section
         prefetch_count: @processor_concurrency * @max_demand
       ],
       on_failure: :reject_and_requeue},
    # See "Producer concurrency" section
    concurrency: 1
  ],
  processors: [
    default: [
      concurrency: @processor_concurrency,
      # See "Max demand" section
      max_demand: @max_demand
    ]
  ]
)
```
Producer concurrency
For efficiency, you should generally limit the amount of internal queueing. Whenever additional messages are sitting in a busy processor's mailbox, they can't be delivered to another processor which may be available or become available first.
One possible cause of internal queueing is multiple producers. This is because each processor's demand will be sent to all producers. For example, if a processor demands 2 messages and there are 2 producers, each producer will try to pull 2 messages and give them to the processor. So the processor may receive max_demand * <producer concurrency> messages.
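The worst case above is a simple product. This hypothetical helper (not part of BroadwayRabbitMQ) just makes the arithmetic explicit.

```elixir
defmodule DemandMath do
  # Hypothetical helper: upper bound on the messages a single processor may
  # receive when its demand (max_demand) is sent to every producer.
  def max_messages_per_processor(max_demand, producer_concurrency)
      when max_demand > 0 and producer_concurrency > 0 do
    max_demand * producer_concurrency
  end
end
```

With max_demand 2, going from one producer to two doubles the possible backlog per processor from 2 to 4 messages.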
Setting producer concurrency: 1 will reduce internal queueing, so this is the recommended setting to start with. Only increase producer concurrency if you can measure performance improvements in your system. Adding another single-producer pipeline, or another node running the pipeline, are other ways you may consider to increase throughput.
Back-pressure and :prefetch_count
Unlike the BroadwaySQS producer, which polls for new messages, BroadwayRabbitMQ receives messages as they are pushed by RabbitMQ. The :prefetch_count setting instructs RabbitMQ to limit the number of unacknowledged messages a consumer will have at a given moment (except with a value of 0, which RabbitMQ treats as infinity).
Setting a prefetch limit creates back-pressure from Broadway to RabbitMQ so that the pipeline is not overwhelmed with messages. But setting the limit too low will limit throughput. For example, if :prefetch_count were 1, only one message could be processed at a time, regardless of other settings.
Although the RabbitMQ client has a default :prefetch_count of 0, BroadwayRabbitMQ overrides the default value to 50, enabling the back-pressure mechanism. To ensure that all processors in a given pipeline can receive messages, the value should be set to at least max_demand * <number of processors>, as in the example above.
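The rule of thumb above can be written as a small calculation. PrefetchSketch is a hypothetical helper, not part of BroadwayRabbitMQ; it also encodes the documented default :buffer_size of :prefetch_count * 5.

```elixir
defmodule PrefetchSketch do
  # Hypothetical helper: the smallest :prefetch_count that lets every
  # processor hold max_demand messages at the same time.
  def min_prefetch_count(processor_concurrency, max_demand)
      when processor_concurrency > 0 and max_demand > 0 do
    processor_concurrency * max_demand
  end

  # The documented default :buffer_size is :prefetch_count * 5.
  def default_buffer_size(prefetch_count) when prefetch_count > 0,
    do: prefetch_count * 5
end
```

With the values from the example (50 processors, max_demand 2), this gives a prefetch_count of 100, which would imply a default buffer size of 500.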
Increasing it beyond that could be helpful if latency from RabbitMQ were high, and in the long term it would not cause the pipeline to receive an unfair share of messages, since RabbitMQ uses round-robin delivery to all subscribers. It could mean, however, that a newly added subscriber initially receives no messages, as they would all have been prefetched by the existing producers.
If you're using batchers, you'll need a larger :prefetch_count to allow all batchers and processors to be busy simultaneously. Measure your system to decide what number works best.
You can define :prefetch_count as 0 if you wish to disable back-pressure. However, if you do this, make sure the machine has enough resources to handle the number of messages coming from the broker, and set :buffer_size to an appropriate value.
Max demand
The best value for max_demand depends on how long your messages take to process. If processing time is long, consider setting it to 1. Otherwise, the default value of 10 is a good starting point.
Measure throughput in your own system to see how this setting affects it.
Connection loss and backoff
If the connection cannot be opened, or if an established connection is lost, the producer will try to reconnect using a random exponential backoff strategy. The strategy can be configured using the :backoff_type option, and its bounds with :backoff_min and :backoff_max.
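To make the difference between :exp and :rand_exp concrete, here is a minimal sketch of how a reconnection delay might grow between :backoff_min and :backoff_max. It is only loosely modeled on the strategies named above: the growth factors (doubling for :exp, a random pick up to three times the previous delay for :rand_exp) are assumptions, not BroadwayRabbitMQ's actual formula.

```elixir
defmodule BackoffSketch do
  # Illustrative only; NOT BroadwayRabbitMQ's implementation. All delays are in
  # milliseconds and bounded by min_ms (:backoff_min) and max_ms (:backoff_max).

  # First attempt: start from the minimum delay.
  def next(_strategy, nil, min_ms, _max_ms), do: min_ms

  # :exp (assumed factor of 2): double the previous delay, capped at max_ms.
  def next(:exp, prev_ms, _min_ms, max_ms), do: min(prev_ms * 2, max_ms)

  # :rand_exp (assumed factor of 3): pick a random delay between min_ms and
  # three times the previous delay, capped at max_ms.
  def next(:rand_exp, prev_ms, min_ms, max_ms) do
    upper = min(prev_ms * 3, max_ms)
    lower = min(min_ms, upper)
    # :rand.uniform(n) returns an integer in 1..n, so this yields lower..upper.
    lower + :rand.uniform(upper - lower + 1) - 1
  end

  # Returns the first `count` delays a strategy would produce.
  def take(strategy, count, min_ms, max_ms) do
    Stream.unfold(nil, fn prev ->
      delay = next(strategy, prev, min_ms, max_ms)
      {delay, delay}
    end)
    |> Enum.take(count)
  end
end
```

With the defaults, BackoffSketch.take(:exp, 7, 1_000, 30_000) climbs 1000, 2000, 4000, 8000, 16000 and then stays at the 30000 cap; the randomized variant spreads retries out so that many producers do not all reconnect at the same instant.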
Declaring queues and binding them to exchanges
In RabbitMQ, it's common for consumers to declare the queue they're going to consume from and bind it to the appropriate exchange when they start up. You can do these steps (either or both) when setting up your Broadway pipeline through the :declare and :bindings options.
```elixir
Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module:
      {BroadwayRabbitMQ.Producer,
       queue: "my_queue",
       declare: [],
       bindings: [{"my-exchange", []}]},
    concurrency: 1
  ],
  processors: [
    default: []
  ]
)
```
Acking
You can use the :on_success and :on_failure options to control how messages are acked on RabbitMQ. By default, successful messages are acked and failed messages are rejected and requeued. You can set :on_success and :on_failure when starting the RabbitMQ producer, or change them for each message through Broadway.Message.configure_ack/2. You can also ack a message before the end of the Broadway pipeline by using Broadway.Message.ack_immediately/1, which also determines whether to ack or reject based on :on_success/:on_failure.
Here is the list of all possible values supported by :on_success and :on_failure:
- :ack - acknowledges the message. RabbitMQ will mark the message as acked and will not redeliver it to any other consumer. This is done via AMQP.Basic.ack/3.
- :reject - rejects the message without requeuing (basically, discards the message). RabbitMQ will not redeliver the message to any other consumer, but a queue can be configured to send rejected messages to a dead letter exchange, where another consumer can see why it was dead lettered, how many times, and so on, and potentially republish it. Rejecting is done through AMQP.Basic.reject/3 with the :requeue option set to false.
- :reject_and_requeue - rejects the message and tells RabbitMQ to requeue it so that it can be delivered to a consumer again. :reject_and_requeue always requeues the message. If the message is unprocessable, this will cause an infinite loop of retries. Rejecting is done through AMQP.Basic.reject/3 with the :requeue option set to true.
- :reject_and_requeue_once - rejects the message and tells RabbitMQ to requeue it the first time. If a message was already requeued and redelivered, it will be rejected and not requeued again. This feature uses Broadway-specific message metadata, not RabbitMQ's dead lettering feature. Rejecting is done through AMQP.Basic.reject/3.
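The values above amount to a small decision table. The sketch below is hypothetical (it is not the library's code) and assumes that :reject_and_requeue_once decides based on whether the message has already been redelivered, which matches the description above.

```elixir
defmodule AckSketch do
  # Illustrative mapping from an :on_success / :on_failure value to the AMQP
  # operation it corresponds to. Not BroadwayRabbitMQ's actual code.
  def action(:ack, _redelivered?), do: :ack
  def action(:reject, _redelivered?), do: {:reject, requeue: false}
  def action(:reject_and_requeue, _redelivered?), do: {:reject, requeue: true}

  # Assumption: requeue only on the first delivery; once the message has been
  # redelivered, reject it for good.
  def action(:reject_and_requeue_once, redelivered?) when is_boolean(redelivered?),
    do: {:reject, requeue: not redelivered?}
end
```

Only :reject_and_requeue_once depends on the delivery history, which is why it avoids the infinite retry loop that plain :reject_and_requeue can cause.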
If you pass the no_ack: true option under :consume_options, then RabbitMQ will consider every message delivered to a consumer as acked, so the settings above have no effect. In those cases, calling Broadway.Message.ack_immediately/1 also has no effect.
Choosing the right requeue strategy
Choose the requeue strategy carefully.
If you set the value to :reject or :reject_and_requeue_once, make sure you handle failed messages properly, either by logging them somewhere or by redirecting them to a dead-letter queue for future inspection. These strategies are useful when you want to implement at-most-once processing: you want your messages to be processed at most once, but if they fail, you prefer that they're not re-processed. It's common to pair this requeue strategy with Broadway.Message.ack_immediately/1 in order to ack the message before doing any work, so that if the consumer loses its connection to RabbitMQ while processing, the message will have been acked and RabbitMQ will not deliver it to another consumer. For example:
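```elixir
def handle_message(_processor, message, _context) do
  # ack_immediately/1 returns the message with a no-op acknowledger, so keep
  # the returned value; otherwise it would be acked again at the end.
  message = Broadway.Message.ack_immediately(message)
  process_message(message)
  message
end
```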
:reject_and_requeue is commonly used when you are implementing at-least-once processing semantics. You want messages to be processed at least once, so if something goes wrong and they get rejected, they'll be requeued and redelivered to a consumer.
When using :reject_and_requeue, note that, by default, requeued messages are redelivered immediately, which may result in a high and unnecessary workload. One way to handle this is by using Dead Letter Exchanges and TTL and Expiration.
Metadata
You can retrieve additional information about your message by setting the :metadata option when starting the producer. This is useful in a handful of situations, like when you are interested in the message headers or in knowing if the message is new or redelivered. Metadata is added to the metadata field in the Broadway.Message struct.
These are the keys in the metadata map that are always present:
- :amqp_channel - It contains the AMQP.Channel struct. You can use it to do things like publish messages back to RabbitMQ (for use cases such as RPCs). You should not do anything with the channel other than publish messages with AMQP.Basic.publish/5. Other operations may result in undesired effects.
Here is the list of all possible values supported by :metadata:
- :delivery_tag - an integer that uniquely identifies the delivery on a channel. It's used internally in AMQP client library methods, like acknowledging or rejecting a message.
- :redelivered - a boolean representing whether the message was already rejected and requeued before.
- :exchange - the name of the exchange the message was originally published to (an empty string for the default exchange).
- :routing_key - the routing key the message was published with (for messages published to the default exchange, this is the queue name).
- :content_type - the MIME type of the message.
- :content_encoding - the MIME content encoding of the message.
- :headers - the headers of the message, which are returned in tuples of type {String.t(), argument_type(), term()}. The last value of the tuple is the value of the header. You can find a list of argument types here.
- :persistent - a boolean stating whether or not the message was published with disk persistence.
- :priority - an integer representing the message priority on the queue.
- :correlation_id - a property of the AMQP protocol that is useful to correlate RPC requests. You can read more about RPC in RabbitMQ here.
- :message_id - application-specific message identifier.
- :timestamp - a timestamp associated with the message.
- :type - message type as a string.
- :user_id - a user identifier that could have been assigned during message publication. RabbitMQ validates this value against the active connection when the message is published.
- :app_id - publishing application identifier.
- :cluster_id - RabbitMQ cluster identifier.
- :reply_to - name of the reply queue.
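Header values arrive as {name, type, value} tuples, so reading one usually means searching the list by name. The minimal sketch below assumes the producer was started with metadata: [:headers]. HeaderSketch and the header names are hypothetical, and the :undefined clause assumes the AMQP client may report absent headers that way.

```elixir
defmodule HeaderSketch do
  # Assumption: the AMQP client may report "no headers" as :undefined.
  def get_header(:undefined, _name), do: nil

  # Returns the value of the header called `name`, or nil if it is absent.
  # Headers are {name, type, value} tuples, so we search on the first element.
  def get_header(headers, name) when is_list(headers) do
    case List.keyfind(headers, name, 0) do
      {^name, _type, value} -> value
      nil -> nil
    end
  end
end
```

Inside handle_message/3 you could then call HeaderSketch.get_header(message.metadata.headers, "x-retry-count").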
Telemetry
This producer emits a few Telemetry events which are listed below.
- [:broadway_rabbitmq, :amqp, :open_connection, :start | :stop | :exception] spans - these events are emitted in "span style" when opening an AMQP connection. See :telemetry.span/3. All these events have the measurements described in :telemetry.span/3. The events contain the following metadata:
  - :connection_name - the name of the AMQP connection (or nil if it doesn't have a name)
  - :connection - the connection info passed when starting the producer (either a URI or a keyword list of options)
- [:broadway_rabbitmq, :amqp, :ack, :start | :stop | :exception] span - these events are emitted in "span style" when acking messages on RabbitMQ. See :telemetry.span/3. All these events have the measurements described in :telemetry.span/3. The events contain no metadata.
- [:broadway_rabbitmq, :amqp, :reject, :start | :stop | :exception] span - these events are emitted in "span style" when rejecting messages on RabbitMQ. See :telemetry.span/3. All these events have the measurements described in :telemetry.span/3. The [..., :start] event contains the following metadata:
  - :requeue - a boolean telling whether this "reject" asks RabbitMQ to requeue the message.
- [:broadway_rabbitmq, :amqp, :connection_failure] execution - this event is executed when the connection to RabbitMQ fails. See :telemetry.execute/3. The event contains the following metadata:
  - :reason - the reason for the failure.
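A handler for these events has the usual four-argument Telemetry shape. The sketch below keeps the formatting logic in a plain function so it can be exercised without Telemetry installed. The module name and log messages are hypothetical; the :duration measurement comes from :telemetry.span/3, which reports it in native time units.

```elixir
defmodule MyApp.RabbitMQTelemetry do
  require Logger

  @events [
    [:broadway_rabbitmq, :amqp, :open_connection, :stop],
    [:broadway_rabbitmq, :amqp, :connection_failure]
  ]

  # Event names to subscribe to.
  def events, do: @events

  # Telemetry handler (arity 4): logs a one-line description of each event.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(describe(event, measurements, metadata))
  end

  # Pure formatting logic, kept separate so it can be tested on its own.
  def describe([:broadway_rabbitmq, :amqp, :open_connection, :stop], %{duration: duration}, metadata) do
    # :telemetry.span/3 reports :duration in native time units.
    ms = System.convert_time_unit(duration, :native, :millisecond)
    "Opened AMQP connection #{inspect(metadata[:connection_name])} in #{ms} ms"
  end

  def describe([:broadway_rabbitmq, :amqp, :connection_failure], _measurements, %{reason: reason}) do
    "RabbitMQ connection failed: #{inspect(reason)}"
  end

  def describe(event, _measurements, _metadata), do: "Unhandled event #{inspect(event)}"
end
```

With the telemetry dependency available, you could attach it with :telemetry.attach_many("rabbitmq-logger", MyApp.RabbitMQTelemetry.events(), &MyApp.RabbitMQTelemetry.handle_event/4, nil).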
Dead-letter Exchanges
Here's an example of how to use a dead-letter exchange setup with broadway_rabbitmq:
```elixir
defmodule MyPipeline do
  use Broadway

  @queue "my_queue"
  @exchange "my_exchange"
  @queue_dlx "my_queue.dlx"
  @exchange_dlx "my_exchange.dlx"

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          BroadwayRabbitMQ.Producer,
          on_failure: :reject,
          after_connect: &declare_rabbitmq_topology/1,
          queue: @queue,
          declare: [
            durable: true,
            arguments: [
              {"x-dead-letter-exchange", :longstr, @exchange_dlx},
              {"x-dead-letter-routing-key", :longstr, @queue_dlx}
            ]
          ],
          bindings: [{@exchange, []}]
        },
        concurrency: 2
      ],
      processors: [default: [concurrency: 4]]
    )
  end

  # Runs before :declare and :bindings, so @exchange exists by the time
  # the queue is bound to it.
  defp declare_rabbitmq_topology(amqp_channel) do
    with :ok <- AMQP.Exchange.declare(amqp_channel, @exchange, :fanout, durable: true),
         :ok <- AMQP.Exchange.declare(amqp_channel, @exchange_dlx, :fanout, durable: true),
         {:ok, _} <- AMQP.Queue.declare(amqp_channel, @queue_dlx, durable: true),
         :ok <- AMQP.Queue.bind(amqp_channel, @queue_dlx, @exchange_dlx) do
      :ok
    end
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # Raising errors or returning a "failed" message here (for example with
    # Broadway.Message.failed/2) sends the message to the dead-letter queue,
    # because :on_failure is :reject.
    message
  end
end
```