Broadway.Message (Broadway v1.1.0)

This struct holds all information about a message.

A message is first created by the producers. It is then sent downstream and gets updated multiple times, either by a module implementing the Broadway behaviour through the Broadway.handle_message/3 callback or internally by one of the built-in stages of Broadway.

Instead of modifying the struct directly, use the functions provided by this module to manipulate messages. However, if you are implementing a Broadway.Producer of your own, see the %Broadway.Message{} documentation for the fields you should set.

Summary

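Types

acknowledger()

The acknowledger of the message.

t()

The Broadway message struct.

Functions

ack_immediately(message_or_messages)

Immediately acknowledges the given message or list of messages.

configure_ack(message, options)

Configures the acknowledger of this message.

failed(message, reason)

Marks a message as failed.

put_batch_key(message, batch_key)

Defines the message batch key.

put_batch_mode(message, mode)

Sets the batching mode for the message.

put_batcher(message, batcher)

Defines the target batcher which the message should be forwarded to.

put_data(message, data)

Stores the given data in the message.

update_data(message, fun)

Updates the data in the message.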

Types

acknowledger()

(since 1.1.0)
@type acknowledger() :: {module(), ack_ref :: term(), data :: term()}

The acknowledger of the message.

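This tuple contains:

* a module implementing the Broadway.Acknowledger behaviour
* an ack reference (ack_ref) that is passed to that module's callbacks
* a term (data) holding acknowledger-specific data for this message

t()

@type t() :: %Broadway.Message{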
  acknowledger: acknowledger(),
  batch_key: term(),
  batch_mode: :bulk | :flush,
  batcher: atom(),
  data: term(),
  metadata: %{optional(atom()) => term()},
  status:
    :ok
    | {:failed, reason :: term()}
    | {:throw | :error | :exit, term(), Exception.stacktrace()}
}

The Broadway message struct.

Most of these fields are manipulated by Broadway itself. You can read the :metadata field, and you can use the functions in this module to update most of the other fields. If you are implementing your own producer, see the Broadway.Producer documentation for more information on how to create and manipulate message structs.
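As a rough sketch of what a custom producer might build, the snippet below constructs a message struct by hand, setting only data, metadata and acknowledger. DemoAcknowledger, :demo_ref and the payload are hypothetical names chosen for illustration, and the snippet assumes Broadway is available as a dependency.

```elixir
# Assumes Broadway is available, e.g. as a dependency of your Mix project.
defmodule DemoAcknowledger do
  # Hypothetical acknowledger used only for illustration; it does nothing.
  @behaviour Broadway.Acknowledger

  @impl true
  def ack(_ack_ref, _successful, _failed), do: :ok
end

message = %Broadway.Message{
  data: "payload",
  metadata: %{source: :demo},
  # {module, ack_ref, data}, as described by the acknowledger() type
  acknowledger: {DemoAcknowledger, :demo_ref, nil}
}

IO.inspect(message.metadata)
```

A real producer would typically put whatever its acknowledger needs to confirm delivery (for example a receipt or connection reference) in ack_ref and data.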

Functions

ack_immediately(message_or_messages)

(since 0.5.0)
@spec ack_immediately(message :: t()) :: t()
@spec ack_immediately(messages :: [t(), ...]) :: [t(), ...]

Immediately acknowledges the given message or list of messages.

This function can be used to acknowledge a message (or list of messages) immediately without waiting for the rest of the pipeline.

Acknowledging a message sets that message's acknowledger to a no-op acknowledger so that it's safe to ack at the end of the pipeline.

Returns the updated acked message if a message is passed in, or the updated list of acked messages if a list of messages is passed in.
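The behaviour described above can be sketched as follows. NotifyingAcknowledger is a hypothetical acknowledger written for this example (it reports each ack to the pid stored in ack_ref), and the snippet assumes Broadway is available as a dependency.

```elixir
# Assumes Broadway is available, e.g. as a dependency of your Mix project.
alias Broadway.Message

defmodule NotifyingAcknowledger do
  # Hypothetical acknowledger: reports each ack to the pid stored in ack_ref.
  @behaviour Broadway.Acknowledger

  @impl true
  def ack(pid, successful, failed) do
    send(pid, {:acked, length(successful), length(failed)})
    :ok
  end
end

message = %Message{data: 1, acknowledger: {NotifyingAcknowledger, self(), nil}}

# Acks right away instead of waiting for the end of the pipeline.
acked = Message.ack_immediately(message)

receive do
  {:acked, successful, failed} ->
    IO.puts("acked: #{successful} successful, #{failed} failed")
after
  1_000 -> IO.puts("no ack received")
end

# The returned message no longer carries NotifyingAcknowledger.
IO.inspect(acked.acknowledger)
```

Because the returned message carries a no-op acknowledger, passing it on through the rest of the pipeline should not trigger a second ack against NotifyingAcknowledger.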

configure_ack(message, options)

(since 0.5.0)
@spec configure_ack(message :: t(), options :: keyword()) :: t()

Configures the acknowledger of this message.

This function calls the Broadway.Acknowledger.configure/3 callback to change the configuration of the acknowledger for the given message.

This function can only be called if the acknowledger implements the configure/3 callback. If it doesn't, an error is raised.
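A minimal sketch of the round trip, assuming a hypothetical ConfigurableAcknowledger that keeps its options in a map as the ack data. The :on_failure option and its values are invented for this example; real acknowledgers define their own options. Broadway is assumed to be available as a dependency.

```elixir
# Assumes Broadway is available, e.g. as a dependency of your Mix project.
alias Broadway.Message

defmodule ConfigurableAcknowledger do
  # Hypothetical acknowledger that keeps its options in the ack data map.
  @behaviour Broadway.Acknowledger

  @impl true
  def ack(_ack_ref, _successful, _failed), do: :ok

  @impl true
  def configure(_ack_ref, ack_data, options) do
    # Merge the new options into the existing ack data.
    {:ok, Map.merge(ack_data, Map.new(options))}
  end
end

message = %Message{
  data: "payload",
  acknowledger: {ConfigurableAcknowledger, :demo_ref, %{on_failure: :noop}}
}

# Calls ConfigurableAcknowledger.configure/3 and stores the returned ack data.
configured = Message.configure_ack(message, on_failure: :ack)
{_module, _ack_ref, ack_data} = configured.acknowledger
IO.inspect(ack_data)
```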

failed(message, reason)

@spec failed(message :: t(), reason :: term()) :: t()

Marks a message as failed.

Failed messages are sent directly to the related acknowledger at the end of this step and therefore they're not forwarded to the next step in the pipeline.

Failing a message does not emit any log but it does trigger the Broadway.handle_failed/2 callback.
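As an illustration, the sketch below marks a message as failed from a validation step such as one that might run inside handle_message/3. The validate function and the reason string are hypothetical, and Broadway is assumed to be available as a dependency.

```elixir
# Assumes Broadway is available, e.g. as a dependency of your Mix project.
alias Broadway.Message

# Hypothetical validation step: fail anything that is not an integer.
validate = fn %Message{data: data} = message ->
  if is_integer(data) do
    message
  else
    Message.failed(message, "expected an integer")
  end
end

ack = {Broadway.NoopAcknowledger, nil, nil}
ok = validate.(%Message{data: 42, acknowledger: ack})
bad = validate.(%Message{data: "oops", acknowledger: ack})

IO.inspect(ok.status)
IO.inspect(bad.status)
```

The failed message's status takes the {:failed, reason} shape listed in the t() type, while the valid message is returned unchanged.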

put_batch_key(message, batch_key)

@spec put_batch_key(message :: t(), batch_key :: term()) :: t()

Defines the message batch key.

The batch key identifies the batch the message belongs to, within a given batcher. Each batcher groups messages that share a batch_key into batches of at most batch_size messages, emitting a batch once batch_size is reached or batch_timeout elapses. Both batch_size and batch_timeout are managed per batch key, so a batcher is capable of grouping multiple batch keys at the same time, regardless of the concurrency level.

If a given batcher has multiple batch processors (concurrency > 1), all messages with the same batch key are routed to the same processor. So different batch keys may run concurrently but the same batch key is always run serially and in the same batcher processor.
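For instance, a pipeline that wants all events for one user to land in the same batches could key on a user identifier. In the sketch below the event maps and the user_id field are hypothetical, and Broadway is assumed to be available as a dependency.

```elixir
# Assumes Broadway is available, e.g. as a dependency of your Mix project.
alias Broadway.Message

ack = {Broadway.NoopAcknowledger, nil, nil}

events = [
  %{user_id: 1, action: "login"},
  %{user_id: 2, action: "login"},
  %{user_id: 1, action: "logout"}
]

messages = for event <- events, do: %Message{data: event, acknowledger: ack}

# Use the (hypothetical) user_id field as the batch key.
keyed = Enum.map(messages, &Message.put_batch_key(&1, &1.data.user_id))

IO.inspect(Enum.map(keyed, & &1.batch_key))
```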

put_batch_mode(message, mode)

@spec put_batch_mode(message :: t(), mode :: :bulk | :flush) :: t()

Sets the batching mode for the message.

When the mode is :bulk, the batch that the message is in is delivered after the batch size or batch timeout is reached.

When the mode is :flush, the batch that the message is in is delivered immediately after processing. Note that this doesn't mean the batch contains only a single element, but rather that all messages received from the processor are delivered without waiting.

The default mode for messages is :bulk.
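One possible use, sketched under the assumption that some messages are time-sensitive: switch only those to :flush and leave the rest on the default. The urgent flag in the data is hypothetical, and Broadway is assumed to be available as a dependency.

```elixir
# Assumes Broadway is available, e.g. as a dependency of your Mix project.
alias Broadway.Message

# Hypothetical rule: messages flagged as urgent skip the wait.
set_mode = fn %Message{data: data} = message ->
  if data.urgent do
    Message.put_batch_mode(message, :flush)
  else
    message
  end
end

ack = {Broadway.NoopAcknowledger, nil, nil}
urgent = set_mode.(%Message{data: %{urgent: true}, acknowledger: ack})
routine = set_mode.(%Message{data: %{urgent: false}, acknowledger: ack})

IO.inspect({urgent.batch_mode, routine.batch_mode})
```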

put_batcher(message, batcher)

@spec put_batcher(message :: t(), batcher :: atom()) :: t()

Defines the target batcher which the message should be forwarded to.
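A small routing sketch: the batcher names :even and :odd are hypothetical and would need to correspond to batchers defined in the pipeline's configuration. Broadway is assumed to be available as a dependency.

```elixir
# Assumes Broadway is available, e.g. as a dependency of your Mix project.
alias Broadway.Message

# Hypothetical routing rule based on the integer carried in the message data.
route = fn %Message{data: n} = message when is_integer(n) ->
  batcher = if rem(n, 2) == 0, do: :even, else: :odd
  Message.put_batcher(message, batcher)
end

ack = {Broadway.NoopAcknowledger, nil, nil}
routed = Enum.map([1, 2, 3], &route.(%Message{data: &1, acknowledger: ack}))

IO.inspect(Enum.map(routed, & &1.batcher))
```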

put_data(message, data)

(since 1.0.0)
@spec put_data(message :: t(), term()) :: t()

Stores the given data in the message.

This function is usually used inside the Broadway.handle_message/3 implementation to replace data with new processed data.
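For example, a processing step might parse the raw payload and store the parsed value in its place. The string payload below is hypothetical, and Broadway is assumed to be available as a dependency.

```elixir
# Assumes Broadway is available, e.g. as a dependency of your Mix project.
alias Broadway.Message

message = %Message{data: "42", acknowledger: {Broadway.NoopAcknowledger, nil, nil}}

# Compute the new value first, then replace the data wholesale.
parsed = String.to_integer(message.data)
processed = Message.put_data(message, parsed)

IO.inspect(processed.data)
```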

update_data(message, fun)

@spec update_data(message :: t(), fun :: (term() -> term())) :: t()

Updates the data in the message.

This function is usually used inside the Broadway.handle_message/3 implementation to update data with new processed data.
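A short sketch of transforming the data with a function of the current value, here String.upcase/1. The payload is hypothetical, and Broadway is assumed to be available as a dependency.

```elixir
# Assumes Broadway is available, e.g. as a dependency of your Mix project.
alias Broadway.Message

message = %Message{data: "hello", acknowledger: {Broadway.NoopAcknowledger, nil, nil}}

# The function receives the current data and returns the new data.
updated = Message.update_data(message, &String.upcase/1)

IO.inspect(updated.data)
```

This form fits when the new value is derived from the old one; when the new value is already at hand, put_data/2 does the same job without a function.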