Block Decomposition Method (BDM)

View Source
Mix.install([
  {:nx, "~> 0.6"},
  {:kino, "~> 0.11"},
  {:explorer, "~> 0.7"},
  {:vega_lite, "~> 0.1"},
  {:kino_vega_lite, "~> 0.1"},
  {:bdm, "~> 0.3.1"}
])

alias VegaLite, as: Vl

Introduction

The Block Decomposition Method (BDM) is a computational approach developed by Hector Zenil and colleagues to approximate the algorithmic complexity of datasets. This method extends the Coding Theorem Method (CTM) to handle larger datasets by decomposing them into smaller blocks and aggregating their complexities.

Key Concepts

  • Algorithmic complexity (also known as Kolmogorov complexity): Measures the length of the shortest computer program that can generate a given dataset. While this is theoretically uncomputable, BDM provides a practical approximation.
  • Algorithmic Probability: The theoretical foundation, based on Solomonoff-Levin theory. The algorithmic probability AP(x) of an object x is the probability that a randomly chosen binary program p, run on a Turing-complete language L, produces x and halts.
  • Coding Theorem Method (CTM): Estimates the complexity of small objects from precomputed output frequency distributions of Turing machines.
  • Block Decomposition: Breaks large datasets into manageable blocks and aggregates the complexities of those blocks.

Interactive Examples

Example 1: Simple Binary Sequences

# Four 200-symbol binary sequences with different amounts of structure.
# These bindings are reused by later cells, so their names must stay stable.
random_seq = Enum.map(1..200, fn _ -> Enum.random([0, 1]) end)
ordered_seq = [0, 1] |> List.duplicate(100) |> List.flatten()
constant_seq = List.duplicate(0, 200)

pattern_seq =
  [0, 0, 1]
  |> List.duplicate(70)
  |> List.flatten()
  |> Enum.take(200)

# NOTE(review): arguments look like (dimensions, symbols, block size, boundary
# handling) judging by how the notebook varies them — confirm against the BDM docs.
bdm = BDM.new(1, 2, 3, :ignore)

# One table row per labelled sequence, in display order.
[
  {"random", random_seq},
  {"ordered", ordered_seq},
  {"constant", constant_seq},
  {"pattern", pattern_seq}
]
|> Enum.map(fn {label, sequence} ->
  %{"Pattern" => label, "BDM" => BDM.compute(bdm, sequence)}
end)
|> Kino.DataTable.new(
  keys: ["Pattern", "BDM"],
  name: "BDM of binary sequences"
)

Example 2: 2D Pattern Analysis

defmodule TestData do
  @moduledoc """
  Generators for small square binary matrices (lists of row lists) used as
  2D inputs in the examples.
  """

  @doc """
  Builds a `size` x `size` matrix of 0s and 1s for the given pattern kind.

  Supported kinds:

    * `:random` - every cell is drawn with `Enum.random([0, 1])`
    * `:checkerboard` - cell `(i, j)` is `rem(i + j, 2)`
    * `:diagonal` - 1 where `i == j`, 0 elsewhere
    * `:constant` - all zeros

  A `size` of 0 yields an empty matrix (`[]`).
  """
  def create_2d_pattern(:random, size) do
    grid(size, fn _row, _col -> Enum.random([0, 1]) end)
  end

  def create_2d_pattern(:checkerboard, size) do
    grid(size, fn row, col -> rem(row + col, 2) end)
  end

  def create_2d_pattern(:diagonal, size) do
    grid(size, fn row, col -> if row == col, do: 1, else: 0 end)
  end

  def create_2d_pattern(:constant, size) do
    grid(size, fn _row, _col -> 0 end)
  end

  # Builds the matrix by calling `cell_fun.(row, col)` for every cell.
  # The explicit `//1` step keeps the ranges empty when `size` is 0; a bare
  # `0..(size - 1)` would count downward (0, -1) and produce a 2x2 matrix.
  defp grid(size, cell_fun) do
    for row <- 0..(size - 1)//1 do
      for col <- 0..(size - 1)//1, do: cell_fun.(row, col)
    end
  end
end

# One 8x8 test matrix per pattern kind, keyed by the kind's atom.
patterns =
  Map.new([:random, :checkerboard, :diagonal, :constant], fn kind ->
    {kind, TestData.create_2d_pattern(kind, 8)}
  end)
