View Source Broadway.Producer behaviour (Broadway v1.0.7)

A Broadway producer is a GenStage producer that emits Broadway.Message structs as events.

The Broadway.Producer is declared in a Broadway topology via the :module option (see Broadway.start_link/2):

producer: [
  module: {MyProducer, options}
]

Once declared, MyProducer is expected to implement and behave as a GenStage producer. When Broadway starts, the GenStage.init/1 callback will be invoked directly with the given options.

injected-broadway-configuration

Injected Broadway configuration

If options is a keyword list, Broadway injects a :broadway option into such keyword list. This option contains the configuration for the complete Broadway topology (see Broadway.start_link/2. For example, you can use options[:broadway][:name] to uniquely identify the topology, allowing you to write terms to things such as :persistent_term or ETS tables.

The :broadway configuration also has an :index key. This is the index of the producer in its supervision tree (starting from 0). This allows a features such having even producers connect to some server while odd producers connect to another.

If options is any other term, it is passed as is to the GenStage.init/1 callback. All other functions behave precisely as in GenStage with the requirements that all emitted events must be Broadway.Message structs.

optional-callbacks

Optional callbacks

A Broadway.Producer can implement two optional Broadway callbacks, prepare_for_start/2 and prepare_for_draining/1, which are useful for booting up and shutting down Broadway topologies respectively.

producing-broadway-messages

Producing Broadway messages

You should generally modify Broadway.Message structs by using the functions in the Broadway.Message module. However, if you are implementing your own producer, you can manipulate some of the struct's fields directly.

These fields are:

  • :data (required) - the data of the message. Even though the function Broadway.Message.put_data/2 exists, when creating a %Broadway.Message{} struct from scratch you will have to pass in the :data field directly.

  • :acknowledger (required) - the acknowledger of the message, of type Broadway.Message.acknowledger/0.

  • :metadata (optional) - metadata about the message that your producer can attach to the message. This is useful when you want to add some metadata to messages, and document it for users to use in their pipelines.

For example, a producer could create a message by doing something like this:

%Broadway.Message{
  data: "some data here",
  acknowledger: Broadway.NoopAcknowledger.init()
}

Link to this section Summary

Callbacks

Invoked by the terminator right before Broadway starts draining in-flight messages during shutdown.

Link to this section Callbacks

Link to this callback

prepare_for_draining(state)

View Source (optional)
@callback prepare_for_draining(state :: any()) ::
  {:noreply, [event], new_state}
  | {:noreply, [event], new_state, :hibernate}
  | {:stop, reason :: term(), new_state}
when new_state: term(), event: term()

Invoked by the terminator right before Broadway starts draining in-flight messages during shutdown.

This callback should be implemented by producers that need to do additional work before shutting down. That includes active producers like RabbitMQ that must ask the data provider to stop sending messages. It will be invoked for each producer stage.

state is the current state of the producer.

Link to this callback

prepare_for_start(module, options)

View Source (optional) (since 0.5.0)
@callback prepare_for_start(module :: atom(), options :: keyword()) ::
  {[child_spec], updated_options :: keyword()}
when child_spec: :supervisor.child_spec() | {module(), any()} | module()

Invoked once by Broadway during Broadway.start_link/2.

The goal of this callback is to manipulate the general topology options, if necessary at all, and introduce any new child specs that will be started before the producers supervisor in Broadway's supervision tree. Broadway's supervision tree is a rest_for_one supervisor (see the documentation for Supervisor), which means that if the children returned from this callback crash they will bring down the rest of the pipeline before being restarted.

This callback is guaranteed to be invoked inside the Broadway main process.

module is the Broadway module passed as the first argument to Broadway.start_link/2. options is all of Broadway topology options passed as the second argument to Broadway.start_link/2.

The return value of this callback is a tuple {child_specs, options}. child_specs is the list of child specs to be started under Broadway's supervision tree. updated_options is a potentially-updated list of Broadway options that will be used instead of the ones passed to Broadway.start_link/2. This can be used to modify the characteristics of the Broadway topology to accommodated for the children started here.

examples

Examples

defmodule MyProducer do
  @behaviour Broadway.Producer

  # other callbacks...

  @impl true
  def prepare_for_start(_module, broadway_options) do
     children = [
       {DynamicSupervisor, strategy: :one_for_one, name: MyApp.DynamicSupervisor}
     ]

     {children, broadway_options}
  end
end