Creating Data
From Elixir
Dux.from_list([%{x: 1, y: "a"}, %{x: 2, y: "b"}])
From files
Dux.from_csv("data.csv")
Dux.from_csv("data.csv", delimiter: "\t", nullstr: "NA")
Dux.from_parquet("data/**/*.parquet")
Dux.from_ndjson("events.ndjson")
Dux.from_excel("data.xlsx")
Dux.from_excel("data.xlsx", sheet: "Sales", all_varchar: true)
From databases
Dux.attach(:pg, "host=... dbname=db", type: :postgres)
Dux.from_attached(:pg, "public.orders")
Dux.from_attached(:pg, "public.orders", partition_by: :id)
Dux.detach(:pg)
Dux.list_attached()
Secrets
Dux.create_secret(:s3, type: :s3, key_id: "...", secret: "...", region: "us-east-1")
Dux.drop_secret(:s3)
From SQL
Dux.from_query("SELECT * FROM range(100) t(x)")
Filtering
Expression syntax
Macro (require Dux)
Dux.filter(df, x > 10 and status == "active")
# Interpolate Elixir values into filter expressions with the ^ pin operator
min_val = 50
Dux.filter(df, price > ^min_val)
Raw SQL
Dux.filter_with(df, "x > 10 AND status = 'active'")
# Raw SQL strings may use any DuckDB SQL function directly
Dux.filter_with(df, "x BETWEEN 10 AND 20")
Mutate (add/replace columns)
Macro
Dux.mutate(df,
revenue: price * qty,
tax: price * 0.08
)
Raw SQL
Dux.mutate_with(df,
revenue: "price * qty",
upper_name: "UPPER(name)",
rank: "ROW_NUMBER() OVER (ORDER BY score DESC)"
)
Column management
Dux.select(df, [:name, :age]) # keep columns
Dux.discard(df, [:temp, :debug]) # drop columns
Dux.rename(df, old_name: :new_name) # rename columns
Dux.drop_nil(df, [:age, :email]) # remove rows with nils
Sorting & Limiting
Dux.sort_by(df, :name) # ascending
Dux.sort_by(df, desc: :score) # descending
Dux.sort_by(df, asc: :dept, desc: :salary) # multi-column
Dux.head(df) # first 10 rows (default)
Dux.head(df, 5) # first 5 rows
Dux.slice(df, 5, 10) # offset 5, take 10
Dux.distinct(df) # deduplicate all columns
Aggregation
Group + Summarise
Macro
df
|> Dux.group_by(:region)
|> Dux.summarise(
total: sum(amount),
average: avg(price),
n: count(id),
biggest: max(amount),
smallest: min(amount)
)
Raw SQL
df
|> Dux.group_by([:region, :year])
|> Dux.summarise_with(
total: "SUM(amount)",
median: "MEDIAN(price)",
p95: "PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY amount)",
unique: "COUNT(DISTINCT customer_id)"
)
Joins
Join types
Dux.join(left, right, on: :id) # inner (default)
Dux.join(left, right, on: :id, how: :left) # left
Dux.join(left, right, on: :id, how: :right) # right
Dux.join(left, right, on: :id, how: :anti) # anti (no match)
Dux.join(left, right, on: :id, how: :semi) # semi (exists)
Dux.join(left, right, on: :id, how: :cross) # cross product
Different column names
Dux.join(flights, airports, on: [{:dest, :faa}])
Dux.join(orders, users, on: [{:customer_id, :id}])
ASOF join (time series)
Dux.asof_join(trades, quotes, on: :symbol, by: {:timestamp, :>=})
Dux.asof_join(trades, quotes, on: :symbol, by: {:timestamp, :>=}, how: :left)
Concat rows (UNION ALL)
Dux.concat_rows([df1, df2, df3])
Reshape
Pivot
Long → Wide
Dux.pivot_wider(df, :product, :sales)
Dux.pivot_wider(df, :product, :sales, agg: "SUM")
Wide → Long
Dux.pivot_longer(df, [:q1, :q2, :q3, :q4],
names_to: "quarter",
values_to: "revenue"
)
IO
Read & Write
Reading
Dux.from_csv("file.csv")
Dux.from_csv("file.csv", delimiter: "\t", nullstr: "NA")
Dux.from_parquet("data/*.parquet")
Dux.from_parquet("s3://bucket/data/*.parquet")
Dux.from_ndjson("events.ndjson")
Dux.from_query("SELECT * FROM 'file.csv'")
Writing
Dux.to_csv(df, "out.csv")
Dux.to_parquet(df, "out.parquet")
Dux.to_parquet(df, "out.parquet", compression: :zstd)
Dux.to_parquet(df, "out/", partition_by: [:year, :month])
Dux.to_ndjson(df, "out.ndjson")
Dux.to_excel(df, "out.xlsx")
Database writes
Dux.insert_into(df, "my_table", create: true)
Dux.insert_into(df, "pg.public.events")
Materialization
Dux.compute(df) # → %Dux{} (execute pipeline)
Dux.to_rows(df) # → [%{"col" => val}, ...]
Dux.to_rows(df, atom_keys: true) # → [%{col: val}, ...]
Dux.to_columns(df) # → %{"col" => [vals]}
Dux.n_rows(df) # → integer
Dux.peek(df) # print formatted table
Dux.sql_preview(df) # → SQL string
Dux.sql_preview(df, pretty: true) # → formatted SQL
Distributed
Reads
workers = Dux.Remote.Worker.list()
# Parquet files are split across workers, balanced by file size
Dux.from_parquet("s3://data/**/*.parquet")
|> Dux.distribute(workers)
|> Dux.filter(amount > 100)
|> Dux.group_by(:region)
|> Dux.summarise(total: sum(amount))
|> Dux.compute()
# Hash-partitioned Postgres reads
Dux.from_attached(:pg, "public.orders", partition_by: :id)
|> Dux.distribute(workers)
|> Dux.compute()
Writes
# Parallel file writes
df |> Dux.distribute(workers) |> Dux.to_parquet("s3://out/")
# Hive-partitioned output
df |> Dux.distribute(workers) |> Dux.to_parquet("s3://out/", partition_by: :year)
# Parallel database inserts
df |> Dux.distribute(workers) |> Dux.insert_into("pg.public.events", create: true)
# Collect distributed partitions back into a single local dataframe
df |> Dux.distribute(workers) |> Dux.collect()
FLAME: elastic cloud compute
Dux.Flame.start_pool(backend: {FLAME.FlyBackend, ...}, max: 10)
workers = Dux.Flame.spin_up(5)
Graph Analytics
graph = Dux.Graph.new(vertices: v, edges: e)
# Algorithms (return %Dux{} — pipe into any verb)
Dux.Graph.pagerank(graph) # influence ranking
Dux.Graph.shortest_paths(graph, start) # BFS distances
Dux.Graph.connected_components(graph) # community detection
Dux.Graph.triangle_count(graph) # clustering density
Dux.Graph.out_degree(graph) # connection count
Dux.Graph.in_degree(graph) # incoming connections
# Distribute across workers
graph |> Dux.Graph.distribute(workers) |> Dux.Graph.pagerank()
Nx Interop
tensor = Dux.to_tensor(df, :price) # column → Nx.Tensor
# %Dux{} implements Nx.LazyContainer, so dataframes can be passed to defn directly