defmodule Visualizer do
  @moduledoc """
  VegaLite rendering helpers for 2D matrices given as lists of rows.
  """

  @doc """
  Returns a 200x200 VegaLite heatmap spec for `image`, titled `title`.

  Each cell becomes one `:rect` mark: `x` is the column index, `y` is the row
  index (sorted in descending order), and the colour encodes the cell value
  using the "viridis" scheme. Every data point also carries `title` under the
  `:image` key.
  """
  def create_heatmap(image, title) do
    cells =
      for {row, row_index} <- Enum.with_index(image),
          {value, col_index} <- Enum.with_index(row) do
        %{x: col_index, y: row_index, value: value, image: title}
      end

    Vl.new(width: 200, height: 200, title: title)
    |> Vl.data_from_values(cells)
    |> Vl.mark(:rect)
    |> Vl.encode_field(:x, "x", type: :ordinal, title: "Column")
    |> Vl.encode_field(:y, "y", type: :ordinal, title: "Row", sort: :descending)
    |> Vl.encode_field(:color, "value",
      type: :quantitative,
      scale: [scheme: "viridis"],
      title: "Value"
    )
  end
end

# Render one heatmap per pattern and lay them out in a 2-column grid.
[random, checkerboard, diagonal, constant] =
  for {kind, title} <- [
        random: "Random",
        checkerboard: "Checkerboard",
        diagonal: "Diagonal",
        constant: "Constant"
      ] do
    Visualizer.create_heatmap(Map.fetch!(patterns, kind), title)
  end

Kino.Layout.grid([random, checkerboard, diagonal, constant], columns: 2)

Calculate BDM for each pattern:

# NOTE(review): the first argument is 2 here and 1 for the sequence examples,
# so it presumably selects the data dimensionality — confirm against the BDM docs.
bdm = BDM.new(2, 2, 2, :ignore)

# One table row per 2D pattern.
pattern_viz =
  for {name, pattern} <- patterns do
    %{"Pattern" => Atom.to_string(name), "BDM" => BDM.compute(bdm, pattern)}
  end

Kino.DataTable.new(
  pattern_viz,
  keys: ["Pattern", "BDM"],
  name: "BDM of 2D patterns"
)

Example 3: Interactive Visualization

bdm = BDM.new(1, 2, 3, :ignore)

sequence_lengths = [10, 20, 30, 40, 50]

# For each length, measure a random, an alternating and a constant sequence.
complexity_data =
  for len <- sequence_lengths do
    random_seq = for _ <- 1..len, do: Enum.random([0, 1])

    # Cycle 0, 1 and cut at exactly `len` symbols. Duplicating `div(len, 2)`
    # pairs would leave odd lengths one symbol short of the other sequences.
    ordered_seq = [0, 1] |> Stream.cycle() |> Enum.take(len)

    constant_seq = List.duplicate(0, len)

    %{
      length: len,
      random: BDM.compute(bdm, random_seq),
      ordered: BDM.compute(bdm, ordered_seq),
      constant: BDM.compute(bdm, constant_seq)
    }
  end

# Reshape to long form (one point per length and sequence type) so VegaLite
# can colour the lines by type.
viz_data =
  for row <- complexity_data,
      {key, label} <- [random: "Random", ordered: "Ordered", constant: "Constant"] do
    %{length: row.length, complexity: Map.fetch!(row, key), type: label}
  end

Vl.new(width: 600, height: 400)
|> Vl.data_from_values(viz_data)
|> Vl.mark(:line, point: true)
|> Vl.encode_field(:x, "length", type: :quantitative, title: "Sequence Length")
|> Vl.encode_field(:y, "complexity", type: :quantitative, title: "BDM Complexity")
|> Vl.encode_field(:color, "type", type: :nominal, title: "Sequence Type")
|> Vl.config(legend: [orient: "top"])

Advanced Features

Block Size Analysis

Analyzing how block size affects BDM estimation:

block_sizes = 2..12

# Re-estimate the same sequence (`pattern_seq`) with every block size.
block_analysis_viz =
  Enum.map(block_sizes, fn block_size ->
    sized_bdm = BDM.new(1, 2, block_size, :ignore)
    %{"Block Size" => block_size, "BDM" => BDM.compute(sized_bdm, pattern_seq)}
  end)

Kino.DataTable.new(
  block_analysis_viz,
  keys: ["Block Size", "BDM"],
  name: "Block Size Analysis"
)

