Jido.Signal.Bus
(Jido Signal v2.1.1)
Implements a signal bus for routing, filtering, and distributing signals.
The Bus acts as a central hub for signals in the system, allowing components to publish and subscribe to signals. It handles routing based on signal paths, subscription management, persistence, and signal filtering. The Bus maintains an internal log of signals and provides mechanisms for retrieving historical signals and snapshots.
Journal Configuration
The Bus can be configured with a journal adapter for persistent checkpoints. This allows subscriptions to resume from their last acknowledged position after restarts.
Via start_link
{:ok, bus} = Jido.Signal.Bus.start_link(
  name: :my_bus,
  journal_adapter: Jido.Signal.Journal.Adapters.ETS,
  journal_adapter_opts: []
)
Via Application Config
# In config/config.exs
config :jido_signal,
  journal_adapter: Jido.Signal.Journal.Adapters.ETS,
  journal_adapter_opts: []
Available Adapters
- Jido.Signal.Journal.Adapters.ETS - ETS-based persistence (default for production)
- Jido.Signal.Journal.Adapters.InMemory - In-memory persistence (for testing)
- Jido.Signal.Journal.Adapters.Mnesia - Mnesia-based persistence (for distributed systems)
If no adapter is configured, checkpoints will be in-memory only and will not survive process restarts.
Functions
@spec ack(server(), subscription_id(), String.t() | [String.t()] | integer()) :: :ok | {:error, term()}
Acknowledges a signal for a persistent subscription.
signal_id accepts:
- a single recorded signal ID (binary)
- a list of recorded signal IDs (for batch acknowledgement)
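The batch form can be sketched as follows. This is a minimal illustration, not the library's own example: the module name `AckExample` is hypothetical, and it assumes each recorded signal returned by `publish/2` exposes its ID in an `id` field.

```elixir
defmodule AckExample do
  alias Jido.Signal.Bus

  # Subscribes with the canonical `persistent?` option.
  # Returns {:ok, subscription_id} or {:error, reason}.
  def subscribe_persistent(bus, path) do
    Bus.subscribe(bus, path, persistent?: true)
  end

  # Collects IDs from a list of recorded signals.
  # Assumption: the ID lives in an `id` field.
  def ids(recorded_signals) do
    Enum.map(recorded_signals, & &1.id)
  end

  # Acknowledges every recorded signal in one call by passing a list of IDs.
  def ack_all(bus, subscription_id, recorded_signals) do
    Bus.ack(bus, subscription_id, ids(recorded_signals))
  end
end
```

Passing a list keeps acknowledgement to a single call per batch rather than one call per signal.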
@spec child_spec(keyword()) :: Supervisor.child_spec()
Returns a child specification for starting the bus under a supervisor.
Options
- name: The name to register the bus under (required)
- router: A custom router implementation (optional)
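As a sketch of how the child specification might be used, the bus can be listed as a `{module, opts}` child in a supervisor. The supervisor module `MyApp.SignalSupervisor` is hypothetical; only the child tuple relies on `Jido.Signal.Bus.child_spec/1`.

```elixir
defmodule MyApp.SignalSupervisor do
  # Hypothetical supervisor, shown only to illustrate the child tuple.
  use Supervisor

  def start_link(opts \\ []) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    children = [
      # Supervisor expands this tuple via Jido.Signal.Bus.child_spec(name: :my_bus).
      {Jido.Signal.Bus, name: :my_bus}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
```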
@spec clear_dlq(server(), subscription_id()) :: :ok | {:error, term()}
Clears all DLQ entries for a subscription.
Returns
- :ok - DLQ cleared
- {:error, term()} - If the operation fails
@spec dlq_entries(server(), subscription_id()) :: {:ok, [map()]} | {:error, term()}
Lists all DLQ entries for a subscription.
Parameters
- bus: The bus server reference
- subscription_id: The ID of the subscription
Returns
- {:ok, [dlq_entry]} - List of DLQ entries
- {:error, term()} - If the operation fails
Starts a new bus process.
Options
- :name - The name to register the bus under (required)
- :router - A custom router implementation (optional)
- :middleware - A list of {module, opts} tuples for middleware (optional)
- :middleware_timeout_ms - Timeout for middleware execution in ms (default: 100)
- :journal_adapter - Module implementing Jido.Signal.Journal.Persistence (optional)
- :journal_adapter_opts - Options to pass to journal adapter init (optional, unused by default adapters)
- :journal_pid - Pre-initialized journal adapter pid (optional, skips adapter init if provided)
- :max_log_size - Maximum number of signals to keep in the log (default: 100_000)
- :log_ttl_ms - Optional TTL in milliseconds for log entries; enables periodic garbage collection (default: nil)
If :journal_adapter is not specified, falls back to application config
(:jido_signal, :journal_adapter).
@spec publish(server(), [Jido.Signal.t()]) :: {:ok, [Jido.Signal.Bus.RecordedSignal.t()]} | {:error, term()}
Publishes a list of signals to the bus. Returns {:ok, recorded_signals} on success.
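A minimal publishing sketch follows. The module name `PublishExample`, the signal type `"user.created"`, and the source `"/accounts"` are illustrative. It assumes `Jido.Signal.new/1` accepts a map with `:type`, `:source`, and `:data` and returns `{:ok, signal}`; check the `Jido.Signal` documentation before relying on that shape.

```elixir
defmodule PublishExample do
  alias Jido.Signal
  alias Jido.Signal.Bus

  # Builds one signal and publishes it. `bus` is a pid or registered name.
  # Returns {:ok, recorded_signals} or the first {:error, reason} encountered.
  def publish_user_created(bus, user_id) do
    # Assumption: Signal.new/1 takes a map of attributes and returns {:ok, signal}.
    attrs = %{type: "user.created", source: "/accounts", data: %{id: user_id}}

    with {:ok, signal} <- Signal.new(attrs) do
      # publish/2 always takes a list, even for a single signal.
      Bus.publish(bus, [signal])
    end
  end
end
```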
@spec reconnect(server(), subscription_id(), pid()) :: {:ok, non_neg_integer()} | {:error, term()}
Reconnects a client to a persistent subscription.
@spec redrive_dlq(server(), subscription_id(), keyword()) :: {:ok, %{succeeded: integer(), failed: integer()}} | {:error, term()}
Replays DLQ entries for a subscription, attempting redelivery.
Options
- :limit - Maximum entries to replay (default: all)
- :clear_on_success - Remove from DLQ if delivery succeeds (default: true)
Returns
- {:ok, %{succeeded: integer(), failed: integer()}} - Results of replay
- {:error, term()} - If the operation fails
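The DLQ functions can be combined into a small maintenance helper. The sketch below is illustrative only: the module name `DlqExample` and the default limit of 50 are arbitrary choices, not library defaults.

```elixir
defmodule DlqExample do
  alias Jido.Signal.Bus

  # Redrives up to `limit` DLQ entries, skipping the call when the DLQ is empty.
  def redrive_if_needed(bus, subscription_id, limit \\ 50) do
    case Bus.dlq_entries(bus, subscription_id) do
      {:ok, []} ->
        # Nothing to redrive; mirror the shape redrive_dlq/3 returns.
        {:ok, %{succeeded: 0, failed: 0}}

      {:ok, _entries} ->
        Bus.redrive_dlq(bus, subscription_id, limit: limit)

      {:error, _reason} = error ->
        error
    end
  end
end
```

Because `:clear_on_success` defaults to true, entries that are delivered successfully are removed from the DLQ without a separate `clear_dlq/2` call.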
@spec replay(server(), path(), non_neg_integer(), Keyword.t()) :: {:ok, [Jido.Signal.Bus.RecordedSignal.t()]} | {:error, term()}
Replays signals from the bus log that match the given path pattern. An optional start_timestamp restricts the replay to signals recorded from that point in time onward.
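A replay call might look like the sketch below. The module name `ReplayExample` is hypothetical. The unit of `start_timestamp` is not stated here, so the caller supplies it unchanged; the example passes an empty option list because the available options are not documented in this section.

```elixir
defmodule ReplayExample do
  alias Jido.Signal.Bus

  # Counts the signals in the bus log that match `path` from `start_timestamp` on.
  # `start_timestamp` must be a non-negative integer, per the spec of replay/4.
  def count_since(bus, path, start_timestamp)
      when is_integer(start_timestamp) and start_timestamp >= 0 do
    with {:ok, recorded} <- Bus.replay(bus, path, start_timestamp, []) do
      {:ok, length(recorded)}
    end
  end
end
```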
@spec snapshot_create(server(), path()) :: {:ok, Jido.Signal.Bus.Snapshot.SnapshotRef.t()} | {:error, term()}
Creates a new snapshot of signals matching the given path pattern.
Deletes a snapshot by its ID.
@spec snapshot_list(server()) :: [Jido.Signal.Bus.Snapshot.SnapshotRef.t()]
Lists all available snapshots.
@spec snapshot_read(server(), String.t()) :: {:ok, Jido.Signal.Bus.Snapshot.SnapshotData.t()} | {:error, term()}
Reads a snapshot by its ID.
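The snapshot functions compose naturally: create a snapshot, then read it back by ID. This is a sketch under one assumption, namely that the `SnapshotRef` returned by `snapshot_create/2` exposes the snapshot ID in an `id` field. The module name `SnapshotExample` is hypothetical.

```elixir
defmodule SnapshotExample do
  alias Jido.Signal.Bus

  # Creates a snapshot for `path` and immediately reads its data back.
  # Returns {:ok, snapshot_data} or the first {:error, reason} encountered.
  def create_and_read(bus, path) do
    with {:ok, ref} <- Bus.snapshot_create(bus, path) do
      # Assumption: the reference struct carries the ID as `ref.id`.
      Bus.snapshot_read(bus, ref.id)
    end
  end

  # snapshot_list/1 returns a bare list of references, not an {:ok, list} tuple.
  def snapshot_count(bus) do
    length(Bus.snapshot_list(bus))
  end
end
```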
Starts a new bus process and links it to the calling process.
Options
- :name - The name to register the bus under (required)
- :router - A custom router implementation (optional)
- :middleware - A list of {module, opts} tuples for middleware (optional)
- :middleware_timeout_ms - Timeout for middleware execution in ms (default: 100)
- :journal_adapter - Module implementing Jido.Signal.Journal.Persistence (optional)
- :journal_adapter_opts - Options to pass to journal adapter init (optional)
- :journal_pid - Pre-initialized journal adapter pid (optional, skips adapter init if provided)
Returns
- {:ok, pid} if the bus starts successfully
- {:error, reason} if the bus fails to start
Examples
iex> {:ok, pid} = Jido.Signal.Bus.start_link(name: :my_bus)
iex> is_pid(pid)
true
iex> {:ok, pid} = Jido.Signal.Bus.start_link([
...> name: :my_journaled_bus,
...> journal_adapter: Jido.Signal.Journal.Adapters.ETS
...> ])
iex> is_pid(pid)
true
@spec subscribe(server(), path(), Keyword.t()) :: {:ok, subscription_id()} | {:error, term()}
Subscribes to signals matching the given path pattern. Options:
- dispatch: How to dispatch signals to the subscriber (default: async to calling process)
- persistent?: Canonical key for persistent subscription behavior
- persistent: Backward-compatible alias for persistent?
If both persistent and persistent? are provided, persistent? takes precedence.
@spec unsubscribe(server(), subscription_id(), Keyword.t()) :: :ok | {:error, term()}
Unsubscribes from signals using the subscription ID. Options:
- delete_persistence: Whether to delete persistent subscription data (default: false)
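Subscribing and unsubscribing are often paired around a unit of work. The helper below is a sketch, not part of the library: `SubscriptionExample` and `with_subscription/3` are hypothetical names, and the default dispatch (async to the calling process) is left in place.

```elixir
defmodule SubscriptionExample do
  alias Jido.Signal.Bus

  # Subscribes to `path`, runs `fun` with the subscription ID, and always
  # unsubscribes afterwards, even if `fun` raises.
  def with_subscription(bus, path, fun) when is_function(fun, 1) do
    with {:ok, subscription_id} <- Bus.subscribe(bus, path, []) do
      try do
        fun.(subscription_id)
      after
        # delete_persistence defaults to false, so persistent data is kept.
        Bus.unsubscribe(bus, subscription_id, [])
      end
    end
  end
end
```

Passing `delete_persistence: true` to `unsubscribe/3` instead would also remove the stored data of a persistent subscription.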