An SDK that covers the full PGMQ API surface.
Instead of implementing a corresponding function for every alias and parameterization supported by PGMQ, this module relies on client-side defaults to implement a single function for each distinct piece of PGMQ functionality.
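The idea of client-side defaults can be sketched with plain Elixir default arguments. The module below is a hypothetical stand-in, not EctoPGMQ code: DefaultsSketch, its read function, and the default values it picks are assumptions made for illustration only.

```elixir
defmodule DefaultsSketch do
  # Hypothetical stand-in for a PGMQ-style read; it only echoes its arguments.
  # `conditional` and `opts` carry client-side defaults (assumed values), so a
  # single definition answers calls with 2, 3, or 4 arguments.
  def read(queue, vt, conditional \\ %{}, opts \\ []) do
    %{queue: queue, vt: vt, conditional: conditional, opts: opts}
  end
end

# The short call relies on the defaults.
DefaultsSketch.read("my_queue", 5)

# The long call overrides both defaults explicitly.
DefaultsSketch.read("my_queue", 5, %{"foo" => 1}, timeout: 1_000)
```

One definition with defaults keeps the surface small while still allowing every parameter to be set when needed.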
Query Options
All of the functions in this module support a common set of query options.
For a detailed description of these options, see query_opt/0.
Types
Filter conditions to be applied when reading messages from a queue.
Note that the filter conditions are applied to the message body, not the headers.
Warning
As stated in the PGMQ docs, conditional message reading is an experimental feature and the API might be subject to change in future releases.
@type delay() :: non_neg_integer() | DateTime.t()
A delay before a message becomes visible.
This can take either of the following forms:
An integer/0 denoting the time (in seconds) to wait before a message becomes visible.
A DateTime.t/0 denoting when a message should become visible.
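To make the two delay forms concrete, here is a minimal sketch that resolves either form to the absolute time at which a message would become visible. DelaySketch and visible_at are hypothetical names that are not part of EctoPGMQ; in practice the delay is handled by PGMQ.

```elixir
defmodule DelaySketch do
  # Hypothetical helper (not part of EctoPGMQ): resolves either delay form
  # to the absolute time at which a message would become visible.
  def visible_at(delay, now \\ DateTime.utc_now())

  # Integer form: a number of seconds to wait, counted from `now`.
  def visible_at(seconds, now) when is_integer(seconds) and seconds >= 0 do
    DateTime.add(now, seconds, :second)
  end

  # DateTime form: already an absolute point in time.
  def visible_at(%DateTime{} = at, _now), do: at
end

now = ~U[2024-01-01 00:00:00Z]

# 30 seconds after `now`.
DelaySketch.visible_at(30, now)

# An explicit point in time is returned unchanged.
DelaySketch.visible_at(~U[2024-01-01 00:05:00Z], now)
```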
@type leading_partitions() :: non_neg_integer()
The number of partitions to create preemptively.
@type partition_interval() :: pos_integer() | Duration.t()
The interval at which new partitions should be created.
This can take either of the following forms:
A pos_integer/0 denoting how many messages per partition.
A Duration.t/0 denoting a time range per partition.
@type poll_interval() :: pos_integer()
The time (in milliseconds) to wait between polls.
@type poll_timeout() :: pos_integer()
The maximum time (in seconds) to poll for messages.
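Because the poll interval is given in milliseconds and the poll timeout in seconds, it can help to put both on the same scale. The sketch below gives a rough estimate of how many polls fit into one timeout window. PollSketch and max_polls are hypothetical names, and the estimate ignores the time each query takes.

```elixir
defmodule PollSketch do
  # Hypothetical helper (not part of EctoPGMQ): rough number of polls that
  # fit into one timeout window. The timeout is in seconds and the interval
  # is in milliseconds, so the timeout is converted before dividing.
  def max_polls(poll_timeout_s, poll_interval_ms)
      when poll_timeout_s > 0 and poll_interval_ms > 0 do
    div(poll_timeout_s * 1_000, poll_interval_ms)
  end
end

# A 5 second timeout with a 500 millisecond interval allows about 10 polls.
PollSketch.max_polls(5, 500)
```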
@type purged_messages() :: non_neg_integer()
The number of purged messages.
@type quantity() :: pos_integer()
The maximum number of messages to read.
A query configuration option.
The following query configuration options are supported:
@type retention_interval() :: pos_integer() | Duration.t()
The interval at which old partitions should be dropped.
This can take either of the following forms:
A pos_integer/0 denoting how many messages to retain in total.
A Duration.t/0 denoting a total time range to retain.
@type throttle_interval() :: non_neg_integer()
The minimum time (in milliseconds) between notifications.
A throttle interval of 0 effectively disables notification throttling.
For more information about notification throttling, see Throttling.
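The effect of a throttle interval can be illustrated with a small predicate. This is only a sketch of the general throttling idea under stated assumptions; it does not show how PGMQ implements throttling, and ThrottleSketch and notify? are hypothetical names.

```elixir
defmodule ThrottleSketch do
  # Hypothetical predicate (not part of EctoPGMQ or PGMQ): decides whether a
  # notification may fire, given when the last one fired.
  # All times are in milliseconds.

  # No previous notification: always allowed.
  def notify?(nil, _now_ms, _interval_ms), do: true

  # Otherwise at least `interval_ms` must have elapsed since the last one.
  def notify?(last_ms, now_ms, interval_ms) when interval_ms >= 0 do
    now_ms - last_ms >= interval_ms
  end
end

# With an interval of 0, every insert may notify.
ThrottleSketch.notify?(1_000, 1_000, 0)

# With an interval of 500, an insert 400 ms after the last notification is suppressed.
ThrottleSketch.notify?(1_000, 1_400, 500)
```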
@type visibility_timeout() :: integer()
The time (in seconds), relative to now, for which a message stays invisible.
Functions
@spec archive(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.Message.id()], [query_opt()]) :: :ok
Archives the given messages in the given queue.
For more information about this function, see the PGMQ docs.
Examples
iex> message_ids = send_batch(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
iex> archive(Repo, "my_queue", message_ids)
:ok
@spec convert_archive_partitioned(Ecto.Repo.t(), EctoPGMQ.Queue.name(), partition_interval(), retention_interval(), leading_partitions(), [query_opt()]) :: :ok
Converts the archive for the given queue into a partitioned table.
Warning
This function renames the old archive table by appending _old to its name
and leaves its contents untouched. Additional cleanup (table deletion,
message movement, etc.) is left to the user.
For more information about partitioning, see Partitioning.
For more information about this function, see the PGMQ docs.
Examples
iex> convert_archive_partitioned(Repo, "my_queue", 10_000, 100_000, 10)
:ok
iex> partition = Duration.new!(hour: 1)
iex> retention = Duration.new!(day: 1)
iex> convert_archive_partitioned(Repo, "my_queue", partition, retention, 10)
:ok
@spec create_fifo_index(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) :: :ok
Creates an index to optimize FIFO message group read performance for the given queue.
For more information about FIFO message groups, see FIFO Message Groups.
For more information about this function, see the PGMQ docs.
Examples
iex> create_fifo_index(Repo, "my_queue")
:ok
@spec create_non_partitioned(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) :: :ok
Creates an unpartitioned queue with the given name.
For more information about this function, see the PGMQ docs.
Examples
iex> create_non_partitioned(Repo, "my_unpartitioned_queue")
:ok
@spec create_partitioned(Ecto.Repo.t(), EctoPGMQ.Queue.name(), partition_interval(), retention_interval(), [query_opt()]) :: :ok
Creates a partitioned queue with the given name.
For more information about partitioning, see Partitioning.
For more information about this function, see the PGMQ docs.
Examples
iex> create_partitioned(Repo, "my_partitioned_queue", 10_000, 100_000)
:ok
iex> partition = Duration.new!(hour: 1)
iex> retention = Duration.new!(day: 1)
iex> create_partitioned(Repo, "my_partitioned_queue", partition, retention)
:ok
@spec create_unlogged(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) :: :ok
Creates an unlogged queue with the given name.
For more information about this function, see the PGMQ docs.
Warning
Unlogged tables benefit from faster write operations, but Postgres truncates them after a crash or unclean shutdown, so their messages can be lost. Use with caution.
Examples
iex> create_unlogged(Repo, "my_unlogged_queue")
:ok
@spec delete(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.Message.id()], [query_opt()]) :: :ok
Deletes the given messages from the given queue.
For more information about this function, see the PGMQ docs.
Examples
iex> message_ids = send_batch(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
iex> delete(Repo, "my_queue", message_ids)
:ok
@spec disable_notify_insert(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) :: :ok
Disables insert notifications for the given queue.
For more information about notifications, see EctoPGMQ.Notifications.
For more information about this function, see the PGMQ docs.
Examples
iex> disable_notify_insert(Repo, "my_queue")
:ok
@spec drop_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) :: :ok
Drops the given queue.
For more information about this function, see the PGMQ docs.
Examples
iex> drop_queue(Repo, "my_queue")
:ok
@spec enable_notify_insert(Ecto.Repo.t(), EctoPGMQ.Queue.name(), throttle_interval(), [query_opt()]) :: :ok
Enables insert notifications for the given queue.
For more information about notifications, see EctoPGMQ.Notifications.
For more information about this function, see the PGMQ docs.
Examples
iex> enable_notify_insert(Repo, "my_queue", 1_000)
:ok
@spec list_queues(Ecto.Repo.t(), [query_opt()]) :: [EctoPGMQ.Queue.t()]
Lists all queues.
Because this function naively wraps the corresponding PGMQ function, the
:metrics and :notifications fields in the returned EctoPGMQ.Queue structs
will not be populated.
For more information about this function, see the PGMQ docs.
Examples
iex> queues = list_queues(Repo)
iex> Enum.all?(queues, &is_struct(&1, EctoPGMQ.Queue))
true
@spec metrics_all(Ecto.Repo.t(), [query_opt()]) :: [EctoPGMQ.Metrics.t()]
Returns metrics for all queues.
For more information about this function, see the PGMQ docs.
Examples
iex> metrics = metrics_all(Repo)
iex> Enum.all?(metrics, &is_struct(&1, EctoPGMQ.Metrics))
true
@spec pop(Ecto.Repo.t(), EctoPGMQ.Queue.name(), quantity(), [query_opt()]) :: [EctoPGMQ.Message.t()]
Simultaneously fetches and deletes messages from the given queue.
This function does NOT increment the read count of the fetched messages.
For more information about this function, see the PGMQ docs.
Examples
iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [message] = pop(Repo, "my_queue", 1)
iex> match?(%EctoPGMQ.Message{reads: 0}, message)
true
@spec purge_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) :: purged_messages()
Purges all messages from the given queue and returns the number of messages that were deleted.
For more information about this function, see the PGMQ docs.
Examples
iex> send_batch(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
iex> purge_queue(Repo, "my_queue")
2
@spec read(Ecto.Repo.t(), EctoPGMQ.Queue.name(), visibility_timeout(), quantity(), conditional(), [query_opt()]) :: [EctoPGMQ.Message.t()]
Reads messages from the given queue.
For more information about this function, see the PGMQ docs.
Examples
iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [message] = read(Repo, "my_queue", 5, 1)
iex> match?(%EctoPGMQ.Message{reads: 1}, message)
true
@spec read_grouped(Ecto.Repo.t(), EctoPGMQ.Queue.name(), visibility_timeout(), quantity(), [query_opt()]) :: [EctoPGMQ.Message.t()]
Reads messages from the given queue while respecting FIFO message groups and optimizing throughput.
For more information about FIFO message groups, see FIFO Message Groups.
For more information about this function, see the PGMQ docs.
Examples
iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [message] = read_grouped(Repo, "my_queue", 5, 1)
iex> match?(%EctoPGMQ.Message{reads: 1}, message)
true
@spec read_grouped_rr(Ecto.Repo.t(), EctoPGMQ.Queue.name(), visibility_timeout(), quantity(), [query_opt()]) :: [EctoPGMQ.Message.t()]
Reads messages from the given queue while respecting and round-robin interleaving FIFO message groups.
For more information about FIFO message groups, see FIFO Message Groups.
For more information about this function, see the PGMQ docs.
Examples
iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [message] = read_grouped_rr(Repo, "my_queue", 5, 1)
iex> match?(%EctoPGMQ.Message{reads: 1}, message)
true
@spec read_grouped_rr_with_poll(Ecto.Repo.t(), EctoPGMQ.Queue.name(), visibility_timeout(), quantity(), poll_timeout(), poll_interval(), [query_opt()]) :: [EctoPGMQ.Message.t()]
Reads messages from the given queue with a Postgres server-side poll while respecting and round-robin interleaving FIFO message groups.
For more information about FIFO message groups, see FIFO Message Groups.
For more information about polling, see Polling.
For more information about this function, see the PGMQ docs.
Examples
iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [message] = read_grouped_rr_with_poll(Repo, "my_queue", 5, 1, 5, 500)
iex> match?(%EctoPGMQ.Message{reads: 1}, message)
true
@spec read_grouped_with_poll(Ecto.Repo.t(), EctoPGMQ.Queue.name(), visibility_timeout(), quantity(), poll_timeout(), poll_interval(), [query_opt()]) :: [EctoPGMQ.Message.t()]
Reads messages from the given queue with a Postgres server-side poll while respecting FIFO message groups and optimizing throughput.
For more information about FIFO message groups, see FIFO Message Groups.
For more information about polling, see Polling.
For more information about this function, see the PGMQ docs.
Examples
iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [message] = read_grouped_with_poll(Repo, "my_queue", 5, 1, 5, 500)
iex> match?(%EctoPGMQ.Message{reads: 1}, message)
true
@spec read_with_poll(Ecto.Repo.t(), EctoPGMQ.Queue.name(), visibility_timeout(), quantity(), poll_timeout(), poll_interval(), conditional(), [query_opt()]) :: [EctoPGMQ.Message.t()]
Reads messages from the given queue with a Postgres server-side poll.
For more information about polling, see Polling.
For more information about this function, see the PGMQ docs.
Examples
iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [message] = read_with_poll(Repo, "my_queue", 5, 1, 5, 500)
iex> match?(%EctoPGMQ.Message{reads: 1}, message)
true
@spec send_batch(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.Message.payload()], [EctoPGMQ.Message.headers() | nil] | nil, delay(), [query_opt()]) :: [EctoPGMQ.Message.id()]
Sends the given messages to the given queue.
The headers argument defaults to nil, which is shorthand for NULL headers
for all messages. If a list is given instead, its length must match the
length of the given list of messages.
For more information about this function, see the PGMQ docs.
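The length rule for headers can be checked before sending. The following is a minimal sketch of such a pre-flight check; HeadersSketch and validate are hypothetical names, and how EctoPGMQ itself reacts to a mismatch is not shown here.

```elixir
defmodule HeadersSketch do
  # Hypothetical pre-flight check (not part of EctoPGMQ).

  # nil means NULL headers for every message, so any message list is fine.
  def validate(messages, nil) when is_list(messages), do: :ok

  # A headers list must pair up one-to-one with the messages.
  def validate(messages, headers) when is_list(messages) and is_list(headers) do
    if length(messages) == length(headers) do
      :ok
    else
      {:error, :length_mismatch}
    end
  end
end

messages = [%{"foo" => 1}, %{"bar" => 2}]

# One headers entry per message; an entry may itself be nil.
HeadersSketch.validate(messages, [%{"trace" => "a"}, nil])

# A single headers entry for two messages is rejected by this sketch.
HeadersSketch.validate(messages, [%{"trace" => "a"}])
```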
Examples
iex> message_ids = send_batch(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
iex> Enum.all?(message_ids, &is_integer/1)
true
iex> delay = DateTime.utc_now()
iex> message_ids = send_batch(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}], nil, delay)
iex> Enum.all?(message_ids, &is_integer/1)
true
@spec set_vt(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.Message.id()], visibility_timeout(), [query_opt()]) :: [EctoPGMQ.Message.t()]
Sets the visibility timeout of the given messages in the given queue.
For more information about this function, see the PGMQ docs.
Examples
iex> message_ids = send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [read_message] = read(Repo, "my_queue", 5, 1)
iex> [updated_message] = set_vt(Repo, "my_queue", message_ids, 10)
iex> DateTime.diff(updated_message.visible_at, read_message.visible_at) > 0
true