Broadway.Producer behaviour (Broadway v1.0.0)

A Broadway producer is a GenStage producer that emits Broadway.Message structs as events.

The Broadway.Producer is declared in a Broadway topology via the :module option:

producer: [
  module: {MyProducer, options}
]

Once declared, MyProducer is expected to implement and behave as a GenStage producer. When Broadway starts, the GenStage.init/1 callback will be invoked with the given options.

If options is a keyword list, a :broadway option is injected into that keyword list, containing the configuration for the complete Broadway topology. For example, you can use options[:broadway][:name] to uniquely identify the topology, allowing you to write terms to persistent_term or ets.

The :broadway configuration also has an :index key, which is the index of the producer in its supervision tree (starting from 0). This enables features such as having even-indexed producers connect to one server while odd-indexed producers connect to another.
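
As an illustration of the injected :broadway option, the following is a minimal sketch of a producer that reads the topology name and the producer index in init/1. The module name MyApp.ShardedProducer and the :even_server and :odd_server options are hypothetical and are not part of Broadway; only the :broadway, :name and :index keys come from the description above.

```elixir
defmodule MyApp.ShardedProducer do
  use GenStage
  @behaviour Broadway.Producer

  @impl GenStage
  def init(options) do
    # Broadway injects :broadway when the producer options are a keyword list.
    broadway = options[:broadway]
    name = broadway[:name]
    index = broadway[:index]

    # :even_server and :odd_server are hypothetical options, used only to
    # show how the index can select between two servers.
    server =
      if rem(index, 2) == 0 do
        Keyword.fetch!(options, :even_server)
      else
        Keyword.fetch!(options, :odd_server)
      end

    # The topology name and index together give a key that is unique per
    # producer stage, so it is safe to use for persistent_term entries.
    :persistent_term.put({__MODULE__, name, index}, server)

    {:producer, %{name: name, index: index, server: server}}
  end

  @impl GenStage
  def handle_demand(_demand, state) do
    # A real producer would emit Broadway.Message structs here.
    {:noreply, [], state}
  end
end
```

With these assumptions, the producer at index 0 would pick the value given as :even_server and the producer at index 1 the value given as :odd_server.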

If options is any other term, it is passed as is to the GenStage.init/1 callback. All other functions behave precisely as in GenStage, with the requirement that all emitted events must be Broadway.Message structs.

Optional callbacks

A Broadway.Producer can implement two optional Broadway callbacks: prepare_for_start/2 and prepare_for_draining/1, which are useful for booting up and shutting down Broadway topologies respectively.

Callbacks

prepare_for_draining(state)

(optional)

Specs

prepare_for_draining(state :: any()) ::
  {:noreply, [event], new_state}
  | {:noreply, [event], new_state, :hibernate}
  | {:stop, reason :: term(), new_state}
when new_state: term(), event: term()

Invoked by the terminator right before Broadway starts draining in-flight messages during shutdown.

This callback should be implemented by producers that need to do additional work before shutting down. That includes active producers like RabbitMQ that must ask the data provider to stop sending messages. It will be invoked for each producer stage.

state is the current state of the producer.
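
A minimal sketch of this callback for a polling producer follows. It assumes a hypothetical module, MyApp.PollingProducer, that schedules its own :poll messages; the "additional work" before draining is cancelling the pending timer so that no new data is fetched. An active producer would instead ask its data provider to stop sending messages at this point.

```elixir
defmodule MyApp.PollingProducer do
  use GenStage
  @behaviour Broadway.Producer

  # Hypothetical polling interval, in milliseconds.
  @poll_interval 1_000

  @impl GenStage
  def init(_options) do
    {:producer, %{poll_timer: schedule_poll(), draining: false}}
  end

  @impl GenStage
  def handle_demand(_demand, state), do: {:noreply, [], state}

  @impl GenStage
  def handle_info(:poll, %{draining: true} = state) do
    # A poll message that was already in the mailbox is ignored while draining.
    {:noreply, [], state}
  end

  def handle_info(:poll, state) do
    # A real producer would fetch data here and emit Broadway.Message structs.
    {:noreply, [], %{state | poll_timer: schedule_poll()}}
  end

  def handle_info(_message, state), do: {:noreply, [], state}

  @impl Broadway.Producer
  def prepare_for_draining(state) do
    # Stop scheduling new polls before Broadway drains in-flight messages.
    if state.poll_timer, do: Process.cancel_timer(state.poll_timer)
    {:noreply, [], %{state | poll_timer: nil, draining: true}}
  end

  defp schedule_poll, do: Process.send_after(self(), :poll, @poll_interval)
end
```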

prepare_for_start(module, options)

(optional)

Specs

prepare_for_start(module :: atom(), options :: keyword()) ::
  {[child_spec], updated_options :: keyword()}
when child_spec: :supervisor.child_spec() | {module(), any()} | module()

Invoked once by Broadway during Broadway.start_link/2.

The goal of this callback is to manipulate the general topology options, if necessary at all, and introduce any new child specs that will be started before the producers supervisor in Broadway's supervision tree. Broadway's supervision tree is a rest_for_one supervisor (see the documentation for Supervisor), which means that if the children returned from this callback crash they will bring down the rest of the pipeline before being restarted.

This callback is guaranteed to be invoked inside the Broadway main process.

module is the Broadway module passed as the first argument to Broadway.start_link/2. options contains all of the Broadway topology options passed as the second argument to Broadway.start_link/2.

The return value of this callback is a tuple {child_specs, updated_options}. child_specs is the list of child specs to be started under Broadway's supervision tree. updated_options is a potentially updated list of Broadway options that will be used instead of the ones passed to Broadway.start_link/2. This can be used to modify the characteristics of the Broadway topology to accommodate the children started here.

Examples

defmodule MyProducer do
  @behaviour Broadway.Producer

  # other callbacks...

  @impl true
  def prepare_for_start(_module, broadway_options) do
    children = [
      {DynamicSupervisor, strategy: :one_for_one, name: MyApp.DynamicSupervisor}
    ]

    {children, broadway_options}
  end
end
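
The callback can also return modified options. The sketch below, under stated assumptions, starts a Task.Supervisor and passes its name to the producer through the producer's own options. The module name MyApp.TaskBackedProducer and the :task_supervisor option are hypothetical, and the sketch assumes the producer options are a keyword list.

```elixir
defmodule MyApp.TaskBackedProducer do
  @behaviour Broadway.Producer

  # other callbacks...

  @impl Broadway.Producer
  def prepare_for_start(module, broadway_options) do
    # The producer is declared as `module: {producer_module, producer_options}`.
    {producer_module, producer_options} = broadway_options[:producer][:module]

    # Derive a supervisor name from the Broadway module so that several
    # topologies can coexist. The naming scheme is an assumption.
    supervisor_name = Module.concat(module, TaskSupervisor)

    children = [
      {Task.Supervisor, name: supervisor_name}
    ]

    # :task_supervisor is a hypothetical option that the producer's init/1
    # would read to find the supervisor started above.
    updated_producer_options =
      Keyword.put(producer_options, :task_supervisor, supervisor_name)

    updated_options =
      put_in(
        broadway_options,
        [:producer, :module],
        {producer_module, updated_producer_options}
      )

    {children, updated_options}
  end
end
```

Because the child specs are started before the producers, the supervisor would already be running by the time the producer's init/1 receives the updated options.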