Pipeline.CheckpointManager (pipeline v0.0.1)

View Source

Manages checkpoint creation and restoration for pipeline execution.

Provides state persistence to allow pipeline resumption after failures or interruptions.

Summary

Functions

Delete old checkpoints, keeping only the most recent N.

Get checkpoint metadata without loading the full data.

List all checkpoints for a workflow.

Load a specific checkpoint by filename.

Load the latest checkpoint for a workflow.

Save a checkpoint for the current pipeline state.

Types

checkpoint_data()

@type checkpoint_data() :: %{
  workflow_name: String.t(),
  step_index: non_neg_integer(),
  results: map(),
  execution_log: list(),
  timestamp: DateTime.t(),
  variable_state: map()
}

Functions

cleanup_old_checkpoints(checkpoint_dir, workflow_name, keep_count \\ 5)

@spec cleanup_old_checkpoints(String.t(), String.t(), pos_integer()) ::
  :ok | {:error, any()}

Delete old checkpoints, keeping only the most recent N.

get_checkpoint_info(checkpoint_dir, filename)

@spec get_checkpoint_info(String.t(), String.t()) :: {:ok, map()} | {:error, any()}

Get checkpoint metadata without loading the full data.

list_checkpoints(checkpoint_dir, workflow_name)

@spec list_checkpoints(String.t(), String.t()) ::
  {:ok, [String.t()]} | {:error, any()}

List all checkpoints for a workflow.

load_checkpoint(checkpoint_dir, filename)

@spec load_checkpoint(String.t(), String.t()) ::
  {:ok, checkpoint_data()} | {:error, any()}

Load a specific checkpoint by filename.

load_latest(checkpoint_dir, workflow_name)

@spec load_latest(String.t(), String.t()) ::
  {:ok, checkpoint_data()} | {:error, any()}

Load the latest checkpoint for a workflow.

save(checkpoint_dir, workflow_name, context)

@spec save(String.t(), String.t(), map()) :: :ok | {:error, any()}

Save a checkpoint for the current pipeline state.