FLAME Clusters: Ad-Hoc Spark on the BEAM

Mix.install([
  {:dux, "~> 0.3.0"},
  {:kino_dux, "~> 0.1"},
  {:flame, "~> 0.5"}
])

Overview

This guide walks through building an ad-hoc distributed compute cluster using FLAME and Fly.io. We'll query the Ookla Speedtest open dataset — ~20GB of global internet speed measurements stored as Parquet on S3.

Each FLAME runner boots a fresh machine with its own DuckDB, reads S3 data directly, and auto-terminates when idle. Think of it as Spark-style elastic compute, but on the BEAM — no JVM, no YARN, no cluster manager.

Prerequisites:

  • A Fly.io account with a FLY_API_TOKEN
  • This notebook running on a Fly.io Livebook instance

The Dataset

Ookla publishes quarterly internet speed test data as open Parquet files:

s3://ookla-open-data/parquet/performance/
  type={fixed,mobile}/
    year={2019..2025}/
      quarter={1..4}/
        *.parquet

~56 files, Hive-partitioned by connection type, year, and quarter. Each file contains millions of tile-level measurements: download/upload speeds, latency, test counts, and geographic quadkeys.

The data is public — no S3 credentials needed.

require Dux

1. Configure Anonymous S3 Access

DuckDB reads S3 via the httpfs extension. For public buckets, we create a secret with empty credentials — DuckDB makes unsigned requests.

Dux.exec("INSTALL httpfs; LOAD httpfs")
Dux.create_secret(:ookla, type: :s3, provider: :config, key_id: "", secret: "", region: "us-west-2")
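It's a reasonable guess (an assumption about the library's internals, not documented behavior) that `Dux.create_secret/2` compiles down to DuckDB's `CREATE SECRET` statement, which you could also issue by hand:

```elixir
# Hypothetical raw-SQL equivalent of the create_secret call above.
# An empty KEY_ID/SECRET tells the httpfs extension to send unsigned
# (anonymous) requests, which is all a public bucket needs.
Dux.exec("""
CREATE OR REPLACE SECRET ookla (
  TYPE S3,
  PROVIDER config,
  KEY_ID '',
  SECRET '',
  REGION 'us-west-2'
);
""")
```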

2. Explore Locally First

Before spinning up a cluster, let's look at a single quarter to understand the data.

one_quarter =
  Dux.from_parquet(
    "s3://ookla-open-data/parquet/performance/type=fixed/year=2024/quarter=4/*.parquet",
    hive_partitioning: true
  )

one_quarter
|> Dux.head(5)
|> Dux.to_rows()
# How big is one quarter?
one_quarter |> Dux.n_rows()
# Speed distribution
one_quarter
|> Dux.summarise(
  median_download: median(avg_d_kbps / 1000.0),
  median_upload: median(avg_u_kbps / 1000.0),
  median_latency: median(avg_lat_ms),
  total_tests: sum(tests),
  total_devices: sum(devices)
)
|> Dux.to_rows()

3. Start the FLAME Pool

Now let's scale out. The pool configuration controls the machines FLAME boots.

Kino.start_child!(
  {FLAME.Pool,
    name: :dux_pool,
    code_sync: [
      start_apps: true,
      sync_beams: [Path.join(System.tmp_dir!(), "livebook_runtime")]
    ],
    min: 0,
    max: 10,
    max_concurrency: 1,
    backend: {FLAME.FlyBackend,
      cpu_kind: "performance",
      cpus: 4,
      memory_mb: 8192,
      token: System.fetch_env!("FLY_API_TOKEN"),
      env: %{"LIVEBOOK_COOKIE" => Atom.to_string(Node.get_cookie())}
    },
    boot_timeout: 120_000,
    idle_shutdown_after: :timer.minutes(5)}
)

Key settings:

  • max_concurrency: 1 — one DuckDB query per machine. DuckDB already parallelizes across all available cores, so stacking BEAM-level concurrency on top would only add contention.
  • memory_mb: 8192 — 8GB per worker. DuckDB spills to /tmp if needed.
  • idle_shutdown_after: 5 min — machines auto-terminate. You pay only for active compute.
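Before wiring in Dux, you can smoke-test the pool with a plain FLAME.call/2: it boots a runner on demand (or reuses a warm one) and executes the function on the remote machine. (This sketch assumes the pool above started successfully and a FLY_API_TOKEN is set.)

```elixir
# Runs on a Fly machine, not locally. Returns the remote node name and
# its scheduler count, confirming the runner booted and joined the cluster.
FLAME.call(:dux_pool, fn ->
  {node(), System.schedulers_online()}
end)
```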

4. Spin Up Workers

# Start with 3 workers — each takes ~30s to boot (driver download + setup).
# Scale up with more if needed.
workers = Dux.Flame.spin_up(3,
  pool: :dux_pool,
  memory_limit: "4GB",
  setup: fn ->
    # Each worker needs httpfs + anonymous S3 access
    Dux.exec("INSTALL httpfs; LOAD httpfs")
    Dux.create_secret(:ookla, type: :s3, provider: :config, key_id: "", secret: "", region: "us-west-2")
  end
)

IO.puts("#{length(workers)} workers ready")

5. Query the Full Dataset

Now read all years of fixed broadband data across the cluster. Each worker reads its assigned Parquet files directly from S3 — no data flows through your machine.

all_fixed =
  Dux.from_parquet(
    "s3://ookla-open-data/parquet/performance/type=fixed/year=*/quarter=*/*.parquet",
    hive_partitioning: true
  )

# Global broadband trends by year
trends =
  all_fixed
  |> Dux.distribute(workers)
  |> Dux.group_by(:year)
  |> Dux.summarise(
    median_download: median(avg_d_kbps / 1000.0),
    median_upload: median(avg_u_kbps / 1000.0),
    median_latency: median(avg_lat_ms),
    total_tests: sum(tests),
    total_devices: sum(devices)
  )
  |> Dux.sort_by(:year)
  |> Dux.collect()
  |> Dux.to_rows()
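Dux.distribute's internals aren't shown here, but conceptually the coordinator has to split the matched Parquet files into disjoint shards, one per worker. A minimal round-robin sketch in plain Elixir (the file list and worker names are illustrative, not the library's actual scheme):

```elixir
# Hypothetical: shard partition globs across 3 workers round-robin,
# so each worker reads a disjoint subset of files directly from S3.
files =
  for year <- 2019..2025, quarter <- 1..4 do
    "type=fixed/year=#{year}/quarter=#{quarter}/*.parquet"
  end

workers = [:worker_1, :worker_2, :worker_3]

shards =
  files
  |> Enum.with_index()
  |> Enum.group_by(
    fn {_file, i} -> Enum.at(workers, rem(i, length(workers))) end,
    fn {file, _i} -> file end
  )

# Each worker gets 9 or 10 of the 28 globs.
map_size(shards)
```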

6. Compare Fixed vs Mobile

Query both connection types in one pipeline using SQL macros.

Dux.define(:speed_tier, [:mbps], """
  CASE
    WHEN mbps >= 100 THEN 'fast (100+ Mbps)'
    WHEN mbps >= 25  THEN 'moderate (25-100 Mbps)'
    WHEN mbps >= 10  THEN 'slow (10-25 Mbps)'
    ELSE 'very slow (<10 Mbps)'
  END
""")
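Assuming `Dux.define/3` expands to a DuckDB scalar macro (again, an assumption about the library, not confirmed by its docs), the generated SQL would look roughly like this:

```elixir
# Hypothetical expansion of Dux.define/3 into DuckDB's CREATE MACRO syntax.
# A macro is inlined into each query that references it, so it works
# unchanged on every worker's DuckDB once defined there.
Dux.exec("""
CREATE OR REPLACE MACRO speed_tier(mbps) AS
  CASE
    WHEN mbps >= 100 THEN 'fast (100+ Mbps)'
    WHEN mbps >= 25  THEN 'moderate (25-100 Mbps)'
    WHEN mbps >= 10  THEN 'slow (10-25 Mbps)'
    ELSE 'very slow (<10 Mbps)'
  END;
""")
```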

all_data =
  Dux.from_parquet(
    "s3://ookla-open-data/parquet/performance/type=*/year=2024/quarter=*/*.parquet",
    hive_partitioning: true
  )

speed_distribution =
  all_data
  |> Dux.distribute(workers)
  |> Dux.mutate_with(tier: "speed_tier(avg_d_kbps / 1000.0)")
  |> Dux.group_by([:type, :tier])
  |> Dux.summarise(
    tiles: count(tier),
    total_tests: sum(tests)
  )
  |> Dux.sort_by([:type, desc: :tiles])
  |> Dux.collect()
  |> Dux.to_rows()

7. Heavy Aggregation: Latency by Quadkey Prefix

Quadkeys encode geographic tiles. The first few characters identify the region. Let's find the areas with the worst latency.
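The prefix-means-region property comes from the Bing Maps tile system: each quadkey character adds one zoom level, with its low bit extending the tile's x coordinate and its high bit the y. A small standalone decoder makes this concrete:

```elixir
# Standalone sketch: decode a quadkey into {x, y, zoom} tile coordinates.
# Tiles sharing a 6-character prefix share a zoom-6 tile, i.e. the same
# broad geographic region.
defmodule Quadkey do
  def to_tile(quadkey) do
    {x, y} =
      quadkey
      |> String.graphemes()
      |> Enum.reduce({0, 0}, fn digit, {x, y} ->
        d = String.to_integer(digit)
        {x * 2 + rem(d, 2), y * 2 + div(d, 2)}
      end)

    {x, y, String.length(quadkey)}
  end
end

Quadkey.to_tile("023010")
# => {10, 24, 6}
```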

worst_latency =
  all_fixed
  |> Dux.distribute(workers)
  |> Dux.filter(tests >= 10)
  |> Dux.mutate_with(region: "LEFT(quadkey, 6)")
  |> Dux.group_by(:region)
  |> Dux.summarise(
    avg_latency: avg(avg_lat_ms),
    total_tests: sum(tests),
    n_tiles: count(region)
  )
  |> Dux.filter(total_tests > 1000)
  |> Dux.sort_by(desc: :avg_latency)
  |> Dux.head(20)
  |> Dux.collect()
  |> Dux.to_rows()

8. Writing Results

Distributed writes go directly from workers to S3.

# Write the aggregated trends back to your own bucket
# (uncomment and set your bucket)

# all_fixed
# |> Dux.distribute(workers)
# |> Dux.mutate(download_mbps: avg_d_kbps / 1000.0)
# |> Dux.to_parquet("s3://your-bucket/ookla-processed/", partition_by: [:year])
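If `Dux.to_parquet/3` wraps DuckDB's `COPY ... TO` (an assumption about the library; the bucket name is still your placeholder), the equivalent raw statement would be roughly:

```elixir
# Hypothetical raw-SQL equivalent of the partitioned write above.
# PARTITION_BY writes one Hive-style year=.../ directory per value.
Dux.exec("""
COPY (
  SELECT *, avg_d_kbps / 1000.0 AS download_mbps
  FROM read_parquet(
    's3://ookla-open-data/parquet/performance/type=fixed/year=*/quarter=*/*.parquet',
    hive_partitioning = true
  )
) TO 's3://your-bucket/ookla-processed/'
  (FORMAT PARQUET, PARTITION_BY (year));
""")
```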

9. Cleanup

Workers auto-terminate after the idle timeout. To shut down immediately:

Enum.each(workers, &GenServer.stop/1)
IO.puts("Workers stopped. FLAME runners will terminate shortly.")

What Just Happened

You built a 3-machine compute cluster from a Livebook notebook (the pool scales to 10). Each machine:

  1. Booted in ~30s via FLAME + Fly.io
  2. Got a full copy of your notebook's compiled code
  3. Started its own DuckDB with 4 cores and 8GB RAM
  4. Read its assigned Parquet files directly from S3
  5. Executed filter + group + aggregate locally
  6. Sent small aggregated results back to the coordinator
  7. Auto-terminated after 5 minutes idle

No infrastructure to manage. No cluster to maintain. Just notebooks and queries.