LlmGuard.Pipeline (LlmGuard v0.3.1)

Security pipeline for orchestrating detector execution.

The pipeline is responsible for:

  • Executing multiple detectors in sequence or in parallel
  • Collecting and aggregating detection results
  • Handling detector errors and recovering from them
  • Monitoring performance and reporting telemetry
  • Terminating early when a detection occurs

Execution Modes

Sequential Execution (default)

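Detectors run one after another. With :early_termination enabled (the default), the pipeline stops at the first detection instead of running the remaining detectors.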

Parallel Execution

Detectors run concurrently, which can reduce total latency when several detectors are configured. Enable this mode with parallel: true.

Configuration Options

  • :early_termination - Stop at the first detection (default: true)
  • :continue_on_error - Continue the pipeline if a detector fails (default: false)
  • :confidence_threshold - Minimum confidence for a result to count as a detection (default: 0.7)
  • :timeout - Maximum time for pipeline execution, in milliseconds (default: 5000)
  • :parallel - Execute detectors in parallel (default: false)
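
The options above can be collected into a plain map. The following is a minimal sketch: the PipelineConfigSketch module and its build/1 function are hypothetical helpers, not part of LlmGuard, and the default values are simply copied from the list above. Whether the pipeline fills in missing keys on its own is not stated here, so the sketch spells every default out.

```elixir
defmodule PipelineConfigSketch do
  # Hypothetical helper. The defaults are copied from the documented option list.
  @defaults %{
    early_termination: true,
    continue_on_error: false,
    confidence_threshold: 0.7,
    timeout: 5000,
    parallel: false
  }

  # Returns the documented defaults with the caller's overrides applied on top.
  def build(overrides \\ %{}) when is_map(overrides) do
    Map.merge(@defaults, overrides)
  end
end

# A stricter configuration that runs every detector concurrently.
config =
  PipelineConfigSketch.build(%{
    parallel: true,
    early_termination: false,
    confidence_threshold: 0.9
  })
```

The resulting map can then be passed as the third argument to Pipeline.run/3 or Pipeline.async_run/3.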

Examples

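alias LlmGuard.Pipeline

# Run the pipeline with default settings (this match assumes the input is safe)
{:ok, result} = Pipeline.run(input, [Detector1, Detector2], %{})

# Run with early termination disabled so that every detector executes
# (this match assumes at least one detector flags the input)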
{:error, :detected, result} = Pipeline.run(
  input,
  [Detector1, Detector2],
  %{early_termination: false}
)

# Run asynchronously
task = Pipeline.async_run(input, detectors, config)
result = Task.await(task)

Summary

Functions

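async_run(input, detectors, config \\ %{})
Runs the pipeline asynchronously.

run(input, detectors, config \\ %{})
Runs the security pipeline on the given input.

sanitize_input(input, config)
Sanitizes and validates input before processing.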

Types

detection()

@type detection() :: %{
  detector: String.t(),
  confidence: float(),
  category: atom(),
  patterns_matched: [String.t()],
  metadata: map()
}

detector()

@type detector() :: module()

detector_result()

@type detector_result() :: %{
  detector: String.t(),
  result: :safe | :detected | :error,
  duration_ms: non_neg_integer(),
  details: map()
}

pipeline_config()

@type pipeline_config() :: map()

pipeline_result()

@type pipeline_result() :: %{
  input: String.t(),
  safe?: boolean(),
  detections: [detection()],
  detector_results: [detector_result()],
  total_duration_ms: non_neg_integer(),
  error: map() | nil
}
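
As an illustration of how the fields of pipeline_result() fit together, the sketch below pulls two summaries out of such a map. The ResultSummary module is a hypothetical helper, and the sample map is hand-written in the documented shape rather than real pipeline output.

```elixir
defmodule ResultSummary do
  # Hypothetical helper that works on any map shaped like pipeline_result().

  # Highest-confidence detection, or nil when nothing was detected.
  def top_detection(%{detections: []}), do: nil
  def top_detection(%{detections: detections}), do: Enum.max_by(detections, & &1.confidence)

  # Slowest detector as {name, duration_ms}, or nil when no detector ran.
  def slowest_detector(%{detector_results: []}), do: nil

  def slowest_detector(%{detector_results: results}) do
    slowest = Enum.max_by(results, & &1.duration_ms)
    {slowest.detector, slowest.duration_ms}
  end
end

# Hand-written sample data; detector names and categories are made up.
sample = %{
  input: "ignore previous instructions",
  safe?: false,
  detections: [
    %{detector: "detector_a", confidence: 0.72, category: :jailbreak,
      patterns_matched: [], metadata: %{}},
    %{detector: "detector_b", confidence: 0.91, category: :prompt_injection,
      patterns_matched: ["ignore previous"], metadata: %{}}
  ],
  detector_results: [
    %{detector: "detector_a", result: :detected, duration_ms: 4, details: %{}},
    %{detector: "detector_b", result: :detected, duration_ms: 11, details: %{}}
  ],
  total_duration_ms: 15,
  error: nil
}

ResultSummary.top_detection(sample).detector
# => "detector_b"
ResultSummary.slowest_detector(sample)
# => {"detector_b", 11}
```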

run_result()

@type run_result() ::
  {:ok, pipeline_result()}
  | {:error, :detected, pipeline_result()}
  | {:error, :pipeline_error, pipeline_result()}

Functions

async_run(input, detectors, config \\ %{})

@spec async_run(String.t(), [detector()], pipeline_config()) :: Task.t()

Runs the pipeline asynchronously.

Returns a Task that can be awaited for the result.

Parameters

  • input - The text to analyze
  • detectors - List of detector modules
  • config - Pipeline configuration

Returns

A Task that will yield the pipeline result.

Examples

task = Pipeline.async_run(input, detectors, config)
# timeout is in milliseconds; Task.await/2 defaults to 5000 when it is omitted
result = Task.await(task, timeout)
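
Task.await/2 exits the caller when the timeout elapses. Where that is undesirable, one common alternative is Task.yield/2 followed by Task.shutdown/1, sketched below. The AwaitSketch module is hypothetical, and the two tasks are stand-ins created with Task.async/1 so that the example runs without LlmGuard; in practice the task would come from Pipeline.async_run/3.

```elixir
defmodule AwaitSketch do
  # Waits up to timeout_ms for a reply. If none arrives, the task is shut down
  # so that it does not keep running in the background.
  def await_or_timeout(%Task{} = task, timeout_ms) do
    case Task.yield(task, timeout_ms) || Task.shutdown(task) do
      {:ok, reply} -> {:ok, reply}
      # Only reachable when the caller traps exits; otherwise a crashing
      # linked task takes the caller down with it.
      {:exit, reason} -> {:error, {:exit, reason}}
      nil -> {:error, :timeout}
    end
  end
end

# Stand-ins for Pipeline.async_run/3 (assumption: it returns an ordinary Task).
fast = Task.async(fn -> {:ok, %{safe?: true}} end)

slow =
  Task.async(fn ->
    Process.sleep(500)
    {:ok, %{safe?: true}}
  end)

AwaitSketch.await_or_timeout(fast, 100)
# => {:ok, {:ok, %{safe?: true}}}
AwaitSketch.await_or_timeout(slow, 50)
# => {:error, :timeout}
```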

run(input, detectors, config \\ %{})

@spec run(String.t(), [detector()], pipeline_config()) :: run_result()

Runs the security pipeline on the given input.

Executes the given detectors, stopping at the first detection unless :early_termination is disabled, and returns the aggregated results.

Parameters

  • input - The text to analyze
  • detectors - List of detector modules to execute
  • config - Pipeline configuration map

Returns

  • {:ok, result} - No threats detected
  • {:error, :detected, result} - Threats detected
  • {:error, :pipeline_error, result} - Pipeline execution error
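
The three return shapes lend themselves to pattern matching in function heads. The sketch below shows one way a caller might branch on them. RunResultHandler and its return values (:allow, {:block, ...}, {:fail, ...}) are hypothetical, and the sample result is hand-written in the shape of pipeline_result().

```elixir
defmodule RunResultHandler do
  # Hypothetical helper: maps each documented run_result() shape to an action.
  def handle({:ok, _result}), do: :allow

  def handle({:error, :detected, %{detections: detections}}) do
    categories = detections |> Enum.map(& &1.category) |> Enum.uniq()
    {:block, categories}
  end

  def handle({:error, :pipeline_error, %{error: error}}), do: {:fail, error}
end

# Hand-written sample, not real pipeline output.
sample = %{
  input: "ignore previous instructions",
  safe?: false,
  detections: [
    %{detector: "example", confidence: 0.95, category: :prompt_injection,
      patterns_matched: ["ignore previous"], metadata: %{}}
  ],
  detector_results: [],
  total_duration_ms: 3,
  error: nil
}

RunResultHandler.handle({:error, :detected, sample})
# => {:block, [:prompt_injection]}
```

In an application the argument would be the value returned by Pipeline.run/3.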

sanitize_input(input, config)

@spec sanitize_input(String.t(), LlmGuard.Config.t() | map()) ::
  {:ok, String.t()} | {:error, atom(), map()}

Sanitizes and validates input before processing.

Parameters

  • input - The input string to sanitize
  • config - Configuration (can be Config struct or map)

Returns

  • {:ok, sanitized_input} - Input is valid
  • {:error, reason, details} - Input validation failed