Event Bus
View SourceThe Event Bus provides publish/subscribe messaging with middleware hooks, persistent subscriptions, and snapshot functionality.
Basic Publish/Subscribe
Start a bus and publish signals:
# Start the bus
{:ok, _pid} = Jido.Signal.Bus.start_link(name: :my_bus)
# Create and publish signals
signal = Jido.Signal.new!("user.created", %{user_id: 123})
{:ok, _recorded} = Jido.Signal.Bus.publish(:my_bus, [signal])
# Subscribe to signal patterns
{:ok, sub_id} = Jido.Signal.Bus.subscribe(:my_bus, "user.*")
# Subscribe with specific dispatch
{:ok, sub_id} = Jido.Signal.Bus.subscribe(:my_bus, "user.*",
dispatch: {:pid, target: self(), delivery_mode: :async}
)Unsubscribe:
:ok = Jido.Signal.Bus.unsubscribe(:my_bus, sub_id)Error Handling
All Bus functions return {:ok, result} on success or {:error, term()} on failure:
case Jido.Signal.Bus.publish(:my_bus, [signal]) do
{:ok, recorded_signals} ->
Logger.info("Published #{length(recorded_signals)} signals")
{:error, reason} ->
Logger.error("Publish failed: #{inspect(reason)}")
end
case Jido.Signal.Bus.subscribe(:my_bus, "user.*") do
{:ok, sub_id} -> sub_id
{:error, :bus_not_found} -> raise "Bus not running"
{:error, reason} -> raise "Subscribe failed: #{inspect(reason)}"
endMiddleware Hooks
Implement custom middleware to intercept signals:
defmodule MyMiddleware do
use Jido.Signal.Bus.Middleware
@impl true
def init(opts), do: {:ok, %{}}
@impl true
def before_publish(signals, context, state) do
# Transform or validate signals before publishing
{:cont, signals, state}
end
@impl true
def before_dispatch(signal, subscriber, context, state) do
# Filter or modify signal before dispatch to subscriber
if authorized?(signal, subscriber) do
{:cont, signal, state}
else
{:skip, state} # Skip this subscriber
end
end
@impl true
def after_dispatch(signal, subscriber, result, context, state) do
# Log or handle dispatch results
{:cont, state}
end
defp authorized?(signal, subscriber), do: true
endStart bus with middleware:
{:ok, _pid} = Jido.Signal.Bus.start_link(
name: :my_bus,
middleware: [{MyMiddleware, []}]
)Middleware callbacks are executed by Jido.Signal.Bus.MiddlewarePipeline with per-callback timeouts (middleware_timeout_ms, default 100ms). Each callback can:
{:cont, ...}- Continue processing{:skip, state}- Skip this subscriber (forbefore_dispatchonly){:halt, reason, state}- Stop processing and fail the operation
Persistent Subscriptions
Persistent subscriptions provide reliable message delivery with acknowledgments, checkpointing, and automatic retry with Dead Letter Queue (DLQ) support.
# Create persistent subscription with reliability options
{:ok, sub_id} = Jido.Signal.Bus.subscribe(:my_bus, "order.*",
persistent: true,
dispatch: {:pid, target: self(), delivery_mode: :async},
max_in_flight: 500, # Max unacknowledged signals (default: 1000)
max_pending: 5_000, # Max queued signals before backpressure (default: 10_000)
max_attempts: 5, # Retry attempts before moving to DLQ (default: 5)
retry_interval: 500 # Milliseconds between retry waves (default: 100)
)
# Acknowledge processed signals
:ok = Jido.Signal.Bus.ack(:my_bus, sub_id, signal_id)
# Batch acknowledgment
:ok = Jido.Signal.Bus.ack(:my_bus, sub_id, [signal_id_1, signal_id_2, signal_id_3])
# Reconnect after client disconnect (subscription survives)
{:ok, last_checkpoint} = Jido.Signal.Bus.reconnect(:my_bus, sub_id, self())Persistent subscriptions require a journal adapter to persist checkpoints across restarts:
{:ok, _pid} = Jido.Signal.Bus.start_link(
name: :my_bus,
journal_adapter: Jido.Signal.Journal.Adapters.ETS
)When configured, checkpoints are automatically persisted and restored on reconnection.
Dead Letter Queue (DLQ)
When a persistent subscription exhausts all retry attempts (max_attempts), failed signals are moved to the Dead Letter Queue for later inspection and reprocessing.
# List DLQ entries for a subscription
{:ok, entries} = Jido.Signal.Bus.get_dlq_entries(:my_bus, sub_id)
# Each entry: %{id, subscription_id, signal, reason, metadata, inserted_at}
# Redrive (re-dispatch) DLQ messages
{:ok, %{succeeded: count, failed: failures}} =
Jido.Signal.Bus.redrive_dlq(:my_bus, sub_id, limit: 100)
# Clear all DLQ entries for a subscription
:ok = Jido.Signal.Bus.clear_dlq(:my_bus, sub_id)DLQ requires a journal adapter that implements the DLQ callbacks. Both ETS and Mnesia adapters support DLQ out of the box.
Horizontal Scaling with Partitions
For high-throughput scenarios, the bus can distribute dispatch across multiple partition workers:
{:ok, _pid} = Jido.Signal.Bus.start_link(
name: :my_bus,
partition_count: 4, # Number of partition workers
partition_rate_limit_per_sec: 10_000, # Rate limit per partition
partition_burst_size: 1_000, # Token bucket burst size
middleware: [
{Jido.Signal.Bus.Middleware.Logger, [level: :info]}
],
journal_adapter: Jido.Signal.Journal.Adapters.ETS
)With partitions enabled:
- Non-persistent subscriptions are sharded across partitions based on subscription ID
- Each partition has independent rate limiting using a token bucket algorithm
- Persistent subscriptions are handled by the main bus process to honor backpressure
- Rate-limited signals emit telemetry:
[:jido, :signal, :bus, :rate_limited]
Snapshots and Replay
Create snapshots of signal history:
# Create snapshot of all user events
{:ok, snapshot_ref} = Jido.Signal.Bus.snapshot_create(:my_bus, "user.*")
# Read snapshot data
{:ok, snapshot_data} = Jido.Signal.Bus.snapshot_read(:my_bus, snapshot_ref.id)
# List all snapshots
snapshots = Jido.Signal.Bus.snapshot_list(:my_bus)
# Delete snapshot
:ok = Jido.Signal.Bus.snapshot_delete(:my_bus, snapshot_ref.id)Replay signals from specific timestamp:
# Replay all signals from timestamp
{:ok, signals} = Jido.Signal.Bus.replay(:my_bus, "*", timestamp)
# Replay specific signal types
{:ok, user_signals} = Jido.Signal.Bus.replay(:my_bus, "user.*", timestamp)Advanced Configuration
Configure bus with custom router and options:
custom_router = Jido.Signal.Router.new!()
{:ok, _pid} = Jido.Signal.Bus.start_link(
name: :my_bus,
router: custom_router,
middleware: [
{Jido.Signal.Bus.Middleware.Logger, []},
{MyCustomMiddleware, [option: :value]}
]
)Persistent subscription options:
{:ok, sub_id} = Jido.Signal.Bus.subscribe(:my_bus, "events.*",
persistent: true,
dispatch: {:pid, target: self(), delivery_mode: :async},
max_in_flight: 100, # Max unacknowledged signals
max_pending: 5_000, # Max pending before backpressure
max_attempts: 5, # Attempts before DLQ
retry_interval: 500, # Retry interval in ms
start_from: :origin # :origin, :current, or timestamp
)Next Steps
- Signal Router - Trie-based routing with pattern matching and priority execution
- Signal Extensions - Add domain-specific metadata while maintaining CloudEvents compliance