# `EctoPGMQ.PGMQ`
[🔗](https://github.com/gdwoolbert3/ecto_pgmq/blob/main/lib/ecto_pgmq/pgmq.ex#L1)

An SDK that fully covers the
[PGMQ API-space](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md).

Instead of implementing a corresponding function for every alias and
parameterization supported by PGMQ, this module relies on client-side defaults
to implement a single function for each distinct piece of PGMQ functionality.

## Query Options

All of the functions in this module support a common set of query options.

For a detailed description of these options, see `t:query_opt/0`.

# `conditional`

```elixir
@type conditional() :: %{optional(String.t()) => term()}
```

Filter conditions to be applied when reading messages from a queue.

Note that the filter conditions are applied to the message body, not the
headers.

> #### Warning {: .warning}
>
> As stated in the
> [PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#reading-messages),
> conditional message reading is an experimental feature and the API might be
> subject to change in future releases.
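
As a rough illustration of body-only filtering, the sketch below assumes the conditions are matched with containment semantics similar to Postgres' JSONB `@>` operator. That is an assumption to verify against the PGMQ docs. `ConditionalSketch` is a hypothetical module, not part of EctoPGMQ, and it only handles nested maps and scalar values.

```elixir
defmodule ConditionalSketch do
  # Hypothetical, simplified containment check over decoded JSON maps.
  # Every key in `conditional` must be present in `body` with a matching value.
  def matches?(body, conditional) when is_map(body) and is_map(conditional) do
    Enum.all?(conditional, fn {key, expected} ->
      case Map.fetch(body, key) do
        {:ok, actual} -> contains?(actual, expected)
        :error -> false
      end
    end)
  end

  # Nested maps are compared recursively; everything else by equality.
  defp contains?(actual, expected) when is_map(actual) and is_map(expected),
    do: matches?(actual, expected)

  defp contains?(actual, expected), do: actual == expected
end

body = %{"foo" => 1, "meta" => %{"source" => "api", "retries" => 0}}

ConditionalSketch.matches?(body, %{"foo" => 1})
ConditionalSketch.matches?(body, %{"meta" => %{"source" => "api"}})
ConditionalSketch.matches?(body, %{"foo" => 2})
```

Under this reading, an empty map matches every message body, which would correspond to reading without a filter.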

# `delay`

```elixir
@type delay() :: non_neg_integer() | DateTime.t()
```

A delay before a message becomes visible.

This can take either of the following forms:

  * A `t:non_neg_integer/0` denoting the time (in seconds) to wait before a
    message becomes visible.

  * A `t:DateTime.t/0` denoting when a message should become visible.
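
To make the two forms concrete, here is a minimal sketch that resolves either one to the absolute time at which a message would become visible. `DelaySketch.visible_at/2` is a hypothetical helper written for illustration. It is not part of EctoPGMQ and does not claim to mirror how the library or PGMQ computes visibility.

```elixir
defmodule DelaySketch do
  # Hypothetical helper: resolves a relative (seconds) or absolute
  # (DateTime) delay to the moment the message becomes visible.
  @spec visible_at(non_neg_integer() | DateTime.t(), DateTime.t()) :: DateTime.t()
  def visible_at(delay, now \\ DateTime.utc_now())

  # Relative form: add the number of seconds to the current time.
  def visible_at(seconds, %DateTime{} = now) when is_integer(seconds) and seconds >= 0 do
    DateTime.add(now, seconds, :second)
  end

  # Absolute form: the given DateTime is already the answer.
  def visible_at(%DateTime{} = at, %DateTime{}), do: at
end

now = ~U[2025-01-01 00:00:00Z]

# Thirty seconds after `now`.
DelaySketch.visible_at(30, now)

# Returned unchanged.
DelaySketch.visible_at(~U[2025-01-01 00:05:00Z], now)
```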

# `leading_partitions`

```elixir
@type leading_partitions() :: non_neg_integer()
```

The number of partitions to create preemptively.

# `partition_interval`

```elixir
@type partition_interval() :: pos_integer() | Duration.t()
```

The interval at which new partitions should be created.

This can take either of the following forms:

  * A `t:pos_integer/0` denoting how many messages per partition.

  * A `t:Duration.t/0` denoting a time range per partition.

# `poll_interval`

```elixir
@type poll_interval() :: pos_integer()
```

The time (in milliseconds) to wait between polls.

# `poll_timeout`

```elixir
@type poll_timeout() :: pos_integer()
```

The maximum time (in seconds) to poll for messages.
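
Note that `t:poll_timeout/0` is expressed in seconds while `t:poll_interval/0` is expressed in milliseconds. The sketch below is a client-side loop that only illustrates how the two units relate. The actual polling happens inside Postgres, and `PollSketch` and its `read_fun` argument are hypothetical names that are not part of EctoPGMQ.

```elixir
defmodule PollSketch do
  # Hypothetical loop: call `read_fun` until it returns messages or the
  # timeout (seconds) elapses, sleeping `poll_interval_ms` between attempts.
  def poll(read_fun, poll_timeout_s, poll_interval_ms) do
    # Convert the timeout from seconds to milliseconds before comparing.
    deadline = System.monotonic_time(:millisecond) + poll_timeout_s * 1_000
    do_poll(read_fun, deadline, poll_interval_ms)
  end

  defp do_poll(read_fun, deadline, interval_ms) do
    case read_fun.() do
      [] ->
        # Only sleep if another attempt still fits before the deadline.
        if System.monotonic_time(:millisecond) + interval_ms <= deadline do
          Process.sleep(interval_ms)
          do_poll(read_fun, deadline, interval_ms)
        else
          []
        end

      messages ->
        messages
    end
  end
end

# Poll for at most 1 second, checking every 100 milliseconds.
PollSketch.poll(fn -> [:message] end, 1, 100)
```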

# `purged_messages`

```elixir
@type purged_messages() :: non_neg_integer()
```

The number of purged messages.

# `quantity`

```elixir
@type quantity() :: pos_integer()
```

The maximum number of messages to read.

# `query_opt`

```elixir
@type query_opt() :: {:log, boolean()} | {:timeout, timeout()}
```

A query configuration option.

The following query configuration options are supported:

  * `:log` - A `t:boolean/0` denoting whether or not to log the query. Defaults
  to `true`.

  * `:timeout` - A `t:timeout/0` for the query (in milliseconds). Defaults to
  `15_000`.
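
The sketch below shows one way caller-supplied options could be layered over the documented defaults. `QueryOptsSketch` is a hypothetical module for illustration and is not how EctoPGMQ is known to resolve its options.

```elixir
defmodule QueryOptsSketch do
  # Default values taken from the option descriptions above.
  @defaults [log: true, timeout: 15_000]

  # Hypothetical helper: options given by the caller override the defaults.
  def with_defaults(opts) when is_list(opts) do
    Keyword.merge(@defaults, opts)
  end
end

# Only the timeout is overridden; logging keeps its default.
QueryOptsSketch.with_defaults(timeout: 30_000)

# In practice the options are passed as the trailing argument, for example
# (with MyApp.Repo standing in for your own repo module):
#
#     EctoPGMQ.PGMQ.list_queues(MyApp.Repo, log: false, timeout: 30_000)
```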

# `retention_interval`

```elixir
@type retention_interval() :: pos_integer() | Duration.t()
```

The interval at which old partitions should be dropped.

This can take either of the following forms:

  * A `t:pos_integer/0` denoting how many messages to retain in total.

  * A `t:Duration.t/0` denoting a total time range to retain.

# `throttle_interval`

```elixir
@type throttle_interval() :: non_neg_integer()
```

The minimum time (in milliseconds) between notifications.

A throttle interval of `0` effectively disables notification throttling.

For more information about notification throttling, see
[Throttling](`m:EctoPGMQ.Notifications#throttling`).
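
As a rough mental model, throttling can be thought of as a check against the time of the previous notification. The predicate below is a hypothetical sketch with made-up names. The real throttling is handled by PGMQ and may behave differently in its details.

```elixir
defmodule ThrottleSketch do
  # Hypothetical predicate: may a notification be emitted at `now_ms`,
  # given that the previous one was emitted at `last_ms`?
  def notify?(nil, _now_ms, _interval_ms), do: true

  def notify?(last_ms, now_ms, interval_ms) when interval_ms >= 0 do
    now_ms - last_ms >= interval_ms
  end
end

# With a 1_000 ms interval, a notification 400 ms after the last is suppressed.
ThrottleSketch.notify?(10_000, 10_400, 1_000)

# With an interval of 0, every notification passes.
ThrottleSketch.notify?(10_000, 10_000, 0)
```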

# `visibility_timeout`

```elixir
@type visibility_timeout() :: integer()
```

The time (in seconds), measured from now, during which a message remains
invisible.

# `archive`

```elixir
@spec archive(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.Message.id()], [
  query_opt()
]) :: :ok
```

Archives the given messages in the given queue.

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#archive-batch).

## Examples

    iex> message_ids = send_batch(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
    iex> archive(Repo, "my_queue", message_ids)
    :ok

# `convert_archive_partitioned`

```elixir
@spec convert_archive_partitioned(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  partition_interval(),
  retention_interval(),
  leading_partitions(),
  [query_opt()]
) :: :ok
```

Converts the archive for the given queue into a partitioned table.

> #### Warning {: .warning}
>
> This function postfixes the old archive table name with `_old` and leaves
> its contents untouched. Additional cleanup (table deletion, message
> movement, etc.) is left to the user.

For more information about partitioning, see
[Partitioning](`m:EctoPGMQ#partitioning`).

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#convert_archive_partitioned).

## Examples

    iex> convert_archive_partitioned(Repo, "my_queue", 10_000, 100_000, 10)
    :ok

    iex> partition = Duration.new!(hour: 1)
    iex> retention = Duration.new!(day: 1)
    iex> convert_archive_partitioned(Repo, "my_queue", partition, retention, 10)
    :ok

# `create_fifo_index`

```elixir
@spec create_fifo_index(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) :: :ok
```

Creates an index to optimize FIFO message group read performance for the given
queue.

For more information about FIFO message groups, see
[FIFO Message Groups](`m:EctoPGMQ#fifo-message-groups`).

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#create_fifo_index).

## Examples

    iex> create_fifo_index(Repo, "my_queue")
    :ok

# `create_non_partitioned`

```elixir
@spec create_non_partitioned(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) ::
  :ok
```

Creates an unpartitioned queue with the given name.

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#create_non_partitioned).

## Examples

    iex> create_non_partitioned(Repo, "my_unpartitioned_queue")
    :ok

# `create_partitioned`

```elixir
@spec create_partitioned(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  partition_interval(),
  retention_interval(),
  [query_opt()]
) :: :ok
```

Creates a partitioned queue with the given name.

For more information about partitioning, see
[Partitioning](`m:EctoPGMQ#partitioning`).

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#create_partitioned).

## Examples

    iex> create_partitioned(Repo, "my_partitioned_queue", 10_000, 100_000)
    :ok

    iex> partition = Duration.new!(hour: 1)
    iex> retention = Duration.new!(day: 1)
    iex> create_partitioned(Repo, "my_partitioned_queue", partition, retention)
    :ok

# `create_unlogged`

```elixir
@spec create_unlogged(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) :: :ok
```

Creates an unlogged queue with the given name.

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#create_unlogged).

> #### Warning {: .warning}
>
> Unlogged tables benefit from faster write operations but they risk data loss
> if the Postgres server restarts. Use with caution.

## Examples

    iex> create_unlogged(Repo, "my_unlogged_queue")
    :ok

# `delete`

```elixir
@spec delete(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.Message.id()], [
  query_opt()
]) :: :ok
```

Deletes the given messages from the given queue.

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#delete-batch).

## Examples

    iex> message_ids = send_batch(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
    iex> delete(Repo, "my_queue", message_ids)
    :ok

# `disable_notify_insert`

```elixir
@spec disable_notify_insert(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) ::
  :ok
```

Disables insert notifications for the given queue.

For more information about notifications, see `EctoPGMQ.Notifications`.

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#disable_notify_insert).

## Examples

    iex> disable_notify_insert(Repo, "my_queue")
    :ok

# `drop_queue`

```elixir
@spec drop_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) :: :ok
```

Drops the given queue.

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#drop_queue).

## Examples

    iex> drop_queue(Repo, "my_queue")
    :ok

# `enable_notify_insert`

```elixir
@spec enable_notify_insert(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  throttle_interval(),
  [query_opt()]
) ::
  :ok
```

Enables insert notifications for the given queue.

For more information about notifications, see `EctoPGMQ.Notifications`.

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#enable_notify_insert).

## Examples

    iex> enable_notify_insert(Repo, "my_queue", 1_000)
    :ok

# `list_queues`

```elixir
@spec list_queues(Ecto.Repo.t(), [query_opt()]) :: [EctoPGMQ.Queue.t()]
```

Lists all queues.

Because this function naively wraps the corresponding PGMQ function, the
`:metrics` and `:notifications` fields in the returned `EctoPGMQ.Queue` structs
will not be populated.

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#list_queues).

## Examples

    iex> queues = list_queues(Repo)
    iex> Enum.all?(queues, &is_struct(&1, EctoPGMQ.Queue))
    true

# `metrics_all`

```elixir
@spec metrics_all(Ecto.Repo.t(), [query_opt()]) :: [EctoPGMQ.Metrics.t()]
```

Returns metrics for all queues.

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#metrics_all).

## Examples

    iex> metrics = metrics_all(Repo)
    iex> Enum.all?(metrics, &is_struct(&1, EctoPGMQ.Metrics))
    true

# `pop`

```elixir
@spec pop(Ecto.Repo.t(), EctoPGMQ.Queue.name(), quantity(), [query_opt()]) :: [
  EctoPGMQ.Message.t()
]
```

Simultaneously fetches and deletes messages from the given queue.

This function does **NOT** increment the read count of the fetched messages.

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#pop).

## Examples

    iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
    iex> [message] = pop(Repo, "my_queue", 1)
    iex> match?(%EctoPGMQ.Message{reads: 0}, message)
    true

# `purge_queue`

```elixir
@spec purge_queue(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [query_opt()]) ::
  purged_messages()
```

Purges all messages from the given queue and returns the number of messages
that were deleted.

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#purge_queue).

## Examples

    iex> send_batch(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
    iex> purge_queue(Repo, "my_queue")
    2

# `read`

```elixir
@spec read(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  visibility_timeout(),
  quantity(),
  conditional(),
  [
    query_opt()
  ]
) :: [EctoPGMQ.Message.t()]
```

Reads messages from the given queue.

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#read).

## Examples

    iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
    iex> [message] = read(Repo, "my_queue", 5, 1)
    iex> match?(%EctoPGMQ.Message{reads: 1}, message)
    true

# `read_grouped`

```elixir
@spec read_grouped(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  visibility_timeout(),
  quantity(),
  [query_opt()]
) ::
  [EctoPGMQ.Message.t()]
```

Reads messages from the given queue while respecting FIFO message groups and
optimizing throughput.

For more information about FIFO message groups, see
[FIFO Message Groups](`m:EctoPGMQ#fifo-message-groups`).

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#read_grouped).

## Examples

    iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
    iex> [message] = read_grouped(Repo, "my_queue", 5, 1)
    iex> match?(%EctoPGMQ.Message{reads: 1}, message)
    true

# `read_grouped_rr`

```elixir
@spec read_grouped_rr(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  visibility_timeout(),
  quantity(),
  [
    query_opt()
  ]
) :: [EctoPGMQ.Message.t()]
```

Reads messages from the given queue while respecting FIFO message groups and
interleaving them in round-robin order.

For more information about FIFO message groups, see
[FIFO Message Groups](`m:EctoPGMQ#fifo-message-groups`).

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#read_grouped_rr).

## Examples

    iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
    iex> [message] = read_grouped_rr(Repo, "my_queue", 5, 1)
    iex> match?(%EctoPGMQ.Message{reads: 1}, message)
    true

# `read_grouped_rr_with_poll`

```elixir
@spec read_grouped_rr_with_poll(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  visibility_timeout(),
  quantity(),
  poll_timeout(),
  poll_interval(),
  [query_opt()]
) :: [EctoPGMQ.Message.t()]
```

Reads messages from the given queue with a Postgres server-side poll while
respecting FIFO message groups and interleaving them in round-robin order.

For more information about FIFO message groups, see
[FIFO Message Groups](`m:EctoPGMQ#fifo-message-groups`).

For more information about polling, see [Polling](`m:EctoPGMQ#polling`).

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#read_grouped_rr_with_poll).

## Examples

    iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
    iex> [message] = read_grouped_rr_with_poll(Repo, "my_queue", 5, 1, 5, 500)
    iex> match?(%EctoPGMQ.Message{reads: 1}, message)
    true

# `read_grouped_with_poll`

```elixir
@spec read_grouped_with_poll(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  visibility_timeout(),
  quantity(),
  poll_timeout(),
  poll_interval(),
  [query_opt()]
) :: [EctoPGMQ.Message.t()]
```

Reads messages from the given queue with a Postgres server-side poll while
respecting FIFO message groups and optimizing throughput.

For more information about FIFO message groups, see
[FIFO Message Groups](`m:EctoPGMQ#fifo-message-groups`).

For more information about polling, see [Polling](`m:EctoPGMQ#polling`).

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#read_grouped_with_poll).

## Examples

    iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
    iex> [message] = read_grouped_with_poll(Repo, "my_queue", 5, 1, 5, 500)
    iex> match?(%EctoPGMQ.Message{reads: 1}, message)
    true

# `read_with_poll`

```elixir
@spec read_with_poll(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  visibility_timeout(),
  quantity(),
  poll_timeout(),
  poll_interval(),
  conditional(),
  [query_opt()]
) :: [EctoPGMQ.Message.t()]
```

Reads messages from the given queue with a Postgres server-side poll.

For more information about polling, see [Polling](`m:EctoPGMQ#polling`).

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#read_with_poll).

## Examples

    iex> send_batch(Repo, "my_queue", [%{"foo" => 1}])
    iex> [message] = read_with_poll(Repo, "my_queue", 5, 1, 5, 500)
    iex> match?(%EctoPGMQ.Message{reads: 1}, message)
    true

# `send_batch`

```elixir
@spec send_batch(
  Ecto.Repo.t(),
  EctoPGMQ.Queue.name(),
  [EctoPGMQ.Message.payload()],
  [EctoPGMQ.Message.headers() | nil] | nil,
  delay(),
  [query_opt()]
) :: [EctoPGMQ.Message.id()]
```

Sends the given messages to the given queue.

The `headers` arg defaults to `nil`, which is a shorthand for `NULL` headers
for all messages. If a list is given for the `headers` arg, the length of the
list must match the length of the given list of messages.
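
The length requirement can be pictured with the sketch below, which expands the `nil` shorthand and checks an explicit list against the messages. `HeadersSketch` is a hypothetical helper, and the `{:error, :length_mismatch}` return value is an assumption made for illustration. How EctoPGMQ or Postgres actually reports a mismatch is not specified here.

```elixir
defmodule HeadersSketch do
  # Hypothetical helper: `nil` stands for "no headers on any message".
  def normalize(messages, nil) when is_list(messages) do
    {:ok, List.duplicate(nil, length(messages))}
  end

  # An explicit list must supply exactly one entry per message;
  # individual entries may still be `nil`.
  def normalize(messages, headers) when is_list(messages) and is_list(headers) do
    if length(headers) == length(messages) do
      {:ok, headers}
    else
      {:error, :length_mismatch}
    end
  end
end

messages = [%{"foo" => 1}, %{"bar" => 2}]

# Shorthand: one `nil` header per message.
HeadersSketch.normalize(messages, nil)

# Explicit list with a header for the first message only.
HeadersSketch.normalize(messages, [%{"trace_id" => "abc"}, nil])

# One header for two messages does not line up.
HeadersSketch.normalize(messages, [%{"trace_id" => "abc"}])
```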

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#send_batch).

## Examples

    iex> message_ids = send_batch(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])
    iex> Enum.all?(message_ids, &is_integer/1)
    true

    iex> delay = DateTime.utc_now()
    iex> message_ids = send_batch(Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}], nil, delay)
    iex> Enum.all?(message_ids, &is_integer/1)
    true

# `set_vt`

```elixir
@spec set_vt(Ecto.Repo.t(), EctoPGMQ.Queue.name(), [EctoPGMQ.Message.id()], delay(), [
  query_opt()
]) :: [
  EctoPGMQ.Message.t()
]
```

Sets the visibility timeout of the given messages in the given queue.

For more information about this function, see the
[PGMQ docs](https://github.com/pgmq/pgmq/blob/main/docs/api/sql/functions.md#set_vt-batch).

## Examples

    iex> message_ids = send_batch(Repo, "my_queue", [%{"foo" => 1}])
    iex> [read_message] = read(Repo, "my_queue", 5, 1)
    iex> [updated_message] = set_vt(Repo, "my_queue", message_ids, 10)
    iex> DateTime.diff(updated_message.visible_at, read_message.visible_at) > 0
    true

