Signals and Dispatch

View Source

Signal Structure

Signals implement the CloudEvents v1.0.2 specification with Jido extensions:

# Basic signal
# Preferred: positional constructor (type, data, attrs)
{:ok, signal} = Jido.Signal.new("user.created", %{user_id: "123", email: "user@example.com"},
  source: "/auth/service"
)

# Also available: Map/keyword constructor (backwards compatible)
{:ok, signal} = Jido.Signal.new(%{
  type: "user.created",
  source: "/auth/service",
  data: %{user_id: "123", email: "user@example.com"}
})

# Signal fields
signal.id              # UUID v4 (auto-generated)
signal.specversion     # "1.0.2"
signal.type            # "user.created"
signal.source          # "/auth/service"
signal.data            # %{user_id: "123", email: "user@example.com"}
signal.time            # ISO 8601 timestamp (auto-generated)
signal.datacontenttype # "application/json" (default)
signal.jido_dispatch   # Dispatch configuration (DEPRECATED - pass to Bus.subscribe/3 or Dispatch.dispatch/2 instead)
signal.extensions      # Map of extension namespaces to extension data

# Data semantics (CloudEvents):
# - When datacontenttype is JSON (or omitted in JSON format), data may be any JSON value
#   (object/map, array, string, number, boolean, null)
# - For non-JSON payloads, encode according to datacontenttype; binary payloads use data_base64 when serialized to JSON

Signal IDs

Signals use UUID7 format for their id field, which provides time-ordered identifiers that are efficient for database indexing and naturally sortable:

# IDs are auto-generated by Signal.new
{:ok, signal} = Jido.Signal.new("user.created", %{user_id: "123"}, source: "/auth")
signal.id  # => "0194c3d8-7e82-7d4a-8d5c-1a2b3c4d5e6f"

# Generate IDs manually
id = Jido.Signal.ID.generate!()

# Extract timestamp from ID
timestamp = Jido.Signal.ID.extract_timestamp(signal.id)
# => 1677721600000 (Unix milliseconds)

# Compare IDs chronologically
:lt = Jido.Signal.ID.compare(older_signal.id, newer_signal.id)

ID Utilities

FunctionDescription
Jido.Signal.ID.generate!/0Generate a new UUID7 string
Jido.Signal.ID.generate/0Generate UUID7 with timestamp tuple
Jido.Signal.ID.extract_timestamp/1Get Unix milliseconds from ID
Jido.Signal.ID.compare/2Chronological comparison (:lt, :eq, :gt)
Jido.Signal.ID.sequence_number/1Get sequence number within millisecond
Jido.Signal.ID.valid?/1Validate UUID7 format
Jido.Signal.ID.generate_batch/1Generate multiple ordered IDs

Dispatch Adapters

PID Adapter

Direct process delivery:

# Async delivery (fire-and-forget)
config = {:pid, [target: pid, delivery_mode: :async]}
:ok = Jido.Signal.Dispatch.dispatch(signal, config)

# Sync delivery (with response)
config = {:pid, [
  target: pid,
  delivery_mode: :sync,
  timeout: 10_000
]}

Named Process Adapter

Delivery to registered processes:

config = {:named, [
  target: {:name, :my_server},
  delivery_mode: :async
]}

PubSub Adapter

Phoenix.PubSub broadcast:

config = {:pubsub, [
  target: :my_app_pubsub,
  topic: "events"
]}

HTTP Adapter

HTTP endpoint delivery:

config = {:http, [
  url: "https://api.example.com/events",
  method: :post,
  headers: [{"x-api-key", "secret"}],
  timeout: 5000,
  retry: %{max_attempts: 3, base_delay: 1000}
]}

Sync vs Async Patterns

Synchronous Dispatch

Blocks until all deliveries complete:

:ok = Jido.Signal.Dispatch.dispatch(signal, config)

Asynchronous Dispatch

Returns task immediately:

{:ok, task} = Jido.Signal.Dispatch.dispatch_async(signal, config)
:ok = Task.await(task)

Batch Dispatch

Multiple destinations with concurrency control:

configs = List.duplicate({:pid, [target: pid]}, 1000)
:ok = Jido.Signal.Dispatch.dispatch_batch(
  signal, 
  configs, 
  batch_size: 100,
  max_concurrency: 5
)

