BB.PubSub (bb v0.2.1)

Hierarchical pubsub system for robot component messages.

Allows processes to subscribe to messages by path with optional message type filtering. Paths are prefixed with a source type atom (:sensor, :actuator, etc.) followed by the location path through the robot topology.

Path Format

[:sensor, :base_link, :joint1, :imu1]      # specific sensor
[:actuator, :base_link, :joint1, :motor1]  # specific actuator

Subscription Patterns

# Exact match - only messages from this specific sensor
subscribe(MyRobot, [:sensor, :base_link, :joint1, :imu1])

# Subtree - all sensors under joint1
subscribe(MyRobot, [:sensor, :base_link, :joint1])

# All of type - all sensors anywhere
subscribe(MyRobot, [:sensor])

# All messages
subscribe(MyRobot, [])
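
The four patterns above behave like prefix matching on the publisher's path. The following is a minimal sketch of that rule, assuming a subscription matches whenever its path is a prefix of the source path. `PathMatchSketch` is a hypothetical module written for illustration and is not part of BB.PubSub.

```elixir
defmodule PathMatchSketch do
  # Hypothetical helper (not part of BB.PubSub): true when a subscription
  # registered at `subscription_path` would see a message published from
  # `source_path`, assuming plain prefix matching.
  def matches?(subscription_path, source_path) do
    List.starts_with?(source_path, subscription_path)
  end
end

source = [:sensor, :base_link, :joint1, :imu1]

PathMatchSketch.matches?([:sensor, :base_link, :joint1, :imu1], source) # exact match
PathMatchSketch.matches?([:sensor, :base_link, :joint1], source)        # subtree
PathMatchSketch.matches?([:sensor], source)                             # all sensors
PathMatchSketch.matches?([], source)                                    # all messages
PathMatchSketch.matches?([:actuator], source)                           # different source type
```

Under this assumption the first four calls return true and the last returns false, since an actuator subscription never sees sensor traffic.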

Message Format

Subscribers receive messages as:

{:bb, source_path, %BB.Message{}}

Where source_path is the full path of the publisher.
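
Because messages arrive as ordinary process messages, a GenServer subscriber would typically handle them in `handle_info/2`. The sketch below shows one way to pattern match on the tuple. `SubscriberSketch` and `seen_paths/1` are hypothetical names, and the sketch matches only on the tuple shape so that it runs without the bb dependency; real code would likely match `%BB.Message{}` in the third position.

```elixir
defmodule SubscriberSketch do
  use GenServer

  # Hypothetical subscriber that records the source path of each sensor message.
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  def seen_paths(server), do: GenServer.call(server, :seen_paths)

  @impl true
  def init(:ok) do
    # In a real robot the process would subscribe here, for example:
    #   BB.PubSub.subscribe(MyRobot, [:sensor])
    {:ok, []}
  end

  @impl true
  def handle_info({:bb, [:sensor | _location] = source_path, _message}, seen) do
    # Sensor message: remember which sensor published it.
    {:noreply, [source_path | seen]}
  end

  def handle_info({:bb, _source_path, _message}, seen) do
    # Other source types are ignored in this sketch.
    {:noreply, seen}
  end

  @impl true
  def handle_call(:seen_paths, _from, seen) do
    # Return paths in arrival order.
    {:reply, Enum.reverse(seen), seen}
  end
end
```

Matching on the head of `source_path` lets one process subscribed at `[]` route sensor and actuator traffic to different clauses.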

Message Type Filtering

Pass the :message_types option to subscribe to filter by payload type:

subscribe(MyRobot, [:sensor], message_types: [BB.Message.Sensor.Imu])

An empty list (the default) disables filtering, so the subscriber receives all message types.
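
The filtering rule can be sketched as a two-clause function. This is an illustration under the assumption that the filter compares the payload's struct module against the subscribed list. `TypeFilterSketch` and `Some.Other.Payload` are hypothetical names, not part of the library.

```elixir
defmodule TypeFilterSketch do
  # Hypothetical filter (not part of BB.PubSub). `message_types` is the list
  # given at subscribe time; `payload_module` is the payload's struct module.
  def deliver?([], _payload_module), do: true
  def deliver?(message_types, payload_module), do: payload_module in message_types
end

# No filter: everything is delivered.
TypeFilterSketch.deliver?([], BB.Message.Sensor.Imu)

# Filter on IMU payloads: IMU messages pass, other payloads do not.
TypeFilterSketch.deliver?([BB.Message.Sensor.Imu], BB.Message.Sensor.Imu)
TypeFilterSketch.deliver?([BB.Message.Sensor.Imu], Some.Other.Payload)
```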

Summary

Functions

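publish(robot, path, message)

Publish a message to all matching subscribers.

registry_name(robot_module)

Returns the pubsub registry name for a robot module.

subscribe(robot, path, opts \\ [])

Subscribe the calling process to messages matching the given path.

subscribers(robot, path)

List subscribers registered at a specific path.

unsubscribe(robot, path)

Unsubscribe the calling process from the given path.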

Functions

publish(robot, path, message)

@spec publish(module(), [atom()], BB.Message.t()) :: :ok

Publish a message to all matching subscribers.

The message is dispatched to subscribers registered at the exact path and all ancestor paths. At each level, subscribers are filtered by their registered message_types (if any).

Examples

# From a sensor process
path = [:sensor | state.bb.path]
publish(state.bb.robot, path, message)
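
To make "the exact path and all ancestor paths" concrete, the sketch below lists every path level a single publish would reach. It is an illustration only: `DispatchSketch` is a hypothetical module, and the root-first order shown here is an assumption rather than documented behaviour.

```elixir
defmodule DispatchSketch do
  # Hypothetical helper (not part of BB.PubSub): every path level a publish
  # would visit, from the root ([]) down to the exact path.
  def dispatch_paths(path) do
    Enum.map(0..length(path), fn depth -> Enum.take(path, depth) end)
  end
end

DispatchSketch.dispatch_paths([:sensor, :base_link, :joint1, :imu1])
# => [[], [:sensor], [:sensor, :base_link], [:sensor, :base_link, :joint1],
#     [:sensor, :base_link, :joint1, :imu1]]
```

A subscriber registered at any of these five paths would be a candidate for the message, subject to its message_types filter.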

registry_name(robot_module)

@spec registry_name(module()) :: atom()

Returns the pubsub registry name for a robot module.

subscribe(robot, path, opts \\ [])

@spec subscribe(module(), [atom()], keyword()) :: {:ok, pid()} | {:error, term()}

Subscribe the calling process to messages matching the given path.

Options

  • :message_types - List of message payload modules to receive. An empty list (the default) receives all message types.

Examples

# All IMU messages from sensors under joint1
subscribe(MyRobot, [:sensor, :base_link, :joint1],
  message_types: [BB.Message.Sensor.Imu])

# All sensor messages (no type filter)
subscribe(MyRobot, [:sensor])

# All messages from anywhere
subscribe(MyRobot, [])

subscribers(robot, path)

@spec subscribers(module(), [atom()]) :: [{pid(), [module()]}]

List subscribers registered at a specific path.

Returns a list of {pid, message_types} tuples. Useful for debugging.
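
When debugging, the returned tuples can be turned into readable lines. The following is a small sketch that assumes an empty message_types list in the result means the subscriber has no filter, as it does at subscribe time. `SubscriberReportSketch` is a hypothetical helper, not part of the library.

```elixir
defmodule SubscriberReportSketch do
  # Hypothetical helper: turns [{pid, message_types}] into one line per subscriber.
  def describe(subscribers) do
    Enum.map(subscribers, fn
      {pid, []} ->
        "#{inspect(pid)} receives all message types"

      {pid, types} ->
        "#{inspect(pid)} receives #{Enum.map_join(types, ", ", &inspect/1)}"
    end)
  end
end

# With the real library the input would come from something like:
#   BB.PubSub.subscribers(MyRobot, [:sensor])
SubscriberReportSketch.describe([{self(), []}, {self(), [BB.Message.Sensor.Imu]}])
|> Enum.each(&IO.puts/1)
```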

unsubscribe(robot, path)

@spec unsubscribe(module(), [atom()]) :: :ok

Unsubscribe the calling process from the given path.