How to Troubleshoot PubSub
View SourceDiagnose and fix common issues with BB's publish-subscribe system.
Prerequisites
- Understanding of BB PubSub (see Understanding the PubSub System)
- A running BB robot
Common Symptoms
| Symptom | Likely Cause |
|---|---|
| No messages received | Wrong path, not subscribed, publisher not running |
| Messages delayed | Slow subscriber, mailbox backlog |
| Duplicate messages | Multiple subscriptions, multiple publishers |
| Messages stop | Publisher crashed, unsubscribed |
Diagnostic Tools
See All Messages
Subscribe to the root path to see everything:
BB.subscribe(MyRobot, [])
# Messages will print in IEx
# {:bb, [:sensor, :shoulder], %BB.Message{...}}
# {:bb, [:state_machine], %BB.Message{...}}Count Messages by Path
defmodule MessageCounter do
use GenServer
def start_link(robot) do
GenServer.start_link(__MODULE__, robot, name: __MODULE__)
end
def init(robot) do
BB.subscribe(robot, [])
{:ok, %{counts: %{}, robot: robot}}
end
def get_counts, do: GenServer.call(__MODULE__, :get_counts)
def reset, do: GenServer.cast(__MODULE__, :reset)
def handle_call(:get_counts, _from, state) do
{:reply, state.counts, state}
end
def handle_cast(:reset, state) do
{:noreply, %{state | counts: %{}}}
end
def handle_info({:bb, path, _msg}, state) do
key = Enum.take(path, 2) |> Enum.join(".")
counts = Map.update(state.counts, key, 1, &(&1 + 1))
{:noreply, %{state | counts: counts}}
end
end
# Usage
MessageCounter.start_link(MyRobot)
Process.sleep(5000)
MessageCounter.get_counts()
# => %{"sensor.shoulder" => 50, "state_machine" => 2}Use Event Stream Widget
In Livebook with bb_kino:
BB.Kino.events(MyRobot)Or with bb_liveview dashboard - the event stream component shows all messages.
Issue: Messages Not Received
Check 1: Is the Publisher Running?
# List all processes for the robot
Registry.select(BB.Registry, [{{MyRobot, :"$1", :"$2"}, [], [{{:"$1", :"$2"}}]}])
# => [{[:sensor, :shoulder], #PID<0.456.0>}, ...]If the expected process isn't listed, check:
- Robot topology includes the sensor/actuator
- Process hasn't crashed (check logs)
Check 2: Correct Path?
Paths are hierarchical. Common mistakes:
# WRONG - extra nesting
BB.subscribe(MyRobot, [:joint, :shoulder, :sensor, :encoder])
# RIGHT - sensors are at joint level
BB.subscribe(MyRobot, [:sensor, :encoder])Check what path the publisher uses:
# Subscribe to everything, look at actual paths
BB.subscribe(MyRobot, [])Check 3: Subscription Active?
Subscriptions are process-linked. If your process restarted, you need to resubscribe:
# In GenServer init
def init(opts) do
BB.subscribe(MyRobot, [:sensor])
{:ok, %{}}
end
# Also resubscribe after reconnection in LiveView
def handle_info(:reconnected, socket) do
BB.subscribe(socket.assigns.robot, [:sensor])
{:noreply, socket}
endCheck 4: Message Type Matches?
Ensure you're pattern matching correctly:
# This won't match if payload is different type
def handle_info({:bb, [:sensor, _], %{payload: %JointState{}}}, state)
# More permissive
def handle_info({:bb, [:sensor, _], %{payload: payload}}, state) do
IO.inspect(payload, label: "Received")
{:noreply, state}
endIssue: Messages Delayed
Check 1: Mailbox Size
{:message_queue_len, len} = Process.info(self(), :message_queue_len)
IO.puts("Mailbox has #{len} messages")If mailbox is growing:
- Process messages faster
- Add selective receive
- Consider sampling/throttling
Check 2: Slow Handler
Profile your handler:
def handle_info({:bb, path, msg}, state) do
{time, result} = :timer.tc(fn -> process_message(msg, state) end)
if time > 10_000 do # > 10ms
Logger.warning("Slow message processing: #{time}µs for #{inspect(path)}")
end
result
endCheck 3: High-Frequency Publisher
If a sensor publishes too fast:
# Throttle in subscriber
def handle_info({:bb, [:sensor, _], msg}, state) do
now = System.monotonic_time(:millisecond)
if now - state.last_processed > 50 do # Max 20Hz
process_message(msg)
{:noreply, %{state | last_processed: now}}
else
{:noreply, state} # Skip this message
end
endIssue: Duplicate Messages
Check 1: Multiple Subscriptions
# BAD - subscribes twice
def init(opts) do
BB.subscribe(MyRobot, [:sensor])
BB.subscribe(MyRobot, [:sensor, :shoulder]) # Also matches!
{:ok, %{}}
end
# GOOD - subscribe to most specific path
def init(opts) do
BB.subscribe(MyRobot, [:sensor, :shoulder])
{:ok, %{}}
endCheck 2: Multiple Publishers
Check if multiple processes publish to the same path:
# Find all processes that might publish to [:sensor, :shoulder]
Registry.select(BB.Registry, [{{MyRobot, :"$1", :"$2"}, [], [{{:"$1", :"$2"}}]}])
|> Enum.filter(fn {path, _pid} ->
List.starts_with?(path, [:sensor, :shoulder])
end)Issue: Messages Stop
Check 1: Publisher Crashed
# Check if process is alive
case BB.Process.whereis(MyRobot, [:sensor, :shoulder]) do
{:ok, pid} ->
if Process.alive?(pid), do: :running, else: :dead
{:error, _} ->
:not_found
endCheck supervisor logs for crash reasons.
Check 2: Unsubscribed
If your process called BB.unsubscribe/2 or restarted without resubscribing.
Check 3: Robot Stopped
# Check if robot supervisor is running
case Process.whereis(MyRobot.Supervisor) do
nil -> :stopped
pid -> if Process.alive?(pid), do: :running, else: :stopping
endDebugging Techniques
Add Logging
def handle_info({:bb, path, msg}, state) do
Logger.debug("Received: #{inspect(path)} - #{inspect(msg.payload.__struct__)}")
# ... handle message
endTrace Publications
# Wrap BB.publish temporarily
defmodule DebugPublish do
def publish(robot, path, msg) do
IO.puts("PUBLISH: #{inspect(path)}")
BB.publish(robot, path, msg)
end
endCheck Registry State
# All subscriptions for a robot
Registry.lookup(BB.PubSub.Registry, {MyRobot, []})
# => [{pid, path_prefix}, ...]Performance Tuning
Reduce Message Volume
# Only publish on significant change
defp maybe_publish(new_value, state) do
if abs(new_value - state.last_published) > 0.01 do
BB.publish(...)
%{state | last_published: new_value}
else
state
end
endBatch Messages
# Collect readings, publish batch
def handle_info(:flush, state) do
if length(state.buffer) > 0 do
message = JointState.new!(
names: Enum.map(state.buffer, & &1.name),
positions: Enum.map(state.buffer, & &1.position)
)
BB.publish(state.robot, [:sensor, :batch], message)
end
schedule_flush()
{:noreply, %{state | buffer: []}}
endUse Direct Delivery for Low Latency
For actuator commands where PubSub overhead matters:
# Instead of
BB.Actuator.set_position(MyRobot, [:actuator, :servo], position)
# Use direct
BB.Actuator.set_position!(MyRobot, :servo, position)Quick Reference
| Problem | First Check | Solution |
|---|---|---|
| No messages | Path correct? | Subscribe to [], check paths |
| Delayed | Mailbox size? | Profile handler, throttle |
| Duplicates | Multiple subs? | Use most specific path |
| Stopped | Process alive? | Check supervisor, logs |
Related Documentation
- Understanding the PubSub System - Architecture explanation
- Sensors and PubSub - Tutorial
- Reference: Message Types - All message types