Dead-letter queue replay and retention for webhook events (WH-08 / D4-04).
Replay inserts a fresh Oban dispatch job. Oban's own retry_job/2
refuses jobs in :discarded/:cancelled states, and dead-lettered
webhook events correspond to exactly those — so the only correct path
is to insert a brand-new job whose args reference the existing
WebhookEvent row by id.
Public API
requeue/1: single dead-lettered row → fresh dispatch job
requeue_where/2: bulk replay with batch + stagger + dry-run + max-rows cap
list/2: paginated browse for ops tooling
count/1: accurate count for confirm prompts
prune/1: delete :dead rows older than N days
prune_succeeded/1: delete :succeeded rows older than N days
Each public function ships in dual bang/tuple form per the D-05 convention.
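The dual bang/tuple convention can be illustrated with a minimal, self-contained sketch. Everything here is an assumption made for illustration: the module name BangTupleSketch, the in-memory @rows map, the :pending status on the returned row, and the ArgumentError raised by the bang variant are hypothetical and are not taken from the library.

```elixir
defmodule BangTupleSketch do
  # Hypothetical in-memory stand-in for the webhook event table (id => status).
  # The real module works against WebhookEvent rows instead.
  @rows %{"evt_1" => :dead, "evt_2" => :succeeded}

  # Tuple form: always returns {:ok, row} or {:error, reason} and never raises.
  def requeue(id) do
    case Map.fetch(@rows, id) do
      # :pending is an assumed post-requeue status, used only in this sketch.
      {:ok, :dead} -> {:ok, %{id: id, status: :pending}}
      {:ok, _other} -> {:error, :not_dead_lettered}
      :error -> {:error, :not_found}
    end
  end

  # Bang form: unwraps the success value and raises on any error tuple.
  # The exception type is an assumption; the library may raise something else.
  def requeue!(id) do
    case requeue(id) do
      {:ok, row} -> row
      {:error, reason} -> raise ArgumentError, "requeue failed: #{inspect(reason)}"
    end
  end
end
```

In this shape the tuple form suits callers that branch on the failure reason, while the bang form suits scripts and consoles where a crash is the desired reaction to an error.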
Replay-death-loop prevention
When a replayed event re-enters the dispatch worker and the processor
fetch returns {:error, :not_found} (e.g., the underlying upstream
resource has been deleted since the original failure), the worker
treats it as terminal-skip — status becomes :replayed, no
re-dead-letter — so a single bad row cannot loop forever.
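The terminal-skip rule above can be sketched as a small decision function. This is not the worker's actual code: the module name ReplayGuardSketch, the replayed? flag, and the handling of every case other than the replayed :not_found case are assumptions made so the example is complete.

```elixir
defmodule ReplayGuardSketch do
  # Decide the next status of an event after the processor fetch.
  # `replayed?` is a hypothetical flag marking jobs that a replay inserted.

  # Replayed event whose upstream resource is gone: terminal skip.
  # The row is marked :replayed and is NOT dead-lettered again.
  def next_status({:error, :not_found}, true), do: :replayed

  # Successful fetch (assumed here to end in :succeeded).
  def next_status({:ok, _resource}, _replayed?), do: :succeeded

  # Any other failure is assumed to end in :dead once retries are exhausted.
  def next_status({:error, _reason}, _replayed?), do: :dead
end
```

Because the first clause returns a terminal status instead of :dead, replaying the same row again cannot put it back into the dead-letter set.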
Types
@type filter() :: [
  type: String.t() | [String.t()],
  since: DateTime.t(),
  until: DateTime.t(),
  livemode: boolean()
]
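As an illustration of how a filter() keyword list might be interpreted, the following sketch applies one to plain maps in memory. The real functions presumably translate the filter into a database query; the module name FilterSketch, the inserted_at field, and the inclusive since/until bounds are assumptions.

```elixir
defmodule FilterSketch do
  # Return true when `event` (a plain map in this sketch) satisfies every
  # key present in `filter`. An empty filter matches everything.
  def matches?(event, filter) do
    Enum.all?(filter, fn
      # :type accepts a single string or a list of strings.
      {:type, types} -> event.type in List.wrap(types)
      # Bounds are treated as inclusive here; that is an assumption.
      {:since, since} -> DateTime.compare(event.inserted_at, since) != :lt
      {:until, until} -> DateTime.compare(event.inserted_at, until) != :gt
      {:livemode, livemode} -> event.livemode == livemode
    end)
  end
end
```

A filter such as `[type: ["invoice.paid"], livemode: true]` would then select only live-mode events of that type.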
@type replay_error() :: :not_found | :already_replayed | :not_dead_lettered | :replay_too_large | term()
@type replay_opts() :: [
  batch_size: pos_integer(),
  stagger_ms: non_neg_integer(),
  dry_run: boolean(),
  force: boolean()
]
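One plausible reading of batch_size, stagger_ms, and dry_run is sketched below: ids are split into batches, each batch is delayed by a growing offset, and a dry run reports the plan without enqueueing anything. The defaults (100 and 1_000), the per-batch interpretation of stagger_ms, the shape of the dry-run result, and the module name StaggerSketch are all assumptions; the real enqueue step is left out.

```elixir
defmodule StaggerSketch do
  # Split ids into batches and give each batch a delay offset in milliseconds.
  # Batch 0 runs immediately, batch 1 after stagger_ms, batch 2 after 2x, etc.
  def plan(ids, opts \\ []) do
    batch_size = Keyword.get(opts, :batch_size, 100)
    stagger_ms = Keyword.get(opts, :stagger_ms, 1_000)

    ids
    |> Enum.chunk_every(batch_size)
    |> Enum.with_index()
    |> Enum.map(fn {batch, index} -> %{delay_ms: index * stagger_ms, ids: batch} end)
  end

  # With dry_run: true, report what would be requeued without enqueueing.
  def run(ids, opts \\ []) do
    batches = plan(ids, opts)

    if Keyword.get(opts, :dry_run, false) do
      {:ok, %{dry_run: true, would_requeue: length(ids), batches: length(batches)}}
    else
      # A real implementation would insert one job per id here, scheduled
      # delay_ms in the future; this sketch only reports the totals.
      {:ok, %{requeued: length(ids), skipped: 0}}
    end
  end
end
```

Staggering in this way spreads a large replay over time so that downstream processors are not hit by every requeued event at once.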
Functions
@spec count(filter()) :: non_neg_integer()
@spec list(filter(), keyword()) :: [Accrue.Webhook.WebhookEvent.t()]
@spec prune(pos_integer() | :infinity) :: {:ok, non_neg_integer()}
@spec prune_succeeded(pos_integer() | :infinity) :: {:ok, non_neg_integer()}
@spec requeue(Ecto.UUID.t()) :: {:ok, Accrue.Webhook.WebhookEvent.t()} | {:error, replay_error()}
@spec requeue!(Ecto.UUID.t()) :: Accrue.Webhook.WebhookEvent.t()
@spec requeue_where(filter(), replay_opts()) :: {:ok, map()} | {:error, :replay_too_large | term()}
@spec requeue_where!(filter(), replay_opts()) :: %{requeued: non_neg_integer(), skipped: non_neg_integer()}