Mojentic.AsyncDispatcher (Mojentic v1.2.0)


GenServer-based async event dispatcher for the agent system.

The AsyncDispatcher manages event routing between agents in an asynchronous, non-blocking manner. It maintains an event queue (FIFO) and processes events by routing them through a Mojentic.Router to the appropriate agents.

Features

  • Event Queue - FIFO queue using :queue module
  • Async Processing - Non-blocking event handling via Task and GenServer
  • Mixed Agent Support - Handles both sync and async agents
  • Graceful Shutdown - Stop via TerminateEvent or explicit stop/1
  • Queue Monitoring - Wait for empty queue with wait_for_empty_queue/2
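The FIFO behaviour listed above comes from Erlang's :queue module. The following is a minimal sketch of that ordering using only :queue itself; the QueueSketch module and its function names are hypothetical and are not part of Mojentic.

```elixir
# Hypothetical helper module, for illustration only (not Mojentic code).
defmodule QueueSketch do
  # Enqueue every event at the rear of a fresh queue.
  def from_list(events), do: Enum.reduce(events, :queue.new(), &:queue.in/2)

  # Dequeue from the front until the queue is empty,
  # returning the events in the order they were enqueued.
  def drain(queue, acc \\ []) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> drain(rest, [event | acc])
      {:empty, _queue} -> Enum.reverse(acc)
    end
  end
end

queue = QueueSketch.from_list([:first, :second, :third])
drained = QueueSketch.drain(queue)
# drained == [:first, :second, :third]
```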

Architecture


  Dispatcher (GenServer)
        |
        v
  Event Queue [:queue]
        |
        +--> Router --> Agent1 --> [New Events]
        |
        +--> Router --> Agent2 --> [New Events]

State Structure

%{
  router: %Router{},
  event_queue: :queue.queue(),
  processing: boolean(),
  batch_size: non_neg_integer(),
  pending_tasks: non_neg_integer()
}
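To make the role of event_queue and batch_size concrete, here is one way a batch could be taken from the front of the queue. This is a sketch under assumptions: BatchSketch and take_batch/2 are hypothetical names, and the dispatcher's actual batching logic may differ.

```elixir
# Hypothetical helper, for illustration only (not Mojentic code).
defmodule BatchSketch do
  # Pops up to `batch_size` events from the front of the queue.
  # Returns {events_in_fifo_order, remaining_queue}.
  def take_batch(queue, batch_size) when is_integer(batch_size) and batch_size >= 0 do
    do_take(queue, batch_size, [])
  end

  # Batch is full: stop and hand back what is left of the queue.
  defp do_take(queue, 0, acc), do: {Enum.reverse(acc), queue}

  defp do_take(queue, remaining, acc) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> do_take(rest, remaining - 1, [event | acc])
      {:empty, empty} -> {Enum.reverse(acc), empty}
    end
  end
end

# A cut-down stand-in for the state map, holding only the fields used here.
state = %{event_queue: :queue.from_list([:e1, :e2, :e3]), batch_size: 2}
{batch, rest} = BatchSketch.take_batch(state.event_queue, state.batch_size)
# batch == [:e1, :e2]; :queue.to_list(rest) == [:e3]
```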

Usage

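alias Mojentic.{AsyncDispatcher, Router}

# Create router (fact_checker and answer_generator are agents created elsewhere)
router =
  Router.new()
  |> Router.add_route(QuestionEvent, fact_checker)
  |> Router.add_route(QuestionEvent, answer_generator)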

# Start dispatcher
{:ok, pid} = AsyncDispatcher.start_link(router: router)

# Dispatch events
event = %QuestionEvent{source: MyApp, question: "What is Elixir?"}
AsyncDispatcher.dispatch(pid, event)

# Wait for queue to empty
:ok = AsyncDispatcher.wait_for_empty_queue(pid, timeout: 10_000)

# Stop dispatcher
AsyncDispatcher.stop(pid)

Examples

# Full workflow
router =
  Router.new()
  |> Router.add_route(QuestionEvent, fact_checker_pid)
  |> Router.add_route(FactCheckEvent, aggregator_pid)

{:ok, dispatcher} = AsyncDispatcher.start_link(
  router: router,
  batch_size: 10
)

question = %QuestionEvent{
  source: MyApp,
  question: "What is the capital of France?"
}

AsyncDispatcher.dispatch(dispatcher, question)
AsyncDispatcher.wait_for_empty_queue(dispatcher)
AsyncDispatcher.stop(dispatcher)

Summary

Functions

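child_spec(init_arg)

Returns a specification to start this module under a supervisor.

dispatch(pid, event)

Dispatches an event to the event queue.

get_queue_size(pid)

Gets the current size of the event queue.

start_link(opts)

Starts the async dispatcher as a linked process.

stop(pid, timeout \\ 5000)

Stops the dispatcher gracefully.

wait_for_empty_queue(pid, opts \\ [])

Waits for the event queue to be empty.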

Types

state()

@type state() :: %{
  router: Mojentic.Router.t(),
  event_queue: :queue.queue(),
  processing: boolean(),
  batch_size: non_neg_integer(),
  pending_tasks: non_neg_integer()
}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
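The sketch below shows what a child_spec/1 generated by use GenServer looks like and how a supervisor consumes it. DemoDispatcher is a hypothetical stand-in module so that the example runs without Mojentic; with the real library the child tuple would presumably be {Mojentic.AsyncDispatcher, router: router}.

```elixir
# Hypothetical stand-in for a GenServer-based dispatcher (not Mojentic code).
defmodule DemoDispatcher do
  use GenServer

  def start_link(opts) do
    # Only pass :name through when the caller supplied one.
    GenServer.start_link(__MODULE__, opts, Keyword.take(opts, [:name]))
  end

  @impl true
  def init(opts), do: {:ok, %{opts: opts}}
end

# `use GenServer` defines child_spec/1, which wraps the argument
# into a call to start_link/1.
spec = DemoDispatcher.child_spec(name: DemoDispatcher)
# spec.start == {DemoDispatcher, :start_link, [[name: DemoDispatcher]]}

# A supervisor accepts the {module, arg} shorthand and calls child_spec/1 itself.
{:ok, sup} =
  Supervisor.start_link([{DemoDispatcher, name: DemoDispatcher}], strategy: :one_for_one)
```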

dispatch(pid, event)

Dispatches an event to the event queue.

The event will be assigned a correlation_id if it doesn't have one. Events are processed in FIFO order by the dispatcher loop.
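One plausible way to fill in a missing correlation_id is sketched below. The CorrelationSketch module, the ensure_correlation_id/1 name, and the 32-character hex format are all assumptions made for illustration; the format Mojentic actually generates is not stated here.

```elixir
# Hypothetical helper, for illustration only (not Mojentic code).
defmodule CorrelationSketch do
  # Events that already carry a correlation_id are returned unchanged.
  def ensure_correlation_id(%{correlation_id: id} = event) when not is_nil(id), do: event

  # Otherwise assign a random 32-character lowercase hex string.
  def ensure_correlation_id(event) when is_map(event) do
    id = Base.encode16(:crypto.strong_rand_bytes(16), case: :lower)
    Map.put(event, :correlation_id, id)
  end
end

tagged = CorrelationSketch.ensure_correlation_id(%{question: "Hello?", correlation_id: nil})
kept = CorrelationSketch.ensure_correlation_id(%{question: "Hello?", correlation_id: "abc"})
# kept.correlation_id == "abc"
```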

Parameters

  • pid - The dispatcher process
  • event - The event to dispatch

Examples

event = %QuestionEvent{source: MyApp, question: "Hello?"}
AsyncDispatcher.dispatch(dispatcher, event)

get_queue_size(pid)

Gets the current size of the event queue.

Examples

size = AsyncDispatcher.get_queue_size(dispatcher)

start_link(opts)

Starts the async dispatcher as a linked process.

Options

  • :router - Router instance for event routing (required)
  • :batch_size - Number of events to process per batch (default: 5)
  • :name - Process registration name (optional)

Examples

{:ok, pid} = AsyncDispatcher.start_link(router: router)

{:ok, pid} = AsyncDispatcher.start_link(
  router: router,
  batch_size: 10,
  name: MyDispatcher
)
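The options above could be turned into an initial state along the following lines. This is a sketch, not the library's init/1: OptsSketch and build_state/1 are hypothetical, and only the default of 5 and the required :router come from the option list.

```elixir
# Hypothetical helper, for illustration only (not Mojentic code).
defmodule OptsSketch do
  @default_batch_size 5

  # Builds a state map from start_link-style options.
  # Keyword.fetch!/2 raises KeyError when the required :router is missing.
  def build_state(opts) do
    %{
      router: Keyword.fetch!(opts, :router),
      event_queue: :queue.new(),
      processing: false,
      batch_size: Keyword.get(opts, :batch_size, @default_batch_size),
      pending_tasks: 0
    }
  end
end

# :placeholder_router stands in for a real router value.
default_state = OptsSketch.build_state(router: :placeholder_router)
custom_state = OptsSketch.build_state(router: :placeholder_router, batch_size: 10)
```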

stop(pid, timeout \\ 5000)

Stops the dispatcher gracefully.

Waits for the current batch to complete before shutting down.

Parameters

  • pid - The dispatcher process
  • timeout - Maximum time to wait for shutdown in milliseconds (default: 5000)

Examples

AsyncDispatcher.stop(dispatcher)
AsyncDispatcher.stop(dispatcher, 10_000)
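For context, the standard library's GenServer.stop/3 provides a stop-with-timeout of this shape and invokes terminate/2 before the process exits. Whether stop/2 delegates to it is an assumption; the sketch uses a hypothetical StopSketch server to show the mechanism in isolation.

```elixir
# Hypothetical stand-in server (not Mojentic code).
defmodule StopSketch do
  use GenServer

  def start_link(observer), do: GenServer.start_link(__MODULE__, observer)

  @impl true
  def init(observer), do: {:ok, observer}

  # Runs before the process exits; a real dispatcher could finish
  # its current batch here.
  @impl true
  def terminate(reason, observer) do
    send(observer, {:terminated, reason})
    :ok
  end
end

{:ok, pid} = StopSketch.start_link(self())

# Blocks until the server has terminated, or exits the caller
# if that takes longer than 5000 ms.
:ok = GenServer.stop(pid, :normal, 5_000)
```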

wait_for_empty_queue(pid, opts \\ [])

Waits for the event queue to be empty.

This is useful for testing or ensuring all events have been processed before continuing.

Parameters

  • pid - The dispatcher process
  • opts - Keyword list with:
    • :timeout - Maximum wait time in milliseconds (default: 5000)

Returns

  • :ok - Queue is empty
  • {:error, :timeout} - Timeout reached with events still in queue

Examples

:ok = AsyncDispatcher.wait_for_empty_queue(dispatcher)

case AsyncDispatcher.wait_for_empty_queue(dispatcher, timeout: 10_000) do
  :ok -> IO.puts("All events processed")
  {:error, :timeout} -> IO.puts("Timed out waiting")
end
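A wait of this kind can be approximated by polling the queue size against a deadline. The sketch below assumes polling; WaitSketch, wait_until_empty/2, and the :interval option are hypothetical, and the dispatcher may use a different mechanism internally.

```elixir
# Hypothetical helper, for illustration only (not Mojentic code).
defmodule WaitSketch do
  # Polls `size_fun` until it returns 0 or `:timeout` ms have elapsed.
  def wait_until_empty(size_fun, opts \\ []) when is_function(size_fun, 0) do
    timeout = Keyword.get(opts, :timeout, 5000)
    interval = Keyword.get(opts, :interval, 10)
    deadline = System.monotonic_time(:millisecond) + timeout
    poll(size_fun, interval, deadline)
  end

  defp poll(size_fun, interval, deadline) do
    cond do
      size_fun.() == 0 ->
        :ok

      System.monotonic_time(:millisecond) >= deadline ->
        {:error, :timeout}

      true ->
        Process.sleep(interval)
        poll(size_fun, interval, deadline)
    end
  end
end

empty_result = WaitSketch.wait_until_empty(fn -> 0 end)
busy_result = WaitSketch.wait_until_empty(fn -> 1 end, timeout: 30)
# empty_result == :ok; busy_result == {:error, :timeout}
```

With a running dispatcher, the size function could be fn -> AsyncDispatcher.get_queue_size(dispatcher) end.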