Forge.Stage behaviour (Forge v0.1.1)

Behaviour for pipeline stages.

Stages transform samples as they flow through a pipeline. Each stage processes one sample at a time and can:

  • Transform the sample data
  • Filter out samples (skip)
  • Change the sample status
  • Add metadata or measurements
  • Fail with an error

Return Values

  • {:ok, sample} - Continue with the transformed sample
  • {:skip, reason} - Remove this sample from the pipeline
  • {:error, reason} - The stage failed; how the error is handled depends on the runner configuration
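As a rough illustration of the three return values, the sketch below applies one stage to a list of samples and groups the outcomes. `ReturnValueSketch` and `PositiveOnly` are hypothetical names, plain maps stand in for `Forge.Sample` structs, and the actual Forge runner may treat errors differently depending on its configuration.

```elixir
defmodule ReturnValueSketch do
  # Hypothetical helper, not part of Forge: applies one stage to a list of
  # samples and groups the outcomes by return value.
  def run(stage, samples) do
    initial = %{kept: [], skipped: [], errors: []}

    result =
      Enum.reduce(samples, initial, fn sample, acc ->
        case stage.process(sample) do
          {:ok, transformed} -> %{acc | kept: [transformed | acc.kept]}
          {:skip, reason} -> %{acc | skipped: [reason | acc.skipped]}
          {:error, reason} -> %{acc | errors: [reason | acc.errors]}
        end
      end)

    # The reduce prepends, so reverse each list to restore input order.
    Map.new(result, fn {key, list} -> {key, Enum.reverse(list)} end)
  end
end

defmodule PositiveOnly do
  # A real stage would declare `@behaviour Forge.Stage`; it is omitted so the
  # sketch compiles without Forge installed.
  def process(%{data: %{value: value}} = sample) when is_number(value) do
    if value > 0, do: {:ok, sample}, else: {:skip, :not_positive}
  end

  # Samples without a numeric value are reported as errors.
  def process(_sample), do: {:error, :missing_value}
end

# Plain maps stand in for Forge.Sample structs.
samples = [%{data: %{value: 5}}, %{data: %{value: -1}}, %{data: %{}}]
result = ReturnValueSketch.run(PositiveOnly, samples)
```

Here `result.kept` holds the first sample, `result.skipped` holds `:not_positive`, and `result.errors` holds `:missing_value`.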

Examples

defmodule NormalizeStage do
  @behaviour Forge.Stage

  @impl true
  def process(sample) do
    # Scale the raw value and store it alongside the original data.
    normalized = sample.data.value / 100.0
    data = Map.put(sample.data, :normalized, normalized)
    {:ok, %{sample | data: data}}
  end
end

defmodule FilterStage do
  @behaviour Forge.Stage

  @impl true
  def process(sample) do
    # Keep strictly positive values; zero and negative values are skipped.
    if sample.data.value > 0 do
      {:ok, sample}
    else
      {:skip, :non_positive_value}
    end
  end
end

Summary

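Callbacks

  • async?() - Whether this stage should be executed asynchronously.
  • concurrency() - Concurrency limit for async stages.
  • process(sample) - Process a single sample.
  • timeout() - Timeout for stage execution in milliseconds.

Functions

  • async?(stage_module) - Helper to check if a stage module supports async execution.
  • concurrency(stage_module) - Helper to get the stage's concurrency setting.
  • timeout(stage_module) - Helper to get the stage's timeout.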

Callbacks

async?()

(optional)
@callback async?() :: boolean()

Whether this stage should be executed asynchronously.

Async stages are executed concurrently with backpressure control. This is useful for I/O-bound stages (LLM calls, embeddings, HTTP APIs).

Defaults to false (synchronous execution).
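A stage that opts in to asynchronous execution might look like the sketch below, which also sets the optional `concurrency/0` and `timeout/0` callbacks. `EmbeddingStage`, the `:text` and `:embedding` keys, and `fake_embed/1` are hypothetical, and a plain map stands in for a `Forge.Sample` struct.

```elixir
defmodule EmbeddingStage do
  # In a Forge project this module would also declare `@behaviour Forge.Stage`;
  # it is omitted here so the sketch compiles on its own.

  # Opt in to asynchronous execution.
  def async?, do: true

  # Allow at most four samples in flight at once.
  def concurrency, do: 4

  # Give each sample up to ten seconds.
  def timeout, do: 10_000

  def process(sample) do
    # fake_embed/1 stands in for a slow I/O call such as an embeddings API.
    embedding = fake_embed(sample.data.text)
    {:ok, %{sample | data: Map.put(sample.data, :embedding, embedding)}}
  end

  # Hypothetical placeholder: turns the text into a list of character codes.
  defp fake_embed(text), do: String.to_charlist(text)
end

{:ok, sample} = EmbeddingStage.process(%{data: %{text: "hi"}})
```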

concurrency()

(optional)
@callback concurrency() :: pos_integer()

Concurrency limit for async stages.

Limits the number of concurrent executions for this stage. Defaults to System.schedulers_online() if not specified.
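To make the idea of a concurrency limit concrete, the sketch below bounds parallel calls with `Task.async_stream/3` and its `:max_concurrency` option. This is an assumption about one way such a limit could be enforced, not a description of Forge's runner; `BoundedRunSketch` and `DoubleStage` are hypothetical names.

```elixir
defmodule BoundedRunSketch do
  # Hypothetical helper, not Forge's runner: processes samples concurrently
  # with at most `limit` calls to `stage.process/1` running at any time.
  def run(stage, samples, limit \\ System.schedulers_online()) do
    samples
    |> Task.async_stream(fn sample -> stage.process(sample) end,
      max_concurrency: limit,
      ordered: true
    )
    # Each successful task result arrives wrapped in {:ok, result}.
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

defmodule DoubleStage do
  # A real stage would declare `@behaviour Forge.Stage`.
  def process(sample) do
    {:ok, %{sample | data: %{value: sample.data.value * 2}}}
  end
end

# Plain maps stand in for Forge.Sample structs.
samples = for v <- 1..3, do: %{data: %{value: v}}
results = BoundedRunSketch.run(DoubleStage, samples, 2)
```

Because `Task.async_stream/3` only starts a new task when a slot is free, the input is consumed no faster than the stage can process it, which is one simple form of backpressure.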

process(sample)

@callback process(sample :: Forge.Sample.t()) ::
  {:ok, Forge.Sample.t()} | {:skip, reason :: any()} | {:error, reason :: any()}

Process a single sample.

The stage receives the sample and can return a transformed sample ({:ok, sample}), filter it out ({:skip, reason}), or fail ({:error, reason}).

timeout()

(optional)
@callback timeout() :: pos_integer()

Timeout for stage execution in milliseconds.

Stages exceeding this timeout will be killed. Defaults to 30_000 (30 seconds).
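One common Elixir idiom for enforcing a deadline like this is `Task.yield/2` followed by `Task.shutdown/2`. The sketch below assumes that approach; it is not taken from Forge's source. `TimeoutSketch`, `SlowStage`, and the `{:error, :timeout}` result are hypothetical.

```elixir
defmodule TimeoutSketch do
  # Hypothetical helper, not Forge's runner: runs `stage.process/1` in a task
  # and kills the task if it has not replied within `timeout_ms`.
  def run(stage, sample, timeout_ms \\ 30_000) do
    task = Task.async(fn -> stage.process(sample) end)

    case Task.yield(task, timeout_ms) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> result
      # No reply arrived in time and the task has been killed.
      nil -> {:error, :timeout}
      # Only reachable when the calling process traps exits.
      {:exit, reason} -> {:error, {:exit, reason}}
    end
  end
end

defmodule SlowStage do
  # A real stage would declare `@behaviour Forge.Stage`.
  def process(sample) do
    Process.sleep(200)
    {:ok, sample}
  end
end

fast = TimeoutSketch.run(SlowStage, %{data: %{}}, 1_000)
slow = TimeoutSketch.run(SlowStage, %{data: %{}}, 50)
```

With these values, `fast` is the stage's `{:ok, sample}` result and `slow` is `{:error, :timeout}`.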

Functions

async?(stage_module)

Helper to check if a stage module supports async execution.

concurrency(stage_module)

Helper to get the stage's concurrency setting.

timeout(stage_module)

Helper to get the stage's timeout.
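Since the three callbacks are optional, helpers like these presumably fall back to the documented defaults when a stage does not define them. The sketch below shows one way that lookup could work using `function_exported?/3`; `StageDefaultsSketch`, `PlainStage`, and `TunedStage` are hypothetical, and Forge's own helpers may differ in detail.

```elixir
defmodule StageDefaultsSketch do
  # Hypothetical re-implementation for illustration only.
  def async?(stage), do: call_or_default(stage, :async?, false)
  def concurrency(stage), do: call_or_default(stage, :concurrency, System.schedulers_online())
  def timeout(stage), do: call_or_default(stage, :timeout, 30_000)

  defp call_or_default(stage, fun, default) do
    # function_exported?/3 does not load modules, so load the stage first.
    Code.ensure_loaded(stage)

    if function_exported?(stage, fun, 0), do: apply(stage, fun, []), else: default
  end
end

defmodule PlainStage do
  # Defines only the required callback, so every helper returns a default.
  def process(sample), do: {:ok, sample}
end

defmodule TunedStage do
  # Overrides two of the three optional callbacks.
  def async?, do: true
  def timeout, do: 5_000
  def process(sample), do: {:ok, sample}
end

plain_timeout = StageDefaultsSketch.timeout(PlainStage)
tuned_timeout = StageDefaultsSketch.timeout(TunedStage)
```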