BroadwayRabbitMQ.Producer (BroadwayRabbitMQ v0.6.4)

A RabbitMQ producer for Broadway.

Features

  • Automatically acknowledges/rejects messages.
  • Handles connection outages using backoff for retries.

Options

  • :buffer_size - Optional, but required if :prefetch_count under :qos is set to 0. Defines the size of the buffer to store events without demand. Can be :infinity to signal no limit on the buffer size. This is used to configure the GenStage producer; see the GenStage docs for more details. Defaults to :prefetch_count * 5.

  • :buffer_keep - Optional. Used in the GenStage producer configuration. Defines whether the :first or :last entries should be kept on the buffer in case the buffer size is exceeded. Defaults to :last.

  • :on_success - Configures the acking behaviour for successful messages. See the "Acking" section below for all the possible values. This option can also be changed for each message through Broadway.Message.configure_ack/2. The default value is :ack.

  • :on_failure - Configures the acking behaviour for failed messages. See the "Acking" section below for all the possible values. This option can also be changed for each message through Broadway.Message.configure_ack/2. The default value is :reject_and_requeue.

  • :backoff_min - The minimum backoff interval (default: 1_000).

  • :backoff_max - The maximum backoff interval (default: 30_000).

  • :backoff_type - The backoff strategy: :stop for no backoff and to stop, :exp for exponential, :rand for random, and :rand_exp for random exponential. The default value is :rand_exp.

  • :queue - Required. The name of the queue. If "", the server autogenerates the queue name, but for this to work you must declare the queue through the :declare option.

  • :connection - Defines an AMQP URI or a set of options used by the RabbitMQ client to open the connection with the RabbitMQ broker. See AMQP.Connection.open/1 for the full list of options. The default value is [].

  • :qos - Defines a set of prefetch options used by the RabbitMQ client. See AMQP.Basic.qos/2 for the full list of options. Note that the :global option is not supported by Broadway since each producer holds only one channel per connection. The default value is [].

    • :prefetch_size
    • :prefetch_count - The default value is 50.

  • :name - The name of the AMQP connection to use. The default value is :undefined.

  • :metadata - The list of AMQP metadata fields to copy. Note that every Broadway.Message contains an :amqp_channel in its metadata field. See the "Metadata" section. The default value is [].

  • :declare - A list of options used to declare the :queue. The queue is only declared (and possibly created if not already there) if this option is present and not nil. Note that if you use "" as the queue name (which means that the queue name will be autogenerated on the server), then every producer stage will declare a different queue. If you want all producer stages to consume from the same queue, use a specific queue name. You can still declare the same queue as many times as you want because queue creation is idempotent (as long as you don't use the passive: true option). For the available options, see AMQP.Queue.declare/3.

  • :bindings - A list of bindings for the :queue. This option allows you to bind the queue to one or more exchanges. Each binding is a tuple {exchange_name, binding_options}: the queue is bound to exchange_name through AMQP.Queue.bind/4, using binding_options as the options. Bindings are idempotent, so you can bind the same queue to the same exchange multiple times. The default value is [].

  • :merge_options - A function that takes the index of the producer in the Broadway topology and returns a keyword list of options. The returned options are merged with the other options given to the producer. This option is useful to dynamically change options based on the index of the producer. For example, you can use this option to "shard" load between a few queues where a subset of the producer stages is connected to each queue, or to connect producers to different RabbitMQ nodes (for example through partitioning). Note that the options are evaluated every time a connection is established (for example, in case of disconnections). This means that you can also use this option to choose different options on every reconnection. This can be particularly useful if you have multiple RabbitMQ URLs: in that case, you can connect to a different URL every time you reconnect to RabbitMQ, which avoids the case where the producer always tries to reconnect to a URL that is down.

  • :after_connect - A function that takes the AMQP channel that the producer is connected to and can run arbitrary setup. This is useful for declaring complex RabbitMQ topologies with possibly multiple queues, bindings, or exchanges. RabbitMQ declarations are generally idempotent, so running this function from all producer stages every time they connect is likely fine. This function can return :ok if everything went well or {:error, reason}. In the error case, the producer considers the connection failed and tries to reconnect later (the same behavior as when the connection drops, for example). This function is run before queues are declared and bound according to the :declare and :bindings options (described above).
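
As an illustration of :merge_options and :after_connect, here is a minimal sketch. The module name MyApp.RabbitSetup, the broker URLs, and the "events" exchange are all hypothetical, and the sketch assumes the amqp library is available for AMQP.Exchange.declare/4.

```elixir
defmodule MyApp.RabbitSetup do
  # Hypothetical broker URLs; replace them with your own.
  @urls [
    "amqp://guest:guest@rabbit-a:5672",
    "amqp://guest:guest@rabbit-b:5672"
  ]

  def urls, do: @urls

  # Receives the producer index and returns options to merge into the
  # producer options. It is evaluated on every (re)connection, so a random
  # pick avoids retrying the same unreachable URL forever.
  def merge_options(_index) do
    [connection: Enum.random(@urls)]
  end

  # Receives the open AMQP channel. Must return :ok or {:error, reason};
  # AMQP.Exchange.declare/4 already returns values of that shape.
  def after_connect(channel) do
    AMQP.Exchange.declare(channel, "events", :topic, durable: true)
  end
end
```

To use it, pass merge_options: &MyApp.RabbitSetup.merge_options/1 and after_connect: &MyApp.RabbitSetup.after_connect/1 alongside :queue in the producer options.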

Example

Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module:
      {BroadwayRabbitMQ.Producer,
      queue: "my_queue",
      connection: [
        username: "user",
        password: "password",
        host: "192.168.0.10"
      ],
      qos: [
        prefetch_count: 50
      ]},
    concurrency: 5
  ],
  processors: [
    default: []
  ]
)

Back-pressure and :prefetch_count

Unlike the RabbitMQ client, which defaults to :prefetch_count = 0 and thereby disables back-pressure, BroadwayRabbitMQ overrides the default value to 50, enabling the back-pressure mechanism. You can still set it to 0; if you do, make sure the machine has enough resources to handle the number of messages coming from the broker, and set :buffer_size to an appropriate value.

This is important because the BroadwayRabbitMQ producer does not work as a poller like BroadwaySQS. Instead, it maintains an active connection with a subscribed consumer that receives messages continuously as they arrive in the queue. This is more efficient than using the basic.get method. However, it removes the ability of the GenStage producer to control the demand. Therefore, we need to use the :prefetch_count option to impose back-pressure at the channel level.
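
The following sketch shows what a configuration with prefetching disabled could look like. The buffer size of 10_000 is an illustrative value, not a recommendation; pick one that matches the memory available on your machine.

```elixir
Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module:
      {BroadwayRabbitMQ.Producer,
       queue: "my_queue",
       # 0 removes the per-channel limit on unacknowledged messages,
       # so the broker pushes messages as fast as it can.
       qos: [prefetch_count: 0],
       # Required when :prefetch_count is 0 (illustrative value).
       buffer_size: 10_000,
       # Which entries to keep if the buffer size is exceeded.
       buffer_keep: :last},
    concurrency: 1
  ],
  processors: [
    default: []
  ]
)
```

For comparison, and assuming each producer stage opens its own channel, the earlier example with prefetch_count: 50 and concurrency: 5 would allow at most 5 * 50 = 250 unacknowledged messages in flight at once.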

Connection loss and backoff

If the connection cannot be opened or an established connection is lost, the producer tries to reconnect. By default it uses a random exponential backoff strategy (:rand_exp). The strategy can be configured using the :backoff_type option, and the interval bounds using :backoff_min and :backoff_max.
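
As a sketch, the backoff options sit next to :queue in the producer options. The interval values below are illustrative only.

```elixir
Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module:
      {BroadwayRabbitMQ.Producer,
       queue: "my_queue",
       # Plain exponential backoff instead of the default :rand_exp.
       backoff_type: :exp,
       # Lower and upper bounds for the backoff interval (illustrative).
       backoff_min: 500,
       backoff_max: 10_000},
    concurrency: 1
  ],
  processors: [
    default: []
  ]
)
```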

Unsupported options

Currently, Broadway does not accept options for Basic.consume/4, which is called internally by the producer with default values. That means options like :no_ack are not supported. If you have a scenario where you need to customize those options, please open an issue so we can consider adding this feature.

Declaring queues and binding them to exchanges

In RabbitMQ, it's common for consumers to declare the queue they're going to consume from and bind it to the appropriate exchange when they start up. You can do these steps (either or both) when setting up your Broadway pipeline through the :declare and :bindings options.

Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module:
      {BroadwayRabbitMQ.Producer,
      queue: "my_queue",
      declare: [],
      bindings: [{"my-exchange", []}]},
    concurrency: 5
  ],
  processors: [
    default: []
  ]
)

Acking

You can use the :on_success and :on_failure options to control how messages are acked on RabbitMQ. By default, successful messages are acked (:ack) and failed messages are rejected and requeued (:reject_and_requeue). You can set :on_success and :on_failure when starting the RabbitMQ producer, or change them for each message through Broadway.Message.configure_ack/2.

Here is the list of all possible values supported by :on_success and :on_failure:

  • :ack - acknowledges the message. RabbitMQ will mark the message as acked and will not redeliver it to any other consumer.

  • :reject - rejects the message without requeuing (basically, discards the message). RabbitMQ will not redeliver the message to any other consumer, but a queue can be configured to send rejected messages to a dead letter exchange, where another consumer can see why it was dead lettered, how many times, etc, and potentially republish it.

  • :reject_and_requeue - rejects the message and tells RabbitMQ to requeue it so that it can be delivered to a consumer again. :reject_and_requeue always requeues the message. If the message is unprocessable, this will cause an infinite loop of retries.

  • :reject_and_requeue_once - rejects the message and tells RabbitMQ to requeue it the first time. If a message was already requeued and redelivered, it will be rejected and not requeued again. This feature uses Broadway-specific message metadata, not RabbitMQ's dead lettering feature.
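
A per-message override could look like the sketch below. The process/1 function and its error reasons are hypothetical stand-ins for real business logic; the Broadway calls used are Broadway.Message.configure_ack/2 and Broadway.Message.failed/2.

```elixir
defmodule MyBroadway do
  use Broadway

  alias Broadway.Message

  @impl true
  def handle_message(_processor, %Message{} = message, _context) do
    case process(message.data) do
      :ok ->
        message

      {:error, :invalid_payload} ->
        # Retrying cannot help here, so override the producer-level
        # :on_failure for this message only and discard it.
        message
        |> Message.configure_ack(on_failure: :reject)
        |> Message.failed(:invalid_payload)

      {:error, reason} ->
        # Falls back to the producer-level :on_failure setting.
        Message.failed(message, reason)
    end
  end

  # Hypothetical business logic, stubbed for illustration.
  defp process(""), do: {:error, :invalid_payload}
  defp process("retry" <> _rest), do: {:error, :temporarily_unavailable}
  defp process(_data), do: :ok
end
```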

Note: choose the requeue strategy carefully. If you set the value to :reject or :reject_and_requeue_once, make sure you handle failed messages properly, either by logging them somewhere or by redirecting them to a dead-letter queue for future inspection. If you stick with :reject_and_requeue, keep in mind that requeued messages are redelivered immediately by default, which may generate a very high and unnecessary workload. One way to handle this is to combine RabbitMQ's Dead Letter Exchanges with its TTL and Expiration features.
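
One way to route rejected messages to a dead letter exchange is to set the corresponding queue argument through :declare, as in this sketch. The exchange name "my_queue.dlx" is hypothetical; the exchange should exist by the time messages are dead-lettered, and it could be declared in :after_connect, for example.

```elixir
Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module:
      {BroadwayRabbitMQ.Producer,
       queue: "my_queue",
       # Reject without requeuing; the queue argument below makes
       # RabbitMQ republish rejected messages to the dead letter exchange.
       on_failure: :reject,
       declare: [
         durable: true,
         arguments: [
           # Arguments use the {name, type, value} tuple format.
           {"x-dead-letter-exchange", :longstr, "my_queue.dlx"}
         ]
       ]},
    concurrency: 1
  ],
  processors: [
    default: []
  ]
)
```

Note that RabbitMQ refuses to redeclare an existing queue with different arguments, so a queue that already exists without this argument would have to be deleted and recreated, or configured through a policy instead.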

Metadata

You can retrieve additional information about your message by setting the :metadata option. This is useful in a handful of situations like when you are interested in the message headers or in knowing if the message is new or redelivered.

These are the keys in the metadata map that are always present:

  • :amqp_channel - It contains the AMQP.Channel struct. You can use it to do things like publish messages back to RabbitMQ (for use cases such as RPCs). You should not do things with the channel other than publish messages with AMQP.Basic.publish/5. Other operations may result in undesired effects.
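
As a sketch of the RPC use case, the handler below replies to the queue named in :reply_to. It assumes the producer was started with metadata: [:reply_to, :correlation_id]; the module name MyRpcServer and the upcasing "work" are placeholders.

```elixir
defmodule MyRpcServer do
  use Broadway

  @impl true
  def handle_message(_processor, message, _context) do
    %{amqp_channel: channel, reply_to: reply_to, correlation_id: correlation_id} =
      message.metadata

    # Only reply when the publisher actually set a reply queue.
    if is_binary(reply_to) do
      # The default exchange ("") routes to the queue named by the routing key.
      AMQP.Basic.publish(channel, "", reply_to, String.upcase(message.data),
        correlation_id: correlation_id
      )
    end

    message
  end
end
```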

Here is the list of all possible values supported by :metadata:

  • :delivery_tag - an integer that uniquely identifies the delivery on a channel. It's used internally in AMQP client library methods, like acknowledging or rejecting a message.

  • :redelivered - a boolean representing if the message was already rejected and requeued before.

  • :exchange - the name of the exchange the message was published to.

  • :routing_key - the routing key the message was published with. For messages published through the default exchange, this is the name of the queue.

  • :message_count - the current number of messages in the queue.

  • :content_type - the MIME type of the message.

  • :content_encoding - the MIME content encoding of the message.

  • :headers - the headers of the message, which are returned in tuples of type {String.t(), argument_type(), term()}. The last value of the tuple is the value of the header. You can find a list of argument types here.

  • :persistent - a boolean stating whether or not the message was published with disk persistence.

  • :priority - an integer representing the message priority on the queue.

  • :correlation_id - a property of the AMQP protocol used to correlate RPC requests with their replies. The RabbitMQ RPC tutorial covers this in more detail.

  • :message_id - application specific message identifier.

  • :timestamp - a timestamp associated with the message.

  • :type - message type as a string.

  • :user_id - a user identifier that could have been assigned during message publication. RabbitMQ validated this value against the active connection when the message was published.

  • :app_id - publishing application identifier.

  • :cluster_id - RabbitMQ cluster identifier.

  • :reply_to - name of the reply queue.
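
To read a single header value, a small helper along these lines may be convenient. It is a sketch: the module name MyApp.Headers is hypothetical, and the fallback clause assumes that a message without headers carries a non-list value (such as :undefined) in the :headers field.

```elixir
defmodule MyApp.Headers do
  # Looks up a header by name in a list of {name, type, value} tuples
  # and returns its value, or `default` when the header is absent.
  def get(headers, name, default \\ nil)

  def get(headers, name, default) when is_list(headers) do
    case List.keyfind(headers, name, 0) do
      {^name, _type, value} -> value
      nil -> default
    end
  end

  # Covers messages whose headers field is not a list.
  def get(_headers, _name, default), do: default
end
```

With metadata: [:headers] set on the producer, a handler could then call MyApp.Headers.get(message.metadata.headers, "x-request-id"), where "x-request-id" is an example header name.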