Broadway v0.5.0 Broadway.Message

A struct that holds all information about a message.

A message is first created by the producers. Once created, the message is sent downstream and gets updated multiple times, either by the module implementing the Broadway behaviour through the Broadway.handle_message/3 callback or internally by one of the built-in stages of Broadway.

To manipulate a message, you should use one of the functions provided by this module rather than updating the struct fields directly.

Summary

Functions

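ack_immediately(message_or_messages)
Immediately acknowledges the given message or list of messages.

configure_ack(message, options)
Configures the acknowledger of this message.

failed(message, reason)
Marks a message as failed.

put_batch_key(message, batch_key)
Defines the batch key within a batcher for the message.

put_batch_mode(message, mode)
Sets the batching mode for the message.

put_batcher(message, batcher)
Defines the target batcher which the message should be forwarded to.

update_data(message, fun)
Updates the data from a message.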

Types

t()
t() :: %Broadway.Message{
  acknowledger: {module(), ack_ref :: term(), data :: term()},
  batch_key: term(),
  batch_mode: :bulk | :flush,
  batcher: atom(),
  data: term(),
  metadata: %{optional(atom()) => term()},
  status:
    :ok
    | {:failed, reason :: binary()}
    | {:throw | :error | :exit, term(), Exception.stacktrace()}
}
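
As a rough illustration of the struct above, the following sketch builds a message by hand, roughly as a producer would. It assumes the :broadway dependency (v0.5.0) is available. MyApp.DiscardAcknowledger, the :example_ref reference and the metadata contents are hypothetical names introduced only for this example.

```elixir
# Assumes the :broadway dependency (v0.5.0) is available.
defmodule MyApp.DiscardAcknowledger do
  # Hypothetical acknowledger used only for illustration: it does nothing.
  @behaviour Broadway.Acknowledger

  @impl true
  def ack(_ack_ref, _successful, _failed), do: :ok
end

message = %Broadway.Message{
  data: "hello",
  metadata: %{source: :example},
  # {module, ack_ref, data}, matching the acknowledger field of t()
  acknowledger: {MyApp.DiscardAcknowledger, :example_ref, nil}
}

# Fields that were not set keep their defaults; batch_mode defaults to :bulk.
IO.inspect(message.batch_mode)
```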

Functions

ack_immediately(message_or_messages)
ack_immediately(message :: Broadway.Message.t()) :: Broadway.Message.t()
ack_immediately(messages :: [Broadway.Message.t(), ...]) :: [
  Broadway.Message.t(),
  ...
]

Immediately acknowledges the given message or list of messages.

This function can be used to acknowledge a message (or list of messages) immediately without waiting for the rest of the pipeline.

Acknowledging a message sets that message's acknowledger to a no-op acknowledger so that it's safe to ack at the end of the pipeline.

Returns the updated acked message if a message is passed in, or the updated list of acked messages if a list of messages is passed in.
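
The behaviour described above can be sketched as follows, assuming the :broadway dependency (v0.5.0) is available. MyApp.NotifyingAcknowledger is a hypothetical acknowledger written for this example; it uses the calling process pid as its ack_ref so that the acknowledgement is observable.

```elixir
# Assumes the :broadway dependency (v0.5.0) is available.
defmodule MyApp.NotifyingAcknowledger do
  # Hypothetical acknowledger: reports how many messages were acked
  # to the pid stored as ack_ref.
  @behaviour Broadway.Acknowledger

  @impl true
  def ack(pid, successful, failed) do
    send(pid, {:acked, length(successful), length(failed)})
    :ok
  end
end

message = %Broadway.Message{
  data: "payload",
  acknowledger: {MyApp.NotifyingAcknowledger, self(), nil}
}

# The acknowledger is invoked right away, not at the end of the pipeline.
acked = Broadway.Message.ack_immediately(message)

# The returned message no longer points at the original acknowledger,
# so a later ack at the end of the pipeline does not ack it twice.
{noop_module, _ack_ref, _data} = acked.acknowledger
```

Passing a non-empty list of messages works the same way and returns a list.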

configure_ack(message, options)
configure_ack(message :: Broadway.Message.t(), options :: keyword()) ::
  Broadway.Message.t()

Configures the acknowledger of this message.

This function calls the Broadway.Acknowledger.configure/3 callback to change the configuration of the acknowledger for the given message.

This function can only be called if the acknowledger implements the configure/3 callback. If it doesn't, an error is raised.
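
A minimal sketch of this interaction, assuming the :broadway dependency (v0.5.0) is available. MyApp.ConfigurableAcknowledger and its :on_failure option are hypothetical; real acknowledgers define their own options, so check the documentation of the producer you use.

```elixir
# Assumes the :broadway dependency (v0.5.0) is available.
defmodule MyApp.ConfigurableAcknowledger do
  # Hypothetical acknowledger whose ack data is a map of settings.
  @behaviour Broadway.Acknowledger

  @impl true
  def ack(_ack_ref, _successful, _failed), do: :ok

  # Optional callback: merges the given options into the ack data.
  @impl true
  def configure(_ack_ref, ack_data, options) do
    {:ok, Map.merge(ack_data, Map.new(options))}
  end
end

message = %Broadway.Message{
  data: "payload",
  acknowledger: {MyApp.ConfigurableAcknowledger, :ack_ref, %{on_failure: :retry}}
}

# :on_failure is understood only by the hypothetical acknowledger above.
configured = Broadway.Message.configure_ack(message, on_failure: :discard)

# The third element of the acknowledger tuple now holds the updated data.
{_module, _ack_ref, ack_data} = configured.acknowledger
```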

failed(message, reason)
failed(message :: Broadway.Message.t(), reason :: term()) ::
  Broadway.Message.t()

Marks a message as failed.

Failed messages are sent directly to the related acknowledger so they're not forwarded to the next step in the pipeline.
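
For instance, a handle_message/3 implementation might reject invalid payloads like this. The sketch assumes the :broadway dependency (v0.5.0) is available. MyApp.ValidatingPipeline and its validation rule are hypothetical.

```elixir
# Assumes the :broadway dependency (v0.5.0) is available.
defmodule MyApp.ValidatingPipeline do
  # Hypothetical pipeline module; only the callback is shown.
  use Broadway

  alias Broadway.Message

  @impl true
  def handle_message(_processor, %Message{data: data} = message, _context) do
    if is_binary(data) and data != "" do
      message
    else
      # Sets the status to {:failed, reason}; the message is not forwarded.
      Message.failed(message, "empty or non-binary payload")
    end
  end
end
```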

put_batch_key(message, batch_key)
put_batch_key(message :: Broadway.Message.t(), batch_key :: term()) ::
  Broadway.Message.t()

Defines the batch key within a batcher for the message.

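Inside each batcher, Broadway attempts to build a batch of up to batch_size messages within batch_timeout for each batch_key, so messages with different batch keys never end up in the same batch.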
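
As a sketch of how a batch key might be assigned, the callback below groups messages by user. It assumes the :broadway dependency (v0.5.0) is available. MyApp.PerUserPipeline and the %{user_id: ...} data shape are hypothetical.

```elixir
# Assumes the :broadway dependency (v0.5.0) is available.
defmodule MyApp.PerUserPipeline do
  # Hypothetical pipeline module; only the callback is shown.
  use Broadway

  alias Broadway.Message

  @impl true
  def handle_message(_processor, %Message{data: %{user_id: user_id}} = message, _context) do
    # Messages sharing a user_id are batched together within their batcher.
    Message.put_batch_key(message, user_id)
  end
end
```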

put_batch_mode(message, mode)
put_batch_mode(message :: Broadway.Message.t(), mode :: :bulk | :flush) ::
  Broadway.Message.t()

Sets the batching mode for the message.

When the mode is :bulk, the batch that the message is in is delivered after the batch size or the batch timeout is reached.

When the mode is :flush, the batch that the message is in is delivered immediately after processing. Note that this doesn't mean the batch contains only a single element, but rather that all messages received from the processor are delivered without waiting.

The default mode for messages is :bulk.
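
One possible use is flushing the batch early for urgent messages, as sketched below. The sketch assumes the :broadway dependency (v0.5.0) is available. MyApp.PriorityPipeline and the :priority field in the data are hypothetical.

```elixir
# Assumes the :broadway dependency (v0.5.0) is available.
defmodule MyApp.PriorityPipeline do
  # Hypothetical pipeline module; only the callback is shown.
  use Broadway

  alias Broadway.Message

  @impl true
  def handle_message(_processor, %Message{data: %{priority: :high}} = message, _context) do
    # Deliver the batch containing this message without waiting
    # for batch_size or batch_timeout.
    Message.put_batch_mode(message, :flush)
  end

  def handle_message(_processor, message, _context) do
    # Everything else keeps the default :bulk mode.
    message
  end
end
```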

put_batcher(message, batcher)
put_batcher(message :: Broadway.Message.t(), batcher :: atom()) ::
  Broadway.Message.t()

Defines the target batcher which the message should be forwarded to.
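
A minimal routing sketch, assuming the :broadway dependency (v0.5.0) is available. MyApp.RoutingPipeline and the :even and :odd batcher names are hypothetical; in a real pipeline, each batcher name used here would also have to be declared in the pipeline's batchers configuration.

```elixir
# Assumes the :broadway dependency (v0.5.0) is available.
defmodule MyApp.RoutingPipeline do
  # Hypothetical pipeline module; only the callback is shown.
  use Broadway

  alias Broadway.Message

  @impl true
  def handle_message(_processor, %Message{data: data} = message, _context)
      when is_integer(data) do
    # Route each message to a batcher based on its data.
    if rem(data, 2) == 0 do
      Message.put_batcher(message, :even)
    else
      Message.put_batcher(message, :odd)
    end
  end
end
```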

update_data(message, fun)
update_data(message :: Broadway.Message.t(), fun :: (term() -> term())) ::
  Broadway.Message.t()

Updates the data from a message.

This function is usually used inside the handle_message/3 implementation in order to replace the data with the new processed data.
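
A short sketch of that usage, assuming the :broadway dependency (v0.5.0) is available. MyApp.UpcasePipeline is a hypothetical module, and upcasing a string stands in for whatever processing the pipeline actually performs.

```elixir
# Assumes the :broadway dependency (v0.5.0) is available.
defmodule MyApp.UpcasePipeline do
  # Hypothetical pipeline module; only the callback is shown.
  use Broadway

  alias Broadway.Message

  @impl true
  def handle_message(_processor, %Message{} = message, _context) do
    # The function receives the current data and returns the new data.
    Message.update_data(message, &String.upcase/1)
  end
end
```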