Bias Detection and Mitigation Algorithms

View Source

Overview

This document details the algorithms implemented in ExFairness for detecting and mitigating bias in ML systems.

Bias Detection Algorithms

1. Statistical Parity Testing

Purpose: Detect violations of demographic parity using statistical hypothesis tests.

Algorithm:

Input: predictions, sensitive_attr, alpha (significance level)
Output: test_statistic, p_value, reject_null

1. Compute observed rates:
   rate_A = mean(predictions[sensitive_attr == 0])
   rate_B = mean(predictions[sensitive_attr == 1])

2. Under null hypothesis (no disparity):
   SE = sqrt(p(1-p)(1/n_A + 1/n_B))
   where p = (n_A * rate_A + n_B * rate_B) / (n_A + n_B)

3. Test statistic:
   z = (rate_A - rate_B) / SE

4. P-value:
   p_value = 2 * P(Z > |z|)  # Two-tailed test

5. Decision:
   reject_null = p_value < alpha

Implementation:

def statistical_parity_test(predictions, sensitive_attr, opts \\ []) do
  alpha = Keyword.get(opts, :alpha, 0.05)

  group_a_mask = Nx.equal(sensitive_attr, 0)
  group_b_mask = Nx.equal(sensitive_attr, 1)

  n_a = Nx.sum(group_a_mask) |> Nx.to_number()
  n_b = Nx.sum(group_b_mask) |> Nx.to_number()

  rate_a = positive_rate(predictions, group_a_mask) |> Nx.to_number()
  rate_b = positive_rate(predictions, group_b_mask) |> Nx.to_number()

  p_pooled = (n_a * rate_a + n_b * rate_b) / (n_a + n_b)
  se = :math.sqrt(p_pooled * (1 - p_pooled) * (1/n_a + 1/n_b))

  z_stat = (rate_a - rate_b) / se
  p_value = 2 * (1 - Statistics.Distributions.Normal.cdf(abs(z_stat), 0, 1))

  %{
    test_statistic: z_stat,
    p_value: p_value,
    reject_null: p_value < alpha,
    interpretation: interpret_test_result(p_value, alpha)
  }
end

2. Intersectional Bias Detection

Purpose: Identify bias in combinations of sensitive attributes.

Algorithm:

Input: predictions, labels, [attr1, attr2, ...], metric
Output: bias_map, most_disadvantaged_group

1. Create all attribute combinations:
   groups = cartesian_product(unique(attr1), unique(attr2), ...)

2. For each group g:
   a. Filter data: data_g = data[matches_group(g)]
   b. Compute metric: metric_g = compute_metric(data_g)
   c. Store: bias_map[g] = metric_g

3. Find reference group (typically majority):
   ref_group = group with best metric value

4. Compute disparities:
   For each group g:
     disparity_g = |metric_g - metric_ref|

5. Identify most disadvantaged:
   most_disadvantaged = argmax(disparity_g)

Implementation:

def intersectional_fairness(predictions, labels, opts) do
  sensitive_attrs = Keyword.fetch!(opts, :sensitive_attrs)
  attr_names = Keyword.get(opts, :attr_names, 1..length(sensitive_attrs))
  metric = Keyword.get(opts, :metric, :true_positive_rate)

  # Create all combinations
  groups = create_intersectional_groups(sensitive_attrs)

  # Compute metric for each group
  results = Enum.map(groups, fn group_mask ->
    group_predictions = Nx.select(group_mask, predictions, 0)
    group_labels = Nx.select(group_mask, labels, 0)

    value = compute_metric(metric, group_predictions, group_labels)
    count = Nx.sum(group_mask) |> Nx.to_number()

    {group_mask, value, count}
  end)

  # Find reference (highest metric)
  {_ref_mask, ref_value, _ref_count} = Enum.max_by(results, fn {_, v, _} -> v end)

  # Compute disparities
  disparities = Enum.map(results, fn {mask, value, count} ->
    {mask, abs(value - ref_value), count}
  end)

  most_disadvantaged = Enum.max_by(disparities, fn {_, disp, _} -> disp end)

  %{
    group_metrics: results,
    disparities: disparities,
    most_disadvantaged: most_disadvantaged,
    reference_value: ref_value
  }