Multiple Destinations

Send to multiple adapters:

configs = [
  {:pubsub, [target: :pubsub, topic: "events"]},
  {:logger, [level: :info]},
  {:http, [url: "https://webhook.example.com"]}
]
:ok = Jido.Signal.Dispatch.dispatch(signal, configs)

Custom Signal Types

Define structured signal types:

defmodule UserCreatedSignal do
  use Jido.Signal,
    type: "user.created",
    default_source: "/auth/service",
    datacontenttype: "application/json",
    schema: [
      user_id: [type: :string, required: true],
      email: [type: :string, required: true],
      name: [type: :string, required: false]
    ]
end

# Usage
{:ok, signal} = UserCreatedSignal.new(%{
  user_id: "123",
  email: "user@example.com"
})

# Override defaults
{:ok, signal} = UserCreatedSignal.new(
  %{user_id: "123", email: "user@example.com"},
  source: "/different/source"
)

# Then dispatch separately (preferred over jido_dispatch field):
Jido.Signal.Dispatch.dispatch(signal, {:pubsub, [target: :pubsub, topic: "user-events"]})

Schema Validation

Custom signals validate data against schema:

# Valid
{:ok, signal} = UserCreatedSignal.new(%{
  user_id: "123",
  email: "user@example.com"
})

# Invalid - missing required field
{:error, reason} = UserCreatedSignal.new(%{
  user_id: "123"
  # email is required
})

Error Handling

Validation Errors

# Invalid dispatch config
{:error, reason} = Jido.Signal.Dispatch.validate_opts({:invalid, []})

# Invalid signal data
{:error, reason} = Jido.Signal.new(%{})  # missing type and source

Delivery Errors

By default, dispatch returns raw error atoms. Structured errors (Jido.Signal.Error.DispatchError) are opt-in via configuration.

# Process not alive (returns raw atom by default)
{:error, :process_not_alive} = 
  Jido.Signal.Dispatch.dispatch(signal, {:pid, [target: dead_pid]})

# HTTP timeout
{:error, :timeout} = 
  Jido.Signal.Dispatch.dispatch(signal, {:http, [url: "...", timeout: 1]})

Batch Errors

# Some failures
{:error, errors} = Jido.Signal.Dispatch.dispatch_batch(signal, configs)
# errors = [{index, reason}, ...]

Circuit Breaker

The Jido.Signal.Dispatch.CircuitBreaker module provides fault isolation for dispatch adapters using the :fuse library. Circuits are per-adapter-type, providing bulk protection without per-endpoint overhead.

Configuration

Default settings:

  • 5 failures in 10 seconds triggers the circuit to open
  • 30 second reset time before allowing requests again

Usage

alias Jido.Signal.Dispatch.CircuitBreaker

# Install circuit breaker (once at application startup)
:ok = CircuitBreaker.install(:http, 
  strategy: {:standard, 5, 10_000},  # 5 failures in 10 seconds
  refresh: 30_000                     # 30 second reset
)

# Wrap dispatch calls with circuit breaker protection
case CircuitBreaker.run(:http, fn ->
       Jido.Signal.Dispatch.dispatch(signal, {:http, [url: "https://api.example.com/events"]})
     end) do
  :ok -> 
    :ok
  {:error, :circuit_open} -> 
    # Circuit is open, degrade gracefully
    Logger.warning("HTTP circuit open, queuing for retry")
    {:error, :circuit_open}
  {:error, reason} -> 
    {:error, reason}
end

# Check circuit status
:ok = CircuitBreaker.status(:http)    # Circuit closed (healthy)
:blown = CircuitBreaker.status(:http) # Circuit open (failing)

# Manually reset circuit
:ok = CircuitBreaker.reset(:http)

Telemetry Events

The circuit breaker emits telemetry events:

  • [:jido, :dispatch, :circuit, :melt] - Failure recorded
  • [:jido, :dispatch, :circuit, :rejected] - Request rejected (circuit open)
  • [:jido, :dispatch, :circuit, :reset] - Circuit reset

Next Steps

  • Event Bus - Publish/subscribe messaging with middleware hooks and persistent subscriptions
  • Signal Router - High-performance trie-based routing with pattern matching