You're seeing just the callback handle_message, go back to Broadway module for more information.
Link to this callback

handle_message(processor, message, context)

View Source


  processor :: atom(),
  message :: Broadway.Message.t(),
  context :: term()
) :: Broadway.Message.t()

Invoked to handle/process individual messages sent from a producer.

It receives:

  • processor is the key that defined the processor.
  • message is the Broadway.Message struct to be processed.
  • context is the user defined data structure passed to start_link/2.

And it must return the (potentially) updated Broadway.Message struct.

This is the place to do any kind of processing with the incoming message, e.g., transform the data into another data structure, call specific business logic to do calculations. Basically, any CPU bounded task that runs against a single message should be processed here.

In order to update the data after processing, use the Broadway.Message.update_data/2 function. This way the new message can be properly forwarded and handled by the batcher:

@impl true
def handle_message(_, message, _) do
  |> update_data(&do_calculation_and_returns_the_new_data/1)

In case more than one batcher have been defined in the configuration, you need to specify which of them the resulting message will be forwarded to. You can do this by calling put_batcher/2 and returning the new updated message:

@impl true
def handle_message(_, message, _) do
  # Do whatever you need with the data

  |> put_batcher(:s3)

Any message that has not been explicitly failed will be forwarded to the next step in the pipeline. If there are no extra steps, it will be automatically acknowledged.

In case of errors in this callback, the error will be logged and that particular message will be immediately acknowledged as failed, not proceeding to the next steps of the pipeline. This callback also traps exits, so failures due to broken links between processes do not automatically cascade.