BB.PubSub (bb v0.15.1)

Hierarchical pubsub system for robot component messages.

Processes subscribe to messages by path, optionally filtering by message type. Each path starts with a source type atom (:sensor, :actuator, etc.), followed by the location path through the robot topology.

Path Format

[:sensor, :base_link, :joint1, :imu1]      # specific sensor
[:actuator, :base_link, :joint1, :motor1]  # specific actuator

Subscription Patterns

# Exact match - only messages from this specific sensor
subscribe(MyRobot, [:sensor, :base_link, :joint1, :imu1])

# Subtree - all sensors under joint1
subscribe(MyRobot, [:sensor, :base_link, :joint1])

# All of type - all sensors anywhere
subscribe(MyRobot, [:sensor])

# All messages
subscribe(MyRobot, [])
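
The four patterns above all behave like prefix matches against the publisher's full path. The following is a minimal sketch of that rule, not BB's implementation; the module PathMatchSketch and its matches?/2 function are hypothetical names introduced only for illustration.

```elixir
defmodule PathMatchSketch do
  # Hypothetical helper, not part of BB: a subscription path matches when it
  # is a prefix of the publisher's full path.
  def matches?(subscription, source_path) do
    List.starts_with?(source_path, subscription)
  end
end

source = [:sensor, :base_link, :joint1, :imu1]

# Exact match
PathMatchSketch.matches?([:sensor, :base_link, :joint1, :imu1], source) # => true
# Subtree
PathMatchSketch.matches?([:sensor, :base_link, :joint1], source)        # => true
# All of type
PathMatchSketch.matches?([:sensor], source)                             # => true
# All messages
PathMatchSketch.matches?([], source)                                    # => true
# A different source type does not match
PathMatchSketch.matches?([:actuator], source)                           # => false
```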

Message Format

Subscribers receive messages as:

{:bb, source_path, %BB.Message{}}

Where source_path is the full path of the publisher.
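
A subscriber can pattern match on this tuple in a receive block (or in handle_info/2 of a GenServer). The sketch below simulates a delivery with send/2 so that it runs without BB; the module ReceiveSketch is hypothetical, a plain map stands in for %BB.Message{}, and splitting the path into source type and location assumes the path carries the source type prefix, as in the publish example.

```elixir
defmodule ReceiveSketch do
  # Hypothetical helper: waits for one pubsub-style message and returns
  # its source path and message, or times out.
  def await_message(timeout \\ 1_000) do
    receive do
      {:bb, source_path, message} -> {:ok, source_path, message}
    after
      timeout -> {:error, :timeout}
    end
  end
end

# Simulated delivery; the map is a stand-in for %BB.Message{}.
send(self(), {:bb, [:sensor, :base_link, :joint1, :imu1], %{payload: :stand_in}})

{:ok, path, _message} = ReceiveSketch.await_message()

# Assumption: the first element is the source type, the rest is the location.
[source_type | location] = path
IO.inspect({source_type, location})
# prints {:sensor, [:base_link, :joint1, :imu1]}
```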

Message Type Filtering

Subscribe with the :message_types option to filter messages by payload type:

subscribe(MyRobot, [:sensor], message_types: [BB.Message.Sensor.Imu])

An empty list (the default) disables filtering, so the subscriber receives all message types.
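
The filtering rule can be modelled as a small predicate. This is a sketch of the documented behaviour only; TypeFilterSketch and wanted?/2 are hypothetical names, and Example.OtherPayload is a made-up module name used for contrast.

```elixir
defmodule TypeFilterSketch do
  # Hypothetical helper: an empty message_types list accepts every payload type.
  def wanted?([], _payload_module), do: true

  # Otherwise the payload's module must appear in the subscriber's list.
  def wanted?(message_types, payload_module), do: payload_module in message_types
end

TypeFilterSketch.wanted?([], BB.Message.Sensor.Imu)                        # => true
TypeFilterSketch.wanted?([BB.Message.Sensor.Imu], BB.Message.Sensor.Imu)   # => true
# Example.OtherPayload is a made-up name, not a BB module.
TypeFilterSketch.wanted?([BB.Message.Sensor.Imu], Example.OtherPayload)    # => false
```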

Functions

publish(robot, path, message)

@spec publish(module(), [atom()], BB.Message.t()) :: :ok

Publish a message to all matching subscribers.

The message is dispatched to subscribers registered at the exact path and at every ancestor path. At each level, a subscriber registered with a non-empty message_types list receives the message only if its payload type is in that list.

Examples

# From a sensor process
path = [:sensor | state.bb.path]
publish(state.bb.robot, path, message)
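
To make "the exact path and all ancestor paths" concrete, the sketch below lists the paths at which subscribers would be looked up for a single publish. DispatchPathsSketch is a hypothetical module for illustration, and the ordering shown (most specific first) is an assumption; the documentation does not state a delivery order.

```elixir
defmodule DispatchPathsSketch do
  # Hypothetical helper: returns the exact path followed by each ancestor,
  # ending with the root path [].
  def dispatch_paths(path) do
    for n <- length(path)..0//-1, do: Enum.take(path, n)
  end
end

DispatchPathsSketch.dispatch_paths([:sensor, :base_link, :joint1, :imu1])
# => [
#      [:sensor, :base_link, :joint1, :imu1],
#      [:sensor, :base_link, :joint1],
#      [:sensor, :base_link],
#      [:sensor],
#      []
#    ]
```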

registry_name(robot_module)

@spec registry_name(module()) :: atom()

Returns the pubsub registry name for a robot module.

subscribe(robot, path, opts \\ [])

@spec subscribe(module(), [atom()], keyword()) :: {:ok, pid()} | {:error, term()}

Subscribe the calling process to messages matching the given path.

Options

  • :message_types - List of message payload modules to receive. An empty list (the default) receives all message types.

Examples

# All IMU messages from sensors under joint1
subscribe(MyRobot, [:sensor, :base_link, :joint1],
  message_types: [BB.Message.Sensor.Imu])

# All sensor messages (no type filter)
subscribe(MyRobot, [:sensor])

# All messages from anywhere
subscribe(MyRobot, [])

subscribers(robot, path)

@spec subscribers(module(), [atom()]) :: [{pid(), [module()]}]

List subscribers registered at a specific path.

Returns a list of {pid, message_types} tuples. Useful for debugging.

unsubscribe(robot, path)

@spec unsubscribe(module(), [atom()]) :: :ok

Unsubscribe the calling process from the given path.
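
The return shapes of these functions resemble those of Elixir's built-in Registry. As a way to experiment with the semantics without a robot, here is a toy model built on a :duplicate Registry. It is an assumption-laden sketch and not BB's implementation: RegistryPubSubSketch, SketchRegistry, and the explicit payload_type argument to publish/4 are all hypothetical, and a plain map stands in for %BB.Message{}.

```elixir
defmodule RegistryPubSubSketch do
  # Toy model only. Keys are paths; each value is the subscriber's message_types.

  def start(name), do: Registry.start_link(keys: :duplicate, name: name)

  def subscribe(name, path, opts \\ []) do
    Registry.register(name, path, Keyword.get(opts, :message_types, []))
  end

  # Returns [{pid, message_types}] for subscribers registered at exactly this path.
  def subscribers(name, path), do: Registry.lookup(name, path)

  def unsubscribe(name, path), do: Registry.unregister(name, path)

  # payload_type stands in for the payload module carried by a real message.
  def publish(name, path, payload_type, message) do
    # Visit the exact path, then each ancestor, down to the root [].
    for n <- length(path)..0//-1 do
      Registry.dispatch(name, Enum.take(path, n), fn entries ->
        for {pid, types} <- entries, types == [] or payload_type in types do
          send(pid, {:bb, path, message})
        end
      end)
    end

    :ok
  end
end

{:ok, _} = RegistryPubSubSketch.start(SketchRegistry)

{:ok, _} =
  RegistryPubSubSketch.subscribe(SketchRegistry, [:sensor],
    message_types: [BB.Message.Sensor.Imu]
  )

path = [:sensor, :base_link, :joint1, :imu1]
:ok = RegistryPubSubSketch.publish(SketchRegistry, path, BB.Message.Sensor.Imu, %{stand_in: true})

receive do
  {:bb, ^path, message} -> IO.inspect(message)
after
  100 -> IO.puts("nothing received")
end

:ok = RegistryPubSubSketch.unsubscribe(SketchRegistry, [:sensor])
```

In this model, the subscriber registered at [:sensor] receives a message published from the deeper IMU path because publish/4 walks every ancestor of the source path.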