DurableStreams.Retention.Worker (Streamkeeper v0.3.0)
Performs stream compaction based on retention policies.
This module is stateless and runs as Tasks under the TaskSupervisor. It calculates which messages need to be removed based on the stream's retention policy (max_age, max_messages, max_bytes) and performs the actual deletion.
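As a rough illustration of running stateless work as a supervised task, the sketch below starts an anonymous `Task.Supervisor` and runs a placeholder compaction function under it. The `compact_fun` function and the `"orders"` stream id are hypothetical stand-ins, not part of Streamkeeper, and the real worker may well use `Task.Supervisor.start_child/2` (fire-and-forget) rather than `async_nolink/2`.

```elixir
# Hypothetical stand-in for the worker's compaction entry point.
compact_fun = fn _stream_id -> :ok end

# Start a Task.Supervisor for this sketch; in the application it would
# normally be a named child of the supervision tree.
{:ok, sup} = Task.Supervisor.start_link()

# Run the compaction as a supervised task. No state is kept between runs,
# so each call only needs the stream id.
task = Task.Supervisor.async_nolink(sup, fn -> compact_fun.("orders") end)
result = Task.await(task)
```

Because the task is not linked to the caller, a crash inside the compaction function would not take the caller down with it.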
How Compaction Works
- Fetch stream metadata and retention policy
- Calculate new earliest offset based on policy limits
- Delete all messages before the new earliest offset
- Update stream metadata with new earliest_offset and counters
Messages are removed when any of these conditions are met:
- max_age: Message timestamp is older than (now - max_age)
- max_messages: More than max_messages exist in the stream
- max_bytes: Total stream size exceeds max_bytes
When several limits apply, the most aggressive one (the limit that yields the highest earliest_offset) wins.
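The offset calculation described above can be sketched as follows. This is a minimal illustration, not the module's actual code: the `RetentionSketch` module, the message shape (`offset`, `timestamp` in milliseconds, `size` in bytes), and the policy map are all assumptions. Messages are assumed to be sorted by ascending offset with non-decreasing timestamps.

```elixir
defmodule RetentionSketch do
  # Hypothetical helper: returns the offset of the first message to keep.
  # If every message is removed, returns the last offset + 1.
  def new_earliest_offset([], _policy, _now_ms), do: nil

  def new_earliest_offset(messages, policy, now_ms) do
    first = hd(messages).offset
    next_offset = List.last(messages).offset + 1

    [
      first,
      by_age(messages, policy[:max_age], now_ms, next_offset),
      by_count(messages, policy[:max_messages], next_offset),
      by_bytes(messages, policy[:max_bytes], next_offset)
    ]
    |> Enum.reject(&is_nil/1)
    # The most aggressive limit is the one with the highest candidate offset.
    |> Enum.max()
  end

  # Keep messages whose timestamp is not older than (now - max_age).
  defp by_age(_messages, nil, _now_ms, _next), do: nil

  defp by_age(messages, max_age, now_ms, next) do
    cutoff = now_ms - max_age

    case Enum.find(messages, &(&1.timestamp >= cutoff)) do
      nil -> next
      msg -> msg.offset
    end
  end

  # Keep only the newest max_messages messages.
  defp by_count(_messages, nil, _next), do: nil

  defp by_count(messages, max_messages, next) do
    case Enum.drop(messages, max(length(messages) - max_messages, 0)) do
      [] -> next
      [msg | _] -> msg.offset
    end
  end

  # Walk from newest to oldest, keeping messages while the total fits in max_bytes.
  defp by_bytes(_messages, nil, _next), do: nil

  defp by_bytes(messages, max_bytes, next) do
    messages
    |> Enum.reverse()
    |> Enum.reduce_while({0, next}, fn msg, {total, earliest} ->
      if total + msg.size <= max_bytes do
        {:cont, {total + msg.size, msg.offset}}
      else
        {:halt, {total, earliest}}
      end
    end)
    |> elem(1)
  end
end

# Five 10-byte messages at offsets 0..4, one per second.
messages = for i <- 0..4, do: %{offset: i, timestamp: i * 1000, size: 10}
policy = %{max_age: 2500, max_messages: 3, max_bytes: 40}

# max_age gives 3, max_messages gives 2, max_bytes gives 1; the highest wins.
RetentionSketch.new_earliest_offset(messages, policy, 5000)
# => 3
```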
Functions
Compacts a stream by removing messages that exceed the retention policy.
Returns:
- :ok - Compaction successful (or no compaction needed)
- {:error, reason} - Compaction failed
@spec needs_compaction?(DurableStreams.Stream.t()) :: boolean()
Checks if a stream needs compaction based on its retention policy.
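One way such a check could look is sketched below. The fields of `DurableStreams.Stream.t()` are not shown on this page, so the sketch uses a plain map with hypothetical keys (`message_count`, `total_bytes`, `oldest_timestamp`, `retention`) and takes the current time as an extra argument to stay deterministic. The real function takes only the stream.

```elixir
defmodule NeedsCompactionSketch do
  # Hypothetical check: true if any configured limit is currently exceeded.
  def needs_compaction?(stream, now_ms \\ System.system_time(:millisecond)) do
    policy = stream.retention

    over?(stream.message_count, policy[:max_messages]) or
      over?(stream.total_bytes, policy[:max_bytes]) or
      too_old?(stream.oldest_timestamp, policy[:max_age], now_ms)
  end

  # A limit that is not configured (nil) never triggers compaction.
  defp over?(_value, nil), do: false
  defp over?(value, limit), do: value > limit

  # An empty stream (no oldest timestamp) or an unset max_age never triggers.
  defp too_old?(nil, _max_age, _now_ms), do: false
  defp too_old?(_oldest, nil, _now_ms), do: false
  defp too_old?(oldest, max_age, now_ms), do: oldest < now_ms - max_age
end
```

A cheap check like this lets a scheduler skip streams that are within their limits before spawning a compaction task.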