An opinionated PGMQ client for Elixir that builds on top of Ecto and the
Ecto.Adapters.Postgres adapter.
PGMQ Installation
Because PGMQ is entirely made up of SQL objects, there are two available installation methods:
Extension Installation - This method installs PGMQ as a traditional Postgres extension. This is the preferred installation method but it requires access to the Postgres server file system and, therefore, may not always be feasible.
SQL Installation - This method installs PGMQ by manually creating all of the necessary SQL objects and works entirely within the database.
EctoPGMQ.Migrations contains helper functions for managing both installation
methods.
For more information about managing the PGMQ extension, see the PGMQ Installation and Updating guides.
Partitioning
PGMQ supports partitioning both queues and archives.
The pg_partman extension must be available in order to use partitioning.
For more information about partitioning, see the PGMQ docs.
Polling
PGMQ supports Postgres server-side polling during read operations. Reading with a poll can be used to reduce network round trips if there is a good chance that demand can be satisfied in a short time BUT doing so utilizes a connection for the duration of the read operation. As such, polling should be avoided in situations where the DB connection pool is a bottleneck.
FIFO Message Groups
While PGMQ queues are FIFO data structures, the order of message processing
can be non-deterministic when there are multiple consumers. This is usually
fine but there is sometimes a need to consume messages strictly in order
within a group. In order to support this, PGMQ exposes a number of functions
that read messages while guaranteeing FIFO ordering for messages with the same
x-pgmq-group header.
There are two slightly different methodologies for reading messages while respecting FIFO message groups: round-robin reading and throughput-optimized reading.
Round-Robin Reading
This method will fairly interleave messages from all available groups.
iex> specs = [{%{}, "A"}, {%{}, "A"}, {%{}, "B"}, {%{}, "B"}, {%{}, "C"}]
iex> [id_1, id_2, id_3, id_4, id_5] = send_messages(Repo, "my_queue", specs)
iex> messages = read_messages(Repo, "my_queue", 300, 5, message_grouping: :round_robin)
iex> Enum.map(messages, & &1.id) == [id_1, id_3, id_5, id_2, id_4]
trueThroughput-Optimized Reading
This method will prioritize messages from the same group. As the name implies, this method will often be more efficient than round-robin reading.
iex> specs = [{%{}, "A"}, {%{}, "A"}, {%{}, "B"}, {%{}, "B"}, {%{}, "C"}]
iex> message_ids = send_messages(Repo, "my_queue", specs)
iex> messages = read_messages(Repo, "my_queue", 300, 5, message_grouping: :throughput_optimized)
iex> Enum.map(messages, & &1.id) == message_ids
trueWarning
If message groups are long-lived and high-volume, this method of reading can effectively starve later groups. For more information, see Performance Considerations.
Performance Considerations
In general, FIFO message groups are more performant when the following conditions are met:
There are many low-volume groups
Messages are removed from the queue relatively quickly
The queue is optimized for FIFO message group reads (see
EctoPGMQ.create_queue/4andEctoPGMQ.update_queue/4).
Further Information
For more information about FIFO message groups, see the PGMQ docs.
Summary
Types
A delay before a message becomes visible.
Message update attributes.
The minimum time between notifications.
A queue partition configuration.
A message polling configuration.
Queue creation attributes.
Queue update attributes.
Options for reading messages.
The time from now that a message is invisible.
Queue API
Lists all queues.
Creates a queue with the given name.
Drops the given queue.
Gets the given queue.
Purges the given queue.
Updates the given queue.
Types
@type delay() :: Duration.t() | EctoPGMQ.PGMQ.delay()
A delay before a message becomes visible.
This can take any of the following forms:
A
Duration.t/0denoting the time to wait before a message becomes visible.An
integer/0denoting the time (in seconds) to wait before a message becomes visible.A
DateTime.t/0denoting when a message should become visible.
For more information about this type, see EctoPGMQ.PGMQ.delay/0.
@type message_update_attributes() :: %{visibility_timeout: visibility_timeout()}
Message update attributes.
The following attributes are supported:
:visibility_timeout- A requiredvisibility_timeout/0for the messages.
@type notification_throttle() :: Duration.t() | EctoPGMQ.PGMQ.throttle_interval()
The minimum time between notifications.
This can take either of the following forms:
A
Duration.t/0denoting the minimum time between notifications.A
non_neg_integer/0denoting the minimum time (in milliseconds) between notifications.
For more information about notifications, see EctoPGMQ.Notifications.
For more information about this type, see
EctoPGMQ.PGMQ.throttle_interval/0.
@type partition_config() :: {EctoPGMQ.PGMQ.partition_interval(), EctoPGMQ.PGMQ.retention_interval()}
A queue partition configuration.
A partition configuration is a tuple containing two elements: the partition interval and the retention interval.
Both elements can take either of the following forms:
A
Duration.t/0denoting a time-based interval.A
pos_integer/0denoting a message-based interval.
For more information about partitioning, see Partitioning.
For more information about this type, see
EctoPGMQ.PGMQ.partition_interval/0 and
EctoPGMQ.PGMQ.retention_interval/0.
@type poll_config() :: {Duration.t() | EctoPGMQ.PGMQ.poll_interval(), Duration.t() | EctoPGMQ.PGMQ.poll_timeout()}
A message polling configuration.
A polling configuration is a tuple containing two elements: the poll interval and the poll timeout.
Both elements can take either of the following forms:
A
Duration.t/0denoting a length of time.A
pos_integer/0denoting a length of time. The unit for the poll interval is milliseconds and the unit for the timeout is seconds.
For more information about polling, see Polling.
For more information about this type, see EctoPGMQ.PGMQ.poll_interval/0
and EctoPGMQ.PGMQ.poll_timeout/0.
@type queue_create_attributes() :: %{ optional(:message_groups?) => boolean(), optional(:notifications) => notification_throttle() | nil, optional(:partitions) => partition_config() | nil, optional(:unlogged?) => boolean() }
Queue creation attributes.
The following attributes are supported:
:message_groups?- An optionalboolean/0denoting whether or not the queue should be optimized for FIFO message group reads. Defaults tofalse. For more information about FIFO message groups, see FIFO Message Groups.:notifications- An optionalnotification_throttle/0for the queue ornilto leave notifications disabled. Defaults tonil. For more information about notifications, seeEctoPGMQ.Notifications.:partitions- An optionalpartition_config/0for the queue ornilto disable partitioning. This option is ignored for unlogged queues. Defaults tonil. For more information about partitioning, see Partitioning.:unlogged?- An optionalboolean/0denoting whether or not the queue should be unlogged. Defaults tofalse.
@type queue_update_attributes() :: %{ optional(:message_groups?) => true, optional(:notifications) => notification_throttle() | nil }
Queue update attributes.
The following attributes are supported:
:message_groups?-trueto optimize the queue for FIFO message group reads. Note thattrueis the only valid value because this operation cannot be undone. For more information about FIFO message groups, see FIFO Message Groups.:notifications- An optionalnotification_throttle/0for the queue ornilto disable notifications. For more information about notifications, seeEctoPGMQ.Notifications.
@type read_messages_opts() :: [ {:delete?, boolean()} | {:polling, poll_config() | nil} | {:message_grouping, :round_robin | :throughput_optimized | nil} | EctoPGMQ.PGMQ.query_opt() ]
Options for reading messages.
In addition to the standard query options, messages can be read with the following options:
:delete?- An optionalboolean/0denoting whether or not to delete messages immediately after reading them. Defaults tofalse. For more information, seeEctoPGMQ.PGMQ.pop/4.:message_grouping- An optional value specifying how to handle message groups when reading messages. Possible values are:round_robin,:throughput_optimized, ornilto ignore message groups when reading. This option is ignored when deleting on read. Defaults tonil. For more information about FIFO message groups, see FIFO Message Groups.:polling- An optionalpoll_config/0for the read operation or nil to disable polling. This option is ignored when deleting on read. Defaults tonil. For more information about polling, see Polling.
@type visibility_timeout() :: Duration.t() | EctoPGMQ.PGMQ.visibility_timeout()
The time from now that a message is invisible.
This can take either of the following forms:
A
Duration.t/0denoting how long a message is invisible.An
integer/0denoting how long (in seconds) a message is invisible.
For more information about this type, see
EctoPGMQ.PGMQ.visibility_timeout/0.
Message API
Queue API
@spec all_queues(Ecto.Repo.t(), [EctoPGMQ.PGMQ.query_opt()]) :: [EctoPGMQ.Queue.t()]
Lists all queues.
Options
This function supports the standard query options.
Examples
iex> queues = all_queues(Repo)
iex> Enum.all?(queues, &is_struct(&1, EctoPGMQ.Queue))
true
@spec create_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), queue_create_attributes(), [ EctoPGMQ.PGMQ.query_opt() ]) :: EctoPGMQ.Queue.t()
Creates a queue with the given name.
To create a queue in an Ecto.Migration, see
EctoPGMQ.Migrations.create_queue/2.
Options
This function supports the standard query options.
Examples
iex> queue = create_queue(Repo, "my_unpartitioned_queue", %{notifications: 1_000})
iex> match?(%EctoPGMQ.Queue{notifications: %EctoPGMQ.Throttle{}}, queue)
true
iex> queue = create_queue(Repo, "my_partitioned_queue", %{partitions: {10_000, 100_000}})
iex> match?(%EctoPGMQ.Queue{is_partitioned: true}, queue)
true
iex> partitions = {Duration.new!(hour: 1), Duration.new!(day: 1)}
iex> queue = create_queue(Repo, "my_partitioned_queue", %{partitions: partitions})
iex> match?(%EctoPGMQ.Queue{is_partitioned: true}, queue)
true
iex> queue = create_queue(Repo, "my_unlogged_queue", %{unlogged?: true})
iex> match?(%EctoPGMQ.Queue{is_unlogged: true}, queue)
true
@spec drop_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.PGMQ.query_opt()]) :: :ok
Drops the given queue.
To drop a queue in an Ecto.Migration, see
EctoPGMQ.Migrations.drop_queue/2.
Options
This function supports the standard query options.
Examples
iex> drop_queue(Repo, "my_queue")
:ok
@spec get_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.PGMQ.query_opt()]) :: EctoPGMQ.Queue.t() | nil
Gets the given queue.
Options
This function supports the standard query options.
Examples
iex> queue = get_queue(Repo, "my_queue")
iex> match?(%EctoPGMQ.Queue{}, queue)
true
iex> get_queue(Repo, "my_non_existent_queue")
nil
@spec purge_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.PGMQ.query_opt()]) :: EctoPGMQ.PGMQ.purged_messages()
Purges the given queue.
Options
This function supports the standard query options.
Examples
iex> send_messages(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
iex> purge_queue(Repo, "my_queue")
2
@spec update_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), queue_create_attributes(), [ EctoPGMQ.PGMQ.query_opt() ]) :: EctoPGMQ.Queue.t()
Updates the given queue.
Warning
Because the underlying tables are owned by PGMQ, this function avoids row locks so as not to disturb any internal PGMQ processes. As a result, if multiple processes attempt to update a queue simultaneously, they may get unexpected results.
To update a queue in an Ecto.Migration, see
EctoPGMQ.Migrations.update_queue/2.
Options
This function supports the standard query options.
Examples
iex> throttle = Duration.new!(second: 5)
iex> queue = update_queue(Repo, "my_queue", %{notifications: throttle})
iex> match?(%EctoPGMQ.Queue{notifications: %EctoPGMQ.Throttle{}}, queue)
true