EctoPGMQ.PGMQ (ecto_pgmq v1.0.0)

Copy Markdown View Source

An SDK that fully covers the PGMQ API-space.

Instead of implementing a corresponding function for every alias and parameterization supported by PGMQ, this module relies on client-side defaults to implement a single function for each distinct piece of PGMQ functionality.

Query Options

All of the functions in this module support a common set of query options.

For a detailed description of these options, see query_opt/0.

Summary

Types

Filter conditions to be applied when reading messages from a queue.

A delay before a message becomes visible.

The number of partitions to create preemptively.

The interval at which new partitions should be created.

The time (in milliseconds) to wait between polls.

The maximum time (in seconds) to poll for messages.

The number of purged messages.

The maximum number of messages to read.

A query configuration option.

The interval at which old partitions should be dropped.

The minimum time (in milliseconds) between notifications.

The time from now (in seconds) that a message is invisible.

Functions

Archives the given messages in the given queue.

Creates an index to optimize FIFO message group read performance for the given queue.

Creates an unpartitioned queue with the given name.

Creates an unlogged queue with the given name.

Deletes the given messages from the given queue.

Disables insert notifications for the given queue.

Drops the given queue.

Enables insert notifications for the given queue.

Lists all queues.

Returns metrics for all queues.

Simultaneously fetches and deletes messages from the given queue.

Purges all messages from the given queue and returns the number of messages that were deleted.

Reads messages from the given queue while respecting FIFO message groups and optimizing throughput.

Reads messages from the given queue while respecting and round-robin interleaving FIFO message groups.

Reads messages from the given queue with a Postgres server-side poll while respecting and round-robin interleaving FIFO message groups.

Reads messages from the given queue with a Postgres server-side poll while respecting FIFO message groups and optimizing throughput.

Sets the visibility timeout of the given messages in the given queue.

Types

conditional()

@type conditional() :: %{optional(String.t()) => term()}

Filter conditions to be applied when reading messages from a queue.

Note that the filter conditions are applied to the message body, not the headers.

Warning

As stated in the PGMQ docs, conditional message reading is an experimental feature and the API might be subject to change in future releases.

delay()

@type delay() :: non_neg_integer() | DateTime.t()

A delay before a message becomes visible.

This can take either of the following forms:

  • An integer/0 denoting the time (in seconds) to wait before a message becomes visible.

  • A DateTime.t/0 denoting when a message should become visible.

leading_partitions()

@type leading_partitions() :: non_neg_integer()

The number of partitions to create preemptively.

partition_interval()

@type partition_interval() :: pos_integer() | Duration.t()

The interval at which new partitions should be created.

This can take either of the following forms:

poll_interval()

@type poll_interval() :: pos_integer()

The time (in milliseconds) to wait between polls.

poll_timeout()

@type poll_timeout() :: pos_integer()

The maximum time (in seconds) to poll for messages.

purged_messages()

@type purged_messages() :: non_neg_integer()

The number of purged messages.

quantity()

@type quantity() :: pos_integer()

The maximum number of messages to read.

query_opt()

@type query_opt() :: {:log, boolean()} | {:timeout, timeout()}

A query configuration option.

The following query configuration options are supported:

  • :log - A boolean/0 denoting whether or to log the query. Defaults to true.

  • :timeout - A timeout/0 for the query (in milliseconds). Defaults to 15_000.

retention_interval()

@type retention_interval() :: pos_integer() | Duration.t()

The interval at which old partitions should be dropped.

This can take either of the following forms:

throttle_interval()

@type throttle_interval() :: non_neg_integer()

The minimum time (in milliseconds) between notifications.

A throttle interval of 0 effectively disables notification throttling.

For more information about notification throttling, see Throttling.

visibility_timeout()

@type visibility_timeout() :: integer()

The time from now (in seconds) that a message is invisible.

Functions

archive(repo, queue, message_ids, opts \\ [])

@spec archive(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.Message.id()], [
  query_opt()
]) :: :ok

Archives the given messages in the given queue.

For more information about this function, see the PGMQ docs.

Examples

