Broadway v0.2.0 Broadway.Message View Source

A struct that holds all information about a message.

A message is first created by the producers. Once created, the message is sent downstream and gets updated multiple times, either by the module implementing the Broadway behaviour through the Broadway.handle_message/3 callback or internaly by one of the built-in stages of Broadway.

In order to manipulate a message, you should use one of the imported functions provided by this module.

Link to this section Summary

Functions

Mark a message as failed.

Defines the batch key within a batcher for the message.

Defines the target batcher which the message should be forwarded to.

Updates the data from a message.

Link to this section Types

Link to this type

t() View Source
t() :: %Broadway.Message{
  acknowledger: {module(), ack_ref :: term(), data :: term()},
  batch_key: term(),
  batcher: atom(),
  data: term(),
  status: :ok | {:failed, reason :: binary()}
}

Link to this section Functions

Link to this function

failed(message, reason) View Source
failed(message :: Broadway.Message.t(), reason :: term()) ::
  Broadway.Message.t()

Mark a message as failed.

Failed messages are sent directly to the related acknowledger so they're not forwarded to the next step in the pipeline.

Link to this function

put_batch_key(message, batch_key) View Source
put_batch_key(message :: Broadway.Message.t(), batch_key :: term()) ::
  Broadway.Message.t()

Defines the batch key within a batcher for the message.

Inside each batcher, we attempt to build batch_size within batch_timeout for each batch_key.

Link to this function

put_batcher(message, batcher) View Source
put_batcher(message :: Broadway.Message.t(), batcher :: atom()) ::
  Broadway.Message.t()

Defines the target batcher which the message should be forwarded to.

Link to this function

update_data(message, fun) View Source
update_data(message :: Broadway.Message.t(), fun :: (term() -> term())) ::
  Broadway.Message.t()

Updates the data from a message.

This function is usually used inside the handle_message/3 implementation in order to replace the data with the new processed data.