SparkEx.DataFrame (SparkEx v0.1.0)


A lazy reference to a Spark DataFrame.

DataFrame structs hold a session reference and an internal plan representation. Transforms build up the plan; actions (collect/1, count/1, etc.) execute it via the Spark Connect server.

Transforms (lazy)

Actions (execute)

Summary

Functions

Aggregate without grouping.

Aliases this DataFrame for use in subqueries.

Performs an as-of join between two DataFrames.

Returns this DataFrame as a table-argument wrapper.

Alias for persist/2 with default storage level (PySpark cache).

Alias for checkpoint/2 (PySpark checkpoint).

Materializes this DataFrame as a cached relation.

Reduces the number of partitions without shuffling data.

Returns a column reference bound to this DataFrame plan.

Selects columns by regex.

Collects all rows from the DataFrame as a list of maps.

Collects rows into a map using the first column as key and second column as value.

Returns a list of column names.

Computes Pearson correlation. Delegates to SparkEx.DataFrame.Stat.corr/4.

Returns the row count of the DataFrame.

Computes covariance. Delegates to SparkEx.DataFrame.Stat.cov/3.

Creates a global temporary view with the given name.

Creates or replaces a global temporary view with the given name.

Creates or replaces a temporary view with the given name.

Creates a temporary view with the given name.

Cross join — shorthand for join(df, other, [], :cross).

Groups by cube of the specified columns.

Describes basic statistics. Delegates to SparkEx.DataFrame.Stat.describe/2.

Returns a new DataFrame with duplicate rows removed.

Drops the specified columns.

Drops duplicate rows based on a subset of columns.

Drops duplicate rows within the watermark window.

Drops a global temporary view by name.

Drops a local temporary view by name.

Drops rows with null values. Delegates to SparkEx.DataFrame.NA.drop/2.

Returns list of {column_name, type_string} tuples.

Returns rows in this DataFrame that are not in the other DataFrame.

Returns rows in this DataFrame that are not in the other, preserving duplicates (equivalent to SQL EXCEPT ALL).

Returns execution metrics from the last action on the session.

Alias for execution_info/1 (PySpark executionInfo).

Returns this DataFrame as an EXISTS subquery expression.

Returns the explain string for the DataFrame's plan.

Filters rows based on a boolean condition.

Returns the first row as a map, or nil if empty.

Applies a function to each row on the driver.

Applies a function to each partition (driver-side shim).

Groups the DataFrame by the given columns, returning a SparkEx.GroupedData.

Alias for group_by/2 (PySpark groupby).

Groups by grouping sets.

Returns the first row as a map, or nil if empty.

Returns the first n rows as a list of maps.

Adds a query optimization hint.

Returns an HTML string representation of the DataFrame.

Returns this DataFrame as an IN subquery expression.

Returns the input files for the plan.

Returns rows in this DataFrame that are also in the other DataFrame.

Returns rows common to both DataFrames, preserving duplicates (equivalent to SQL INTERSECT ALL).

Returns true if the DataFrame is cached (storage level is not NONE).

Alias for is_cached/1 (PySpark is_cached).

Returns true if the DataFrame has no rows.

Checks if the plan is local (can be computed without Spark).

Checks if the plan represents a streaming query.

Joins this DataFrame with another on the given condition.

Performs a lateral join between two DataFrames.

Limits the number of rows.

Alias for local_checkpoint/2 with default options.

Materializes this DataFrame as a local (non-reliable) checkpoint.

Alias for local_checkpoint/2 (PySpark localCheckpoint).

Creates a SparkEx.MergeIntoWriter for MERGE INTO operations.

Returns a metadata column expression by name.

Returns the DataFrame for use with SparkEx.DataFrame.NA functions.

Observes metrics during query execution.

Skips the first n rows.

Sorts the DataFrame by the given columns or sort orders.

Parses string columns in the DataFrame as CSV or JSON.

Persists the DataFrame with optional storage level.

Convenience wrapper for SparkEx.GroupedData.pivot/3 so grouped pipelines can stay within the DataFrame module.

Prints the schema tree, mirroring PySpark printSchema.

Randomly splits the DataFrame into multiple DataFrames using normalized weights.

Returns the number of partitions in the underlying RDD.

Repartitions the DataFrame.

Repartitions the DataFrame by range using sort order expressions without specifying partitions.

Repartitions the DataFrame by range using sort order expressions.

Groups by rollup of the specified columns.

Checks if this DataFrame has the same semantics as another.

Returns this DataFrame as a scalar subquery expression.

Returns the schema of the DataFrame via AnalyzePlan.

Projects a set of columns or expressions.

Projects columns using SQL expression strings.

Returns the semantic hash of the plan.

Returns a formatted string representation of the DataFrame (like PySpark's show()).

Alias for order_by/2 (PySpark sort).

Sorts within each partition by the given columns.

Returns the parent Spark session.

Alias for spark_session/1 (PySpark sparkSession).

Returns the DataFrame for use with SparkEx.DataFrame.Stat functions.

Returns the storage level of a persisted DataFrame.

Alias for except/2 (EXCEPT DISTINCT, matching PySpark subtract).

Computes summary statistics. Delegates to SparkEx.DataFrame.Stat.summary/2.

Creates a DataFrame from a table-valued function (TVF) call.

Tags the DataFrame with an operation tag for interrupt targeting.

Returns the last n rows as a list of maps.

Returns a lazy DataFrame relation for the last n rows.

Returns up to n rows from the DataFrame as a list of maps.

Casts this DataFrame to the given schema.

Materializes the DataFrame as a raw Arrow IPC binary.

Renames all columns in the DataFrame.

Materializes the DataFrame as an Explorer.DataFrame.

Converts each row to a JSON string, returning a single-column DataFrame.

Returns a lazy enumerable over collected rows.

Applies a transformation function to the DataFrame.

Transposes the DataFrame.

Returns the tree-string representation of the plan.

Returns a new DataFrame with the union of rows from both DataFrames.

Union by column name rather than position.

Returns a new DataFrame with the union of rows, removing duplicates (equivalent to SQL UNION).

Alias for union/2 (PySpark unionAll).

Unpersists the DataFrame.

Unpivots a DataFrame from wide to long format.

Adds or replaces a column with the given name and expression.

Adds or replaces multiple columns at once.

Renames multiple columns using a map of old -> new names.

Adds or replaces metadata for an existing column.

Adds a watermark for streaming event-time processing.

Returns a SparkEx.Writer builder for this DataFrame.

Returns a SparkEx.StreamWriter builder for this streaming DataFrame.

Returns a SparkEx.WriterV2 builder for this DataFrame targeting the given table.

Types

plan()

@type plan() :: term()

t()

@type t() :: %SparkEx.DataFrame{
  plan: plan(),
  session: GenServer.server(),
  tags: [String.t()]
}

Functions

agg(grouped, expr)

@spec agg(
  t() | SparkEx.GroupedData.t(),
  SparkEx.Column.t() | [SparkEx.Column.t()] | map()
) :: t()

Aggregate without grouping.

Shortcut for df |> group_by([]) |> GroupedData.agg(exprs).

Examples

df |> DataFrame.agg([Functions.count(Functions.col("id"))])

alias_(df, name)

@spec alias_(t(), String.t()) :: t()

Aliases this DataFrame for use in subqueries.

Examples

df |> DataFrame.alias_("t")

approx_quantile(df, col, probabilities, relative_error \\ 0.0)

@spec approx_quantile(t(), String.t() | [String.t()], [float()], float()) ::
  {:ok, [float()] | [[float()]]} | {:error, term()}

Computes approximate quantiles. Delegates to SparkEx.DataFrame.Stat.approx_quantile/4.

as_of_join(left, right, left_as_of, right_as_of, opts \\ [])

@spec as_of_join(
  t(),
  t(),
  SparkEx.Column.t() | String.t(),
  SparkEx.Column.t() | String.t(),
  keyword()
) :: t()

Performs an as-of join between two DataFrames.

Options

  • :tolerance — optional tolerance expression (e.g. Functions.lit(10))
  • :allow_exact_matches — whether exact matches are allowed (default: true)
  • :direction — join direction string (default: "backward")

Examples

import SparkEx.Functions, only: [col: 1, lit: 1]

DataFrame.as_of_join(df1, df2, col("t1"), col("t2"), on: col("id"), tolerance: lit(5))

as_table(df)

@spec as_table(t()) :: SparkEx.TableArg.t()

Returns this DataFrame as a table-argument wrapper.

cache(df)

@spec cache(t()) :: t() | {:error, term()}

Alias for persist/2 with default storage level (PySpark cache).

checkpoint(df)

@spec checkpoint(t()) :: t() | {:error, term()}

Alias for checkpoint/2 (PySpark checkpoint).

checkpoint(df, opts)

@spec checkpoint(
  t(),
  keyword()
) :: t() | {:error, term()}

Materializes this DataFrame as a cached relation.

Options

  • :eager — whether to checkpoint eagerly (default: true)

coalesce(df, num_partitions)

@spec coalesce(t(), pos_integer()) :: t()

Reduces the number of partitions without shuffling data.

Examples

df |> DataFrame.coalesce(1)

col(df, name)

@spec col(t(), String.t() | atom()) :: SparkEx.Column.t()

Returns a column reference bound to this DataFrame plan.

Useful for disambiguating columns across joins/subqueries.

col_regex(df, pattern)

@spec col_regex(t(), String.t()) :: SparkEx.Column.t()

Selects columns by regex.

Examples

df |> DataFrame.col_regex("^name_.*")

collect(df_or_other, opts \\ [])

@spec collect(
  t(),
  keyword()
) :: {:ok, [map()]} | {:error, term()}

Collects all rows from the DataFrame as a list of maps.

Options

  • :timeout — gRPC call timeout in ms (default: 60_000)

collect_as_map(df, opts \\ [])

@spec collect_as_map(
  t(),
  keyword()
) :: {:ok, map()} | {:error, term()}

Collects rows into a map using the first column as key and second column as value.

The DataFrame must have exactly two columns.
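For instance, a two-column projection can be collected into a lookup map. A minimal sketch, assuming `df` is an existing DataFrame with `id` and `name` columns:

```elixir
# Assumes `df` is bound to a live session and has `id` and `name` columns.
{:ok, lookup} =
  df
  |> SparkEx.DataFrame.select(["id", "name"])
  |> SparkEx.DataFrame.collect_as_map()

# lookup is a map keyed by id, with name as the value
```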

columns(df)

@spec columns(t()) :: {:ok, [String.t()]} | {:error, term()}

Returns a list of column names.

corr(df, col1, col2, method \\ "pearson")

@spec corr(t(), String.t(), String.t(), String.t()) ::
  {:ok, float()} | {:error, term()}

Computes Pearson correlation. Delegates to SparkEx.DataFrame.Stat.corr/4.

count(df)

@spec count(t()) :: {:ok, non_neg_integer()} | {:error, term()}

Returns the row count of the DataFrame.

cov(df, col1, col2)

@spec cov(t(), String.t(), String.t()) :: {:ok, float()} | {:error, term()}

Computes covariance. Delegates to SparkEx.DataFrame.Stat.cov/3.

create_global_temp_view(df, name, opts \\ [])

@spec create_global_temp_view(t(), String.t(), keyword()) :: :ok | {:error, term()}

Creates a global temporary view with the given name.

Global temp views are accessible across sessions within the same Spark application and are available in the global_temp database.

create_or_replace_global_temp_view(df, name, opts \\ [])

@spec create_or_replace_global_temp_view(t(), String.t(), keyword()) ::
  :ok | {:error, term()}

Creates or replaces a global temporary view with the given name.

create_or_replace_temp_view(df, name, opts \\ [])

@spec create_or_replace_temp_view(t(), String.t(), keyword()) ::
  :ok | {:error, term()}

Creates or replaces a temporary view with the given name.

create_temp_view(df, name, opts \\ [])

@spec create_temp_view(t(), String.t(), keyword()) :: :ok | {:error, term()}

Creates a temporary view with the given name.

Raises an error if a view with this name already exists.

cross_join(left, right)

@spec cross_join(t(), t()) :: t()

Cross join — shorthand for join(df, other, [], :cross).

crosstab(df, col1, col2)

@spec crosstab(t(), String.t(), String.t()) :: t()

Computes crosstab. Delegates to SparkEx.DataFrame.Stat.crosstab/3.

cube(df, columns)

@spec cube(
  t(),
  SparkEx.Column.t()
  | String.t()
  | atom()
  | [SparkEx.Column.t() | String.t() | atom()]
) ::
  SparkEx.GroupedData.t()

Groups by cube of the specified columns.

describe(df, cols \\ [])

@spec describe(t(), String.t() | [String.t()]) :: t()

Describes basic statistics. Delegates to SparkEx.DataFrame.Stat.describe/2.

distinct(df)

@spec distinct(t()) :: t()

Returns a new DataFrame with duplicate rows removed.

Examples

df |> SparkEx.DataFrame.distinct()

drop(df, columns)

@spec drop(
  t(),
  [SparkEx.Column.t() | String.t() | atom()]
  | SparkEx.Column.t()
  | String.t()
  | atom()
) ::
  t()

Drops the specified columns.

Accepts a list of column names as strings or atoms.

Examples

df |> SparkEx.DataFrame.drop(["temp_col", "debug_col"])
df |> SparkEx.DataFrame.drop([:temp_col])

drop_duplicates(df, subset \\ nil)

@spec drop_duplicates(t(), [SparkEx.Column.t() | String.t() | atom()] | nil) :: t()

Drops duplicate rows based on a subset of columns.

When subset is nil or empty, deduplicates on all columns (like distinct/1).

Examples

df |> DataFrame.drop_duplicates(["id", "name"])

drop_duplicates_within_watermark(df, subset \\ [])

@spec drop_duplicates_within_watermark(t(), [SparkEx.Column.t() | String.t() | atom()]) ::
  t()

Drops duplicate rows within the watermark window.

drop_global_temp_view(session, name)

@spec drop_global_temp_view(GenServer.server(), String.t()) ::
  {:ok, boolean()} | {:error, term()}

Drops a global temporary view by name.

drop_temp_view(session, name)

@spec drop_temp_view(GenServer.server(), String.t()) ::
  {:ok, boolean()} | {:error, term()}

Drops a local temporary view by name.

dropna(df, opts \\ [])

@spec dropna(
  t(),
  keyword()
) :: t()

Drops rows with null values. Delegates to SparkEx.DataFrame.NA.drop/2.

dtypes(df)

@spec dtypes(t() | {:ok, t()} | {:error, term()}) ::
  {:ok, [{String.t(), String.t()}]} | {:error, term()}

Returns list of {column_name, type_string} tuples.

except(left, right)

@spec except(t(), t()) :: t()

Returns rows in this DataFrame that are not in the other DataFrame.

Examples

DataFrame.except(df1, df2)

except_all(left, right)

@spec except_all(t(), t()) :: t()

Returns rows in this DataFrame that are not in the other, preserving duplicates (equivalent to SQL EXCEPT ALL).

execution_info(df)

@spec execution_info(t()) :: {:ok, map()} | {:error, term()}

Returns execution metrics from the last action on the session.

executionInfo(df)

@spec executionInfo(t()) :: {:ok, map()} | {:error, term()}

Alias for execution_info/1 (PySpark executionInfo).

exists(df)

@spec exists(t()) :: SparkEx.Column.t()

Returns this DataFrame as an EXISTS subquery expression.

explain(df, mode \\ :simple)

@spec explain(t(), atom() | boolean() | String.t()) ::
  {:ok, String.t()} | {:error, term()}

Returns the explain string for the DataFrame's plan.

Modes: :simple, :extended, :codegen, :cost, :formatted
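A sketch of inspecting the optimized plan, assuming a session-bound `df`:

```elixir
# Fetch the formatted physical plan as a string and print it.
{:ok, plan_text} = SparkEx.DataFrame.explain(df, :formatted)
IO.puts(plan_text)
```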

fillna(df, value, opts \\ [])

@spec fillna(t(), term(), keyword()) :: t()

Fills null values. Delegates to SparkEx.DataFrame.NA.fill/3.

filter(df, condition)

@spec filter(t(), SparkEx.Column.t() | String.t()) :: t()

Filters rows based on a boolean condition.

Examples

import SparkEx.Functions, only: [col: 1, lit: 1]

df |> SparkEx.DataFrame.filter(col("age") |> SparkEx.Column.gt(lit(18)))

first(df, opts \\ [])

@spec first(
  t(),
  keyword()
) :: {:ok, map() | nil} | {:error, term()}

Returns the first row as a map, or nil if empty.

foreach(df, fun, opts \\ [])

@spec foreach(t(), (map() -> term()), keyword()) :: :ok | {:error, term()}

Applies a function to each row on the driver.

foreach_partition(df, fun, opts \\ [])

@spec foreach_partition(t(), (Enumerable.t() -> term()), keyword()) ::
  :ok | {:error, term()}

Applies a function to each partition (driver-side shim).

freq_items(df, cols, support \\ 0.01)

@spec freq_items(t(), [String.t()], float() | keyword()) :: t()

Finds frequent items. Delegates to SparkEx.DataFrame.Stat.freq_items/3.

group_by(df, columns)

@spec group_by(
  t(),
  SparkEx.Column.t()
  | String.t()
  | atom()
  | [SparkEx.Column.t() | String.t() | atom()]
) :: SparkEx.GroupedData.t()

Groups the DataFrame by the given columns, returning a SparkEx.GroupedData.

Use SparkEx.GroupedData.agg/2 to apply aggregate functions.

Accepts a list of column names (strings or atoms) or Column structs.

Examples

import SparkEx.Functions

df
|> DataFrame.group_by(["department"])
|> SparkEx.GroupedData.agg([sum(col("salary"))])

groupby(df, columns)

@spec groupby(t(), [SparkEx.Column.t() | String.t() | atom()]) ::
  SparkEx.GroupedData.t()

Alias for group_by/2 (PySpark groupby).

grouping_sets(df, sets, cols \\ [])

@spec grouping_sets(t(), [[SparkEx.Column.t() | String.t() | atom()]], [
  SparkEx.Column.t() | String.t() | atom()
]) :: SparkEx.GroupedData.t()

Groups by grouping sets.

Accepts a list of column lists, and an optional list of explicit grouping columns. When grouping columns are provided, they are used as the grouping expressions instead of being derived from the sets.
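A sketch of a grouping-sets aggregation; the `dept`, `region`, and `salary` columns are hypothetical:

```elixir
import SparkEx.Functions, only: [col: 1, sum: 1]

# Aggregate at three granularities: per dept, per (dept, region), and overall.
df
|> SparkEx.DataFrame.grouping_sets([["dept"], ["dept", "region"], []])
|> SparkEx.GroupedData.agg([sum(col("salary"))])
```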

head(df)

@spec head(t()) :: {:ok, map() | nil} | {:error, term()}

Returns the first row as a map, or nil if empty.

Equivalent to PySpark's head() behavior.

head(df, opts)

@spec head(
  t(),
  keyword()
) :: {:ok, map() | nil} | {:error, term()}
@spec head(t(), non_neg_integer()) :: {:ok, [map()]} | {:error, term()}

Returns the first n rows as a list of maps.

Equivalent to take/3 but follows PySpark naming for head(n).

head(df, n, opts)

@spec head(t(), non_neg_integer(), keyword()) :: {:ok, [map()]} | {:error, term()}

hint(df, name, parameters \\ [])

@spec hint(t(), String.t(), term()) :: t()

Adds a query optimization hint.

Supports primitive values, Columns, and lists of primitive values/columns.
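A common use is marking the smaller side of a join for broadcast. A sketch, with `big_df` and `small_df` as assumed inputs:

```elixir
# Hint that `small_df` is small enough to broadcast to all executors.
small = SparkEx.DataFrame.hint(small_df, "broadcast")
SparkEx.DataFrame.join(big_df, small, ["id"], :inner)
```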

html_string(df, opts \\ [])

@spec html_string(
  t(),
  keyword()
) :: {:ok, String.t()} | {:error, term()}

Returns an HTML string representation of the DataFrame.

Options

  • :num_rows — number of rows (default: 20)
  • :truncate — column width truncation (default: 20)

in_subquery(value, subquery)

@spec in_subquery(t(), [SparkEx.Column.t()]) :: SparkEx.Column.t()
@spec in_subquery(SparkEx.Column.t(), t()) :: SparkEx.Column.t()

Returns this DataFrame as an IN subquery expression.

Accepts a list of expressions to compare against the subquery values.
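A sketch of filtering with an IN subquery; the `id` column and `other_df` are assumptions:

```elixir
import SparkEx.Functions, only: [col: 1]

# Keep only rows whose id appears in the single-column subquery.
ids = SparkEx.DataFrame.select(other_df, ["id"])
df |> SparkEx.DataFrame.filter(SparkEx.DataFrame.in_subquery(col("id"), ids))
```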

input_files(df)

@spec input_files(t()) :: {:ok, [String.t()]} | {:error, term()}

Returns the input files for the plan.

intersect(left, right)

@spec intersect(t(), t()) :: t()

Returns rows in this DataFrame that are also in the other DataFrame.

Examples

DataFrame.intersect(df1, df2)

intersect_all(left, right)

@spec intersect_all(t(), t()) :: t()

Returns rows common to both DataFrames, preserving duplicates (equivalent to SQL INTERSECT ALL).

is_cached(df)

@spec is_cached(t()) :: {:ok, boolean()} | {:error, term()}

Returns true if the DataFrame is cached (storage level is not NONE).

is_cached?(df)

@spec is_cached?(t()) :: {:ok, boolean()} | {:error, term()}

Alias for is_cached/1 (PySpark is_cached).

is_empty(df)

@spec is_empty(t()) :: {:ok, boolean()} | {:error, term()}

Returns true if the DataFrame has no rows.

is_empty?(df)

@spec is_empty?(t()) :: {:ok, boolean()} | {:error, term()}

Alias for is_empty/1.

is_local(df)

@spec is_local(t()) :: {:ok, boolean()} | {:error, term()}

Checks if the plan is local (can be computed without Spark).

is_streaming(df)

@spec is_streaming(t()) :: {:ok, boolean()} | {:error, term()}

Checks if the plan represents a streaming query.

is_streaming?(df)

@spec is_streaming?(t()) :: {:ok, boolean()} | {:error, term()}

Alias for is_streaming/1.

join(left, right, on, join_type \\ :inner)

@spec join(
  t(),
  t(),
  SparkEx.Column.t()
  | String.t()
  | atom()
  | [SparkEx.Column.t() | String.t() | atom()],
  atom() | String.t()
) :: t()

Joins this DataFrame with another on the given condition.

Join types

  • :inner (default)
  • :left — left outer join
  • :right — right outer join
  • :full — full outer join
  • :cross — cross join (no condition needed)
  • :left_semi — left semi join
  • :left_anti — left anti join

Join conditions

The on parameter can be:

  • A Column struct representing the join condition expression
  • A list of column name strings for a USING join

Examples

import SparkEx.Functions, only: [col: 1]

DataFrame.join(df1, df2, Column.eq(col("df1.id"), col("df2.id")), :inner)
DataFrame.join(df1, df2, ["id"], :inner)

lateral_join(left, right, condition \\ nil, join_type \\ :inner)

@spec lateral_join(t(), t(), SparkEx.Column.t() | nil, atom() | String.t()) :: t()

Performs a lateral join between two DataFrames.

The right plan is expected to reference columns from the left plan where supported.

limit(df, n)

@spec limit(t(), non_neg_integer()) :: t()

Limits the number of rows.

Examples

df |> SparkEx.DataFrame.limit(100)

local_checkpoint(df)

@spec local_checkpoint(t()) :: t() | {:error, term()}

Alias for local_checkpoint/2 with default options.

local_checkpoint(df, opts)

@spec local_checkpoint(
  t(),
  keyword()
) :: t() | {:error, term()}

Materializes this DataFrame as a local (non-reliable) checkpoint.

localCheckpoint(df)

@spec localCheckpoint(t()) :: t() | {:error, term()}

Alias for local_checkpoint/2 (PySpark localCheckpoint).

melt(df, ids, values, variable_column_name, value_column_name)

@spec melt(
  t(),
  [SparkEx.Column.t() | String.t() | atom()],
  [SparkEx.Column.t() | String.t() | atom()] | nil,
  String.t(),
  String.t()
) :: t()

Alias for unpivot/5.

merge_into(df, table_name)

@spec merge_into(t(), String.t()) :: SparkEx.MergeIntoWriter.t()

Creates a SparkEx.MergeIntoWriter for MERGE INTO operations.

Examples

df
|> DataFrame.merge_into("target_table")
|> MergeIntoWriter.on(col("source.id") |> Column.eq(col("target.id")))
|> MergeIntoWriter.when_matched_update_all()
|> MergeIntoWriter.when_not_matched_insert_all()
|> MergeIntoWriter.merge()

merge_into(df, table_name, condition)

@spec merge_into(t(), String.t(), SparkEx.Column.t()) :: SparkEx.MergeIntoWriter.t()

metadata_column(df, name)

@spec metadata_column(t(), String.t()) :: SparkEx.Column.t()

Returns a metadata column expression by name.

Examples

df |> DataFrame.metadata_column("_metadata")

na(df)

@spec na(t()) :: t()

Returns the DataFrame for use with SparkEx.DataFrame.NA functions.

observe(df, name, exprs)

@spec observe(t(), SparkEx.Observation.t() | String.t(), [SparkEx.Column.t()]) :: t()

Observes metrics during query execution.

Accepts a SparkEx.Observation struct, or a name string together with a list of Column expressions.
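A sketch using a name string and a metric expression (the `id` column is hypothetical):

```elixir
import SparkEx.Functions, only: [col: 1, count: 1]

# Attach a named observation; its metrics are computed when an action runs.
df
|> SparkEx.DataFrame.observe("row_metrics", [count(col("id"))])
|> SparkEx.DataFrame.collect()
```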

offset(df, n)

@spec offset(t(), non_neg_integer()) :: t()

Skips the first n rows.

Examples

df |> DataFrame.offset(10)

order_by(df, column_or_columns, opts \\ [])

@spec order_by(
  t(),
  SparkEx.Column.t()
  | String.t()
  | atom()
  | [SparkEx.Column.t() | String.t() | atom()],
  keyword()
) :: t()

Sorts the DataFrame by the given columns or sort orders.

Accepts a list of:

  • SparkEx.Column structs (with optional .asc() / .desc())
  • strings (ascending by default)
  • atoms (ascending by default)

Examples

import SparkEx.Functions, only: [col: 1]

df |> SparkEx.DataFrame.order_by([col("age") |> SparkEx.Column.desc()])
df |> SparkEx.DataFrame.order_by(["name"])

parse(df, format, schema \\ nil, options \\ nil)

@spec parse(
  t(),
  :csv | :json,
  String.t() | SparkEx.Types.struct_type() | nil,
  map() | nil
) :: t()

Parses string columns in the DataFrame as CSV or JSON.

Parameters

  • format — :csv or :json
  • schema — DDL string or struct type for the output schema (optional)
  • options — map of parse options (optional)

Examples

df |> DataFrame.parse(:csv, "a INT, b STRING")
df |> DataFrame.parse(:json, "a INT, b STRING", %{"mode" => "FAILFAST"})

persist(df, opts \\ [])

@spec persist(
  t(),
  keyword()
) :: t() | {:error, term()}

Persists the DataFrame with optional storage level.

pivot(grouped, pivot_col, values \\ nil)

Convenience wrapper for SparkEx.GroupedData.pivot/3 so grouped pipelines can stay within the DataFrame module.

random_split(df, weights, seed_or_opts \\ nil)

@spec random_split(t(), [number()], integer() | nil | keyword()) :: {:ok, [t()]}

Randomly splits the DataFrame into multiple DataFrames using normalized weights.
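A train/test split sketch with a fixed seed for reproducibility:

```elixir
# Weights are normalized, so [0.8, 0.2] and [8, 2] are equivalent.
{:ok, [train, test]} = SparkEx.DataFrame.random_split(df, [0.8, 0.2], 42)
```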

rdd_num_partitions(df)

@spec rdd_num_partitions(t()) :: {:ok, non_neg_integer()} | {:error, term()}

Returns the number of partitions in the underlying RDD.

register_temp_table(df, name, opts \\ [])

@spec register_temp_table(t(), String.t(), keyword()) :: :ok | {:error, term()}

Alias for create_or_replace_temp_view/3 (PySpark registerTempTable).

registerTempTable(df, name, opts \\ [])

@spec registerTempTable(t(), String.t(), keyword()) :: :ok | {:error, term()}

Alias for create_or_replace_temp_view/3 (PySpark registerTempTable).

repartition(df, num_or_cols, cols \\ [])

@spec repartition(t(), pos_integer() | [SparkEx.Column.t() | String.t() | atom()], [
  SparkEx.Column.t() | String.t() | atom()
]) :: t()

Repartitions the DataFrame.

When called with an integer, does a hash repartition to num_partitions. When called with an integer and columns, repartitions by those expressions. When called with only columns (list), repartitions by those expressions with default partition count.

Examples

df |> DataFrame.repartition(10)
df |> DataFrame.repartition(10, [col("key")])
df |> DataFrame.repartition([col("key")])

repartition_by_id(df, num_partitions \\ nil, col)

@spec repartition_by_id(
  t(),
  pos_integer() | nil,
  SparkEx.Column.t() | String.t() | atom()
) :: t()

Repartitions by partition ID.

Examples

df |> DataFrame.repartition_by_id(col("partition_col"))

repartition_by_range(df, cols)

@spec repartition_by_range(t(), [SparkEx.Column.t() | String.t() | atom()]) :: t()

Repartitions the DataFrame by range using sort order expressions without specifying partitions.

repartition_by_range(df, num_partitions, cols)

@spec repartition_by_range(t(), pos_integer(), [
  SparkEx.Column.t() | String.t() | atom()
]) :: t()

Repartitions the DataFrame by range using sort order expressions.

This uses the RepartitionByExpression relation with sort-order expressions.

replace(df, to_replace, value_or_opts \\ nil, opts \\ [])

@spec replace(t(), term(), term(), keyword()) :: t()

Replaces values. Delegates to SparkEx.DataFrame.NA.replace/4.

rollup(df, columns)

@spec rollup(
  t(),
  SparkEx.Column.t()
  | String.t()
  | atom()
  | [SparkEx.Column.t() | String.t() | atom()]
) ::
  SparkEx.GroupedData.t()

Groups by rollup of the specified columns.

same_semantics(df1, df2)

@spec same_semantics(t(), t()) :: {:ok, boolean()} | {:error, term()}

Checks if this DataFrame has the same semantics as another.

sample(df, with_replacement_or_fraction, fraction_or_opts \\ [], seed_or_opts \\ [])

@spec sample(t(), boolean() | float(), float() | keyword(), integer() | keyword()) ::
  t()

Returns a random sample of rows.

Options

  • :with_replacement — sample with replacement (default: false)
  • :seed — random seed (default: nil)

Examples

df |> DataFrame.sample(0.1)
df |> DataFrame.sample(0.5, with_replacement: true, seed: 42)

sample_by(df, col, fractions, seed \\ nil)

@spec sample_by(
  t(),
  SparkEx.Column.t() | String.t(),
  map(),
  integer() | keyword() | nil
) :: t()

Returns stratified sample. Delegates to SparkEx.DataFrame.Stat.sample_by/4.

scalar(df)

@spec scalar(t()) :: SparkEx.Column.t()

Returns this DataFrame as a scalar subquery expression.

schema(df)

@spec schema(t()) :: {:ok, term()} | {:error, term()}

Returns the schema of the DataFrame via AnalyzePlan.

select(df, columns)

@spec select(
  t(),
  SparkEx.Column.t()
  | String.t()
  | atom()
  | [SparkEx.Column.t() | String.t() | atom()]
) ::
  t()

Projects a set of columns or expressions.

Accepts a list of:

  • SparkEx.Column structs
  • strings (interpreted as column names)
  • atoms (interpreted as column names)

Examples

import SparkEx.Functions, only: [col: 1, lit: 1]

df |> SparkEx.DataFrame.select([col("name"), col("age")])
df |> SparkEx.DataFrame.select(["name", "age"])
df |> SparkEx.DataFrame.select([:name, :age])

select_expr(df, exprs)

@spec select_expr(t(), String.t() | [String.t()]) :: t()

Projects columns using SQL expression strings.

Each string is parsed as a SQL expression by Spark.

Examples

df |> DataFrame.select_expr(["name", "age + 1 AS age_plus"])

semantic_hash(df)

@spec semantic_hash(t()) :: {:ok, integer()} | {:error, term()}

Returns the semantic hash of the plan.

show(df, opts \\ [])

@spec show(
  t(),
  keyword()
) :: {:ok, String.t()} | {:error, term()}

Returns a formatted string representation of the DataFrame (like PySpark's show()).

Options

  • :num_rows — number of rows to show (default: 20)
  • :truncate — column width truncation (default: 20, 0 for no truncation)
  • :vertical — vertical display format (default: false)
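For example, rendering the first five rows without truncation (assumes a session-bound `df`):

```elixir
{:ok, rendered} = SparkEx.DataFrame.show(df, num_rows: 5, truncate: 0)
IO.puts(rendered)
```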

sort(df, columns)

@spec sort(t(), [SparkEx.Column.t() | String.t() | atom()]) :: t()

Alias for order_by/2 (PySpark sort).

sort_within_partitions(df, columns, opts \\ [])

@spec sort_within_partitions(
  t(),
  [SparkEx.Column.t() | String.t() | atom() | integer()],
  keyword()
) ::
  t()

Sorts within each partition by the given columns.

Examples

df |> DataFrame.sort_within_partitions(["key"])

spark_session(df)

@spec spark_session(t()) :: GenServer.server()

Returns the parent Spark session.

sparkSession(df)

@spec sparkSession(t()) :: GenServer.server()

Alias for spark_session/1 (PySpark sparkSession).

stat(df)

@spec stat(t()) :: t()

Returns the DataFrame for use with SparkEx.DataFrame.Stat functions.

storage_level(df)

@spec storage_level(t()) :: {:ok, SparkEx.Types.storage_level()} | {:error, term()}

Returns the storage level of a persisted DataFrame.

subtract(left, right)

@spec subtract(t(), t()) :: t()

Alias for except/2 (EXCEPT DISTINCT, matching PySpark subtract).

summary(df, statistics \\ [])

@spec summary(t(), String.t() | [String.t()]) :: t()

Computes summary statistics. Delegates to SparkEx.DataFrame.Stat.summary/2.

table_function(session, function_name, args \\ [])

@spec table_function(GenServer.server(), String.t(), [SparkEx.Column.t() | term()]) ::
  t()

Creates a DataFrame from a table-valued function (TVF) call.

TVFs are built-in Spark functions that return tables (e.g. range, explode). Arguments can be Column structs or literal values.

Examples

DataFrame.table_function(session, "range", [lit(0), lit(10)])

tag(df, tag)

@spec tag(t(), String.t()) :: t()

Tags the DataFrame with an operation tag for interrupt targeting.

Tags are propagated to the ExecutePlanRequest when the DataFrame is executed. Multiple tags can be added by calling tag/2 multiple times.

Examples

df = SparkEx.sql(session, "SELECT * FROM big_table")
|> DataFrame.tag("etl-job-42")

# Later, from another process:
SparkEx.interrupt_tag(session, "etl-job-42")

tail(df, n)

@spec tail(t(), non_neg_integer()) :: {:ok, [map()]} | {:error, term()}

Returns the last n rows as a list of maps.

Mirrors PySpark tail(n) eager behavior.

tail_df(df, n)

@spec tail_df(t(), non_neg_integer()) :: t()

Returns a lazy DataFrame relation for the last n rows.

This preserves the previous lazy tail behavior when needed.

take(df, n, opts \\ [])

@spec take(t(), non_neg_integer(), keyword()) :: {:ok, [map()]} | {:error, term()}

Returns up to n rows from the DataFrame as a list of maps.

to(df, schema)

Casts this DataFrame to the given schema.

Accepts a Spark Connect DataType, a DDL string, or a SparkEx.Types struct type.
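A sketch of casting via a DDL string; the column names are hypothetical:

```elixir
# Reorder/cast the DataFrame to match the target schema.
df |> SparkEx.DataFrame.to("id BIGINT, name STRING")
```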

to_arrow(df, opts \\ [])

@spec to_arrow(
  t(),
  keyword()
) :: {:ok, term()} | {:error, term()}

Materializes the DataFrame as a raw Arrow IPC binary.

Returns an Arrow.Table if the :arrow dependency is available.

to_df(df, column_names)

@spec to_df(t(), [String.t()]) :: t()

Renames all columns in the DataFrame.

Examples

df |> DataFrame.to_df(["id", "full_name", "years"])

to_explorer(df, opts \\ [])

@spec to_explorer(
  t(),
  keyword()
) :: {:ok, Explorer.DataFrame.t()} | {:error, term()}

Materializes the DataFrame as an Explorer.DataFrame.

By default, injects a LIMIT of max_rows into the Spark plan to prevent unbounded collection. Pass unsafe: true to skip the limit injection. Local decoder limits still apply unless you explicitly set max_rows: :infinity and/or max_bytes: :infinity.

Options

  • :max_rows — maximum number of rows (default: 10_000)
  • :max_bytes — maximum Arrow data bytes (default: 64 MB)
  • :unsafe — skip the plan-level LIMIT injection; local decoder limits still apply (default: false)
  • :timeout — gRPC timeout in ms (default: 60_000)

Examples

{:ok, explorer_df} = DataFrame.to_explorer(df)
{:ok, explorer_df} = DataFrame.to_explorer(df, max_rows: 1_000)
{:ok, explorer_df} = DataFrame.to_explorer(df, unsafe: true)

to_json_rows(df)

@spec to_json_rows(t()) :: t()

Converts each row to a JSON string, returning a single-column DataFrame.

Equivalent to PySpark's DataFrame.toJSON().

Examples

df |> DataFrame.to_json_rows()

to_local_iterator(df, opts \\ [])

@spec to_local_iterator(
  t(),
  keyword()
) :: {:ok, Enumerable.t()} | {:error, term()}

Returns a lazy enumerable that yields the DataFrame's rows as maps.
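
A sketch assuming a bound df (the "id" column is illustrative):

```elixir
{:ok, rows} = SparkEx.DataFrame.to_local_iterator(df)

# The enumerable is lazy, so Stream pipelines only pull what they need.
rows |> Stream.map(& &1["id"]) |> Enum.take(10)
```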

transform(df, fun)

@spec transform(t(), (t() -> t())) :: t()

Applies a transformation function to the DataFrame.

The function receives the DataFrame and must return a DataFrame.
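
A sketch showing how transform/2 keeps pipelines composable (the helper function is illustrative):

```elixir
import SparkEx.Functions, only: [lit: 1]

# A reusable transformation: DataFrame in, DataFrame out.
add_source = fn df ->
  SparkEx.DataFrame.with_column(df, "source", lit("etl"))
end

df |> SparkEx.DataFrame.transform(add_source)
```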

transpose(df, opts \\ [])

@spec transpose(t(), keyword() | SparkEx.Column.t() | String.t()) :: t()

Transposes the DataFrame.

Options

  • :index_column — column(s) to use as the index (default: nil). A bare Column or column name may also be passed directly in place of the options list.
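
A sketch assuming a bound df with an "id" column:

```elixir
# Transpose the DataFrame, using "id" as the index column.
df |> SparkEx.DataFrame.transpose(index_column: "id")
```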

tree_string(df, opts \\ [])

@spec tree_string(
  t(),
  keyword()
) :: {:ok, String.t()} | {:error, term()}

Returns the tree-string representation of the plan.

Options

  • :level — tree depth level (optional)
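
A sketch assuming a bound df:

```elixir
# Print the plan as a tree for inspection.
{:ok, tree} = SparkEx.DataFrame.tree_string(df)
IO.puts(tree)
```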

union(left, right)

@spec union(t(), t()) :: t()

Returns a new DataFrame with the union of rows from both DataFrames.

Both DataFrames must have the same schema. Duplicates are preserved (equivalent to SQL UNION ALL).

Examples

DataFrame.union(df1, df2)

union_all(left, right)

@spec union_all(t(), t()) :: t()

Alias for union/2.

union_by_name(left, right, opts \\ [])

@spec union_by_name(t(), t(), keyword()) :: t()

Union by column name rather than position.

Options

  • :allow_missing — if true, missing columns are filled with nulls (default: false)

Examples

DataFrame.union_by_name(df1, df2)
DataFrame.union_by_name(df1, df2, allow_missing: true)

union_distinct(left, right)

@spec union_distinct(t(), t()) :: t()

Returns a new DataFrame with the union of rows, removing duplicates (equivalent to SQL UNION).

Examples

DataFrame.union_distinct(df1, df2)

unionAll(left, right)

@spec unionAll(t(), t()) :: t()

Alias for union/2 (PySpark unionAll).

unpersist(df, opts \\ [])

@spec unpersist(
  t(),
  keyword()
) :: t() | {:error, term()}

Unpersists the DataFrame.

Options

  • :blocking — whether to block until unpersisted (default: false)
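
A sketch pairing unpersist/2 with cache/2, assuming a bound df:

```elixir
cached = SparkEx.DataFrame.cache(df)

# ... run actions against the cached relation ...

# Release the cached data, blocking until it is freed.
SparkEx.DataFrame.unpersist(cached, blocking: true)
```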

unpivot(df, ids, values, variable_column_name, value_column_name)

@spec unpivot(
  t(),
  [SparkEx.Column.t() | String.t() | atom()],
  [
    SparkEx.Column.t()
    | String.t()
    | atom()
    | {String.t() | atom(), String.t() | atom()}
  ]
  | nil,
  String.t(),
  String.t()
) :: t()

Unpivots a DataFrame from wide to long format.

Parameters

  • ids — columns to keep as identifier columns
  • values — columns to unpivot (nil for all non-id columns)
  • variable_column_name — name for the variable column
  • value_column_name — name for the value column

Examples

df |> DataFrame.unpivot(["id"], ["col1", "col2"], "variable", "value")

where(df, condition)

@spec where(t(), SparkEx.Column.t()) :: t()

Alias for filter/2.

with_column(df, name, col)

@spec with_column(t(), String.t(), SparkEx.Column.t()) :: t()

Adds or replaces a column with the given name and expression.

Examples

import SparkEx.Functions, only: [col: 1, lit: 1]

df |> SparkEx.DataFrame.with_column("doubled", col("value") |> SparkEx.Column.multiply(lit(2)))

with_column_renamed(df, existing, new_name)

@spec with_column_renamed(t(), String.t(), String.t()) :: t()

Renames a single column.

Examples

df |> DataFrame.with_column_renamed("old_name", "new_name")

with_columns(df, columns)

@spec with_columns(t(), [{String.t(), SparkEx.Column.t()}] | map()) :: t()

Adds or replaces multiple columns at once.

Accepts a list of {name, column} tuples, a map of column names to Column expressions, or a list of aliased Column expressions.

Examples

import SparkEx.Functions, only: [col: 1, lit: 1]

df |> DataFrame.with_columns([
  {"doubled", Column.multiply(col("x"), lit(2))},
  {"const", lit(42)}
])

with_columns_renamed(df, rename_map)

@spec with_columns_renamed(
  t(),
  %{required(String.t()) => String.t()} | (String.t() -> String.t())
) ::
  t()

Renames multiple columns using a map of old -> new names.

When called with a function, this makes an eager schema RPC to discover the column names; the map variant is fully lazy.

Examples

df |> DataFrame.with_columns_renamed(%{"old1" => "new1", "old2" => "new2"})
df |> DataFrame.with_columns_renamed(&String.upcase/1)

with_metadata(df, column_name, metadata)

@spec with_metadata(t(), String.t(), map()) :: t()

Adds or replaces metadata for an existing column.
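
A sketch assuming a bound df with a "score" column (the metadata keys are illustrative):

```elixir
# Attach metadata to an existing column.
df |> SparkEx.DataFrame.with_metadata("score", %{"comment" => "model output"})
```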

with_watermark(df, event_time, delay_threshold)

@spec with_watermark(t(), String.t(), String.t()) :: t()

Adds a watermark for streaming event-time processing.

Examples

df |> DataFrame.with_watermark("event_time", "10 minutes")

write(df)

@spec write(t()) :: SparkEx.Writer.t()

Returns a SparkEx.Writer builder for this DataFrame.

Examples

df
|> DataFrame.write()
|> SparkEx.Writer.format("parquet")
|> SparkEx.Writer.mode(:overwrite)
|> SparkEx.Writer.save("/data/output.parquet")

write_stream(df)

@spec write_stream(t()) :: SparkEx.StreamWriter.t()

Returns a SparkEx.StreamWriter builder for this streaming DataFrame.

Examples

df
|> DataFrame.write_stream()
|> SparkEx.StreamWriter.format("console")
|> SparkEx.StreamWriter.output_mode("append")
|> SparkEx.StreamWriter.start()

write_v2(df, table_name)

@spec write_v2(t(), String.t()) :: SparkEx.WriterV2.t()

Returns a SparkEx.WriterV2 builder for this DataFrame targeting the given table.

Examples

df
|> DataFrame.write_v2("catalog.db.my_table")
|> SparkEx.WriterV2.using("parquet")
|> SparkEx.WriterV2.create()