Memory Pressure Monitoring

View Source

Memory pressure monitoring enables adaptive behavior when system memory becomes constrained. This guide covers the monitoring architecture, pressure levels, and integration patterns.

Overview

The erl_esdb_memory module provides:

FunctionPurpose
start_link/0,1Start the memory monitor
level/0Get current pressure level
configure/1Update thresholds
on_pressure_change/1Register callback
remove_callback/1Remove callback
get_stats/0Get memory statistics
check_now/0Force immediate check

Architecture

Memory Pressure Levels

Pressure Levels

LevelDefault ThresholdDescription
normal< 70%Full caching, all features enabled
elevated70-85%Reduce cache sizes, flush more often
critical> 85%Pause non-essential operations, aggressive cleanup

Starting the Monitor

Default Configuration

{ok, _Pid} = erl_esdb_memory:start_link().

Custom Configuration

{ok, _Pid} = erl_esdb_memory:start_link(#{
    elevated_threshold => 0.60,    %% Trigger elevated at 60%
    critical_threshold => 0.80,    %% Trigger critical at 80%
    check_interval => 5000         %% Check every 5 seconds
}).

Checking Pressure Level

Current Level

Level = erl_esdb_memory:level(),
%% normal | elevated | critical

With Statistics

Stats = erl_esdb_memory:get_stats(),
%% #{
%%     level => normal,
%%     memory_used => 2147483648,      %% bytes
%%     memory_total => 4294967296,     %% bytes
%%     last_check => 1735689600000,    %% timestamp
%%     callback_count => 3
%% }

Registering Callbacks

On Pressure Change

CallbackRef = erl_esdb_memory:on_pressure_change(fun(Level) ->
    case Level of
        normal ->
            logger:info("Memory pressure normalized"),
            restore_full_functionality();
        elevated ->
            logger:warning("Memory pressure elevated"),
            reduce_cache_sizes();
        critical ->
            logger:error("Memory pressure critical!"),
            pause_non_essential_operations()
    end
end).

Removing Callback

ok = erl_esdb_memory:remove_callback(CallbackRef).

Memory Detection

How Memory is Measured

The monitor uses memsup (SASL memory supervisor):

get_memory_info() ->
    MemData = memsup:get_system_memory_data(),
    case MemData of
        [] ->
            %% Fallback to erlang:memory()
            ErlangMem = erlang:memory(),
            Used = proplists:get_value(total, ErlangMem, 0),
            {Used, Used * 2};
        _ ->
            Total = proplists:get_value(total_memory, MemData, 0),
            Free = proplists:get_value(free_memory, MemData, 0),
            Cached = proplists:get_value(cached_memory, MemData, 0),
            Buffered = proplists:get_value(buffered_memory, MemData, 0),
            Available = Free + Cached + Buffered,
            Used = Total - Available,
            {Used, Total}
    end.

Available Memory Calculation

Available memory includes:

  • Free memory
  • Cached memory (can be reclaimed)
  • Buffered memory (can be reclaimed)

This provides a more accurate picture than just free memory.

Integration Patterns

1. Subscription Backpressure

