Builder API for starting structured streaming queries.
Mirrors PySpark's DataStreamWriter with a builder pattern.
Examples
```elixir
df
|> SparkEx.DataFrame.write_stream()
|> SparkEx.StreamWriter.format("console")
|> SparkEx.StreamWriter.output_mode("append")
|> SparkEx.StreamWriter.start()
```
Summary
Functions
Sets a foreach batch function for micro-batch processing.
Sets a foreach writer function for row-level processing.
Sets the output path for the streaming sink.
Starts the streaming query, writing to the path set via option("path", ...).
Starts the streaming query, writing to the given table name.
Sets the trigger for the streaming query.
Starts the streaming query writing XML to the given path.
Types
@type t() :: %SparkEx.StreamWriter{
        cluster_by: [String.t()],
        df: SparkEx.DataFrame.t(),
        foreach_batch: SparkEx.Types.foreach_function() | nil,
        foreach_writer: SparkEx.Types.foreach_function() | nil,
        options: %{required(String.t()) => String.t()},
        output_mode: String.t() | nil,
        partition_by: [String.t()],
        path: String.t() | nil,
        query_name: String.t() | nil,
        source: String.t() | nil,
        table_name: String.t() | nil,
        trigger: term() | nil
      }
Functions
@spec foreach_batch(t(), SparkEx.Types.foreach_function()) :: t()
Sets a foreach batch function for micro-batch processing.
Accepts a Spark Connect StreamingForeachFunction proto struct. Use this with Java/Scala UDF payloads.
Example with Scala UDF
```elixir
foreach_fn = %Spark.Connect.StreamingForeachFunction{
  function: {:scala_function, %Spark.Connect.ScalarScalaUDF{
    payload: serialized_scala_bytes
  }}
}

writer |> StreamWriter.foreach_batch(foreach_fn)
```
@spec foreach_writer(t(), SparkEx.Types.foreach_function()) :: t()
Sets a foreach writer function for row-level processing.
Accepts a Spark Connect StreamingForeachFunction proto struct. Use this with Java/Scala UDF payloads.
Example with Scala UDF
```elixir
foreach_fn = %Spark.Connect.StreamingForeachFunction{
  function: {:scala_function, %Spark.Connect.ScalarScalaUDF{
    payload: serialized_scala_bytes
  }}
}

writer |> StreamWriter.foreach_writer(foreach_fn)
```
Sets the output path for the streaming sink.
@spec start(t(), keyword()) :: {:ok, SparkEx.StreamingQuery.t()} | {:error, term()}
Starts the streaming query, writing to the path set via option("path", ...).
Returns {:ok, StreamingQuery.t()} on success.
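A minimal sketch of starting a file-sink query. The `format/2`, `output_mode/2`, and `option/3` calls mirror the builder functions shown above; the option names `"path"` and `"checkpointLocation"` are Spark's standard sink options, and the concrete paths are illustrative only.

```elixir
# Sketch: write the stream as Parquet files, with a checkpoint directory
# so the query can recover after a restart. Paths are placeholders.
{:ok, query} =
  df
  |> SparkEx.DataFrame.write_stream()
  |> SparkEx.StreamWriter.format("parquet")
  |> SparkEx.StreamWriter.output_mode("append")
  |> SparkEx.StreamWriter.option("checkpointLocation", "/tmp/checkpoints/events")
  |> SparkEx.StreamWriter.option("path", "/tmp/output/events")
  |> SparkEx.StreamWriter.start()
```

Because `start/2` returns `{:ok, SparkEx.StreamingQuery.t()}`, the resulting `query` handle can be used to manage the running query.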
@spec to_table(t(), String.t(), keyword()) :: {:ok, SparkEx.StreamingQuery.t()} | {:error, term()}
Starts the streaming query, writing to the given table name.
Returns {:ok, StreamingQuery.t()} on success.
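A sketch of writing a stream into a named table via `to_table/3`. The table name is illustrative, and whether the target is a managed table depends on the server's catalog configuration.

```elixir
# Sketch: start the query and append micro-batches into the "events" table.
{:ok, query} =
  df
  |> SparkEx.DataFrame.write_stream()
  |> SparkEx.StreamWriter.output_mode("append")
  |> SparkEx.StreamWriter.to_table("events")
```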
Sets the trigger for the streaming query.
Trigger types
processing_time: "5 seconds" - micro-batch at the given interval
available_now: true - process all available data then stop
once: true - process one micro-batch then stop
continuous: "1 second" - continuous processing at the given checkpoint interval
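The trigger types above can be combined with the builder like so. This sketch assumes `trigger/2` takes a keyword list matching the option names listed, mirroring PySpark's `DataStreamWriter.trigger(processingTime=...)`.

```elixir
# Sketch: run a micro-batch every 10 seconds (keyword form is an assumption
# based on the trigger types documented above).
{:ok, query} =
  df
  |> SparkEx.DataFrame.write_stream()
  |> SparkEx.StreamWriter.format("console")
  |> SparkEx.StreamWriter.trigger(processing_time: "10 seconds")
  |> SparkEx.StreamWriter.start()
```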
Starts the streaming query, writing XML to the given path.