Vl.new(width: 800, height: 400, title: "Block Size Analysis")
|> Vl.data_from_values(block_analysis_viz)
|> Vl.mark(:line, point: true)
|> Vl.encode_field(:x, "Block Size", type: :quantitative)
|> Vl.encode_field(:y, "BDM", type: :quantitative)
|> Vl.config(legend: [orient: "top"])

Comparative Analysis with Shannon Entropy and LZW

Utility modules:

defmodule Entropy do
  @moduledoc """
  Shannon entropy of a list of symbols.
  """

  @doc """
  Returns the Shannon entropy of `data` in bits per symbol.

  Each distinct value contributes `-p * log2(p)`, where `p` is its relative
  frequency in `data`. An empty list yields `0`.
  """
  def shannon_entropy(data) do
    total = length(data)

    for {_symbol, count} <- Enum.frequencies(data), reduce: 0 do
      bits ->
        p = count / total
        bits + -p * :math.log2(p)
    end
  end
end

defmodule LZW do
  @moduledoc """
  LZW-style compression of lists of arbitrary symbols.
  """

  @doc """
  Compresses `data` into a list of integer codes.

  The dictionary starts with one single-symbol phrase per distinct element of
  `data`, numbered from 0 in order of first appearance. Each phrase added
  while scanning gets the next free code. An empty list yields `[]`.
  """
  def compress_list(data) when is_list(data) do
    dictionary =
      data
      |> Enum.uniq()
      |> Enum.with_index()
      |> Map.new(fn {symbol, code} -> {[symbol], code} end)

    {phrase, emitted, dictionary, _next_code} =
      Enum.reduce(data, {[], [], dictionary, map_size(dictionary)}, &consume/2)

    # Flush the phrase still pending once the input is exhausted.
    emitted =
      if phrase == [] do
        emitted
      else
        [Map.get(dictionary, phrase) | emitted]
      end

    Enum.reverse(emitted)
  end

  @doc """
  Flattens `matrix` and compresses it with `compress_list/1`.

  Returns `{codes, {rows, cols}}`, where `cols` is the length of the first row
  (0 for an empty matrix).
  """
  def compress_matrix(matrix) when is_list(matrix) do
    dimensions =
      case matrix do
        [] -> {0, 0}
        [first_row | _rest] -> {length(matrix), length(first_row)}
      end

    {matrix |> List.flatten() |> compress_list(), dimensions}
  end

  # Extends the pending phrase while it is still a known dictionary entry.
  # Otherwise emits the pending phrase's code, registers the extended phrase
  # under the next free code and restarts from the current symbol.
  defp consume(symbol, {phrase, emitted, dictionary, next_code}) do
    candidate = phrase ++ [symbol]

    if Map.has_key?(dictionary, candidate) do
      {candidate, emitted, dictionary, next_code}
    else
      {
        [symbol],
        [Map.get(dictionary, phrase) | emitted],
        Map.put(dictionary, candidate, next_code),
        next_code + 1
      }
    end
  end
end

Comparison:

bdm = BDM.new(1, 2, 3, :ignore)

# Sequences from Example 1, paired with their display names.
comparison_data = [
  {constant_seq, "constant"},
  {random_seq, "random"},
  {ordered_seq, "alternating"},
  {pattern_seq, "pattern 001"}
]

# For each sequence: BDM, Shannon entropy, and the LZW ratio
# (number of emitted codes divided by the input length).
data_viz =
  Enum.map(comparison_data, fn {sequence, name} ->
    bdm_score = BDM.compute(bdm, sequence)
    entropy = Entropy.shannon_entropy(sequence)
    lzw_ratio = length(LZW.compress_list(sequence)) / length(sequence)

    %{
      "Pattern" => name,
      "BDM" => Float.round(bdm_score, 2),
      "Entropy" => Float.round(entropy, 2),
      "LZW" => Float.round(lzw_ratio, 2)
    }
  end)

Kino.DataTable.new(
  data_viz,
  keys: ["Pattern", "BDM", "Entropy", "LZW"],
  name: "BDM vs Shannon Entropy and LZW Comparison"
)

Real-World Applications

Cellular Automata Analysis

