GenServer-based async event dispatcher for the agent system.
The AsyncDispatcher manages event routing between agents in an asynchronous,
non-blocking manner. It maintains an event queue (FIFO) and processes events
by routing them through a Mojentic.Router to the appropriate agents.
Features
- Event Queue - FIFO queue using the :queue module
- Async Processing - Non-blocking event handling via Task and GenServer
- Mixed Agent Support - Handles both sync and async agents
- Graceful Shutdown - Stop via TerminateEvent or explicit stop/1
- Queue Monitoring - Wait for empty queue with wait_for_empty_queue/2
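The FIFO behaviour listed above can be illustrated with Erlang's :queue module on its own. This is a minimal sketch, not the dispatcher's code: the module name QueueSketch and the placeholder atoms are assumptions made for illustration.

```elixir
defmodule QueueSketch do
  # Enqueue every item in order; :queue.in/2 appends at the rear.
  def enqueue_all(items) do
    Enum.reduce(items, :queue.new(), fn item, queue -> :queue.in(item, queue) end)
  end

  # Remove items from the front until the queue reports :empty.
  def drain(queue, acc \\ []) do
    case :queue.out(queue) do
      {{:value, item}, rest} -> drain(rest, [item | acc])
      {:empty, _queue} -> Enum.reverse(acc)
    end
  end
end

queue = QueueSketch.enqueue_all([:first, :second, :third])
IO.inspect(:queue.len(queue))
IO.inspect(QueueSketch.drain(queue))
```

Items come out in the order they went in, which is the property the dispatcher relies on when it processes events in FIFO order.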
Architecture
┌─────────────┐
│ Dispatcher  │
│ (GenServer) │
└──────┬──────┘
       │ Event Queue
       │ [:queue]
       │
       ├─→ Router ─→ Agent1 ──→ [New Events]
       │                             ↓
       └─→ Router ─→ Agent2 ──→ [New Events]
State Structure
%{
  router: %Router{},
  event_queue: :queue.queue(),
  processing: boolean(),
  batch_size: integer(),
  pending_tasks: non_neg_integer()
}
Usage
# Create router
router =
  Router.new()
  |> Router.add_route(QuestionEvent, fact_checker)
  |> Router.add_route(QuestionEvent, answer_generator)

# Start dispatcher
{:ok, pid} = AsyncDispatcher.start_link(router: router)

# Dispatch events
event = %QuestionEvent{source: MyApp, question: "What is Elixir?"}
AsyncDispatcher.dispatch(pid, event)

# Wait for queue to empty
:ok = AsyncDispatcher.wait_for_empty_queue(pid, timeout: 10_000)

# Stop dispatcher
AsyncDispatcher.stop(pid)
Examples
# Full workflow
router =
  Router.new()
  |> Router.add_route(QuestionEvent, fact_checker_pid)
  |> Router.add_route(FactCheckEvent, aggregator_pid)

{:ok, dispatcher} =
  AsyncDispatcher.start_link(
    router: router,
    batch_size: 10
  )

question = %QuestionEvent{
  source: MyApp,
  question: "What is the capital of France?"
}

AsyncDispatcher.dispatch(dispatcher, question)
AsyncDispatcher.wait_for_empty_queue(dispatcher)
AsyncDispatcher.stop(dispatcher)
Types
@type state() :: %{
  router: Mojentic.Router.t(),
  event_queue: :queue.queue(),
  processing: boolean(),
  batch_size: non_neg_integer(),
  pending_tasks: non_neg_integer()
}
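To show how the event_queue and batch_size fields of this state could work together, here is a hedged sketch of taking at most one batch of events from the front of the queue. The source does not show the dispatcher loop, so the module BatchSketch, the function take_batch/2, and the trimmed-down state map are all assumptions for illustration.

```elixir
defmodule BatchSketch do
  # Pop up to `batch_size` events from the front of the queue.
  # Returns {events_in_fifo_order, remaining_queue}.
  def take_batch(queue, batch_size) when is_integer(batch_size) and batch_size >= 0 do
    do_take(queue, batch_size, [])
  end

  # Batch is full: return what was collected so far.
  defp do_take(queue, 0, acc), do: {Enum.reverse(acc), queue}

  defp do_take(queue, remaining, acc) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> do_take(rest, remaining - 1, [event | acc])
      # Queue ran out before the batch was full.
      {:empty, rest} -> {Enum.reverse(acc), rest}
    end
  end
end

# Hypothetical state with placeholder events; only the fields used here are shown.
state = %{event_queue: :queue.from_list([:a, :b, :c]), batch_size: 2, processing: false}

{batch, rest} = BatchSketch.take_batch(state.event_queue, state.batch_size)
state = %{state | event_queue: rest}

IO.inspect(batch)
IO.inspect(:queue.len(state.event_queue))
```

Bounding each pass by batch_size keeps a single loop iteration short, so calls such as queue-size queries can be served between batches.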
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
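As an illustration of how such a child specification is typically consumed, the sketch below supervises a stand-in GenServer. DemoDispatcher is a hypothetical module, not AsyncDispatcher itself; it only mirrors the documented start_link options :name and :batch_size and omits the router.

```elixir
defmodule DemoDispatcher do
  # Stand-in for AsyncDispatcher; `use GenServer` generates child_spec/1.
  use GenServer

  def start_link(opts) do
    # This sketch requires :name so the supervised process can be looked up.
    name = Keyword.fetch!(opts, :name)
    GenServer.start_link(__MODULE__, opts, name: name)
  end

  @impl true
  def init(opts) do
    {:ok, %{batch_size: Keyword.get(opts, :batch_size, 5)}}
  end
end

# The {module, arg} tuple is expanded via DemoDispatcher.child_spec(arg).
children = [{DemoDispatcher, name: DemoDispatcher, batch_size: 10}]
{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)

IO.inspect(Supervisor.count_children(sup).active)
```

With the real module, the child entry would presumably look like {AsyncDispatcher, router: router, name: MyDispatcher}, using the options documented for start_link.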
Dispatches an event to the event queue.
The event will be assigned a correlation_id if it doesn't have one. Events are processed in FIFO order by the dispatcher loop.
Parameters
- pid - The dispatcher process
- event - The event to dispatch
Examples
event = %QuestionEvent{source: MyApp, question: "Hello?"}
AsyncDispatcher.dispatch(dispatcher, event)
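The correlation_id rule described for dispatch can be sketched as follows. This is an assumption-laden illustration: DemoEvent is a hypothetical struct, and the ID format (32 hex characters from random bytes) is chosen for the example; the source does not say how Mojentic generates IDs.

```elixir
defmodule CorrelationSketch do
  # Hypothetical event struct; the real Mojentic event fields may differ.
  defmodule DemoEvent do
    defstruct [:source, :correlation_id]
  end

  # No correlation_id yet: generate a random one.
  def ensure_correlation_id(%{correlation_id: nil} = event) do
    %{event | correlation_id: generate_id()}
  end

  # An existing correlation_id is kept unchanged.
  def ensure_correlation_id(%{correlation_id: _id} = event), do: event

  # 16 random bytes rendered as 32 lowercase hex characters.
  defp generate_id do
    :crypto.strong_rand_bytes(16) |> Base.encode16(case: :lower)
  end
end

fresh = CorrelationSketch.ensure_correlation_id(%CorrelationSketch.DemoEvent{source: :demo})
IO.inspect(byte_size(fresh.correlation_id))

kept = CorrelationSketch.ensure_correlation_id(%CorrelationSketch.DemoEvent{correlation_id: "abc"})
IO.inspect(kept.correlation_id)
```

Keeping an existing ID lets events produced in response to an earlier event share its correlation_id, so a whole chain can be traced.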
Gets the current size of the event queue.
Examples
size = AsyncDispatcher.get_queue_size(dispatcher)
Starts the async dispatcher as a linked process.
Options
- :router - Router instance for event routing (required)
- :batch_size - Number of events to process per batch (default: 5)
- :name - Process registration name (optional)
Examples
{:ok, pid} = AsyncDispatcher.start_link(router: router)
{:ok, pid} =
  AsyncDispatcher.start_link(
    router: router,
    batch_size: 10,
    name: MyDispatcher
  )
Stops the dispatcher gracefully.
Waits for the current batch to complete before shutting down.
Parameters
- pid - The dispatcher process
- timeout - Maximum time to wait for shutdown (default: 5000ms)
Examples
AsyncDispatcher.stop(dispatcher)
AsyncDispatcher.stop(dispatcher, 10_000)
Waits for the event queue to be empty.
This is useful for testing or ensuring all events have been processed before continuing.
Parameters
- pid - The dispatcher process
- opts - Keyword list with:
  - :timeout - Maximum wait time in milliseconds (default: 5000)
Returns
- :ok - Queue is empty
- {:error, :timeout} - Timeout reached with events still in queue
Examples
:ok = AsyncDispatcher.wait_for_empty_queue(dispatcher)
case AsyncDispatcher.wait_for_empty_queue(dispatcher, timeout: 10_000) do
  :ok -> IO.puts("All events processed")
  {:error, :timeout} -> IO.puts("Timed out waiting")
end
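One plausible way to obtain the :ok / {:error, :timeout} contract above is to poll the queue size until a deadline passes. The sketch below shows that pattern in isolation. WaitSketch, its 10 ms polling interval, and the Agent used as a stand-in for the dispatcher are assumptions; the actual implementation is not shown in the source and may also take pending tasks into account.

```elixir
defmodule WaitSketch do
  # Poll `size_fun` until it returns 0 or `timeout` milliseconds elapse.
  def wait_until_empty(size_fun, timeout \\ 5_000, interval \\ 10) do
    deadline = System.monotonic_time(:millisecond) + timeout
    poll(size_fun, deadline, interval)
  end

  defp poll(size_fun, deadline, interval) do
    cond do
      size_fun.() == 0 ->
        :ok

      System.monotonic_time(:millisecond) >= deadline ->
        {:error, :timeout}

      true ->
        Process.sleep(interval)
        poll(size_fun, deadline, interval)
    end
  end
end

# Stand-in for a dispatcher: an Agent holding a pending-event counter.
{:ok, counter} = Agent.start_link(fn -> 3 end)
size_fun = fn -> Agent.get(counter, & &1) end

# Simulate a worker that finishes one event every 20 ms.
Task.start(fn ->
  for _ <- 1..3 do
    Process.sleep(20)
    Agent.update(counter, &(&1 - 1))
  end
end)

IO.inspect(WaitSketch.wait_until_empty(size_fun, 1_000))
```

Using System.monotonic_time/1 for the deadline avoids surprises from wall-clock adjustments while waiting.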