# `EctoPGMQ`
[🔗](https://github.com/gdwoolbert3/ecto_pgmq/blob/main/lib/ecto_pgmq.ex#L1)

An opinionated PGMQ client for Elixir that builds on top of `Ecto` and the
`Ecto.Adapters.Postgres` adapter.

## PGMQ Installation

Because PGMQ is entirely made up of SQL objects, there are two available
installation methods:

  * Extension Installation - This method installs PGMQ as a traditional
    Postgres extension. This is the preferred installation method but it
    requires access to the Postgres server file system and, therefore, may not
    always be feasible.

  * SQL Installation - This method installs PGMQ by manually creating all of
    the necessary SQL objects and works entirely within the database.

`EctoPGMQ.Migrations` contains helper functions for managing both installation
methods.

For more information about managing the PGMQ extension, see the PGMQ
[Installation](https://github.com/pgmq/pgmq/blob/main/INSTALLATION.md) and
[Updating](https://github.com/pgmq/pgmq/blob/main/pgmq-extension/UPDATING.md)
guides.

## Partitioning

PGMQ supports partitioning both queues and archives.

The [pg_partman extension](https://github.com/pgpartman/pg_partman) must be
available in order to use partitioning.

For more information about partitioning, see the
[PGMQ docs](https://github.com/pgmq/pgmq/tree/main?tab=readme-ov-file#partitioned-queues).

## Polling

PGMQ supports Postgres server-side polling during read operations. Reading
with a poll can reduce network round trips when there is a good chance that
demand will be satisfied within a short time. However, doing so holds a
database connection for the duration of the read operation. As such, polling
should be avoided in situations where the DB connection pool is a bottleneck.
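
As a rough illustration of that trade-off, the sketch below reads with a poll
through the `:polling` option of `read_messages/5`. The module name, the `repo`
and `queue` arguments, and the concrete values (a 30 second visibility timeout,
a batch of 10, a 250 millisecond poll interval, and a 5 second poll timeout) are
assumptions chosen for illustration only.

```elixir
defmodule PollingSketch do
  # Hypothetical helper: `repo` is assumed to be an Ecto repo whose database
  # has PGMQ installed, and `queue` the name of an existing queue.
  def read_with_poll(repo, queue) do
    # {poll interval in milliseconds, poll timeout in seconds}
    polling = {250, 5}

    # The connection checked out for this call stays busy while the server
    # polls, which can last up to roughly the poll timeout.
    EctoPGMQ.read_messages(repo, queue, 30, 10, polling: polling)
  end
end
```

A call such as `PollingSketch.read_with_poll(MyApp.Repo, "my_queue")` (where
`MyApp.Repo` is a placeholder) returns a list of messages, possibly empty if
nothing arrives before the timeout.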

## FIFO Message Groups

While PGMQ queues are FIFO data structures, the order of message processing
can be non-deterministic when there are multiple consumers. This is usually
fine but there is sometimes a need to consume messages strictly in order
within a group. In order to support this, PGMQ exposes a number of functions
that read messages while guaranteeing FIFO ordering for messages with the same
`x-pgmq-group` header.

There are two slightly different methodologies for reading messages while
respecting FIFO message groups: round-robin reading and throughput-optimized
reading.

### Round-Robin Reading

This method will fairly interleave messages from all available groups.

    iex> specs = [{%{}, "A"}, {%{}, "A"}, {%{}, "B"}, {%{}, "B"}, {%{}, "C"}]
    iex> [id_1, id_2, id_3, id_4, id_5] = send_messages(Repo, "my_queue", specs)
    iex> messages = read_messages(Repo, "my_queue", 300, 5, message_grouping: :round_robin)
    iex> Enum.map(messages, & &1.id) == [id_1, id_3, id_5, id_2, id_4]
    true

### Throughput-Optimized Reading

This method will prioritize messages from the same group. As the name implies,
this method will often be more efficient than round-robin reading.

    iex> specs = [{%{}, "A"}, {%{}, "A"}, {%{}, "B"}, {%{}, "B"}, {%{}, "C"}]
    iex> message_ids = send_messages(Repo, "my_queue", specs)
    iex> messages = read_messages(Repo, "my_queue", 300, 5, message_grouping: :throughput_optimized)
    iex> Enum.map(messages, & &1.id) == message_ids
    true

> #### Warning {: .warning}
>
> If message groups are long-lived and high-volume, this method of reading can
> effectively starve later groups. For more information, see
> [Performance Considerations](#performance-considerations).

### Performance Considerations

In general, FIFO message groups are more performant when the following
conditions are met:

  * There are many low-volume groups

  * Messages are removed from the queue relatively quickly

  * The queue is optimized for FIFO message group reads (see
    `EctoPGMQ.create_queue/4` and `EctoPGMQ.update_queue/4`).
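
A minimal sketch of the last condition, assuming a placeholder queue name of
`"orders"` and a caller-supplied `repo`. The `{payload, group}` specification
shape is inferred from the examples earlier in this section, and the module and
function names are hypothetical.

```elixir
defmodule GroupedQueueSketch do
  # Hypothetical helper: creates a queue that is optimized for FIFO message
  # group reads from the start.
  def create_optimized(repo) do
    EctoPGMQ.create_queue(repo, "orders", %{message_groups?: true})
  end

  # Hypothetical helper: optimizes an existing queue. Per the attribute docs,
  # `true` is the only valid value because this cannot be undone.
  def optimize_existing(repo) do
    EctoPGMQ.update_queue(repo, "orders", %{message_groups?: true})
  end

  # Hypothetical helper: sends a few grouped messages and reads them back
  # with round-robin grouping.
  def send_and_read(repo) do
    specs = [
      {%{"step" => 1}, "customer_1"},
      {%{"step" => 2}, "customer_1"},
      {%{"step" => 1}, "customer_2"}
    ]

    EctoPGMQ.send_messages(repo, "orders", specs)
    EctoPGMQ.read_messages(repo, "orders", 30, 10, message_grouping: :round_robin)
  end
end
```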

### Further Information

For more information about FIFO message groups, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/fifo-queues.md).

# `delay`

```elixir
@type delay() :: Duration.t() | EctoPGMQ.PGMQ.delay()
```

A delay before a message becomes visible.

This can take any of the following forms:

  * A `t:Duration.t/0` denoting the time to wait before a message becomes
    visible.

  * An `t:integer/0` denoting the time (in seconds) to wait before a message
    becomes visible.

  * A `t:DateTime.t/0` denoting when a message should become visible.

For more information about this type, see `t:EctoPGMQ.PGMQ.delay/0`.
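
The three forms can be compared side by side in a short sketch. The module
name, the `repo` and `queue` arguments, and the payload are placeholders; only
the `:delay` option of `send_messages/4` comes from this module's docs.

```elixir
defmodule DelaySketch do
  # Hypothetical helper: sends the same payload three times, once per
  # supported delay form.
  def send_with_delays(repo, queue) do
    message = %{"foo" => 1}

    # Integer: visible 60 seconds from now.
    EctoPGMQ.send_messages(repo, queue, [message], delay: 60)

    # Duration: visible one hour from now.
    EctoPGMQ.send_messages(repo, queue, [message], delay: Duration.new!(hour: 1))

    # DateTime: visible at an absolute point in time (here, one day from now).
    visible_at = DateTime.add(DateTime.utc_now(), 1, :day)
    EctoPGMQ.send_messages(repo, queue, [message], delay: visible_at)
  end
end
```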

# `message_update_attributes`

```elixir
@type message_update_attributes() :: %{visibility_timeout: delay()}
```

Message update attributes.

The following attributes are supported:

  * `:visibility_timeout` - A required `t:delay/0` for the messages.

# `notification_throttle`

```elixir
@type notification_throttle() :: Duration.t() | EctoPGMQ.PGMQ.throttle_interval()
```

The minimum time between notifications.

This can take either of the following forms:

  * A `t:Duration.t/0` denoting the minimum time between notifications.

  * A `t:non_neg_integer/0` denoting the minimum time (in milliseconds)
    between notifications.

For more information about notifications, see `EctoPGMQ.Notifications`.

For more information about this type, see
`t:EctoPGMQ.PGMQ.throttle_interval/0`.

# `partition_config`

```elixir
@type partition_config() ::
  {EctoPGMQ.PGMQ.partition_interval(), EctoPGMQ.PGMQ.retention_interval()}
```

A queue partition configuration.

A partition configuration is a tuple containing two elements: the partition
interval and the retention interval.

Both elements can take either of the following forms:

  * A `t:Duration.t/0` denoting a time-based interval.

  * A `t:pos_integer/0` denoting a message-based interval.

For more information about partitioning, see [Partitioning](#partitioning).

For more information about this type, see
`t:EctoPGMQ.PGMQ.partition_interval/0` and
`t:EctoPGMQ.PGMQ.retention_interval/0`.

# `poll_config`

```elixir
@type poll_config() ::
  {Duration.t() | EctoPGMQ.PGMQ.poll_interval(),
   Duration.t() | EctoPGMQ.PGMQ.poll_timeout()}
```

A message polling configuration.

A polling configuration is a tuple containing two elements: the poll interval
and the poll timeout.

Both elements can take either of the following forms:

  * A `t:Duration.t/0` denoting a length of time.

  * A `t:pos_integer/0` denoting a length of time. The unit for the poll
    interval is milliseconds and the unit for the timeout is seconds.

For more information about polling, see [Polling](#polling).

For more information about this type, see `t:EctoPGMQ.PGMQ.poll_interval/0`
and `t:EctoPGMQ.PGMQ.poll_timeout/0`.

# `queue_create_attributes`

```elixir
@type queue_create_attributes() :: %{
  optional(:message_groups?) => boolean(),
  optional(:notifications) => notification_throttle() | nil,
  optional(:partitions) => partition_config() | nil,
  optional(:unlogged?) => boolean()
}
```

Queue creation attributes.

The following attributes are supported:

  * `:message_groups?` - An optional `t:boolean/0` denoting whether or not the
    queue should be optimized for FIFO message group reads. Defaults to
    `false`. For more information about FIFO message groups, see
    [FIFO Message Groups](#fifo-message-groups).

  * `:notifications` - An optional `t:notification_throttle/0` for the queue
    or `nil` to leave notifications disabled. Defaults to `nil`. For more
    information about notifications, see `EctoPGMQ.Notifications`.

  * `:partitions` - An optional `t:partition_config/0` for the queue or
    `nil` to disable partitioning. This option is ignored for unlogged queues.
    Defaults to `nil`. For more information about partitioning, see
    [Partitioning](#partitioning).

  * `:unlogged?` - An optional `t:boolean/0` denoting whether or not the queue
    should be unlogged. Defaults to `false`.

# `queue_update_attributes`

```elixir
@type queue_update_attributes() :: %{
  optional(:message_groups?) => true,
  optional(:notifications) => notification_throttle() | nil
}
```

Queue update attributes.

The following attributes are supported:

  * `:message_groups?` - `true` to optimize the queue for FIFO message group
    reads. Note that `true` is the only valid value because this operation
    cannot be undone. For more information about FIFO message groups, see
    [FIFO Message Groups](#fifo-message-groups).

  * `:notifications` - An optional `t:notification_throttle/0` for the queue or
    `nil` to disable notifications. For more information about notifications,
    see `EctoPGMQ.Notifications`.

# `read_messages_opts`

```elixir
@type read_messages_opts() :: [
  {:delete?, boolean()}
  | {:polling, poll_config() | nil}
  | {:message_grouping, :round_robin | :throughput_optimized | nil}
  | EctoPGMQ.PGMQ.query_opt()
]
```

Options for reading messages.

In addition to the standard [query options](`m:EctoPGMQ.PGMQ#query-options`),
messages can be read with the following options:

  * `:delete?` - An optional `t:boolean/0` denoting whether or not to delete
    messages immediately after reading them. Defaults to `false`. For more
    information, see `EctoPGMQ.PGMQ.pop/4`.

  * `:message_grouping` - An optional value specifying how to handle message
    groups when reading messages. Possible values are `:round_robin`,
    `:throughput_optimized`, or `nil` to ignore message groups when reading.
    This option is ignored when deleting on read. Defaults to `nil`. For more
    information about FIFO message groups, see
    [FIFO Message Groups](#fifo-message-groups).

  * `:polling` - An optional `t:poll_config/0` for the read operation or `nil`
    to disable polling. This option is ignored when deleting on read. Defaults
    to `nil`. For more information about polling, see [Polling](#polling).
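
To make the interaction between these options concrete, here is a small sketch
under stated assumptions: the module and function names are hypothetical, and
`repo` and `queue` stand in for a real repo and an existing queue.

```elixir
defmodule ReadOptsSketch do
  # Hypothetical helper: deletes messages as soon as they are read. Per the
  # option docs above, :message_grouping and :polling would be ignored here.
  def pop_batch(repo, queue) do
    EctoPGMQ.read_messages(repo, queue, 30, 10, delete?: true)
  end

  # Hypothetical helper: leaves messages in the queue and reads them with
  # throughput-optimized FIFO grouping.
  def read_grouped(repo, queue) do
    EctoPGMQ.read_messages(repo, queue, 30, 10, message_grouping: :throughput_optimized)
  end
end
```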

# `visibility_timeout`

```elixir
@type visibility_timeout() :: Duration.t() | EctoPGMQ.PGMQ.visibility_timeout()
```

How long, starting from now, a message remains invisible.

This can take either of the following forms:

  * A `t:Duration.t/0` denoting how long a message is invisible.

  * An `t:integer/0` denoting how long (in seconds) a message is invisible.

For more information about this type, see
`t:EctoPGMQ.PGMQ.visibility_timeout/0`.
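
As an illustration, the following sketch passes the same five-minute timeout in
both forms. The module name and the `repo` and `queue` arguments are
placeholders.

```elixir
defmodule VisibilityTimeoutSketch do
  # Hypothetical helper: reads up to 5 messages twice, expressing the same
  # visibility timeout first as an integer and then as a Duration.
  def read_both_ways(repo, queue) do
    # Integer form: 300 seconds.
    first = EctoPGMQ.read_messages(repo, queue, 300, 5)

    # Duration form: five minutes.
    second = EctoPGMQ.read_messages(repo, queue, Duration.new!(minute: 5), 5)

    {first, second}
  end
end
```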

# `archive_messages`

```elixir
@spec archive_messages(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  [EctoPGMQ.Message.id()],
  [
    EctoPGMQ.PGMQ.query_opt()
  ]
) :: :ok
```

Archives the given messages from the given queue.

## Options

This function supports the standard
[query options](`m:EctoPGMQ.PGMQ#query-options`).

## Examples

    iex> message_ids = send_messages(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
    iex> archive_messages(Repo, "my_queue", message_ids)
    :ok

# `delete_messages`

```elixir
@spec delete_messages(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.Message.id()], [
  EctoPGMQ.PGMQ.query_opt()
]) :: :ok
```

Deletes the given messages from the given queue.

## Options

This function supports the standard
[query options](`m:EctoPGMQ.PGMQ#query-options`).

## Examples

    iex> message_ids = send_messages(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
    iex> delete_messages(Repo, "my_queue", message_ids)
    :ok

# `read_messages`

```elixir
@spec read_messages(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  visibility_timeout(),
  EctoPGMQ.PGMQ.quantity(),
  [
    {:delete?, boolean()}
    | {:polling, poll_config() | nil}
    | {:message_grouping, :round_robin | :throughput_optimized | nil}
    | EctoPGMQ.PGMQ.query_opt()
  ]
) :: [EctoPGMQ.Message.t()]
```

Reads messages from the given queue.

## Options

See `t:read_messages_opts/0` for information about the options supported by
this function.

## Examples

    iex> send_messages(Repo, "my_queue", [%{"foo" => 1}])
    iex> [message] = read_messages(Repo, "my_queue", 5, 2)
    iex> match?(%EctoPGMQ.Message{reads: 1}, message)
    true
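
A typical consumer combines `read_messages/5` with `delete_messages/4`. The
sketch below is one possible shape, not a prescribed pattern: the module name,
the `handler` callback and its `:ok` return convention, and the timeout and
batch size are all assumptions made for illustration.

```elixir
defmodule ConsumerSketch do
  # Hypothetical helper: reads one batch, passes each message to `handler`,
  # and deletes the messages that the handler reports as done.
  def consume_batch(repo, queue, handler) when is_function(handler, 1) do
    # Messages stay invisible to other readers for 30 seconds.
    messages = EctoPGMQ.read_messages(repo, queue, 30, 10)

    # Keep the IDs of the messages that were handled successfully.
    done_ids =
      for message <- messages, handler.(message) == :ok do
        message.id
      end

    # Skip the delete call entirely when nothing succeeded.
    if done_ids != [] do
      :ok = EctoPGMQ.delete_messages(repo, queue, done_ids)
    end

    length(done_ids)
  end
end
```

Messages whose handler does not return `:ok` are left in the queue and become
readable again once their visibility timeout expires.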

# `send_messages`

```elixir
@spec send_messages(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  [EctoPGMQ.Message.specification()],
  [
    {:delay, delay()} | EctoPGMQ.PGMQ.query_opt()
  ]
) :: [EctoPGMQ.Message.id()]
```

Sends messages to the given queue.

## Options

In addition to the standard [query options](`m:EctoPGMQ.PGMQ#query-options`),
this function also supports the following options:

  * `:delay` - An optional `t:delay/0` for the messages. Defaults to `0`.

## Examples

    iex> delay = Duration.new!(hour: 1)
    iex> message_ids = send_messages(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}], delay: delay)
    iex> Enum.all?(message_ids, &is_integer/1)
    true

# `update_messages`

```elixir
@spec update_messages(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  [EctoPGMQ.Message.id()],
  message_update_attributes(),
  [EctoPGMQ.PGMQ.query_opt()]
) :: :ok
```

Updates the given messages in the given queue.

## Options

This function supports the standard
[query options](`m:EctoPGMQ.PGMQ#query-options`).

## Examples

    iex> visibility_timeout = Duration.new!(minute: 5)
    iex> message_ids = send_messages(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
    iex> update_messages(Repo, "my_queue", message_ids, %{visibility_timeout: visibility_timeout})
    :ok

# `all_queues`

```elixir
@spec all_queues(Ecto.Repo.t(), [EctoPGMQ.PGMQ.query_opt()]) :: [EctoPGMQ.Queue.t()]
```

Lists all queues.

## Options

This function supports the standard
[query options](`m:EctoPGMQ.PGMQ#query-options`).

## Examples

    iex> queues = all_queues(Repo)
    iex> Enum.all?(queues, &is_struct(&1, EctoPGMQ.Queue))
    true

# `create_queue`

```elixir
@spec create_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), queue_create_attributes(), [
  EctoPGMQ.PGMQ.query_opt()
]) :: EctoPGMQ.Queue.t()
```

Creates a queue with the given name.

To create a queue in an `Ecto.Migration`, see
`EctoPGMQ.Migrations.create_queue/2`.

## Options

This function supports the standard
[query options](`m:EctoPGMQ.PGMQ#query-options`).

## Examples

    iex> queue = create_queue(Repo, "my_unpartitioned_queue", %{notifications: 1_000})
    iex> match?(%EctoPGMQ.Queue{notifications: %EctoPGMQ.Throttle{}}, queue)
    true

    iex> queue = create_queue(Repo, "my_partitioned_queue", %{partitions: {10_000, 100_000}})
    iex> match?(%EctoPGMQ.Queue{is_partitioned: true}, queue)
    true

    iex> partitions = {Duration.new!(hour: 1), Duration.new!(day: 1)}
    iex> queue = create_queue(Repo, "my_partitioned_queue", %{partitions: partitions})
    iex> match?(%EctoPGMQ.Queue{is_partitioned: true}, queue)
    true

    iex> queue = create_queue(Repo, "my_unlogged_queue", %{unlogged?: true})
    iex> match?(%EctoPGMQ.Queue{is_unlogged: true}, queue)
    true

# `drop_queue`

```elixir
@spec drop_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.PGMQ.query_opt()]) ::
  :ok
```

Drops the given queue.

To drop a queue in an `Ecto.Migration`, see
`EctoPGMQ.Migrations.drop_queue/2`.

## Options

This function supports the standard
[query options](`m:EctoPGMQ.PGMQ#query-options`).

## Examples

    iex> drop_queue(Repo, "my_queue")
    :ok

# `get_queue`

```elixir
@spec get_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.PGMQ.query_opt()]) ::
  EctoPGMQ.Queue.t() | nil
```

Gets the given queue.

## Options

This function supports the standard
[query options](`m:EctoPGMQ.PGMQ#query-options`).

## Examples

    iex> queue = get_queue(Repo, "my_queue")
    iex> match?(%EctoPGMQ.Queue{}, queue)
    true

    iex> get_queue(Repo, "my_non_existent_queue")
    nil

# `purge_queue`

```elixir
@spec purge_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.PGMQ.query_opt()]) ::
  EctoPGMQ.PGMQ.purged_messages()
```

Purges the given queue.

## Options

This function supports the standard
[query options](`m:EctoPGMQ.PGMQ#query-options`).

## Examples

    iex> send_messages(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
    iex> purge_queue(Repo, "my_queue")
    2

# `update_queue`

```elixir
@spec update_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), queue_update_attributes(), [
  EctoPGMQ.PGMQ.query_opt()
]) :: EctoPGMQ.Queue.t()
```

Updates the given queue.

> #### Warning {: .warning}
>
> Because the underlying tables are owned by PGMQ, this function avoids row
> locks so as not to disturb any internal PGMQ processes. As a result, if
> multiple processes attempt to update a queue simultaneously, they may get
> unexpected results.

To update a queue in an `Ecto.Migration`, see
`EctoPGMQ.Migrations.update_queue/2`.

## Options

This function supports the standard
[query options](`m:EctoPGMQ.PGMQ#query-options`).

## Examples

    iex> throttle = Duration.new!(second: 5)
    iex> queue = update_queue(Repo, "my_queue", %{notifications: throttle})
    iex> match?(%EctoPGMQ.Queue{notifications: %EctoPGMQ.Throttle{}}, queue)
    true

