Mojentic.Agents.AsyncAggregatorAgent (Mojentic v1.2.0)

GenServer-based agent that aggregates events by correlation ID.

The aggregator waits for multiple event types before processing them together. This is useful for coordinating parallel async operations that need to be combined (e.g., waiting for fact-checking and answer generation before producing a final answer).

Features

  • Event Accumulation - Collects events by correlation_id
  • Type Tracking - Waits for specific event types via event_types_needed
  • Timeout Support - Configurable timeout for wait_for_events/3
  • Custom Processing - Override process_events/2 callback

State Structure

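%{
  events: %{correlation_id => [events]},
  results: %{correlation_id => [result_events]},
  waiters: %{correlation_id => [GenServer.from()]},
  event_types_needed: [EventType1, EventType2, ...],
  process_events_fn: (events, state -> {:ok, result_events, state})
}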
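The way the events map and event_types_needed interact can be sketched in plain Elixir. This is a minimal illustration, not Mojentic's implementation: StateSketch, EventA, EventB, store/2 and complete?/2 are hypothetical names, and the state here carries only the two keys the sketch needs.

```elixir
defmodule StateSketch do
  # Hypothetical event structs for illustration; real Mojentic events may carry more fields.
  defmodule EventA, do: defstruct([:correlation_id, :payload])
  defmodule EventB, do: defstruct([:correlation_id, :payload])

  # Append an event to the list stored under its correlation_id.
  def store(state, event) do
    Map.update!(state, :events, fn events ->
      Map.update(events, event.correlation_id, [event], &(&1 ++ [event]))
    end)
  end

  # True when every needed type has at least one event for this correlation_id.
  def complete?(state, correlation_id) do
    seen =
      state.events
      |> Map.get(correlation_id, [])
      |> Enum.map(& &1.__struct__)

    Enum.all?(state.event_types_needed, &(&1 in seen))
  end
end
```

Keying both lookups on correlation_id lets events for unrelated requests accumulate side by side without interfering with one another.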

Usage

Start the aggregator directly with start_link/1 (child_spec/1 is available for running it under a supervisor):

{:ok, pid} = AsyncAggregatorAgent.start_link(
  event_types_needed: [FactCheckEvent, AnswerEvent],
  process_events_fn: &MyModule.process_events/2
)

Or wrap it in your own module:

defmodule FinalAnswerAgent do
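  use Mojentic.Agents.AsyncAggregatorAgent

  # Alias so the short module name below resolves even if `use` does not add one.
  alias Mojentic.Agents.AsyncAggregatorAgent

  def start_link(_opts) do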
    AsyncAggregatorAgent.start_link(
      event_types_needed: [FactCheckEvent, AnswerEvent],
      process_events_fn: &__MODULE__.process_events/2,
      name: __MODULE__
    )
  end

  def process_events(events, state) do
    fact_event = Enum.find(events, &match?(%FactCheckEvent{}, &1))
    answer_event = Enum.find(events, &match?(%AnswerEvent{}, &1))

    final_event = %FinalAnswerEvent{
      source: __MODULE__,
      correlation_id: fact_event.correlation_id,
      answer: answer_event.answer,
      facts: fact_event.facts
    }

    {:ok, [final_event], state}
  end
end

Examples

# Start the aggregator
{:ok, pid} = AsyncAggregatorAgent.start_link(
  event_types_needed: [EventA, EventB],
  process_events_fn: &process/2
)

# Dispatch events (via dispatcher or directly)
AsyncAggregatorAgent.receive_event(pid, event_a)
AsyncAggregatorAgent.receive_event(pid, event_b)

# Wait for all needed events
{:ok, result_events} = AsyncAggregatorAgent.wait_for_events(
  pid,
  correlation_id,
  timeout: 5000
)

Summary

Functions

  • __using__(opts) - Enables using this module in your own aggregator implementations.
  • child_spec(init_arg) - Returns a specification to start this module under a supervisor.
  • receive_event(pid, event) - Receives an event and processes it according to async agent behaviour.
  • start_link(opts) - Starts the aggregator agent as a linked process.
  • wait_for_events(pid, correlation_id, opts \\ []) - Waits for all needed events for a specific correlation_id.

Types

state()

@type state() :: %{
  events: %{required(String.t()) => [Mojentic.Event.t()]},
  results: %{required(String.t()) => [Mojentic.Event.t()]},
  waiters: %{required(String.t()) => [GenServer.from()]},
  event_types_needed: [module()],
  process_events_fn: ([Mojentic.Event.t()], state() ->
                        {:ok, [Mojentic.Event.t()], state()})
}

Functions

__using__(opts)

(macro)

Enables using this module in your own aggregator implementations.

Example

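defmodule MyAggregator do
  use Mojentic.Agents.AsyncAggregatorAgent

  # Alias so the short module name below resolves even if `use` does not add one.
  alias Mojentic.Agents.AsyncAggregatorAgent

  def start_link do
    AsyncAggregatorAgent.start_link(
      event_types_needed: [EventA, EventB],
      process_events_fn: &__MODULE__.process_events/2,
      name: __MODULE__
    )
  end

  def process_events(events, state) do
    # Custom processing logic: build the events to emit from `events`.
    # ResultEvent is a placeholder for your own event struct.
    result_event = %ResultEvent{correlation_id: hd(events).correlation_id}
    {:ok, [result_event], state}
  end
end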

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

receive_event(pid, event)

Receives an event and processes it according to async agent behaviour.

This is the main entry point called by the dispatcher. The agent will:

  1. Store the event under its correlation_id
  2. Check if all needed event types have arrived
  3. If complete, call process_events_fn and notify waiters
  4. Return the resulting events
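The four steps above can be sketched as a pure function over the state map. This is an illustration under stated assumptions, not the library's code: AggregationSketch, Fact, Answer and handle_event/2 are hypothetical names, waiter notification is left out, and both the empty result for an incomplete batch and the clearing of a processed batch are assumptions.

```elixir
defmodule AggregationSketch do
  # Hypothetical event structs, defined only to keep the sketch self-contained.
  defmodule Fact, do: defstruct([:correlation_id, :facts])
  defmodule Answer, do: defstruct([:correlation_id, :answer])

  # Pure version of the steps; returns {:ok, new_events, new_state}.
  def handle_event(state, event) do
    id = event.correlation_id

    # 1. Store the event under its correlation_id.
    collected = Map.get(state.events, id, []) ++ [event]
    state = %{state | events: Map.put(state.events, id, collected)}

    # 2. Check whether every needed type has arrived.
    seen = MapSet.new(collected, & &1.__struct__)

    if Enum.all?(state.event_types_needed, &MapSet.member?(seen, &1)) do
      # 3. Hand the whole batch to the processing function.
      {:ok, results, state} = state.process_events_fn.(collected, state)

      # 4. Return the resulting events.
      # Assumption: the processed batch is cleared afterwards.
      {:ok, results, %{state | events: Map.delete(state.events, id)}}
    else
      # Assumption: an incomplete batch produces no new events.
      {:ok, [], state}
    end
  end
end
```

Keeping the step logic free of process concerns makes it easy to test without starting a GenServer.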

Parameters

  • pid - The aggregator process
  • event - The event to process

Returns

  • {:ok, [Event.t()]} - Successfully processed, returns new events
  • {:error, reason} - Processing failed

Examples

{:ok, events} = AsyncAggregatorAgent.receive_event(pid, event)

start_link(opts)

Starts the aggregator agent as a linked process.

Options

  • :event_types_needed - List of event type modules to wait for (required)
  • :process_events_fn - Function to call when all events collected (required)
  • :name - Process registration name (optional)

Examples

{:ok, pid} = AsyncAggregatorAgent.start_link(
  event_types_needed: [EventA, EventB],
  process_events_fn: &MyModule.process/2
)

wait_for_events(pid, correlation_id, opts \\ [])

Waits for all needed events for a specific correlation_id.

This function blocks the caller until all required event types have been received for the given correlation_id, or until the timeout is reached.

Parameters

  • pid - The aggregator process
  • correlation_id - The correlation ID to wait for
  • opts - Keyword list with:
    • :timeout - Maximum wait time in milliseconds (default: 5000)

Returns

  • {:ok, [Event.t()]} - All events received and processed
  • {:error, :timeout} - Timeout reached before all events arrived

Examples

{:ok, events} = AsyncAggregatorAgent.wait_for_events(
  pid,
  "correlation-123",
  timeout: 10_000
)
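A blocking wait with a timeout is commonly built in a GenServer by parking the caller's from value and replying later. The sketch below shows that general pattern in a hypothetical mini-aggregator. WaiterSketch, put/3 and wait/3 are illustrative names, and this is not a description of how Mojentic implements wait_for_events/3.

```elixir
defmodule WaiterSketch do
  use GenServer

  # Hypothetical mini-aggregator: an id is complete once `needed` items have arrived.
  def start_link(needed), do: GenServer.start_link(__MODULE__, needed)

  def put(pid, id, item), do: GenServer.cast(pid, {:put, id, item})

  # Blocks until the id is complete; turns a call timeout into {:error, :timeout}.
  def wait(pid, id, timeout \\ 5000) do
    GenServer.call(pid, {:wait, id}, timeout)
  catch
    :exit, {:timeout, _} -> {:error, :timeout}
  end

  @impl true
  def init(needed), do: {:ok, %{needed: needed, items: %{}, waiters: %{}}}

  @impl true
  def handle_call({:wait, id}, from, state) do
    items = Map.get(state.items, id, [])

    if length(items) >= state.needed do
      {:reply, {:ok, items}, state}
    else
      # Park the caller: send no reply yet and remember `from` for later.
      waiters = Map.update(state.waiters, id, [from], &[from | &1])
      {:noreply, %{state | waiters: waiters}}
    end
  end

  @impl true
  def handle_cast({:put, id, item}, state) do
    items = Map.get(state.items, id, []) ++ [item]
    state = %{state | items: Map.put(state.items, id, items)}

    if length(items) >= state.needed do
      # Wake every caller parked on this id.
      {parked, waiters} = Map.pop(state.waiters, id, [])
      Enum.each(parked, &GenServer.reply(&1, {:ok, items}))
      {:noreply, %{state | waiters: waiters}}
    else
      {:noreply, state}
    end
  end
end
```

Returning {:noreply, state} from handle_call/3 keeps the server free to accept further events while callers wait. A caller that times out stays parked in this sketch, so a production version would also need to clean up stale entries.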