Broadway.handle_message
Specs
handle_message(processor :: atom(), message :: Broadway.Message.t(), context :: term()) :: Broadway.Message.t()
Invoked to handle/process individual messages sent from a producer.
It receives:
processor is the key that defined the processor.
message is the Broadway.Message struct to be processed.
context is the user defined data structure passed to start_link/2.
And it must return the (potentially) updated Broadway.Message struct.
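As a rough illustration of how the three arguments fit together, the sketch below pattern matches on the context to read a value from it. The module name MyApp.ScalingPipeline, the factor key, and the assumption that the message data is a number are all hypothetical; the start_link function that would pass the context is omitted.

```elixir
defmodule MyApp.ScalingPipeline do
  # Hypothetical module; assumes the :broadway dependency is available.
  use Broadway

  alias Broadway.Message

  # start_link with the producer/processor configuration is omitted.
  # It is assumed to pass `context: %{factor: ...}` to Broadway.start_link/2.

  @impl true
  def handle_message(_processor, %Message{} = message, %{factor: factor}) do
    # Assumes message.data is a number; returns the updated message struct.
    Message.update_data(message, fn value -> value * factor end)
  end
end
```

Because the callback is a plain public function, it can be exercised directly with a hand-built Broadway.Message struct, without starting a pipeline.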
This is the place to do any kind of processing with the incoming message, e.g., transforming the data into another data structure or calling business logic that performs calculations. Basically, any CPU-bound task that runs against a single message should be processed here.
In order to update the data after processing, use the Broadway.Message.update_data/2 function. This way the new message can be properly forwarded and handled by the batcher:
    @impl true
    def handle_message(_, message, _) do
      # Fully qualified so the call works without `import Broadway.Message`
      message
      |> Broadway.Message.update_data(&do_calculation_and_returns_the_new_data/1)
    end
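For a concrete instance of the same pattern, the following sketch assumes the message data is a binary and upcases it. The module name MyApp.UpcasePipeline is hypothetical, and start_link is left out.

```elixir
defmodule MyApp.UpcasePipeline do
  # Hypothetical module; assumes the :broadway dependency is available.
  use Broadway

  alias Broadway.Message

  # start_link with the producer/processor configuration is omitted.

  @impl true
  def handle_message(_processor, %Message{} = message, _context) do
    # update_data/2 applies the function to message.data and
    # returns the message with the new data in place.
    Message.update_data(message, &String.upcase/1)
  end
end
```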
In case more than one batcher has been defined in the configuration, you need to specify which of them the resulting message will be forwarded to. You can do this by calling put_batcher/2 and returning the new updated message:
    @impl true
    def handle_message(_, message, _) do
      # Do whatever you need with the data
      ...
      # Fully qualified so the call works without `import Broadway.Message`
      message
      |> Broadway.Message.put_batcher(:s3)
    end
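When the choice of batcher depends on the message itself, the routing can be sketched roughly as below. The size threshold and the :sqs batcher name are assumptions made for illustration (only :s3 appears above), the data is assumed to be a binary, and both names would need to be declared under the batchers option in the omitted start_link.

```elixir
defmodule MyApp.RoutingPipeline do
  # Hypothetical module; assumes the :broadway dependency is available.
  use Broadway

  alias Broadway.Message

  # start_link is omitted; it is assumed to declare both
  # the :s3 and :sqs batchers in its configuration.

  @impl true
  def handle_message(_processor, %Message{data: data} = message, _context) do
    # Hypothetical rule: payloads over 1 KiB go to :s3, the rest to :sqs.
    if byte_size(data) > 1_024 do
      Message.put_batcher(message, :s3)
    else
      Message.put_batcher(message, :sqs)
    end
  end
end
```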
Any message that has not been explicitly failed will be forwarded to the next step in the pipeline. If there are no extra steps, it will be automatically acknowledged.
In case of errors in this callback, the error will be logged and that particular message will be immediately acknowledged as failed, not proceeding to the next steps of the pipeline. This callback also traps exits, so failures due to broken links between processes do not automatically cascade.
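To fail a message explicitly rather than by raising, Broadway.Message.failed/2 can be used. The sketch below assumes the data is a binary that should contain an integer; the module name and the failure reason are hypothetical.

```elixir
defmodule MyApp.ValidatingPipeline do
  # Hypothetical module; assumes the :broadway dependency is available.
  use Broadway

  alias Broadway.Message

  # start_link with the producer/processor configuration is omitted.

  @impl true
  def handle_message(_processor, %Message{data: data} = message, _context) do
    case Integer.parse(data) do
      {number, ""} ->
        # Valid input: replace the binary with the parsed integer.
        Message.update_data(message, fn _ -> number end)

      _ ->
        # Invalid input: mark the message as failed so it is not
        # forwarded to the next step of the pipeline.
        Message.failed(message, "not an integer")
    end
  end
end
```

Marking the message as failed keeps the decision explicit in the code, whereas an unhandled exception would be logged and lead to the same outcome implicitly.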