defmodule CellularAutomata do
  @moduledoc """
  Elementary cellular automaton helpers operating on lists of 0/1 cells.
  """

  @doc """
  Evolves `initial_state` under Rule 30 for `steps` generations.

  Cells wrap around at the edges: the first and last cells are neighbours.
  Returns a list of `steps + 1` states, beginning with `initial_state` itself.
  An empty state stays empty. `steps` must be a non-negative integer; any
  other value raises `FunctionClauseError` instead of recursing forever.
  """
  def rule_30(initial_state, steps) when is_integer(steps) and steps >= 0 do
    evolve(initial_state, steps, &rule_30_step/1)
  end

  # An empty row has no cells to update. Handling it here avoids the
  # division by zero the wrap-around arithmetic would otherwise hit.
  defp rule_30_step([]), do: []

  defp rule_30_step(state) do
    # Rotate the row by one cell in each direction so every cell lines up with
    # its wrapped-around neighbours. This replaces per-cell Enum.at/2 lookups,
    # which made each generation quadratic in the row length.
    left_neighbours = [List.last(state) | Enum.drop(state, -1)]
    right_neighbours = tl(state) ++ [hd(state)]

    [left_neighbours, state, right_neighbours]
    |> Enum.zip()
    |> Enum.map(fn neighbourhood ->
      # Rule 30 lookup table, keyed by {left, center, right}.
      case neighbourhood do
        {1, 1, 1} -> 0
        {1, 1, 0} -> 0
        {1, 0, 1} -> 0
        {1, 0, 0} -> 1
        {0, 1, 1} -> 1
        {0, 1, 0} -> 1
        {0, 0, 1} -> 1
        {0, 0, 0} -> 0
      end
    end)
  end

  # Collects the current state followed by every later generation.
  defp evolve(state, 0, _rule), do: [state]

  defp evolve(state, steps, rule) when steps > 0 do
    [state | evolve(rule.(state), steps - 1, rule)]
  end
end

# A single live cell in the middle of a 15-cell row.
initial_state = List.duplicate(0, 7) ++ [1] ++ List.duplicate(0, 7)
evolution = CellularAutomata.rule_30(initial_state, 20)

bdm = BDM.new(1, 2, 2, :ignore)

# BDM of every generation, tagged with its step number.
evolution_complexity =
  for {state, step} <- Enum.with_index(evolution) do
    %{step: step, complexity: BDM.compute(bdm, state)}
  end

# Round the values for plotting.
viz_data =
  Enum.map(evolution_complexity, fn %{step: step, complexity: complexity} ->
    %{step: step, complexity: Float.round(complexity, 2)}
  end)

Vl.new(width: 600, height: 400, title: "Cellular Automata Evolution Complexity")
|> Vl.data_from_values(viz_data)
|> Vl.mark(:line, point: true)
|> Vl.encode_field(:x, "step", type: :quantitative, title: "Step")
|> Vl.encode_field(:y, "complexity", type: :quantitative, title: "BDM Complexity")
|> Vl.config(legend: [orient: "top"])

Performance Considerations

Memory and Time Complexity

defmodule Benchmark do
  @moduledoc """
  Minimal wall-clock timing helper.
  """

  @doc """
  Calls the zero-arity function `func` and returns `{result, elapsed_ms}`.

  `elapsed_ms` is the difference between two `System.monotonic_time/1`
  readings in milliseconds, taken immediately before and after the call.
  """
  def time(func) do
    started_at = System.monotonic_time(:millisecond)
    result = func.()
    elapsed_ms = System.monotonic_time(:millisecond) - started_at

    {result, elapsed_ms}
  end
end

bdm = BDM.new(1, 2, 2, :ignore)

# Time BDM.compute/2 on random sequences of 10,000 to 100,000 symbols in
# steps of 5,000. Sequence generation is kept outside the timed function.
viz_data =
  Enum.map(10_000..100_000//5_000, fn len ->
    test_seq = Enum.map(1..len, fn _ -> Enum.random([0, 1]) end)
    {_result, time_ms} = Benchmark.time(fn -> BDM.compute(bdm, test_seq) end)

    %{"l" => len, "t" => time_ms}
  end)

Vl.new(width: 600, height: 400, title: "Performance Analysis")
|> Vl.data_from_values(viz_data)
|> Vl.mark(:line, point: true)
|> Vl.encode_field(:x, "l", type: :quantitative, title: "Dataset Length")
|> Vl.encode_field(:y, "t", type: :quantitative, title: "Time (ms)")
|> Vl.config(legend: [orient: "top"])