iex> message_ids = send_batch(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
iex> archive(Repo, "my_queue", message_ids)
:ok

convert_archive_partitioned(repo, queue, partition_interval \\ 10000, retention_interval \\ 100_000, leading_partitions \\ 10, opts \\ [])

@spec convert_archive_partitioned(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  partition_interval(),
  retention_interval(),
  leading_partitions(),
  [query_opt()]
) :: :ok

Converts the archive for the given queue into a partitioned table.

Warning

This function postfixes the old archive table name with _old and leaves its contents untouched. Additional cleanup (table deletion, message movement, etc.) is left to the user.

For more information about partitioning, see Partitioning.

For more information about this function, see the PGMQ docs.

Examples

iex> convert_archive_partitioned(Repo, "my_queue", 10_000, 100_000, 10)
:ok

iex> partition = Duration.new!(hour: 1)
iex> retention = Duration.new!(day: 1)
iex> convert_archive_partitioned(Repo, "my_queue", partition, retention, 10)
:ok

create_fifo_index(repo, queue, opts \\ [])

@spec create_fifo_index(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) :: :ok

Creates an index to optimize FIFO message group read performance for the given queue.

For more information about FIFO message groups, see FIFO Message Groups.

For more information about this function, see the PGMQ docs.

Examples

iex> create_fifo_index(Repo, "my_queue")
:ok

create_non_partitioned(repo, queue, opts \\ [])

@spec create_non_partitioned(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) ::
  :ok

Creates an unpartitioned queue with the given name.

For more information about this function, see the PGMQ docs.

Examples

iex> create_non_partitioned(Repo, "my_unpartitioned_queue")
:ok

create_partitioned(repo, queue, partition_interval \\ 10000, retention_interval \\ 100_000, opts \\ [])

@spec create_partitioned(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  partition_interval(),
  retention_interval(),
  [query_opt()]
) :: :ok

Creates a partitioned queue with the given name.

For more information about partitioning, see Partitioning.

For more information about this function, see the PGMQ docs.

Examples

iex> create_partitioned(Repo, "my_partitioned_queue", 10_000, 100_000)
:ok

iex> partition = Duration.new!(hour: 1)
iex> retention = Duration.new!(day: 1)
iex> create_partitioned(Repo, "my_partitioned_queue", partition, retention)
:ok

create_unlogged(repo, queue, opts \\ [])

@spec create_unlogged(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) :: :ok

Creates an unlogged queue with the given name.

For more information about this function, see the PGMQ docs.

Warning

Unlogged tables benefit from faster write operations but they risk data loss if the Postgres server restarts. Use with caution.

Examples

iex> create_unlogged(Repo, "my_unlogged_queue")
:ok

delete(repo, queue, message_ids, opts \\ [])

@spec delete(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.Message.id()], [
  query_opt()
]) :: :ok

Deletes the given messages from the given queue.

For more information about this function, see the PGMQ docs.

Examples

iex> message_ids = send_batch(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
iex> delete(Repo, "my_queue", message_ids)
:ok

disable_notify_insert(repo, queue, opts \\ [])

@spec disable_notify_insert(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) ::
  :ok

Disables insert notifications for the given queue.

For more information about notifications, see EctoPGMQ.Notifications.

For more information about this function, see the PGMQ docs.

Examples

iex> disable_notify_insert(Repo, "my_queue")
:ok

drop_queue(repo, queue, opts \\ [])

@spec drop_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) :: :ok

Drops the given queue.

For more information about this function, see the PGMQ docs.

Examples

iex> drop_queue(Repo, "my_queue")
:ok

enable_notify_insert(repo, queue, throttle_interval \\ 250, opts \\ [])

@spec enable_notify_insert(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  throttle_interval(),
  [query_opt()]
) ::
  :ok

Enables insert notifications for the given queue.

For more information about notifications, see EctoPGMQ.Notifications.

For more information about this function, see the PGMQ docs.

Examples

iex> enable_notify_insert(Repo, "my_queue", 1_000)
:ok

list_queues(repo, opts \\ [])

@spec list_queues(Ecto.Repo.t(), [query_opt()]) :: [EctoPGMQ.Queue.t()]

Lists all queues.

Because this function naively wraps the corresponding PGMQ function, the :metrics and :notifications fields in the returned EctoPGMQ.Queue structs will not be populated.

For more information about this function, see the PGMQ docs.

Examples

iex> queues = list_queues(Repo)
iex> Enum.all?(queues, &is_struct(&1, EctoPGMQ.Queue))
true

metrics_all(repo, opts \\ [])

@spec metrics_all(Ecto.Repo.t(), [query_opt()]) :: [EctoPGMQ.Metrics.t()]

Returns metrics for all queues.

For more information about this function, see the PGMQ docs.

Examples

iex> metrics = metrics_all(Repo)
iex> Enum.all?(metrics, &is_struct(&1, EctoPGMQ.Metrics))
true

pop(repo, queue, quantity, opts \\ [])

Simultaneously fetches and deletes messages from the given queue.

This function does NOT increment the read count of the fetched messages.

For more information about this function, see the PGMQ docs.

Examples

iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [message] = pop(Repo, "my_queue", 1)
iex> match?(%EctoPGMQ.Message{reads: 0}, message)
true

