DuckDB-native dataframe library for Elixir.

Dux gives you a dataframe API that compiles to SQL and executes on DuckDB. Pipelines are lazy — operations accumulate as an AST until you materialize. DuckDB handles all the heavy lifting: columnar execution, parallel scans, predicate pushdown, and vectorized aggregation.

This module contains the core dataframe verbs. For graph analytics, see Dux.Graph. For distributed execution, see Dux.Remote. For embedded datasets, see Dux.Datasets.

Quick start

require Dux

Dux.Datasets.penguins()
|> Dux.filter(species == "Gentoo" and body_mass_g > 5000)
|> Dux.group_by(:island)
|> Dux.summarise(count: count(species), avg_mass: avg(body_mass_g))
|> Dux.to_rows()

How it works

  1. Build — each verb (filter, mutate, group_by, etc.) appends an operation to the %Dux{} struct. No computation happens.

  2. Compile — when you call compute/1, to_rows/1, or to_columns/1, the operation list compiles to a chain of SQL CTEs.

  3. Execute — DuckDB runs the SQL. Results land in a temporary table that's automatically cleaned up when garbage collected.
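The laziness in step 1 is observable on the struct itself: building a pipeline only records operations, and nothing runs until you materialize (a sketch; the exact op tuples are internal):

```elixir
df =
  Dux.from_query("SELECT * FROM range(10) t(x)")
  |> Dux.filter_with("x > 5")

# Nothing has executed yet — the filter is just an entry in df.ops.
# Materializing compiles the op list to SQL and runs it on DuckDB:
df |> Dux.to_columns()
#=> %{"x" => [6, 7, 8, 9]}
```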

Use sql_preview/1 to see the generated SQL at any point:

iex> require Dux
iex> Dux.from_query("SELECT * FROM range(10) t(x)")
...> |> Dux.filter(x > 5)
...> |> Dux.sql_preview()
...> |> String.contains?("WHERE")
true

Expression syntax

Verbs like filter/2, mutate/2, and summarise/2 are macros that capture Elixir expressions. Bare identifiers become column names. Use ^ to interpolate Elixir values safely (as parameter bindings, not string interpolation):

iex> require Dux
iex> threshold = 3
iex> Dux.from_query("SELECT * FROM range(1, 6) t(x)")
...> |> Dux.filter(x > ^threshold)
...> |> Dux.to_columns()
%{"x" => [4, 5]}

Every expression verb has a _with variant that accepts raw DuckDB SQL strings for full access to DuckDB's function library:

iex> Dux.from_query("SELECT * FROM range(1, 6) t(x)")
...> |> Dux.filter_with("x > 3")
...> |> Dux.to_columns()
%{"x" => [4, 5]}

Distribution

Mark a pipeline for distributed execution across BEAM nodes with distribute/2. The same verbs work — Dux partitions, fans out, and merges automatically. See Dux.Remote for details.

workers = Dux.Remote.Worker.list()

Dux.from_parquet("s3://data/**/*.parquet")
|> Dux.distribute(workers)
|> Dux.group_by(:region)
|> Dux.summarise(total: sum(amount))
|> Dux.to_rows()

Embedded datasets

Dux.Datasets ships with CC0 datasets for learning and testing: penguins, gapminder, nycflights13 (flights, airlines, airports, planes).

Summary

Constructors

Read a CSV file into a lazy Dux pipeline.

Create a Dux from a list of maps.

Read a newline-delimited JSON (NDJSON) file into a lazy Dux pipeline.

Read a Parquet file or glob pattern into a lazy Dux pipeline.

Create a Dux from a raw SQL query.

Transforms

Drop the named columns.

Drop rows where any of the given columns are nil.

Filter rows matching a condition.

Filter rows using a raw SQL expression string or compiled {sql, params}.

Add or replace columns using expressions.

Add or replace columns using raw SQL expression strings or compiled tuples.

Rename columns.

Keep only the named columns.

Aggregation

Group by columns for subsequent aggregation.

Aggregate grouped data using expressions.

Aggregate grouped data using raw SQL expression strings or compiled tuples.

Clear any active grouping set by group_by/2.

Joins

Concatenate rows from multiple dataframes (UNION ALL).

Join two dataframes.

Reshape

Unpivot from wide to long format (UNPIVOT).

Pivot from long to wide format (PIVOT).

Sorting

Keep distinct rows, optionally by specific columns.

Take the first n rows, defaulting to 10 if not specified.

Skip offset rows and take length rows.

Sort rows by columns.

IO

Attach an external database to the DuckDB connection.

Create a DuckDB secret for accessing remote services.

Detach a previously attached database.

Drop a previously created secret.

Create a lazy %Dux{} referencing a table in an attached database.

Read an Excel file (.xlsx) into a lazy Dux pipeline.

Insert a Dux pipeline's results into a table. Triggers computation.

List all attached databases.

Write a Dux to a CSV file. Triggers computation.

Write a Dux to an Excel file (.xlsx). Triggers computation.

Write a Dux to a newline-delimited JSON (NDJSON) file. Triggers computation.

Write a Dux to a Parquet file. Triggers computation.

Materialization

Compile the pipeline to SQL and execute against DuckDB.

Return the number of rows. Triggers computation.

Print a formatted preview of the data. Triggers computation.

Return the SQL that would be generated, without executing.

Compute and return results as a map of column_name => [values].

Compute and return results as a list of maps.

Convert a single column to an Nx tensor via zero-copy from Arrow buffers.

Distribution

Collect distributed results back to a local %Dux{}.

Mark a Dux for distributed execution across the given workers.

Return to local execution, removing any distributed workers from the pipeline.

Join

ASOF join — match each left row to the nearest right row satisfying an inequality.

Constructors

from_csv(path, opts \\ [])

Read a CSV file into a lazy Dux pipeline.

All options are passed through to DuckDB's read_csv().

Options

  • :delimiter - field delimiter (default: ",")
  • :header - whether the file has a header row (default: true)
  • :null_padding - pad missing columns with NULL (default: false)
  • :skip - number of rows to skip at the start
  • :columns - list of column names or indices to read
  • :types - map of column name to DuckDB type string
  • :auto_detect - auto-detect types (default: true)

Examples

iex> path = Path.join(Application.app_dir(:dux, "priv/datasets"), "airlines.csv")
iex> Dux.from_csv(path) |> Dux.to_rows() |> length()
16

df = Dux.from_csv("data/sales.csv", delimiter: "\t", skip: 1)

from_list(rows)

Create a Dux from a list of maps.

Each map is a row. Keys become column names. Both atom and string keys are supported.

iex> df = Dux.from_list([%{name: "Alice", age: 30}, %{name: "Bob", age: 25}])
iex> Dux.to_columns(df)
%{"age" => [30, 25], "name" => ["Alice", "Bob"]}

from_ndjson(path, opts \\ [])

Read a newline-delimited JSON (NDJSON) file into a lazy Dux pipeline.

Each line in the file must be a valid JSON object. This is a common format for log files, streaming exports, and data interchange.

Examples

df = Dux.from_ndjson("events.ndjson")

NDJSON files look like this (one JSON object per line):

{"name": "Alice", "age": 30}
{"name": "Bob", "age": 25}

Write an NDJSON file and read it back:

iex> path = Path.join(System.tmp_dir!(), "dux_doctest_1538.ndjson")
iex> Dux.from_list([%{x: 1}, %{x: 2}]) |> Dux.to_ndjson(path)
iex> Dux.from_ndjson(path) |> Dux.to_columns()
%{"x" => [1, 2]}

from_parquet(path, opts \\ [])

Read a Parquet file or glob pattern into a lazy Dux pipeline.

Supports local files, globs, and remote URLs (S3, HTTP) when the appropriate DuckDB extension is loaded (httpfs).

Examples

df = Dux.from_parquet("data/sales.parquet")
df = Dux.from_parquet("data/**/*.parquet")
df = Dux.from_parquet("s3://bucket/data/*.parquet")

Write a Parquet file and read it back:

iex> path = Path.join(System.tmp_dir!(), "dux_doctest_1474.parquet")
iex> Dux.from_list([%{x: 1}, %{x: 2}, %{x: 3}]) |> Dux.to_parquet(path)
iex> Dux.from_parquet(path) |> Dux.to_columns()
%{"x" => [1, 2, 3]}

from_query(sql)

Create a Dux from a raw SQL query.

This is the most flexible constructor — anything DuckDB can query, you can use.

iex> df = Dux.from_query("SELECT 1 AS x, 2 AS y")
iex> Dux.to_rows(df)
[%{"x" => 1, "y" => 2}]

iex> df = Dux.from_query("SELECT * FROM range(3) t(n)")
iex> Dux.to_columns(df)
%{"n" => [0, 1, 2]}

Transforms

discard(dux, columns)

Drop the named columns.

iex> Dux.from_query("SELECT 1 AS a, 2 AS b, 3 AS c")
...> |> Dux.discard([:c])
...> |> Dux.to_rows()
[%{"a" => 1, "b" => 2}]

drop_nil(dux, columns)

Drop rows where any of the given columns are nil.

iex> Dux.from_query("SELECT 1 AS x UNION ALL SELECT NULL UNION ALL SELECT 3")
...> |> Dux.drop_nil([:x])
...> |> Dux.to_columns()
%{"x" => [1, 3]}

filter(dux, expr)

(macro)

Filter rows matching a condition.

This is a macro — bare identifiers become column names, ^ interpolates Elixir values. Requires require Dux.

iex> require Dux
iex> Dux.from_query("SELECT * FROM range(1, 6) t(x)")
...> |> Dux.filter(x > 3)
...> |> Dux.to_columns()
%{"x" => [4, 5]}

iex> require Dux
iex> threshold = 7
iex> Dux.from_query("SELECT * FROM range(1, 11) t(x)")
...> |> Dux.filter(x >= ^threshold)
...> |> Dux.to_columns()
%{"x" => [7, 8, 9, 10]}

For raw SQL strings, use filter_with/2.

filter_with(dux, expr)

Filter rows using a raw SQL expression string or compiled {sql, params}.

iex> Dux.from_query("SELECT * FROM range(1, 6) t(x)")
...> |> Dux.filter_with("x > 3")
...> |> Dux.to_columns()
%{"x" => [4, 5]}

iex> Dux.from_query("SELECT * FROM range(1, 11) t(x)")
...> |> Dux.filter_with("x % 2 = 0")
...> |> Dux.to_columns()
%{"x" => [2, 4, 6, 8, 10]}

mutate(dux, pairs)

(macro)

Add or replace columns using expressions.

This is a macro — bare identifiers in expressions become column names, ^ interpolates Elixir values. Requires require Dux.

iex> require Dux
iex> Dux.from_query("SELECT 1 AS x, 2 AS y")
...> |> Dux.mutate(z: x + y, w: x * 10)
...> |> Dux.to_rows()
[%{"w" => 10, "x" => 1, "y" => 2, "z" => 3}]

iex> require Dux
iex> factor = 5
iex> Dux.from_query("SELECT 10 AS x")
...> |> Dux.mutate(scaled: x * ^factor)
...> |> Dux.to_rows()
[%{"scaled" => 50, "x" => 10}]

For raw SQL strings, use mutate_with/2.

mutate_with(dux, exprs)

Add or replace columns using raw SQL expression strings or compiled tuples.

iex> Dux.from_query("SELECT 1 AS x, 2 AS y")
...> |> Dux.mutate_with(z: "x + y", w: "x * 10")
...> |> Dux.to_rows()
[%{"w" => 10, "x" => 1, "y" => 2, "z" => 3}]

rename(dux, mapping)

Rename columns.

Accepts a keyword list of old_name: :new_name or a map.

iex> Dux.from_query("SELECT 1 AS x, 2 AS y")
...> |> Dux.rename(x: :a, y: :b)
...> |> Dux.to_rows()
[%{"a" => 1, "b" => 2}]

select(dux, columns)

Keep only the named columns.

iex> Dux.from_query("SELECT 1 AS a, 2 AS b, 3 AS c")
...> |> Dux.select([:a, :b])
...> |> Dux.to_rows()
[%{"a" => 1, "b" => 2}]

Aggregation

group_by(dux, columns)

Group by columns for subsequent aggregation.

iex> require Dux
iex> Dux.from_list([%{g: "a", v: 1}, %{g: "a", v: 2}, %{g: "b", v: 3}])
...> |> Dux.group_by(:g)
...> |> Dux.summarise(total: sum(v))
...> |> Dux.sort_by(:g)
...> |> Dux.to_rows()
[%{"g" => "a", "total" => 3}, %{"g" => "b", "total" => 3}]

summarise(dux, pairs)

(macro)

Aggregate grouped data using expressions.

This is a macro — function calls like sum(col), count(col), avg(col) compile to DuckDB SQL aggregations. Requires require Dux.

iex> require Dux
iex> Dux.from_list([
...>   %{region: "US", sales: 100},
...>   %{region: "US", sales: 200},
...>   %{region: "EU", sales: 150}
...> ])
...> |> Dux.group_by(:region)
...> |> Dux.summarise(total: sum(sales), n: count(sales))
...> |> Dux.sort_by(:region)
...> |> Dux.to_rows()
[%{"n" => 1, "region" => "EU", "total" => 150}, %{"n" => 2, "region" => "US", "total" => 300}]

For raw SQL strings, use summarise_with/2.

summarise_with(dux, aggs)

Aggregate grouped data using raw SQL expression strings or compiled tuples.

iex> Dux.from_list([%{g: "a", v: 1}, %{g: "a", v: 2}, %{g: "b", v: 3}])
...> |> Dux.group_by(:g)
...> |> Dux.summarise_with(total: "SUM(v)")
...> |> Dux.sort_by(:g)
...> |> Dux.to_rows()
[%{"g" => "a", "total" => 3}, %{"g" => "b", "total" => 3}]

ungroup(dux)

Clear any active grouping set by group_by/2.

This removes all group columns so subsequent operations apply to the full dataframe rather than per-group. The ungroup is tracked as an operation in the pipeline and takes effect when compiled to SQL.

Examples

iex> df = Dux.from_list([%{g: "a", x: 1}]) |> Dux.group_by(:g) |> Dux.ungroup()
iex> {:ungroup} in df.ops
true

Joins

concat_rows(list)

Concatenate rows from multiple dataframes (UNION ALL).

iex> a = Dux.from_list([%{x: 1}])
iex> b = Dux.from_list([%{x: 2}])
iex> c = Dux.from_list([%{x: 3}])
iex> Dux.concat_rows([a, b, c])
...> |> Dux.to_columns()
%{"x" => [1, 2, 3]}

join(left, right, opts \\ [])

Join two dataframes.

Options:

  • :on — column name(s) to join on (required for most join types)

  • :how — join type: :inner (default), :left, :right, :cross, :anti, :semi

  • :suffix — suffix for duplicate column names (default: "_right")

iex> left = Dux.from_list([%{id: 1, name: "Alice"}, %{id: 2, name: "Bob"}])
iex> right = Dux.from_list([%{id: 1, score: 95}, %{id: 2, score: 87}])
iex> left
...> |> Dux.join(right, on: :id)
...> |> Dux.sort_by(:id)
...> |> Dux.to_rows()
[%{"id" => 1, "name" => "Alice", "score" => 95}, %{"id" => 2, "name" => "Bob", "score" => 87}]
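A left join keeps unmatched left rows, filling the right-side columns with nil (a sketch using the :how option above):

```elixir
left = Dux.from_list([%{id: 1, name: "Alice"}, %{id: 3, name: "Carol"}])
right = Dux.from_list([%{id: 1, score: 95}])

left
|> Dux.join(right, on: :id, how: :left)
|> Dux.sort_by(:id)
|> Dux.to_rows()
# Carol's row survives with "score" => nil
```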

Reshape

pivot_longer(dux, columns, opts \\ [])

Unpivot from wide to long format (UNPIVOT).

Takes multiple columns and stacks them into two columns: one for the original column name and one for the value.

Examples

iex> Dux.from_list([
...>   %{region: "US", q1: 100, q2: 200},
...>   %{region: "EU", q1: 150, q2: 250}
...> ])
...> |> Dux.pivot_longer([:q1, :q2], names_to: "quarter", values_to: "sales")
...> |> Dux.sort_by([:region, :quarter])
...> |> Dux.to_rows()
[%{"quarter" => "q1", "region" => "EU", "sales" => 150}, %{"quarter" => "q2", "region" => "EU", "sales" => 250}, %{"quarter" => "q1", "region" => "US", "sales" => 100}, %{"quarter" => "q2", "region" => "US", "sales" => 200}]

pivot_wider(dux, names_from, values_from, opts \\ [])

Pivot from long to wide format (PIVOT).

Takes values from the values_from column and spreads them into new columns named by the names_from column, aggregated with the :agg function.

Options

  • :agg — aggregation function (default: "SUM")

Examples

iex> Dux.from_list([
...>   %{region: "US", product: "Widget", sales: 100},
...>   %{region: "US", product: "Gadget", sales: 200},
...>   %{region: "EU", product: "Widget", sales: 150}
...> ])
...> |> Dux.pivot_wider(:product, :sales)
...> |> Dux.sort_by(:region)
...> |> Dux.to_rows()
[%{"Gadget" => nil, "Widget" => 150, "region" => "EU"}, %{"Gadget" => 200, "Widget" => 100, "region" => "US"}]
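The default aggregation is SUM; pass :agg to change it, e.g. averaging when a (region, product) pair has multiple rows (a sketch):

```elixir
Dux.from_list([
  %{region: "US", product: "Widget", sales: 100},
  %{region: "US", product: "Widget", sales: 300}
])
|> Dux.pivot_wider(:product, :sales, agg: "AVG")
|> Dux.to_rows()
#=> [%{"Widget" => 200.0, "region" => "US"}]
```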

Sorting

distinct(dux, columns \\ nil)

Keep distinct rows, optionally by specific columns.

Row ordering is not guaranteed after distinct/1 — use sort_by/2 if you need deterministic output order. When called with specific columns, which row survives for each distinct group is likewise non-deterministic.

iex> result = Dux.from_list([%{x: 1, y: "a"}, %{x: 1, y: "b"}, %{x: 2, y: "c"}])
...> |> Dux.distinct([:x])
...> |> Dux.sort_by(:x)
...> |> Dux.to_columns()
iex> result["x"]
[1, 2]

iex> Dux.from_list([%{x: 1}, %{x: 1}, %{x: 2}])
...> |> Dux.distinct()
...> |> Dux.sort_by(:x)
...> |> Dux.to_columns()
%{"x" => [1, 2]}

head(dux, n \\ 10)

Take the first n rows, defaulting to 10 if not specified.

In IEx, the result is automatically displayed via the Inspect protocol. Use peek/2 for an explicit table preview.

Examples

iex> Dux.from_query("SELECT * FROM range(100) t(x)")
...> |> Dux.head(3)
...> |> Dux.to_columns()
%{"x" => [0, 1, 2]}

iex> Dux.from_query("SELECT * FROM range(100) t(x)")
...> |> Dux.head()
...> |> Dux.to_columns()
...> |> Map.get("x")
...> |> length()
10

slice(dux, offset, length)

Skip offset rows and take length rows.

iex> Dux.from_query("SELECT * FROM range(10) t(x)")
...> |> Dux.slice(3, 4)
...> |> Dux.to_columns()
%{"x" => [3, 4, 5, 6]}

sort_by(dux, columns)

Sort rows by columns.

Accepts a column name (ascending) or keyword list with :asc/:desc.

iex> Dux.from_list([%{x: 3}, %{x: 1}, %{x: 2}])
...> |> Dux.sort_by(:x)
...> |> Dux.to_columns()
%{"x" => [1, 2, 3]}

iex> Dux.from_list([%{x: 3}, %{x: 1}, %{x: 2}])
...> |> Dux.sort_by(desc: :x)
...> |> Dux.to_columns()
%{"x" => [3, 2, 1]}
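Multiple sort keys with mixed directions can be combined in one keyword list, applied left to right (a sketch, assuming the keyword form accepts several keys):

```elixir
Dux.from_list([
  %{region: "US", sales: 100},
  %{region: "EU", sales: 250},
  %{region: "US", sales: 300}
])
|> Dux.sort_by(asc: :region, desc: :sales)
|> Dux.to_columns()
# Rows ordered EU first, then US rows by descending sales
```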

IO

attach(name, connection_string, opts \\ [])

Attach an external database to the DuckDB connection.

DuckDB can query Postgres, MySQL, SQLite, and lakehouse formats (Iceberg, Delta, DuckLake) as if they were local tables. Filter pushdown is automatic — DuckDB sends filtered queries to the remote database, transferring only matching rows.

Options

  • :type — database type (required): :postgres, :mysql, :sqlite, :iceberg, :delta, :ducklake, :duckdb
  • :read_only — attach as read-only (default: true)

Examples

Dux.attach(:warehouse, "postgresql://user:pass@host/db", type: :postgres)
Dux.attach(:lake, "s3://bucket/iceberg-table/", type: :iceberg)
Dux.attach(:local, "other.duckdb", type: :duckdb, read_only: false)

create_secret(name, opts)

Create a DuckDB secret for accessing remote services.

Wraps DuckDB's Secrets Manager. Secrets are scoped to specific URL prefixes so different credentials can be used for different buckets or databases.

Options

  • :type — secret type (required): :s3, :gcs, :azure, :postgres, :mysql
  • :key_id — access key ID (S3/GCS)
  • :secret — secret access key (S3/GCS)
  • :region — AWS region (S3)
  • :provider — credential provider: :credential_chain (use IAM/env)
  • :scope — URL prefix scope (e.g., "s3://my-bucket/")
  • :host, :user, :password — database connection options
  • :password_env — read password from environment variable at runtime

Examples

Dux.create_secret(:my_s3, type: :s3,
  key_id: "AKIA...",
  secret: "...",
  region: "us-east-1"
)

# Use IAM role / environment credentials
Dux.create_secret(:my_s3, type: :s3, provider: :credential_chain)

# Scoped to a specific bucket
Dux.create_secret(:prod, type: :s3,
  scope: "s3://prod-bucket/",
  provider: :credential_chain
)
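For database secrets, :password_env defers the credential lookup to runtime rather than embedding the password (a sketch using the options listed above; host, user, and the env var name are illustrative):

```elixir
# Password is read from the WAREHOUSE_PASSWORD environment
# variable when the secret is used, not when it is created.
Dux.create_secret(:warehouse_creds,
  type: :postgres,
  host: "db.internal",
  user: "readonly",
  password_env: "WAREHOUSE_PASSWORD"
)
```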

detach(name)

Detach a previously attached database.

Examples

Dux.detach(:warehouse)

drop_secret(name)

Drop a previously created secret.

Examples

Dux.drop_secret(:my_s3)

from_attached(db_name, table_name, opts \\ [])

Create a lazy %Dux{} referencing a table in an attached database.

The table name can include the schema (e.g., "public.customers").

Options

  • :version — snapshot/version number for time-travel (Iceberg, Delta, DuckLake)
  • :as_of — timestamp for time-travel
  • :partition_by — column to hash-partition on for distributed reads. When set and the pipeline is distributed, each worker ATTACHes the database independently and reads a disjoint hash partition. Without this, attached sources are read on the coordinator only.

Examples

customers = Dux.from_attached(:warehouse, "public.customers")
events = Dux.from_attached(:lake, "default.click_events")

# Time travel
Dux.from_attached(:lake, "events", version: 5)

# Distributed reads from Postgres (each worker reads 1/N)
Dux.from_attached(:pg, "public.orders", partition_by: :id)
|> Dux.distribute(workers)
|> Dux.to_rows()

from_excel(path, opts \\ [])

Read an Excel file (.xlsx) into a lazy Dux pipeline.

Uses DuckDB's read_xlsx function (available in DuckDB 1.5+).

Options

  • :sheet — sheet name (default: first sheet)
  • :range — cell range, e.g. "A1:F1000" (default: auto-detect)
  • :header — whether the first row is a header (default: true)
  • :all_varchar — read all columns as VARCHAR (default: false). Useful when type inference fails on mixed-type columns.
  • :ignore_errors — replace type-cast failures with NULL instead of raising (default: true). DuckDB infers column types from the first data row only — if a column has NULL or a number in row 1 but strings later, this prevents hard failures.
  • :empty_as_varchar — infer empty cells as VARCHAR instead of DOUBLE (default: true)
  • :stop_at_empty — stop reading at the first empty row (default: true)

Examples

df = Dux.from_excel("sales.xlsx")
df = Dux.from_excel("data.xlsx", sheet: "Q1 2024", range: "A1:F100")
df = Dux.from_excel("messy.xlsx", all_varchar: true)

insert_into(dux, table, opts \\ [])

Insert a Dux pipeline's results into a table. Triggers computation.

The target can be any table DuckDB can write to — a local table, or a table in an attached database (Postgres, DuckLake, etc.). The pipeline is compiled to SQL and executed as INSERT INTO target SELECT ....

When workers are set (Dux.distribute/2), each worker ATTACHes the target database independently and inserts its partition in parallel. Per-worker transactions — not atomic across workers.

Options

  • :create — create the target table if it doesn't exist (default: false). Uses CREATE TABLE ... AS SELECT ... instead of INSERT INTO.

Examples

# Insert into an attached Postgres table
Dux.attach(:pg, "host=... dbname=analytics", type: :postgres, read_only: false)

Dux.from_parquet("s3://bucket/raw/*.parquet")
|> Dux.filter(status == "active")
|> Dux.insert_into("pg.public.users")

# Distributed insert — each worker writes its partition to Postgres
Dux.from_parquet("s3://input/**/*.parquet")
|> Dux.distribute(workers)
|> Dux.insert_into("pg.public.events", create: true)

# Insert into a local DuckDB table
Dux.from_query("SELECT 1 AS x")
|> Dux.insert_into("my_table", create: true)

list_attached()

List all attached databases.

Returns a list of maps with :name, :type, and :path keys.

Examples

Dux.list_attached()
#=> [%{name: "memory", type: "duckdb", path: "", read_only: false}, ...]

to_csv(dux, path, opts \\ [])

Write a Dux to a CSV file. Triggers computation.

Returns :ok on success. The file is written atomically by DuckDB's COPY ... TO statement.

Options

  • :delimiter - field delimiter (default: ",")
  • :header - write header row (default: true)

Examples

Dux.from_query("SELECT * FROM range(10) t(x)")
|> Dux.to_csv("/tmp/output.csv")

# With custom delimiter
Dux.from_list([%{name: "Alice", age: 30}])
|> Dux.to_csv("/tmp/output.tsv", delimiter: "\t")

to_excel(dux, path)

Write a Dux to an Excel file (.xlsx). Triggers computation.

Requires the DuckDB excel extension (auto-installed, needs explicit LOAD).

Examples

Dux.from_list([%{name: "Alice", age: 30}, %{name: "Bob", age: 25}])
|> Dux.to_excel("/tmp/output.xlsx")

to_ndjson(dux, path, opts \\ [])

Write a Dux to a newline-delimited JSON (NDJSON) file. Triggers computation.

Each row becomes a single JSON object on its own line. Returns :ok on success.

Examples

Dux.from_query("SELECT * FROM range(10) t(x)")
|> Dux.to_ndjson("/tmp/output.ndjson")

# The resulting file contains one JSON object per line:
# {"x":0}
# {"x":1}
# ...

to_parquet(dux, path, opts \\ [])

Write a Dux to a Parquet file. Triggers computation.

Options

  • :compression - compression codec: :snappy (default), :zstd, :gzip, :none
  • :row_group_size - rows per row group
  • :partition_by - column(s) for Hive-style partitioned output. Pass an atom, string, or list. Writes to a directory tree: path/col=value/data_0.parquet.

Examples

Dux.from_query("SELECT * FROM range(10) t(x)")
|> Dux.to_parquet("/tmp/output.parquet")

Dux.from_query("SELECT * FROM range(10) t(x)")
|> Dux.to_parquet("/tmp/output.parquet", compression: :zstd)

# Hive-partitioned output
Dux.from_parquet("events.parquet")
|> Dux.to_parquet("/tmp/events/", partition_by: [:year, :month])

Materialization

compute(dux, opts \\ [])

Compile the pipeline to SQL and execute against DuckDB.

Returns a new %Dux{} with source: {:table, ref} and empty ops. The ref is a NIF ResourceArc — when it's GC'd, the temp table is dropped.

Options (distributed only)

  • :broadcast_threshold — max IPC size in bytes for broadcast joins (default: 256MB). Right sides larger than this trigger shuffle joins.

Examples

iex> df = Dux.from_query("SELECT 1 AS x") |> Dux.compute()
iex> df.ops
[]
iex> match?({:table, _}, df.source)
true

n_rows(dux)

Return the number of rows. Triggers computation.

iex> Dux.from_query("SELECT * FROM range(42) t(x)")
...> |> Dux.n_rows()
42

peek(dux, opts \\ [])

Print a formatted preview of the data. Triggers computation.

Shows the first limit rows (default 5) as a formatted table with a shape summary.

Options

  • :limit - number of rows to show (default: 5)

Returns :ok.
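Typical interactive use (illustrative; the exact table rendering may differ):

```elixir
Dux.from_query("SELECT * FROM range(100) t(x)")
|> Dux.peek(limit: 3)
# Prints the first 3 rows as a formatted table plus a
# shape summary, then returns :ok
```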

sql_preview(dux, opts \\ [])

Return the SQL that would be generated, without executing.

Options

  • :pretty - format with indentation (default: false)

Examples

iex> sql = Dux.from_query("SELECT * FROM t")
...> |> Dux.filter_with("x > 10")
...> |> Dux.head(5)
...> |> Dux.sql_preview()
iex> sql =~ "WHERE"
true
iex> sql =~ "LIMIT"
true

to_columns(dux, opts \\ [])

Compute and return results as a map of column_name => [values].

Automatically collects from distributed if needed.

Options

  • :atom_keys - use atom keys instead of string keys (default: false)

Examples

iex> Dux.from_query("SELECT * FROM range(3) t(x)")
...> |> Dux.to_columns()
%{"x" => [0, 1, 2]}

iex> Dux.from_query("SELECT * FROM range(3) t(x)")
...> |> Dux.to_columns(atom_keys: true)
%{x: [0, 1, 2]}

to_rows(dux, opts \\ [])

Compute and return results as a list of maps.

Automatically collects from distributed if needed.

Options

  • :atom_keys - use atom keys instead of string keys (default: false)

Examples

iex> Dux.from_query("SELECT 1 AS x, 'hello' AS y")
...> |> Dux.to_rows()
[%{"x" => 1, "y" => "hello"}]

iex> Dux.from_query("SELECT 1 AS x, 'hello' AS y")
...> |> Dux.to_rows(atom_keys: true)
[%{x: 1, y: "hello"}]

to_tensor(dux, column)

Convert a single column to an Nx tensor via zero-copy from Arrow buffers.

Triggers computation. Supports integer, float, date, time, timestamp, duration, and dictionary-encoded columns. Columns with nulls raise ArgumentError — filter them first. Boolean and decimal columns are not supported (booleans are bit-packed, decimals need conversion).

Requires Nx to be available as a dependency.

Examples

df = Dux.from_list([%{x: 1.0, y: 2.0}, %{x: 3.0, y: 4.0}])
Dux.to_tensor(df, :x)
#=> #Nx.Tensor<f64[2] [1.0, 3.0]>
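Because columns containing nulls raise ArgumentError, drop them first (a sketch combining drop_nil/2 with to_tensor/2):

```elixir
df = Dux.from_list([%{x: 1.0}, %{x: nil}, %{x: 3.0}])

df
|> Dux.drop_nil([:x])
|> Dux.to_tensor(:x)
#=> #Nx.Tensor<f64[2] [1.0, 3.0]>
```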

Distribution

collect(dux)

Collect distributed results back to a local %Dux{}.

For distributed pipelines, this brings results back to the calling node. For local pipelines, this is equivalent to compute/1.

Examples

workers = Dux.Remote.Worker.list()

Dux.from_parquet("data/**/*.parquet")
|> Dux.distribute(workers)
|> Dux.filter(amount > 100)
|> Dux.collect()
# => local %Dux{} with no workers

distribute(dux, workers)

Mark a Dux for distributed execution across the given workers.

All subsequent operations will automatically use the Coordinator to fan out work across the workers. Use collect/1 to bring distributed results back to a local %Dux{}, or to_rows/1 / to_columns/1 to materialize directly.

Examples

workers = Dux.Remote.Worker.list()

Dux.from_parquet("data/**/*.parquet")
|> Dux.distribute(workers)
|> Dux.filter(amount > 100)
|> Dux.group_by(:region)
|> Dux.summarise(total: sum(amount))
|> Dux.to_rows()

local(dux)

Return to local execution, removing any distributed workers from the pipeline.

This is the inverse of distribute/2. After calling local/1, all subsequent operations execute on the local node's DuckDB instance.

Examples

iex> df = Dux.from_list([%{x: 1}]) |> Dux.distribute([:fake])
iex> df.workers
[:fake]
iex> Dux.local(df).workers
nil

Types

source()

@type source() ::
  {:parquet, String.t()}
  | {:csv, String.t(), keyword()}
  | {:ndjson, String.t(), keyword()}
  | {:table, reference()}
  | {:sql, String.t()}

t()

@type t() :: %Dux{
  dtypes: %{required(String.t()) => atom() | tuple()},
  groups: [String.t()],
  names: [String.t()],
  ops: [tuple()],
  remote: term(),
  source: source(),
  workers: term()
}

Join

asof_join(left, right, opts)

ASOF join — match each left row to the nearest right row satisfying an inequality.

Useful for time series alignment: match each trade to the most recent quote, each event to the closest preceding snapshot, etc.

Options

  • :on — equality column(s) to match on (atom, string, or list)
  • :by — {column, operator} specifying the inequality condition. Operators: :>=, :>, :<=, :<
  • :how — :inner (default) or :left (preserve unmatched left rows)
  • :suffix — suffix for duplicate column names (default: "_right")

Examples

trades = Dux.from_list([
  %{symbol: "AAPL", timestamp: 10, price: 150.0},
  %{symbol: "AAPL", timestamp: 20, price: 152.0}
])
quotes = Dux.from_list([
  %{symbol: "AAPL", timestamp: 5, bid: 149.0},
  %{symbol: "AAPL", timestamp: 15, bid: 151.0}
])

# Match each trade to the most recent quote
Dux.asof_join(trades, quotes, on: :symbol, by: {:timestamp, :>=})
|> Dux.to_rows()