LlmGuard Guardrail Specifications

View Source

Overview

This document provides detailed specifications for all guardrails implemented in LlmGuard. Each guardrail includes its purpose, implementation approach, configuration options, and performance characteristics.

Guardrail Categories

graph TB
    subgraph "Input Guardrails"
        I1[Prompt Injection Filter]
        I2[Jailbreak Detector]
        I3[Length Validator]
        I4[Encoding Validator]
        I5[Policy Engine]
    end

    subgraph "Output Guardrails"
        O1[PII Redactor]
        O2[Content Moderator]
        O3[Fact Checker]
        O4[Format Validator]
        O5[Consistency Checker]
    end

    subgraph "Bidirectional"
        B1[Rate Limiter]
        B2[Audit Logger]
        B3[Anomaly Detector]
    end

Input Guardrails

1. Prompt Injection Filter

Purpose: Detect and block prompt injection attacks

Implementation: Multi-layer detection combining pattern matching, heuristics, and ML

Detection Layers

Layer 1: Pattern Matching

Fast regex-based detection of known attack patterns:

@injection_patterns [
  # Instruction override
  %{
    pattern: ~r/ignore\s+(all\s+)?(previous|prior|earlier|above)\s+(instructions?|prompts?|commands?)/i,
    severity: :high,
    category: :instruction_override
  },

  # System prompt extraction
  %{
    pattern: ~r/(repeat|show|print|output|display)\s+(your\s+)?(system\s+)?(prompt|instructions)/i,
    severity: :high,
    category: :system_extraction
  },

  # Mode switching
  %{
    pattern: ~r/(you\s+are|you're|now\s+in)\s+(debug|developer|admin|god|dan)\s+mode/i,
    severity: :critical,
    category: :mode_switching
  },

  # Delimiter injection
  %{
    pattern: ~r/---\s*(end\s+)?(system|user|assistant)(\s+message|\s+prompt)?/i,
    severity: :high,
    category: :delimiter_injection
  },

  # Role manipulation
  %{
    pattern: ~r/as\s+(a\s+)?(super|admin|root|system)\s+(user|admin|ai)/i,
    severity: :medium,
    category: :role_manipulation
  }
]
Layer 2: Heuristic Analysis

Statistical and structural analysis:

defmodule LlmGuard.Heuristics do
  @doc "Calculate injection likelihood score"
  def analyze(input) do
    %{
      entropy: calculate_entropy(input),
      delimiter_density: count_delimiters(input) / String.length(input),
      keyword_frequency: count_injection_keywords(input),
      structural_anomalies: detect_structural_anomalies(input),
      case_pattern: analyze_case_pattern(input)
    }
  end

  # High entropy in specific sections → suspicious
  defp calculate_entropy(text) do
    text
    |> String.graphemes()
    |> Enum.frequencies()
    |> Enum.map(fn {_, freq} ->
      p = freq / String.length(text)
      -p * :math.log2(p)
    end)
    |> Enum.sum()
  end

  # Excessive delimiters → potential injection
  defp count_delimiters(text) do
    Regex.scan(~r/---|===|###|\*\*\*/, text) |> length()
  end
end
Layer 3: ML Classification

Transformer-based classification for sophisticated attacks:

defmodule LlmGuard.ML.InjectionClassifier do
  @doc "Classify input using fine-tuned model"
  def classify(input, opts \\ []) do
    embedding = get_embedding(input)
    logits = run_inference(embedding)

    %{
      is_injection: logits[:injection] > 0.5,
      confidence: logits[:injection],
      attack_type: classify_attack_type(logits)
    }
  end

  defp get_embedding(text) do
    # Use sentence transformer or similar
    # Returns: [768-dim vector]
  end

  defp run_inference(embedding) do
    # Run through fine-tuned classifier
    # Returns: %{injection: 0.92, benign: 0.08}
  end
end

Configuration

%LlmGuard.Config{
  prompt_injection: %{
    enabled: true,
    confidence_threshold: 0.7,
    layers: [:pattern, :heuristic, :ml],
    pattern_file: "patterns/injection.json",
    action: :block,  # :block | :warn | :log
    whitelist: []
  }
}

Performance

LayerLatencyFalse Positive RateDetection Rate
Pattern~1ms<1%60%
Heuristic~5ms<5%80%
ML~50ms<2%95%
Combined~50ms<1%98%

2. Jailbreak Detector

Purpose: Identify attempts to bypass safety constraints

Implementation: Multi-technique detection

Detection Techniques

Role-Playing Detection
@jailbreak_personas [
  "DAN", "DUDE", "Developer Mode", "Evil Confidant",
  "John", "Stan", "Superior AI", "Unrestricted AI",
  "JailBreak", "BetterDAN", "Maximum", "BasedGPT"
]

def detect_role_playing(input) do
  personas_found = @jailbreak_personas
    |> Enum.filter(&String.contains?(String.downcase(input), String.downcase(&1)))

  %{
    detected: length(personas_found) > 0,
    personas: personas_found,
    confidence: min(1.0, length(personas_found) * 0.3)
  }
end
Hypothetical Scenario Detection
@hypothetical_indicators [
  ~r/hypothetically/i,
  ~r/imagine\s+(if|that|a\s+world)/i,
  ~r/pretend\s+(you|that)/i,
  ~r/for\s+(the\s+sake\s+of\s+)?argument/i,
  ~r/in\s+a\s+(fictional|alternate)\s+world/i,
  ~r/let's\s+say/i
]

def detect_hypothetical(input) do
  matches = @hypothetical_indicators
    |> Enum.map(&Regex.match?(&1, input))
    |> Enum.count(& &1)

  %{
    detected: matches > 0,
    confidence: min(1.0, matches * 0.25),
    technique: :hypothetical_framing
  }
end
Encoding Detection
def detect_encoding(input) do
  encodings = %{
    base64: is_base64?(input),
    hex: is_hex_encoded?(input),
    rot13: contains_rot13?(input),
    unicode_escape: contains_unicode_escape?(input),
    reverse: is_reversed_text?(input)
  }

  detected_encodings = encodings
    |> Enum.filter(fn {_, detected} -> detected end)
    |> Enum.map(fn {type, _} -> type end)

  %{
    detected: length(detected_encodings) > 0,
    encodings: detected_encodings,
    confidence: min(1.0, length(detected_encodings) * 0.4)
  }
end

defp is_base64?(text) do
  # Check for base64 patterns
  Regex.match?(~r/^[A-Za-z0-9+\/]+=*$/, String.trim(text)) and
    rem(String.length(String.trim(text)), 4) == 0
end

Multi-Turn Analysis

defmodule LlmGuard.MultiTurn do
  @doc "Analyze conversation for gradual manipulation"
  def analyze_conversation(messages) do
    scores = messages
      |> Enum.with_index()
      |> Enum.map(fn {msg, idx} ->
        {idx, calculate_risk_score(msg, idx, messages)}
      end)

    escalation = detect_escalation(scores)

    %{
      per_message_scores: scores,
      escalation_detected: escalation > 0.5,
      escalation_rate: escalation,
      cumulative_risk: Enum.sum(Enum.map(scores, fn {_, score} -> score end))
    }
  end

  defp detect_escalation(scores) do
    # Calculate risk increase rate
    if length(scores) < 2 do
      0.0
    else
      diffs = scores
        |> Enum.chunk_every(2, 1, :discard)
        |> Enum.map(fn [{_, s1}, {_, s2}] -> s2 - s1 end)

      average_increase = Enum.sum(diffs) / length(diffs)
      max(0.0, average_increase)
    end
  end
end

3. Length Validator

Purpose: Prevent resource exhaustion and token-based attacks

Implementation:

defmodule LlmGuard.LengthValidator do
  def validate(input, opts \\ []) do
    limits = %{
      max_chars: Keyword.get(opts, :max_chars, 10_000),
      max_tokens: Keyword.get(opts, :max_tokens, 2_000),
      max_lines: Keyword.get(opts, :max_lines, 500)
    }

    char_count = String.length(input)
    token_count = estimate_token_count(input)
    line_count = length(String.split(input, "\n"))

    violations = []
      |> check_limit(char_count, limits.max_chars, :chars)
      |> check_limit(token_count, limits.max_tokens, :tokens)
      |> check_limit(line_count, limits.max_lines, :lines)

    if Enum.empty?(violations) do
      {:ok, input}
    else
      {:error, :length_exceeded, violations}
    end
  end

  defp estimate_token_count(text) do
    # Rough estimation: ~4 chars per token
    div(String.length(text), 4)
  end

  defp check_limit(violations, actual, max, type) do
    if actual > max do
      [{type, actual, max} | violations]
    else
      violations
    end
  end
end

4. Encoding Validator

Purpose: Detect and handle various text encodings

Implementation:

defmodule LlmGuard.EncodingValidator do
  def validate_and_normalize(input) do
    with {:ok, detected} <- detect_encodings(input),
         {:ok, decoded} <- decode_if_needed(input, detected),
         {:ok, normalized} <- normalize_text(decoded) do
      {:ok, normalized, %{detected_encodings: detected}}
    else
      {:error, reason} -> {:error, :encoding_error, reason}
    end
  end

  defp detect_encodings(text) do
    encodings = [
      {:base64, is_base64?(text)},
      {:url_encoded, is_url_encoded?(text)},
      {:html_entities, contains_html_entities?(text)},
      {:unicode_escape, contains_unicode_escape?(text)}
    ]
    |> Enum.filter(fn {_, detected} -> detected end)
    |> Enum.map(fn {type, _} -> type end)

    {:ok, encodings}
  end

  defp decode_if_needed(text, encodings) do
    # Recursively decode until raw text
    Enum.reduce_while(encodings, {:ok, text}, fn encoding, {:ok, current} ->
      case decode_single(current, encoding) do
        {:ok, decoded} -> {:cont, {:ok, decoded}}
        {:error, _} = error -> {:halt, error}
      end
    end)
  end
end

5. Policy Engine

Purpose: Enforce custom security policies

Implementation:

defmodule LlmGuard.Policy do
  defstruct [:name, :rules, :actions, :priority]

  @type rule :: %{
    id: atom(),
    type: :input | :output | :both,
    validator: (String.t() -> boolean()),
    severity: :low | :medium | :high | :critical,
    message: String.t()
  }

  def new(name \\ "default") do
    %__MODULE__{
      name: name,
      rules: [],
      actions: %{
        critical: :block,
        high: :block,
        medium: :warn,
        low: :log
      },
      priority: 100
    }
  end

  def add_rule(policy, rule) do
    %{policy | rules: [rule | policy.rules]}
  end

  def validate(input, policy) do
    results = policy.rules
      |> Enum.filter(&rule_applies?(&1, :input))
      |> Enum.map(&evaluate_rule(&1, input))

    failed = Enum.filter(results, &(not &1.passed))
    max_severity = failed
      |> Enum.map(& &1.severity)
      |> Enum.max(fn -> :low end, &severity_level/1)

    action = Map.get(policy.actions, max_severity, :log)

    %{
      passed: Enum.empty?(failed),
      failed_rules: failed,
      action: action,
      max_severity: max_severity
    }
  end

  defp evaluate_rule(rule, input) do
    passed = rule.validator.(input)

    %{
      rule_id: rule.id,
      passed: passed,
      severity: rule.severity,
      message: rule.message
    }
  end

  defp severity_level(:critical), do: 4
  defp severity_level(:high), do: 3
  defp severity_level(:medium), do: 2
  defp severity_level(:low), do: 1
end

Example Policies:

# No code execution policy
policy = LlmGuard.Policy.new("no_code_execution")
  |> LlmGuard.Policy.add_rule(%{
    id: :no_eval,
    type: :input,
    validator: fn input ->
      not Regex.match?(~r/\b(eval|exec|system|popen)\s*\(/i, input)
    end,
    severity: :critical,
    message: "Code execution keywords detected"
  })

# Content length policy
policy = LlmGuard.Policy.add_rule(policy, %{
  id: :reasonable_length,
  type: :input,
  validator: fn input ->
    String.length(input) <= 5000
  end,
  severity: :medium,
  message: "Input exceeds reasonable length"
})

# No PII in prompts
policy = LlmGuard.Policy.add_rule(policy, %{
  id: :no_pii_input,
  type: :input,
  validator: fn input ->
    not LlmGuard.DataLeakage.contains_pii?(input)
  end,
  severity: :high,
  message: "PII detected in input"
})

Output Guardrails

1. PII Redactor

Purpose: Detect and redact personally identifiable information

Implementation:

defmodule LlmGuard.PIIRedactor do
  @pii_patterns %{
    email: ~r/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b/,
    phone: ~r/\b(\+?1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b/,
    ssn: ~r/\b\d{3}-\d{2}-\d{4}\b/,
    credit_card: ~r/\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b/,
    ip_address: ~r/\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b/,
    url: ~r/https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{2,256}\.[a-z]{2,6}\b([-a-zA-Z0-9@:%_\+.~#?&\/\/=]*)/
  }

  def scan(text) do
    entities = @pii_patterns
      |> Enum.flat_map(fn {type, pattern} ->
        find_matches(text, pattern, type)
      end)
      |> Enum.sort_by(& &1.start)

    %{
      pii_detected: length(entities) > 0,
      entities: entities,
      count: length(entities)
    }
  end

  def redact(text, opts \\ []) do
    strategy = Keyword.get(opts, :strategy, :mask)
    entities = scan(text).entities

    Enum.reduce(entities, text, fn entity, acc ->
      replacement = get_replacement(entity, strategy)
      String.replace(acc, entity.value, replacement)
    end)
  end

  defp find_matches(text, pattern, type) do
    Regex.scan(pattern, text, return: :index)
    |> Enum.flat_map(fn matches ->
      Enum.map(matches, fn {start, length} ->
        %{
          type: type,
          value: String.slice(text, start, length),
          start: start,
          end: start + length,
          confidence: calculate_confidence(type, String.slice(text, start, length))
        }
      end)
    end)
  end

  defp get_replacement(entity, :mask) do
    "[#{String.upcase(to_string(entity.type))}]"
  end

  defp get_replacement(entity, :hash) do
    :crypto.hash(:sha256, entity.value)
    |> Base.encode16()
    |> String.slice(0..7)
  end

  defp get_replacement(entity, :partial) do
    # Show first and last chars only
    case String.length(entity.value) do
      len when len <= 4 -> String.duplicate("*", len)
      len ->
        first = String.first(entity.value)
        last = String.last(entity.value)
        "#{first}#{String.duplicate("*", len - 2)}#{last}"
    end
  end

  defp calculate_confidence(:email, value) do
    # Additional validation beyond regex
    if String.contains?(value, ["@", "."]) and
       not String.starts_with?(value, ".") do
      0.95
    else
      0.7
    end
  end

  defp calculate_confidence(:ssn, value) do
    # Luhn algorithm or similar validation
    1.0
  end

  defp calculate_confidence(_, _), do: 0.8
end

2. Content Moderator

Purpose: Detect and filter harmful content

Implementation:

defmodule LlmGuard.ContentModerator do
  @categories [
    :violence,
    :hate_speech,
    :sexual_content,
    :self_harm,
    :harassment,
    :illegal_activities,
    :profanity,
    :spam
  ]

  @thresholds %{
    violence: 0.7,
    hate_speech: 0.6,
    sexual_content: 0.8,
    self_harm: 0.5,  # Lower threshold for critical content
    harassment: 0.7,
    illegal_activities: 0.7,
    profanity: 0.8,
    spam: 0.9
  }

  def moderate(content, opts \\ []) do
    scores = @categories
      |> Enum.map(fn category ->
        {category, score_category(content, category)}
      end)
      |> Enum.into(%{})

    flagged = @categories
      |> Enum.filter(fn category ->
        scores[category] >= @thresholds[category]
      end)

    %{
      safe: Enum.empty?(flagged),
      scores: scores,
      flagged_categories: flagged,
      action: determine_action(flagged)
    }
  end

  defp score_category(content, category) do
    # Combine multiple scoring methods
    pattern_score = pattern_based_score(content, category)
    keyword_score = keyword_based_score(content, category)
    ml_score = ml_based_score(content, category)

    # Weighted average
    (pattern_score * 0.3 + keyword_score * 0.2 + ml_score * 0.5)
  end

  defp determine_action(flagged) do
    cond do
      :self_harm in flagged -> :block_and_alert
      :violence in flagged or :illegal_activities in flagged -> :block
      length(flagged) > 0 -> :warn
      true -> :allow
    end
  end
end

3. Format Validator

Purpose: Ensure output matches expected schema

Implementation:

defmodule LlmGuard.FormatValidator do
  def validate(output, schema) do
    case schema.type do
      :json -> validate_json(output, schema)
      :markdown -> validate_markdown(output, schema)
      :plain_text -> validate_plain_text(output, schema)
      :structured -> validate_structured(output, schema)
    end
  end

  defp validate_json(output, schema) do
    with {:ok, parsed} <- Jason.decode(output),
         :ok <- validate_schema(parsed, schema.spec) do
      {:ok, parsed}
    else
      {:error, reason} -> {:error, :invalid_format, reason}
    end
  end

  defp validate_structured(output, schema) do
    required_sections = schema.required_sections || []
    optional_sections = schema.optional_sections || []

    sections_found = extract_sections(output)

    missing = required_sections -- sections_found
    unexpected = sections_found -- (required_sections ++ optional_sections)

    if Enum.empty?(missing) do
      {:ok, %{sections: sections_found, warnings: unexpected}}
    else
      {:error, :missing_sections, missing}
    end
  end
end

Bidirectional Guardrails

Rate Limiter

Implementation: Token Bucket Algorithm

defmodule LlmGuard.RateLimiter do
  defstruct [:user_id, :buckets, :last_refill]

  def new(user_id, config) do
    %__MODULE__{
      user_id: user_id,
      buckets: %{
        requests: new_bucket(config.requests_per_minute, :per_minute),
        tokens: new_bucket(config.tokens_per_minute, :per_minute)
      },
      last_refill: DateTime.utc_now()
    }
  end

  def check(limiter, request) do
    limiter = refill_buckets(limiter)

    with {:ok, limiter} <- consume_tokens(limiter, :requests, 1),
         {:ok, limiter} <- consume_tokens(limiter, :tokens, request.token_count) do
      {:ok, limiter}
    else
      {:error, :rate_limit_exceeded, bucket_type} ->
        retry_after = calculate_retry_after(limiter, bucket_type)
        {:error, :rate_limit_exceeded, retry_after}
    end
  end

  defp refill_buckets(limiter) do
    now = DateTime.utc_now()
    seconds_elapsed = DateTime.diff(now, limiter.last_refill)

    updated_buckets = Map.new(limiter.buckets, fn {name, bucket} ->
      tokens_to_add = bucket.refill_rate * seconds_elapsed
      new_tokens = min(bucket.capacity, bucket.tokens + tokens_to_add)
      {name, %{bucket | tokens: new_tokens}}
    end)

    %{limiter | buckets: updated_buckets, last_refill: now}
  end

  defp consume_tokens(limiter, bucket_name, amount) do
    bucket = limiter.buckets[bucket_name]

    if bucket.tokens >= amount do
      updated_bucket = %{bucket | tokens: bucket.tokens - amount}
      updated_limiter = put_in(limiter.buckets[bucket_name], updated_bucket)
      {:ok, updated_limiter}
    else
      {:error, :rate_limit_exceeded, bucket_name}
    end
  end
end

Performance Characteristics

Latency Budget

GuardrailTarget P50Target P95Target P99
Length Validator<1ms<1ms<2ms
Prompt Injection (Pattern)<2ms<5ms<10ms
Prompt Injection (ML)<50ms<100ms<200ms
Jailbreak Detector<10ms<20ms<50ms
PII Redactor<5ms<15ms<30ms
Content Moderator<30ms<80ms<150ms
Rate Limiter<1ms<2ms<5ms
Total (All Guards)<50ms<150ms<300ms

Accuracy Metrics

GuardrailPrecisionRecallF1 Score
Prompt Injection98%95%96.5%
Jailbreak96%92%94%
PII Detection99%97%98%
Content Safety94%96%95%

Configuration Best Practices

Development Environment

config :llm_guard,
  prompt_injection: %{confidence_threshold: 0.5},
  jailbreak: %{enabled: true, confidence_threshold: 0.6},
  data_leakage: %{action: :warn},
  content_moderation: %{enabled: false},
  rate_limiting: %{requests_per_minute: 1000}

Production Environment

config :llm_guard,
  prompt_injection: %{confidence_threshold: 0.7, layers: [:pattern, :heuristic, :ml]},
  jailbreak: %{enabled: true, confidence_threshold: 0.7},
  data_leakage: %{action: :block, strategy: :mask},
  content_moderation: %{enabled: true, action: :block},
  rate_limiting: %{requests_per_minute: 60, tokens_per_minute: 100_000},
  audit_logging: %{enabled: true, backend: :database}

High-Security Environment

config :llm_guard,
  prompt_injection: %{confidence_threshold: 0.6, layers: [:pattern, :heuristic, :ml]},
  jailbreak: %{enabled: true, confidence_threshold: 0.6, multi_turn: true},
  data_leakage: %{action: :block, strategy: :hash, scan_all: true},
  content_moderation: %{enabled: true, action: :block_and_alert},
  encoding_validation: %{decode_and_check: true, max_recursion: 3},
  rate_limiting: %{requests_per_minute: 30, tokens_per_minute: 50_000},
  audit_logging: %{enabled: true, backend: :database, retention_days: 90}