end

3. Temporal Bias Drift Detection

Purpose: Monitor fairness metrics over time to detect degradation.

Algorithm (CUSUM - Cumulative Sum Control Chart):

Input: metric_values_over_time, threshold
Output: drift_detected, change_point

1. Initialize:
   S_pos = 0, S_neg = 0
   baseline = mean(metric_values[initial_period])

2. For each time t:
   deviation = metric_values[t] - baseline

   S_pos = max(0, S_pos + deviation - allowance)
   S_neg = max(0, S_neg - deviation - allowance)

   if S_pos > threshold or S_neg > threshold:
     return drift_detected = true, change_point = t

3. Return drift_detected = false

Implementation:

def temporal_drift(metrics_history, opts \\ []) do
  threshold = Keyword.get(opts, :threshold, 0.05)
  allowance = Keyword.get(opts, :allowance, 0.01)
  baseline_period = Keyword.get(opts, :baseline_period, 10)

  # Extract values and timestamps
  {timestamps, values} = Enum.unzip(metrics_history)

  # Compute baseline
  baseline = Enum.take(values, baseline_period)
  |> Enum.sum()
  |> Kernel./(baseline_period)

  # CUSUM
  {drift_detected, change_point, s_pos, s_neg} =
    values
    |> Enum.with_index()
    |> Enum.reduce_while({false, nil, 0, 0}, fn {value, idx}, {_, _, s_pos, s_neg} ->
      deviation = value - baseline

      new_s_pos = max(0, s_pos + deviation - allowance)
      new_s_neg = max(0, s_neg - deviation - allowance)

      if new_s_pos > threshold or new_s_neg > threshold do
        {:halt, {true, Enum.at(timestamps, idx), new_s_pos, new_s_neg}}
      else
        {:cont, {false, nil, new_s_pos, new_s_neg}}
      end
    end)

  %{
    drift_detected: drift_detected,
    change_point: change_point,
    drift_magnitude: max(s_pos, s_neg),
    alert_level: classify_alert(max(s_pos, s_neg), threshold)
  }
end

4. Label Bias Detection

Purpose: Identify bias in training labels themselves.

Algorithm (Label Distribution Analysis):

Input: labels, features, sensitive_attr
Output: bias_indicators

1. For each sensitive group:
   a. Compute label distribution:
      P(Y=1 | A=a, X=x)

   b. For similar feature vectors across groups:
      Find pairs (x_i, x_j) where d(x_i, x_j) < threshold
      but A_i  A_j

   c. Compute label discrepancy:
      discrepancy = |Y_i - Y_j| for similar pairs

2. Statistical test:
   H0: No label bias
   H1: Label bias exists

   Test if discrepancy is significantly greater than
   expected by chance

Implementation:

def detect_label_bias(labels, features, sensitive_attr, opts \\ []) do
  similarity_threshold = Keyword.get(opts, :similarity_threshold, 0.1)
  min_pairs = Keyword.get(opts, :min_pairs, 100)

  # Find similar cross-group pairs
  similar_pairs = find_similar_cross_group_pairs(
    features,
    sensitive_attr,
    similarity_threshold
  )

  # Compute label discrepancies
  discrepancies = Enum.map(similar_pairs, fn {i, j} ->
    abs(Nx.to_number(labels[i]) - Nx.to_number(labels[j]))
  end)

  mean_discrepancy = Enum.sum(discrepancies) / length(discrepancies)

  # Baseline: discrepancy for random pairs
  random_pairs = sample_random_pairs(length(similar_pairs))
  baseline_discrepancies = Enum.map(random_pairs, fn {i, j} ->
    abs(Nx.to_number(labels[i]) - Nx.to_number(labels[j]))
  end)
  baseline_mean = Enum.sum(baseline_discrepancies) / length(baseline_discrepancies)

  # Statistical test
  {t_stat, p_value} = t_test(discrepancies, baseline_discrepancies)

  %{
    mean_discrepancy: mean_discrepancy,
    baseline_discrepancy: baseline_mean,
    excess_discrepancy: mean_discrepancy - baseline_mean,
    test_statistic: t_stat,
    p_value: p_value,
    bias_detected: p_value < 0.05
  }
