RabbitMQ

RabbitMQ is an open source message broker designed to be highly scalable and distributed. It supports multiple protocols including the Advanced Message Queuing Protocol (AMQP).

Getting Started

In order to use Broadway with RabbitMQ, we need to:

  1. Create a queue (or use an existing one)
  2. Configure our Elixir project to use Broadway
  3. Define the pipeline configuration
  4. Implement Broadway callbacks
  5. Run the Broadway pipeline
  6. Tune the configuration (optional)

If you want to work with an existing queue, you can skip step 1 and jump to Configure the project.

Note: BroadwayRabbitMQ does not automatically create any queue. If you configure a pipeline with a non-existent queue, the producers will crash, bringing down the pipeline.

Create a queue

RabbitMQ runs on many operating systems. Please see Downloading and Installing RabbitMQ for further information. Also, make sure you have the Management plugin enabled, which ships with the command line tool, rabbitmqadmin.

After successfully installing RabbitMQ, you can declare a new queue with the following command:

$ rabbitmqadmin declare queue name=my_queue durable=true

You can list all declared queues to see the one we've just created:

$ rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name      messages
my_queue  0
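If you prefer to declare the queue from Elixir rather than with rabbitmqadmin, the following is a minimal sketch. It assumes a broker running on localhost with the default credentials and a project where the amqp library is already available (for example, after the dependency setup described later in this guide). It uses only the AMQP calls that appear elsewhere in this guide.

```elixir
# Assumes a local broker with default settings; adjust the connection
# options for your environment.
{:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection)

# Declares the durable queue, creating it if it does not exist yet.
{:ok, _info} = AMQP.Queue.declare(channel, "my_queue", durable: true)

AMQP.Connection.close(connection)
```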

Configure the project

In this guide, we're going to use BroadwayRabbitMQ, which is a Broadway RabbitMQ Connector provided by Dashbit.

Starting a new project

If you're creating a new project, run:

$ mix new my_app --sup

The --sup flag instructs Elixir to generate an application with a supervision tree.

Setting up dependencies

Add :broadway_rabbitmq to the list of dependencies in mix.exs:

def deps do
  [
    ...
    {:broadway_rabbitmq, "~> 0.7"}
  ]
end

Don't forget to check for the latest version of dependencies.

Define the pipeline configuration

Broadway is a process-based behaviour. To define a Broadway pipeline, we need to define three functions: start_link/1, handle_message/3, and optionally handle_batch/4. We will cover start_link/1 in this section and the handle_ callbacks in the next one.

Similar to other process-based behaviours, start_link/1 simply delegates to Broadway.start_link/2, which should define the producers, processors, and batchers in the Broadway pipeline. Assuming we want to consume messages from a queue called my_queue, one possible configuration would be:

defmodule MyBroadway do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: MyBroadway,
      producer: [
        module: {BroadwayRabbitMQ.Producer,
          queue: "my_queue",
          qos: [
            prefetch_count: 50
          ]
        },
        concurrency: 1
      ],
      processors: [
        default: [
          concurrency: 50
        ]
      ],
      batchers: [
        default: [
          batch_size: 10,
          batch_timeout: 1500,
          concurrency: 5
        ]
      ]
    )
  end

  ...callbacks...
end

If you're consuming data from an existing broker that requires authorization, you'll need to provide your credentials using the connection option:

...
producer: [
  module: {BroadwayRabbitMQ.Producer,
    queue: "my_queue",
    connection: [
      username: "user",
      password: "password"
    ]
    ...
  }
]
...

For the full list of connection options, please see AMQP.Connection.open/1.
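Hard-coding credentials is rarely desirable outside of a tutorial. As a sketch, the connection options can be built from environment variables at startup. The module name MyApp.RabbitConfig and the RABBITMQ_* variable names are hypothetical and not part of Broadway or BroadwayRabbitMQ; only the :host, :username, and :password keys are connection options.

```elixir
defmodule MyApp.RabbitConfig do
  @moduledoc false

  # Builds the keyword list to pass as the producer's :connection option.
  # The RABBITMQ_* names are hypothetical; use whatever your deployment defines.
  # `env` defaults to the current OS environment and can be overridden in tests.
  def connection_opts(env \\ System.get_env()) do
    [
      host: Map.get(env, "RABBITMQ_HOST", "localhost"),
      # Map.fetch!/2 raises if a required variable is missing, so a
      # misconfigured deployment fails at startup.
      username: Map.fetch!(env, "RABBITMQ_USERNAME"),
      password: Map.fetch!(env, "RABBITMQ_PASSWORD")
    ]
  end
end
```

The result would then be used as `connection: MyApp.RabbitConfig.connection_opts()` inside the producer's module options.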

For general information about setting up Broadway, see Broadway module docs as well as Broadway.start_link/2.

Implement Broadway callbacks

In order to process incoming messages, we need to implement the required callbacks. For the sake of simplicity, we assume that all messages received from the queue are just numbers:

defmodule MyBroadway do
  use Broadway

  alias Broadway.Message

  ...start_link...

  @impl true
  def handle_message(_, message, _) do
    message
    |> Message.update_data(fn data -> {data, String.to_integer(data) * 2} end)
  end

  @impl true
  def handle_batch(_, messages, _, _) do
    list = messages |> Enum.map(fn e -> e.data end)
    IO.inspect(list, label: "Got batch")
    messages
  end
end

We are not doing anything fancy here, but it should be enough for our purpose. First, we update the message's data individually inside handle_message/3 and then we print each batch inside handle_batch/4.
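The handle_message/3 above relies on String.to_integer/1, which raises when the payload is not a number. As a hedged variation, the sketch below parses the payload with Integer.parse/1 and marks non-numeric messages as failed with Broadway.Message.failed/2 instead of raising. The failure reason string is an arbitrary choice, and what happens to a failed message on the RabbitMQ side depends on how the producer is configured, so check the BroadwayRabbitMQ documentation for that part.

```elixir
defmodule MyBroadway do
  use Broadway

  alias Broadway.Message

  # start_link/1 and handle_batch/4 from this guide are omitted here.

  @impl true
  def handle_message(_processor, message, _context) do
    case Integer.parse(message.data) do
      # The whole payload is an integer: keep the {original, doubled} shape.
      {number, ""} ->
        Message.update_data(message, fn data -> {data, number * 2} end)

      # Anything else (including trailing characters): mark as failed.
      _ ->
        Message.failed(message, "not an integer")
    end
  end
end
```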

For more information, see Broadway.handle_message/3 and Broadway.handle_batch/4.

Note: Since Broadway v0.2, batching is optional. In case you don't need to group messages as batches for further processing/publishing, you can remove the :batchers configuration along with the handle_batch/4 callback. This is perfectly fine for RabbitMQ, where messages are acknowledged individually and never as a batch.
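As a sketch of the batch-less setup described in the note, the pipeline below drops the :batchers key and the handle_batch/4 callback and does all of its work in handle_message/3. It reuses the names and values from this guide; the IO.inspect/2 call is only a stand-in for real processing.

```elixir
defmodule MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: MyBroadway,
      producer: [
        module: {BroadwayRabbitMQ.Producer, queue: "my_queue", qos: [prefetch_count: 50]},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 50]
      ]
      # No :batchers key, so handle_batch/4 is not needed.
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # Stand-in for real work on each individual message.
    IO.inspect(message.data, label: "Got message")
    message
  end
end
```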

Run the Broadway pipeline

To run your Broadway pipeline, you just need to add it as a child in a supervision tree. Most applications have a supervision tree defined at lib/my_app/application.ex. You can add Broadway as a child to a supervisor as follows:

children = [
  {MyBroadway, []}
]

Supervisor.start_link(children, strategy: :one_for_one)

Now the Broadway pipeline should be started when your application starts. Also, if your Broadway has any dependency (for example, it needs to talk to the database), make sure that Broadway is listed after its dependencies in the supervision tree.
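For context, this is roughly what the whole lib/my_app/application.ex could look like in a project generated with `mix new my_app --sup`. It is a sketch: the commented-out MyApp.Repo child is a hypothetical dependency that only illustrates the ordering described above.

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Hypothetical dependency; children start in list order, so anything
      # the pipeline needs goes before it:
      # MyApp.Repo,
      {MyBroadway, []}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```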

You can now test your pipeline by entering an iex session:

$ iex -S mix

If everything went fine, you should see lots of info log messages from the amqp supervisors. If you think that's too verbose and want to do something about it, please take a look at the "Log related to amqp supervisors are too verbose" subsection in amqp's Troubleshooting documentation.

Finally, let's generate some sample messages to be consumed by Broadway with the following code:

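# Connect to the local broker with the default settings
{:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection)
# Make sure the queue exists before publishing
AMQP.Queue.declare(channel, "my_queue", durable: true)

# Publish the numbers 1..5000 through the default exchange ("")
Enum.each(1..5000, fn i ->
  AMQP.Basic.publish(channel, "", "my_queue", "#{i}")
end)

AMQP.Connection.close(connection)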

You should see the output showing the generated batches:

Got batch: [
  {"7", 14},
  {"5", 10},
  {"8", 16},
  {"98", 196},
  {"6", 12},
  {"97", 194},
  {"9", 18},
  {"99", 198},
  {"10", 20},
  {"100", 200}
]
Got batch: [
  {"29", 58},
  {"32", 64},
  ...
]

Tuning the configuration

Some of the configuration options available for Broadway already come with a "reasonable" default value. However, those values might not suit your requirements. Depending on the number of messages you get, how much processing they need, and how much IO work is going to take place, you might need completely different values to optimize the flow of your pipeline. The concurrency option, available for every set of producers, processors, and batchers, along with max_demand, batch_size, and batch_timeout, gives you a great deal of flexibility. The concurrency option controls the concurrency level in each layer of the pipeline. See the notes on Producer concurrency and Batcher concurrency for details.

Another important option to take into account is the :prefetch_count. RabbitMQ will continually push new messages to Broadway as it receives them. The :prefetch_count setting provides back-pressure by instructing RabbitMQ to limit the number of unacknowledged messages a consumer will have at a given moment. See the "Back-pressure and :prefetch_count" section of the BroadwayRabbitMQ documentation for details.
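To show where these knobs live, here is a fragment of the options passed to Broadway.start_link/2. The numbers are illustrative only, not recommendations. They are chosen so that :prefetch_count equals processor concurrency times max_demand, which is an assumption made for this sketch rather than a documented rule. Measure before settling on values.

```elixir
# Fragment of the options passed to Broadway.start_link/2.
producer: [
  module: {BroadwayRabbitMQ.Producer,
    queue: "my_queue",
    # Limit on unacknowledged messages a consumer may hold at a time.
    qos: [prefetch_count: 100]
  },
  concurrency: 1
],
processors: [
  default: [
    concurrency: 20,
    # How many messages each processor requests from the producer at a time.
    max_demand: 5
  ]
]
```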

In order to get a good set of configurations for your pipeline, it's important to respect the limitations of the servers you're running, as well as the limitations of the services you're providing/consuming data to/from. Broadway comes with telemetry, so you can measure your pipeline and help ensure your changes are effective.
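As a starting point for measuring, the sketch below attaches a handler with :telemetry.attach/4 and logs how long each message took to process. The module name MyApp.BroadwayMetrics and the handler id are hypothetical. The event name and the :duration measurement are assumed from Broadway's telemetry documentation, so verify them against the Broadway version you are running.

```elixir
defmodule MyApp.BroadwayMetrics do
  require Logger

  # Event name assumed from Broadway's telemetry documentation;
  # check it against your Broadway version.
  @event [:broadway, :processor, :message, :stop]

  # Call once at application start, before the pipeline begins processing.
  def attach do
    :telemetry.attach("my-app-broadway-message-stop", @event, &__MODULE__.handle_event/4, nil)
  end

  def handle_event(@event, measurements, _metadata, _config) do
    case measurements do
      # The duration is assumed to be in native time units.
      %{duration: duration} ->
        ms = System.convert_time_unit(duration, :native, :millisecond)
        Logger.info("Broadway message processed in #{ms}ms")

      # Ignore events that carry no duration.
      _ ->
        :ok
    end
  end
end
```

Comparing these timings before and after a configuration change is one way to check whether the change actually helped.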