Perturbation Analysis

alias BDM.PerturbationAnalysis

Understanding Sensitivity to Changes

Perturbation analysis reveals how robust BDM complexity estimates are to small changes in the data. This is crucial for understanding the stability of complexity measures and identifying critical structures.

Interactive Perturbation Visualization

Creating 1D test patterns for perturbation analysis and analyzing sensitivity for each pattern:

# Four 16-symbol binary test patterns.
test_patterns_1d = %{
  structured: [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1],
  random: for(_ <- 1..16, do: Enum.random([0, 1])),
  mostly_zeros: [0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0],
  clustered: [1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0]
}

bdm_1d = BDM.new(1, 2, 3, :ignore)

# {name, pattern, profile} triples; reused by later cells.
sensitivity_analyses =
  Enum.map(test_patterns_1d, fn {name, pattern} ->
    {name, pattern, PerturbationAnalysis.sensitivity_profile(bdm_1d, pattern)}
  end)

# Summary table: maximum and mean sensitivity per pattern.
sensitivity_data =
  Enum.map(sensitivity_analyses, fn {name, _pattern, profile} ->
    sensitivities = Enum.map(profile, & &1.sensitivity)
    max_sensitivity = Enum.max(sensitivities)
    avg_sensitivity = Enum.sum(sensitivities) / length(profile)

    %{
      "Pattern" => Atom.to_string(name),
      "Max sensitivity" => Float.round(max_sensitivity, 3),
      "Avg sensitivity" => Float.round(avg_sensitivity, 3)
    }
  end)

Kino.DataTable.new(
  sensitivity_data,
  name: "Sensitivity Analysis",
  keys: ["Pattern", "Max sensitivity", "Avg sensitivity"]
)

# Long-form plot data: every profile point, tagged with its pattern name.
viz_data =
  for {name, _pattern, profile} <- sensitivity_analyses,
      point <- profile do
    Map.put(point, :pattern, to_string(name))
  end

Vl.new(width: 700, height: 400, title: "Position Sensitivity Analysis")
|> Vl.data_from_values(viz_data)
|> Vl.mark(:line, point: true, stroke_width: 2)
|> Vl.encode_field(:x, "position", type: :quantitative, title: "Bit Position")
|> Vl.encode_field(:y, "sensitivity", type: :quantitative, title: "Sensitivity (|ΔBDM|)")
|> Vl.encode_field(:color, "pattern", type: :nominal, title: "Pattern Type")
|> Vl.encode_field(:stroke_dash, "pattern", type: :nominal)

Noise Level Analysis

Analyze how different noise levels affect complexity:

noise_levels = [0.05, 0.1, 0.2, 0.3, 0.4, 0.5]
base_pattern = [0, 1, 0, 1, 0, 1, 0, 1] |> List.duplicate(4) |> List.flatten()

bdm = BDM.new(1, 2, 3, :ignore)

# For each noise level, perturb the base pattern and summarise how far the
# BDM value moves (mean, population standard deviation, min and max).
noise_analysis_data =
  for noise_level <- noise_levels do
    # NOTE(review): the literal 20 is presumably the number of perturbations
    # to generate at this noise level — confirm against PerturbationAnalysis.
    perturbations =
      PerturbationAnalysis.random_perturbations(bdm, base_pattern, 20, noise_level)

    {original_bdm, results} =
      PerturbationAnalysis.calculate_perturbation_effects(bdm, base_pattern, perturbations)

    delta_bdms = Enum.map(results, & &1.delta_bdm)
    sample_count = length(delta_bdms)
    mean_delta = Enum.sum(delta_bdms) / sample_count

    squared_deviations =
      Enum.map(delta_bdms, fn delta -> (delta - mean_delta) * (delta - mean_delta) end)

    std_delta = :math.sqrt(Enum.sum(squared_deviations) / sample_count)
    {min_delta, max_delta} = Enum.min_max(delta_bdms)

    %{
      # round/1, not trunc/1: float products such as 0.29 * 100 evaluate to
      # 28.999999999999996 and would be truncated to the wrong percentage.
      "Noise%" => round(noise_level * 100),
      "BDM" => Float.round(original_bdm, 2),
      "Δ" => Float.round(std_delta, 2),
      "Mean Δ" => Float.round(mean_delta, 2),
      "Min Δ" => Float.round(min_delta, 2),
      "Max Δ" => Float.round(max_delta, 2)
    }
  end