end

Bias Mitigation Algorithms

1. Reweighting (Pre-processing)

Purpose: Adjust training sample weights to achieve fairness.

Algorithm:

Input: data, sensitive_attr, target_metric
Output: weights

1. Compute group and label combinations:
   groups = {(A=0, Y=0), (A=0, Y=1), (A=1, Y=0), (A=1, Y=1)}

2. For each combination (a, y):
   P_a_y = P(A=a, Y=y)  # Observed probability
   P_a = P(A=a)
   P_y = P(Y=y)

3. Compute ideal weights for demographic parity:
   w(a, y) = P_y / P_a_y

   # Intuition: Up-weight underrepresented combinations

4. Normalize weights:
   weights = weights / mean(weights)

Implementation:

def reweight(labels, sensitive_attr, opts \\ []) do
  target = Keyword.get(opts, :target, :demographic_parity)

  # Compute observed probabilities
  n = Nx.size(labels)

  group_label_counts = %{
    {0, 0} => count_where(sensitive_attr, 0, labels, 0),
    {0, 1} => count_where(sensitive_attr, 0, labels, 1),
    {1, 0} => count_where(sensitive_attr, 1, labels, 0),
    {1, 1} => count_where(sensitive_attr, 1, labels, 1)
  }

  # Compute probabilities
  probs = Map.new(group_label_counts, fn {{a, y}, count} ->
    {{a, y}, count / n}
  end)

  # Marginal probabilities
  p_a0 = probs[{0, 0}] + probs[{0, 1}]
  p_a1 = probs[{1, 0}] + probs[{1, 1}]
  p_y0 = probs[{0, 0}] + probs[{1, 0}]
  p_y1 = probs[{0, 1}] + probs[{1, 1}]

  # Compute weights
  weights = Nx.broadcast(0.0, labels)

  weights = case target do
    :demographic_parity ->
      # w(a,y) = P(y) / P(a,y)
      compute_demographic_parity_weights(sensitive_attr, labels, probs, p_y0, p_y1)

    :equalized_odds ->
      # w(a,y) = P(y) / (P(a) * P(a,y))
      compute_equalized_odds_weights(sensitive_attr, labels, probs, p_a0, p_a1, p_y0, p_y1)
  end

  # Normalize
  weights = Nx.divide(weights, Nx.mean(weights))

  weights
end

2. Threshold Optimization (Post-processing)

Purpose: Find group-specific decision thresholds to satisfy fairness constraints.

Algorithm (for Equalized Odds):

Input: probabilities, labels, sensitive_attr
Output: threshold_A, threshold_B

1. Define objective:
   Maximize accuracy subject to:
   |TPR_A - TPR_B|  ε
   |FPR_A - FPR_B|  ε

2. Grid search over threshold pairs:
   For t_A in [0, 1]:
     For t_B in [0, 1]:
       predictions_A = (probs_A >= t_A)
       predictions_B = (probs_B >= t_B)

       TPR_A, FPR_A = compute_rates(predictions_A, labels_A)
       TPR_B, FPR_B = compute_rates(predictions_B, labels_B)

       if |TPR_A - TPR_B|  ε and |FPR_A - FPR_B|  ε:
         accuracy = compute_accuracy(predictions_A, predictions_B, labels)
         if accuracy > best_accuracy:
           best = (t_A, t_B, accuracy)

3. Return best thresholds

Implementation:

