Accrue.Webhooks.DLQ (accrue v0.3.0)

Copy Markdown View Source

Dead-letter queue replay and retention for webhook events (WH-08 / D4-04).

Replay inserts a fresh Oban dispatch job. Oban's own retry_job/2 refuses jobs in :discarded/:cancelled states, and dead-lettered webhook events correspond to exactly those — so the only correct path is to insert a brand-new job whose args reference the existing WebhookEvent row by id.

Public API

  • requeue/1 — single dead-lettered row → fresh dispatch job
  • requeue_where/2 — bulk replay with batch + stagger + dry-run + max-rows cap
  • list/2 — paginated browse for ops tooling
  • count/1 — accurate count for confirm prompts
  • prune/1 — delete :dead rows older than N days
  • prune_succeeded/1 — delete :succeeded rows older than N days

Each public function ships in dual bang/tuple form per the D-05 convention.

Replay-death-loop prevention

When a replayed event re-enters the dispatch worker and the processor fetch returns {:error, :not_found} (e.g., the underlying upstream resource has been deleted since the original failure), the worker treats it as terminal-skip — status becomes :replayed, no re-dead-letter — so a single bad row cannot loop forever.

Summary

Types

filter()

@type filter() :: [
  type: String.t() | [String.t()],
  since: DateTime.t(),
  until: DateTime.t(),
  livemode: boolean()
]

replay_error()

@type replay_error() ::
  :not_found
  | :already_replayed
  | :not_dead_lettered
  | :replay_too_large
  | term()

replay_opts()

@type replay_opts() :: [
  batch_size: pos_integer(),
  stagger_ms: non_neg_integer(),
  dry_run: boolean(),
  force: boolean()
]

Functions

count(filter)

@spec count(filter()) :: non_neg_integer()

list(filter, opts \\ [])

@spec list(
  filter(),
  keyword()
) :: [Accrue.Webhook.WebhookEvent.t()]

prune(days)

@spec prune(pos_integer() | :infinity) :: {:ok, non_neg_integer()}

prune_succeeded(days)

@spec prune_succeeded(pos_integer() | :infinity) :: {:ok, non_neg_integer()}

requeue(id)

@spec requeue(Ecto.UUID.t()) ::
  {:ok, Accrue.Webhook.WebhookEvent.t()} | {:error, replay_error()}

requeue!(id)

requeue_where(filter, opts \\ [])

@spec requeue_where(filter(), replay_opts()) ::
  {:ok, map()} | {:error, :replay_too_large | term()}

requeue_where!(filter, opts \\ [])

@spec requeue_where!(filter(), replay_opts()) :: %{
  requeued: non_neg_integer(),
  skipped: non_neg_integer()
}