SparkEx.StreamWriter (SparkEx v0.1.0)


Builder API for starting structured streaming queries.

Mirrors PySpark's DataStreamWriter with a builder pattern.

Examples

df
|> SparkEx.DataFrame.write_stream()
|> SparkEx.StreamWriter.format("console")
|> SparkEx.StreamWriter.output_mode("append")
|> SparkEx.StreamWriter.start()

Summary

Functions

foreach_batch(writer, func)

Sets a foreach batch function for micro-batch processing.

foreach_writer(writer, func)

Sets a foreach writer function for row-level processing.

path(writer, path)

Sets the output path for the streaming sink.

start(writer, opts \\ [])

Starts the streaming query, writing to the path set via option("path", ...).

to_table(writer, table_name, opts \\ [])

Starts the streaming query, writing to the given table name.

trigger(writer, opts)

Sets the trigger for the streaming query.

xml(writer, path)

Starts the streaming query, writing XML to the given path.

Types

t()

@type t() :: %SparkEx.StreamWriter{
  cluster_by: [String.t()],
  df: SparkEx.DataFrame.t(),
  foreach_batch: SparkEx.Types.foreach_function() | nil,
  foreach_writer: SparkEx.Types.foreach_function() | nil,
  options: %{required(String.t()) => String.t()},
  output_mode: String.t() | nil,
  partition_by: [String.t()],
  path: String.t() | nil,
  query_name: String.t() | nil,
  source: String.t() | nil,
  table_name: String.t() | nil,
  trigger: term() | nil
}

Functions

cluster_by(writer, cols)

@spec cluster_by(t(), [String.t()]) :: t()

foreach_batch(writer, func)

@spec foreach_batch(t(), SparkEx.Types.foreach_function()) :: t()

Sets a foreach batch function for micro-batch processing.

Accepts a Spark Connect StreamingForeachFunction proto struct. Use this with Java/Scala UDF payloads.

Example with Scala UDF

foreach_fn = %Spark.Connect.StreamingForeachFunction{
  function: {:scala_function, %Spark.Connect.ScalarScalaUDF{
    payload: serialized_scala_bytes
  }}
}
writer |> StreamWriter.foreach_batch(foreach_fn)

foreach_writer(writer, func)

@spec foreach_writer(t(), SparkEx.Types.foreach_function()) :: t()

Sets a foreach writer function for row-level processing.

Accepts a Spark Connect StreamingForeachFunction proto struct. Use this with Java/Scala UDF payloads.

Example with Scala UDF

foreach_fn = %Spark.Connect.StreamingForeachFunction{
  function: {:scala_function, %Spark.Connect.ScalarScalaUDF{
    payload: serialized_scala_bytes
  }}
}
writer |> StreamWriter.foreach_writer(foreach_fn)

format(writer, source)

@spec format(t(), String.t()) :: t()

option(writer, key, value)

@spec option(t(), String.t(), term()) :: t()

options(writer, opts)

@spec options(t(), map() | keyword()) :: t()

output_mode(writer, mode)

@spec output_mode(t(), String.t()) :: t()

partition_by(writer, cols)

@spec partition_by(t(), [String.t()]) :: t()

path(writer, path)

@spec path(t(), String.t()) :: t()

Sets the output path for the streaming sink.

query_name(writer, name)

@spec query_name(t(), String.t()) :: t()

start(writer, opts \\ [])

@spec start(
  t(),
  keyword()
) :: {:ok, SparkEx.StreamingQuery.t()} | {:error, term()}

Starts the streaming query, writing to the path set via option("path", ...).

Returns {:ok, StreamingQuery.t()} on success, or {:error, reason} on failure.
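
A minimal sketch of a full builder chain ending in start/1, using only functions documented on this page. The "parquet" format, the paths, and the checkpointLocation option name (Spark's conventional checkpoint key) are illustrative assumptions, and a running Spark Connect session is assumed:

```elixir
# The sink path is supplied via option("path", ...), which is what
# start/1 reads; checkpointLocation enables fault-tolerant recovery.
{:ok, query} =
  df
  |> SparkEx.DataFrame.write_stream()
  |> SparkEx.StreamWriter.format("parquet")
  |> SparkEx.StreamWriter.option("path", "/tmp/stream_out")
  |> SparkEx.StreamWriter.option("checkpointLocation", "/tmp/stream_ckpt")
  |> SparkEx.StreamWriter.output_mode("append")
  |> SparkEx.StreamWriter.start()
```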

to_table(writer, table_name, opts \\ [])

@spec to_table(t(), String.t(), keyword()) ::
  {:ok, SparkEx.StreamingQuery.t()} | {:error, term()}

Starts the streaming query, writing to the given table name.

Returns {:ok, StreamingQuery.t()} on success, or {:error, reason} on failure.
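
A sketch of writing to a managed table instead of a path; the table name "events_sink" and checkpoint path are illustrative assumptions:

```elixir
# to_table/2 replaces the path-based sink with a named table,
# so no option("path", ...) is needed.
{:ok, query} =
  df
  |> SparkEx.DataFrame.write_stream()
  |> SparkEx.StreamWriter.option("checkpointLocation", "/tmp/events_ckpt")
  |> SparkEx.StreamWriter.output_mode("append")
  |> SparkEx.StreamWriter.to_table("events_sink")
```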

trigger(writer, opts)

@spec trigger(
  t(),
  keyword()
) :: t()

Sets the trigger for the streaming query.

Trigger types

  • processing_time: "5 seconds" — micro-batch at the given interval
  • available_now: true — process all available data then stop
  • once: true — process one micro-batch then stop
  • continuous: "1 second" — continuous processing at the given checkpoint interval
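
Each trigger type above maps to a single keyword option. A sketch, assuming writer is a SparkEx.StreamWriter.t() built as in the module example:

```elixir
# Micro-batch every 10 seconds:
writer |> SparkEx.StreamWriter.trigger(processing_time: "10 seconds")

# Process everything currently available, then stop:
writer |> SparkEx.StreamWriter.trigger(available_now: true)

# Continuous processing with a 1-second checkpoint interval:
writer |> SparkEx.StreamWriter.trigger(continuous: "1 second")
```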

xml(writer, path)

@spec xml(t(), String.t()) ::
  {:ok, SparkEx.StreamingQuery.t()} | {:error, term()}

Starts the streaming query, writing XML to the given path.