Jido.Signal.Bus (Jido Signal v2.1.1)

View Source

Implements a signal bus for routing, filtering, and distributing signals.

The Bus acts as a central hub for signals in the system, allowing components to publish and subscribe to signals. It handles routing based on signal paths, subscription management, persistence, and signal filtering. The Bus maintains an internal log of signals and provides mechanisms for retrieving historical signals and snapshots.

Journal Configuration

The Bus can be configured with a journal adapter for persistent checkpoints. This allows subscriptions to resume from their last acknowledged position after restarts.

{:ok, bus} = Bus.start_link(
  name: :my_bus,
  journal_adapter: Jido.Signal.Journal.Adapters.ETS,
  journal_adapter_opts: []
)

Via Application Config

# In config/config.exs
config :jido_signal,
  journal_adapter: Jido.Signal.Journal.Adapters.ETS,
  journal_adapter_opts: []

Available Adapters

If no adapter is configured, checkpoints will be in-memory only and will not survive process restarts.

Summary

Functions

Acknowledges a signal for a persistent subscription.

Returns a child specification for starting the bus under a supervisor.

Clears all DLQ entries for a subscription.

Lists all DLQ entries for a subscription.

Starts a new bus process.

Publishes a list of signals to the bus. Returns {:ok, recorded_signals} on success.

Reconnects a client to a persistent subscription.

Replays DLQ entries for a subscription, attempting redelivery.

Replays signals from the bus log that match the given path pattern. Optional start_timestamp to replay from a specific point in time.

Creates a new snapshot of signals matching the given path pattern.

Deletes a snapshot by its ID.

Lists all available snapshots.

Reads a snapshot by its ID.

Starts a new bus process and links it to the calling process.

Subscribes to signals matching the given path pattern. Options

Unsubscribes from signals using the subscription ID. Options

Types

path()

@type path() :: Jido.Signal.Router.path()

server()

@type server() ::
  pid() | atom() | binary() | {name :: atom() | binary(), registry :: module()}

start_option()

@type start_option() :: {:name, atom()} | {atom(), term()}

subscription_id()

@type subscription_id() :: String.t()

Functions

ack(bus, subscription_id, signal_id)

@spec ack(server(), subscription_id(), String.t() | [String.t()] | integer()) ::
  :ok | {:error, term()}

Acknowledges a signal for a persistent subscription.

signal_id accepts:

  • a single recorded signal ID (binary)
  • a list of recorded signal IDs (for batch acknowledgement)

child_spec(init_arg)

@spec child_spec(keyword()) :: Supervisor.child_spec()

Returns a child specification for starting the bus under a supervisor.

Options

  • name: The name to register the bus under (required)
  • router: A custom router implementation (optional)

clear_dlq(bus, subscription_id)

@spec clear_dlq(server(), subscription_id()) :: :ok | {:error, term()}

Clears all DLQ entries for a subscription.

Returns

  • :ok - DLQ cleared
  • {:error, term()} - If the operation fails

dlq_entries(bus, subscription_id)

@spec dlq_entries(server(), subscription_id()) :: {:ok, [map()]} | {:error, term()}

Lists all DLQ entries for a subscription.

Parameters

  • bus: The bus server reference
  • subscription_id: The ID of the subscription

Returns

  • {:ok, [dlq_entry]} - List of DLQ entries
  • {:error, term()} - If the operation fails

init(arg)

Starts a new bus process.

Options

  • :name - The name to register the bus under (required)
  • :router - A custom router implementation (optional)
  • :middleware - A list of {module, opts} tuples for middleware (optional)
  • :middleware_timeout_ms - Timeout for middleware execution in ms (default: 100)
  • :journal_adapter - Module implementing Jido.Signal.Journal.Persistence (optional)
  • :journal_adapter_opts - Options to pass to journal adapter init (optional, unused by default adapters)
  • :journal_pid - Pre-initialized journal adapter pid (optional, skips adapter init if provided)
  • :max_log_size - Maximum number of signals to keep in the log (default: 100_000)
  • :log_ttl_ms - Optional TTL in milliseconds for log entries; enables periodic garbage collection (default: nil)

If :journal_adapter is not specified, falls back to application config (:jido_signal, :journal_adapter).

publish(bus, signals)

