Block Decomposition Method (BDM)
Mix.install([
{:nx, "~> 0.6"},
{:kino, "~> 0.11"},
{:explorer, "~> 0.7"},
{:vega_lite, "~> 0.1"},
{:kino_vega_lite, "~> 0.1"},
{:bdm, "~> 0.3.1"}
])
alias VegaLite, as: Vl
Introduction
The Block Decomposition Method (BDM) is a computational approach developed by Hector Zenil and colleagues to approximate the algorithmic complexity of datasets. This method extends the Coding Theorem Method (CTM) to handle larger datasets by decomposing them into smaller blocks and aggregating their complexities.
Key Concepts
- Algorithmic complexity (also known as Kolmogorov complexity): Measures the length of the shortest computer program that can generate a given dataset. While this is theoretically uncomputable, BDM provides a practical approximation.
- Algorithmic Probability: The theoretical foundation, based on Solomonoff-Levin theory. The algorithmic probability AP(x) of an object x is the probability that a random binary program p produces x and halts when run on a universal (Turing-complete) machine. By the coding theorem, K(x) ≈ -log2 AP(x), which is what makes output frequency data usable as a complexity estimate.
- Coding Theorem Method (CTM): Scores short strings using precomputed output frequency distributions from exhaustive runs of small Turing machines
- Block Decomposition: Breaks large datasets into blocks small enough for CTM to score, then aggregates the results (see the sketch below)
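The aggregation step is simple enough to sketch: decompose the data into fixed-size blocks, count how often each distinct block occurs, and sum CTM(block) + log2(count) over the distinct blocks. A minimal sketch, assuming a ctm lookup function for short blocks (the real bdm package ships precomputed CTM tables):
defmodule BDMSketch do
  # BDM(X) = sum over distinct blocks b of CTM(b) + log2(n_b),
  # where n_b counts the occurrences of block b.
  def bdm(data, block_size, ctm) do
    data
    # :discard mirrors the :ignore boundary option: leftover partial blocks are dropped
    |> Enum.chunk_every(block_size, block_size, :discard)
    |> Enum.frequencies()
    |> Enum.map(fn {block, count} -> ctm.(block) + :math.log2(count) end)
    |> Enum.sum()
  end
end
Counting duplicate blocks once and paying only log2(count) for repeats is what lets BDM reward global regularity that per-block CTM values alone would miss.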
Interactive Examples
Example 1: Simple Binary Sequences
random_seq = for _ <- 1..200, do: Enum.random([0, 1])
ordered_seq = List.duplicate([0, 1], 100) |> List.flatten()
constant_seq = List.duplicate(0, 200)
pattern_seq = [0, 0, 1] |> List.duplicate(70) |> List.flatten() |> Enum.take(200)
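# Assumed parameter order for BDM.new/4: dimensionality, alphabet size,
# block size, and boundary strategy (:ignore drops leftover partial blocks).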
bdm = BDM.new(1, 2, 3, :ignore)
Kino.DataTable.new(
[
%{"Pattern" => "random", "BDM" => BDM.compute(bdm, random_seq)},
%{"Pattern" => "ordered", "BDM" => BDM.compute(bdm, ordered_seq)},
%{"Pattern" => "constant", "BDM" => BDM.compute(bdm, constant_seq)},
%{"Pattern" => "pattern", "BDM" => BDM.compute(bdm, pattern_seq)}
],
keys: ["Pattern", "BDM"],
name: "BDM of binary sequences"
)
Example 2: 2D Pattern Analysis
defmodule TestData do
def create_2d_pattern(:random, size) do
for _ <- 1..size, do: for(_ <- 1..size, do: Enum.random([0, 1]))
end
def create_2d_pattern(:checkerboard, size) do
for i <- 0..(size-1) do
for j <- 0..(size-1) do
rem(i + j, 2)
end
end
end
def create_2d_pattern(:diagonal, size) do
for i <- 0..(size-1) do
for j <- 0..(size-1) do
if i == j, do: 1, else: 0
end
end
end
def create_2d_pattern(:constant, size) do
for _ <- 1..size, do: List.duplicate(0, size)
end
end
patterns = %{
random: TestData.create_2d_pattern(:random, 8),
checkerboard: TestData.create_2d_pattern(:checkerboard, 8),
diagonal: TestData.create_2d_pattern(:diagonal, 8),
constant: TestData.create_2d_pattern(:constant, 8)
}
defmodule Visualizer do
def create_heatmap(image, title) do
data = image
|> Enum.with_index()
|> Enum.flat_map(fn {row, i} ->
row
|> Enum.with_index()
|> Enum.map(fn {value, j} ->
%{x: j, y: i, value: value, image: title}
end)
end)
Vl.new(width: 200, height: 200, title: title)
|> Vl.data_from_values(data)
|> Vl.mark(:rect)
|> Vl.encode_field(:x, "x", type: :ordinal, title: "Column")
|> Vl.encode_field(:y, "y", type: :ordinal, title: "Row", sort: :descending)
|> Vl.encode_field(:color, "value", type: :quantitative, scale: [scheme: "viridis"], title: "Value")
end
end
random = Visualizer.create_heatmap(patterns.random, "Random")
checkerboard = Visualizer.create_heatmap(patterns.checkerboard, "Checkerboard")
diagonal = Visualizer.create_heatmap(patterns.diagonal, "Diagonal")
constant = Visualizer.create_heatmap(patterns.constant, "Constant")
Kino.Layout.grid([random, checkerboard, diagonal, constant], columns: 2)
Calculate BDM for each pattern:
bdm = BDM.new(2, 2, 2, :ignore)
pattern_viz =
patterns
|> Enum.map(fn {name, pattern} ->
bdm_value = BDM.compute(bdm, pattern)
%{"Pattern" => Atom.to_string(name), "BDM" => bdm_value}
end)
Kino.DataTable.new(
pattern_viz,
keys: ["Pattern", "BDM"],
name: "BDM of 2D patterns"
)
Example 3: Interactive Visualization
bdm = BDM.new(1, 2, 3, :ignore)
sequence_lengths = [10, 20, 30, 40, 50]
complexity_data =
for len <- sequence_lengths do
random_seq = for _ <- 1..len, do: Enum.random([0, 1])
ordered_seq =
List.duplicate([0, 1], div(len, 2))
|> List.flatten()
|> Enum.take(len)
constant_seq = List.duplicate(0, len)
%{
length: len,
random: BDM.compute(bdm, random_seq),
ordered: BDM.compute(bdm, ordered_seq),
constant: BDM.compute(bdm, constant_seq)
}
end
viz_data =
complexity_data
|> Enum.flat_map(fn row ->
[
%{length: row.length, complexity: row.random, type: "Random"},
%{length: row.length, complexity: row.ordered, type: "Ordered"},
%{length: row.length, complexity: row.constant, type: "Constant"}
]
end)
Vl.new(width: 600, height: 400)
|> Vl.data_from_values(viz_data)
|> Vl.mark(:line, point: true)
|> Vl.encode_field(:x, "length", type: :quantitative, title: "Sequence Length")
|> Vl.encode_field(:y, "complexity", type: :quantitative, title: "BDM Complexity")
|> Vl.encode_field(:color, "type", type: :nominal, title: "Sequence Type")
|> Vl.config(legend: [orient: "top"])
Advanced Features
Block Size Analysis
Analyzing how block size affects BDM estimation. Larger blocks capture longer-range structure, but the CTM tables only cover short strings, and the :ignore boundary option drops leftover partial blocks, so values are not directly comparable across block sizes:
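# Precomputed CTM tables for 1-D binary strings typically cover blocks up to
# length 12, which is assumed to be the upper bound supported here.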
block_sizes = 2..12
block_analysis_viz =
for block_size <- block_sizes do
bdm = BDM.new(1, 2, block_size, :ignore)
bdm_value = BDM.compute(bdm, pattern_seq)
%{"Block Size" => block_size, "BDM" => bdm_value}
end
Kino.DataTable.new(
block_analysis_viz,
keys: ["Block Size", "BDM"],
name: "Block Size Analysis"
)
Vl.new(width: 800, height: 400, title: "Block Size Analysis")
|> Vl.data_from_values(block_analysis_viz)
|> Vl.mark(:line, point: true)
|> Vl.encode_field(:x, "Block Size", type: :quantitative)
|> Vl.encode_field(:y, "BDM", type: :quantitative)
|> Vl.config(legend: [orient: "top"])
Comparative Analysis with Shannon Entropy and LZW
Utility modules. Shannon entropy, H = -Σ p(x) log2 p(x), measures the statistical spread of symbol frequencies, and LZW compressed length serves as a cheap compressibility baseline; both are classical points of comparison for BDM:
defmodule Entropy do
def shannon_entropy(data) do
frequencies = Enum.frequencies(data)
total = length(data)
frequencies
|> Enum.map(fn {_, count} ->
p = count / total
-p * :math.log2(p)
end)
|> Enum.sum()
end
end
defmodule LZW do
def compress_list(data) when is_list(data) do
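# Standard LZW over symbol lists: seed the dictionary with one entry per
# distinct symbol, then grow entries for repeated substrings, so more
# repetitive input compresses to fewer output codes.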
unique_elements = data |> Enum.uniq()
initial_dict =
unique_elements
|> Enum.with_index()
|> Enum.into(%{}, fn {elem, idx} -> {[elem], idx} end)
next_code = map_size(initial_dict)
{result, _dict, _code} = compress_helper(data, [], initial_dict, next_code, [])
Enum.reverse(result)
end
def compress_matrix(matrix) when is_list(matrix) do
rows = length(matrix)
cols = if rows > 0, do: length(hd(matrix)), else: 0
flat_data = List.flatten(matrix)
compressed = compress_list(flat_data)
{compressed, {rows, cols}}
end
defp compress_helper([], current_string, dict, next_code, result) do
case current_string do
[] -> {result, dict, next_code}
_ ->
code = Map.get(dict, current_string)
{[code | result], dict, next_code}
end
end
defp compress_helper([char | rest], current_string, dict, next_code, result) do
new_string = current_string ++ [char]
case Map.get(dict, new_string) do
nil ->
current_code = Map.get(dict, current_string)
new_dict = Map.put(dict, new_string, next_code)
compress_helper(rest, [char], new_dict, next_code + 1, [current_code | result])
_code ->
compress_helper(rest, new_string, dict, next_code, result)
end
end
end
Comparison:
bdm = BDM.new(1, 2, 3, :ignore)
comparison_data = [
{constant_seq, "constant"},
{random_seq, "random"},
{ordered_seq, "alternating"},
{pattern_seq, "pattern 001"}
]
data_viz =
for {sequence, name} <- comparison_data do
bdm_val = BDM.compute(bdm, sequence)
entropy_val = Entropy.shannon_entropy(sequence)
lzw_val = length(LZW.compress_list(sequence)) / length(sequence)
%{
"Pattern" => name,
"BDM" => Float.round(bdm_val, 2),
"Entropy" => Float.round(entropy_val, 2),
"LZW" => Float.round(lzw_val, 2),
}
end
Kino.DataTable.new(
data_viz,
keys: ["Pattern", "BDM", "Entropy", "LZW"],
name: "BDM vs Shannon Entropy and LZW Comparison"
)
Real-World Applications
Cellular Automata Analysis
defmodule CellularAutomata do
def rule_30(initial_state, steps) do
evolve(initial_state, steps, &rule_30_step/1)
end
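# Wolfram's Rule 30: the outputs for neighborhoods 111, 110, ..., 000 are
# 0, 0, 0, 1, 1, 1, 1, 0 (the bit pattern 00011110, i.e. 30 in binary).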
defp rule_30_step(state) do
len = length(state)
for i <- 0..(len-1) do
left = Enum.at(state, rem(i - 1 + len, len))
center = Enum.at(state, i)
right = Enum.at(state, rem(i + 1, len))
case {left, center, right} do
{1, 1, 1} -> 0
{1, 1, 0} -> 0
{1, 0, 1} -> 0
{1, 0, 0} -> 1
{0, 1, 1} -> 1
{0, 1, 0} -> 1
{0, 0, 1} -> 1
{0, 0, 0} -> 0
end
end
end
defp evolve(state, 0, _rule), do: [state]
defp evolve(state, steps, rule) do
next_state = rule.(state)
[state | evolve(next_state, steps - 1, rule)]
end
end
initial_state = [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]
evolution = CellularAutomata.rule_30(initial_state, 20)
bdm = BDM.new(1, 2, 2, :ignore)
# Calculate BDM for each step
evolution_complexity =
evolution
|> Enum.with_index()
|> Enum.map(fn {state, step} ->
bdm_val = BDM.compute(bdm, state)
%{step: step, complexity: bdm_val}
end)
viz_data =
evolution_complexity
|> Enum.map(fn row ->
%{step: row.step, complexity: Float.round(row.complexity, 2)}
end)
Vl.new(width: 600, height: 400, title: "Cellular Automata Evolution Complexity")
|> Vl.data_from_values(viz_data)
|> Vl.mark(:line, point: true)
|> Vl.encode_field(:x, "step", type: :quantitative, title: "Step")
|> Vl.encode_field(:y, "complexity", type: :quantitative, title: "BDM Complexity")
|> Vl.config(legend: [orient: "top"])
Performance Considerations
Memory and Time Complexity
Since each block reduces to a lookup in a precomputed CTM table, BDM's runtime should scale roughly linearly with input length, which the benchmark below checks empirically:
defmodule Benchmark do
def time(func) do
start_time = System.monotonic_time(:millisecond)
result = func.()
end_time = System.monotonic_time(:millisecond)
{result, end_time - start_time}
end
end
bdm = BDM.new(1, 2, 2, :ignore)
viz_data = for len <- 10000..100000//5000 do
test_seq = for _ <- 1..len, do: Enum.random([0, 1])
{_, time_ms} = Benchmark.time(fn -> BDM.compute(bdm, test_seq) end)
%{
"l" => len,
"t" => time_ms
}
end
Vl.new(width: 600, height: 400, title: "Performance Analysis")
|> Vl.data_from_values(viz_data)
|> Vl.mark(:line, point: true)
|> Vl.encode_field(:x, "l", type: :quantitative, title: "Dataset Length")
|> Vl.encode_field(:y, "t", type: :quantitative, title: "Time (ms)")
|> Vl.config(legend: [orient: "top"])
Perturbation Analysis
alias BDM.PerturbationAnalysis
Understanding Sensitivity to Changes
Perturbation analysis reveals how robust BDM complexity estimates are to small changes in the data. This is crucial for understanding the stability of complexity measures and identifying critical structures.
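Concretely, the single-bit sensitivity at position i is |BDM(x') - BDM(x)|, where x' is x with bit i flipped. A hand-rolled sketch of such a profile (PerturbationAnalysis.sensitivity_profile/2 is assumed to compute something equivalent):
defmodule SensitivitySketch do
  # Flip each bit in turn, recompute BDM, and record the absolute change.
  def profile(bdm, data) do
    base = BDM.compute(bdm, data)

    data
    |> Enum.with_index()
    |> Enum.map(fn {bit, i} ->
      flipped = List.replace_at(data, i, 1 - bit)
      %{position: i, sensitivity: abs(BDM.compute(bdm, flipped) - base)}
    end)
  end
end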
Interactive Perturbation Visualization
Create 1D test patterns for perturbation analysis and compute a sensitivity profile for each:
test_patterns_1d = %{
structured: [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1],
random: Enum.take(Stream.repeatedly(fn -> Enum.random([0, 1]) end), 16),
mostly_zeros: [0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0],
clustered: [1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0]
}
bdm_1d = BDM.new(1, 2, 3, :ignore)
sensitivity_analyses =
for {name, pattern} <- test_patterns_1d do
sensitivity_profile = PerturbationAnalysis.sensitivity_profile(bdm_1d, pattern)
{name, pattern, sensitivity_profile}
end
sensitivity_data =
for {name, _, profile} <- sensitivity_analyses do
max_sensitivity =
profile
|> Enum.map(&(&1.sensitivity))
|> Enum.max()
avg_sensitivity =
profile
|> Enum.map(&(&1.sensitivity))
|> Enum.sum()
|> Kernel./(length(profile))
%{
"Pattern" => Atom.to_string(name),
"Max sensitivity" => Float.round(max_sensitivity, 3),
"Avg sensitivity" => Float.round(avg_sensitivity, 3)
}
end
Kino.DataTable.new(
sensitivity_data,
name: "Sensitivity Analysis",
keys: ["Pattern", "Max sensitivity", "Avg sensitivity"]
)
viz_data =
for {name, _pattern, profile} <- sensitivity_analyses do
profile
|> Enum.map(fn point ->
Map.put(point, :pattern, to_string(name))
end)
end
|> List.flatten()
Vl.new(width: 700, height: 400, title: "Position Sensitivity Analysis")
|> Vl.data_from_values(viz_data)
|> Vl.mark(:line, point: true, stroke_width: 2)
|> Vl.encode_field(:x, "position", type: :quantitative, title: "Bit Position")
|> Vl.encode_field(:y, "sensitivity", type: :quantitative, title: "Sensitivity (|ΔBDM|)")
|> Vl.encode_field(:color, "pattern", type: :nominal, title: "Pattern Type")
|> Vl.encode_field(:stroke_dash, "pattern", type: :nominal)
Noise Level Analysis
Analyze how different noise levels affect complexity:
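As a reference for what "noise level" means operationally, here is a minimal sketch that flips each bit independently with probability p; random_perturbations/4 is assumed to generate several such variants:
# Hypothetical helper: flip each bit with probability p.
flip_with_noise = fn data, p ->
  Enum.map(data, fn bit ->
    if :rand.uniform() < p, do: 1 - bit, else: bit
  end)
end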
noise_levels = [0.05, 0.1, 0.2, 0.3, 0.4, 0.5]
base_pattern = [0, 1, 0, 1, 0, 1, 0, 1] |> List.duplicate(4) |> List.flatten()
bdm = BDM.new(1, 2, 3, :ignore)
noise_analysis_data =
for noise_level <- noise_levels do
# Generate multiple perturbations at this noise level
perturbations = PerturbationAnalysis.random_perturbations(bdm, base_pattern, 20, noise_level)
{original_bdm, results} = PerturbationAnalysis.calculate_perturbation_effects(bdm, base_pattern, perturbations)
# Calculate statistics
delta_bdms = Enum.map(results, &(&1.delta_bdm))
mean_delta = Enum.sum(delta_bdms) / length(delta_bdms)
std_delta =
delta_bdms
|> Enum.map(fn x -> (x - mean_delta) * (x - mean_delta) end)
|> Enum.sum()
|> Kernel./(length(delta_bdms))
|> :math.sqrt()
%{
"Noise%" => trunc(noise_level * 100),
"BDM" => Float.round(original_bdm, 2),
"Δ" => Float.round(std_delta, 2),
"Mean Δ" => Float.round(mean_delta, 2),
"Min Δ" => Float.round(Enum.min(delta_bdms), 2),
"Max Δ" => Float.round(Enum.max(delta_bdms), 2)
}
end
Kino.DataTable.new(
noise_analysis_data,
name: "Noise Level Analysis",
keys: ["Noise%", "BDM", "Δ", "Mean Δ", "Min Δ", "Max Δ"]
)
Visualizing noise level impact, i.e. how the complexity changes scale with perturbation intensity:
viz_data =
noise_analysis_data
|> Enum.flat_map(fn result ->
noise_level = Map.get(result, "Noise%")
[
%{noise_level: noise_level, value: Map.get(result, "Mean Δ"), measure: "Mean Change"},
%{noise_level: noise_level, value: Map.get(result, "Δ"), measure: "Std Deviation"},
%{noise_level: noise_level, value: Map.get(result, "Max Δ"), measure: "Max Change"}
]
end)
Vl.new(width: 600, height: 400, title: "BDM Response to Noise Levels")
|> Vl.data_from_values(viz_data)
|> Vl.mark(:line, point: true)
|> Vl.encode_field(:x, "noise_level", type: :quantitative, title: "Noise Level (%)")
|> Vl.encode_field(:y, "value", type: :quantitative, title: "BDM Change")
|> Vl.encode_field(:color, "measure", type: :nominal, title: "Measure")
|> Vl.config(legend: [orient: "top"])
Critical Point Detection
Analyze critical points (positions where structural changes have the greatest impact) for the structured pattern:
structured_pattern = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1]
sensitivity_profile = PerturbationAnalysis.sensitivity_profile(bdm, structured_pattern)
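# detect_critical_positions/2 (assumed semantics): keep positions whose
# sensitivity is at least the given fraction of the maximum, e.g.
#   max_s = sensitivity_profile |> Enum.map(& &1.sensitivity) |> Enum.max()
#   Enum.filter(sensitivity_profile, fn p -> p.sensitivity >= 0.5 * max_s end)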
critical_points = PerturbationAnalysis.detect_critical_positions(sensitivity_profile, 0.5)
critical_point_data =
for point <- Enum.take(critical_points, 5) do
%{
"Position" => point.position,
"Sensitivity" => Float.round(point.sensitivity, 3),
"Relative" => Float.round(point.relative_sensitivity, 3)
}
end
Kino.DataTable.new(
critical_point_data,
name: "Critical Points Analysis",
keys: ["Position", "Sensitivity", "Relative"]
)
Perturbation Phase Space
Phase-space visualization showing the relationship between original and perturbed complexity:
viz_data =
for {name, pattern, _} <- sensitivity_analyses do
original_bdm = BDM.compute(bdm, pattern)
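# single_bit_perturbations/2 is assumed to return one variant per position
# with exactly one bit flipped, so every Hamming distance below should be 1.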
perturbations = PerturbationAnalysis.single_bit_perturbations(bdm, pattern)
perturbations
|> Enum.with_index()
|> Enum.map(fn {perturbed, index} ->
perturbed_bdm = BDM.compute(bdm, perturbed)
hamming_distance = Enum.zip(pattern, perturbed)
|> Enum.count(fn {a, b} -> a != b end)
%{
pattern: to_string(name),
original_bdm: original_bdm,
perturbed_bdm: perturbed_bdm,
delta_bdm: perturbed_bdm - original_bdm,
hamming_distance: hamming_distance,
position: index
}
end)
end
|> List.flatten()
Vl.new(width: 600, height: 400, title: "Perturbation Phase Space")
|> Vl.data_from_values(viz_data)
|> Vl.mark(:circle, size: 60, opacity: 0.7)
|> Vl.encode_field(:x, "original_bdm", type: :quantitative, title: "Original BDM")
|> Vl.encode_field(:y, "perturbed_bdm", type: :quantitative, title: "Perturbed BDM")
|> Vl.encode_field(:color, "pattern", type: :nominal, title: "Pattern Type")
|> Vl.encode_field(:size, "hamming_distance", type: :quantitative, title: "Hamming Distance")
Robustness Analysis
Compare robustness across pattern types to see which are most stable under perturbation:
robustness_comparison =
for {name, pattern} <- test_patterns_1d do
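# Assumed signature: 30 random perturbations at a 15% noise level; the result
# map is expected to include :stability_score and :coefficient_of_variation,
# which the chart below uses.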
analysis = PerturbationAnalysis.stability_coefficient(bdm, pattern, 30, 0.15)
Map.put(analysis, :pattern, name)
end
viz_data =
robustness_comparison
|> Enum.map(fn result ->
%{
pattern: to_string(result.pattern),
stability_score: result.stability_score,
coefficient_variation: result.coefficient_of_variation
}
end)
Vl.new(width: 500, height: 300, title: "Pattern Robustness Analysis")
|> Vl.data_from_values(viz_data)
|> Vl.mark(:bar)
|> Vl.encode_field(:x, "pattern", type: :nominal, title: "Pattern Type")
|> Vl.encode_field(:y, "stability_score", type: :quantitative, title: "Stability Score")
|> Vl.encode_field(:color, "pattern", type: :nominal)Perturbation landscape
Map perturbation effects across a 2D pattern:
bdm = BDM.new(2, 2, 2, :ignore)
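# The third argument is assumed to bound the number of simultaneous bit flips
# evaluated around each cell (effects are grouped by :num_flips below).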
landscape = PerturbationAnalysis.perturbation_landscape(bdm, patterns.random, 2)
defmodule LandscapeVisualizer do
def extract_max_effects(landscape_result, flip_count \\ 1) do
  # Infer the grid size once, outside the loop: +1 for zero-based
  # indexing and +1 for the perturbation radius.
  max_row =
    case landscape_result do
      [] -> 0
      points -> points |> Enum.map(& &1.center_row) |> Enum.max()
    end

  size = max_row + 2

  for point <- landscape_result do
    effect = Enum.find(point.effects, fn e -> e.num_flips == flip_count end)
    max_effect = if effect, do: effect.max_effect, else: 0.0

    %{
      x: point.center_col,
      # Flip the y coordinate so row 0 renders at the top
      y: size - 1 - point.center_row,
      max_effect: max_effect,
      abs_effect: abs(max_effect)
    }
  end
end
def create_landscape_heatmap(data, title) do
Vl.new(width: 300, height: 300, title: title)
|> Vl.data_from_values(data)
|> Vl.mark(:rect)
|> Vl.encode_field(:x, "x", type: :ordinal, title: "Column")
|> Vl.encode_field(:y, "y", type: :ordinal, title: "Row")
|> Vl.encode_field(:color, "abs_effect",
type: :quantitative,
scale: [scheme: "viridis"],
legend: [title: "Perturbation Effect"])
|> Vl.encode(:tooltip, [
[field: "x", type: :quantitative],
[field: "y", type: :quantitative],
[field: "max_effect", type: :quantitative, title: "Max Effect"],
[field: "abs_effect", type: :quantitative, title: "Abs Effect"]
])
end
end
plot_data = LandscapeVisualizer.extract_max_effects(landscape, 1)
LandscapeVisualizer.create_landscape_heatmap(
plot_data,
"Perturbation Landscape for random (1-bit flips)"
)