Setup
Install SparkEx from the local path with its optional dependencies (Explorer for Arrow decoding, Kino for interactive rendering).
Prerequisites: A Spark Connect server must be running on localhost:15002.
Start one with:
export JAVA_HOME="/opt/homebrew/opt/openjdk@21/libexec/openjdk.jdk/Contents/Home"
export SPARK_HOME="test/spark-4.1.1-bin-hadoop3-connect"
bash "$SPARK_HOME/sbin/start-connect-server.sh"
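Before connecting, it can be handy to verify that something is actually listening on port 15002. A minimal reachability check using Erlang's built-in :gen_tcp (this probe is my addition, not part of SparkEx):

```elixir
# Quick reachability check for the Spark Connect port.
case :gen_tcp.connect(~c"localhost", 15002, [:binary], 1_000) do
  {:ok, socket} ->
    :gen_tcp.close(socket)
    IO.puts("Spark Connect server is reachable on localhost:15002")

  {:error, reason} ->
    IO.puts("Cannot reach localhost:15002: #{inspect(reason)}")
end
```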
Mix.install([
{:spark_ex, path: Path.expand("..", __DIR__)},
{:explorer, "~> 0.10"},
{:kino, "~> 0.14"}
])
Connect to Spark
{:ok, session} = SparkEx.connect(url: "sc://localhost:15002")
{:ok, version} = SparkEx.spark_version(session)
IO.puts("Connected to Spark #{version}")
Kino.Render — automatic DataFrame rendering
Simply returning a SparkEx.DataFrame from a cell renders it with interactive tabs:
Schema, Preview (sortable/pageable Kino.DataTable), Explain, and Raw.
The preview tab uses to_explorer/2 with a bounded LIMIT (100 rows by default),
so it is safe to run on large datasets without OOM.
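As a rough idea of how this wiring can look, here is a sketch of a Kino.Render implementation built from Kino's real protocol (Kino.Render.to_livebook/1) and Kino.Layout.tabs/1. This is illustrative only — SparkEx's actual implementation may differ, and the tab contents below are assumptions:

```elixir
# Hypothetical sketch — SparkEx's real Kino.Render implementation may differ.
defimpl Kino.Render, for: SparkEx.DataFrame do
  def to_livebook(df) do
    # Fetch a bounded preview (LIMIT-injected) rather than the full result.
    {:ok, explorer_df} = SparkEx.DataFrame.to_explorer(df, max_rows: 100)

    tabs =
      Kino.Layout.tabs([
        {"Preview", Kino.DataTable.new(Explorer.DataFrame.to_rows(explorer_df))},
        {"Schema", SparkEx.Livebook.schema(df)},
        {"Explain", SparkEx.Livebook.explain(df)}
      ])

    Kino.Render.to_livebook(tabs)
  end
end
```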
SQL DataFrame
SparkEx.sql(session, """
SELECT * FROM VALUES
(1, 'Alice', 'Engineering', 95000),
(2, 'Bob', 'Engineering', 105000),
(3, 'Carol', 'Marketing', 88000),
(4, 'Dave', 'Marketing', 92000),
(5, 'Eve', 'Sales', 78000),
(6, 'Frank', 'Sales', 82000),
(7, 'Grace', 'Engineering', 115000),
(8, 'Hank', 'Marketing', 97000)
AS employees(id, name, department, salary)
""")
Range DataFrame (large — preview is bounded)
This creates a DataFrame with 1,000,000 rows, but preview only fetches 100.
SparkEx.range(session, 1_000_000)
Transforms with Kino.Render
Transforms are lazy — they build up a plan without executing. The plan only runs
when the cell evaluates the DataFrame and Kino.Render kicks in.
Click the Explain tab to see the full query plan.
import SparkEx.Functions
alias SparkEx.{DataFrame, Column}
SparkEx.sql(session, """
SELECT * FROM VALUES
(1, 'Alice', 'Engineering', 95000),
(2, 'Bob', 'Engineering', 105000),
(3, 'Carol', 'Marketing', 88000),
(4, 'Dave', 'Marketing', 92000),
(5, 'Eve', 'Sales', 78000),
(6, 'Frank', 'Sales', 82000),
(7, 'Grace', 'Engineering', 115000),
(8, 'Hank', 'Marketing', 97000)
AS employees(id, name, department, salary)
""")
|> DataFrame.filter(col("salary") |> Column.gt(lit(85000)))
|> DataFrame.with_column("bonus", col("salary") |> Column.multiply(lit(0.1)))
|> DataFrame.order_by([col("salary") |> Column.desc()])
SparkEx.Livebook helpers
The helper functions return Kino terms directly. They do not call Kino.render/1
internally — you control when and how they are rendered.
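Because the helpers return plain Kino terms, they compose with Kino's layout primitives. A sketch using Kino's real Kino.Layout.grid/2 to place two helper outputs side by side (the `df` binding here is just for illustration):

```elixir
# Render a schema and a small preview next to each other in one cell.
df = SparkEx.range(session, 10)

Kino.Layout.grid(
  [SparkEx.Livebook.schema(df), SparkEx.Livebook.preview(df, num_rows: 10)],
  columns: 2
)
```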
Preview with options
preview/2 returns a Kino.DataTable with sorting enabled. Pass :num_rows
to control how many rows are fetched (bounded query, safe for large tables).
employees =
SparkEx.sql(session, """
SELECT * FROM VALUES
(1, 'Alice', 'Engineering', 95000),
(2, 'Bob', 'Engineering', 105000),
(3, 'Carol', 'Marketing', 88000),
(4, 'Dave', 'Marketing', 92000),
(5, 'Eve', 'Sales', 78000),
(6, 'Frank', 'Sales', 82000),
(7, 'Grace', 'Engineering', 115000),
(8, 'Hank', 'Marketing', 97000)
AS employees(id, name, department, salary)
""")
SparkEx.Livebook.preview(employees, num_rows: 50, name: "Employees")
Schema
SparkEx.Livebook.schema(employees)
Explain
SparkEx.Livebook.explain(employees, mode: :extended)
Joins and aggregates
departments =
SparkEx.sql(session, """
SELECT * FROM VALUES
('Engineering', 'Building 1'),
('Marketing', 'Building 2'),
('Sales', 'Building 3')
AS departments(department, building)
""")
result =
DataFrame.join(employees, departments, ["department"], :inner)
|> DataFrame.group_by(["department", "building"])
|> SparkEx.GroupedData.agg([
Column.alias_(count(col("id")), "headcount"),
Column.alias_(sum(col("salary")), "total_salary"),
Column.alias_(avg(col("salary")), "avg_salary")
])
|> DataFrame.order_by([col("department")])
result
to_explorer/2 — bounded materialization
to_explorer/2 returns an Explorer.DataFrame. By default, it injects a LIMIT
into the Spark plan to prevent unbounded collection and OOM.
{:ok, explorer_df} = DataFrame.to_explorer(result)
explorer_df
Demonstrating OOM protection
Asking to_explorer/2 for more rows than :max_rows allows returns a LimitExceeded error instead of collecting the full result:
large = SparkEx.range(session, 100_000)
case DataFrame.to_explorer(large, max_rows: 1_000) do
{:ok, df} ->
IO.puts("Got #{Explorer.DataFrame.n_rows(df)} rows (bounded by LIMIT injection)")
df
{:error, %SparkEx.Error.LimitExceeded{} = e} ->
IO.puts("Protected from OOM: #{Exception.message(e)}")
end
Unsafe mode (opt-in)
Pass unsafe: true to skip LIMIT injection. Use only when you know the result fits in memory.
small = SparkEx.range(session, 25)
{:ok, explorer_df} = DataFrame.to_explorer(small, unsafe: true)
IO.puts("Collected #{Explorer.DataFrame.n_rows(explorer_df)} rows (no LIMIT)")
explorer_df
Reading files from disk
spark_home = Path.expand("../test/spark-4.1.1-bin-hadoop3-connect", __DIR__)
resources = Path.join(spark_home, "examples/src/main/resources")
SparkEx.Reader.json(session, Path.join(resources, "employees.json"))
|> DataFrame.order_by([col("salary") |> Column.desc()])
SparkEx.Reader.csv(
session,
Path.join(resources, "people.csv"),
options: %{"delimiter" => ";", "header" => "true"},
schema: "name STRING, age INT, job STRING"
)
Set operations
top_earners =
SparkEx.sql(session, """
SELECT * FROM VALUES ('Alice'), ('Bob'), ('Grace') AS t(name)
""")
engineers =
SparkEx.sql(session, """
SELECT * FROM VALUES ('Alice'), ('Bob'), ('Grace') AS t(name)
""")
marketers =
SparkEx.sql(session, """
SELECT * FROM VALUES ('Carol'), ('Dave'), ('Hank') AS t(name)
""")
Kino.Layout.tabs([
{"Union", SparkEx.Livebook.preview(DataFrame.union(engineers, marketers), name: "Union")},
{"Intersect",
SparkEx.Livebook.preview(DataFrame.intersect(top_earners, engineers), name: "Intersect")},
{"Except",
SparkEx.Livebook.preview(DataFrame.except(top_earners, marketers), name: "Except")}
])
Cleanup
SparkEx.Session.stop(session)
:ok