Dux.Remote.Worker (Dux v0.2.0)


A DuckDB worker node in a distributed Dux cluster.

Each worker owns a DuckDB connection, registers in the :dux_workers process group via :pg, and executes pipelines on behalf of the coordinator.

Starting workers

Workers are started automatically by the application supervisor when distributed mode is enabled, or can be started manually:

{:ok, pid} = Dux.Remote.Worker.start_link([])

Discovery

The coordinator discovers workers via :pg:

workers = Dux.Remote.Worker.list()

Execution

Workers receive %Dux{} pipelines (plain data), compile to SQL locally, execute against their local DuckDB, and return results as Arrow IPC.

{:ok, ipc_binary} = Dux.Remote.Worker.execute(worker_pid, pipeline)

Summary

Functions

append_chunk/3 - Append an Arrow IPC chunk to a named temp table, creating it if needed.

child_spec/1 - Returns a specification to start this module under a supervisor.

drop_table/2 - Drop a named temporary table on the worker.

execute/3 - Execute a %Dux{} pipeline on a worker, returning Arrow IPC.

hash_partition/5 - Hash-partition a pipeline's results into n_buckets buckets by join key(s).

info/1 - Get worker info (node, connection status).

insert_into/6 - Execute a pipeline and insert the results into a table.

list/0 - List all registered workers across the cluster.

list/1 - List workers on a specific node.

register_table/3 - Register an Arrow IPC binary as a named temporary table on the worker.

start_link/1 - Start a worker and register it in the :dux_workers process group.

write/5 - Execute a pipeline and write the results directly to a file.

Functions

append_chunk(worker, table_name, ipc_binary)

Append an Arrow IPC chunk to a named temp table. Creates the table if it doesn't exist. Used during shuffle exchange.
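As a sketch of the shuffle-exchange step this supports: a coordinator can stream each bucket of partitioned rows to the worker that owns it. The workers list, buckets map, and the :ok return value are assumptions for illustration; the bucket-to-worker assignment here is just index-based.

```elixir
# Hypothetical shuffle step: buckets is a %{bucket_id => ipc_binary} map
# (e.g. from hash_partition/5), workers comes from list/0. Each chunk is
# appended into a per-join temp table on the owning worker.
for {bucket_id, ipc_binary} <- buckets do
  worker = Enum.at(workers, bucket_id)
  # Assumed to return :ok on success.
  :ok = Dux.Remote.Worker.append_chunk(worker, "shuffle_left", ipc_binary)
end
```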

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

drop_table(worker, name)

Drop a named temporary table on the worker.

execute(worker, pipeline, timeout \\ :infinity)

Execute a %Dux{} pipeline on a worker. Returns {:ok, ipc_binary} or {:error, reason}.

The pipeline is compiled to SQL on the worker node and executed against the worker's local DuckDB. The result is serialized as Arrow IPC.
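A minimal round-trip sketch, assuming a pipeline built elsewhere and a worker pid from list/0. Decoding the result with Explorer is an assumption; Dux may ship its own Arrow IPC reader.

```elixir
# Run the pipeline on the remote worker; the SQL compiles and executes there.
{:ok, ipc_binary} = Dux.Remote.Worker.execute(worker, pipeline)

# Hypothetical decode step using the Explorer library.
{:ok, df} = Explorer.DataFrame.load_ipc(ipc_binary)
```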

hash_partition(worker, pipeline, on, n_buckets, timeout \\ :infinity)

Hash-partition a pipeline's results into n_buckets buckets by join key(s).

on can be a single column (atom/string) or a list of columns. Returns %{bucket_id => ipc_binary} — each bucket contains the rows whose hash(join_keys) % n_buckets == bucket_id.
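For illustration, partitioning one side of a join by its key might look like the sketch below. The column name and bucket count are hypothetical; the return shape follows the description above.

```elixir
# Rows land in bucket rem(hash(user_id), 4), so the same key always maps to
# the same bucket on every worker, co-locating matching rows for the join.
buckets = Dux.Remote.Worker.hash_partition(worker, pipeline, :user_id, 4)
# buckets => %{0 => ipc0, 1 => ipc1, 2 => ipc2, 3 => ipc3}
```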

info(worker)

Get worker info (node, connection status).

insert_into(worker, pipeline, table, setup_sqls, create?, timeout \\ :infinity)

Execute a pipeline and insert the results into a table.

setup_sqls is a list of SQL statements to run before the insert (e.g., INSTALL/LOAD extensions, ATTACH databases). The worker compiles the pipeline, then runs INSERT INTO or CREATE TABLE AS.

Returns {:ok, table} or {:error, reason}.
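A sketch of materializing a pipeline into a table, with setup statements run first. The extension name, table name, and the meaning of create? as "create the table rather than insert into an existing one" are assumptions here.

```elixir
# Hypothetical setup: load an extension before the insert runs.
setup_sqls = ["INSTALL httpfs", "LOAD httpfs"]

# With create? set to true, the worker is assumed to run CREATE TABLE AS;
# with false, INSERT INTO an existing table.
{:ok, "events"} =
  Dux.Remote.Worker.insert_into(worker, pipeline, "events", setup_sqls, true)
```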

list()

List all registered workers across the cluster.

list(node)

List workers on a specific node.

register_table(worker, name, ipc_binary)

Register an Arrow IPC binary as a named temporary table on the worker. Used for broadcast joins — the coordinator sends a small table to all workers.
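A broadcast-join sketch: the coordinator sends a small dimension table to every worker so each can join against it locally, with no shuffle. The dim_ipc binary and the :ok return value are assumptions.

```elixir
# dim_ipc is assumed to hold the small table serialized as Arrow IPC.
for worker <- Dux.Remote.Worker.list() do
  :ok = Dux.Remote.Worker.register_table(worker, "dim_users", dim_ipc)
end
```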

start_link(opts \\ [])

Start a worker and register it in the :dux_workers process group.

write(worker, pipeline, path, copy_opts_sql, timeout \\ :infinity)

Execute a pipeline and write the results directly to a file.

The worker compiles the pipeline to SQL, then runs COPY (query) TO 'path' with the given copy_opts_sql options. Returns {:ok, path} or {:error, reason}.
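For example, exporting results to Parquet on the worker's local disk might be sketched as below. The exact shape expected for copy_opts_sql is an assumption; the string here follows DuckDB's COPY ... (FORMAT PARQUET) syntax.

```elixir
# Write the pipeline's results to a Parquet file on the worker's filesystem.
# Note the path is local to the worker node, not the coordinator.
{:ok, "/tmp/out.parquet"} =
  Dux.Remote.Worker.write(worker, pipeline, "/tmp/out.parquet", "(FORMAT PARQUET)")
```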