Pipeline.Safety.ResourceMonitor (pipeline v0.0.1)

View Source

Resource monitoring and management for nested pipeline execution.

Provides memory usage tracking, resource cleanup, and resource limit enforcement to prevent resource exhaustion during nested pipeline execution.

Summary

Functions

Check if current resource usage exceeds configured limits.

Check if memory usage is approaching limits and log warnings.

Perform resource cleanup on execution context.

Clean up workspace directory and its contents.

Collect current resource usage metrics.

Create workspace directory for nested pipeline execution.

Log resource usage with appropriate detail level.

Monitor resource usage during execution and check limits.

Types

check_result()

@type check_result() :: :ok | {:error, String.t()}

resource_limits()

@type resource_limits() :: %{
  memory_limit_mb: non_neg_integer(),
  timeout_seconds: non_neg_integer()
}

resource_usage()

@type resource_usage() :: %{
  memory_bytes: non_neg_integer(),
  start_time: DateTime.t(),
  elapsed_ms: non_neg_integer()
}

Functions

check_limits(usage)

@spec check_limits(resource_usage()) :: check_result()

Check if current resource usage exceeds configured limits.

Parameters

  • usage: Current resource usage metrics
  • limits: Resource limits configuration (optional)

Returns

  • :ok if within limits
  • {:error, message} if limits exceeded

Examples

iex> usage = %{memory_bytes: 100_000_000, start_time: DateTime.utc_now(), elapsed_ms: 1000}
iex> Pipeline.Safety.ResourceMonitor.check_limits(usage)
:ok

iex> usage = %{memory_bytes: 2_000_000_000, start_time: DateTime.utc_now(), elapsed_ms: 1000}
iex> limits = %{memory_limit_mb: 1024}
iex> Pipeline.Safety.ResourceMonitor.check_limits(usage, limits)
{:error, "Memory limit exceeded: 1907.3 MB > 1024 MB"}

check_limits(usage, limits)

@spec check_limits(resource_usage(), resource_limits()) :: check_result()

check_memory_pressure(usage)

@spec check_memory_pressure(resource_usage()) :: :ok

Check if memory usage is approaching limits and log warnings.

Parameters

  • usage: Current resource usage metrics
  • limits: Resource limits configuration (optional)

Returns

  • :ok always (warnings are logged, not returned as errors)

check_memory_pressure(usage, limits)

@spec check_memory_pressure(resource_usage(), resource_limits()) :: :ok

cleanup_context(context)

@spec cleanup_context(map()) :: %{
  :execution_log => [],
  :results => %{},
  optional(any()) => any()
}

Perform resource cleanup on execution context.

This function cleans up resources associated with nested pipeline execution, including workspace directories and large data structures.

Parameters

  • context: Execution context containing resource references

Returns

  • Updated context with resources cleaned

cleanup_workspace(workspace_path)

@spec cleanup_workspace(String.t()) :: :ok | {:error, String.t()}

Clean up workspace directory and its contents.

Parameters

  • workspace_path: Path to workspace directory to clean up

Returns

  • :ok if cleanup successful
  • {:error, reason} if cleanup failed

collect_usage(start_time \\ nil)

@spec collect_usage(DateTime.t() | nil) :: resource_usage()

Collect current resource usage metrics.

Parameters

  • start_time: When execution started (optional, defaults to now)

Returns

  • Resource usage metrics

create_workspace(workspace_path, step_name)

@spec create_workspace(String.t(), String.t()) ::
  {:ok, String.t()} | {:error, String.t()}

Create workspace directory for nested pipeline execution.

Parameters

  • workspace_path: Path to workspace directory
  • step_name: Name of the step (for unique directory naming)

Returns

  • {:ok, full_path} if directory created successfully
  • {:error, reason} if creation failed

log_resource_usage(usage, level \\ :debug)

@spec log_resource_usage(resource_usage(), atom()) :: :ok

Log resource usage with appropriate detail level.

Parameters

  • usage: Current resource usage metrics
  • level: Log level (:debug, :info, :warning, :error)

monitor_execution(start_time)

@spec monitor_execution(DateTime.t()) :: check_result()

Monitor resource usage during execution and check limits.

Parameters

  • start_time: When execution started
  • limits: Resource limits configuration (optional)

Returns

  • :ok if within limits
  • {:error, message} if limits exceeded

monitor_execution(start_time, limits)

@spec monitor_execution(DateTime.t(), resource_limits()) :: check_result()