A Broadway.Producer implementation for PGMQ queues.
This module requires the optional Broadway dependency.
This module can be used in a Broadway pipeline like any other producer:
    Broadway.start_link(MyBroadway,
      producer: [
        # Second tuple element contains producer options
        module: {EctoPGMQ.Producer, [repo: Repo, queue: "my_queue", ...]},
        ...
      ],
      ...
    )

Notifications and Producer Polling
EctoPGMQ.Producer supports poll-based, notification-based, and hybrid
queue consumption.
Poll-Based Consumption
When using poll-based consumption, a producer will attempt to read messages
whenever demand is received. If demand cannot be met, the producer will poll
for messages until it can. The polling interval is determined by the
:read_interval option. Sample producer options for poll-based consumption
can be seen below:
    [listener: nil, read_interval: 5_000, ...]

Info
This is the default approach to consumption.
Notification-Based Consumption
When using notification-based consumption, a producer will listen for
notifications and only attempt to read messages if it has reason to believe
that the queue is not empty. Depending on the characteristics of the queue,
this approach to consumption may be less taxing on the repo connection pool
than poll-based consumption. For more information about notifications and
when to use them, see EctoPGMQ.Notifications. Sample producer options for
notification-based consumption can be seen below:
    [listener: MyListener, read_interval: :infinity, ...]

Warning
A purely notification-based consumer will be subject to race conditions when connecting/reconnecting to the DB. In practice, this means that there is a risk of messages being available in the queue but not being read until more messages arrive and trigger another notification. For this reason, Hybrid Consumption is preferred over Notification-Based consumption.
Hybrid Consumption
As with notification-based consumption, a producer using hybrid
consumption will listen for notifications, but it will also poll for
messages to make notification race conditions less problematic. The
:read_interval option then effectively becomes the maximum amount of time
a visible message can wait in the queue if a notification is missed. Sample
producer options for hybrid consumption can be seen below:
    [listener: MyListener, read_interval: 300_000, ...]

Tip
This is the preferred approach to consumption when leveraging notifications.
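The three sample option lists in this section differ only in :listener and :read_interval. As a minimal sketch, they could be derived from a shared base keyword list as shown below. The ConsumptionModes module and its producer_opts/2 function are hypothetical helpers for illustration and are not part of EctoPGMQ; MyListener stands in for an existing listener.

```elixir
defmodule ConsumptionModes do
  @moduledoc false
  # Hypothetical helper, not part of EctoPGMQ: merges the mode-specific
  # options shown in this section into a base keyword list.

  # Poll-based: no listener, poll every 5 seconds while demand is outstanding.
  def producer_opts(base, :poll),
    do: Keyword.merge(base, listener: nil, read_interval: 5_000)

  # Notification-based: rely on the listener only, with polling disabled.
  def producer_opts(base, {:notify, listener}),
    do: Keyword.merge(base, listener: listener, read_interval: :infinity)

  # Hybrid: listen for notifications and also poll every 5 minutes.
  def producer_opts(base, {:hybrid, listener}),
    do: Keyword.merge(base, listener: listener, read_interval: 300_000)
end

# Example: hybrid options layered on top of a (partial) base list.
base = [queue: "my_queue"]
hybrid_opts = ConsumptionModes.producer_opts(base, {:hybrid, MyListener})
```

Keeping the mode as a single argument makes it easy to switch a pipeline between approaches (for example, per environment) without touching the other producer options.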
Acknowledgements
EctoPGMQ.Producer supports a number of different acknowledgement actions
(see ack_action/0). Default acknowledgement actions can be set for both
successful and failed messages. Additionally, EctoPGMQ.Producer supports
configuring individual message acknowledgements by calling
Broadway.Message.configure_ack/2 with the following options:
* :ack_action - A required ack_action/0 denoting how to acknowledge the message.
When used together, default acknowledgement actions and individual message
acknowledgement configuration can be used to implement more complex message
handling. For example, an EctoPGMQ.Producer can use :delete as the
default success acknowledgement, :nothing as the default failure
acknowledgement, and a code snippet like the one below to implement a
maximum number of attempts for each message:
    alias Broadway.Message

    @max_attempts 3

    @impl Broadway
    def handle_message(_, message, _) do
      case do_process_pgmq_message(message.data) do
        :ok -> message
        {:error, reason} -> Message.failed(message, reason)
      end
    end

    @impl Broadway
    def handle_failed(messages, _) do
      Enum.map(messages, fn
        %{data: %{reads: r}} = msg when r < @max_attempts -> msg
        msg -> Message.configure_ack(msg, ack_action: :archive)
      end)
    end

Warning
All acknowledgement configuration is effectively ignored when deleting messages on read.
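The default acknowledgements used by the maximum-attempts example (:delete on success, :nothing on failure) are set through the :on_success and :on_failure producer options. A minimal sketch of the corresponding pipeline start-up follows. MyApp.RetryingPipeline, MyApp.Repo, the queue name, and the processor settings are placeholders chosen for illustration, and the visibility timeout is taken from the caller rather than assumed here.

```elixir
defmodule MyApp.RetryingPipeline do
  use Broadway

  # Placeholder names throughout: MyApp.Repo, "my_queue", and the processor
  # settings are assumptions for illustration only.
  def start_link(opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {EctoPGMQ.Producer,
           [
             repo: MyApp.Repo,
             queue: "my_queue",
             # Required option; the caller supplies an EctoPGMQ.visibility_timeout/0.
             visibility_timeout: Keyword.fetch!(opts, :visibility_timeout),
             # Successful messages are deleted from the queue.
             on_success: :delete,
             # Failed messages are left alone and retried once the
             # visibility timeout expires.
             on_failure: :nothing
           ]}
      ],
      processors: [default: [concurrency: 1]]
    )
  end

  # handle_message/3 and handle_failed/2 from the snippet above go here.
end
```

With these defaults, handle_failed/2 only needs to override the acknowledgement for messages that have exhausted their attempts.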
Options
An EctoPGMQ.Producer can be started with the following options:
* :dynamic_repo - An optional atom/0 name or pid/0 of a dynamic repo to use for all DB operations. For more information about dynamic repos, see Dynamic repositories.
* :listener - An optional listener specification that can take any of the following forms:
    * An opts keyword/0 to be passed to EctoPGMQ.Notifications.start_link/1 to start a listener under Broadway's supervision tree. The keyword/0 MUST contain a :name key.
    * An existing EctoPGMQ.Notifications.listener/0. This is useful for sharing a single listener (and, by proxy, a single Postgres connection) between multiple producers.
    * nil to not subscribe to notifications.
  Defaults to nil. For more information about configuring notifications for a producer, see Notifications and Producer Polling. For more information about notifications in general, see EctoPGMQ.Notifications.
* :on_failure - An optional ack_action/0 denoting the default acknowledgement for failed messages. Defaults to :archive. For more information about acknowledgements, see Acknowledgements.
* :on_success - An optional ack_action/0 denoting the default acknowledgement for successful messages. Defaults to :delete. For more information about acknowledgements, see Acknowledgements.
* :queue - A required EctoPGMQ.Queue.name/0 to read messages from.
* :read_interval - An optional Duration.t/0 or timeout/0 denoting how long to wait between polls when there is outstanding demand (:infinity to disable polling). Defaults to 5_000. For more information about configuring polling for a producer, see Notifications and Producer Polling.
* :read_opts - An optional EctoPGMQ.read_messages_opts/0 to be used when reading messages. Defaults to [].
* :repo - A required Ecto.Repo.t/0 to be used for all DB operations.
* :visibility_timeout - A required EctoPGMQ.visibility_timeout/0 for read operations.
Types
@type ack_action() :: :delete | :archive | :nothing | {:update_visibility_timeout, EctoPGMQ.visibility_timeout()}
An acknowledgement action for PGMQ messages.
This can take any of the following forms:
* :delete to delete messages from the queue.
* :archive to move messages from the queue to the archive.
* :nothing to retry messages once the visibility timeout expires.
* {:update_visibility_timeout, EctoPGMQ.visibility_timeout()} to retry messages when the updated EctoPGMQ.visibility_timeout/0 expires.
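Because the type is a small closed set of shapes, it can be checked with plain pattern matching, for example before passing a value loaded from configuration to :on_success or :on_failure. The sketch below is a hypothetical helper (AckActionCheck is not part of EctoPGMQ), and it deliberately does not inspect the timeout payload, since the accepted forms of EctoPGMQ.visibility_timeout/0 are not described here.

```elixir
defmodule AckActionCheck do
  @moduledoc false
  # Hypothetical helper, not part of EctoPGMQ: checks that a term has one of
  # the four ack_action/0 shapes listed above.

  # The three atom forms.
  def valid?(action) when action in [:delete, :archive, :nothing], do: true

  # The tuple form; the timeout value itself is NOT validated here.
  def valid?({:update_visibility_timeout, _timeout}), do: true

  # Anything else is not an acknowledgement action.
  def valid?(_other), do: false
end
```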