EctoPGMQ (ecto_pgmq v1.0.0)


An opinionated PGMQ client for Elixir that builds on top of Ecto and the Ecto.Adapters.Postgres adapter.

PGMQ Installation

Because PGMQ is entirely made up of SQL objects, there are two available installation methods:

  • Extension Installation - This method installs PGMQ as a traditional Postgres extension. This is the preferred installation method but it requires access to the Postgres server file system and, therefore, may not always be feasible.

  • SQL Installation - This method installs PGMQ by manually creating all of the necessary SQL objects and works entirely within the database.

EctoPGMQ.Migrations contains helper functions for managing both installation methods.

For more information about managing the PGMQ extension, see the PGMQ Installation and Updating guides.

Partitioning

PGMQ supports partitioning both queues and archives.

The pg_partman extension must be available in order to use partitioning.

For more information about partitioning, see the PGMQ docs.

Polling

PGMQ supports Postgres server-side polling during read operations. Reading with a poll can reduce network round trips when demand is likely to be satisfied within a short time, but it occupies a database connection for the duration of the read operation. As such, polling should be avoided in situations where the DB connection pool is a bottleneck.
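
The trade-off can be pictured with a small client-side model. This is only a sketch of the idea, not PGMQ's or EctoPGMQ's implementation: the PollSketch module and the fetch function are hypothetical stand-ins for a single read attempt, and the real polling happens inside Postgres.

```elixir
defmodule PollSketch do
  @moduledoc "Hypothetical model of a polled read; not part of EctoPGMQ."

  # `fetch` is a hypothetical 1-arity function standing in for one read
  # attempt. It receives the requested quantity and returns a list.
  # The config follows the documented units: interval in ms, timeout in s.
  def read(fetch, quantity, {interval_ms, timeout_s}) do
    deadline = System.monotonic_time(:millisecond) + timeout_s * 1_000
    loop(fetch, quantity, interval_ms, deadline)
  end

  defp loop(fetch, quantity, interval_ms, deadline) do
    case fetch.(quantity) do
      [] ->
        # Nothing visible yet: give up if another wait would pass the deadline.
        if System.monotonic_time(:millisecond) + interval_ms > deadline do
          []
        else
          Process.sleep(interval_ms)
          loop(fetch, quantity, interval_ms, deadline)
        end

      messages ->
        # Something arrived: return immediately instead of waiting out the timeout.
        messages
    end
  end
end
```

The whole loop runs inside a single call, which is why a polled read holds on to its connection until messages arrive or the timeout expires.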

FIFO Message Groups

While PGMQ queues are FIFO data structures, the order of message processing can be non-deterministic when there are multiple consumers. This is usually fine, but sometimes messages must be consumed strictly in order within a group. To support this, PGMQ exposes a number of functions that read messages while guaranteeing FIFO ordering for messages with the same x-pgmq-group header.

There are two slightly different methodologies for reading messages while respecting FIFO message groups: round-robin reading and throughput-optimized reading.

Round-Robin Reading

This method will fairly interleave messages from all available groups.

iex> specs = [{%{}, "A"}, {%{}, "A"}, {%{}, "B"}, {%{}, "B"}, {%{}, "C"}]
iex> [id_1, id_2, id_3, id_4, id_5] = send_messages(Repo, "my_queue", specs)
iex> messages = read_messages(Repo, "my_queue", 300, 5, message_grouping: :round_robin)
iex> Enum.map(messages, & &1.id) == [id_1, id_3, id_5, id_2, id_4]
true
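
The interleaving in the example above can be reproduced with a pure Elixir model. This is a sketch under assumptions, not the query PGMQ runs: the RoundRobinSketch module is hypothetical, messages are plain {id, group} tuples, and groups are visited in the order of their first message.

```elixir
defmodule RoundRobinSketch do
  @moduledoc "Hypothetical model of round-robin group reading; not part of EctoPGMQ."

  # Takes a list of {id, group} tuples in send order and returns the ids
  # in the order a round-robin read would interleave them.
  def order(messages) do
    # Groups in order of first appearance.
    groups = messages |> Enum.map(&elem(&1, 1)) |> Enum.uniq()
    # Ids per group; Enum.group_by/3 keeps the original order inside a group.
    by_group = Enum.group_by(messages, &elem(&1, 1), &elem(&1, 0))

    groups
    |> Enum.map(&Map.fetch!(by_group, &1))
    |> interleave([])
  end

  defp interleave([], acc), do: Enum.reverse(acc)

  defp interleave(queues, acc) do
    # Take the head of every non-empty group, then drop exhausted groups.
    heads = Enum.map(queues, &hd/1)
    rest = queues |> Enum.map(&tl/1) |> Enum.reject(&(&1 == []))
    interleave(rest, Enum.reverse(heads, acc))
  end
end
```

Each pass takes one message per group, so order within a group is preserved while no group waits for another to drain.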

Throughput-Optimized Reading

This method will prioritize messages from the same group. As the name implies, this method will often be more efficient than round-robin reading.

iex> specs = [{%{}, "A"}, {%{}, "A"}, {%{}, "B"}, {%{}, "B"}, {%{}, "C"}]
iex> message_ids = send_messages(Repo, "my_queue", specs)
iex> messages = read_messages(Repo, "my_queue", 300, 5, message_grouping: :throughput_optimized)
iex> Enum.map(messages, & &1.id) == message_ids
true

Warning

If message groups are long-lived and high-volume, this method of reading can effectively starve later groups. For more information, see Performance Considerations.
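
The starvation risk can be seen in a simplified model. The sketch below assumes that groups are served in order of their oldest message and that a group is drained before the next one is touched; the ThroughputSketch module is hypothetical and the real selection logic lives in PGMQ.

```elixir
defmodule ThroughputSketch do
  @moduledoc "Hypothetical model of throughput-optimized reading; not part of EctoPGMQ."

  # Takes {id, group} tuples in send order plus a read quantity and
  # returns the ids a single read would pick under the stated assumptions.
  def take(messages, quantity) do
    messages
    |> Enum.group_by(&elem(&1, 1), &elem(&1, 0))
    |> Map.values()
    # Serve the group holding the oldest message first.
    |> Enum.sort_by(&hd/1)
    |> Enum.concat()
    |> Enum.take(quantity)
  end
end
```

With a busy group "A" and a read quantity of 3, a message in group "B" is skipped even though it was sent before most of the messages in "A".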

Performance Considerations

In general, FIFO message groups are more performant when the following conditions are met:

Further Information

For more information about FIFO message groups, see the PGMQ docs.

Types

delay()

@type delay() :: Duration.t() | EctoPGMQ.PGMQ.delay()

A delay before a message becomes visible.

This can take any of the following forms:

  • A Duration.t/0 denoting the time to wait before a message becomes visible.

  • An integer/0 denoting the time (in seconds) to wait before a message becomes visible.

  • A DateTime.t/0 denoting when a message should become visible.

For more information about this type, see EctoPGMQ.PGMQ.delay/0.
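
To make the three forms concrete, the sketch below resolves each one to the instant at which a message would become visible. It assumes Elixir 1.17 or later (for Duration and DateTime.shift/2); the DelaySketch module is hypothetical and is not how EctoPGMQ processes delays internally.

```elixir
defmodule DelaySketch do
  @moduledoc "Hypothetical helper illustrating the three delay forms."

  # A Duration is a relative wait measured from `now`.
  def visible_at(%DateTime{} = now, %Duration{} = delay) do
    DateTime.shift(now, delay)
  end

  # An integer is a relative wait in seconds.
  def visible_at(%DateTime{} = now, seconds) when is_integer(seconds) do
    DateTime.add(now, seconds, :second)
  end

  # A DateTime is already an absolute instant.
  def visible_at(%DateTime{}, %DateTime{} = at), do: at
end
```

The first two forms are relative to the moment of sending, while the DateTime form pins visibility to a fixed point in time.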

message_update_attributes()

@type message_update_attributes() :: %{visibility_timeout: visibility_timeout()}

Message update attributes.

The following attributes are supported:

  • :visibility_timeout - A visibility_timeout/0 for the messages.

notification_throttle()

@type notification_throttle() :: Duration.t() | EctoPGMQ.PGMQ.throttle_interval()

The minimum time between notifications.

This can take either of the following forms:

  • A Duration.t/0 denoting the minimum time between notifications.

  • A non_neg_integer/0 denoting the minimum time (in milliseconds) between notifications.

For more information about notifications, see EctoPGMQ.Notifications.

For more information about this type, see EctoPGMQ.PGMQ.throttle_interval/0.
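
The effect of a minimum time between notifications can be modeled as follows. This is an illustration of throttling in general, assuming a notification fires whenever at least the throttle interval has passed since the previous one; the ThrottleSketch module is hypothetical and does not reflect how PGMQ tracks notifications.

```elixir
defmodule ThrottleSketch do
  @moduledoc "Hypothetical model of notification throttling."

  # Given event times (in ms, ascending) and a throttle (in ms), returns
  # the times at which a notification would actually be emitted.
  def notified(event_times_ms, throttle_ms) do
    {kept, _last} =
      Enum.reduce(event_times_ms, {[], nil}, fn time, {kept, last} ->
        if is_nil(last) or time - last >= throttle_ms do
          {[time | kept], time}
        else
          # Too soon after the previous notification: suppress this one.
          {kept, last}
        end
      end)

    Enum.reverse(kept)
  end
end
```

With a throttle of 1_000, a burst of sends inside the same second produces a single notification.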

partition_config()

A queue partition configuration.

A partition configuration is a tuple containing two elements: the partition interval and the retention interval.

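Both elements can take either of the following forms:

  • A Duration.t/0 denoting a time-based interval.

  • A positive integer denoting an interval measured in number of messages.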

For more information about partitioning, see Partitioning.

For more information about this type, see EctoPGMQ.PGMQ.partition_interval/0 and EctoPGMQ.PGMQ.retention_interval/0.

poll_config()

A message polling configuration.

A polling configuration is a tuple containing two elements: the poll interval and the poll timeout.

Both elements can take either of the following forms:

  • A Duration.t/0 denoting a length of time.

  • A pos_integer/0 denoting a length of time. The unit for the poll interval is milliseconds and the unit for the timeout is seconds.

For more information about polling, see Polling.

For more information about this type, see EctoPGMQ.PGMQ.poll_interval/0 and EctoPGMQ.PGMQ.poll_timeout/0.
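
Because the two elements use different integer units, it can help to see both forms reduced to the same pair. The sketch below is a hypothetical helper (PollConfigSketch is not part of EctoPGMQ) that assumes Elixir 1.17 or later, only accepts durations without years or months, and truncates sub-second timeouts; how the library itself rounds is not stated here.

```elixir
defmodule PollConfigSketch do
  @moduledoc "Hypothetical normalizer for poll configurations."

  # Returns {interval_in_milliseconds, timeout_in_seconds}.
  def normalize({interval, timeout}) do
    {to_ms(interval), to_seconds(timeout)}
  end

  # Integer intervals are already in milliseconds.
  defp to_ms(%Duration{} = duration), do: duration_ms(duration)
  defp to_ms(ms) when is_integer(ms) and ms > 0, do: ms

  # Integer timeouts are already in seconds.
  defp to_seconds(%Duration{} = duration), do: div(duration_ms(duration), 1_000)
  defp to_seconds(seconds) when is_integer(seconds) and seconds > 0, do: seconds

  # Only fixed-length units are supported; durations with years or months
  # do not match this clause and raise a FunctionClauseError.
  defp duration_ms(%Duration{year: 0, month: 0, microsecond: {us, _precision}} = d) do
    seconds = d.week * 604_800 + d.day * 86_400 + d.hour * 3_600 + d.minute * 60 + d.second
    seconds * 1_000 + div(us, 1_000)
  end
end
```

Under these assumptions, {250, 5} and {Duration.new!(second: 1), Duration.new!(minute: 1)} are both valid shapes: the first polls every 250 milliseconds for up to 5 seconds, the second every second for up to a minute.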

queue_create_attributes()

@type queue_create_attributes() :: %{
  optional(:message_groups?) => boolean(),
  optional(:notifications) => notification_throttle() | nil,
  optional(:partitions) => partition_config() | nil,
  optional(:unlogged?) => boolean()
}

Queue creation attributes.

The following attributes are supported:

  • :message_groups? - An optional boolean/0 denoting whether or not the queue should be optimized for FIFO message group reads. Defaults to false. For more information about FIFO message groups, see FIFO Message Groups.

  • :notifications - An optional notification_throttle/0 for the queue or nil to leave notifications disabled. Defaults to nil. For more information about notifications, see EctoPGMQ.Notifications.

  • :partitions - An optional partition_config/0 for the queue or nil to disable partitioning. This option is ignored for unlogged queues. Defaults to nil. For more information about partitioning, see Partitioning.

  • :unlogged? - An optional boolean/0 denoting whether or not the queue should be unlogged. Defaults to false.
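
The defaults and the interaction between :partitions and :unlogged? can be summarized in a few lines. This is a sketch of the documented rules only; the QueueAttrsSketch module is hypothetical and EctoPGMQ may resolve attributes differently internally.

```elixir
defmodule QueueAttrsSketch do
  @moduledoc "Hypothetical model of queue creation attribute defaults."

  # Defaults as listed in the attribute descriptions above.
  @defaults %{message_groups?: false, notifications: nil, partitions: nil, unlogged?: false}

  # Returns the attributes that would effectively apply to a new queue.
  def effective(attributes) do
    merged = Map.merge(@defaults, attributes)

    if Map.fetch!(merged, :unlogged?) do
      # Partitioning is ignored for unlogged queues.
      %{merged | partitions: nil}
    else
      merged
    end
  end
end
```

Passing both partitions and unlogged?: true therefore behaves like passing unlogged?: true alone.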

queue_update_attributes()

@type queue_update_attributes() :: %{
  optional(:message_groups?) => true,
  optional(:notifications) => notification_throttle() | nil
}

Queue update attributes.

The following attributes are supported:

  • :message_groups? - true to optimize the queue for FIFO message group reads. Note that true is the only valid value because this operation cannot be undone. For more information about FIFO message groups, see FIFO Message Groups.

  • :notifications - An optional notification_throttle/0 for the queue or nil to disable notifications. For more information about notifications, see EctoPGMQ.Notifications.

read_messages_opts()

@type read_messages_opts() :: [
  {:delete?, boolean()}
  | {:polling, poll_config() | nil}
  | {:message_grouping, :round_robin | :throughput_optimized | nil}
  | EctoPGMQ.PGMQ.query_opt()
]

Options for reading messages.

In addition to the standard query options, messages can be read with the following options:

  • :delete? - An optional boolean/0 denoting whether or not to delete messages immediately after reading them. Defaults to false. For more information, see EctoPGMQ.PGMQ.pop/4.

  • :message_grouping - An optional value specifying how to handle message groups when reading messages. Possible values are :round_robin, :throughput_optimized, or nil to ignore message groups when reading. This option is ignored when deleting on read. Defaults to nil. For more information about FIFO message groups, see FIFO Message Groups.

  • :polling - An optional poll_config/0 for the read operation or nil to disable polling. This option is ignored when deleting on read. Defaults to nil. For more information about polling, see Polling.
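
How these options interact can be sketched as a small resolution step. The ReadOptsSketch module below is hypothetical and only mirrors the rules stated above (defaults, plus grouping and polling being ignored when deleting on read); the standard query options are left out for brevity.

```elixir
defmodule ReadOptsSketch do
  @moduledoc "Hypothetical model of how read options combine."

  # Returns the read options that would effectively apply.
  def effective(opts) do
    if Keyword.get(opts, :delete?, false) do
      # Deleting on read ignores message grouping and polling.
      [delete?: true, message_grouping: nil, polling: nil]
    else
      [
        delete?: false,
        message_grouping: Keyword.get(opts, :message_grouping),
        polling: Keyword.get(opts, :polling)
      ]
    end
  end
end
```

In other words, combining delete?: true with :polling or :message_grouping is accepted but has no additional effect.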

visibility_timeout()

@type visibility_timeout() :: Duration.t() | EctoPGMQ.PGMQ.visibility_timeout()

The time from now that a message is invisible.

This can take either of the following forms:

  • A Duration.t/0 denoting how long a message is invisible.

  • An integer/0 denoting how long (in seconds) a message is invisible.

For more information about this type, see EctoPGMQ.PGMQ.visibility_timeout/0.

Message API

archive_messages(repo, queue, message_ids, opts \\ [])

@spec archive_messages(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  [EctoPGMQ.Message.id()],
  [
    EctoPGMQ.PGMQ.query_opt()
  ]
) :: :ok

Archives the given messages from the given queue.

Options

This function supports the standard query options.

Examples

iex> message_ids = send_messages(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
iex> archive_messages(Repo, "my_queue", message_ids)
:ok

delete_messages(repo, queue, message_ids, opts \\ [])

@spec delete_messages(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.Message.id()], [
  EctoPGMQ.PGMQ.query_opt()
]) :: :ok

Deletes the given messages from the given queue.

Options

This function supports the standard query options.

Examples

iex> message_ids = send_messages(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
iex> delete_messages(Repo, "my_queue", message_ids)
:ok
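
Archiving and deleting both take messages out of the queue; the difference is that PGMQ keeps archived messages in the queue's archive, while deleted messages are gone. The sketch below models that difference with plain lists; the ArchiveSketch module and the state shape are hypothetical.

```elixir
defmodule ArchiveSketch do
  @moduledoc "Hypothetical in-memory model of archive versus delete."

  # State is a map with a :queue list and an :archive list of messages,
  # where each message is a map with an :id key.
  def archive(%{queue: queue, archive: archive} = state, ids) do
    # Matching messages move from the queue to the archive.
    {moved, kept} = Enum.split_with(queue, &(&1.id in ids))
    %{state | queue: kept, archive: archive ++ moved}
  end

  def delete(%{queue: queue} = state, ids) do
    # Matching messages are dropped and the archive is untouched.
    %{state | queue: Enum.reject(queue, &(&1.id in ids))}
  end
end
```

Archiving suits cases where processed messages should remain available for later inspection, while deleting keeps storage to a minimum.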

read_messages(repo, queue, visibility_timeout, quantity, opts \\ [])

@spec read_messages(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  visibility_timeout(),
  EctoPGMQ.PGMQ.quantity(),
  [
    {:delete?, boolean()}
    | {:polling, poll_config() | nil}
    | {:message_grouping, :round_robin | :throughput_optimized | nil}
    | EctoPGMQ.PGMQ.query_opt()
  ]
) :: [EctoPGMQ.Message.t()]

Reads messages from the given queue.

Options

See read_messages_opts/0 for information about the options supported by this function.

Examples

iex> send_messages(Repo, "my_queue", [%{"foo" => 1}])
iex> [message] = read_messages(Repo, "my_queue", 5, 2)
iex> match?(%EctoPGMQ.Message{reads: 1}, message)
true
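
The role of the visibility timeout and the reads counter can be illustrated with an in-memory model. This is a sketch of the semantics only, with an integer timeout in seconds; the ReadSketch module and the message maps are hypothetical, and the real work is done by PGMQ inside Postgres.

```elixir
defmodule ReadSketch do
  @moduledoc "Hypothetical in-memory model of reading with a visibility timeout."

  # Each message is a map with :id, :vt (a DateTime) and :reads.
  # Returns {messages_read, updated_queue}.
  def read(queue, %DateTime{} = now, vt_seconds, quantity) do
    {queue, {read, _left}} =
      Enum.map_reduce(queue, {[], quantity}, fn msg, {read, left} ->
        if left > 0 and DateTime.compare(msg.vt, now) != :gt do
          # Visible message: hide it until now + timeout and count the read.
          msg = %{msg | vt: DateTime.add(now, vt_seconds, :second), reads: msg.reads + 1}
          {msg, {[msg | read], left - 1}}
        else
          # Still invisible, or the requested quantity is already satisfied.
          {msg, {read, left}}
        end
      end)

    {Enum.reverse(read), queue}
  end
end
```

A message that is neither deleted nor archived before its timeout passes becomes readable again, and its reads counter shows how many times it has been delivered.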

send_messages(repo, queue, messages, opts \\ [])

Sends messages to the given queue.

Options

In addition to the standard query options, this function also supports the following options:

  • :delay - An optional delay/0 for the messages. Defaults to 0.

Examples

iex> delay = Duration.new!(hour: 1)
iex> message_ids = send_messages(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}], delay: delay)
iex> Enum.all?(message_ids, &is_integer/1)
true

update_messages(repo, queue, message_ids, map, opts \\ [])

Updates the given messages in the given queue.

Options

This function supports the standard query options.

Examples

iex> visibility_timeout = Duration.new!(minute: 5)
iex> message_ids = send_messages(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
iex> update_messages(Repo, "my_queue", message_ids, %{visibility_timeout: visibility_timeout})
:ok

Queue API

all_queues(repo, opts \\ [])

@spec all_queues(Ecto.Repo.t(), [EctoPGMQ.PGMQ.query_opt()]) :: [EctoPGMQ.Queue.t()]

Lists all queues.

Options

This function supports the standard query options.

Examples

iex> queues = all_queues(Repo)
iex> Enum.all?(queues, &is_struct(&1, EctoPGMQ.Queue))
true

create_queue(repo, queue, attributes \\ %{}, opts \\ [])

Creates a queue with the given name.

To create a queue in an Ecto.Migration, see EctoPGMQ.Migrations.create_queue/2.

Options

This function supports the standard query options.

Examples

iex> queue = create_queue(Repo, "my_unpartitioned_queue", %{notifications: 1_000})
iex> match?(%EctoPGMQ.Queue{notifications: %EctoPGMQ.Throttle{}}, queue)
true

iex> queue = create_queue(Repo, "my_partitioned_queue", %{partitions: {10_000, 100_000}})
iex> match?(%EctoPGMQ.Queue{is_partitioned: true}, queue)
true

iex> partitions = {Duration.new!(hour: 1), Duration.new!(day: 1)}
iex> queue = create_queue(Repo, "my_partitioned_queue", %{partitions: partitions})
iex> match?(%EctoPGMQ.Queue{is_partitioned: true}, queue)
true

iex> queue = create_queue(Repo, "my_unlogged_queue", %{unlogged?: true})
iex> match?(%EctoPGMQ.Queue{is_unlogged: true}, queue)
true

drop_queue(repo, queue, opts \\ [])

@spec drop_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.PGMQ.query_opt()]) ::
  :ok

Drops the given queue.

To drop a queue in an Ecto.Migration, see EctoPGMQ.Migrations.drop_queue/2.

Options

This function supports the standard query options.

Examples

iex> drop_queue(Repo, "my_queue")
:ok

get_queue(repo, queue, opts \\ [])

Gets the given queue.

Options

This function supports the standard query options.

Examples

iex> queue = get_queue(Repo, "my_queue")
iex> match?(%EctoPGMQ.Queue{}, queue)
true

iex> get_queue(Repo, "my_non_existent_queue")
nil

purge_queue(repo, queue, opts \\ [])

Purges the given queue.

Options

This function supports the standard query options.

Examples

iex> send_messages(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
iex> purge_queue(Repo, "my_queue")
2

update_queue(repo, queue, attributes, opts \\ [])

Updates the given queue.

Warning

Because the underlying tables are owned by PGMQ, this function avoids row locks so as not to disturb any internal PGMQ processes. As a result, if multiple processes attempt to update a queue simultaneously, they may get unexpected results.

To update a queue in an Ecto.Migration, see EctoPGMQ.Migrations.update_queue/2.

Options

This function supports the standard query options.

Examples

iex> throttle = Duration.new!(second: 5)
iex> queue = update_queue(Repo, "my_queue", %{notifications: throttle})
iex> match?(%EctoPGMQ.Queue{notifications: %EctoPGMQ.Throttle{}}, queue)
true