Pipeline.ResultManager (pipeline v0.0.1)

Manages step results throughout pipeline execution.

Provides structured storage, retrieval, and transformation of results between pipeline steps with support for serialization and validation.

Features include:

  • JSON Schema validation for step outputs
  • Structured result storage and retrieval
  • Result transformation for prompt consumption
  • Serialization and persistence
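As a rough orientation, the basic store-and-retrieve flow might look like the sketch below. It assumes the `pipeline` package is compiled and on the code path; the step name `"analyze"` and the string-keyed result map are illustrative assumptions, not values taken from the library.

```elixir
# Assumes the `pipeline` package (v0.0.1) is compiled and available.
alias Pipeline.ResultManager

# Start with an empty manager and record the output of a hypothetical step.
manager =
  ResultManager.new()
  |> ResultManager.store_result("analyze", %{"summary" => "3 issues found"})

# has_result?/2 returns a boolean, so it can guard a later lookup.
if ResultManager.has_result?(manager, "analyze") do
  case ResultManager.get_result(manager, "analyze") do
    {:ok, result} -> IO.inspect(result, label: "analyze")
    {:error, :not_found} -> IO.puts("no result stored for analyze")
  end
end
```

`store_result/3` returns the updated manager rather than an `{:ok, _}` tuple, which is why it can be piped directly.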

Summary

Functions

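extract_field(manager, step_name, field_name)

Extract a specific field from a step result.

from_json(json_string)

Deserialize results from JSON.

get_all_results(manager)

Get all results.

get_result(manager, step_name)

Retrieve a result by step name.

get_summary(manager)

Get summary statistics about stored results.

has_result?(manager, step_name)

Check if a result exists for a step.

load_from_file(file_path)

Load results from a file.

new()

Create a new result manager.

save_to_file(manager, file_path)

Save results to a file.

store_result(manager, step_name, result)

Store a result for a step.

store_result_with_schema(manager, step_name, result, schema \\ nil)

Store a result for a step with optional schema validation.

to_json(manager)

Serialize results to JSON for storage.

transform_for_prompt(manager, step_name, opts \\ [])

Transform results for consumption by the next step.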

Types

result()

@type result() :: map()

result_key()

@type result_key() :: String.t()

result_store()

@type result_store() :: map()

schema()

@type schema() :: map()

Functions

extract_field(manager, step_name, field_name)

@spec extract_field(
  %Pipeline.ResultManager{metadata: term(), results: term()},
  result_key(),
  String.t()
) :: {:ok, any()} | {:error, atom()}

Extract a specific field from a step result.
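A minimal usage sketch, assuming the `pipeline` package is available. The step name, the field name, and the use of string keys in the result map are assumptions chosen because `field_name` is typed as `String.t()`; the specific error atoms are not documented on this page.

```elixir
alias Pipeline.ResultManager

manager =
  ResultManager.new()
  |> ResultManager.store_result("analyze", %{"summary" => "3 issues found"})

outcome = ResultManager.extract_field(manager, "analyze", "summary")

case outcome do
  {:ok, value} ->
    IO.inspect(value, label: "summary")

  # The spec only promises an atom here; its possible values are not listed.
  {:error, reason} when is_atom(reason) ->
    IO.inspect(reason, label: "extract_field failed")
end
```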

from_json(json_string)

@spec from_json(String.t()) ::
  {:ok, %Pipeline.ResultManager{metadata: term(), results: term()}}
  | {:error, Jason.DecodeError.t()}

Deserialize results from JSON.

get_all_results(manager)

@spec get_all_results(%Pipeline.ResultManager{metadata: term(), results: term()}) ::
  result_store()

Get all results.

get_result(manager, step_name)

@spec get_result(
  %Pipeline.ResultManager{metadata: term(), results: term()},
  result_key()
) ::
  {:ok, result()} | {:error, :not_found}

Retrieve a result by step name.
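Because the spec returns either `{:ok, result}` or `{:error, :not_found}`, several lookups can be partitioned into hits and misses. The sketch below assumes the `pipeline` package is available; both step names are hypothetical.

```elixir
alias Pipeline.ResultManager

manager =
  ResultManager.new()
  |> ResultManager.store_result("analyze", %{"ok" => true})

# Look up several hypothetical step names and separate hits from misses.
{found, missing} =
  ["analyze", "review"]
  |> Enum.map(fn step -> {step, ResultManager.get_result(manager, step)} end)
  |> Enum.split_with(fn {_step, outcome} -> match?({:ok, _}, outcome) end)

IO.inspect(Enum.map(found, &elem(&1, 0)), label: "found")
IO.inspect(Enum.map(missing, &elem(&1, 0)), label: "missing")
```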

get_summary(manager)

@spec get_summary(%Pipeline.ResultManager{metadata: term(), results: term()}) :: map()

Get summary statistics about stored results.
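The keys of the summary map are not listed on this page, so the sketch below makes no assumption about them and simply prints whatever entries are returned. It assumes the `pipeline` package is available; the step name is hypothetical.

```elixir
alias Pipeline.ResultManager

manager =
  ResultManager.new()
  |> ResultManager.store_result("analyze", %{"ok" => true})

summary = ResultManager.get_summary(manager)

# Print each entry without assuming which statistics are present.
Enum.each(summary, fn {key, value} ->
  IO.puts("#{inspect(key)}: #{inspect(value)}")
end)
```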

has_result?(manager, step_name)

@spec has_result?(
  %Pipeline.ResultManager{metadata: term(), results: term()},
  result_key()
) :: boolean()

Check if a result exists for a step.

load_from_file(file_path)

@spec load_from_file(String.t()) ::
  {:ok, %Pipeline.ResultManager{metadata: term(), results: term()}}
  | {:error, any()}

Load results from a file.

new()

@spec new() :: %Pipeline.ResultManager{
  metadata: %{created_at: DateTime.t(), last_updated: DateTime.t()},
  results: map()
}

Create a new result manager.

save_to_file(manager, file_path)

@spec save_to_file(
  %Pipeline.ResultManager{metadata: term(), results: term()},
  String.t()
) ::
  :ok | {:error, File.posix()}

Save results to a file.
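A save-then-load round trip might be sketched as follows, assuming the `pipeline` package is available. The file name and the temporary-directory location are illustrative; the on-disk format is not described on this page.

```elixir
alias Pipeline.ResultManager

# Hypothetical target path inside the system temp directory.
path = Path.join(System.tmp_dir!(), "pipeline_results_example.data")

manager =
  ResultManager.new()
  |> ResultManager.store_result("analyze", %{"ok" => true})

# `with` stops at the first step that does not match and returns that value.
outcome =
  with :ok <- ResultManager.save_to_file(manager, path),
       {:ok, restored} <- ResultManager.load_from_file(path) do
    {:ok, restored}
  end

case outcome do
  {:ok, restored} ->
    IO.inspect(ResultManager.get_all_results(restored), label: "restored results")

  {:error, reason} ->
    IO.inspect(reason, label: "persistence failed")
end
```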

store_result(manager, step_name, result)

@spec store_result(
  %Pipeline.ResultManager{metadata: term(), results: term()},
  result_key(),
  result()
) ::
  %Pipeline.ResultManager{metadata: term(), results: term()}

Store a result for a step.

store_result_with_schema(manager, step_name, result, schema \\ nil)

@spec store_result_with_schema(
  %Pipeline.ResultManager{metadata: term(), results: term()},
  result_key(),
  result(),
  schema() | nil
) ::
  {:ok, %Pipeline.ResultManager{metadata: term(), results: term()}}
  | {:error, String.t()}

Store a result for a step with optional schema validation.
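The sketch below passes a JSON Schema written as an Elixir map. It assumes the `pipeline` package is available; the string-keyed schema shape, the step name, and the result fields are assumptions, and the set of schema keywords the validator actually supports is not stated on this page.

```elixir
alias Pipeline.ResultManager

# A JSON Schema expressed as a string-keyed map (assumed shape).
schema = %{
  "type" => "object",
  "required" => ["summary"],
  "properties" => %{"summary" => %{"type" => "string"}}
}

result = %{"summary" => "3 issues found"}

outcome =
  ResultManager.store_result_with_schema(ResultManager.new(), "analyze", result, schema)

case outcome do
  {:ok, manager} ->
    IO.inspect(ResultManager.has_result?(manager, "analyze"), label: "stored?")

  # Per the spec, validation failures carry a string message.
  {:error, message} ->
    IO.puts("validation failed: " <> message)
end
```

Unlike `store_result/3`, this function returns a tagged tuple, so it cannot be piped without first matching on `{:ok, manager}`.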

to_json(manager)

@spec to_json(%Pipeline.ResultManager{metadata: term(), results: term()}) ::
  {:ok, String.t()} | {:error, Jason.EncodeError.t()}

Serialize results to JSON for storage.
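Together with `from_json/1`, this allows an in-memory round trip. The sketch assumes the `pipeline` package is available; the step name and result are hypothetical, and no claim is made about whether the restored manager is identical to the original.

```elixir
alias Pipeline.ResultManager

manager =
  ResultManager.new()
  |> ResultManager.store_result("analyze", %{"ok" => true})

# Encode to a JSON string, then decode it back into a manager.
round_trip =
  with {:ok, json} <- ResultManager.to_json(manager),
       {:ok, restored} <- ResultManager.from_json(json) do
    {:ok, restored}
  end

case round_trip do
  {:ok, restored} ->
    IO.inspect(ResultManager.get_all_results(restored), label: "restored")

  # Either a Jason.EncodeError or a Jason.DecodeError, per the specs.
  {:error, error} ->
    IO.inspect(error, label: "round trip failed")
end
```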

transform_for_prompt(manager, step_name, opts \\ [])

@spec transform_for_prompt(
  %Pipeline.ResultManager{metadata: term(), results: term()},
  result_key(),
  keyword()
) :: {:ok, String.t()} | {:error, atom()}

Transform results for consumption by the next step.
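A minimal sketch of feeding one step's output into the next step's prompt, assuming the `pipeline` package is available. The supported `opts` keys are not listed on this page, so none are passed; the step name and the surrounding prompt text are hypothetical.

```elixir
alias Pipeline.ResultManager

manager =
  ResultManager.new()
  |> ResultManager.store_result("analyze", %{"summary" => "3 issues found"})

# opts defaults to [], so the two-argument call is enough here.
outcome = ResultManager.transform_for_prompt(manager, "analyze")

prompt =
  case outcome do
    {:ok, text} ->
      "Previous step output:\n" <> text

    {:error, reason} ->
      "Previous step output unavailable (" <> Atom.to_string(reason) <> ")"
  end

IO.puts(prompt)
```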