Pipeline.Safety.ResourceMonitor (pipeline v0.0.1)
Resource monitoring and management for nested pipeline execution.
Provides memory usage tracking, resource cleanup, and resource limit enforcement to prevent resource exhaustion during nested pipeline execution.
Summary
Functions
Check if current resource usage exceeds configured limits.
Check if memory usage is approaching limits and log warnings.
Perform resource cleanup on execution context.
Clean up workspace directory and its contents.
Collect current resource usage metrics.
Create workspace directory for nested pipeline execution.
Log resource usage with appropriate detail level.
Monitor resource usage during execution and check limits.
Types
@type check_result() :: :ok | {:error, String.t()}
@type resource_limits() :: %{ memory_limit_mb: non_neg_integer(), timeout_seconds: non_neg_integer() }
@type resource_usage() :: %{ memory_bytes: non_neg_integer(), start_time: DateTime.t(), elapsed_ms: non_neg_integer() }
Functions
@spec check_limits(resource_usage()) :: check_result()
Check if current resource usage exceeds configured limits.
Parameters
- usage: Current resource usage metrics
- limits: Resource limits configuration (optional)
Returns
- :ok if within limits
- {:error, message} if limits exceeded
Examples
iex> usage = %{memory_bytes: 100_000_000, start_time: DateTime.utc_now(), elapsed_ms: 1000}
iex> Pipeline.Safety.ResourceMonitor.check_limits(usage)
:ok
iex> usage = %{memory_bytes: 2_000_000_000, start_time: DateTime.utc_now(), elapsed_ms: 1000}
iex> limits = %{memory_limit_mb: 1024}
iex> Pipeline.Safety.ResourceMonitor.check_limits(usage, limits)
{:error, "Memory limit exceeded: 1907.3 MB > 1024 MB"}
@spec check_limits(resource_usage(), resource_limits()) :: check_result()
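The limit check described above can be sketched as follows. This is a minimal stand-in, not the module's actual implementation: the module name `LimitCheckSketch`, the default limits, and the timeout message wording are assumptions. The memory message and the 1 MB = 1024 * 1024 bytes conversion are taken from the example above (2_000_000_000 bytes is about 1907.3 MB).

```elixir
defmodule LimitCheckSketch do
  # Hypothetical stand-in for the documented check_limits/2.
  @bytes_per_mb 1024 * 1024

  # Assumed defaults; the real defaults are not shown on this page.
  @default_limits %{memory_limit_mb: 1024, timeout_seconds: 300}

  def check_limits(usage, limits \\ %{}) do
    # Caller-supplied limits override the assumed defaults key by key.
    limits = Map.merge(@default_limits, limits)

    # The first failing check short-circuits and its error tuple is returned.
    with :ok <- check_memory(usage.memory_bytes, limits.memory_limit_mb),
         :ok <- check_timeout(usage.elapsed_ms, limits.timeout_seconds) do
      :ok
    end
  end

  defp check_memory(bytes, limit_mb) do
    used_mb = bytes / @bytes_per_mb

    if used_mb > limit_mb do
      {:error, "Memory limit exceeded: #{Float.round(used_mb, 1)} MB > #{limit_mb} MB"}
    else
      :ok
    end
  end

  defp check_timeout(elapsed_ms, timeout_seconds) do
    elapsed_s = elapsed_ms / 1000

    if elapsed_s > timeout_seconds do
      # Message wording is an assumption; the page only shows the memory message.
      {:error, "Timeout exceeded: #{Float.round(elapsed_s, 1)}s > #{timeout_seconds}s"}
    else
      :ok
    end
  end
end
```

Merging partial limits into defaults is one way to accept a map such as `%{memory_limit_mb: 1024}` that omits `timeout_seconds`, as the second example above does.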
@spec check_memory_pressure(resource_usage()) :: :ok
Check if memory usage is approaching limits and log warnings.
Parameters
- usage: Current resource usage metrics
- limits: Resource limits configuration (optional)
Returns
- :ok always (warnings are logged, not returned as errors)
@spec check_memory_pressure(resource_usage(), resource_limits()) :: :ok
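A warn-but-never-fail check of this kind might look like the sketch below. The module name, the 75% warning threshold, and the log message are assumptions for illustration; the page does not state at what point the real function starts warning. The sketch also assumes a positive `memory_limit_mb`.

```elixir
defmodule MemoryPressureSketch do
  require Logger

  @bytes_per_mb 1024 * 1024
  # Hypothetical threshold: warn once usage reaches 75% of the limit.
  @warn_ratio 0.75

  # Fraction of the configured memory limit currently in use.
  def pressure_ratio(usage, limits) do
    usage.memory_bytes / (limits.memory_limit_mb * @bytes_per_mb)
  end

  def check_memory_pressure(usage, limits \\ %{memory_limit_mb: 1024}) do
    ratio = pressure_ratio(usage, limits)

    if ratio >= @warn_ratio do
      percent = Float.round(ratio * 100, 1)
      Logger.warning("Memory usage at #{percent}% of #{limits.memory_limit_mb} MB limit")
    end

    # Pressure is advisory only, so the result is :ok even above the threshold.
    :ok
  end
end
```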
@spec cleanup_context(map()) :: %{ :execution_log => [], :results => %{}, optional(any()) => any() }
Perform resource cleanup on execution context.
This function cleans up resources associated with nested pipeline execution, including workspace directories and large data structures.
Parameters
- context: Execution context containing resource references
Returns
- Updated context with resources cleaned
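The spec above shows the returned map carrying an empty `:execution_log` list and an empty `:results` map, with all other keys passed through. A minimal sketch consistent with that shape is given below; the module name and the `:workspace_dir` key are hypothetical, since this page does not say where the context stores its workspace path.

```elixir
defmodule ContextCleanupSketch do
  # :workspace_dir is a hypothetical key used only for this illustration.
  def cleanup_context(context) when is_map(context) do
    # Remove the workspace directory if the context references one.
    case Map.get(context, :workspace_dir) do
      dir when is_binary(dir) -> File.rm_rf(dir)
      _ -> :ok
    end

    # Drop references to potentially large data so it can be garbage collected.
    context
    |> Map.put(:results, %{})
    |> Map.put(:execution_log, [])
  end
end
```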
Clean up workspace directory and its contents.
Parameters
- workspace_path: Path to workspace directory to clean up
Returns
- :ok if cleanup successful
- {:error, reason} if cleanup failed
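One way to get the `:ok | {:error, reason}` result described above is to wrap `File.rm_rf/1`, which removes a directory recursively. The module and function names in this sketch are hypothetical, and whether the real function uses `File.rm_rf/1` is an assumption.

```elixir
defmodule WorkspaceCleanupSketch do
  # Hypothetical name; maps File.rm_rf/1 results onto :ok | {:error, reason}.
  def cleanup(workspace_path) do
    case File.rm_rf(workspace_path) do
      # Success returns the list of removed paths, which is discarded here.
      {:ok, _removed} -> :ok
      # Failure also reports the offending file; only the reason is kept.
      {:error, reason, _file} -> {:error, reason}
    end
  end
end
```

Note that `File.rm_rf/1` also succeeds when the path does not exist, so this sketch treats a missing workspace as already clean.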
@spec collect_usage(DateTime.t() | nil) :: resource_usage()
Collect current resource usage metrics.
Parameters
- start_time: When execution started (optional, defaults to now)
Returns
- Resource usage metrics
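A map matching the `resource_usage()` type could be assembled as in the sketch below. Reading memory from `:erlang.memory(:total)` (total bytes allocated by the VM) is an assumption about the data source, and the module name is hypothetical.

```elixir
defmodule UsageSketch do
  def collect_usage(start_time \\ nil) do
    now = DateTime.utc_now()
    # Fall back to "now" when no start time is given, so elapsed_ms is 0.
    start_time = start_time || now

    %{
      # Assumed source: total memory currently allocated by the Erlang VM.
      memory_bytes: :erlang.memory(:total),
      start_time: start_time,
      # Clamp at 0 so a start time in the future cannot yield a negative value.
      elapsed_ms: max(DateTime.diff(now, start_time, :millisecond), 0)
    }
  end
end
```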
Create workspace directory for nested pipeline execution.
Parameters
- workspace_path: Path to workspace directory
- step_name: Name of the step (for unique directory naming)
Returns
- {:ok, full_path} if directory created successfully
- {:error, reason} if creation failed
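Workspace creation along these lines can be sketched with `Path.join/2` and `File.mkdir_p/1`. The module and function names are hypothetical, and the naming scheme (step name plus a unique integer suffix) is an assumption; the page only says the step name is used for unique directory naming.

```elixir
defmodule WorkspaceCreateSketch do
  def create(workspace_path, step_name) do
    # Assumed scheme: a per-call unique suffix keeps repeated steps from colliding.
    suffix = System.unique_integer([:positive])
    full_path = Path.join(workspace_path, "#{step_name}_#{suffix}")

    # mkdir_p also creates any missing parent directories.
    case File.mkdir_p(full_path) do
      :ok -> {:ok, full_path}
      {:error, reason} -> {:error, reason}
    end
  end
end
```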
@spec log_resource_usage(resource_usage(), atom()) :: :ok
Log resource usage with appropriate detail level.
Parameters
- usage: Current resource usage metrics
- level: Log level (:debug, :info, :warning, :error)
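Logging at a caller-chosen level can be done with `Logger.log/2`, which takes the level as its first argument. The sketch below is illustrative only: the module name, the `:info` default, and the message format are assumptions, not the module's actual output.

```elixir
defmodule UsageLogSketch do
  require Logger

  # Assumed message format; the real log line is not shown on this page.
  def format(usage) do
    mb = Float.round(usage.memory_bytes / (1024 * 1024), 1)
    "Resource usage: #{mb} MB, elapsed #{usage.elapsed_ms} ms"
  end

  def log_resource_usage(usage, level \\ :info) do
    # Logger.log/2 dispatches to the given level at runtime.
    Logger.log(level, format(usage))
  end
end
```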
@spec monitor_execution(DateTime.t()) :: check_result()
Monitor resource usage during execution and check limits.
Parameters
- start_time: When execution started
- limits: Resource limits configuration (optional)
Returns
- :ok if within limits
- {:error, message} if limits exceeded
@spec monitor_execution(DateTime.t(), resource_limits()) :: check_result()
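Conceptually, monitoring combines a usage snapshot with a limit check. The sketch below inlines both steps so it runs on its own; it assumes the snapshot is taken at call time from `:erlang.memory(:total)`, and the module name and timeout message are hypothetical.

```elixir
defmodule MonitorSketch do
  @bytes_per_mb 1024 * 1024

  def monitor_execution(start_time, limits) do
    # Snapshot taken at call time (assumed data source, as noted in the lead-in).
    memory_mb = :erlang.memory(:total) / @bytes_per_mb
    elapsed_s = DateTime.diff(DateTime.utc_now(), start_time, :millisecond) / 1000

    cond do
      memory_mb > limits.memory_limit_mb ->
        {:error, "Memory limit exceeded: #{Float.round(memory_mb, 1)} MB > #{limits.memory_limit_mb} MB"}

      elapsed_s > limits.timeout_seconds ->
        # Message wording is an assumption.
        {:error, "Timeout exceeded: #{Float.round(elapsed_s, 1)}s > #{limits.timeout_seconds}s"}

      true ->
        :ok
    end
  end
end
```

A caller would typically invoke such a function between pipeline steps and abort the nested run on the first `{:error, message}`.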