SparkEx Livebook Demo

Setup

Install SparkEx from the local path with its optional dependencies (Explorer for Arrow decoding, Kino for interactive rendering).

Prerequisites: A Spark Connect server must be running on localhost:15002. Start one with:

export JAVA_HOME="/opt/homebrew/opt/openjdk@21/libexec/openjdk.jdk/Contents/Home"
export SPARK_HOME="test/spark-4.1.1-bin-hadoop3-connect"
bash "$SPARK_HOME/sbin/start-connect-server.sh"
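Before installing, you can optionally confirm that something is listening on port 15002. A quick sketch using `nc`, assuming it is available on your PATH:

```shell
# Check that the Spark Connect port is open (nc -z exits 0 when a listener accepts)
if nc -z localhost 15002 2>/dev/null; then
  echo "Spark Connect server is up on localhost:15002"
else
  echo "Nothing listening on localhost:15002 -- start the server first"
fi
```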

Mix.install([
  {:spark_ex, path: Path.expand("..", __DIR__)},
  {:explorer, "~> 0.10"},
  {:kino, "~> 0.14"}
])

Connect to Spark

{:ok, session} = SparkEx.connect(url: "sc://localhost:15002")
{:ok, version} = SparkEx.spark_version(session)
IO.puts("Connected to Spark #{version}")

Kino.Render — automatic DataFrame rendering

Simply returning a SparkEx.DataFrame from a cell renders it with interactive tabs: Schema, Preview (sortable/pageable Kino.DataTable), Explain, and Raw.

The preview tab uses to_explorer/2 with a bounded LIMIT (100 rows by default), so it is safe to run on large datasets without risking out-of-memory errors.

SQL DataFrame

SparkEx.sql(session, """
  SELECT * FROM VALUES
    (1, 'Alice', 'Engineering', 95000),
    (2, 'Bob', 'Engineering', 105000),
    (3, 'Carol', 'Marketing', 88000),
    (4, 'Dave', 'Marketing', 92000),
    (5, 'Eve', 'Sales', 78000),
    (6, 'Frank', 'Sales', 82000),
    (7, 'Grace', 'Engineering', 115000),
    (8, 'Hank', 'Marketing', 97000)
  AS employees(id, name, department, salary)
""")

Range DataFrame (large — preview is bounded)

This creates a DataFrame with 1,000,000 rows, but preview only fetches 100.

SparkEx.range(session, 1_000_000)

Transforms with Kino.Render

Transforms are lazy — they build up a logical plan without executing it. The plan runs only when the cell returns the DataFrame and Kino.Render fetches its bounded preview.

Click the Explain tab to see the full query plan.

import SparkEx.Functions
alias SparkEx.{DataFrame, Column}

SparkEx.sql(session, """
  SELECT * FROM VALUES
    (1, 'Alice', 'Engineering', 95000),
    (2, 'Bob', 'Engineering', 105000),
    (3, 'Carol', 'Marketing', 88000),
    (4, 'Dave', 'Marketing', 92000),
    (5, 'Eve', 'Sales', 78000),
    (6, 'Frank', 'Sales', 82000),
    (7, 'Grace', 'Engineering', 115000),
    (8, 'Hank', 'Marketing', 97000)
  AS employees(id, name, department, salary)
""")
|> DataFrame.filter(col("salary") |> Column.gt(lit(85000)))
|> DataFrame.with_column("bonus", col("salary") |> Column.multiply(lit(0.1)))
|> DataFrame.order_by([col("salary") |> Column.desc()])

SparkEx.Livebook helpers

The helper functions return Kino terms directly. They do not call Kino.render/1 internally — you control when and how they are rendered.
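Because the helpers are plain Kino terms, you decide when and where they appear — for example by passing one to Kino.render/1 mid-cell. A minimal sketch, reusing only calls shown elsewhere in this notebook:

```elixir
df = SparkEx.range(session, 10)

# Render the schema immediately, mid-cell, rather than as the cell result.
df |> SparkEx.Livebook.schema() |> Kino.render()

# The cell's final value can still carry its own rendered output.
SparkEx.Livebook.preview(df, num_rows: 10, name: "Range")
```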

Preview with options

preview/2 returns a Kino.DataTable with sorting enabled. Pass :num_rows to control how many rows are fetched (bounded query, safe for large tables).

employees =
  SparkEx.sql(session, """
    SELECT * FROM VALUES
      (1, 'Alice', 'Engineering', 95000),
      (2, 'Bob', 'Engineering', 105000),
      (3, 'Carol', 'Marketing', 88000),
      (4, 'Dave', 'Marketing', 92000),
      (5, 'Eve', 'Sales', 78000),
      (6, 'Frank', 'Sales', 82000),
      (7, 'Grace', 'Engineering', 115000),
      (8, 'Hank', 'Marketing', 97000)
    AS employees(id, name, department, salary)
  """)

SparkEx.Livebook.preview(employees, num_rows: 50, name: "Employees")

Schema

SparkEx.Livebook.schema(employees)

Explain

SparkEx.Livebook.explain(employees, mode: :extended)

Joins and aggregates

departments =
  SparkEx.sql(session, """
    SELECT * FROM VALUES
      ('Engineering', 'Building 1'),
      ('Marketing', 'Building 2'),
      ('Sales', 'Building 3')
    AS departments(department, building)
  """)

result =
  DataFrame.join(employees, departments, ["department"], :inner)
  |> DataFrame.group_by(["department", "building"])
  |> SparkEx.GroupedData.agg([
    Column.alias_(count(col("id")), "headcount"),
    Column.alias_(sum(col("salary")), "total_salary"),
    Column.alias_(avg(col("salary")), "avg_salary")
  ])
  |> DataFrame.order_by([col("department")])

result

to_explorer/2 — bounded materialization

to_explorer/2 returns an Explorer.DataFrame. By default, it injects a LIMIT into the Spark plan to prevent unbounded collection and OOM.

{:ok, explorer_df} = DataFrame.to_explorer(result)
explorer_df
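Once materialized, the result is an ordinary Explorer.DataFrame, so further processing happens locally with no Spark round-trips. A sketch using plain Explorer — the column names mirror the aggregate above, but the data here is constructed locally for illustration:

```elixir
# Explorer's filter/sort_by are macros, so the module must be required first.
require Explorer.DataFrame, as: DF

# A small local frame with the same shape as the aggregated result above.
local =
  DF.new(%{
    department: ["Engineering", "Marketing", "Sales"],
    total_salary: [315_000, 277_000, 160_000]
  })

# Filter and sort entirely in-process.
local
|> DF.filter(total_salary > 200_000)
|> DF.sort_by(desc: total_salary)
```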

Demonstrating OOM protection

When a dataset is larger than :max_rows, to_explorer/2 stays bounded: the injected LIMIT truncates the result, or the call returns a LimitExceeded error instead of collecting everything:

large = SparkEx.range(session, 100_000)

case DataFrame.to_explorer(large, max_rows: 1_000) do
  {:ok, df} ->
    IO.puts("Got #{Explorer.DataFrame.n_rows(df)} rows (bounded by LIMIT injection)")
    df

  {:error, %SparkEx.Error.LimitExceeded{} = e} ->
    IO.puts("Protected from OOM: #{Exception.message(e)}")
end

Unsafe mode (opt-in)

Pass unsafe: true to skip LIMIT injection. Use only when you know the result fits in memory.

small = SparkEx.range(session, 25)
{:ok, explorer_df} = DataFrame.to_explorer(small, unsafe: true)
IO.puts("Collected #{Explorer.DataFrame.n_rows(explorer_df)} rows (no LIMIT)")
explorer_df

Reading files from disk

spark_home = Path.expand("../test/spark-4.1.1-bin-hadoop3-connect", __DIR__)
resources = Path.join(spark_home, "examples/src/main/resources")

SparkEx.Reader.json(session, Path.join(resources, "employees.json"))
|> DataFrame.order_by([col("salary") |> Column.desc()])

SparkEx.Reader.csv(
  session,
  Path.join(resources, "people.csv"),
  options: %{"delimiter" => ";", "header" => "true"},
  schema: "name STRING, age INT, job STRING"
)

Set operations

top_earners =
  SparkEx.sql(session, """
    SELECT * FROM VALUES ('Alice'), ('Bob'), ('Grace') AS t(name)
  """)

engineers =
  SparkEx.sql(session, """
    SELECT * FROM VALUES ('Alice'), ('Bob'), ('Grace') AS t(name)
  """)

marketers =
  SparkEx.sql(session, """
    SELECT * FROM VALUES ('Carol'), ('Dave'), ('Hank') AS t(name)
  """)

Kino.Layout.tabs([
  {"Union", SparkEx.Livebook.preview(DataFrame.union(engineers, marketers), name: "Union")},
  {"Intersect",
   SparkEx.Livebook.preview(DataFrame.intersect(top_earners, engineers), name: "Intersect")},
  {"Except",
   SparkEx.Livebook.preview(DataFrame.except(top_earners, marketers), name: "Except")}
])

Cleanup

SparkEx.Session.stop(session)
:ok