def optimize_thresholds(probabilities, labels, sensitive_attr, opts \\ []) do
  target_metric = Keyword.get(opts, :target_metric, :equalized_odds)
  epsilon = Keyword.get(opts, :epsilon, 0.05)
  grid_size = Keyword.get(opts, :grid_size, 100)

  group_a_mask = Nx.equal(sensitive_attr, 0)
  group_b_mask = Nx.equal(sensitive_attr, 1)

  probs_a = Nx.select(group_a_mask, probabilities, 0.0)
  probs_b = Nx.select(group_b_mask, probabilities, 0.0)
  labels_a = Nx.select(group_a_mask, labels, 0)
  labels_b = Nx.select(group_b_mask, labels, 0)

  # Grid search
  thresholds = Nx.linspace(0.0, 1.0, n: grid_size)

  best = Enum.reduce(thresholds, {nil, 0.0}, fn t_a, {best_thresholds, best_acc} ->
    Enum.reduce(thresholds, {best_thresholds, best_acc}, fn t_b, {curr_best, curr_acc} ->
      preds_a = Nx.greater_equal(probs_a, t_a)
      preds_b = Nx.greater_equal(probs_b, t_b)

      {tpr_a, fpr_a} = compute_rates(preds_a, labels_a)
      {tpr_b, fpr_b} = compute_rates(preds_b, labels_b)

      satisfies_constraint = case target_metric do
        :equalized_odds ->
          abs(tpr_a - tpr_b) <= epsilon and abs(fpr_a - fpr_b) <= epsilon
        :equal_opportunity ->
          abs(tpr_a - tpr_b) <= epsilon
      end

      if satisfies_constraint do
        accuracy = compute_overall_accuracy(preds_a, preds_b, labels_a, labels_b)
        if accuracy > curr_acc do
          {{t_a, t_b}, accuracy}
        else
          {curr_best, curr_acc}
        end
      else
        {curr_best, curr_acc}
      end
    end)
  end)

  {thresholds, accuracy} = best

  %{
    group_a_threshold: elem(thresholds, 0),
    group_b_threshold: elem(thresholds, 1),
    accuracy: accuracy
  }
end

3. Adversarial Debiasing (In-processing)

Purpose: Train a model to maximize accuracy while minimizing an adversary's ability to predict sensitive attributes.

Algorithm:

Input: features X, labels Y, sensitive_attr A
Output: fair_model

1. Model architecture:
   - Predictor: f(X) -> Ŷ
   - Adversary: g(f(X)) -> Â

2. Loss function:
   L = L_prediction(Ŷ, Y) - λ * L_adversary(Â, A)

   where:
   - L_prediction: Standard loss (cross-entropy, MSE)
   - L_adversary: Adversary loss (tries to predict A from f(X))
   - λ: Adversarial strength parameter

3. Training:
   Alternate between:
   a. Update predictor: Minimize L w.r.t. predictor parameters
   b. Update adversary: Maximize L_adversary w.r.t. adversary parameters

4. At convergence:
   - Predictor is accurate
   - Adversary cannot predict sensitive attribute from predictor's representations

Implementation (Axon integration):

def adversarial_debiasing(features, labels, sensitive_attr, opts \\ []) do
  adversary_strength = Keyword.get(opts, :adversary_strength, 0.5)
  hidden_dim = Keyword.get(opts, :hidden_dim, 64)

  # Predictor network
  predictor = Axon.input("features")
  |> Axon.dense(hidden_dim, activation: :relu, name: "predictor_hidden")
  |> Axon.dense(1, activation: :sigmoid, name: "predictor_output")

  # Adversary network (takes predictor hidden layer)
  adversary = Axon.nx(predictor, & &1["predictor_hidden"])
  |> Axon.dense(32, activation: :relu)
  |> Axon.dense(1, activation: :sigmoid, name: "adversary_output")

  # Combined loss
  loss_fn = fn predictor_out, adversary_out, y, a ->
    prediction_loss = Axon.Losses.binary_cross_entropy(y, predictor_out)
    adversary_loss = Axon.Losses.binary_cross_entropy(a, adversary_out)

    prediction_loss - adversary_strength * adversary_loss
  end

  # Training loop
  # ... (Alternating updates to predictor and adversary)

  predictor
end

4. Fair Representation Learning

Purpose: Learn a representation of data that is independent of sensitive attributes but useful for prediction.

Algorithm (Variational Fair Autoencoder):