%% In erl_esdb_emitter or subscription handler
init(State) ->
    CallbackRef = erl_esdb_memory:on_pressure_change(fun(Level) ->
        gen_server:cast(self(), {memory_pressure, Level})
    end),
    {ok, State#{memory_callback => CallbackRef}}.

handle_cast({memory_pressure, critical}, State) ->
    %% Pause subscription delivery
    {noreply, State#{paused => true}};
handle_cast({memory_pressure, _}, State) ->
    %% Resume
    {noreply, State#{paused => false}}.

2. Cache Management

handle_info(check_cache, State) ->
    case erl_esdb_memory:level() of
        critical ->
            ets:delete_all_objects(my_cache),
            logger:warning("Cache cleared due to memory pressure");
        elevated ->
            evict_oldest_entries(my_cache, 0.5);  %% Evict 50%
        normal ->
            ok
    end,
    {noreply, State}.

3. Write Throttling

append_with_throttle(Store, Stream, Version, Events) ->
    case erl_esdb_memory:level() of
        critical ->
            timer:sleep(100),  %% Slow down writes
            append_with_throttle(Store, Stream, Version, Events);
        elevated ->
            timer:sleep(10),   %% Minor throttle
            erl_esdb_streams:append(Store, Stream, Version, Events);
        normal ->
            erl_esdb_streams:append(Store, Stream, Version, Events)
    end.

Configuration

Dynamic Reconfiguration

ok = erl_esdb_memory:configure(#{
    elevated_threshold => 0.75,
    critical_threshold => 0.90,
    check_interval => 15000
}).

Getting Current Config

Config = erl_esdb_memory:get_config(),
%% #{
%%     elevated_threshold => 0.70,
%%     critical_threshold => 0.85,
%%     check_interval => 10000
%% }

Forcing Checks

%% Force immediate memory check
CurrentLevel = erl_esdb_memory:check_now().

Useful for:

  • Testing pressure response
  • After known memory-heavy operations
  • Before starting large batch jobs

Telemetry

Pressure changes emit telemetry:

%% Event: [erl_esdb, memory, pressure_changed]
%% Measurements:
%%   #{usage_ratio => float()}  %% 0.0 - 1.0
%% Metadata:
%%   #{old_level => atom(), new_level => atom()}

Example Handler

telemetry:attach(
    <<"memory-pressure-handler">>,
    [erl_esdb, memory, pressure_changed],
    fun(_Event, #{usage_ratio := Ratio}, #{old_level := Old, new_level := New}, _Config) ->
        logger:notice("Memory pressure: ~p -> ~p (~.1f%)",
                      [Old, New, Ratio * 100])
    end,
    #{}
).

Cluster Considerations

Local Monitoring

Memory pressure is monitored locally on each node:

        
     Node 1               Node 2               Node 3      
              
  Memory             Memory             Memory       
  Monitor            Monitor            Monitor      
  (normal)           (elevated)         (normal)     
              
        

Each node manages its own memory pressure independently.

Cross-Node Awareness

For cluster-wide memory awareness, consider:

%% Broadcast pressure to other nodes
on_pressure_change(fun(Level) ->
    rpc:multicall(nodes(), my_module, handle_remote_pressure, [node(), Level])
end).

Best Practices

1. Register Early

%% In application start
start(_Type, _Args) ->
    {ok, _} = erl_esdb_memory:start_link(),
    register_pressure_handlers(),
    ...

2. Graceful Degradation

%% Implement graceful degradation, not hard failures
handle_pressure(critical) ->
    %% Don't crash, just slow down
    reduce_batch_sizes(),
    increase_flush_intervals(),
    pause_optional_projections();
handle_pressure(elevated) ->
    %% Warning mode
    reduce_cache_ttls();
handle_pressure(normal) ->
    restore_defaults().

3. Monitor Recovery

%% Track recovery from pressure
handle_pressure_change(OldLevel, NewLevel) ->
    case {OldLevel, NewLevel} of
        {critical, elevated} ->
            logger:info("Memory recovering - still elevated");
        {critical, normal} ->
            logger:info("Memory fully recovered");
        {elevated, critical} ->
            logger:error("Memory degrading to critical");
        _ ->
            ok
    end.

4. Test Under Pressure

%% In tests, simulate pressure
test_critical_behavior() ->
    %% Force critical level
    meck:new(erl_esdb_memory, [passthrough]),
    meck:expect(erl_esdb_memory, level, fun() -> critical end),

    %% Test behavior
    ?assertEqual(expected_behavior, my_module:do_something()),

    meck:unload(erl_esdb_memory).

SASL Configuration

For memsup to work, configure SASL:

%% In sys.config
{sasl, [
    {sasl_error_logger, {file, "log/sasl.log"}},
    {errlog_type, error}
]},
{os_mon, [
    {start_memsup, true},
    {memsup_system_only, false},
    {memory_check_interval, 60}  %% OS check interval (seconds)
]}

See Also