GenServer-based agent that aggregates events by correlation ID.
The aggregator waits for multiple event types before processing them together. This is useful for coordinating parallel async operations that need to be combined (e.g., waiting for fact-checking and answer generation before producing a final answer).
Features
- Event Accumulation - Collects events by correlation_id
- Type Tracking - Waits for specific event types via event_types_needed
- Timeout Support - Configurable timeout for wait_for_events/3
- Custom Processing - Override the process_events/2 callback
State Structure
%{
  events: %{correlation_id => [events]},
  waiters: %{correlation_id => [caller_pids]},
  event_types_needed: [EventType1, EventType2, ...]
}
Usage
Start the aggregator as a supervised process:
{:ok, pid} = AsyncAggregatorAgent.start_link(
  event_types_needed: [FactCheckEvent, AnswerEvent],
  process_events_fn: &MyModule.process_events/2
)
Or implement as a module:
defmodule FinalAnswerAgent do
  use Mojentic.Agents.AsyncAggregatorAgent

  def start_link(opts) do
    AsyncAggregatorAgent.start_link(
      event_types_needed: [FactCheckEvent, AnswerEvent],
      process_events_fn: &__MODULE__.process_events/2,
      name: __MODULE__
    )
  end

  def process_events(events, state) do
    fact_event = Enum.find(events, &match?(%FactCheckEvent{}, &1))
    answer_event = Enum.find(events, &match?(%AnswerEvent{}, &1))

    final_event = %FinalAnswerEvent{
      source: __MODULE__,
      correlation_id: fact_event.correlation_id,
      answer: answer_event.answer,
      facts: fact_event.facts
    }

    {:ok, [final_event], state}
  end
end
Examples
# Start the aggregator
{:ok, pid} = AsyncAggregatorAgent.start_link(
  event_types_needed: [EventA, EventB],
  process_events_fn: &process/2
)

# Dispatch events (via dispatcher or directly)
AsyncAggregatorAgent.receive_event(pid, event_a)
AsyncAggregatorAgent.receive_event(pid, event_b)

# Wait for all needed events
{:ok, result_events} = AsyncAggregatorAgent.wait_for_events(
  pid,
  correlation_id,
  timeout: 5000
)
Types
@type state() :: %{
  events: %{required(String.t()) => [Mojentic.Event.t()]},
  results: %{required(String.t()) => [Mojentic.Event.t()]},
  waiters: %{required(String.t()) => [GenServer.from()]},
  event_types_needed: [module()],
  process_events_fn: ([Mojentic.Event.t()], state() -> {:ok, [Mojentic.Event.t()], state()})
}
Functions
Enables using this module in your own aggregator implementations.
Example
defmodule MyAggregator do
  use Mojentic.Agents.AsyncAggregatorAgent

  def start_link do
    AsyncAggregatorAgent.start_link(
      event_types_needed: [EventA, EventB],
      process_events_fn: &__MODULE__.process_events/2,
      name: __MODULE__
    )
  end

  def process_events(events, state) do
    # Custom processing logic
    {:ok, [result_event], state}
  end
end
Returns a specification to start this module under a supervisor.
See Supervisor.
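As a rough illustration of what a child specification is used for, the sketch below places a stand-in GenServer under a supervisor. SupervisedSketch and :aggregator_sketch are hypothetical names, and the sketch assumes the aggregator's child_spec/1 behaves like the one generated by use GenServer, which the source does not confirm.

```elixir
defmodule SupervisedSketch do
  # `use GenServer` generates a default child_spec/1 that calls start_link/1.
  use GenServer

  def start_link(opts) do
    name = Keyword.get(opts, :name, __MODULE__)
    GenServer.start_link(__MODULE__, opts, name: name)
  end

  @impl true
  def init(opts), do: {:ok, Map.new(opts)}
end

# The {module, arg} tuple is expanded by the supervisor into
# SupervisedSketch.child_spec(arg).
children = [
  {SupervisedSketch, event_types_needed: [:a, :b], name: :aggregator_sketch}
]

{:ok, _supervisor} = Supervisor.start_link(children, strategy: :one_for_one)
```

With a real aggregator module, the tuple would presumably carry the same options that start_link accepts.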
Receives an event and processes it according to async agent behaviour.
This is the main entry point called by the dispatcher. The agent will:
- Store the event under its correlation_id
- Check if all needed event types have arrived
- If complete, call process_events_fn and notify waiters
- Return the resulting events
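The steps above can be sketched as a pure function over the state map. This is a minimal sketch and not the library's implementation: AggregationSketch, its nested EventA and EventB structs, and handle_event/2 are hypothetical, and it assumes that an event's type is its struct module and that an incomplete batch yields an empty event list. Waiter notification is left out.

```elixir
defmodule AggregationSketch do
  # Hypothetical event structs standing in for real events.
  defmodule EventA do
    defstruct [:correlation_id, :value]
  end

  defmodule EventB do
    defstruct [:correlation_id, :value]
  end

  # Stores the event under its correlation_id, then processes the
  # batch only if every needed event type has arrived.
  def handle_event(state, event) do
    id = event.correlation_id
    collected = Map.get(state.events, id, []) ++ [event]
    state = %{state | events: Map.put(state.events, id, collected)}

    if complete?(collected, state.event_types_needed) do
      # All needed types are present: hand the batch to the callback.
      state.process_events_fn.(collected, state)
    else
      # Still waiting for other types: nothing to emit yet (assumption).
      {:ok, [], state}
    end
  end

  # True when every needed struct module appears among the collected events.
  defp complete?(events, needed) do
    present = MapSet.new(events, & &1.__struct__)
    Enum.all?(needed, &MapSet.member?(present, &1))
  end
end
```

Calling handle_event/2 with an EventA alone leaves the batch pending; a following EventB with the same correlation_id triggers process_events_fn with both events.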
Parameters
- pid - The aggregator process
- event - The event to process
Returns
- {:ok, [Event.t()]} - Successfully processed, returns new events
- {:error, reason} - Processing failed
Examples
{:ok, events} = AsyncAggregatorAgent.receive_event(pid, event)
Starts the aggregator agent as a linked process.
Options
- :event_types_needed - List of event type modules to wait for (required)
- :process_events_fn - Function to call once all needed events have been collected (required)
- :name - Process registration name (optional)
Examples
{:ok, pid} = AsyncAggregatorAgent.start_link(
  event_types_needed: [EventA, EventB],
  process_events_fn: &MyModule.process/2
)
Waits for all needed events for a specific correlation_id.
This function blocks the caller until all required event types have been received for the given correlation_id, or until the timeout is reached.
Parameters
- pid - The aggregator process
- correlation_id - The correlation ID to wait for
- opts - Keyword list with:
  - :timeout - Maximum wait time in milliseconds (default: 5000)
Returns
- {:ok, [Event.t()]} - All events received and processed
- {:error, :timeout} - Timeout reached before all events arrived
Examples
{:ok, events} = AsyncAggregatorAgent.wait_for_events(
  pid,
  "correlation-123",
  timeout: 10_000
)
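
One common way to get this blocking behaviour from a GenServer is to defer the reply: the server stores the caller's from reference and answers later with GenServer.reply/2, while the client turns the call timeout into an error tuple. The sketch below illustrates that pattern under stated assumptions. WaiterSketch, put/3 and wait/3 are hypothetical names, completeness is simplified to an item count, and nothing here is confirmed to match the library's internals.

```elixir
defmodule WaiterSketch do
  use GenServer

  # Client API (hypothetical names, not the library's).
  def start_link(needed_count), do: GenServer.start_link(__MODULE__, needed_count)

  def put(pid, id, item), do: GenServer.cast(pid, {:put, id, item})

  def wait(pid, id, timeout \\ 5000) do
    GenServer.call(pid, {:wait, id}, timeout)
  catch
    # GenServer.call/3 exits on timeout; translate that into an error tuple.
    :exit, {:timeout, _} -> {:error, :timeout}
  end

  # Server callbacks.
  @impl true
  def init(needed_count) do
    {:ok, %{needed: needed_count, items: %{}, waiters: %{}}}
  end

  @impl true
  def handle_call({:wait, id}, from, state) do
    items = Map.get(state.items, id, [])

    if length(items) >= state.needed do
      # Already complete: answer immediately.
      {:reply, {:ok, items}, state}
    else
      # Not complete yet: remember the caller and reply later.
      waiters = Map.update(state.waiters, id, [from], &[from | &1])
      {:noreply, %{state | waiters: waiters}}
    end
  end

  @impl true
  def handle_cast({:put, id, item}, state) do
    items = Map.get(state.items, id, []) ++ [item]
    state = %{state | items: Map.put(state.items, id, items)}

    if length(items) >= state.needed do
      # Complete: wake every caller blocked on this id.
      {ready, waiters} = Map.pop(state.waiters, id, [])
      Enum.each(ready, &GenServer.reply(&1, {:ok, items}))
      {:noreply, %{state | waiters: waiters}}
    else
      {:noreply, state}
    end
  end
end
```

In this sketch a caller that times out stays registered as a waiter; a production implementation would likely also clean up stale waiters.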