Signal Bus
View SourceIn our previous guides, we explored how Actions provide composable building blocks, how Agents manage state, and how Signals enable real-time monitoring. Now, let's discover how the Signal Bus ties these components together by providing a robust messaging infrastructure.
Understanding the Signal Bus
At its core, the Jido Signal Bus is a message routing and persistence system that enables:
- Publishing and subscribing to signal streams
- Persistent message storage and replay
- Real-time signal distribution
- State snapshots for recovery
- Flexible adapter architecture
Think of the Signal Bus as the nervous system of your Jido application - it ensures signals flow reliably between components while maintaining message ordering and persistence guarantees.
Key Concepts
The Signal Bus is built around several key abstractions:
- Signals: Standardized message format following CloudEvents spec
- Streams: Ordered sequences of signals
- Subscriptions: Ways to consume signals (transient or persistent)
- Snapshots: Point-in-time state captures
- Adapters: Pluggable backend implementations
Getting Started
Let's see how to use the Signal Bus in practice:
# Start the bus with an in-memory adapter
children = [
{Jido.Bus, name: :my_bus, adapter: :in_memory}
]
# Or with Phoenix.PubSub for distributed scenarios
children = [
{Jido.Bus, name: :my_bus, adapter: :pubsub, pubsub_name: MyApp.PubSub}
]
Publishing Signals
Signals can be published to named streams:
signal = %Jido.Signal{
id: Jido.Util.generate_id(),
source: "user_service",
type: "user.registered",
data: %{user_id: "123", email: "user@example.com"},
metadata: %{client_ip: "127.0.0.1"}
}
:ok = Jido.Bus.publish(:my_bus, "user_stream", :any_version, [signal])
The :any_version
parameter tells the bus to append without checking versions. You can also use:
:no_stream
- Requires stream doesn't exist:stream_exists
- Requires stream exists- Integer version - Requires exact version match
Subscribing to Signals
The bus supports two types of subscriptions:
1. Transient Subscriptions
Best for temporary subscribers that don't need persistence:
# Subscribe to a single stream
:ok = Jido.Bus.subscribe(:my_bus, "user_stream")
# Receive signals
receive do
{:signals, signals} ->
Enum.each(signals, &process_signal/1)
end
2. Persistent Subscriptions
For durable subscribers that need guaranteed delivery:
{:ok, subscription} = Jido.Bus.subscribe_persistent(
:my_bus, # Bus name
"user_stream", # Stream to subscribe to
"user_processor", # Subscription name
self(), # Subscriber pid
:origin, # Start from beginning
[ # Options
concurrency_limit: 2,
partition_by: &partition_function/1
]
)
# Receive signals and acknowledge processing
receive do
{:signals, signals} ->
process_signals(signals)
Jido.Bus.ack(:my_bus, subscription, List.last(signals))
end
The subscription options include:
:origin
- Start from beginning:current
- Start from now- Integer position - Start from specific point
concurrency_limit
- Max concurrent subscriberspartition_by
- Function to partition signals
Replaying Signals
The bus supports replaying historical signals:
# Replay all signals from start
signals = Jido.Bus.replay(:my_bus, "user_stream")
# Replay from specific version
signals = Jido.Bus.replay(:my_bus, "user_stream", 100)
# Control batch size
signals = Jido.Bus.replay(:my_bus, "user_stream", 100, 50)
Enum.each(signals, &process_signal/1)
Working with Snapshots
Snapshots provide point-in-time state capture:
# Record a snapshot
snapshot = %Jido.Bus.Snapshot{
source_id: "user_123",
source_version: 50,
source_type: "UserAggregate",
data: serialize_state(current_state),
created_at: DateTime.utc_now()
}
:ok = Jido.Bus.record_snapshot(:my_bus, snapshot)
# Read snapshot
{:ok, snapshot} = Jido.Bus.read_snapshot(:my_bus, "user_123")
# Delete snapshot
:ok = Jido.Bus.delete_snapshot(:my_bus, "user_123")
Bus Adapters
The Signal Bus supports multiple backend adapters:
In-Memory Adapter
Best for development and testing:
- Full persistence and replay support
- Fast local operation
- Memory-bound storage
- No distribution
{Jido.Bus, name: :my_bus, adapter: :in_memory}
PubSub Adapter
Best for distributed scenarios:
- Built on Phoenix.PubSub
- Supports clustering
- Real-time message distribution
- No persistence/replay
{Jido.Bus, name: :my_bus, adapter: :pubsub, pubsub_name: MyApp.PubSub}