Creating Data

From Elixir

Dux.from_list([%{x: 1, y: "a"}, %{x: 2, y: "b"}])

From files

Dux.from_csv("data.csv")
Dux.from_csv("data.csv", delimiter: "\t", nullstr: "NA")
Dux.from_parquet("data/**/*.parquet")
Dux.from_ndjson("events.ndjson")
Dux.from_excel("data.xlsx")
Dux.from_excel("data.xlsx", sheet: "Sales", all_varchar: true)

From databases

Dux.attach(:pg, "host=... dbname=db", type: :postgres)
Dux.from_attached(:pg, "public.orders")
Dux.from_attached(:pg, "public.orders", partition_by: :id)
Dux.detach(:pg)
Dux.list_attached()

Secrets

Dux.create_secret(:s3, type: :s3, key_id: "...", secret: "...", region: "us-east-1")
Dux.drop_secret(:s3)

From SQL

Dux.from_query("SELECT * FROM range(100) t(x)")

Filtering

Expression syntax

Macro (requires `require Dux` first)

Dux.filter(df, x > 10 and status == "active")

# Interpolate Elixir values with ^
min_val = 50
Dux.filter(df, price > ^min_val)

Raw SQL

Dux.filter_with(df, "x > 10 AND status = 'active'")

# DuckDB functions work here
Dux.filter_with(df, "x BETWEEN 10 AND 20")

Transforms

Mutate (add/replace columns)

Macro

Dux.mutate(df,
  revenue: price * qty,
  tax: price * 0.08
)

Raw SQL

Dux.mutate_with(df,
  revenue: "price * qty",
  upper_name: "UPPER(name)",
  rank: "ROW_NUMBER() OVER (ORDER BY score DESC)"
)

Column management

Dux.select(df, [:name, :age])          # keep columns
Dux.discard(df, [:temp, :debug])       # drop columns
Dux.rename(df, old_name: :new_name)    # rename columns
Dux.drop_nil(df, [:age, :email])       # remove rows with nils

Sorting & Limiting

Dux.sort_by(df, :name)                           # ascending
Dux.sort_by(df, desc: :score)                    # descending
Dux.sort_by(df, asc: :dept, desc: :salary)       # multi-column
Dux.head(df)                                     # first 10 rows (default)
Dux.head(df, 5)                                  # first 5 rows
Dux.slice(df, 5, 10)                             # offset 5, take 10
Dux.distinct(df)                                 # deduplicate all columns

Aggregation

Group + Summarise

Macro

df
|> Dux.group_by(:region)
|> Dux.summarise(
  total: sum(amount),
  average: avg(price),
  n: count(id),
  biggest: max(amount),
  smallest: min(amount)
)

Raw SQL

df
|> Dux.group_by([:region, :year])
|> Dux.summarise_with(
  total: "SUM(amount)",
  median: "MEDIAN(price)",
  p95: "PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY amount)",
  unique: "COUNT(DISTINCT customer_id)"
)

Joins

Join types

Dux.join(left, right, on: :id)                   # inner (default)
Dux.join(left, right, on: :id, how: :left)       # left
Dux.join(left, right, on: :id, how: :right)      # right
Dux.join(left, right, on: :id, how: :anti)       # anti (no match)
Dux.join(left, right, on: :id, how: :semi)       # semi (exists)
Dux.join(left, right, on: :id, how: :cross)      # cross product

Different column names

Dux.join(flights, airports, on: [{:dest, :faa}])
Dux.join(orders, users, on: [{:customer_id, :id}])

ASOF join (time series)

Dux.asof_join(trades, quotes, on: :symbol, by: {:timestamp, :>=})
Dux.asof_join(trades, quotes, on: :symbol, by: {:timestamp, :>=}, how: :left)

Concat rows (UNION ALL)

Dux.concat_rows([df1, df2, df3])

Reshape

Pivot

Long → Wide

Dux.pivot_wider(df, :product, :sales)
Dux.pivot_wider(df, :product, :sales, agg: "SUM")

Wide → Long

Dux.pivot_longer(df, [:q1, :q2, :q3, :q4],
  names_to: "quarter",
  values_to: "revenue"
)

IO

Read & Write

Reading

Dux.from_csv("file.csv")
Dux.from_csv("file.csv", delimiter: "\t", nullstr: "NA")
Dux.from_parquet("data/*.parquet")
Dux.from_parquet("s3://bucket/data/*.parquet")
Dux.from_ndjson("events.ndjson")
Dux.from_query("SELECT * FROM 'file.csv'")

Writing

Dux.to_csv(df, "out.csv")
Dux.to_parquet(df, "out.parquet")
Dux.to_parquet(df, "out.parquet", compression: :zstd)
Dux.to_parquet(df, "out/", partition_by: [:year, :month])
Dux.to_ndjson(df, "out.ndjson")
Dux.to_excel(df, "out.xlsx")

Database writes

Dux.insert_into(df, "my_table", create: true)
Dux.insert_into(df, "pg.public.events")

Materialization

Dux.compute(df)                    # → %Dux{} (execute pipeline)
Dux.to_rows(df)                    # → [%{"col" => val}, ...]
Dux.to_rows(df, atom_keys: true)   # → [%{col: val}, ...]
Dux.to_columns(df)                 # → %{"col" => [vals]}
Dux.n_rows(df)                     # → integer
Dux.peek(df)                       # print formatted table
Dux.sql_preview(df)                # → SQL string
Dux.sql_preview(df, pretty: true)  # → formatted SQL

Distributed

Reads

workers = Dux.Remote.Worker.list()

# Size-balanced Parquet distribution
Dux.from_parquet("s3://data/**/*.parquet")
|> Dux.distribute(workers)
|> Dux.filter(amount > 100)
|> Dux.group_by(:region)
|> Dux.summarise(total: sum(amount))
|> Dux.compute()

# Hash-partitioned Postgres reads
Dux.from_attached(:pg, "public.orders", partition_by: :id)
|> Dux.distribute(workers)
|> Dux.compute()

Writes

# Parallel file writes
df |> Dux.distribute(workers) |> Dux.to_parquet("s3://out/")

# Hive-partitioned output
df |> Dux.distribute(workers) |> Dux.to_parquet("s3://out/", partition_by: :year)

# Parallel database inserts
df |> Dux.distribute(workers) |> Dux.insert_into("pg.public.events", create: true)

# Collect back to local
df |> Dux.distribute(workers) |> Dux.collect()

FLAME: elastic cloud compute

Dux.Flame.start_pool(backend: {FLAME.FlyBackend, ...}, max: 10)
workers = Dux.Flame.spin_up(5)

Graph Analytics

graph = Dux.Graph.new(vertices: v, edges: e)

# Algorithms (return %Dux{} — pipe into any verb)
Dux.Graph.pagerank(graph)                    # influence ranking
Dux.Graph.shortest_paths(graph, start)       # BFS distances
Dux.Graph.connected_components(graph)        # community detection
Dux.Graph.triangle_count(graph)              # clustering density
Dux.Graph.out_degree(graph)                  # connection count
Dux.Graph.in_degree(graph)                   # incoming connections

# Distribute across workers
graph |> Dux.Graph.distribute(workers) |> Dux.Graph.pagerank()

Nx Interop

tensor = Dux.to_tensor(df, :price)    # column → Nx.Tensor
# Implements Nx.LazyContainer for defn