SparkEx.Writer (SparkEx v0.1.0)

Copy Markdown · View Source

Builder API for writing DataFrames to external storage.

Mirrors PySpark's DataFrameWriter with a builder pattern.

Examples

import SparkEx.Writer

# Write to Parquet
df
|> SparkEx.DataFrame.write()
|> format("parquet")
|> mode(:overwrite)
|> option("compression", "snappy")
|> save("/data/output.parquet")

# Save as table
df
|> SparkEx.DataFrame.write()
|> format("parquet")
|> mode(:append)
|> save_as_table("my_database.my_table")

# Insert into existing table
df
|> SparkEx.DataFrame.write()
|> mode(:append)
|> insert_into("my_table")

# Shorthand: write Parquet
SparkEx.Writer.parquet(df, "/data/output.parquet", mode: :overwrite)

Summary

Functions

Writes the DataFrame as Avro.

Sets bucketing for the write.

Sets clustering columns for the write.

Writes the DataFrame as CSV.

Sets the output data source format (e.g. "parquet", "csv", "json", "orc").

Inserts the DataFrame into an existing table.

Writes the DataFrame via JDBC.

Writes the DataFrame as JSON.

Sets the save mode.

Sets a single writer option.

Merges a map of options into the writer.

Writes the DataFrame as ORC.

Writes the DataFrame as Parquet.

Sets the partitioning columns for the write.

Saves the DataFrame to the given path.

Saves the DataFrame as a named table.

Sets the sort columns for the write.

Writes the DataFrame as text (single column).

Writes the DataFrame as XML.

Types

t()

@type t() :: %SparkEx.Writer{
  bucket_by: {pos_integer(), [String.t()]} | nil,
  cluster_by: [String.t()],
  df: SparkEx.DataFrame.t(),
  mode: atom(),
  options: %{required(String.t()) => String.t()},
  partition_by: [String.t()],
  sort_by: [String.t()],
  source: String.t() | nil
}

Functions

avro(df, path, opts \\ [])

@spec avro(SparkEx.DataFrame.t(), String.t(), keyword()) :: :ok | {:error, term()}

Writes the DataFrame as Avro.

Options

  • :mode — save mode (default: :error_if_exists)
  • :options — map of Avro writer options
  • :partition_by — partitioning columns

bucket_by(writer, num_buckets, columns)

@spec bucket_by(t(), pos_integer(), [String.t()]) :: t()

Sets bucketing for the write.

cluster_by(writer, columns)

@spec cluster_by(t(), [String.t()]) :: t()

Sets clustering columns for the write.

csv(df, path, opts \\ [])

@spec csv(SparkEx.DataFrame.t(), String.t(), keyword()) :: :ok | {:error, term()}

Writes the DataFrame as CSV.

Options

  • :mode — save mode (default: :error_if_exists)
  • :header — whether to include a header row
  • :separator — field separator
  • :options — map of CSV writer options

format(writer, fmt)

@spec format(t(), String.t()) :: t()

Sets the output data source format (e.g. "parquet", "csv", "json", "orc").

insert_into(writer, table_name, overwrite_or_opts \\ [])

@spec insert_into(t(), String.t(), boolean() | keyword()) :: :ok | {:error, term()}

Inserts the DataFrame into an existing table.

The third argument may be a keyword list or a bare boolean (see the spec: boolean() | keyword()). With overwrite: true (or the boolean true), mode is set to :overwrite; with overwrite: false (or false), mode is set to :append. When no :overwrite option is given, the writer's current mode is used unchanged, matching PySpark's insertInto behavior where the mode defaults to server-side handling.

jdbc(df, opts)

@spec jdbc(
  SparkEx.DataFrame.t(),
  keyword()
) :: :ok | {:error, term()}

jdbc(df, url, table, opts \\ [])

@spec jdbc(SparkEx.DataFrame.t(), String.t(), String.t(), keyword()) ::
  :ok | {:error, term()}

Writes the DataFrame via JDBC.

Options

  • :mode — save mode (default: :error_if_exists)
  • :options — map of JDBC writer options (e.g. url, dbtable)

json(df, path, opts \\ [])

@spec json(SparkEx.DataFrame.t(), String.t(), keyword()) :: :ok | {:error, term()}

Writes the DataFrame as JSON.

Options

  • :mode — save mode (default: :error_if_exists)
  • :options — map of JSON writer options

mode(writer, save_mode)

@spec mode(t(), atom() | String.t()) :: t()

Sets the save mode.

  • :append — append to existing data
  • :overwrite — overwrite existing data
  • :error_if_exists — error if data already exists (default)
  • :ignore — silently ignore if data already exists

option(writer, key, value)

@spec option(t(), String.t(), term()) :: t()

Sets a single writer option.

options(writer, opts)

@spec options(t(), map() | keyword()) :: t()

Merges a map of options into the writer.

orc(df, path, opts \\ [])

@spec orc(SparkEx.DataFrame.t(), String.t(), keyword()) :: :ok | {:error, term()}

Writes the DataFrame as ORC.

Options

  • :mode — save mode (default: :error_if_exists)
  • :options — map of ORC writer options

parquet(df, path, opts \\ [])

@spec parquet(SparkEx.DataFrame.t(), String.t(), keyword()) :: :ok | {:error, term()}

Writes the DataFrame as Parquet.

Options

  • :mode — save mode (default: :error_if_exists)
  • :options — map of Parquet writer options
  • :partition_by — partitioning columns

partition_by(writer, columns)

@spec partition_by(t(), [String.t()]) :: t()

Sets the partitioning columns for the write.

save(writer, path \\ nil, opts \\ [])

@spec save(t(), String.t() | nil, keyword()) :: :ok | {:error, term()}

Saves the DataFrame to the given path, executing the write operation on the Spark server.

The path defaults to nil (see the spec: String.t() | nil) — presumably for data sources such as JDBC that take their destination from writer options rather than a path; confirm against the implementation.

save_as_table(writer, table_name, opts \\ [])

@spec save_as_table(t(), String.t(), keyword()) :: :ok | {:error, term()}

Saves the DataFrame as a named table.

sort_by(writer, columns)

@spec sort_by(t(), [String.t()]) :: t()

Sets the sort columns for the write.

text(df, path, opts \\ [])

@spec text(SparkEx.DataFrame.t(), String.t(), keyword()) :: :ok | {:error, term()}

Writes the DataFrame as text (single column).

Options

  • :mode — save mode (default: :error_if_exists)
  • :options — map of text writer options

xml(df, path, opts \\ [])

@spec xml(SparkEx.DataFrame.t(), String.t(), keyword()) :: :ok | {:error, term()}

Writes the DataFrame as XML.

Options

  • :mode — save mode (default: :error_if_exists)
  • :options — map of XML writer options
  • :partition_by — partitioning columns