Forge.Stage.Executor (Forge v0.1.1)
Executes stages with retry logic and error handling.
Implements the retry strategy defined in ADR-003, including:
- Per-stage retry policies
- Exponential backoff with jitter
- Error classification (retriable vs non-retriable)
- Dead-letter queue integration
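To illustrate the backoff item above, here is a minimal sketch of exponential backoff with "full jitter". The module name `BackoffSketch`, the 100 ms base, and the 5 s cap are assumptions made for this example; the values and exact formula Forge uses are defined by ADR-003 and may differ.

```elixir
defmodule BackoffSketch do
  # Hypothetical defaults, chosen only for illustration.
  @base_ms 100
  @cap_ms 5_000

  # Upper bound of the delay before retry number `attempt` (1-based):
  # base * 2^(attempt - 1), capped at @cap_ms.
  def ceiling_ms(attempt) when is_integer(attempt) and attempt >= 1 do
    min(@cap_ms, @base_ms * Integer.pow(2, attempt - 1))
  end

  # "Full jitter": pick a uniformly random delay in 0..ceiling so that
  # many failing samples do not all retry at the same instant.
  def delay_ms(attempt) do
    # :rand.uniform(n) returns an integer in 1..n, so shift down by one.
    :rand.uniform(ceiling_ms(attempt) + 1) - 1
  end
end
```

The cap keeps later attempts from waiting unboundedly long, and the random component spreads retries out when many samples fail together.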
Usage
alias Forge.Stage.Executor
sample = Forge.Sample.new(id: "123", pipeline: :test, data: %{value: 42})
case Executor.apply_with_retry(sample, MyStage) do
  {:ok, processed_sample} ->
    # Stage succeeded
    processed_sample
  {:skip, reason} ->
    # Sample was filtered by the stage
    {:skip, reason}
  {:error, :max_retries, reason} ->
    # Stage failed after max attempts, send to DLQ
    {:error, :max_retries, reason}
end
Functions
Applies a stage to a sample with retry logic.
Returns:
- {:ok, sample} - Stage succeeded
- {:skip, reason} - Sample was filtered by stage
- {:error, :max_retries, reason} - Stage failed after max attempts
The function will:
- Get the retry policy from the stage (or use default)
- Attempt to process the sample
- On error, classify whether the error is retriable
- If retriable and attempts remain, wait and retry
- If non-retriable or max attempts reached, return error
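The steps above can be sketched as a small recursive loop. This is not Forge's implementation: `RetrySketch`, the policy map shape (`max_attempts`, `delay_ms`), and the rule that only `:timeout` is retriable are assumptions made for the example, and a plain 1-arity function stands in for a stage module.

```elixir
defmodule RetrySketch do
  # Hypothetical policy shape; the real Forge policy may differ.
  @default_policy %{max_attempts: 3, delay_ms: 0}

  # `process` is any 1-arity function returning {:ok, sample},
  # {:skip, reason} or {:error, reason}; it stands in for a stage.
  def apply_with_retry(sample, process, policy \\ @default_policy) do
    attempt(sample, process, policy, 1)
  end

  defp attempt(sample, process, policy, n) do
    case process.(sample) do
      {:ok, processed} ->
        {:ok, processed}

      {:skip, reason} ->
        {:skip, reason}

      {:error, reason} ->
        if retriable?(reason) and n < policy.max_attempts do
          # Wait, then try again with the attempt counter advanced.
          Process.sleep(policy.delay_ms)
          attempt(sample, process, policy, n + 1)
        else
          # The documented return values list only this error shape,
          # so the sketch uses it for non-retriable errors as well.
          {:error, :max_retries, reason}
        end
    end
  end

  # Assumed classification: timeouts are transient, everything else is not.
  defp retriable?(:timeout), do: true
  defp retriable?(_other), do: false
end
```

With the default policy in this sketch, a function that fails twice with `:timeout` and then succeeds is called three times in total, while a function that fails with any other reason is called only once.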
Gets the retry policy for a stage.
If the stage implements retry_policy/0, that policy is used. Otherwise, the
default policy is returned.
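One common way to implement this kind of optional-callback lookup in Elixir is `function_exported?/3`. The sketch below assumes a stage is a module; `PolicySketch`, `retry_policy_for/1`, the two example stage modules, and the policy map shape are hypothetical names introduced for illustration only.

```elixir
defmodule PolicySketch do
  # Hypothetical default; the real default policy lives in Forge.
  @default_policy %{max_attempts: 3}

  def retry_policy_for(stage) when is_atom(stage) do
    # Make sure the module is loaded first: function_exported?/3
    # returns false for modules that have not been loaded yet.
    if Code.ensure_loaded?(stage) and function_exported?(stage, :retry_policy, 0) do
      stage.retry_policy()
    else
      @default_policy
    end
  end
end

# A stage that defines its own policy.
defmodule StageWithPolicy do
  def retry_policy, do: %{max_attempts: 5}
end

# A stage that relies on the default.
defmodule StageWithoutPolicy do
end
```

Under these assumptions, `PolicySketch.retry_policy_for(StageWithPolicy)` returns the stage's own map and `PolicySketch.retry_policy_for(StageWithoutPolicy)` falls back to the default.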