Kino.DataTable.new(
  noise_analysis_data,
  name: "Noise Level Analysis",
  keys: ["Noise%", "BDM", "Δ", "Mean Δ", "Min Δ", "Max Δ"]
)

Visualization of the noise-level impact (how complexity changes scale with perturbation intensity):

# Long-form plot data: three measures (mean, standard deviation, max) per
# noise level, read from the table columns built above.
viz_data =
  for result <- noise_analysis_data,
      {column, measure} <- [
        {"Mean Δ", "Mean Change"},
        {"Δ", "Std Deviation"},
        {"Max Δ", "Max Change"}
      ] do
    %{
      noise_level: Map.get(result, "Noise%"),
      value: Map.get(result, column),
      measure: measure
    }
  end

Vl.new(width: 600, height: 400, title: "BDM Response to Noise Levels")
|> Vl.data_from_values(viz_data)
|> Vl.mark(:line, point: true)
|> Vl.encode_field(:x, "noise_level", type: :quantitative, title: "Noise Level (%)")
|> Vl.encode_field(:y, "value", type: :quantitative, title: "BDM Change")
|> Vl.encode_field(:color, "measure", type: :nominal, title: "Measure")
|> Vl.config(legend: [orient: "top"])

Critical Point Detection

Analyze critical points (where structural changes have maximum impact) for a structured pattern:

# 16 alternating symbols: 0, 1, 0, 1, ...
structured_pattern = [0, 1] |> List.duplicate(8) |> List.flatten()

sensitivity_profile = PerturbationAnalysis.sensitivity_profile(bdm, structured_pattern)

# NOTE(review): 0.5 is presumably a sensitivity threshold — confirm against
# PerturbationAnalysis.detect_critical_positions.
critical_points = PerturbationAnalysis.detect_critical_positions(sensitivity_profile, 0.5)

# Tabulate at most the first five critical points.
critical_point_data =
  critical_points
  |> Enum.take(5)
  |> Enum.map(fn point ->
    %{
      "Position" => point.position,
      "Sensitivity" => Float.round(point.sensitivity, 3),
      "Relative" => Float.round(point.relative_sensitivity, 3)
    }
  end)

Kino.DataTable.new(
  critical_point_data,
  name: "Critical Points Analysis",
  keys: ["Position", "Sensitivity", "Relative"]
)

Perturbation Phase Space

Phase space visualization showing relationship between original complexity and perturbation effects:

# One point per pattern and single-bit perturbation: the pattern's original
# BDM against the BDM of the perturbed copy.
viz_data =
  Enum.flat_map(sensitivity_analyses, fn {name, pattern, _profile} ->
    original_bdm = BDM.compute(bdm, pattern)

    bdm
    |> PerturbationAnalysis.single_bit_perturbations(pattern)
    |> Enum.with_index()
    |> Enum.map(fn {perturbed, index} ->
      perturbed_bdm = BDM.compute(bdm, perturbed)

      # Number of positions where the perturbed copy differs from the pattern.
      hamming_distance =
        pattern
        |> Enum.zip(perturbed)
        |> Enum.count(fn {before, changed} -> before != changed end)

      %{
        pattern: to_string(name),
        original_bdm: original_bdm,
        perturbed_bdm: perturbed_bdm,
        delta_bdm: perturbed_bdm - original_bdm,
        hamming_distance: hamming_distance,
        position: index
      }
    end)
  end)

Vl.new(width: 600, height: 400, title: "Perturbation Phase Space")
|> Vl.data_from_values(viz_data)
|> Vl.mark(:circle, size: 60, opacity: 0.7)
|> Vl.encode_field(:x, "original_bdm", type: :quantitative, title: "Original BDM")
|> Vl.encode_field(:y, "perturbed_bdm", type: :quantitative, title: "Perturbed BDM")
|> Vl.encode_field(:color, "pattern", type: :nominal, title: "Pattern Type")
|> Vl.encode_field(:size, "hamming_distance", type: :quantitative, title: "Hamming Distance")

Robustness Analysis

Compare robustness (which pattern types are most robust to perturbations) across different pattern types:

