Guide: The Dispatch System
The Dispatch System is Jido.Signal's pluggable architecture for sending signals to various destinations. It provides a unified interface for delivering signals to processes, external services, logging systems, and custom endpoints.
Adapter-Based Architecture
The dispatch system uses adapters to handle different delivery mechanisms. Each adapter implements the Jido.Signal.Dispatch.Adapter behaviour, which provides a consistent interface for validation and delivery.
# Basic dispatch configuration
dispatch_config = {:pid, target: self()}
# Dispatch with options
dispatch_config = {:http, url: "https://api.example.com/webhook", headers: %{"Authorization" => "Bearer token"}}
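To make the adapter contract concrete, here is a minimal sketch of how a dispatcher could resolve an `{adapter, opts}` tuple, validate the options, and then deliver. The modules `DispatchSketch` and `DispatchSketch.EchoAdapter`, the `:echo` name, the `{:signal, signal}` message shape, and the lookup table are all hypothetical; this is not Jido.Signal's implementation. Only the `validate_opts/1` and `deliver/2` callback names are taken from this guide.

```elixir
defmodule DispatchSketch.EchoAdapter do
  # Hypothetical adapter: sends the signal to a target pid as {:signal, signal}.
  def validate_opts(opts) do
    case Keyword.fetch(opts, :target) do
      {:ok, pid} when is_pid(pid) -> {:ok, opts}
      _ -> {:error, :missing_or_invalid_target}
    end
  end

  def deliver(signal, opts) do
    send(Keyword.fetch!(opts, :target), {:signal, signal})
    :ok
  end
end

defmodule DispatchSketch do
  # Hypothetical lookup table from short adapter names to modules.
  @adapters %{echo: DispatchSketch.EchoAdapter}

  # Validate first; deliver only when the options are acceptable.
  def dispatch(signal, {adapter, opts}) do
    with {:ok, module} <- resolve(adapter),
         {:ok, validated} <- module.validate_opts(opts) do
      module.deliver(signal, validated)
    end
  end

  defp resolve(name) do
    case Map.fetch(@adapters, name) do
      {:ok, module} -> {:ok, module}
      :error -> {:error, :unknown_adapter}
    end
  end
end
```

Separating validation from delivery lets a configuration mistake surface as an `{:error, reason}` tuple before anything is sent.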
Built-in Adapters
:pid - Process Delivery
Sends signals directly to Elixir processes:
# Send to a specific PID
Bus.subscribe(:my_bus, "user.*", dispatch: {:pid, target: self()})
# Send to a named process
Bus.subscribe(:my_bus, "user.*", dispatch: {:pid, target: :user_handler})
# Async delivery to avoid blocking
Bus.subscribe(:my_bus, "user.*", dispatch: {:pid, target: self(), async: true})
Options:
- target: PID or registered process name (required)
- async: Send asynchronously (default: false)
- timeout: Delivery timeout in milliseconds (default: 5000)
:named - Named Process Delivery
Sends signals to processes registered with specific names:
# Send to locally registered process
Bus.subscribe(:my_bus, "user.*", dispatch: {:named, name: MyApp.UserHandler})
# Send to globally registered process
Bus.subscribe(:my_bus, "user.*", dispatch: {:named, name: {:global, :user_handler}})
# Send via Registry
Bus.subscribe(:my_bus, "user.*", dispatch: {:named, name: {:via, Registry, {MyApp.Registry, :handler}}})
Options:
- name: Process name specification (required)
- async: Send asynchronously (default: false)
- timeout: Delivery timeout in milliseconds (default: 5000)
:pubsub - Phoenix.PubSub Integration
Distributes signals across clustered nodes using Phoenix.PubSub:
# Broadcast to a topic
Bus.subscribe(:my_bus, "user.*", dispatch: {:pubsub, topic: "user_events"})
# Use custom PubSub instance
Bus.subscribe(:my_bus, "user.*", dispatch: {:pubsub,
pubsub: MyApp.PubSub,
topic: "critical_events"
})
# Local broadcast only
Bus.subscribe(:my_bus, "user.*", dispatch: {:pubsub,
topic: "local_events",
local?: true
})
Options:
- topic: PubSub topic name (required)
- pubsub: PubSub instance name (default: app's configured PubSub)
- local?: Broadcast only to the local node (default: false)
:logger - Structured Logging
Logs signals using Elixir's Logger:
# Basic logging
Bus.subscribe(:my_bus, "error.**", dispatch: {:logger, level: :error})
# With custom formatting
Bus.subscribe(:my_bus, "user.*", dispatch: {:logger,
level: :info,
format: "User event: $type from $source",
metadata: [:user_id, :timestamp]
})
Options:
- level: Log level (:debug, :info, :warn, :error)
- format: Custom log format string
- metadata: Additional metadata fields to include
:console - Console Output
Prints signals to stdout/stderr for development and debugging:
# Simple console output
Bus.subscribe(:my_bus, "debug.**", dispatch: {:console, []})
# Formatted output
Bus.subscribe(:my_bus, "user.*", dispatch: {:console,
format: :pretty,
colors: true
})
# Output to stderr
Bus.subscribe(:my_bus, "error.**", dispatch: {:console,
device: :stderr,
format: :json
})
Options:
- format: Output format (:json, :pretty, :compact)
- colors: Enable colored output (default: true)
- device: Output device (:stdio, :stderr)
:noop - No Operation
Discards signals (useful for testing and conditional routing):
# Conditional dispatch based on environment
dispatch_config = if Mix.env() == :test do
{:noop, []}
else
{:logger, level: :info}
end
Bus.subscribe(:my_bus, "test.**", dispatch: dispatch_config)
:http - HTTP Delivery
Sends signals as HTTP requests:
# Basic HTTP POST
Bus.subscribe(:my_bus, "webhook.**", dispatch: {:http,
url: "https://api.example.com/events"
})
# With authentication and custom headers
Bus.subscribe(:my_bus, "critical.**", dispatch: {:http,
url: "https://alerts.example.com/webhook",
method: :post,
headers: %{
"Authorization" => "Bearer #{token}",
"Content-Type" => "application/json",
"X-Source" => "jido-signal"
},
timeout: 10_000
})
Options:
- url: Target URL (required)
- method: HTTP method (default: :post)
- headers: Custom headers map
- timeout: Request timeout in milliseconds (default: 5000)
- retry_policy: Retry configuration for failed requests
:webhook - Webhook Delivery
Specialized HTTP delivery with webhook-specific features:
# Webhook with signature verification
Bus.subscribe(:my_bus, "payment.**", dispatch: {:webhook,
url: "https://merchant.example.com/webhook",
secret: "webhook_secret_key",
signature_header: "X-Signature"
})
# Multiple webhook endpoints
Bus.subscribe(:my_bus, "order.**", dispatch: {:webhook,
urls: [
"https://inventory.example.com/webhook",
"https://analytics.example.com/webhook",
"https://notifications.example.com/webhook"
],
parallel: true
})
Options:
- url or urls: Webhook endpoint(s)
- secret: Secret for HMAC signature generation
- signature_header: Header name for signature (default: "X-Signature")
- parallel: Send to multiple URLs concurrently (default: false)
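The `secret` option refers to HMAC signing of the request body. As a rough illustration of what that involves, the sketch below computes a signature and verifies it on the receiving side. The module name `WebhookSignatureSketch`, the choice of SHA-256, and the lowercase hex encoding are assumptions made for the example; the guide does not state which hash or encoding the `:webhook` adapter uses.

```elixir
defmodule WebhookSignatureSketch do
  # Hypothetical helper: lowercase hex HMAC-SHA256 of the raw request body.
  def sign(body, secret) when is_binary(body) and is_binary(secret) do
    :crypto.mac(:hmac, :sha256, secret, body)
    |> Base.encode16(case: :lower)
  end

  # Receiver side: recompute the signature and compare it to the header value.
  def valid?(body, secret, signature) when is_binary(signature) do
    secure_equal?(sign(body, secret), signature)
  end

  # Compares every byte regardless of where the first mismatch occurs,
  # so timing does not reveal how much of the signature was correct.
  defp secure_equal?(a, b) when byte_size(a) == byte_size(b) do
    diff =
      Enum.zip(:binary.bin_to_list(a), :binary.bin_to_list(b))
      |> Enum.reduce(0, fn {x, y}, acc -> Bitwise.bor(acc, Bitwise.bxor(x, y)) end)

    diff == 0
  end

  defp secure_equal?(_a, _b), do: false
end
```

A receiver would read the header named by `signature_header`, call `valid?/3` with the raw body and the shared secret, and reject the request if it returns false.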
Dispatching to Multiple Targets
Send signals to multiple destinations simultaneously:
Bus.subscribe(:my_bus, "user.created", dispatch: [
{:logger, level: :info},
{:http, url: "https://crm.example.com/webhook"},
{:pid, target: :email_service},
{:pubsub, topic: "user_events"}
])
Asynchronous and Batch Dispatching
Asynchronous Dispatch
For non-blocking delivery:
# Individual async dispatch
Dispatch.dispatch_async(signal, {:http, url: "https://slow-api.example.com"})
# Async subscription
Bus.subscribe(:my_bus, "analytics.**", dispatch: {:http,
url: "https://analytics.example.com",
async: true
})
Batch Dispatch
For high-throughput scenarios:
signals = [signal1, signal2, signal3]
# Batch dispatch to single target
Dispatch.dispatch_batch(signals, {:http, url: "https://api.example.com/batch"})
# Batch dispatch with different targets
dispatch_configs = [
{:logger, level: :info},
{:http, url: "https://api.example.com/webhook"}
]
Dispatch.dispatch_batch(signals, dispatch_configs)
Error Handling and Retries
Retry Policies
Configure automatic retries for failed deliveries:
Bus.subscribe(:my_bus, "critical.**", dispatch: {:http,
url: "https://critical-api.example.com/webhook",
retry_policy: %{
max_retries: 3,
backoff: :exponential,
base_delay: 1000, # 1 second
max_delay: 30_000, # 30 seconds
jitter: true
}
})
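To show how the fields in such a policy could interact, here is a small sketch of an exponential backoff calculation. `BackoffSketch` is a hypothetical module, and the "full jitter" strategy (a uniformly random delay up to the capped value) is one common choice assumed here; the guide does not specify how Jido.Signal applies `jitter`.

```elixir
defmodule BackoffSketch do
  # Delay in milliseconds before retry number `attempt` (1-based):
  # base_delay doubles on each attempt and is capped at max_delay.
  def delay(attempt, %{base_delay: base, max_delay: max} = policy) when attempt >= 1 do
    capped = min(base * Integer.pow(2, attempt - 1), max)

    if Map.get(policy, :jitter, false) do
      # Assumed "full jitter": uniform random integer in 1..capped
      :rand.uniform(capped)
    else
      capped
    end
  end

  # All delays for a policy, one per allowed retry.
  def schedule(%{max_retries: max_retries} = policy) do
    for attempt <- 1..max_retries//1, do: delay(attempt, policy)
  end
end
```

With the values above and jitter disabled, the three retries would wait 1000, 2000, and 4000 ms. The 30-second cap only applies from the sixth attempt onward.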
Dead Letter Handling
Route failed signals to alternative destinations:
Bus.subscribe(:my_bus, "payment.**", dispatch: [
# Primary delivery
{:http, url: "https://payment-processor.example.com/webhook"},
# Dead letter queue for failures
{:logger, level: :error, on_failure: true},
{:http, url: "https://dead-letter-queue.example.com", on_failure: true}
])
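The control flow implied by `on_failure: true` can be sketched as "try the primary target, and run the fallbacks only if it fails". The module `DeadLetterSketch` and its function-based interface are hypothetical and exist only to illustrate that flow; they do not reflect how the library evaluates `on_failure`.

```elixir
defmodule DeadLetterSketch do
  # `primary` is a zero-arity function returning :ok or {:error, reason}.
  # Each fallback is a one-arity function that receives the failure reason.
  def deliver(primary, fallbacks) when is_function(primary, 0) and is_list(fallbacks) do
    case primary.() do
      :ok ->
        :ok

      {:error, reason} ->
        # Fallbacks run only on failure; the original error is still returned.
        Enum.each(fallbacks, fn fallback -> fallback.(reason) end)
        {:error, reason}
    end
  end
end
```

Returning the original error after running the fallbacks keeps the failure visible to the caller even though the signal has been captured elsewhere.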
Circuit Breaker Pattern
Protect against cascading failures:
Bus.subscribe(:my_bus, "external.**", dispatch: {:http,
url: "https://external-service.example.com",
circuit_breaker: %{
failure_threshold: 5,
recovery_timeout: 60_000,
half_open_max_calls: 3
}
})
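The three options map onto the usual closed, open, and half-open states of a circuit breaker. The sketch below models those transitions as pure functions so they are easy to follow. `BreakerSketch` is a hypothetical module, and closing the circuit on the first successful trial call is a simplifying assumption; the library's actual behaviour may differ.

```elixir
defmodule BreakerSketch do
  defstruct state: :closed,
            failures: 0,
            opened_at: nil,
            trial_calls: 0,
            failure_threshold: 5,
            recovery_timeout: 60_000,
            half_open_max_calls: 3

  # Decide whether a call may proceed at time `now` (milliseconds).
  def allow?(%__MODULE__{state: :closed} = b, _now), do: {true, b}

  def allow?(%__MODULE__{state: :open} = b, now) do
    if now - b.opened_at >= b.recovery_timeout do
      # Recovery timeout elapsed: start admitting a limited number of trial calls.
      allow?(%{b | state: :half_open, trial_calls: 0}, now)
    else
      {false, b}
    end
  end

  def allow?(%__MODULE__{state: :half_open} = b, _now) do
    if b.trial_calls < b.half_open_max_calls do
      {true, %{b | trial_calls: b.trial_calls + 1}}
    else
      {false, b}
    end
  end

  # Record the outcome of a call. Any success closes the circuit (simplification).
  def record(%__MODULE__{} = b, :ok, _now), do: %{b | state: :closed, failures: 0}

  # A failed trial call reopens the circuit immediately.
  def record(%__MODULE__{state: :half_open} = b, :error, now),
    do: %{b | state: :open, opened_at: now}

  def record(%__MODULE__{} = b, :error, now) do
    failures = b.failures + 1

    if failures >= b.failure_threshold do
      %{b | state: :open, failures: failures, opened_at: now}
    else
      %{b | failures: failures}
    end
  end
end
```

In a real system this state would live in a process or ETS table shared by all deliveries to the same endpoint, with `now` taken from `System.monotonic_time(:millisecond)`.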
Custom Dispatch Adapters
Create custom adapters for specialized delivery needs:
defmodule MyApp.SlackAdapter do
  @behaviour Jido.Signal.Dispatch.Adapter

  # NimbleOptions expects a schema of option specs, not a bare list of keys
  @schema [
    channel: [type: :string, required: true],
    token: [type: :string, required: true]
  ]

  @impl true
  def validate_opts(opts) do
    # Returns {:ok, validated_opts} or {:error, %NimbleOptions.ValidationError{}}
    NimbleOptions.validate(opts, @schema)
  end

  @impl true
  def deliver(signal, opts) do
    case post_to_slack(signal, opts) do
      {:ok, response} -> {:ok, response}
      {:error, reason} -> {:error, reason}
    end
  end

  # Placeholder: replace with a real call to Slack's API
  defp post_to_slack(_signal, _opts) do
    {:error, :not_implemented}
  end
end
Use custom adapters:
Bus.subscribe(:my_bus, "alerts.**", dispatch: {MyApp.SlackAdapter,
channel: "#alerts",
token: "slack-bot-token"
})
Performance Optimization
Connection Pooling
For HTTP-based adapters, use connection pooling:
# Configure HTTP client with pooling
Bus.subscribe(:my_bus, "api.**", dispatch: {:http,
url: "https://api.example.com/webhook",
pool: :api_pool,
pool_size: 10,
pool_timeout: 5000
})
Batching Strategies
Optimize for throughput with batching:
# Time-based batching
Bus.subscribe(:my_bus, "metrics.**", dispatch: {:http,
url: "https://metrics.example.com/batch",
batch_size: 100,
batch_timeout: 5000
})
# Size-based batching
Bus.subscribe(:my_bus, "logs.**", dispatch: {:http,
url: "https://logs.example.com/bulk",
batch_size: 1000,
flush_interval: 10_000
})
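Both variants combine a size limit with a time limit: a batch is flushed when it is full or when the timer expires, whichever comes first. The sketch below shows that mechanism with a small GenServer. `BatchBufferSketch` and its `:flush_fun` option are hypothetical names for this example, and the code is not the library's batching implementation.

```elixir
defmodule BatchBufferSketch do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def push(server, item), do: GenServer.cast(server, {:push, item})

  @impl true
  def init(opts) do
    {:ok,
     %{
       # Hypothetical callback that receives each completed batch
       flush_fun: Keyword.fetch!(opts, :flush_fun),
       batch_size: Keyword.get(opts, :batch_size, 100),
       batch_timeout: Keyword.get(opts, :batch_timeout, 5_000),
       items: [],
       timer: nil
     }}
  end

  @impl true
  def handle_cast({:push, item}, state) do
    state = ensure_timer(%{state | items: [item | state.items]})

    # Size-based flush: the batch is full.
    if length(state.items) >= state.batch_size do
      {:noreply, flush(state)}
    else
      {:noreply, state}
    end
  end

  @impl true
  # Time-based flush: the timer started by the first buffered item fired.
  def handle_info(:flush, state), do: {:noreply, flush(%{state | timer: nil})}

  defp ensure_timer(%{timer: nil} = state) do
    %{state | timer: Process.send_after(self(), :flush, state.batch_timeout)}
  end

  defp ensure_timer(state), do: state

  defp flush(%{items: []} = state), do: state

  defp flush(state) do
    if state.timer, do: Process.cancel_timer(state.timer)
    # Items were prepended, so reverse to restore arrival order.
    state.flush_fun.(Enum.reverse(state.items))
    %{state | items: [], timer: nil}
  end
end
```

If a timer message is already in the mailbox when a size-based flush cancels the timer, the next batch may flush slightly early. This is harmless for a sketch, but a production version would tag each timer message with a reference.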
Monitoring and Observability
Telemetry Integration
The dispatch system emits telemetry events for monitoring:
require Logger

:telemetry.attach(
  "dispatch-metrics",
  [:jido_signal, :dispatch, :deliver],
  fn _event, measurements, metadata, _config ->
    # Log delivery metrics
    Logger.info("Signal delivered",
      adapter: metadata.adapter,
      duration: measurements.duration,
      success: metadata.success
    )
  end,
  nil
)
Health Checks
Monitor adapter health:
# Check adapter connectivity
{:ok, status} = Dispatch.health_check({:http, url: "https://api.example.com"})
# Returns: %{status: :healthy, latency: 45, last_success: ~U[...]}
The Dispatch System provides a flexible, robust foundation for integrating Jido.Signal with any external system or service, ensuring reliable signal delivery across your entire architecture.