Input: features X, sensitive_attr A
Output: encoder, decoder

1. Encoder: q(Z | X)
   Maps X to latent representation Z

2. Decoder: p(X | Z)
   Reconstructs X from Z

3. Loss function:
   L = L_reconstruction + L_KL + λ * L_independence

   where:
   - L_reconstruction = -E[log p(X | Z)]
   - L_KL = KL(q(Z|X) || p(Z))  # VAE regularization
   - L_independence = MMD(Z[A=0], Z[A=1])  # Maximum Mean Discrepancy

4. Training:
   Minimize L to get representations Z that:
   - Preserve information (reconstruction)
   - Are regularized (KL divergence)
   - Are independent of A (MMD)

Implementation:

def fair_representation(features, sensitive_attr, opts \\ []) do
  latent_dim = Keyword.get(opts, :latent_dim, 32)
  lambda = Keyword.get(opts, :independence_weight, 1.0)

  # Encoder
  encoder = Axon.input("features")
  |> Axon.dense(64, activation: :relu)
  |> Axon.dense(latent_dim * 2)  # Mean and log-variance

  # Sampling layer
  {z_mean, z_log_var} = split_encoder_output(encoder)
  z = sample_z(z_mean, z_log_var)

  # Decoder
  decoder = z
  |> Axon.dense(64, activation: :relu)
  |> Axon.dense(feature_dim, activation: :sigmoid)

  # MMD loss
  mmd_loss = fn z, sensitive ->
    z_group_0 = Nx.select(Nx.equal(sensitive, 0), z, 0)
    z_group_1 = Nx.select(Nx.equal(sensitive, 1), z, 0)

    maximum_mean_discrepancy(z_group_0, z_group_1)
  end

  # Total loss
  loss = fn x, x_recon, z_mean, z_log_var, z, a ->
    recon_loss = binary_cross_entropy(x, x_recon)
    kl_loss = -0.5 * Nx.sum(1 + z_log_var - Nx.power(z_mean, 2) - Nx.exp(z_log_var))
    independence_loss = mmd_loss.(z, a)

    recon_loss + kl_loss + lambda * independence_loss
  end

  # Train and return encoder
  {encoder, decoder}
end

Algorithm Selection Guide

graph TD
    A[Start] --> B{When to apply mitigation?}

    B -->|Before training| C[Pre-processing]
    B -->|During training| D[In-processing]
    B -->|After training| E[Post-processing]

    C --> C1[Reweighting]
    C --> C2[Resampling]
    C --> C3[Fair Representation Learning]

    D --> D1[Adversarial Debiasing]
    D --> D2[Fairness Constraints]

    E --> E1[Threshold Optimization]
    E --> E2[Calibration]

    C1 --> F{Can retrain model?}
    C2 --> F
    C3 --> F
    D1 --> G[Yes - Use in-processing]
    D2 --> G
    E1 --> H[No - Use post-processing]
    E2 --> H

    F -->|Yes| I[Use pre/in-processing]
    F -->|No| J[Use post-processing only]

Performance Considerations

Computational Complexity

AlgorithmComplexityNotes
ReweightingO(n)Very fast
Threshold OptimizationO(n × g²)g = grid size
Adversarial DebiasingO(n × epochs × layer_size)Training time
Fair RepresentationO(n × epochs × layer_size)Training time
Intersectional DetectionO(n × 2^k)k = number of attributes

Scalability

For large datasets:

  1. Use sampling for grid search
  2. Parallelize intersectional analysis
  3. Use mini-batch training for neural approaches
  4. Leverage GPU acceleration via EXLA

References

  1. Kamiran, F., & Calders, T. (2012). Data preprocessing techniques for classification without discrimination. KAIS.
  2. Hardt, M., et al. (2016). Equality of opportunity in supervised learning. NeurIPS.
  3. Zhang, B. H., et al. (2018). Mitigating unwanted biases with adversarial learning. AIES.
  4. Louizos, C., et al. (2016). The variational fair autoencoder. ICLR.
  5. Feldman, M., et al. (2015). Certifying and removing disparate impact. KDD.