Native Elixir client for Apache Spark via the Spark Connect protocol.
SparkEx communicates with Spark over gRPC using the official Spark Connect protobuf contract, giving Elixir and Livebook first-class access to distributed Spark SQL, DataFrames, streaming, and the catalog — without a JVM in your application.
Targets Spark 3.5 - 4.1.
Features
- Lazy DataFrame API — select, filter, join, group, window, set ops, reshape, sampling, and more
- 590+ Spark SQL functions — auto-generated from a declarative registry, covering math, string, date/time, array, map, conditional, window, and aggregate families
- Reader/Writer — Parquet, CSV, JSON, ORC, text, JDBC, plus generic format/load/save and V2 DataSource writes
- Structured Streaming — DataStreamReader, DataStreamWriter, StreamingQuery, and StreamingQueryManager
- Catalog API — browse and manage databases, tables, functions, columns, and cache
- Explorer integration — materialize Spark results as Explorer.DataFrame with automatic type mapping
- Livebook / Kino — DataFrames render as interactive tables with schema, preview, and explain tabs
- NA/Stat sub-APIs — fillna, dropna, describe, corr, cov, crosstab, approx_quantile, and more
- MERGE INTO — upsert builder with matched/not-matched clauses and schema evolution
- UDF/UDTF registration — register Java UDFs and Python UDTFs
- Session lifecycle — clone, release, interrupt by tag/operation, reattachable execution with automatic mid-stream recovery
- Observability — Telemetry events for every RPC, retry, reattach, batch, and progress update
Installation
Add spark_ex to your list of dependencies in mix.exs:
def deps do
[
{:spark_ex, "~> 0.1.0"},
# Optional — for Arrow decoding and to_explorer/2
{:explorer, "~> 0.10"},
# Optional — for Livebook rendering
{:kino, "~> 0.14"}
]
end

Getting started
Connect to Spark
{:ok, session} = SparkEx.connect(url: "sc://localhost:15002")
{:ok, "4.1.1"} = SparkEx.spark_version(session)Create and query DataFrames
import SparkEx.Functions
alias SparkEx.{DataFrame, Column}
df =
SparkEx.sql(session, """
SELECT * FROM VALUES
(1, 'Alice', 100), (2, 'Bob', 200), (3, 'Carol', 150)
AS t(id, name, salary)
""")
df
|> DataFrame.filter(Column.gt(col("salary"), lit(120)))
|> DataFrame.select([col("name"), col("salary")])
|> DataFrame.order_by([desc("salary")])
|> DataFrame.collect()
# => {:ok, [%{"name" => "Bob", "salary" => 200}, %{"name" => "Carol", "salary" => 150}]}Joins and aggregates
employees = SparkEx.sql(session, """
SELECT * FROM VALUES (1, 'eng'), (2, 'hr'), (3, 'eng') AS t(id, dept)
""")
departments = SparkEx.sql(session, """
SELECT * FROM VALUES ('eng', 'Engineering'), ('hr', 'People') AS t(dept, name)
""")
employees
|> DataFrame.join(departments, ["dept"], :inner)
|> DataFrame.group_by(["name"])
|> SparkEx.GroupedData.agg([
Column.alias_(count(col("id")), "headcount")
])
|> DataFrame.collect()
# => {:ok, [%{"name" => "Engineering", "headcount" => 2}, %{"name" => "People", "headcount" => 1}]}Window functions
alias SparkEx.{Window, WindowSpec}
df = SparkEx.sql(session, """
SELECT * FROM VALUES
('eng', 'Alice', 100), ('eng', 'Bob', 120), ('hr', 'Carol', 90)
AS t(dept, name, salary)
""")
w =
Window.partition_by(["dept"])
|> WindowSpec.order_by([col("salary") |> Column.desc()])
df
|> DataFrame.with_column("rank", row_number() |> Column.over(w))
|> DataFrame.collect()

Read and write data
alias SparkEx.{Reader, Writer}
# Read
df = Reader.parquet(session, "/data/events.parquet")
df = Reader.csv(session, "/data/users.csv", schema: "name STRING, age INT")
df = Reader.json(session, "/data/logs.json")
df = Reader.table(session, "my_catalog.my_table")
# Write
df |> DataFrame.write() |> Writer.parquet("/output/events.parquet")
df |> DataFrame.write() |> Writer.mode("overwrite") |> Writer.save_as_table("results")
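The Reader/Writer feature bullet also mentions a generic format/load/save path and V2 DataSource writes (see writer_v2.ex in the project layout). A rough sketch of what those calls could look like; the names Reader.format/2, Reader.load/2, DataFrame.write_to/2, and the WriterV2 builder functions below are illustrative assumptions, not a confirmed API:

# Generic reader (hypothetical names, assuming a PySpark-style format/load builder)
df = Reader.format(session, "delta") |> Reader.load("/data/events")

# V2 DataSource write (hypothetical names, modeled on PySpark's writeTo builder)
df
|> DataFrame.write_to("my_catalog.events_v2")
|> SparkEx.WriterV2.using("delta")
|> SparkEx.WriterV2.create_or_replace()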
Create DataFrames from local data

# From a list of maps
{:ok, df} = SparkEx.create_dataframe(session, [
%{"name" => "Alice", "age" => 30},
%{"name" => "Bob", "age" => 25}
])
# From an Explorer.DataFrame
explorer_df = Explorer.DataFrame.new(%{x: [1, 2, 3], y: ["a", "b", "c"]})
{:ok, df} = SparkEx.create_dataframe(session, explorer_df)

Explorer integration
# Materialize as Explorer.DataFrame (bounded by default: 10k rows / 64 MB)
{:ok, explorer_df} = DataFrame.to_explorer(df, max_rows: 1_000)
# Unbounded (use with care)
{:ok, explorer_df} = DataFrame.to_explorer(df, unsafe: true, max_rows: :infinity)

Structured streaming
alias SparkEx.{StreamReader, StreamWriter, StreamingQuery}
# Start a streaming query
{:ok, query} =
StreamReader.rate(session, rows_per_second: 10)
|> DataFrame.write_stream()
|> StreamWriter.format("memory")
|> StreamWriter.output_mode("append")
|> StreamWriter.query_name("my_stream")
|> StreamWriter.option("checkpointLocation", "/tmp/checkpoint")
|> StreamWriter.start()
# Monitor
{:ok, true} = StreamingQuery.is_active?(query)
{:ok, status} = StreamingQuery.status(query)
# Stop
:ok = StreamingQuery.stop(query)

Catalog
alias SparkEx.Catalog
{:ok, dbs} = Catalog.list_databases(session)
{:ok, tables} = Catalog.list_tables(session)
{:ok, cols} = Catalog.list_columns(session, "my_table")
:ok = Catalog.cache_table(session, "frequently_used")
{:ok, true} = Catalog.is_cached?(session, "frequently_used")

Temporary views
df = SparkEx.sql(session, "SELECT 1 AS id, 'hello' AS msg")
:ok = DataFrame.create_or_replace_temp_view(df, "greetings")
SparkEx.sql(session, "SELECT * FROM greetings") |> DataFrame.collect()
# => {:ok, [%{"id" => 1, "msg" => "hello"}]}NA handling and statistics
df = SparkEx.sql(session, """
SELECT * FROM VALUES (1, 10.0), (2, null), (null, 30.0) AS t(id, score)
""")
# Fill nulls
DataFrame.fillna(df, %{"id" => 0, "score" => 0.0})
# Drop rows with nulls
DataFrame.dropna(df)
# Descriptive statistics
DataFrame.describe(df) |> DataFrame.collect()
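The Stat helpers called out in Features (corr, cov, crosstab, approx_quantile) sit alongside the NA helpers shown above. A sketch of how they might be called, assuming they are exposed on DataFrame like fillna/dropna; the exact arities and return shapes are assumptions, not a confirmed API:

# Hypothetical stat calls mirroring the PySpark functions of the same names
DataFrame.corr(df, "id", "score")                  # correlation between two numeric columns
DataFrame.crosstab(df, "id", "score")              # pairwise frequency table
DataFrame.approx_quantile(df, "score", [0.25, 0.5, 0.75], 0.01)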
MERGE INTO

alias SparkEx.{MergeIntoWriter, Column}
source = SparkEx.sql(session, "SELECT 2 AS id, 'Bobby' AS name")
DataFrame.merge_into(source, "target_table")
|> MergeIntoWriter.on(Column.eq(col("source.id"), col("target.id")))
|> MergeIntoWriter.when_matched_update_all()
|> MergeIntoWriter.when_not_matched_insert_all()
|> MergeIntoWriter.merge()

Session management
# Tag operations for selective interruption
df = SparkEx.sql(session, "SELECT * FROM big_table") |> DataFrame.tag("etl-job")
# Interrupt from another process
SparkEx.interrupt_tag(session, "etl-job")
SparkEx.interrupt_all(session)
# Clone a session (shares server state, independent client)
{:ok, session2} = SparkEx.clone_session(session)
# Release server resources
SparkEx.Session.release(session)

Livebook
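A notebook setup cell only needs the optional dependencies from the Installation section plus a connection; for example:

Mix.install([
  {:spark_ex, "~> 0.1.0"},
  {:explorer, "~> 0.10"},   # optional: Arrow decoding and to_explorer/2
  {:kino, "~> 0.14"}        # optional: interactive rendering
])

{:ok, session} = SparkEx.connect(url: "sc://localhost:15002")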
In Livebook, SparkEx.DataFrame structs render automatically with tabs for
Schema, Preview, Explain, and Raw via the Kino.Render protocol.
# Explicit rendering helpers
SparkEx.Livebook.preview(df, num_rows: 50)
SparkEx.Livebook.explain(df, mode: :extended)
SparkEx.Livebook.schema(df)

See notebooks/spark_ex_demo.livemd for a full interactive walkthrough.
Telemetry
SparkEx emits :telemetry events you can
attach to for logging, metrics, or tracing:
| Event | Description |
|---|---|
| [:spark_ex, :rpc, :start] and [:spark_ex, :rpc, :stop] | Every gRPC call |
| [:spark_ex, :retry, :attempt] | Transient-error retry |
| [:spark_ex, :reattach, :attempt] | Mid-stream reattach |
| [:spark_ex, :result, :batch] | Each Arrow batch received |
| [:spark_ex, :result, :progress] | Query progress update |
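To consume these, attach a handler with the standard :telemetry API. A minimal example that just logs every event listed above (the measurement and metadata shapes are not documented here, so the handler inspects whatever arrives):

:telemetry.attach_many(
  "spark-ex-logger",
  [
    [:spark_ex, :rpc, :start],
    [:spark_ex, :rpc, :stop],
    [:spark_ex, :retry, :attempt],
    [:spark_ex, :reattach, :attempt],
    [:spark_ex, :result, :batch],
    [:spark_ex, :result, :progress]
  ],
  fn event, measurements, metadata, _config ->
    IO.inspect({event, measurements, metadata}, label: "spark_ex telemetry")
  end,
  nil
)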
Prerequisites
| Requirement | Notes |
|---|---|
| Elixir >= 1.15 | |
| Java 17 or 21 | For running the Spark Connect server |
| Spark >= 3.5 with spark-connect | For running the Spark Connect server |
| Explorer (optional) | Arrow IPC decoding and to_explorer/2 |
| Kino (optional) | Livebook rendering |
protoc + protoc-gen-elixir are only needed if you regenerate the protobuf
stubs (see Proto regeneration below).
Running tests
Unit tests
mix test
Integration tests
Integration tests require a running Spark Connect server and are excluded by default.
One-command runner (starts server, runs tests, tears down):
./test/run_integration.sh
Manual setup:
- Download Spark:
curl -L -o /tmp/spark.tgz \
'https://dlcdn.apache.org/spark/spark-4.1.1/spark-4.1.1-bin-hadoop3-connect.tgz'
tar -xzf /tmp/spark.tgz -C test/
- Start the server:
export JAVA_HOME="/opt/homebrew/opt/openjdk@21/libexec/openjdk.jdk/Contents/Home"
export SPARK_HOME="test/spark-4.1.1-bin-hadoop3-connect"
bash "$SPARK_HOME/sbin/start-connect-server.sh"
- Run:
mix test --include integration
- Or point at a remote server:
SPARK_REMOTE="sc://my-spark-host:15002" mix test --include integration
Proto regeneration
The vendored .proto files in priv/proto/spark/connect/ and the generated
Elixir modules in lib/spark_ex/proto/ are checked in. You do not need
protoc for normal development.
To update protos from a new Spark release:
# Copy protos from Spark source
git clone --depth 1 --branch v4.1.1 https://github.com/apache/spark.git /tmp/spark
cp /tmp/spark/sql/connect/common/src/main/protobuf/spark/connect/*.proto \
priv/proto/spark/connect/
# Regenerate Elixir stubs
mix escript.install hex protobuf # one-time
mix spark_ex.gen_proto
mix format
Project layout
lib/
spark_ex.ex # Public API entry point
spark_ex/
session.ex # Session GenServer
data_frame.ex # Lazy DataFrame API
data_frame/na.ex # Null-value handling
data_frame/stat.ex # Statistical operations
column.ex # Expression wrapper
functions.ex # 590+ auto-generated Spark SQL functions
grouped_data.ex # GroupedData (group_by + agg + pivot)
window.ex # Window convenience constructors
window_spec.ex # WindowSpec (partition, order, frame)
reader.ex # Batch readers (parquet, csv, json, etc.)
writer.ex # Batch writer (V1)
writer_v2.ex # V2 DataSource writer
stream_reader.ex # Streaming source reader
stream_writer.ex # Streaming sink writer
streaming_query.ex # StreamingQuery controls
streaming_query_manager.ex # Manage active streaming queries
catalog.ex # Catalog API
merge_into_writer.ex # MERGE INTO builder
udf_registration.ex # UDF/UDTF registration
livebook.ex # Livebook/Kino helpers
connect/
channel.ex # sc:// URI parser + gRPC channel
client.ex # Low-level gRPC calls (telemetry + retry)
plan_encoder.ex # DataFrame ops -> protobuf encoding
command_encoder.ex # Commands -> protobuf encoding
result_decoder.ex # Arrow IPC decoding
type_mapper.ex # Spark DataType <-> Explorer dtype
proto/spark/connect/*.pb.ex # Generated protobuf modules
priv/proto/spark/connect/ # Vendored Spark Connect protos (v4.1.1)
notebooks/spark_ex_demo.livemd # Interactive Livebook demo
test/unit/ # Unit tests (~1090 tests)
test/integration/ # Integration tests (~625 tests)

Acknowledgements
SparkEx builds on the Spark Connect protocol introduced in Apache Spark 3.4 and stabilized in Spark 4.x.
License
Copyright (c) 2026 Łukasz Samson
Licensed under the Apache License, Version 2.0. See LICENSE for details.