# `EctoPGMQ.Producer`
[🔗](https://github.com/gdwoolbert3/ecto_pgmq/blob/main/lib/ecto_pgmq/producer.ex#L2)

A `Broadway.Producer` implementation for PGMQ queues.

This module requires the optional `Broadway` dependency.

This module can be used in a Broadway pipeline like any other producer:

```elixir
Broadway.start_link(MyBroadway,
  producer: [
    # Second tuple element contains producer options
    module: {EctoPGMQ.Producer, [repo: Repo, queue: "my_queue", ...]},
    ...
  ],
  ...
)
```

## Notifications and Producer Polling

`EctoPGMQ.Producer` supports poll-based, notification-based, and hybrid
queue consumption.

### Poll-Based Consumption

When using poll-based consumption, a producer will attempt to read messages
whenever demand is received. If demand cannot be met, the producer will poll
for messages until it can. The polling interval is determined by the
`:read_interval` option. Sample producer options for poll-based consumption
can be seen below:

```elixir
[listener: nil, read_interval: 5_000, ...]
```

> #### Info {: .info}
>
> This is the default approach to consumption.

### Notification-Based Consumption

When using notification-based consumption, a producer will listen for
notifications and only attempt to read messages if it has reason to believe
that the queue is not empty. Depending on the characteristics of the queue,
this approach to consumption may be less taxing on the repo connection pool
than poll-based consumption. For more information about notifications and
when, to use them see `EctoPGMQ.Notifications`. Sample producer options for
notification-based consumption can be seen below:

```elixir
[listener: MyListener, read_interval: :infinity, ...]
```

> #### Warning {: .warning}
>
> A purely notification-based consumer will be subject to race conditions
> when connecting/reconnecting to the DB. In practice, this means that there
> is a risk of messages being available in the queue but not being read
> until _more_ messages arrive and trigger another notification. For this
> reason, [Hybrid Consumption](#hybrid-consumption) is preferred over
> Notification-Based consumption.

### Hybrid Consumption

Like with notification-based consumption, a producer using hybrid
consumption will listen for notifications **BUT** it will also poll for
messages to make notification race conditions less problematic. The
`:read_interval` option then effectively becomes the maximum amount of time
a visible message can wait in the queue if a notification is missed. Sample
producer options for hybrid consumption can be seen below:

```elixir
[listener: MyListener, read_interval: 300_000, ...]
```

> #### Tip {: .tip}
>
> This is the preferred approach to consumption when leveraging
> notifications.

## Acknowledgements

`EctoPGMQ.Producer` supports a number of different acknowledgement actions
(see `t:ack_action/0`). Default acknowledgement actions can be set for both
successful and failed messages. Additionally, `EctoPGMQ.Producer` supports
configuring individual message acknowledgements by calling
`Broadway.Message.configure_ack/2` with the following options:

  * `:ack_action` - A required `t:ack_action/0` denoting how to acknowledge
    the message.

When used together, default acknowledgement actions and individual message
acknowledgement configuration can be used to implement more complex message
handling. For example, an `EctoPGMQ.Producer` can use `:delete` as the
default success acknowledgement, `:nothing` as the default failure
acknowledgement, and a code snippet like the one below to implement a
maximum number of attempts for each message:

```elixir
alias Broadway.Message

@max_attempts 3

@impl Broadway
def handle_message(_, message, _) do
  case do_process_pgmq_message(message.data) do
    :ok -> message
    {:error, reason} -> Message.failed(message, reason)
  end
end

@impl Broadway
def handle_failed(messages, _) do
  Enum.map(messages, fn
    %{data: %{reads: r}} = msg when r < @max_attempts -> msg
    msg -> Message.configure_ack(msg, ack_action: :archive)
  end)
end
```

> #### Warning {: .warning}
>
> All acknowledgement configuration is effectively ignored when deleting
> messages on read.

## Options

An `EctoPGMQ.Producer` can be started with the following options:

  * `:dynamic_repo` - An optional `t:atom/0` name or `t:pid/0` of a dynamic
    repo to use for all DB operations. For more information about dynamic
    repos, see
    [Dynamic repositories](https://hexdocs.pm/ecto/replicas-and-dynamic-repositories.html#dynamic-repositories).

  * `:listener` - An optional listener specification that can take any of
    the following forms:

      * An opts `t:keyword/0` to be passed to
        `EctoPGMQ.Notifications.start_link/1` to start a listener under
        Broadway's supervision tree. The `t:keyword/0` **MUST** contain a
        `:name` key.

      * An existing `t:EctoPGMQ.Notifications.listener/0`. This is useful
        for sharing a single listener (and, by proxy, a single Postgres
        connection) between multiple producers.

      * `nil` to not subscribe to notifications.

    Defaults to `nil`. For more information about configuring notifications
    for a producer, see
    [Notifications and Producer Polling](#notifications-and-producer-polling).
    For more information about notifications in general, see
    `EctoPGMQ.Notifications`.

  * `:on_failure` - An optional `t:ack_action/0` denoting the default
    acknowledgement for failed messages. Defaults to `:archive`. For more
    information about acknowledgements, see
    [Acknowledgements](#acknowledgements).

  * `:on_success` - An optional `t:ack_action/0` denoting the default
    acknowledgement for successful messages. Defaults to `:delete`. For
    more information about acknowledgements, see
    [Acknowledgements](#acknowledgements).

  * `:queue` - A required `t:EctoPGMQ.Queue.name/0` to read messages from.

  * `:read_interval` - An optional `t:Duration.t/0` or `t:timeout/0`
    denoting how long to wait between polls when there is outstanding demand
    (`:infinity` to disable polling). Defaults to `5_000`. For more
    information about configuring polling for a producer, see
    [Notifications and Producer Polling](#notifications-and-producer-polling).

  * `:read_opts` - An optional `t:EctoPGMQ.read_messages_opts/0` to be used
    when reading messages. Defaults to `[]`.

  * `:repo` - A required `t:Ecto.Repo.t/0` to be used for all DB operations.

  * `:visibility_timeout` - A required `t:EctoPGMQ.visibility_timeout/0` for
    read operations.

# `ack_action`

```elixir
@type ack_action() ::
  :delete
  | :archive
  | :nothing
  | {:update_visibility_timeout, EctoPGMQ.visibility_timeout()}
```

An acknowledgement action for PGMQ messages.

This can take any of the following forms:

  * `:delete` to delete messages from the queue.

  * `:archive` to move messages from the queue to the archive.

  * `:nothing` to retry messages once the visibility timeout expires

  * `{:update_visibility_timeout, EctoPGMQ.visibility_timeout()}` to
    retry messages when the updated `t:EctoPGMQ.visibility_timeout/0`
    expires.

---

*Consult [api-reference.md](api-reference.md) for complete listing*
