handle_message, go back to Broadway module for more information.
handle_message( processor :: atom(), message :: Broadway.Message.t(), context :: term() ) :: Broadway.Message.t()
Invoked to handle/process individual messages sent from a producer.
processoris the key that defined the processor.
Broadway.Messagestruct to be processed.
contextis the user defined data structure passed to
And it must return the (potentially) updated
This is the place to do any kind of processing with the incoming message, e.g., transform the data into another data structure, call specific business logic to do calculations. Basically, any CPU bounded task that runs against a single message should be processed here.
In order to update the data after processing, use the
Broadway.Message.update_data/2 function. This way the new message can be
properly forwarded and handled by the batcher:
@impl true def handle_message(_, message, _) do message |> update_data(&do_calculation_and_returns_the_new_data/1) end
In case more than one batcher have been defined in the configuration,
you need to specify which of them the resulting message will be forwarded
to. You can do this by calling
put_batcher/2 and returning the new
@impl true def handle_message(_, message, _) do # Do whatever you need with the data ... message |> put_batcher(:s3) end
Any message that has not been explicitly failed will be forwarded to the next step in the pipeline. If there are no extra steps, it will be automatically acknowledged.
In case of errors in this callback, the error will be logged and that particular message will be immediately acknowledged as failed, not proceeding to the next steps of the pipeline. This callback also traps exits, so failures due to broken links between processes do not automatically cascade.