Pipeline.Streaming.ResultStream (pipeline v0.0.1)

View Source

Result streaming capabilities for pipeline execution.

Provides memory-efficient result passing between steps by streaming large results instead of keeping them in memory.

Summary

Functions

Clean up a result stream and free resources.

Create a result stream from a large result.

Get metadata about a stream.

Read data from a result stream.

Check if a result should be streamed based on size.

Stream data from a result stream in chunks.

Transform a stream with a function.

Types

t()

@type t() :: %Pipeline.Streaming.ResultStream{
  created_at: term(),
  data_type: term(),
  id: term(),
  metadata: term(),
  source_step: term(),
  stream_ref: term()
}

Functions

cleanup_stream(stream)

@spec cleanup_stream(t()) :: :ok

Clean up a result stream and free resources.

create_stream(step_name, result_key, data, metadata \\ %{})

@spec create_stream(String.t(), String.t(), any(), map()) ::
  {:ok, t()} | {:error, String.t()}

Create a result stream from a large result.

get_stream_info(stream)

@spec get_stream_info(t()) :: map()

Get metadata about a stream.

read_stream(stream)

@spec read_stream(t()) :: {:ok, any()} | {:error, String.t()}

Read data from a result stream.

should_stream_result?(size)

@spec should_stream_result?(non_neg_integer()) :: boolean()

Check if a result should be streamed based on size.

stream_chunks(stream)

@spec stream_chunks(t()) :: {:ok, Enumerable.t()} | {:error, String.t()}

Stream data from a result stream in chunks.

transform_stream(stream, transform_fn)

@spec transform_stream(t(), (any() -> any())) :: {:ok, t()} | {:error, String.t()}

Transform a stream with a function.