purge_queue(repo, queue, opts \\ [])

@spec purge_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) ::
  purged_messages()

Purges all messages from the given queue and returns the number of messages that were deleted.

For more information about this function, see the PGMQ docs.

Examples

iex> send_batch(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
iex> purge_queue(Repo, "my_queue")
2

read(repo, queue, visibility_timeout, quantity, conditional \\ %{}, opts \\ [])

Reads messages from the given queue.

For more information about this function, see the PGMQ docs.

Examples

iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [message] = read(Repo, "my_queue", 5, 1)
iex> match?(%EctoPGMQ.Message{reads: 1}, message)
true

read_grouped(repo, queue, visibility_timeout, quantity, opts \\ [])

Reads messages from the given queue while respecting FIFO message groups and optimizing throughput.

For more information about FIFO message groups, see FIFO Message Groups.

For more information about this function, see the PGMQ docs.

Examples

iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [message] = read_grouped(Repo, "my_queue", 5, 1)
iex> match?(%EctoPGMQ.Message{reads: 1}, message)
true

read_grouped_rr(repo, queue, visibility_timeout, quantity, opts \\ [])

@spec read_grouped_rr(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  visibility_timeout(),
  quantity(),
  [
    query_opt()
  ]
) :: [EctoPGMQ.Message.t()]

Reads messages from the given queue while respecting and round-robin interleaving FIFO message groups.

For more information about FIFO message groups, see FIFO Message Groups.

For more information about this function, see the PGMQ docs.

Examples

iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [message] = read_grouped_rr(Repo, "my_queue", 5, 1)
iex> match?(%EctoPGMQ.Message{reads: 1}, message)
true

read_grouped_rr_with_poll(repo, queue, visibility_timeout, quantity, poll_timeout \\ 5, poll_interval \\ 100, opts \\ [])

@spec read_grouped_rr_with_poll(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  visibility_timeout(),
  quantity(),
  poll_timeout(),
  poll_interval(),
  [query_opt()]
) :: [EctoPGMQ.Message.t()]

Reads messages from the given queue with a Postgres server-side poll while respecting and round-robin interleaving FIFO message groups.

For more information about FIFO message groups, see FIFO Message Groups.

For more information about polling, see Polling.

For more information about this function, see the PGMQ docs.

Examples

iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [message] = read_grouped_rr_with_poll(Repo, "my_queue", 5, 1, 5, 500)
iex> match?(%EctoPGMQ.Message{reads: 1}, message)
true

read_grouped_with_poll(repo, queue, visibility_timeout, quantity, poll_timeout \\ 5, poll_interval \\ 100, opts \\ [])

Reads messages from the given queue with a Postgres server-side poll while respecting FIFO message groups and optimizing throughput.

For more information about FIFO message groups, see FIFO Message Groups.

For more information about polling, see Polling.

For more information about this function, see the PGMQ docs.

Examples

iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [message] = read_grouped_with_poll(Repo, "my_queue", 5, 1, 5, 500)
iex> match?(%EctoPGMQ.Message{reads: 1}, message)
true

read_with_poll(repo, queue, visibility_timeout, quantity, poll_timeout \\ 5, poll_interval \\ 100, conditional \\ %{}, opts \\ [])

Reads messages from the given queue with a Postgres server-side poll.

For more information about polling, see Polling.

For more information about this function, see the PGMQ docs.

Examples

iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [message] = read_with_poll(Repo, "my_queue", 5, 1, 5, 500)
iex> match?(%EctoPGMQ.Message{reads: 1}, message)
true

send_batch(repo, queue, payloads, headers \\ nil, delay \\ 0, opts \\ [])

Sends the given messages to the given queue.

The headers arg defaults to nil, which is a shorthand for NULL headers for all messages. If a list is given for the headers arg, the length of the list must match the length of the given list of messages.

For more information about this function, see the PGMQ docs.

Examples

iex> message_ids = send_batch(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
iex> Enum.all?(message_ids, &is_integer/1)
true

iex> delay = DateTime.utc_now()
iex> message_ids = send_batch(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}], nil, delay)
iex> Enum.all?(message_ids, &is_integer/1)
true

set_vt(repo, queue, message_ids, visibility_timeout, opts \\ [])

Sets the visibility timeout of the given messages in the given queue.

For more information about this function, see the PGMQ docs.

Examples

iex> message_ids = send_batch(Repo, "my_queue", [%{"foo" => 1}])
iex> [read_message] = read(Repo, "my_queue", 5, 1)
iex> [updated_message] = set_vt(Repo, "my_queue", message_ids, 10)
iex> DateTime.diff(updated_message.visible_at, read_message.visible_at) > 0
true