Signals and Dispatch
Signal Structure
Signals implement the CloudEvents v1.0.2 specification with Jido extensions:
# Basic signal
# Preferred: positional constructor (type, data, attrs)
{:ok, signal} = Jido.Signal.new("user.created", %{user_id: "123", email: "user@example.com"},
source: "/auth/service"
)
# Also available: Map/keyword constructor (backwards compatible)
{:ok, signal} = Jido.Signal.new(%{
type: "user.created",
source: "/auth/service",
data: %{user_id: "123", email: "user@example.com"}
})
# Signal fields
signal.id # UUID v7 (auto-generated, time-ordered)
signal.specversion # "1.0.2"
signal.type # "user.created"
signal.source # "/auth/service"
signal.data # %{user_id: "123", email: "user@example.com"}
signal.time # ISO 8601 timestamp (auto-generated)
signal.datacontenttype # "application/json" (default)
signal.jido_dispatch # Dispatch configuration (DEPRECATED - pass to Bus.subscribe/3 or Dispatch.dispatch/2 instead)
signal.extensions # Map of extension namespaces to extension data
# Data semantics (CloudEvents):
# - When datacontenttype is JSON (or omitted in JSON format), data may be any JSON value
# (object/map, array, string, number, boolean, null)
# - For non-JSON payloads, encode according to datacontenttype; binary payloads use data_base64 when serialized to JSON
Signal IDs
Signals use UUID7 format for their id field, which provides time-ordered identifiers that are efficient for database indexing and naturally sortable:
# IDs are auto-generated by Signal.new
{:ok, signal} = Jido.Signal.new("user.created", %{user_id: "123"}, source: "/auth")
signal.id # => "0194c3d8-7e82-7d4a-8d5c-1a2b3c4d5e6f"
# Generate IDs manually
id = Jido.Signal.ID.generate!()
# Extract timestamp from ID
timestamp = Jido.Signal.ID.extract_timestamp(signal.id)
# => 1738452532866 (Unix milliseconds, decoded from the first 48 bits of the ID)
# Compare IDs chronologically
:lt = Jido.Signal.ID.compare(older_signal.id, newer_signal.id)
ID Utilities
| Function | Description |
|---|---|
| `Jido.Signal.ID.generate!/0` | Generate a new UUID7 string |
| `Jido.Signal.ID.generate/0` | Generate UUID7 with timestamp tuple |
| `Jido.Signal.ID.extract_timestamp/1` | Get Unix milliseconds from ID |
| `Jido.Signal.ID.compare/2` | Chronological comparison (`:lt`, `:eq`, `:gt`) |
| `Jido.Signal.ID.sequence_number/1` | Get sequence number within millisecond |
| `Jido.Signal.ID.valid?/1` | Validate UUID7 format |
| `Jido.Signal.ID.generate_batch/1` | Generate multiple ordered IDs |
Dispatch Adapters
PID Adapter
Direct process delivery:
# Async delivery (fire-and-forget)
config = {:pid, [target: pid, delivery_mode: :async]}
:ok = Jido.Signal.Dispatch.dispatch(signal, config)
# Sync delivery (with response)
config = {:pid, [
target: pid,
delivery_mode: :sync,
timeout: 10_000
]}
Named Process Adapter
Delivery to registered processes:
config = {:named, [
target: {:name, :my_server},
delivery_mode: :async
]}
PubSub Adapter
Phoenix.PubSub broadcast:
config = {:pubsub, [
target: :my_app_pubsub,
topic: "events"
]}
HTTP Adapter
HTTP endpoint delivery:
config = {:http, [
url: "https://api.example.com/events",
method: :post,
headers: [{"x-api-key", "secret"}],
timeout: 5000,
retry: %{max_attempts: 3, base_delay: 1000}
]}
Sync vs Async Patterns
Synchronous Dispatch
Blocks until all deliveries complete:
:ok = Jido.Signal.Dispatch.dispatch(signal, config)
Asynchronous Dispatch
Returns task immediately:
{:ok, task} = Jido.Signal.Dispatch.dispatch_async(signal, config)
:ok = Task.await(task)
Batch Dispatch
Multiple destinations with concurrency control:
configs = List.duplicate({:pid, [target: pid]}, 1000)
:ok = Jido.Signal.Dispatch.dispatch_batch(
signal,
configs,
batch_size: 100,
max_concurrency: 5
)
Multiple Destinations
Send to multiple adapters:
configs = [
{:pubsub, [target: :pubsub, topic: "events"]},
{:logger, [level: :info]},
{:http, [url: "https://webhook.example.com"]}
]
:ok = Jido.Signal.Dispatch.dispatch(signal, configs)
Custom Signal Types
Define structured signal types:
defmodule UserCreatedSignal do
use Jido.Signal,
type: "user.created",
default_source: "/auth/service",
datacontenttype: "application/json",
schema: [
user_id: [type: :string, required: true],
email: [type: :string, required: true],
name: [type: :string, required: false]
]
end
# Usage
{:ok, signal} = UserCreatedSignal.new(%{
user_id: "123",
email: "user@example.com"
})
# Override defaults
{:ok, signal} = UserCreatedSignal.new(
%{user_id: "123", email: "user@example.com"},
source: "/different/source"
)
# Then dispatch separately (preferred over jido_dispatch field):
Jido.Signal.Dispatch.dispatch(signal, {:pubsub, [target: :pubsub, topic: "user-events"]})
Schema Validation
Custom signals validate data against schema:
# Valid
{:ok, signal} = UserCreatedSignal.new(%{
user_id: "123",
email: "user@example.com"
})
# Invalid - missing required field
{:error, reason} = UserCreatedSignal.new(%{
user_id: "123"
# email is required
})
Error Handling
Validation Errors
# Invalid dispatch config
{:error, reason} = Jido.Signal.Dispatch.validate_opts({:invalid, []})
# Invalid signal data
{:error, reason} = Jido.Signal.new(%{}) # missing type and source
Delivery Errors
By default, dispatch returns raw error atoms. Structured errors (Jido.Signal.Error.DispatchError) are opt-in via configuration.
# Process not alive (returns raw atom by default)
{:error, :process_not_alive} =
Jido.Signal.Dispatch.dispatch(signal, {:pid, [target: dead_pid]})
# HTTP timeout
{:error, :timeout} =
Jido.Signal.Dispatch.dispatch(signal, {:http, [url: "...", timeout: 1]})
Batch Errors
# Some failures
{:error, errors} = Jido.Signal.Dispatch.dispatch_batch(signal, configs)
# errors = [{index, reason}, ...]
Circuit Breaker
The Jido.Signal.Dispatch.CircuitBreaker module provides fault isolation for dispatch adapters using the :fuse library. Circuits are per-adapter-type, providing bulk protection without per-endpoint overhead.
Configuration
Default settings:
- 5 failures in 10 seconds triggers the circuit to open
- 30 second reset time before allowing requests again
Usage
alias Jido.Signal.Dispatch.CircuitBreaker
# Install circuit breaker (once at application startup)
:ok = CircuitBreaker.install(:http,
strategy: {:standard, 5, 10_000}, # 5 failures in 10 seconds
refresh: 30_000 # 30 second reset
)
# Wrap dispatch calls with circuit breaker protection
case CircuitBreaker.run(:http, fn ->
Jido.Signal.Dispatch.dispatch(signal, {:http, [url: "https://api.example.com/events"]})
end) do
:ok ->
:ok
{:error, :circuit_open} ->
# Circuit is open, degrade gracefully
Logger.warning("HTTP circuit open, queuing for retry")
{:error, :circuit_open}
{:error, reason} ->
{:error, reason}
end
# Check circuit status
:ok = CircuitBreaker.status(:http) # Circuit closed (healthy)
:blown = CircuitBreaker.status(:http) # Circuit open (failing)
# Manually reset circuit
:ok = CircuitBreaker.reset(:http)
Telemetry Events
The circuit breaker emits telemetry events:
- [:jido, :dispatch, :circuit, :melt] - Failure recorded
- [:jido, :dispatch, :circuit, :rejected] - Request rejected (circuit open)
- [:jido, :dispatch, :circuit, :reset] - Circuit reset
Next Steps
- Event Bus - Publish/subscribe messaging with middleware hooks and persistent subscriptions
- Signal Router - High-performance trie-based routing with pattern matching