Forge.Stage.Executor (Forge v0.1.1)

Executes stages with retry logic and error handling.

Implements the retry strategy defined in ADR-003, including:

  • Per-stage retry policies
  • Exponential backoff with jitter
  • Error classification (retriable vs non-retriable)
  • Dead-letter queue integration
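
As an illustration of exponential backoff with jitter, a minimal sketch follows. The module name BackoffSketch, the function delay_ms/3, and the base_ms and max_ms parameters are hypothetical and not part of Forge; the actual delay formula and policy fields are defined by ADR-003 and may differ. The sketch uses the "full jitter" variant, where the delay is drawn uniformly up to the capped exponential value.

```elixir
defmodule BackoffSketch do
  @moduledoc false

  # Hypothetical helper, not a Forge function.
  # Returns the delay in milliseconds before retry number `attempt` (1-based).
  # `base_ms` and `max_ms` are assumed policy fields.
  def delay_ms(attempt, base_ms \\ 100, max_ms \\ 5_000) when attempt >= 1 do
    # Exponential growth: base, 2 * base, 4 * base, ... capped at max_ms.
    capped = min(base_ms * Integer.pow(2, attempt - 1), max_ms)

    # Full jitter: pick a random integer in 1..capped so that concurrent
    # retries do not all fire at the same moment.
    :rand.uniform(capped)
  end
end
```

With the defaults above, the third attempt waits between 1 and 400 ms, and no attempt waits longer than 5,000 ms.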

Usage

alias Forge.Stage.Executor

sample = Forge.Sample.new(id: "123", pipeline: :test, data: %{value: 42})

case Executor.apply_with_retry(sample, MyStage) do
  {:ok, processed_sample} ->
    # Stage succeeded
    processed_sample

  {:skip, reason} ->
    # Sample was filtered out by the stage
    {:skip, reason}

  {:error, :max_retries, reason} ->
    # Stage failed after max attempts, send to DLQ
    {:error, :max_retries, reason}
end

Summary

Functions

apply_with_retry(sample, stage_module)

Applies a stage to a sample with retry logic.

get_retry_policy(stage_module)

Gets the retry policy for a stage.

Functions

apply_with_retry(sample, stage_module)

Applies a stage to a sample with retry logic.

Returns:

  • {:ok, sample} - Stage succeeded
  • {:skip, reason} - Sample was filtered out by the stage
  • {:error, :max_retries, reason} - Stage failed after max attempts

The function will:

  1. Get the retry policy from the stage (or use default)
  2. Attempt to process the sample
  3. On error, classify the error as retriable or non-retriable
  4. If the error is retriable and attempts remain, wait and retry
  5. If the error is non-retriable or the maximum number of attempts is reached, return the error
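
The steps above can be sketched as a small recursive loop. This is an illustration only, not the Forge implementation: the module RetrySketch, the policy fields max_attempts, delay_ms, and retriable?, and the assumption that a stage reports failure as {:error, reason} are all hypothetical. The sketch also assumes that non-retriable errors are returned in the same {:error, :max_retries, reason} shape, since that is the only error shape documented above.

```elixir
defmodule RetrySketch do
  @moduledoc false

  # Hypothetical policy shape; the real Forge policy fields may differ.
  def default_policy do
    %{max_attempts: 3, delay_ms: 10, retriable?: fn reason -> reason == :timeout end}
  end

  # `attempt_fun` is a zero-arity function standing in for "apply the stage
  # to the sample". It is assumed to return {:ok, sample}, {:skip, reason},
  # or {:error, reason}.
  def run(attempt_fun, policy \\ default_policy(), attempt \\ 1) do
    case attempt_fun.() do
      {:ok, _sample} = ok ->
        ok

      {:skip, _reason} = skip ->
        skip

      {:error, reason} ->
        # Steps 3 and 4: classify the error, then wait and retry if allowed.
        if policy.retriable?.(reason) and attempt < policy.max_attempts do
          Process.sleep(policy.delay_ms)
          run(attempt_fun, policy, attempt + 1)
        else
          # Step 5: non-retriable error or attempts exhausted.
          {:error, :max_retries, reason}
        end
    end
  end
end
```

In this sketch a fixed delay_ms keeps the example short; a real policy would typically compute the delay per attempt using exponential backoff with jitter.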

get_retry_policy(stage_module)

Gets the retry policy for a stage.

If the stage implements retry_policy/0, that policy is returned. Otherwise, the default policy is returned.
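
One common way to implement this kind of optional-callback lookup in Elixir is with function_exported?/3. The sketch below is an assumption about the approach, not the actual Forge source: PolicySketch, CustomStage, PlainStage, and the %{max_attempts: ...} policy shape are hypothetical, and the real default policy values are not stated in this documentation.

```elixir
defmodule PolicySketch do
  @moduledoc false

  # Placeholder default; the actual Forge default policy is not shown here.
  @default_policy %{max_attempts: 3}

  def get_retry_policy(stage_module) do
    # Code.ensure_loaded?/1 is called first because function_exported?/3
    # does not load the module and returns false for unloaded modules.
    if Code.ensure_loaded?(stage_module) and
         function_exported?(stage_module, :retry_policy, 0) do
      stage_module.retry_policy()
    else
      @default_policy
    end
  end
end

# Hypothetical stage that overrides the policy.
defmodule CustomStage do
  def retry_policy, do: %{max_attempts: 5}
end

# Hypothetical stage that relies on the default policy.
defmodule PlainStage do
end
```

Here PolicySketch.get_retry_policy(CustomStage) returns the stage's own policy, while PolicySketch.get_retry_policy(PlainStage) falls back to the placeholder default.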