Jido.Signal.Dispatch (Jido v1.1.0-rc.2)

A flexible signal dispatching system that routes signals to various destinations using configurable adapters.

The Dispatch module is the central hub for signal delivery in the Jido system. It exposes a single interface for sending signals to many kinds of destination, and each adapter implements the delivery mechanism for one kind of destination.

Built-in Adapters

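The following adapters are provided out of the box, identified by the atoms listed in the adapter() type:

  • :pid
  • :bus
  • :named
  • :pubsub
  • :logger
  • :console
  • :noop
  • :http
  • :webhook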

Configuration

Each adapter requires specific configuration options. A dispatch configuration is a tuple of {adapter_type, options} where:

  • adapter_type - One of the built-in adapter types above or a custom module implementing the Jido.Signal.Dispatch.Adapter behaviour
  • options - Keyword list of options specific to the chosen adapter

Multiple dispatch configurations can be provided as a list to send signals to multiple destinations.
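
The two accepted shapes (a single {adapter_type, options} tuple or a list of such tuples) can be told apart by pattern matching. The following is a minimal sketch, not the library's implementation: ConfigShapeSketch and normalize/1 are hypothetical names introduced for illustration, and the option keys are copied from the examples on this page.

```elixir
defmodule ConfigShapeSketch do
  # Hypothetical helper, not part of Jido: accepts one {adapter, opts} tuple
  # or a list of them and always returns a list of tuples.
  def normalize({adapter, opts} = config) when is_atom(adapter) and is_list(opts), do: [config]
  def normalize(configs) when is_list(configs), do: configs
end

# A single destination
single = {:pid, [target: self()]}

# Several destinations for the same signal
many = [
  {:pid, [target: self()]},
  {:pubsub, [target: :audit, topic: "audit"]}
]

ConfigShapeSketch.normalize(single)
ConfigShapeSketch.normalize(many)
```

Matching the tuple clause first matters because the options are themselves a list of two-element tuples, so a plain is_list/1 check on its own would not separate the two shapes reliably.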

Dispatch Modes

The module supports three dispatch modes:

  1. Synchronous (via dispatch/2) - Blocks until every configured dispatch has completed, then returns :ok or {:error, reason}
  2. Asynchronous (via dispatch_async/2) - Returns immediately with a task that can be monitored
  3. Batched (via dispatch_batch/3) - Handles large numbers of dispatches in configurable batches

Examples

# Assumes `signal` is a Jido.Signal struct and `pid` is a live process
alias Jido.Signal.Dispatch

# Synchronous dispatch
config = {:pid, [target: pid, delivery_mode: :async]}
:ok = Dispatch.dispatch(signal, config)

# Asynchronous dispatch
{:ok, task} = Dispatch.dispatch_async(signal, config)
:ok = Task.await(task)

# Batch dispatch
configs = List.duplicate({:pid, [target: pid]}, 1000)
:ok = Dispatch.dispatch_batch(signal, configs, batch_size: 100)

# HTTP dispatch
config = {:http, [
  url: "https://api.example.com/events",
  method: :post,
  headers: [{"x-api-key", "secret"}]
]}
:ok = Dispatch.dispatch(signal, config)

# Webhook dispatch
config = {:webhook, [
  url: "https://api.example.com/webhook",
  secret: "webhook_secret",
  event_type_map: %{"user:created" => "user.created"}
]}
:ok = Dispatch.dispatch(signal, config)

Summary

Functions

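dispatch(signal, config)

Dispatches a signal using the provided configuration.

dispatch_async(signal, config)

Dispatches a signal asynchronously using the provided configuration.

dispatch_batch(signal, configs, opts \\ [])

Dispatches a signal to multiple destinations in batches.

validate_opts(config)

Validates a dispatch configuration.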

Types

adapter()

@type adapter() ::
  :pid
  | :bus
  | :named
  | :pubsub
  | :logger
  | :console
  | :noop
  | :http
  | :webhook
  | nil
  | module()

batch_opts()

@type batch_opts() :: [batch_size: pos_integer(), max_concurrency: pos_integer()]

dispatch_config()

@type dispatch_config() :: {adapter(), Keyword.t()}

dispatch_configs()

@type dispatch_configs() :: dispatch_config() | [dispatch_config()]

Functions

dispatch(signal, config)

@spec dispatch(Jido.Signal.t(), dispatch_configs()) :: :ok | {:error, term()}

Dispatches a signal using the provided configuration.

This is a synchronous operation that returns when all dispatches complete. For asynchronous dispatch, use dispatch_async/2. For batch dispatch, use dispatch_batch/3.

Parameters

  • signal - The signal to dispatch
  • config - Either a single dispatch configuration tuple or a list of configurations

Examples

# Single destination
iex> config = {:pid, [target: {:pid, pid}, delivery_mode: :async]}
iex> Jido.Signal.Dispatch.dispatch(signal, config)
:ok

# Multiple destinations
iex> config = [
...>   {:bus, [target: {:bus, :default}, stream: "events"]},
...>   {:pubsub, [target: :audit, topic: "audit"]}
...> ]
iex> Jido.Signal.Dispatch.dispatch(signal, config)
:ok
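
To make the return contract concrete (:ok when every destination succeeds, {:error, reason} otherwise), here is a rough sketch of a synchronous fan-out over a list of configurations. Everything in it is an assumption for illustration: SyncFanOutSketch is a hypothetical module, the {:signal, signal} message shape is invented, and stopping at the first error is one possible policy that the library may not follow.

```elixir
defmodule SyncFanOutSketch do
  # Deliver to each config in order and stop at the first error (assumed policy).
  def dispatch(signal, configs) when is_list(configs) do
    Enum.reduce_while(configs, :ok, fn config, :ok ->
      case deliver(signal, config) do
        :ok -> {:cont, :ok}
        {:error, _reason} = error -> {:halt, error}
      end
    end)
  end

  # Hypothetical stand-in for adapter delivery; only :pid is handled here.
  defp deliver(signal, {:pid, opts}) do
    send(Keyword.fetch!(opts, :target), {:signal, signal})
    :ok
  end

  defp deliver(_signal, {adapter, _opts}), do: {:error, {:unknown_adapter, adapter}}
end

# The caller only regains control after both sends have happened.
SyncFanOutSketch.dispatch(:hello, [{:pid, [target: self()]}, {:pid, [target: self()]}])
```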

dispatch_async(signal, config)

@spec dispatch_async(Jido.Signal.t(), dispatch_configs()) ::
  {:ok, Task.t()} | {:error, term()}

Dispatches a signal asynchronously using the provided configuration.

Returns immediately with a task that can be monitored for completion.

Parameters

  • signal - The signal to dispatch
  • config - Either a single dispatch configuration tuple or a list of configurations

Returns

  • {:ok, task} where task is a Task that can be awaited
  • {:error, reason} if the configuration is invalid

Examples

alias Jido.Signal.Dispatch
{:ok, task} = Dispatch.dispatch_async(signal, config)
:ok = Task.await(task)
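
The {:ok, task} return shape can be handled with the standard Task functions. The sketch below uses only the Elixir standard library; AsyncSketch is a hypothetical module that wraps an arbitrary delivery function, and the library may start its task differently (for example under a supervisor).

```elixir
defmodule AsyncSketch do
  # Hypothetical: wraps a zero-arity delivery function in a Task and
  # returns immediately, mirroring the {:ok, task} shape documented above.
  def dispatch_async(deliver_fun) when is_function(deliver_fun, 0) do
    {:ok, Task.async(deliver_fun)}
  end
end

parent = self()

{:ok, task} =
  AsyncSketch.dispatch_async(fn ->
    send(parent, {:signal, :demo})
    :ok
  end)

# Task.await/2 blocks for up to the given timeout (milliseconds) and
# exits the caller if the task has not replied by then.
result = Task.await(task, 1_000)

{:ok, slow} =
  AsyncSketch.dispatch_async(fn ->
    Process.sleep(50)
    :ok
  end)

# Task.yield/2 returns nil on timeout instead of exiting, so the caller
# can decide to shut the task down.
outcome = Task.yield(slow, 1_000) || Task.shutdown(slow)
```

Task.await/2 suits callers that treat a slow dispatch as a failure; Task.yield/2 followed by Task.shutdown/1 suits callers that want to give up without crashing.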

dispatch_batch(signal, configs, opts \\ [])

@spec dispatch_batch(Jido.Signal.t(), [dispatch_config()], batch_opts()) ::
  :ok | {:error, [{non_neg_integer(), term()}]}

Dispatches a signal to multiple destinations in batches.

This is useful when dispatching to a large number of destinations to avoid overwhelming the system. The dispatches are processed in batches of configurable size with configurable concurrency.

Parameters

  • signal - The signal to dispatch
  • configs - List of dispatch configurations
  • opts - Batch options:
    • :batch_size - Size of each batch (default: 50)
    • :max_concurrency - Maximum number of concurrent batches (default: 5)

Returns

  • :ok if all dispatches succeed
  • {:error, errors} where errors is a list of {index, reason} tuples

Examples

alias Jido.Signal.Dispatch
configs = List.duplicate({:pid, [target: pid]}, 1000)
:ok = Dispatch.dispatch_batch(signal, configs, batch_size: 100)
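
The batching behaviour and the {index, reason} error list can be approximated with the standard library alone. This is a sketch under stated assumptions rather than the library's code: BatchSketch and deliver_fun are hypothetical, the :demo adapter does not exist, and zero-based indexing is assumed from the non_neg_integer() in the spec. The defaults of 50 and 5 are taken from the parameter list above.

```elixir
defmodule BatchSketch do
  # deliver_fun is a hypothetical 1-arity function returning :ok | {:error, reason}.
  def dispatch_batch(configs, deliver_fun, opts \\ []) do
    batch_size = Keyword.get(opts, :batch_size, 50)
    max_concurrency = Keyword.get(opts, :max_concurrency, 5)

    errors =
      configs
      # Tag each config with its position so errors can be reported by index.
      |> Enum.with_index()
      |> Enum.chunk_every(batch_size)
      # Each batch runs in its own task; at most max_concurrency run at once.
      |> Task.async_stream(
        fn batch ->
          for {config, index} <- batch,
              {:error, reason} <- [deliver_fun.(config)],
              do: {index, reason}
        end,
        max_concurrency: max_concurrency
      )
      |> Enum.flat_map(fn {:ok, batch_errors} -> batch_errors end)

    if errors == [], do: :ok, else: {:error, errors}
  end
end

configs = for n <- 0..9, do: {:demo, [id: n]}

# Fails for two of the ten destinations.
deliver = fn {:demo, opts} ->
  if opts[:id] in [3, 7], do: {:error, :unreachable}, else: :ok
end

BatchSketch.dispatch_batch(configs, deliver, batch_size: 4, max_concurrency: 2)
# => {:error, [{3, :unreachable}, {7, :unreachable}]}
```

Because Task.async_stream/3 preserves input order by default, the error list comes back sorted by index even though batches run concurrently.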

validate_opts(config)

@spec validate_opts(dispatch_configs()) ::
  {:ok, dispatch_configs()} | {:error, term()}

Validates a dispatch configuration.

Parameters

  • config - Either a single dispatch configuration tuple or a list of dispatch configurations

Returns

  • {:ok, config} if the configuration is valid
  • {:error, reason} if the configuration is invalid

Examples

# Single config
iex> config = {:pid, [target: {:pid, self()}, delivery_mode: :async]}
iex> Jido.Signal.Dispatch.validate_opts(config)
{:ok, ^config}

# Multiple configs
iex> config = [
...>   {:bus, [target: {:bus, :default}, stream: "events"]},
...>   {:pubsub, [target: {:pubsub, :audit}, topic: "audit"]}
...> ]
iex> Jido.Signal.Dispatch.validate_opts(config)
{:ok, ^config}
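
For intuition about the {:ok, config} and {:error, reason} results, the following sketch validates a single tuple or a list of tuples. It is illustrative only: ValidateSketch is a hypothetical module, the error tuples are invented, and it checks nothing beyond the adapter name and the shape of the options. Custom adapter modules and nil, both allowed by the adapter() type, are deliberately not handled.

```elixir
defmodule ValidateSketch do
  # Built-in adapter atoms, copied from the adapter() type above.
  @known [:pid, :bus, :named, :pubsub, :logger, :console, :noop, :http, :webhook]

  # Single configuration tuple.
  def validate_opts({adapter, opts} = config) when is_atom(adapter) do
    cond do
      adapter not in @known -> {:error, {:unknown_adapter, adapter}}
      not Keyword.keyword?(opts) -> {:error, {:invalid_opts, opts}}
      true -> {:ok, config}
    end
  end

  # List of configurations: return the first error, or the list unchanged.
  def validate_opts(configs) when is_list(configs) do
    Enum.reduce_while(configs, {:ok, configs}, fn config, acc ->
      case validate_opts(config) do
        {:ok, _valid} -> {:cont, acc}
        {:error, _reason} = error -> {:halt, error}
      end
    end)
  end

  # Anything else is rejected (hypothetical error shape).
  def validate_opts(other), do: {:error, {:invalid_config, other}}
end

cfg = {:pid, [target: self()]}
ValidateSketch.validate_opts(cfg)
ValidateSketch.validate_opts([cfg, {:bogus, []}])
```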