Forge.Stage behaviour (Forge v0.1.1)
Behaviour for pipeline stages.
Stages transform samples as they flow through a pipeline. Each stage processes one sample at a time and can:
- Transform the sample data
- Filter out samples (skip)
- Change the sample status
- Add metadata or measurements
- Fail with an error
Return Values
- {:ok, sample} - Continue with the transformed sample.
- {:skip, reason} - Remove this sample from the pipeline.
- {:error, reason} - The stage failed; how the error is handled depends on the runner configuration.
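To make the contract concrete, here is a minimal sketch of how a runner might thread one sample through several stages. HypotheticalPipeline, run_sample/2, AddOne and RejectNegative are illustrative names, not part of Forge, and plain maps stand in for Forge.Sample structs. In this sketch an error simply stops the sample; a real runner's handling depends on its configuration.

```elixir
defmodule HypotheticalPipeline do
  # Hypothetical sketch, not Forge's runner: threads one sample through a
  # list of stage modules according to the process/1 return contract.
  def run_sample(stages, sample) do
    Enum.reduce_while(stages, {:ok, sample}, fn stage, {:ok, current} ->
      case stage.process(current) do
        # Continue: the next stage receives the transformed sample.
        {:ok, updated} -> {:cont, {:ok, updated}}
        # Skip: the sample leaves the pipeline; later stages never see it.
        {:skip, reason} -> {:halt, {:skipped, reason}}
        # Error: this sketch just stops and reports which stage failed.
        {:error, reason} -> {:halt, {:error, stage, reason}}
      end
    end)
  end
end

defmodule AddOne do
  # Illustrative stage: increments data.value.
  def process(sample) do
    data = Map.put(sample.data, :value, sample.data.value + 1)
    {:ok, %{sample | data: data}}
  end
end

defmodule RejectNegative do
  # Illustrative stage: drops samples whose value is negative.
  def process(sample) do
    if sample.data.value >= 0, do: {:ok, sample}, else: {:skip, :negative_value}
  end
end
```

For example, running [AddOne, RejectNegative] on %{data: %{value: 1}} yields {:ok, %{data: %{value: 2}}}, while %{data: %{value: -5}} becomes -4 after AddOne and is then skipped with :negative_value.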
Examples
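```elixir
defmodule NormalizeStage do
  @behaviour Forge.Stage

  @impl true
  def process(sample) do
    # Divide the raw value by 100 and store it alongside the original.
    normalized = sample.data.value / 100.0
    data = Map.put(sample.data, :normalized, normalized)
    # Continue with the updated sample.
    {:ok, %{sample | data: data}}
  end
end
```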
```elixir
defmodule FilterStage do
  @behaviour Forge.Stage

  @impl true
  def process(sample) do
    if sample.data.value >= 0 do
      {:ok, sample}
    else
      # Drop negative values; the reason records why the sample was removed.
      {:skip, :negative_value}
    end
  end
end
```
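A stage can also fail. The hypothetical ValidateStage below, assuming sample.data is a map as in the examples above, returns {:error, reason} for malformed input instead of raising, leaving the decision about what to do next to the runner.

```elixir
defmodule ValidateStage do
  # Hypothetical stage illustrating the {:error, reason} return value.
  # Without Forge installed, @behaviour only triggers a compile warning.
  @behaviour Forge.Stage

  def process(sample) do
    case Map.fetch(sample.data, :value) do
      # A numeric value passes through unchanged.
      {:ok, value} when is_number(value) -> {:ok, sample}
      # Present but not a number: report what was found.
      {:ok, other} -> {:error, {:invalid_value, other}}
      # No :value key at all.
      :error -> {:error, :missing_value}
    end
  end
end
```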
Summary
Callbacks
- async?/0 - Whether this stage should be executed asynchronously.
- concurrency/0 - Concurrency limit for async stages.
- process/1 - Process a single sample.
- timeout/0 - Timeout for stage execution in milliseconds.
Functions
- Helper to check whether a stage module supports async execution.
- Helper to get a stage's concurrency setting.
- Helper to get a stage's timeout.
Callbacks
@callback async?() :: boolean()
Whether this stage should be executed asynchronously.
Async stages are executed concurrently with backpressure control. This is useful for I/O-bound stages such as LLM calls, embedding requests, and HTTP API calls.
Defaults to false (synchronous execution).
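As an illustration, an I/O-bound stage might opt into async execution like this. EmbedStage and fetch_embedding/1 are hypothetical: the private function only sleeps and returns a fake vector where a real stage would call an embedding API, and the concurrency/0 and timeout/0 values are arbitrary.

```elixir
defmodule EmbedStage do
  # Hypothetical I/O-bound stage. Needs Forge for the @behaviour check;
  # without it, the compiler only emits a warning.
  @behaviour Forge.Stage

  # Opt into concurrent execution with at most 8 in-flight calls,
  # each allowed 10 seconds (illustrative values).
  def async?, do: true
  def concurrency, do: 8
  def timeout, do: 10_000

  def process(sample) do
    # Non-matching results (errors) fall through the with unchanged.
    with {:ok, vector} <- fetch_embedding(sample.data.text) do
      {:ok, %{sample | data: Map.put(sample.data, :embedding, vector)}}
    end
  end

  # Stand-in for a real embedding API call.
  defp fetch_embedding(""), do: {:error, :empty_text}

  defp fetch_embedding(text) when is_binary(text) do
    Process.sleep(10)
    {:ok, [String.length(text) * 1.0]}
  end
end
```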
@callback concurrency() :: pos_integer()
Concurrency limit for async stages.
Limits the number of concurrent executions for this stage. Defaults to System.schedulers_online() if not specified.
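The summary lists helper functions that read these settings, but their names and exact behaviour are not shown on this page. A minimal sketch of the default-resolution logic, assuming an unimplemented optional callback is simply not exported by the stage module, could look like this; StageOptions and its functions are hypothetical.

```elixir
defmodule StageOptions do
  # Hypothetical helpers, not Forge's actual functions: call the optional
  # callback when the stage defines it, otherwise use the documented default.
  def async?(stage), do: call_or_default(stage, :async?, false)
  def concurrency(stage), do: call_or_default(stage, :concurrency, System.schedulers_online())
  def timeout(stage), do: call_or_default(stage, :timeout, 30_000)

  defp call_or_default(stage, fun, default) do
    # function_exported?/3 only sees loaded modules, so load the stage first.
    if Code.ensure_loaded?(stage) and function_exported?(stage, fun, 0) do
      apply(stage, fun, [])
    else
      default
    end
  end
end
```

A stage that implements only process/1 therefore resolves to synchronous execution, System.schedulers_online() concurrency and a 30_000 ms timeout.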
@callback process(sample :: Forge.Sample.t()) :: {:ok, Forge.Sample.t()} | {:skip, reason :: any()} | {:error, reason :: any()}
Process a single sample.
The stage receives the sample and can transform it, filter it out, or return an error.
@callback timeout() :: pos_integer()
Timeout for stage execution in milliseconds.
Stages exceeding this timeout will be killed. Defaults to 30_000 (30 seconds).
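One way to honour both concurrency/0 and timeout/0 is Task.async_stream/3, which keeps at most max_concurrency tasks in flight (a simple form of backpressure) and, with on_timeout: :kill_task, kills tasks that overrun. This is a hedged sketch of the idea, not Forge's implementation; AsyncRunnerSketch and run/4 are hypothetical names.

```elixir
defmodule AsyncRunnerSketch do
  # Hypothetical sketch, not Forge's runner: runs a stage over many samples
  # with bounded concurrency and a per-sample timeout.
  def run(stage, samples, max_concurrency, timeout_ms) do
    samples
    |> Task.async_stream(fn sample -> stage.process(sample) end,
      max_concurrency: max_concurrency,
      timeout: timeout_ms,
      # Kill a task that exceeds the timeout instead of exiting the caller.
      on_timeout: :kill_task,
      ordered: true
    )
    |> Enum.map(fn
      # The task finished: pass the stage's own return value through.
      {:ok, result} -> result
      # With on_timeout: :kill_task, a timed-out task yields {:exit, :timeout}.
      {:exit, reason} -> {:error, reason}
    end)
  end
end
```

Because ordered: true is set, results come back in the same order as the input samples, even when later tasks finish first.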