Memory Pressure Monitoring
View SourceMemory pressure monitoring enables adaptive behavior when system memory becomes constrained. This guide covers the monitoring architecture, pressure levels, and integration patterns.
Overview
The erl_esdb_memory module provides:
| Function | Purpose |
|---|---|
start_link/0,1 | Start the memory monitor |
level/0 | Get current pressure level |
configure/1 | Update thresholds |
on_pressure_change/1 | Register callback |
remove_callback/1 | Remove callback |
get_stats/0 | Get memory statistics |
check_now/0 | Force immediate check |
Architecture
Pressure Levels
| Level | Default Threshold | Description |
|---|---|---|
normal | < 70% | Full caching, all features enabled |
elevated | 70-85% | Reduce cache sizes, flush more often |
critical | > 85% | Pause non-essential operations, aggressive cleanup |
Starting the Monitor
Default Configuration
{ok, _Pid} = erl_esdb_memory:start_link().Custom Configuration
{ok, _Pid} = erl_esdb_memory:start_link(#{
elevated_threshold => 0.60, %% Trigger elevated at 60%
critical_threshold => 0.80, %% Trigger critical at 80%
check_interval => 5000 %% Check every 5 seconds
}).Checking Pressure Level
Current Level
Level = erl_esdb_memory:level(),
%% normal | elevated | criticalWith Statistics
Stats = erl_esdb_memory:get_stats(),
%% #{
%% level => normal,
%% memory_used => 2147483648, %% bytes
%% memory_total => 4294967296, %% bytes
%% last_check => 1735689600000, %% timestamp
%% callback_count => 3
%% }Registering Callbacks
On Pressure Change
CallbackRef = erl_esdb_memory:on_pressure_change(fun(Level) ->
case Level of
normal ->
logger:info("Memory pressure normalized"),
restore_full_functionality();
elevated ->
logger:warning("Memory pressure elevated"),
reduce_cache_sizes();
critical ->
logger:error("Memory pressure critical!"),
pause_non_essential_operations()
end
end).Removing Callback
ok = erl_esdb_memory:remove_callback(CallbackRef).Memory Detection
How Memory is Measured
The monitor uses memsup (SASL memory supervisor):
get_memory_info() ->
MemData = memsup:get_system_memory_data(),
case MemData of
[] ->
%% Fallback to erlang:memory()
ErlangMem = erlang:memory(),
Used = proplists:get_value(total, ErlangMem, 0),
{Used, Used * 2};
_ ->
Total = proplists:get_value(total_memory, MemData, 0),
Free = proplists:get_value(free_memory, MemData, 0),
Cached = proplists:get_value(cached_memory, MemData, 0),
Buffered = proplists:get_value(buffered_memory, MemData, 0),
Available = Free + Cached + Buffered,
Used = Total - Available,
{Used, Total}
end.Available Memory Calculation
Available memory includes:
- Free memory
- Cached memory (can be reclaimed)
- Buffered memory (can be reclaimed)
This provides a more accurate picture than just free memory.
Integration Patterns
1. Subscription Backpressure
%% In erl_esdb_emitter or subscription handler
init(State) ->
CallbackRef = erl_esdb_memory:on_pressure_change(fun(Level) ->
gen_server:cast(self(), {memory_pressure, Level})
end),
{ok, State#{memory_callback => CallbackRef}}.
handle_cast({memory_pressure, critical}, State) ->
%% Pause subscription delivery
{noreply, State#{paused => true}};
handle_cast({memory_pressure, _}, State) ->
%% Resume
{noreply, State#{paused => false}}.2. Cache Management
handle_info(check_cache, State) ->
case erl_esdb_memory:level() of
critical ->
ets:delete_all_objects(my_cache),
logger:warning("Cache cleared due to memory pressure");
elevated ->
evict_oldest_entries(my_cache, 0.5); %% Evict 50%
normal ->
ok
end,
{noreply, State}.3. Write Throttling
append_with_throttle(Store, Stream, Version, Events) ->
case erl_esdb_memory:level() of
critical ->
timer:sleep(100), %% Slow down writes
append_with_throttle(Store, Stream, Version, Events);
elevated ->
timer:sleep(10), %% Minor throttle
erl_esdb_streams:append(Store, Stream, Version, Events);
normal ->
erl_esdb_streams:append(Store, Stream, Version, Events)
end.Configuration
Dynamic Reconfiguration
ok = erl_esdb_memory:configure(#{
elevated_threshold => 0.75,
critical_threshold => 0.90,
check_interval => 15000
}).Getting Current Config
Config = erl_esdb_memory:get_config(),
%% #{
%% elevated_threshold => 0.70,
%% critical_threshold => 0.85,
%% check_interval => 10000
%% }Forcing Checks
%% Force immediate memory check
CurrentLevel = erl_esdb_memory:check_now().Useful for:
- Testing pressure response
- After known memory-heavy operations
- Before starting large batch jobs
Telemetry
Pressure changes emit telemetry:
%% Event: [erl_esdb, memory, pressure_changed]
%% Measurements:
%% #{usage_ratio => float()} %% 0.0 - 1.0
%% Metadata:
%% #{old_level => atom(), new_level => atom()}Example Handler
telemetry:attach(
<<"memory-pressure-handler">>,
[erl_esdb, memory, pressure_changed],
fun(_Event, #{usage_ratio := Ratio}, #{old_level := Old, new_level := New}, _Config) ->
logger:notice("Memory pressure: ~p -> ~p (~.1f%)",
[Old, New, Ratio * 100])
end,
#{}
).Cluster Considerations
Local Monitoring
Memory pressure is monitored locally on each node:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Node 1 │ │ Node 2 │ │ Node 3 │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │ Memory │ │ │ │ Memory │ │ │ │ Memory │ │
│ │ Monitor │ │ │ │ Monitor │ │ │ │ Monitor │ │
│ │ (normal) │ │ │ │ (elevated) │ │ │ │ (normal) │ │
│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │
└─────────────────┘ └─────────────────┘ └─────────────────┘Each node manages its own memory pressure independently.
Cross-Node Awareness
For cluster-wide memory awareness, consider:
%% Broadcast pressure to other nodes
on_pressure_change(fun(Level) ->
rpc:multicall(nodes(), my_module, handle_remote_pressure, [node(), Level])
end).Best Practices
1. Register Early
%% In application start
start(_Type, _Args) ->
{ok, _} = erl_esdb_memory:start_link(),
register_pressure_handlers(),
...2. Graceful Degradation
%% Implement graceful degradation, not hard failures
handle_pressure(critical) ->
%% Don't crash, just slow down
reduce_batch_sizes(),
increase_flush_intervals(),
pause_optional_projections();
handle_pressure(elevated) ->
%% Warning mode
reduce_cache_ttls();
handle_pressure(normal) ->
restore_defaults().3. Monitor Recovery
%% Track recovery from pressure
handle_pressure_change(OldLevel, NewLevel) ->
case {OldLevel, NewLevel} of
{critical, elevated} ->
logger:info("Memory recovering - still elevated");
{critical, normal} ->
logger:info("Memory fully recovered");
{elevated, critical} ->
logger:error("Memory degrading to critical");
_ ->
ok
end.4. Test Under Pressure
%% In tests, simulate pressure
test_critical_behavior() ->
%% Force critical level
meck:new(erl_esdb_memory, [passthrough]),
meck:expect(erl_esdb_memory, level, fun() -> critical end),
%% Test behavior
?assertEqual(expected_behavior, my_module:do_something()),
meck:unload(erl_esdb_memory).SASL Configuration
For memsup to work, configure SASL:
%% In sys.config
{sasl, [
{sasl_error_logger, {file, "log/sasl.log"}},
{errlog_type, error}
]},
{os_mon, [
{start_memsup, true},
{memsup_system_only, false},
{memory_check_interval, 60} %% OS check interval (seconds)
]}See Also
- Subscriptions - Event subscription with backpressure
- Storage Internals - Khepri memory usage
- Scavenging - Free memory by removing old events