@spec publish(server(), [Jido.Signal.t()]) ::
  {:ok, [Jido.Signal.Bus.RecordedSignal.t()]} | {:error, term()}

Publishes a list of signals to the bus. Returns {:ok, recorded_signals} on success.

reconnect(bus, subscription_id, client_pid)

@spec reconnect(server(), subscription_id(), pid()) ::
  {:ok, non_neg_integer()} | {:error, term()}

Reconnects a client to a persistent subscription.

redrive_dlq(bus, subscription_id, opts \\ [])

@spec redrive_dlq(server(), subscription_id(), keyword()) ::
  {:ok, %{succeeded: integer(), failed: integer()}} | {:error, term()}

Replays DLQ entries for a subscription, attempting redelivery.

Options

  • :limit - Maximum entries to replay (default: all)
  • :clear_on_success - Remove from DLQ if delivery succeeds (default: true)

Returns

  • {:ok, %{succeeded: integer(), failed: integer()}} - Results of replay
  • {:error, term()} - If the operation fails

replay(bus, path \\ "*", start_timestamp \\ 0, opts \\ [])

@spec replay(server(), path(), non_neg_integer(), Keyword.t()) ::
  {:ok, [Jido.Signal.Bus.RecordedSignal.t()]} | {:error, term()}

Replays signals from the bus log that match the given path pattern. Optional start_timestamp to replay from a specific point in time.

snapshot_create(bus, path)

@spec snapshot_create(server(), path()) ::
  {:ok, Jido.Signal.Bus.Snapshot.SnapshotRef.t()} | {:error, term()}

Creates a new snapshot of signals matching the given path pattern.

snapshot_delete(bus, snapshot_id)

@spec snapshot_delete(server(), String.t()) :: :ok | {:error, term()}

Deletes a snapshot by its ID.

snapshot_list(bus)

@spec snapshot_list(server()) :: [Jido.Signal.Bus.Snapshot.SnapshotRef.t()]

Lists all available snapshots.

snapshot_read(bus, snapshot_id)

@spec snapshot_read(server(), String.t()) ::
  {:ok, Jido.Signal.Bus.Snapshot.SnapshotData.t()} | {:error, term()}

Reads a snapshot by its ID.

start_link(opts)

@spec start_link(keyword()) :: {:ok, pid()} | {:error, term()}

Starts a new bus process and links it to the calling process.

Options

  • :name - The name to register the bus under (required)
  • :router - A custom router implementation (optional)
  • :middleware - A list of {module, opts} tuples for middleware (optional)
  • :middleware_timeout_ms - Timeout for middleware execution in ms (default: 100)
  • :journal_adapter - Module implementing Jido.Signal.Journal.Persistence (optional)
  • :journal_adapter_opts - Options to pass to journal adapter init (optional)
  • :journal_pid - Pre-initialized journal adapter pid (optional, skips adapter init if provided)

Returns

  • {:ok, pid} if the bus starts successfully
  • {:error, reason} if the bus fails to start

Examples

iex> {:ok, pid} = Jido.Signal.Bus.start_link(name: :my_bus)
iex> is_pid(pid)
true

iex> {:ok, pid} = Jido.Signal.Bus.start_link([
...>   name: :my_bus,
...>   journal_adapter: Jido.Signal.Journal.Adapters.ETS
...> ])
iex> is_pid(pid)
true

subscribe(bus, path, opts \\ [])

@spec subscribe(server(), path(), Keyword.t()) ::
  {:ok, subscription_id()} | {:error, term()}

Subscribes to signals matching the given path pattern. Options:

  • dispatch: How to dispatch signals to the subscriber (default: async to calling process)
  • persistent?: Canonical key for persistent subscription behavior
  • persistent: Backward-compatible alias for persistent?

If both persistent and persistent? are provided, persistent? takes precedence.

unsubscribe(bus, subscription_id, opts \\ [])

@spec unsubscribe(server(), subscription_id(), Keyword.t()) :: :ok | {:error, term()}

Unsubscribes from signals using the subscription ID. Options:

  • delete_persistence: Whether to delete persistent subscription data (default: false)

via_tuple(name, opts \\ [])

See Jido.Signal.Util.via_tuple/2.

whereis(server, opts \\ [])

See Jido.Signal.Util.whereis/2.