# Stability analysis per test pattern, tagged with the pattern's name.
# NOTE(review): 30 and 0.15 are presumably the number of trials and the noise
# level — confirm against PerturbationAnalysis.stability_coefficient.
robustness_comparison =
  Enum.map(test_patterns_1d, fn {name, pattern} ->
    bdm
    |> PerturbationAnalysis.stability_coefficient(pattern, 30, 0.15)
    |> Map.put(:pattern, name)
  end)

# Keep only the fields used for plotting.
viz_data =
  for result <- robustness_comparison do
    %{
      pattern: to_string(result.pattern),
      stability_score: result.stability_score,
      coefficient_variation: result.coefficient_of_variation
    }
  end

Vl.new(width: 500, height: 300, title: "Pattern Robustness Analysis")
|> Vl.data_from_values(viz_data)
|> Vl.mark(:bar)
|> Vl.encode_field(:x, "pattern", type: :nominal, title: "Pattern Type")
|> Vl.encode_field(:y, "stability_score", type: :quantitative, title: "Stability Score")
|> Vl.encode_field(:color, "pattern", type: :nominal)

Perturbation landscape

2D landscape:

# Rebind `bdm` to the 2D configuration (same arguments as the earlier 2D example).
bdm = BDM.new(2, 2, 2, :ignore)

# Perturbation landscape of the random 8x8 pattern.
# NOTE(review): the trailing 2 is presumably a radius or maximum flip count —
# confirm against PerturbationAnalysis.perturbation_landscape.
landscape = PerturbationAnalysis.perturbation_landscape(bdm, patterns.random, 2)
defmodule LandscapeVisualizer do
  @moduledoc """
  Helpers that turn a perturbation landscape into heatmap data and render it
  with VegaLite.
  """

  @doc """
  Converts `landscape_result` into a list of plot points.

  Each landscape point is expected to have `:center_row`, `:center_col` and
  `:effects` (a list of maps with `:num_flips` and `:max_effect`). For every
  point, the effect whose `:num_flips` equals `flip_count` (default 1) is
  looked up. When there is none, the effect is reported as `0.0`.

  Returns maps with `:x` (column), `:y` (row, flipped vertically),
  `:max_effect` and `:abs_effect`. An empty landscape yields `[]`.
  """
  def extract_max_effects(landscape_result, flip_count \\ 1) do
    # The grid size depends only on the whole landscape, so compute it once
    # rather than re-scanning every point for each plotted cell.
    max_row =
      landscape_result
      |> Enum.map(& &1.center_row)
      |> Enum.max(fn -> 0 end)

    # Add 1 for 0-based indexing + 1 for radius.
    size = max_row + 2

    for point <- landscape_result do
      max_effect =
        case Enum.find(point.effects, &(&1.num_flips == flip_count)) do
          nil -> 0.0
          effect -> effect.max_effect
        end

      %{
        x: point.center_col,
        # Flip the y coordinate.
        y: size - 1 - point.center_row,
        max_effect: max_effect,
        abs_effect: abs(max_effect)
      }
    end
  end

  @doc """
  Returns a 300x300 VegaLite heatmap spec of the points produced by
  `extract_max_effects/2`, titled `title`.

  Cells are coloured by `abs_effect` using the "viridis" scheme. The tooltip
  shows the coordinates and both effect values.
  """
  def create_landscape_heatmap(data, title) do
    Vl.new(width: 300, height: 300, title: title)
    |> Vl.data_from_values(data)
    |> Vl.mark(:rect)
    |> Vl.encode_field(:x, "x", type: :ordinal, title: "Column")
    |> Vl.encode_field(:y, "y", type: :ordinal, title: "Row")
    |> Vl.encode_field(:color, "abs_effect",
      type: :quantitative,
      scale: [scheme: "viridis"],
      legend: [title: "Perturbation Effect"]
    )
    |> Vl.encode(:tooltip, [
      [field: "x", type: :quantitative],
      [field: "y", type: :quantitative],
      [field: "max_effect", type: :quantitative, title: "Max Effect"],
      [field: "abs_effect", type: :quantitative, title: "Abs Effect"]
    ])
  end
end

# Heatmap of the strongest single-flip effect at each landscape position.
heatmap_title = "Perturbation Landscape for random (1-bit flips)"
plot_data = LandscapeVisualizer.extract_max_effects(landscape, 1)

LandscapeVisualizer.create_landscape_heatmap(plot_data, heatmap_title)