Broadway.Message

A struct that holds all information about a message.

A message is first created by the producers. Once created, the message is sent downstream and gets updated multiple times, either by the module implementing the Broadway behaviour through the Broadway.handle_message/3 callback or internaly by one of the built-in stages of Broadway.

In order to manipulate a message, you should use one of the imported functions provided by this module.

Immediately acknowledges the given message or list of messages.

Configures the acknowledger of this message.

Mark a message as failed.

Defines the batch key within a batcher for the message.

Sets the batching mode for the message.

Defines the target batcher which the message should be forwarded to.

Updates the data from a message.

t()
t() :: %Broadway.Message{
  acknowledger: {module(), ack_ref :: term(), data :: term()},
  batch_key: term(),
  batch_mode: :bulk | :flush,
  batcher: atom(),
  data: term(),
  metadata: %{optional(atom()) => term()},
    | {:failed, reason :: binary()}
    | {:throw | :error | :exit, term(), Exception.stacktrace()}

ack_immediately(message_or_messages)
ack_immediately(message :: Broadway.Message.t()) :: Broadway.Message.t()
ack_immediately(messages :: [Broadway.Message.t(), ...]) :: [

Immediately acknowledges the given message or list of messages.

This function can be used to acknowledge a message (or list of messages) immediately withouth waiting for the rest of the pipeline.

Acknowledging a message sets that message's acknowledger to a no-op acknowledger so that it's safe to ack at the end of the pipeline.

Returns the updated acked message if a message is passed in, or the updated list of acked messages if a list of messages is passed in.

configure_ack(message, options)
configure_ack(message :: Broadway.Message.t(), options :: keyword()) ::

Configures the acknowledger of this message.

This function calls the Broadway.Acknowledger.configure/3 callback to change the configuration of the acknowledger for the given message.

This function can only be called if the acknowledger implements the configure/3 callback. If it doesn't, an error is raised.

failed(message, reason)
failed(message :: Broadway.Message.t(), reason :: term()) ::

Mark a message as failed.

Failed messages are sent directly to the related acknowledger so they're not forwarded to the next step in the pipeline.

put_batch_key(message, batch_key)
put_batch_key(message :: Broadway.Message.t(), batch_key :: term()) ::

Defines the batch key within a batcher for the message.

Inside each batcher, we attempt to build batch_size within batch_timeout for each batch_key.

put_batch_mode(message, mode)
put_batch_mode(message :: Broadway.Message.t(), mode :: :bulk | :flush) ::

Sets the batching mode for the message.

When the mode is :bulk, the batch that the message is in is delivered after the batch size or the batch timeout is reached.

When the mode is :flush, the batch that the message is in is delivered immediately after processing. Note it doesn't mean the batch contains only a single element but rather that all messages receives from the processor are delivered without waiting.

The default mode for messages is :bulk.

put_batcher(message, batcher)
put_batcher(message :: Broadway.Message.t(), batcher :: atom()) ::

Defines the target batcher which the message should be forwarded to.

update_data(message, fun)
update_data(message :: Broadway.Message.t(), fun :: (term() -> term())) ::

Updates the data from a message.

This function is usually used inside the handle_message/3 implementation in order to replace the data with the new processed data.