EctoPGMQ.Producer (ecto_pgmq v1.0.0)

Copy Markdown View Source

A Broadway.Producer implementation for PGMQ queues.

This module requires the optional Broadway dependency.

This module can be used in a Broadway pipeline like any other producer:

Broadway.start_link(MyBroadway,
  producer: [
    # Second tuple element contains producer options
    module: {EctoPGMQ.Producer, [repo: Repo, queue: "my_queue", ...]},
    ...
  ],
  ...
)

Notifications and Producer Polling

EctoPGMQ.Producer supports poll-based, notification-based, and hybrid queue consumption.

Poll-Based Consumption

When using poll-based consumption, a producer will attempt to read messages whenever demand is received. If demand cannot be met, the producer will poll for messages until it can. The polling interval is determined by the :read_interval option. Sample producer options for poll-based consumption can be seen below:

[listener: nil, read_interval: 5_000, ...]

Info

This is the default approach to consumption.

Notification-Based Consumption

When using notification-based consumption, a producer will listen for notifications and only attempt to read messages if it has reason to believe that the queue is not empty. Depending on the characteristics of the queue, this approach to consumption may be less taxing on the repo connection pool than poll-based consumption. For more information about notifications and when, to use them see EctoPGMQ.Notifications. Sample producer options for notification-based consumption can be seen below:

[listener: MyListener, read_interval: :infinity, ...]

Warning

A purely notification-based consumer will be subject to race conditions when connecting/reconnecting to the DB. In practice, this means that there is a risk of messages being available in the queue but not being read until more messages arrive and trigger another notification. For this reason, Hybrid Consumption is preferred over Notification-Based consumption.

Hybrid Consumption

Like with notification-based consumption, a producer using hybrid consumption will listen for notifications BUT it will also poll for messages to make notification race conditions less problematic. The :read_interval option then effectively becomes the maximum amount of time a visible message can wait in the queue if a notification is missed. Sample producer options for hybrid consumption can be seen below:

[listener: MyListener, read_interval: 300_000, ...]

Tip

This is the preferred approach to consumption when leveraging notifications.

Acknowledgements

EctoPGMQ.Producer supports a number of different acknowledgement actions (see ack_action/0). Default acknowledgement actions can be set for both successful and failed messages. Additionally, EctoPGMQ.Producer supports configuring individual message acknowledgements by calling Broadway.Message.configure_ack/2 with the following options:

  • :ack_action - A required ack_action/0 denoting how to acknowledge the message.

When used together, default acknowledgement actions and individual message acknowledgement configuration can be used to implement more complex message handling. For example, an EctoPGMQ.Producer can use :delete as the default success acknowledgement, :nothing as the default failure acknowledgement, and a code snippet like the one below to implement a maximum number of attempts for each message:

alias Broadway.Message

@max_attempts 3

@impl Broadway
def handle_message(_, message, _) do
  case do_process_pgmq_message(message.data) do
    :ok -> message
    {:error, reason} -> Message.failed(message, reason)
  end
end

@impl Broadway
def handle_failed(messages, _) do
  Enum.map(messages, fn
    %{data: %{reads: r}} = msg when r < @max_attempts -> msg
    msg -> Message.configure_ack(msg, ack_action: :archive)
  end)
end

Warning

All acknowledgement configuration is effectively ignored when deleting messages on read.

Options

An EctoPGMQ.Producer can be started with the following options:

Summary

Types

An acknowledgement action for PGMQ messages.

Types

ack_action()

@type ack_action() ::
  :delete
  | :archive
  | :nothing
  | {:update_visibility_timeout, EctoPGMQ.visibility_timeout()}

An acknowledgement action for PGMQ messages.

This can take any of the following forms:

  • :delete to delete messages from the queue.

  • :archive to move messages from the queue to the archive.

  • :nothing to retry messages once the visibility timeout expires

  • {:update_visibility_timeout, EctoPGMQ.visibility_timeout()} to retry messages when the updated EctoPGMQ.visibility_timeout/0 expires.