Mix.install([
{:dux, "~> 0.3.0"},
{:kino_dux, "~> 0.1"},
{:flame, "~> 0.5"}
])
Overview
This guide walks through building an ad-hoc distributed compute cluster using FLAME and Fly.io. We'll query the Ookla Speedtest open dataset — ~20GB of global internet speed measurements stored as Parquet on S3.
Each FLAME runner boots a fresh machine with its own DuckDB, reads S3 data directly, and auto-terminates when idle. Think of it as Spark-style elastic compute, but on the BEAM — no JVM, no YARN, no cluster manager.
Prerequisites:
- A Fly.io account with a FLY_API_TOKEN
- This notebook running on a Fly.io Livebook instance
The Dataset
Ookla publishes quarterly internet speed test data as open Parquet files:
s3://ookla-open-data/parquet/performance/
type={fixed,mobile}/
year={2019..2025}/
quarter={1..4}/
*.parquet
~56 files, Hive-partitioned by connection type, year, and quarter. Each file contains millions of tile-level measurements: download/upload speeds, latency, test counts, and geographic quadkeys.
The data is public — no S3 credentials needed.
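If you want to peek at the schema with raw DuckDB before touching Dux, a DESCRIBE over one quarter works. This is a sketch; it assumes the httpfs extension is loaded and anonymous S3 access is configured, which the next section sets up.

```sql
-- Inspect the columns of a single quarter directly with DuckDB SQL.
-- Requires httpfs + an anonymous S3 secret (configured in section 1).
DESCRIBE SELECT *
FROM read_parquet(
  's3://ookla-open-data/parquet/performance/type=fixed/year=2024/quarter=4/*.parquet',
  hive_partitioning = true
);
```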
require Dux
1. Configure Anonymous S3 Access
DuckDB reads S3 via the httpfs extension. For public buckets, we create a secret with empty credentials so DuckDB makes unsigned requests.
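For reference, Dux.create_secret presumably wraps DuckDB's CREATE SECRET statement; the raw SQL equivalent would look roughly like this:

```sql
-- Anonymous access to a public bucket: empty key/secret means DuckDB
-- sends unsigned requests.
INSTALL httpfs;
LOAD httpfs;
CREATE SECRET ookla (
    TYPE S3,
    PROVIDER CONFIG,
    KEY_ID '',
    SECRET '',
    REGION 'us-west-2'
);
```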
Dux.exec("INSTALL httpfs; LOAD httpfs")
Dux.create_secret(:ookla, type: :s3, provider: :config, key_id: "", secret: "", region: "us-west-2")
2. Explore Locally First
Before spinning up a cluster, let's look at a single quarter to understand the data.
one_quarter =
Dux.from_parquet(
"s3://ookla-open-data/parquet/performance/type=fixed/year=2024/quarter=4/*.parquet",
hive_partitioning: true
)
one_quarter
|> Dux.head(5)
|> Dux.to_rows()
# How big is one quarter?
one_quarter |> Dux.n_rows()
# Speed distribution
one_quarter
|> Dux.summarise(
median_download: median(avg_d_kbps / 1000.0),
median_upload: median(avg_u_kbps / 1000.0),
median_latency: median(avg_lat_ms),
total_tests: sum(tests),
total_devices: sum(devices)
)
|> Dux.to_rows()
3. Start the FLAME Pool
Now let's scale out. The pool configuration controls the machines FLAME boots.
Kino.start_child!(
{FLAME.Pool,
name: :dux_pool,
code_sync: [
start_apps: true,
sync_beams: [Path.join(System.tmp_dir!(), "livebook_runtime")]
],
min: 0,
max: 10,
max_concurrency: 1,
backend: {FLAME.FlyBackend,
cpu_kind: "performance",
cpus: 4,
memory_mb: 8192,
token: System.fetch_env!("FLY_API_TOKEN"),
env: %{"LIVEBOOK_COOKIE" => Atom.to_string(Node.get_cookie())}
},
boot_timeout: 120_000,
idle_shutdown_after: :timer.minutes(5)}
)
Key settings:
- max_concurrency: 1 - one DuckDB per machine; DuckDB saturates its cores internally.
- memory_mb: 8192 - 8GB per worker; DuckDB spills to /tmp if needed.
- idle_shutdown_after: 5 min - machines auto-terminate, so you pay only for active compute.
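The memory_limit: "4GB" option passed to spin_up in the next section presumably translates to DuckDB's own resource settings. As a sketch, the per-worker SQL equivalents would be:

```sql
-- Per-worker DuckDB resource caps (what a memory_limit option would
-- roughly map to; exact statements Dux emits may differ).
SET memory_limit = '4GB';    -- spill to disk beyond this
SET threads = 4;             -- match the machine's CPU count
SET temp_directory = '/tmp'; -- where spill files go
```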
4. Spin Up Workers
# Start with 3 workers — each takes ~30s to boot (driver download + setup).
# Scale up with more if needed.
workers = Dux.Flame.spin_up(3,
pool: :dux_pool,
memory_limit: "4GB",
setup: fn ->
# Each worker needs httpfs + anonymous S3 access
Dux.exec("INSTALL httpfs; LOAD httpfs")
Dux.create_secret(:ookla, type: :s3, provider: :config, key_id: "", secret: "", region: "us-west-2")
end
)
IO.puts("#{length(workers)} workers ready")
5. Query the Full Dataset
Now read all years of fixed broadband data across the cluster. Each worker reads its assigned Parquet files directly from S3; the raw data never passes through your machine, only the small aggregated results do.
all_fixed =
Dux.from_parquet(
"s3://ookla-open-data/parquet/performance/type=fixed/year=*/quarter=*/*.parquet",
hive_partitioning: true
)
# Global broadband trends by year
trends =
all_fixed
|> Dux.distribute(workers)
|> Dux.group_by(:year)
|> Dux.summarise(
median_download: median(avg_d_kbps / 1000.0),
median_upload: median(avg_u_kbps / 1000.0),
median_latency: median(avg_lat_ms),
total_tests: sum(tests),
total_devices: sum(devices)
)
|> Dux.sort_by(:year)
|> Dux.collect()
|> Dux.to_rows()
6. Compare Fixed vs Mobile
Query both connection types in one pipeline using SQL macros.
Dux.define(:speed_tier, [:mbps], """
CASE
WHEN mbps >= 100 THEN 'fast (100+ Mbps)'
WHEN mbps >= 25 THEN 'moderate (25-100 Mbps)'
WHEN mbps >= 10 THEN 'slow (10-25 Mbps)'
ELSE 'very slow (<10 Mbps)'
END
""")
all_data =
Dux.from_parquet(
"s3://ookla-open-data/parquet/performance/type=*/year=2024/quarter=*/*.parquet",
hive_partitioning: true
)
speed_distribution =
all_data
|> Dux.distribute(workers)
|> Dux.mutate_with(tier: "speed_tier(avg_d_kbps / 1000.0)")
|> Dux.group_by([:type, :tier])
|> Dux.summarise(
tiles: count(tier),
total_tests: sum(tests)
)
|> Dux.sort_by([:type, desc: :tiles])
|> Dux.collect()
|> Dux.to_rows()
7. Heavy Aggregation: Latency by Quadkey Prefix
Quadkeys encode geographic tiles: each character adds one zoom level, so a six-character prefix identifies a zoom-level-6 tile (roughly country-sized). Let's find the regions with the worst latency.
worst_latency =
all_fixed
|> Dux.distribute(workers)
|> Dux.filter(tests >= 10)
|> Dux.mutate_with(region: "LEFT(quadkey, 6)")
|> Dux.group_by(:region)
|> Dux.summarise(
avg_latency: avg(avg_lat_ms),
total_tests: sum(tests),
n_tiles: count(region)
)
|> Dux.filter(total_tests > 1000)
|> Dux.sort_by(desc: :avg_latency)
|> Dux.head(20)
|> Dux.collect()
|> Dux.to_rows()
8. Writing Results
Distributed writes go directly from workers to S3.
# Write the aggregated trends back to your own bucket
# (uncomment and set your bucket)
# all_fixed
# |> Dux.distribute(workers)
# |> Dux.mutate(download_mbps: avg_d_kbps / 1000.0)
# |> Dux.to_parquet("s3://your-bucket/ookla-processed/", partition_by: [:year])
9. Cleanup
Workers auto-terminate after the idle timeout. To shut down immediately:
Enum.each(workers, &GenServer.stop/1)
IO.puts("Workers stopped. FLAME runners will terminate shortly.")What Just Happened
You built a 3-machine compute cluster from a Livebook notebook. Each machine:
- Booted in ~30s via FLAME + Fly.io
- Got a full copy of your notebook's compiled code
- Started its own DuckDB with 4 cores and 8GB RAM
- Read its assigned Parquet files directly from S3
- Executed filter + group + aggregate locally
- Sent small aggregated results back to the coordinator
- Auto-terminated after 5 minutes idle
No infrastructure to manage. No cluster to maintain. Just notebooks and queries.