Jido.Signal.Dispatch
(Jido Signal v2.0.0-rc.4)
A flexible signal dispatching system that routes signals to various destinations using configurable adapters.
The Dispatch module serves as the central hub for signal delivery in the Jido system. It provides a unified interface for sending signals to different destinations through various adapters. Each adapter implements specific delivery mechanisms suited for different use cases.
Built-in Adapters
The following adapters are provided out of the box:
- :pid - Direct delivery to a specific process (see Jido.Signal.Dispatch.PidAdapter)
- :bus - Delivery to an event bus (UNSUPPORTED - implementation pending)
- :named - Delivery to a named process (see Jido.Signal.Dispatch.Named)
- :pubsub - Delivery via PubSub mechanism (see Jido.Signal.Dispatch.PubSub)
- :logger - Log signals using Logger (see Jido.Signal.Dispatch.LoggerAdapter)
- :console - Print signals to console (see Jido.Signal.Dispatch.ConsoleAdapter)
- :noop - No-op adapter for testing/development (see Jido.Signal.Dispatch.NoopAdapter)
- :http - HTTP requests using :httpc (see Jido.Signal.Dispatch.Http)
- :webhook - Webhook delivery with signatures (see Jido.Signal.Dispatch.Webhook)
Configuration
Each adapter requires specific configuration options. A dispatch configuration is a tuple of
{adapter_type, options} where:
- adapter_type - One of the built-in adapter types above or a custom module implementing the Jido.Signal.Dispatch.Adapter behaviour
- options - Keyword list of options specific to the chosen adapter
Multiple dispatch configurations can be provided as a list to send signals to multiple destinations.
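To make the two accepted shapes concrete, the following is a minimal sketch that normalizes either a single {adapter_type, options} tuple or a list of them into a list. The module ConfigShapeSketch and its normalize/1 function are hypothetical names used for illustration only; they are not part of Jido, and the library may handle this differently internally.

```elixir
defmodule ConfigShapeSketch do
  # Hypothetical helper, not part of Jido.
  # A single {adapter_type, options} tuple becomes a one-element list.
  def normalize({adapter, opts} = config) when is_atom(adapter) and is_list(opts) do
    [config]
  end

  # A list of configuration tuples is normalized element by element.
  def normalize(configs) when is_list(configs) do
    Enum.flat_map(configs, &normalize/1)
  end
end

# Option keys below are taken from the examples in this document.
single = {:pid, [target: self()]}
many = [{:pid, [target: self()]}, {:pubsub, [target: :my_pubsub, topic: "events"]}]

normalized_single = ConfigShapeSketch.normalize(single)
normalized_many = ConfigShapeSketch.normalize(many)
IO.inspect(length(normalized_single), label: "single config count")
IO.inspect(length(normalized_many), label: "multi config count")
```

Built-in adapter names and custom adapter modules are both atoms, so the is_atom/1 guard covers either kind of adapter_type.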
Dispatch Modes
The module supports three dispatch modes:
- Synchronous (via dispatch/2) - Fire-and-forget dispatch that returns when all dispatches complete
- Asynchronous (via dispatch_async/2) - Returns immediately with a task that can be monitored
- Batched (via dispatch_batch/3) - Handles large numbers of dispatches in configurable batches
Concurrency
When dispatching to multiple targets (via a list of configs), the dispatch system processes
them in parallel using Task.Supervisor.async_stream/3. The maximum concurrency can be
configured at compile-time or runtime:
# In config.exs (compile-time)
config :jido, :dispatch_max_concurrency, 16
# Default: 8 concurrent dispatches
This parallel processing significantly improves throughput for multiple targets. For example, dispatching to 10 targets with 100ms latency each completes in ~200ms with the default concurrency of 8 (one round of 8 dispatches, then one round of 2) instead of ~1000ms sequentially.
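The timing estimate above can be reproduced with a small standalone sketch. It does not call Jido at all: the deliver function is a stand-in that sleeps for 100ms, and Task.async_stream/3 is used in place of Task.Supervisor.async_stream so that no supervisor has to be started. The exact elapsed time will vary by machine.

```elixir
# Ten simulated targets, each taking about 100 ms to "deliver".
targets = Enum.to_list(1..10)

# Stand-in for an adapter delivery; not a Jido function.
deliver = fn target ->
  Process.sleep(100)
  {target, :ok}
end

{elapsed_us, results} =
  :timer.tc(fn ->
    targets
    |> Task.async_stream(deliver, max_concurrency: 8, timeout: 5_000)
    |> Enum.to_list()
  end)

elapsed_ms = div(elapsed_us, 1000)

# With max_concurrency: 8, the first 8 targets run together and the
# remaining 2 start as slots free up, so roughly two 100 ms rounds.
IO.puts("10 deliveries finished in about #{elapsed_ms} ms")
```

Raising max_concurrency to 10 or more would let all ten simulated deliveries run in a single round.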
Error Handling
BREAKING CHANGE in behavior: When dispatching to multiple targets (list of configs),
dispatch/2 now aggregates all errors instead of returning only the first error:
- Single config: dispatch(signal, config) returns :ok or {:error, reason}
- Multiple configs: dispatch(signal, configs) returns :ok or {:error, [reason1, reason2, ...]}
The batch dispatch function dispatch_batch/3 continues to return indexed errors as before:
{:error, [{index, reason}, ...]}.
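The difference between the two error shapes can be illustrated with a short sketch that collapses a list of per-target results. The module ErrorShapeSketch is hypothetical and is not how the library is implemented; in particular, zero-based indexing for the batch shape is an assumption here.

```elixir
defmodule ErrorShapeSketch do
  # Illustrative only: the aggregated shape described for dispatch/2
  # when given a list of configs.
  def aggregate(results) do
    # The generator pattern skips every result that is not {:error, reason}.
    reasons = for {:error, reason} <- results, do: reason
    if reasons == [], do: :ok, else: {:error, reasons}
  end

  # Illustrative only: the indexed shape described for dispatch_batch/3.
  # Zero-based indexes are an assumption.
  def aggregate_indexed(results) do
    errors =
      results
      |> Enum.with_index()
      |> Enum.flat_map(fn
        {{:error, reason}, index} -> [{index, reason}]
        {_ok, _index} -> []
      end)

    if errors == [], do: :ok, else: {:error, errors}
  end
end

# Made-up per-target results for demonstration.
results = [:ok, {:error, :timeout}, :ok, {:error, :unreachable}]

IO.inspect(ErrorShapeSketch.aggregate(results), label: "aggregated")
IO.inspect(ErrorShapeSketch.aggregate_indexed(results), label: "indexed")
```

The indexed form lets a caller map each failure back to the configuration that produced it, which the plain aggregated list does not.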
Reserved Options
The following option keys are reserved for internal use and should not be used in dispatch configurations:
- :__validated__ - Marks pre-validated configurations (internal optimization flag)
Deprecated
- batch_size option in dispatch_batch/3 - Kept for backwards compatibility but no longer used. All dispatches now use the same parallel processing with max_concurrency control.
Examples
# Synchronous dispatch
config = {:pid, [target: pid, delivery_mode: :async]}
:ok = Dispatch.dispatch(signal, config)
# Asynchronous dispatch
{:ok, task} = Dispatch.dispatch_async(signal, config)
:ok = Task.await(task)
# Batch dispatch
configs = List.duplicate({:pid, [target: pid]}, 1000)
:ok = Dispatch.dispatch_batch(signal, configs, max_concurrency: 20)
# Multiple targets with parallel dispatch
configs = [
{:pid, [target: pid1]},
{:http, [url: "https://api.example.com/webhook"]},
{:pubsub, [target: :my_pubsub, topic: "events"]}
]
# Returns :ok or {:error, [error1, error2, ...]}
Dispatch.dispatch(signal, configs)
# HTTP dispatch
config = {:http, [
url: "https://api.example.com/events",
method: :post,
headers: [{"x-api-key", "secret"}]
]}
:ok = Dispatch.dispatch(signal, config)
# Webhook dispatch
config = {:webhook, [
url: "https://api.example.com/webhook",
secret: "webhook_secret",
event_type_map: %{"user:created" => "user.created"}
]}
:ok = Dispatch.dispatch(signal, config)
Summary
Functions
- dispatch/2 - Dispatches a signal using the provided configuration.
- dispatch_async/2 - Dispatches a signal asynchronously using the provided configuration.
- dispatch_batch/3 - Dispatches a signal to multiple destinations in batches.
- validate_opts/1 - Validates a dispatch configuration without executing the dispatch.
Types
@type adapter() :: :pid | :named | :pubsub | :logger | :console | :noop | :http | :webhook | nil | module()
@type batch_opts() :: [batch_size: pos_integer(), max_concurrency: pos_integer()]
@type dispatch_configs() :: dispatch_config() | [dispatch_config()]
Functions
@spec dispatch(Jido.Signal.t(), dispatch_configs()) :: :ok | {:error, term()}
Dispatches a signal using the provided configuration.
This is a synchronous operation that returns when all dispatches complete.
For asynchronous dispatch, use dispatch_async/2.
For batch dispatch, use dispatch_batch/3.
When dispatching to multiple targets (list of configs), dispatches are executed in parallel with configurable concurrency (default: 8 concurrent tasks).
Parameters
- signal - The signal to dispatch
- config - Either a single dispatch configuration tuple or a list of configurations
Returns
- :ok - All dispatches succeeded
- {:error, reason} - Single config dispatch failed
- {:error, [reason1, reason2, ...]} - One or more multi-config dispatches failed (aggregated errors)
Examples
# Single destination
iex> config = {:pid, [target: {:pid, pid}, delivery_mode: :async]}
iex> Jido.Signal.Dispatch.dispatch(signal, config)
:ok
# Multiple destinations (executed in parallel)
iex> config = [
...> {:bus, [target: {:bus, :default}, stream: "events"]},
...> {:pubsub, [target: :audit, topic: "audit"]}
...> ]
iex> Jido.Signal.Dispatch.dispatch(signal, config)
:ok
# Error aggregation for multiple targets
iex> config = [
...> {:pid, [target: pid1]},
...> {:invalid_adapter, []},
...> {:pid, [target: pid2]}
...> ]
iex> Jido.Signal.Dispatch.dispatch(signal, config)
{:error, [%Jido.Signal.Error{...}]}
@spec dispatch_async(Jido.Signal.t(), dispatch_configs()) :: {:ok, Task.t()} | {:error, term()}
Dispatches a signal asynchronously using the provided configuration.
Returns immediately with a task that can be monitored for completion.
Parameters
- signal - The signal to dispatch
- config - Either a single dispatch configuration tuple or a list of configurations
Returns
- {:ok, task} where task is a Task that can be awaited
- {:error, reason} if the configuration is invalid
Examples
{:ok, task} = Dispatch.dispatch_async(signal, config)
:ok = Task.await(task)
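Task.await/1 exits the caller if the task does not reply within its default timeout. Where that is undesirable, the standard Task.yield/2 plus Task.shutdown/1 pattern can bound the wait instead. The sketch below uses a plain Task.async/1 as a stand-in for the task returned in {:ok, task}; it assumes the returned task is owned by the calling process, as the Task.await/1 example suggests.

```elixir
# Stand-in for the task returned by dispatch_async/2; it only sleeps briefly.
task =
  Task.async(fn ->
    Process.sleep(50)
    :ok
  end)

# Wait up to one second. If the task has not replied by then,
# shut it down and report a timeout.
result =
  case Task.yield(task, 1_000) || Task.shutdown(task) do
    {:ok, value} -> value
    {:exit, reason} -> {:error, {:exit, reason}}
    nil -> {:error, :timeout}
  end

IO.inspect(result, label: "dispatch result")
```

The {:error, :timeout} and {:error, {:exit, reason}} shapes are choices made for this sketch, not values returned by the library.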
@spec dispatch_batch(Jido.Signal.t(), [dispatch_config()], batch_opts()) :: :ok | {:error, [{non_neg_integer(), term()}]}
Dispatches a signal to multiple destinations in batches.
This is useful when dispatching to a large number of destinations to avoid overwhelming the system. The dispatches are processed with configurable concurrency.
Parameters
- signal - The signal to dispatch
- configs - List of dispatch configurations
- opts - Batch options:
  - :batch_size - Deprecated (kept for backwards compatibility, no longer used)
  - :max_concurrency - Maximum number of concurrent dispatches (default: 5)
Returns
- :ok if all dispatches succeed
- {:error, errors} where errors is a list of {index, reason} tuples
Examples
configs = List.duplicate({:pid, [target: pid]}, 1000)
:ok = Dispatch.dispatch_batch(signal, configs, max_concurrency: 20)
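Because batch errors carry an index, a caller can recover the configurations that failed, for example to retry them. The following is a sketch under the assumption that indexes are zero-based positions in the original configs list; BatchRetrySketch is a hypothetical helper, not part of Jido, and the errors list is made up for demonstration.

```elixir
defmodule BatchRetrySketch do
  # Hypothetical helper: returns the configs whose positions appear in the
  # {index, reason} error list. Assumes zero-based indexes.
  def failed_configs(configs, errors) do
    failed = MapSet.new(errors, fn {index, _reason} -> index end)

    configs
    |> Enum.with_index()
    |> Enum.filter(fn {_config, index} -> MapSet.member?(failed, index) end)
    |> Enum.map(fn {config, _index} -> config end)
  end
end

configs = [{:noop, []}, {:logger, []}, {:console, []}]

# Example error list in the {:error, [{index, reason}, ...]} shape.
errors = [{1, :timeout}]

retry = BatchRetrySketch.failed_configs(configs, errors)
IO.inspect(retry, label: "configs to retry")
```

The resulting list could then be passed to another dispatch_batch/3 call, ideally with a bounded number of retry attempts.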
@spec validate_opts(dispatch_configs()) :: {:ok, dispatch_configs()} | {:error, term()}
Validates a dispatch configuration without executing the dispatch.
This is useful for pre-validating configurations before dispatch time, or for testing adapter configurations. Validation is automatically performed during dispatch, but this function allows explicit validation for debugging or testing purposes.
Parameters
- config - Either a single dispatch configuration tuple or a list of dispatch configurations
Returns
- {:ok, config} if the configuration is valid
- {:error, reason} if the configuration is invalid
Examples
# Single config
iex> config = {:pid, [target: {:pid, self()}, delivery_mode: :async]}
iex> Jido.Signal.Dispatch.validate_opts(config)
{:ok, ^config}
# Multiple configs
iex> config = [
...> {:bus, [target: {:bus, :default}, stream: "events"]},
...> {:pubsub, [target: {:pubsub, :audit}, topic: "audit"]}
...> ]
iex> Jido.Signal.Dispatch.validate_opts(config)
{:ok, ^config}
# Pre-validation for testing
{:ok, validated} = Dispatch.validate_opts({:pid, [target: pid]})
# Use validated config later...