Broadway behaviour (Broadway v1.1.0)

Broadway is a concurrent, multi-stage tool for building data ingestion and data processing pipelines.

It allows developers to consume data efficiently from different sources, such as Amazon SQS, Apache Kafka, Google Cloud PubSub, RabbitMQ and others.

Built-in features

  • Back-pressure - by relying on GenStage, we only get the amount of events necessary from upstream sources, never flooding the pipeline.

  • Automatic acknowledgements - Broadway automatically acknowledges messages at the end of the pipeline or in case of errors.

  • Batching - Broadway provides built-in batching, allowing you to group messages by size and/or by time. This is important in systems such as Amazon SQS, where batching is the most efficient way to consume messages, both in terms of time and cost.

  • Fault tolerance - Broadway pipelines are carefully designed to manage failures. Producers are isolated from the rest of the pipeline and automatically resubscribe to your data source in case of crashes. At the same time, all of your Broadway callbacks are stateless, which allows Broadway to handle any errors locally. This provides a stable foundation that plays well with your producers, regardless of whether their delivery guarantees are at least once, at most once, or exactly once.

  • Graceful shutdown - Broadway integrates with the VM to provide graceful shutdown. By starting Broadway as part of your supervision tree, it will guarantee all events are flushed once the VM shuts down.

  • Built-in testing - Broadway ships with a built-in test API, making it easy to push test messages through the pipeline and assert they were properly processed.

  • Custom failure handling - Broadway provides a handle_failed/2 callback where developers can define custom error-handling code, for example to move failed messages to another queue for further processing.

  • Dynamic batching - Broadway allows developers to batch messages based on custom criteria. For example, if your pipeline needs to build batches based on the user_id, email address, etc, it can be done by calling Broadway.Message.put_batch_key/2.

  • Ordering and Partitioning - Broadway allows developers to partition messages across workers, guaranteeing messages within the same partition are processed in order. For example, if you want to guarantee all events tied to a given user_id are processed in order and not concurrently, you can set the :partition_by option. See "Ordering and partitioning".

  • Rate limiting - Broadway allows developers to rate limit all producers in a single node by a given number of messages in a time period, making it easy to work with sources or sinks that cannot cope with a high number of requests. See the :rate_limiting option for producers in start_link/2.

  • Metrics - Broadway uses the :telemetry library for instrumentation; see the "Telemetry" section below for more information.

The Broadway behaviour

In order to use Broadway, you need to:

  1. Define your pipeline configuration
  2. Define a module implementing the Broadway behaviour

Example

Broadway is a process-based behaviour, and you begin by defining a module that invokes use Broadway. Processes defined by these modules will often be started by a supervisor, and so a start_link/1 function is frequently also defined but not strictly necessary.

defmodule MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(MyBroadway,
      name: MyBroadwayExample,
      producer: [
        module: {Counter, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 2]
      ]
    )
  end

  # ...callbacks...
end

Then add your Broadway pipeline to your supervision tree (usually in lib/my_app/application.ex):

children = [
  {MyBroadway, []}
]

Supervisor.start_link(children, strategy: :one_for_one)

Adding your pipeline to your supervision tree in this way calls the default child_spec/1 function that is generated when use Broadway is invoked. If you would like to customize the child spec passed to the supervisor, you can override the child_spec/1 function in your module or explicitly pass a child spec to the supervisor when adding it to your supervision tree.
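As a sketch, overriding the generated child_spec/1 might look like this (the 30-second shutdown timeout is an arbitrary example, not a recommended value):

```elixir
defmodule MyBroadway do
  use Broadway

  # Override the generated child_spec/1; `super/1` returns the default
  # child spec, which we then adjust before handing it to the supervisor.
  def child_spec(opts) do
    default = super(opts)
    Map.put(default, :shutdown, 30_000)
  end

  # ...start_link and callbacks as before...
end
```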

The configuration above defines a pipeline with:

  • One producer
  • Two processors

Here is how this pipeline would be represented:

                     [producer_1]
                         / \
                        /   \
                       /     \
                      /       \
             [processor_1] [processor_2]   <- process each message

After the pipeline is defined, you need to implement the handle_message/3 callback which will be invoked by processors for each message.

handle_message/3 receives every message as a Broadway.Message struct and it must return an updated message.
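For instance, a minimal handle_message/3 might look like this (a sketch assuming the messages carry string data):

```elixir
defmodule MyBroadway do
  use Broadway

  alias Broadway.Message

  # ...start_link as above...

  @impl true
  def handle_message(_processor, message, _context) do
    # Transform the payload; the updated message is forwarded downstream.
    Message.update_data(message, fn data -> String.upcase(data) end)
  end
end
```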

Batching

Depending on the scenario, you may want to group processed messages as batches before publishing your data. This is common and especially important when working with services like AWS S3 and SQS that provide a specific API for sending and retrieving batches. This can drastically increase throughput and consequently improve the overall performance of your pipeline.

To create batches, define the :batchers configuration option:

defmodule MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(MyBroadway,
      name: MyBroadwayExample,
      producer: [
        module: {Counter, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 2]
      ],
      batchers: [
        sqs: [concurrency: 2, batch_size: 10],
        s3: [concurrency: 1, batch_size: 10]
      ]
    )
  end

  # ...callbacks...
end

The configuration above defines a pipeline with:

  • One producer
  • Two processors
  • One batcher named :sqs with two batch processors
  • One batcher named :s3 with one batch processor

Here is how this pipeline would be represented:

                     [producer_1]
                         / \
                        /   \
                       /     \
                      /       \
             [processor_1] [processor_2]   <- process each message
                      /\     /\
                     /  \   /  \
                    /    \ /    \
                   /      x      \
                  /      / \      \
                 /      /   \      \
                /      /     \      \
           [batcher_sqs]    [batcher_s3]
                /\                  \
               /  \                  \
              /    \                  \
             /      \                  \
   [batch_sqs_1] [batch_sqs_2]    [batch_s3_1] <- process each batch

Additionally, you have to define the handle_batch/4 callback, which batch processors invoke for each batch. You can then call Broadway.Message.put_batcher/2 inside handle_message/3 to control which batcher the message should go to.

The batcher receives processed messages and creates batches specified by the batch_size and batch_timeout configuration. The goal is to create a batch with at most batch_size entries within batch_timeout milliseconds. Each message goes into a particular batch, controlled by calling Broadway.Message.put_batch_key/2 in handle_message/3. Once a batch is created in the batcher, it is sent to a separate process (the batch processor) that will call handle_batch/4, passing the batcher, the batch itself (a list of messages), a Broadway.BatchInfo struct, and the Broadway context.
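For example, a sketch that groups messages within a batcher by a hypothetical user_id field on the message data:

```elixir
@impl true
def handle_message(_processor, message, _context) do
  # Messages with the same batch key end up in the same batch;
  # `user_id` here is an assumed field on the message data.
  Broadway.Message.put_batch_key(message, message.data.user_id)
end
```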

For example, imagine your producer generates integers as data. You want to route the odd integers to SQS and the even ones to S3. Your pipeline would look like this:

defmodule MyBroadway do
  use Broadway
  import Integer

  alias Broadway.Message

  # ...start_link...

  @impl true
  def handle_message(_, %Message{data: data} = message, _) when is_odd(data) do
    message
    |> Message.update_data(&process_data/1)
    |> Message.put_batcher(:sqs)
  end

  def handle_message(_, %Message{data: data} = message, _) when is_even(data) do
    message
    |> Message.update_data(&process_data/1)
    |> Message.put_batcher(:s3)
  end

  defp process_data(data) do
    # Do some calculations, generate a JSON representation, etc.
  end

  @impl true
  def handle_batch(:sqs, messages, _batch_info, _context) do
    # Send batch of successful messages as ACKs to SQS
    # This tells SQS that this list of messages were successfully processed
  end

  def handle_batch(:s3, messages, _batch_info, _context) do
    # Send batch of messages to S3
  end
end

See the callbacks documentation for more information on the arguments given to each callback and their expected return types.

The default batcher

Once you define the :batchers configuration key for your Broadway pipeline, then all messages get batched. By default, unless you call Broadway.Message.put_batcher/2, messages have their batcher set to the :default batcher. If you don't define configuration for it, Broadway is going to raise an error.

For example, imagine you want to batch "special" messages and handle them differently than all other messages. You can configure your pipeline like this:

defmodule MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(MyBroadway,
      name: MyBroadwayExample,
      producer: [
        module: {Counter, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 2]
      ],
      batchers: [
        special: [concurrency: 2, batch_size: 10],
        default: [concurrency: 1, batch_size: 10]
      ]
    )
  end

  def handle_message(_, message, _) do
    if special?(message) do
      message
      |> Broadway.Message.put_batcher(:special)
    else
      message
    end
  end

  def handle_batch(:special, messages, _batch_info, _context) do
    # Handle special batch
  end

  def handle_batch(:default, messages, _batch_info, _context) do
    # Handle all other messages in batches
  end
end

Now you are ready to get started. See the start_link/2 function for a complete reference on the arguments and options allowed.

Also make sure to check out the GUIDES in the documentation sidebar for more examples, how-tos, and more.

Acknowledgements and failures

At the end of the pipeline, messages are automatically acknowledged.

If there are no batchers, the acknowledgement will be done by processors. The number of messages acknowledged, assuming the pipeline is running at full scale, will be max_demand - min_demand. Since the default values are 10 and 5 respectively, we will be acknowledging in groups of 5.
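Spelled out explicitly, those defaults correspond to this processor configuration fragment:

```elixir
processors: [
  # max_demand - min_demand = 5, so acknowledgements happen in groups of 5
  default: [max_demand: 10, min_demand: 5]
]
```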

If there are batchers, the acknowledgement is done by the batchers, using the batch_size.

In case of failures, Broadway does its best to keep the failures contained and avoid losing messages. The failed message or batch is acknowledged as failed immediately. For every failure, a log report is also emitted. If your Broadway module also defines the handle_failed/2 callback, that callback will be invoked with all the failed messages before they get acknowledged.

Note, however, that Broadway does not provide any sort of retries out of the box. This is left entirely as a responsibility of the producer. For instance, if you are using Amazon SQS, the default behaviour is to retry unacknowledged messages after a user-defined timeout. If you don't want unacknowledged messages to be retried, it is your responsibility to configure a dead-letter queue as the target for those messages.

Producer concurrency

Setting producer concurrency is a tradeoff between latency and internal queueing.

For efficiency, you should generally limit the amount of internal queueing. Whenever additional messages are sitting in a busy processor's mailbox, they can't be delivered to another processor which may be available or become available first.

One possible cause of internal queueing is multiple producers. This is because each processor's demand will be sent to all producers. For example, if a processor demands 2 messages and there are 2 producers, each producer will try to produce 2 messages (for example, by pulling from a queue or whatever the specific producer does) and give them to the processor. So the processor may receive max_demand * <producer concurrency> messages.

Setting producer concurrency: 1 will reduce internal queueing. This is likely a good choice for producers which take minimal time to produce a message, such as BroadwayRabbitMQ, which receives messages as they are pushed by RabbitMQ and can specify how many to prefetch.

On the other hand, when using a producer such as BroadwaySQS which must make a network round trip to fetch from an external source, it may be better to use multiple producers and accept some internal queueing to avoid having to fetch messages whenever there is new demand.

Measure your system to decide which setting is most appropriate.

Adding another single-producer pipeline, or another node running the pipeline, are other ways you may consider to increase throughput.
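As a sketch, the two situations above might be configured like this (producer options are abbreviated and illustrative only):

```elixir
# Polling source: several producers hide the network round trip
producer: [
  module: {BroadwaySQS.Producer, queue_url: "..."},
  concurrency: 4
]

# Push-based source: a single producer is usually enough
producer: [
  module: {BroadwayRabbitMQ.Producer, queue: "my_queue"},
  concurrency: 1
]
```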

Batcher concurrency

If a batcher's concurrency is greater than 1, Broadway will use as few of the batcher processes as possible at any given moment, attempting to satisfy the batch_size of one batcher process within the batch_timeout before sending messages to another.

Testing

Many producers receive data from external systems and hitting the network is usually undesirable when running the tests.

For testing purposes, we recommend using Broadway.DummyProducer. This producer does not produce any messages by itself; instead, the test_message/3 and test_batch/3 functions should be used to publish messages.

With test_message/3, you can push a message into the pipeline and receive a message in your test process when the pipeline acknowledges that the data you have pushed has been processed.

Let's see an example. Imagine the following Broadway module:

defmodule MyBroadway do
  use Broadway

  def start_link() do
    producer_module = Application.fetch_env!(:my_app, :producer_module)
    producer_options = Application.get_env(:my_app, :producer_options, [])

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {producer_module, producer_options}
      ],
      processors: [
        default: []
      ],
      batchers: [
        default: [batch_size: 10]
      ]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    message
  end

  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context) do
    messages
  end
end

Now in config/test.exs you could do:

config :my_app,
  producer_module: Broadway.DummyProducer,
  producer_options: [] # change if required for your dev/prod producer

And we can test it like this:

defmodule MyBroadwayTest do
  use ExUnit.Case, async: true

  test "test message" do
    ref = Broadway.test_message(MyBroadway, 1)
    assert_receive {:ack, ^ref, [%{data: 1}], []}
  end
end

Note that at the end we received a message in the format of:

{:ack, ^ref, successful_messages, failure_messages}

You can use the acknowledgment to guarantee the message has been processed and therefore any side-effect from the pipeline should be visible.

When using test_message/3, the message will be delivered as soon as possible, without waiting for the pipeline's batch_size to be reached or for the batch_timeout to elapse. This behaviour is useful to test and verify single messages without imposing high timeouts on our test suites.

In case you want to test multiple messages, then you need to use test_batch/3. test_batch/3 will respect the batching configuration, which most likely means you need to increase your test timeouts:

test "batch messages" do
  ref = Broadway.test_batch(MyBroadway, [1, 2, 3])
  assert_receive {:ack, ^ref, [%{data: 1}, %{data: 2}, %{data: 3}], []}, 1000
end

However, keep in mind that, generally speaking, there is no guarantee the messages will arrive in the same order that you have sent them, especially for large batches, as Broadway will process large batches concurrently and order will be lost.

If you want to send more than one test message at once, then we recommend setting the :batch_mode to :bulk, especially if you want to assert how the code will behave with large batches. Otherwise the batcher will flush messages as soon as possible and in small batches.

However, keep in mind that, regardless of the :batch_mode you cannot rely on ordering, as Broadway pipelines are inherently concurrent. For example, if you send those messages:

test "multiple batch messages" do
  ref = Broadway.test_batch(MyBroadway, [1, 2, 3, 4, 5, 6, 7], batch_mode: :bulk)
  assert_receive {:ack, ^ref, [%{data: 1}], []}, 1000
end

Testing with Ecto

If you are using Ecto in your Broadway processors and you want to run your tests concurrently, you need to tell Broadway to use the Ecto SQL Sandbox during tests. This can be done in two steps.

First, when you call test_message/3 in your tests, include the :ecto_sandbox process in the message metadata:

Broadway.test_message(MyApp.Pipeline, message, metadata: %{ecto_sandbox: self()})

Now we can use Broadway telemetry callbacks to fetch the sandbox process and enable it inside the processor. Add to your test/test_helper.exs:

defmodule BroadwayEctoSandbox do
  def attach(repo) do
    events = [
      [:broadway, :processor, :start],
      [:broadway, :batch_processor, :start],
    ]

    :telemetry.attach_many({__MODULE__, repo}, events, &__MODULE__.handle_event/4, %{repo: repo})
  end

  def handle_event(_event_name, _event_measurement, %{messages: messages}, %{repo: repo}) do
    with [%Broadway.Message{metadata: %{ecto_sandbox: pid}} | _] <- messages do
      Ecto.Adapters.SQL.Sandbox.allow(repo, pid, self())
    end

    :ok
  end
end

BroadwayEctoSandbox.attach(MyApp.Repo)

And now you should have concurrent Broadway tests that talk to the database.

Ordering and partitioning

By default, Broadway processes all messages and batches concurrently, which means ordering is not guaranteed. Some producers may impose some ordering (for instance, Apache Kafka), but if the ordering comes from a business requirement, you will have to impose the ordering yourself. This can be done with the :partition_by option, which enforces that messages with a given property are always forwarded to the same stage.

In order to provide partitioning throughout the whole pipeline, just set :partition_by at the root of your configuration:

defmodule MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {Counter, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 2]
      ],
      batchers: [
        sqs: [concurrency: 2, batch_size: 10],
        s3: [concurrency: 1, batch_size: 10]
      ],
      partition_by: &partition/1
    )
  end

  defp partition(msg) do
    msg.data.user_id
  end
end

In the example above, we are partitioning the pipeline by user_id. This means any message with the same user_id will be handled by the same processor and batch processor.

The partition function must return a non-negative integer, starting at zero, which is routed to a stage by taking the remainder of that integer divided by the number of stages.

If the data you want to partition by is not an integer, you can explicitly hash it by calling :erlang.phash2/1. However, note that hashing does not guarantee an even distribution of events across partitions, so some partitions may be more overloaded than others, slowing down the whole pipeline.
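For example, a sketch of hashing a non-integer property (email is an assumed field on the message data here):

```elixir
# :erlang.phash2/1 maps any term to a non-negative integer,
# which Broadway then routes to a stage by remainder.
partition_by: fn message -> :erlang.phash2(message.data.email) end
```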

In the example above, we have set the same partition for all processors and batchers. You can also specify the :partition_by function for each "processor" and "batcher" individually.

Even partitions

Broadway partitions assume an even distribution of partitions. This means that, if one partition is slow, it will slow down all other partitions. This implies two things:

  • Using :partition_by with a high level of concurrency can actually be detrimental to performance. For example, if concurrency is set to 100, you need all 100 processors to make progress at the same time.

  • Avoid using :partition_by with a low value of min_demand. For example, setting max_demand to 1 (which implies min_demand of 0), means that each processor will receive a single message and only receive further messages once all processors complete.

When partitioning, the default values for concurrency (which is equal to the number of cores) and max_demand (which is equal to 10), are good starting points.

Error semantics

Beware of the error semantics when using partitioning. If you require messages to be processed in order and a message fails, the partition will continue processing messages, which may be undesired. If your producer supports retrying, the failed message may be retried later, out of its original order. Those issues happen regardless of Broadway and solutions to said problems almost always need to be addressed outside of Broadway too.

Telemetry

Broadway currently exposes the following Telemetry events:

  • [:broadway, :topology, :init] - Dispatched when the topology for a Broadway pipeline is initialized. The config key in the metadata contains the configuration options that were provided to Broadway.start_link/2.

    • Measurement: %{system_time: integer}
    • Metadata: %{supervisor_pid: pid, config: keyword}
  • [:broadway, :processor, :start] - Dispatched by a Broadway processor before the optional prepare_messages/2 callback is invoked

    • Measurement: %{system_time: integer}

    • Metadata:

      %{
        topology_name: atom,
        name: atom,
        processor_key: atom,
        index: non_neg_integer,
        messages: [Broadway.Message.t],
        telemetry_span_context: reference,
        producer: {atom, list}
      }
  • [:broadway, :processor, :stop] - Dispatched by a Broadway processor after prepare_messages/2 and after the handle_message/3 callback has been invoked for all individual messages

    • Measurement: %{duration: native_time}

    • Metadata:

      %{
        topology_name: atom,
        name: atom,
        processor_key: atom,
        index: non_neg_integer,
        successful_messages_to_ack: [Broadway.Message.t],
        successful_messages_to_forward: [Broadway.Message.t],
        failed_messages: [Broadway.Message.t],
        telemetry_span_context: reference,
        producer: {atom, list}
      }
  • [:broadway, :processor, :message, :start] - Dispatched by a Broadway processor before your handle_message/3 callback is invoked

    • Measurement: %{system_time: integer}

    • Metadata:

      %{
        processor_key: atom,
        topology_name: atom,
        name: atom,
        index: non_neg_integer,
        message: Broadway.Message.t,
        telemetry_span_context: reference
      }
  • [:broadway, :processor, :message, :stop] - Dispatched by a Broadway processor after your handle_message/3 callback has returned

    • Measurement: %{duration: native_time}

    • Metadata:

      %{
        processor_key: atom,
        topology_name: atom,
        name: atom,
        index: non_neg_integer,
        message: Broadway.Message.t,
        telemetry_span_context: reference
      }
  • [:broadway, :processor, :message, :exception] - Dispatched by a Broadway processor if your handle_message/3 callback encounters an exception

    • Measurement: %{duration: native_time}

    • Metadata:

      %{
        processor_key: atom,
        topology_name: atom,
        name: atom,
        index: non_neg_integer,
        message: Broadway.Message.t,
        kind: kind,
        reason: reason,
        stacktrace: stacktrace,
        telemetry_span_context: reference
      }
  • [:broadway, :batch_processor, :start] - Dispatched by a Broadway batch processor before your handle_batch/4 callback is invoked

    • Measurement: %{system_time: integer}

    • Metadata:

      %{
        topology_name: atom,
        name: atom,
        index: non_neg_integer,
        messages: [Broadway.Message.t],
        batch_info: Broadway.BatchInfo.t,
        telemetry_span_context: reference,
        producer: {atom, list}
      }
  • [:broadway, :batch_processor, :stop] - Dispatched by a Broadway batch processor after your handle_batch/4 callback has returned

    • Measurement: %{duration: native_time}

    • Metadata:

      %{
        topology_name: atom,
        name: atom,
        index: non_neg_integer,
        successful_messages: [Broadway.Message.t],
        failed_messages: [Broadway.Message.t],
        batch_info: Broadway.BatchInfo.t,
        telemetry_span_context: reference,
        producer: {atom, list}
      }
  • [:broadway, :batcher, :start] - Dispatched by a Broadway batcher before handling events

    • Measurement: %{system_time: integer}

    • Metadata:

      %{
        topology_name: atom,
        name: atom,
        batcher_key: atom,
        messages: [Broadway.Message.t],
        telemetry_span_context: reference
      }
  • [:broadway, :batcher, :stop] - Dispatched by a Broadway batcher after handling events

    • Measurement: %{duration: native_time}
    • Metadata:
      %{
        topology_name: atom,
        name: atom,
        batcher_key: atom,
        telemetry_span_context: reference
      }

Most of the events follow the :telemetry.span/3 convention for measurements. This means that "start" events have a :system_time representing the start of that event using System.system_time/0. The "stop" or "exception" events have the duration value, which is the difference in monotonic time between the start and stop events.
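As a usage sketch, a handler that logs processor duration could be attached like this (the handler id and log format are arbitrary examples):

```elixir
# Attach once, e.g. at application start. The metadata fields used
# below match the [:broadway, :processor, :stop] event documented above.
:telemetry.attach(
  "log-processor-duration",
  [:broadway, :processor, :stop],
  fn _event, %{duration: duration}, metadata, _config ->
    ms = System.convert_time_unit(duration, :native, :millisecond)
    forwarded = length(metadata.successful_messages_to_forward)
    IO.puts("#{inspect(metadata.name)} processed #{forwarded} message(s) in #{ms}ms")
  end,
  nil
)
```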

Summary

Callbacks

  • handle_batch/4 - Invoked to handle generated batches.

  • handle_failed/2 - Invoked for failed messages (if defined).

  • handle_message/3 - Invoked to handle/process individual messages sent from a producer.

  • prepare_messages/2 - Invoked for preparing messages before handling (if defined).

  • process_name/2 - Invoked to get the process name of this Broadway pipeline.

Functions

  • all_running/0 - Returns all running Broadway names.

  • get_rate_limiting/1 - Gets the current values used for the producer rate limiting of the given pipeline.

  • producer_names/1 - Returns the names of producers.

  • push_messages/2 - Sends a list of Broadway.Messages to the Broadway pipeline.

  • start_link/2 - Starts a Broadway process linked to the current process.

  • stop/3 - Synchronously stops the Broadway pipeline with the given reason.

  • test_batch/3 - Sends a list of data as a batch of messages to the Broadway pipeline.

  • test_message/3 - Sends a test message through the Broadway pipeline.

  • topology/1 - Returns the topology details for a pipeline.

  • update_rate_limiting/2 - Updates the producer rate limiting of the given pipeline at runtime.

Types

@type name() :: atom() | {:via, module(), term()}
@type on_start() ::
  {:ok, pid()} | :ignore | {:error, {:already_started, pid()} | term()}

Returned by start_link/2.

Callbacks

handle_batch(batcher, messages, batch_info, context) (optional)

@callback handle_batch(
  batcher :: atom(),
  messages :: [Broadway.Message.t()],
  batch_info :: Broadway.BatchInfo.t(),
  context :: term()
) :: [Broadway.Message.t()]

Invoked to handle generated batches.

It expects:

  • batcher is the key that defined the batcher.
  • messages is the list of messages in the batch.
  • batch_info is a Broadway.BatchInfo struct with information about the batch.
  • context is the user-defined data structure passed to start_link/2.

It must return an updated list of messages. All messages received must be returned, otherwise an error will be logged. All messages after this step will be acknowledged according to their status.

In case of errors in this callback, the error will be logged and the whole batch will be failed. This callback also traps exits, so failures due to broken links between processes do not automatically cascade.

For more information on batching, see the "Batching" section in the Broadway documentation.

handle_failed(messages, context) (optional) (since 0.5.0)

@callback handle_failed(messages :: [Broadway.Message.t()], context :: term()) :: [
  Broadway.Message.t()
]

Invoked for failed messages (if defined).

It expects:

  • messages is the list of messages that failed. If a message is failed in handle_message/3, this will be a list with a single message in it. If some messages are failed in handle_batch/4, this will be the list of failed messages.

  • context is the user-defined data structure passed to start_link/2.

This callback must return the same messages given to it, possibly updated. For example, you could update the message data or use Broadway.Message.configure_ack/2 in a centralized place to configure how to ack the message based on the failure reason.

This callback is optional. If present, it's called before the messages are acknowledged according to the producer. This gives you a chance to do something with the message before it's acknowledged, such as storing it in an external persistence layer or similar.

This callback is also invoked if handle_message/3 or handle_batch/4 crash or raise an error. If this callback crashes or raises an error, the messages are failed internally by Broadway to avoid crashing the process.
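A minimal sketch of this callback that logs each failure and returns the messages unchanged (a real pipeline might instead push the messages to a dead-letter queue here):

```elixir
require Logger

@impl true
def handle_failed(messages, _context) do
  # message.status is {:failed, reason} for failed messages.
  for message <- messages do
    Logger.error("Message failed: #{inspect(message.data)} (#{inspect(message.status)})")
  end

  messages
end
```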

handle_message(processor, message, context)

@callback handle_message(
  processor :: atom(),
  message :: Broadway.Message.t(),
  context :: term()
) ::
  Broadway.Message.t()

Invoked to handle/process individual messages sent from a producer.

It receives:

  • processor is the key that defined the processor.
  • message is the Broadway.Message struct to be processed.
  • context is the user defined data structure passed to start_link/2.

And it must return the (potentially) updated Broadway.Message struct.

This is the place to do any kind of processing with the incoming message, e.g. transform the data into another data structure or call specific business logic to do calculations. Basically, any CPU-bound task that runs against a single message should be processed here.

In order to update the data after processing, use the Broadway.Message.update_data/2 function. This way the new message can be properly forwarded and handled by the batcher:

@impl true
def handle_message(_, message, _) do
  message
  |> update_data(&do_calculation_and_returns_the_new_data/1)
end

In case more than one batcher has been defined in the configuration, you need to specify which one the resulting message will be forwarded to. You can do this by calling put_batcher/2 and returning the new updated message:

@impl true
def handle_message(_, message, _) do
  # Do whatever you need with the data
  ...

  message
  |> put_batcher(:s3)
end

Any message that has not been explicitly failed will be forwarded to the next step in the pipeline. If there are no extra steps, it will be automatically acknowledged.

In case of errors in this callback, the error will be logged and that particular message will be immediately acknowledged as failed, not proceeding to the next steps of the pipeline. This callback also traps exits, so failures due to broken links between processes do not automatically cascade.

prepare_messages(messages, context) (optional)

@callback prepare_messages(messages :: [Broadway.Message.t()], context :: term()) :: [
  Broadway.Message.t()
]

Invoked for preparing messages before handling (if defined).

It expects:

  • messages is the list of messages to be prepared.
  • context is the user-defined data structure passed to start_link/2.

This is the place to prepare and preload any information that will be used by handle_message/3. For example, if you need to query the database, instead of doing it once per message, you can do it on this callback as a best-effort optimization.

The length of the list of messages received by this callback is often based on the min_demand/max_demand configuration in the processor, but ultimately it depends on the producer and on the frequency at which data arrives. A pipeline that receives messages rarely will most likely emit lists shorter than the min_demand value. Producers which are push-based, rather than pull-based, such as BroadwayRabbitMQ.Producer, are more likely to send messages as they arrive (which may skip batching altogether and always result in single-element lists). In other words, this callback is simply a convenience for preparing messages; it does not guarantee the messages will be accumulated to a certain length. For effective batch processing, see handle_batch/4.

This callback must always return all messages it receives, as handle_message/3 is still called individually for each message afterwards.

Failed Messages

Even if prepare_messages/2 fails some messages (Broadway.Message.failed/2), the failed messages are still passed down to handle_message/3. If your pipeline wants to avoid processing messages failed in prepare_messages/2, it will have to pattern match on %Broadway.Message{status: {:failed, reason}} in its handle_message/3 callback and act accordingly.
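As a sketch of both points above (assuming a hypothetical Users.all_by_id/1 function that loads many users in a single query, keyed by id), prepare_messages/2 can preload data for the whole list, while handle_message/3 pattern matches to skip messages that were already failed:

```elixir
@impl true
def prepare_messages(messages, _context) do
  # Batch-load all users in one query instead of one query per message.
  users = Users.all_by_id(Enum.map(messages, & &1.data.user_id))

  Enum.map(messages, fn message ->
    Broadway.Message.update_data(message, fn data ->
      Map.put(data, :user, Map.get(users, data.user_id))
    end)
  end)
end

@impl true
# Skip messages already failed in prepare_messages/2; they will be
# acknowledged as failed at the end of the pipeline.
def handle_message(_processor, %Broadway.Message{status: {:failed, _reason}} = message, _context) do
  message
end

def handle_message(_processor, message, _context) do
  # Process the message using the preloaded message.data.user here.
  message
end
```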

Link to this callback

process_name(broadway_name, base_name)

View Source (optional) (since 1.1.0)
@callback process_name(broadway_name :: name(), base_name :: String.t()) :: name()

Invoked to get the process name of this Broadway pipeline.

broadway_name is the name given to start_link/2 in the :name option. base_name is a string used by Broadway to identify different components of the pipeline whose name needs to be registered (such as "batcher" or "processor").

The return value of this callback must be a process name that is valid for registration. See the name registration rules in the documentation for GenServer.

This callback is optional. If not defined, the broadway_name given to start_link/2 must be an atom: the default implementation of this callback will fail otherwise.

Examples

@impl Broadway
def process_name({:via, module, term}, base_name) do
  {:via, module, {term, base_name}}
end

Functions

Link to this function

all_running()

View Source (since 1.0.0)
@spec all_running() :: [name()]

Returns all running Broadway names.

Note that the order of the returned names is not guaranteed.

Link to this function

get_rate_limiting(broadway)

View Source (since 0.6.0)
@spec get_rate_limiting(server :: name()) ::
  {:ok, rate_limiting_info} | {:error, :rate_limiting_not_enabled}
when rate_limiting_info: %{
       interval: non_neg_integer(),
       allowed_messages: non_neg_integer()
     }

Gets the current values used for the producer rate limiting of the given pipeline.

Returns {:ok, info} if rate limiting is enabled for the given pipeline or {:error, reason} if the given pipeline doesn't have rate limiting enabled.

The returned info is a map with the following keys:

  • :interval
  • :allowed_messages

See the :rate_limiting options in the module documentation for more information.

Examples

Broadway.get_rate_limiting(broadway)
#=> {:ok, %{allowed_messages: 2000, interval: 1000}}

Link to this function

producer_names(broadway)

View Source (since 0.5.0)
@spec producer_names(name()) :: [name()]

Returns the names of producers.

Examples

iex> Broadway.producer_names(MyBroadway)
[MyBroadway.Producer_0, MyBroadway.Producer_1, ..., MyBroadway.Producer_7]

Link to this function

push_messages(broadway, messages)

View Source
@spec push_messages(broadway :: name(), messages :: [Broadway.Message.t()]) :: :ok

Sends a list of Broadway.Message structs to the Broadway pipeline.

The producer is randomly chosen among all sets of producers/stages. This can be used to send out-of-band data into a Broadway pipeline.
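For example, messages built by hand can be pushed into a running pipeline. This sketch uses Broadway.NoopAcknowledger so no acknowledgement is performed; MyBroadway is a placeholder pipeline name:

```elixir
messages =
  for data <- [%{id: 1}, %{id: 2}] do
    %Broadway.Message{
      data: data,
      acknowledger: Broadway.NoopAcknowledger.init()
    }
  end

Broadway.push_messages(MyBroadway, messages)
```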

Link to this function

start_link(module, opts)

View Source
@spec start_link(
  module(),
  keyword()
) :: on_start()

Starts a Broadway process linked to the current process.

  • module is the module implementing the Broadway behaviour.

Options

In order to set up how the pipeline created by Broadway should work, you need to specify the blueprint of the pipeline. You can do this by passing a set of options to start_link/2. Each component of the pipeline has its own set of options.

The Broadway options are:

  • :name - Required. Used for name registration. When an atom, all processes/stages created will be named using this value as prefix.

  • :shutdown (pos_integer/0) - Optional. The time in milliseconds given for Broadway to gracefully shutdown without discarding events. The default value is 30000.

  • :max_restarts (non_neg_integer/0) - The default value is 3.

  • :max_seconds (pos_integer/0) - The default value is 5.

  • :resubscribe_interval (non_neg_integer/0) - The interval in milliseconds that processors wait until they resubscribe to a failed producer. The default value is 100.

  • :context (term/0) - A user defined data structure that will be passed to handle_message/3 and handle_batch/4. The default value is :context_not_set.

  • :producer (non-empty keyword/0) - Required. A keyword list of options. See "Producers options" section below. Only a single producer is allowed.

  • :processors (non-empty keyword/0) - Required. A keyword list of named processors where the key is an atom as identifier and the value is another keyword list of options. See "Processors options" section below. Currently only a single processor is allowed.

  • :batchers (keyword/0) - A keyword list of named batchers where the key is an atom as identifier and the value is another keyword list of options. See "Batchers options" section below. The default value is [].

  • :partition_by (function of arity 1) - A function that controls how data is partitioned across all processors and batchers. It receives a Broadway.Message and it must return a non-negative integer, starting with zero, that will be mapped to one of the existing processors. See "Ordering and Partitioning" in the module docs for more information and known pitfalls.

  • :spawn_opt (keyword/0) - Low-level options given when starting a process. Applies to producers, processors, and batchers. See erlang:spawn_opt/2 for more information.

  • :hibernate_after (pos_integer/0) - If a process does not receive any message within this interval, it will hibernate, compacting memory. Applies to producers, processors, and batchers. Defaults to 15_000 (millisecond). The default value is 15000.
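Putting these options together, a minimal pipeline blueprint might look like the following sketch, where MyBroadway and MyProducer are placeholder modules:

```elixir
defmodule MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      # Required: used for name registration of all processes/stages.
      name: __MODULE__,
      producer: [
        module: {MyProducer, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 10]
      ],
      batchers: [
        default: [batch_size: 100, batch_timeout: 1_000]
      ]
    )
  end

  @impl true
  def handle_message(_processor, message, _context), do: message

  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context), do: messages
end
```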

Producers options

The producer options allow users to set up the producer.

The available options are:

  • :module - Required. A tuple representing a GenStage producer. The tuple format should be {mod, arg}, where mod is the module that implements the GenStage behaviour and arg the argument that will be passed to the init/1 callback of the producer. See Broadway.Producer for more information.

  • :concurrency (pos_integer/0) - The number of concurrent producers that will be started by Broadway. Use this option to control the concurrency level of each set of producers. The default value is 1.

  • :transformer - A tuple representing a transformer that translates a produced GenStage event into a %Broadway.Message{}. The tuple format should be {mod, fun, opts} and the function should have the following spec:

    (event :: term(), opts :: term()) :: Broadway.Message.t()

    This function must be used sparingly and exclusively to convert regular messages into Broadway.Message structs. That's because a failure in the :transformer callback will cause the whole producer to terminate, possibly leaving unacknowledged messages along the way. The default value is nil.

  • :spawn_opt (keyword/0) - Overrides the top-level :spawn_opt.

  • :hibernate_after (pos_integer/0) - Overrides the top-level :hibernate_after.

  • :rate_limiting (non-empty keyword/0) - A list of options to enable and configure rate limiting for producing. If this option is present, rate limiting is enabled, otherwise it isn't. Rate limiting refers to the rate at which producers will forward messages to the rest of the pipeline. The rate limiting is applied to and shared by all producers within the time limit. The following options are supported:

    • :allowed_messages (pos_integer/0) - Required. An integer that describes how many messages are allowed in the specified interval.

    • :interval (pos_integer/0) - Required. An integer that describes the interval (in milliseconds) to which :allowed_messages applies. If the producer produces more than allowed_messages within interval, only allowed_messages will be published until the end of interval, after which more messages can be published.
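For instance, the following :producer configuration sketch (MyProducer is a placeholder module) limits the pipeline to at most 50 messages per second, shared by both producer processes:

```elixir
producer: [
  module: {MyProducer, []},
  concurrency: 2,
  rate_limiting: [
    allowed_messages: 50, # at most 50 messages...
    interval: 1_000       # ...per 1000 ms, shared by all producers
  ]
]
```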

Processors options

You don't need multiple processors

A common misconception is that, if your data requires multiple transformations, each with a different concern, then you must have several processors.

However, that's not quite true. Separation of concerns is modeled by defining several modules and functions, not processors. Processors are ultimately about moving data around and you should only do it when necessary. Using processors for code organization purposes would lead to inefficient pipelines.

  • :concurrency (pos_integer/0) - The number of concurrent processes that will be started by Broadway. Use this option to control the concurrency level of the processors. The default value is System.schedulers_online() * 2.

  • :min_demand (non_neg_integer/0) - Sets the minimum demand of all processor stages.

  • :max_demand (non_neg_integer/0) - Sets the maximum demand of all processor stages. The default value is 10.

  • :partition_by (function of arity 1) - Overrides the top-level :partition_by.

  • :spawn_opt (keyword/0) - Overrides the top-level :spawn_opt.

  • :hibernate_after (pos_integer/0) - Overrides the top-level :hibernate_after.

Batchers options

  • :concurrency (pos_integer/0) - The number of concurrent batch processors that will be started by Broadway. Use this option to control the concurrency level. Note that this only sets the numbers of batch processors for each batcher group, not the number of batchers. The number of batchers will always be one for each batcher key defined. The default value is 1.

  • :batch_size - The size of the generated batches. It is typically an integer, but it can also be a tuple of {init_acc, fun}, where fun receives two arguments: a Broadway.Message and an acc. The function must return either {:emit, acc} to indicate that all batched messages must be emitted, or {:cont, acc} to continue batching. init_acc is the initial accumulator used on the first call. Setting :batch_size to an integer is equivalent to the custom batching function:

    {batch_size,
     fn
       _message, 1 -> {:emit, batch_size}
       _message, count -> {:cont, count - 1}
     end}

    The default value is 100.

  • :max_demand (pos_integer/0) - Sets the maximum demand of batcher stages. By default it is set to :batch_size, if :batch_size is an integer. Must be set if the :batch_size is a function.

  • :batch_timeout (pos_integer/0) - The time, in milliseconds, that the batcher waits before flushing the list of messages. When this timeout is reached, a new batch is generated and sent downstream, no matter if the :batch_size has been reached or not. The default value is 1000.

  • :partition_by (function of arity 1) - Optional. Overrides the top-level :partition_by.

  • :spawn_opt (keyword/0) - Overrides the top-level :spawn_opt.

  • :hibernate_after (pos_integer/0) - Overrides the top-level :hibernate_after.
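As an illustration of the {init_acc, fun} form of :batch_size, the following batcher configuration sketch emits batches based on accumulated payload bytes rather than message count (it assumes message data is binary, so byte_size/1 applies):

```elixir
batchers: [
  default: [
    concurrency: 4,
    # Emit a batch once roughly 1 MB of data has accumulated,
    # then reset the accumulator to the initial value.
    batch_size:
      {1_000_000,
       fn message, bytes_left ->
         bytes_left = bytes_left - byte_size(message.data)
         if bytes_left <= 0, do: {:emit, 1_000_000}, else: {:cont, bytes_left}
       end},
    # :max_demand must be set explicitly when :batch_size is a function.
    max_demand: 100,
    batch_timeout: 2_000
  ]
]
```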

Link to this function

stop(broadway, reason \\ :normal, timeout \\ :infinity)

View Source (since 1.0.0)

Synchronously stops the Broadway pipeline with the given reason.

This function returns :ok if the pipeline terminates with the given reason; if it terminates with another reason, the call exits.

This function keeps OTP semantics regarding error reporting. If the reason is any other than :normal, :shutdown or {:shutdown, _}, an error report is logged.
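In an ExUnit test, for instance, you might stop the pipeline when the test finishes so in-flight messages are flushed before the next test starts (a sketch; MyBroadway is a placeholder pipeline registered under its module name):

```elixir
setup do
  {:ok, _pid} = MyBroadway.start_link([])
  # Synchronously drain and stop the pipeline at the end of the test.
  on_exit(fn -> Broadway.stop(MyBroadway) end)
  :ok
end
```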

Link to this function

test_batch(broadway, batch_data, opts \\ [])

View Source
@spec test_batch(broadway :: name(), data :: [term()], opts :: Keyword.t()) ::
  reference()

Sends a list of data as a batch of messages to the Broadway pipeline.

This is a convenience used for testing. Each message is automatically wrapped in a Broadway.Message with Broadway.CallerAcknowledger configured to send a message back to the caller once all batches have been fully processed.

If there are more messages in the batch than the pipeline's batch_size, or if the messages in the batch take longer to process than batch_timeout, then the caller will receive multiple messages.

It returns a reference that can be used to identify the ack messages.

See "Testing" section in module documentation for more information.

Options

  • :metadata (term/0) - an enumerable of key-value pairs of additional fields to add to the message. This can be used, for example, when testing BroadwayRabbitMQ.Producer. The default value is [].

  • :acknowledger (function of arity 2) - a function that generates ack fields of the sent Broadway.Message.t(). This function receives the acknowledger data and the from field and it must return the acknowledger tuple. The typespec of this function is:

    data :: term(), from :: {pid(), term()} -> {module(), ack_ref :: term(), ack_data :: term()}

  • :batch_mode - when set to :flush, the batch the message is in is immediately delivered. When set to :bulk, batch is delivered when its size or timeout is reached. The default value is :bulk.

Examples

For example, in your tests, you may do:

ref = Broadway.test_batch(broadway, [1, 2, 3])
assert_receive {:ack, ^ref, successful, failed}, 1000
assert length(successful) == 3
assert length(failed) == 0

Note that messages sent using this function will ignore demand and the :transformer option specified in the :producer option in Broadway.start_link/2.

Link to this function

test_message(broadway, data, opts \\ [])

View Source
@spec test_message(broadway :: name(), term(), opts :: Keyword.t()) :: reference()

Sends a test message through the Broadway pipeline.

This is a convenience used for testing. The given data is automatically wrapped in a Broadway.Message with Broadway.CallerAcknowledger configured to send a message back to the caller once the message has been fully processed.

The message is set to be flushed immediately, without waiting for the Broadway pipeline batch_size to be filled or the batch_timeout to be triggered.

It returns a reference that can be used to identify the ack messages.

See "Testing" section in module documentation for more information.

Options

  • :metadata (term/0) - an enumerable of key-value pairs of additional fields to add to the message. This can be used, for example, when testing BroadwayRabbitMQ.Producer. The default value is [].

  • :acknowledger (function of arity 2) - a function that generates ack fields of the sent Broadway.Message.t(). This function receives the acknowledger data and the from field and it must return the acknowledger tuple. The typespec of this function is:

    data :: term(), from :: {pid(), term()} -> {module(), ack_ref :: term(), ack_data :: term()}

Examples

For example, in your tests, you may do:

ref = Broadway.test_message(broadway, 1)
assert_receive {:ack, ^ref, [successful], []}

or if you want to override which acknowledger shall be called, you may do:

acknowledger = fn _data, ack_ref -> {MyAck, ack_ref, :ok} end
Broadway.test_message(broadway, 1, acknowledger: acknowledger)

Note that messages sent using this function will ignore demand and the :transformer option specified in the :producer option in Broadway.start_link/2.

Link to this function

topology(broadway)

View Source (since 1.0.0)
@spec topology(broadway :: name()) :: [{key, [stage_info]}]
when key: :producers | :processors | :batchers,
     stage_info: %{
       :name => atom(),
       optional(:concurrency) => pos_integer(),
       optional(:batcher_name) => atom(),
       optional(:batcher_key) => atom(),
       optional(:processor_key) => atom()
     }

Returns the topology details for a pipeline.

The stages that have the :concurrency field in their info indicate a list of processes running with that name prefix. Each process has :name as a prefix plus _ and the index of 0..(concurrency - 1) as an atom. For example, a producer named MyBroadway.Broadway.Producer with concurrency of 1 will only have a single process named MyBroadway.Broadway.Producer_0 in its topology.

Single producer and processor

Broadway does not accept multiple producers nor multiple processors, but we chose to keep them in lists for consistency and to ensure we're future-proof.

Examples

iex> Broadway.topology(MyBroadway)
[
  producers: [
    %{name: MyBroadway.Broadway.Producer, concurrency: 1}
  ],
  processors: [
    %{name: MyBroadway.Broadway.Processor_default, concurrency: 10, processor_key: :default}
  ],
  batchers: [
    %{
      batcher_name: MyBroadway.Broadway.Batcher_default,
      name: MyBroadway.Broadway.BatchProcessor_default,
      batcher_key: :default,
      concurrency: 5
    },
    %{
      batcher_name: MyBroadway.Broadway.Batcher_s3,
      name: MyBroadway.Broadway.BatchProcessor_s3,
      batcher_key: :s3,
      concurrency: 3
    }
  ]
]

In the example above, for instance, the processor process names would be MyBroadway.Broadway.Processor_default_0, MyBroadway.Broadway.Processor_default_1, and so on.
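Because process names follow this Name_index convention (an atom built from the prefix, an underscore, and the index), the topology can be used to enumerate the concrete processor processes. A sketch, assuming a running pipeline named MyBroadway:

```elixir
topology = Broadway.topology(MyBroadway)

processor_pids =
  for %{name: prefix, concurrency: concurrency} <- topology[:processors],
      index <- 0..(concurrency - 1) do
    # E.g. :"Elixir.MyBroadway.Broadway.Processor_default_0"
    Process.whereis(:"#{prefix}_#{index}")
  end
```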

Link to this function

update_rate_limiting(broadway, opts)

View Source (since 0.6.0)
@spec update_rate_limiting(server :: name(), opts :: Keyword.t()) ::
  :ok | {:error, :rate_limiting_not_enabled}

Updates the producer rate limiting of the given pipeline at runtime.

Supports the following options (see the :rate_limiting options in the module documentation for more information):

  • :allowed_messages
  • :interval
  • :reset

Returns an {:error, reason} tuple if the given broadway pipeline doesn't have rate limiting enabled.

The option :reset defaults to false. This means the rate limit will reset to the new rate limit at the end of the current interval. When :reset is true, the new rate limit takes effect immediately.

Examples

Broadway.update_rate_limiting(broadway, allowed_messages: 100)
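To apply the new limit immediately instead of waiting for the end of the current interval, pass reset: true:

```elixir
Broadway.update_rate_limiting(broadway, allowed_messages: 100, reset: true)
```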