In this tutorial, you'll learn how to add sensors to your robot and subscribe to their messages using Beam Bots' hierarchical PubSub system.
Prerequisites
Complete Starting and Stopping. You should have a MyRobot module that you can start.
Adding a Sensor to the DSL
Sensors are processes that publish data. Add one to your robot:
defmodule MyRobot do
use BB
topology do
link :base do
sensor :imu, MyImuSensor
joint :pan_joint do
# ... rest of robot
end
end
end
endThe sensor declaration takes:
- A name (
:imu) - A child spec (
MyImuSensoror{MyImuSensor, options})
Sensors can be attached at three levels:
- Robot level - in a
sensors do ... endblock - Link level - inside a link definition
- Joint level - inside a joint definition
Implementing a Sensor Process
A sensor is a GenServer that publishes messages. Here's a simple IMU sensor:
defmodule MyImuSensor do
use GenServer
alias BB.Message.Sensor.Imu
alias BB.Message.{Vec3, Quaternion}
alias BB.PubSub
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end
@impl GenServer
def init(opts) do
# BB passes robot context in opts
robot = Keyword.fetch!(opts, :robot)
path = Keyword.fetch!(opts, :path)
# Schedule periodic readings
:timer.send_interval(100, :read_sensor)
{:ok, %{robot: robot, path: path}}
end
@impl GenServer
def handle_info(:read_sensor, state) do
# Create an IMU message
{:ok, message} = Imu.new(:imu,
orientation: Quaternion.identity(),
angular_velocity: Vec3.zero(),
linear_acceleration: Vec3.new(0.0, 0.0, 9.81)
)
# Publish to subscribers
# Path format: [:sensor | location_path]
PubSub.publish(state.robot, [:sensor | state.path], message)
{:noreply, state}
end
endKey points:
- BB passes
:robotand:pathin the options - The path reflects where the sensor is in the topology (e.g.,
[:base, :imu]) Publish with
[:sensor | path]to identify it as a sensor message
For Roboticists: This is similar to ROS publishers. The sensor publishes on a topic (path) and subscribers receive the messages asynchronously.
For Elixirists: The sensor is just a GenServer. BB starts it as part of the supervision tree and provides context about where it sits in the robot topology.
Subscribing to Messages
Start your robot and subscribe to sensor messages:
iex> {:ok, _} = BB.Supervisor.start_link(MyRobot)
iex> BB.PubSub.subscribe(MyRobot, [:sensor])
{:ok, #PID<0.234.0>}Now your IEx process receives sensor messages:
iex> flush()
{:bb, [:sensor, :base, :imu], %BB.Message{...}}
{:bb, [:sensor, :base, :imu], %BB.Message{...}}Subscription Patterns
The path you subscribe to determines which messages you receive:
# All sensor messages from anywhere
BB.PubSub.subscribe(MyRobot, [:sensor])
# Sensors under the base link
BB.PubSub.subscribe(MyRobot, [:sensor, :base])
# Only the specific IMU sensor
BB.PubSub.subscribe(MyRobot, [:sensor, :base, :imu])
# All messages (sensors, actuators, everything)
BB.PubSub.subscribe(MyRobot, [])Filtering by Message Type
Subscribe only to specific message types:
alias BB.Message.Sensor.Imu
BB.PubSub.subscribe(MyRobot, [:sensor],
message_types: [Imu]
)This is useful when you have many sensors but only care about IMU data.
Receiving Messages in a Process
In a real application, you'll receive messages in a GenServer:
defmodule MyController do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end
@impl GenServer
def init(opts) do
robot = Keyword.fetch!(opts, :robot)
# Subscribe to all sensor messages
BB.PubSub.subscribe(robot, [:sensor])
{:ok, %{robot: robot}}
end
@impl GenServer
def handle_info({:bb, path, message}, state) do
# Process the sensor message
IO.inspect(message.payload, label: "Received from #{inspect(path)}")
{:noreply, state}
end
endMessage Structure
Messages have a standard envelope structure:
%BB.Message{
timestamp: -576460748776542, # monotonic nanoseconds
frame_id: :imu,
payload: %BB.Message.Sensor.Imu{
orientation: {:quaternion, 0.0, 0.0, 0.0, 1.0},
angular_velocity: {:vec3, 0.0, 0.0, 0.0},
linear_acceleration: {:vec3, 0.0, 0.0, 9.81}
}
}timestamp- Monotonic time in nanoseconds (fromSystem.monotonic_time/1)frame_id- Coordinate frame for the data (typically the sensor name)payload- The actual sensor data struct (type depends on message type)
Available Message Types
BB includes common sensor message types:
| Module | Description |
|---|---|
BB.Message.Sensor.Imu | Accelerometer, gyroscope |
BB.Message.Sensor.JointState | Joint positions, velocities, efforts |
BB.Message.Sensor.LaserScan | Lidar range data |
BB.Message.Sensor.Range | Single distance measurement |
BB.Message.Sensor.Image | Camera images |
BB.Message.Sensor.BatteryState | Battery status |
And geometry types for transforms and motion:
| Module | Description |
|---|---|
BB.Message.Geometry.Point3D | 3D point (wraps BB.Math.Vec3) |
BB.Message.Geometry.Pose | Position + orientation (wraps BB.Math.Transform) |
BB.Message.Geometry.Twist | Linear + angular velocity |
BB.Message.Geometry.Wrench | Force + torque |
BB.Message.Geometry.Accel | Linear + angular acceleration |
Creating Custom Payload Types
You can define your own payload types for domain-specific sensor data. Use the use BB.Message macro with a schema:
defmodule MyApp.Message.Temperature do
@moduledoc "Temperature reading from a thermal sensor."
defstruct [:celsius, :sensor_id]
use BB.Message,
schema: [
celsius: [
type: :float,
required: true,
doc: "Temperature in degrees Celsius"
],
sensor_id: [
type: :atom,
required: true,
doc: "Identifier of the temperature sensor"
]
]
@type t :: %__MODULE__{
celsius: float(),
sensor_id: atom()
}
# Custom convenience constructor (in addition to generated new/2)
@spec new(atom(), atom(), float()) ::
{:ok, BB.Message.t()} | {:error, term()}
def new(frame_id, sensor_id, celsius) do
new(frame_id, celsius: celsius, sensor_id: sensor_id)
end
endThe use BB.Message macro:
- Sets up the
BB.Messagebehaviour - Compiles the schema via
Spark.Options - Generates a
new/2function:new(frame_id, attrs) - Implements the
schema/0callback
Note: Define defstruct before use BB.Message.
Use your custom payload in a sensor:
defmodule MyTemperatureSensor do
use GenServer
alias MyApp.Message.Temperature
alias BB.PubSub
def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
@impl GenServer
def init(opts) do
robot = Keyword.fetch!(opts, :robot)
path = Keyword.fetch!(opts, :path)
:timer.send_interval(1000, :read_temperature)
{:ok, %{robot: robot, path: path}}
end
@impl GenServer
def handle_info(:read_temperature, state) do
# Read from actual hardware here
celsius = 23.5 + :rand.uniform() * 2
{:ok, message} = Temperature.new(:thermal_sensor, :temp_1, celsius)
PubSub.publish(state.robot, [:sensor | state.path], message)
{:noreply, state}
end
endThe Spark.Options schema validates attributes when creating messages. If validation fails, BB.Message.new/3 returns {:error, reason} with details about what went wrong.
Unsubscribing
Stop receiving messages:
BB.PubSub.unsubscribe(MyRobot, [:sensor])Debugging Subscriptions
List who's subscribed to a path:
iex> BB.PubSub.subscribers(MyRobot, [:sensor])
[{#PID<0.234.0>, []}] # PID and message type filtersSensors with Options
Pass configuration to your sensor:
topology do
link :base do
sensor :imu, {MyImuSensor, sample_rate: 200, bus: :spi0}
end
endYour sensor receives these in start_link/1:
def init(opts) do
robot = Keyword.fetch!(opts, :robot)
path = Keyword.fetch!(opts, :path)
sample_rate = Keyword.get(opts, :sample_rate, 100)
bus = Keyword.get(opts, :bus, :i2c1)
# ...
endRobot-Level Sensors
Some sensors aren't attached to a specific link (e.g., GPS, battery monitor). Define them at robot level:
defmodule MyRobot do
use BB
sensors do
sensor :gps, GpsSensor
sensor :battery, BatteryMonitor
end
topology do
# ... links and joints
end
endThese sensors publish with shorter paths: [:sensor, :gps] instead of [:sensor, :base, :gps].
What's Next?
You can now publish and subscribe to sensor data. In the next tutorial, we'll:
- Use sensor data to compute robot state
- Understand forward kinematics
- Calculate link positions from joint angles
Continue to Forward Kinematics.