Core Concept: The Topic Abstraction
In robotic software architectures, individual nodes handle distinct responsibilities such as perception, planning, or actuation. These components rarely operate in isolation; they rely on robust communication channels to exchange state and commands. Within the Robot Operating System 2 (ROS 2) ecosystem, topics serve as the foundational mechanism for this asynchronous, decoupled data exchange.
Publish-Subscribe Architecture
Topic communication adheres to a publish-subscribe (pub/sub) paradigm. Nodes that generate data act as publishers, while those requiring information operate as subscribers. Each channel is identified by a unique string name and carries payloads conforming to strictly defined message types.
- Many-to-Many Topology: A single topic can support multiple concurernt publishers and subscribers. Developers must implement arbitration strategies to handle potential conflicts when multiple sources broadcast conflicting state updates.
- Asynchronous Execution: Publishers transmit data without blocking or awaiting acknowledgment from subscribers. This fire-and-forget characteristic makes topics ideal for high-frequency telemetry, sensor streams, and continuous control loops. Synchronous operations are typically handled via ROS 2 services instead.
- Message Contracts: Data serialization relies on intermediate representation files (typically ending in
.msg). These definitions abstract away language boundaries, enabling heterogeneous nodes written in C++, Python, or Rust to interoperate seamlessly. This contract-driven approach fosters modular system design.
Implementaiton Example I: Fundamental String Broadcasting
The following demonstration establishes a basic pub/sub pair exchanging standardized string payloads at regular intervals.
Broadcaster Node Logic
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class StatusBroadcaster(Node):
def __init__(self):
super().__init__('status_broadcaster')
qos_profile = rclpy.qos.QoSProfile(depth=10)
self.publisher_ = self.create_publisher(String, 'system_status', qos_profile)
self._timer = self.create_timer(1.0, self._broadcast_tick)
self._counter = 0
def _broadcast_tick(self):
msg = String()
msg.data = f'Heartbeat sequence {self._counter}'
self.publisher_.publish(msg)
self.get_logger().info(f'Sent: {msg.data}')
self._counter += 1
def main():
rclpy.init()
broadcaster = StatusBroadcaster()
try:
rclpy.spin(broadcaster)
except KeyboardInterrupt:
pass
finally:
broadcaster.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Listener Node Logic
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class StatusListener(Node):
def __init__(self):
super().__init__('status_listener')
qos_profile = rclpy.qos.QoSProfile(depth=10)
self.subscription = self.create_subscription(
String, 'system_status', self._process_update, qos_profile
)
def _process_update(self, msg):
self.get_logger().info(f'Received payload: {msg.data}')
def main():
rclpy.init()
listener = StatusListener()
try:
rclpy.spin(listener)
except KeyboardInterrupt:
pass
finally:
listener.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Implementation Example II: Real-Time Computer Vision Pipeline
Decoupling hardware abstraction from algorithmic processing enhances maintainability. By routing raw camera frames over a dedicated topic, the perception module remains independent of sensor specifications.
Custom Frame Publisher
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
class CameraFrameBroadcaster(Node):
def __init__(self):
super().__init__('camera_streamer')
self._pub = self.create_publisher(Image, 'raw_video_feed', 10)
self._bridge = CvBridge()
self._capture = cv2.VideoCapture(0)
self._timer = self.create_timer(0.08, self._read_frame)
def _read_frame(self):
success, frame = self._capture.read()
if success:
ros_msg = self._bridge.cv2_to_imgmsg(frame, encoding='bgr8')
self._pub.publish(ros_msg)
def main():
rclpy.init()
streamer = CameraFrameBroadcaster()
rclpy.spin(streamer)
streamer.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Color-Threshold Detection Listener
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np
TARGET_LOWER = np.array([0, 90, 128])
TARGET_UPPER = np.array([180, 255, 255])
class ColorTargetTracker(Node):
def __init__(self):
super().__init__('target_tracker')
self._sub = self.create_subscription(
Image, 'raw_video_feed', self._analyze_stream, 10
)
self._bridge = CvBridge()
def _detect_objects(self, bgr_frame):
hsv_space = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv_space, tuple(TARGET_LOWER), tuple(TARGET_UPPER))
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for region in contours:
area = cv2.contourArea(region)
if area < 150:
continue
x, y, w, h = cv2.boundingRect(region)
cv2.drawContours(bgr_frame, [region], -1, (0, 255, 0), 2)
center_x, center_y = int(x + w / 2), int(y + h / 2)
cv2.circle(bgr_frame, (center_x, center_y), 5, (0, 255, 0), -1)
cv2.imshow('Detection Output', bgr_frame)
cv2.waitKey(1)
def _analyze_stream(self, ros_image):
original_frame = self._bridge.imgmsg_to_cv2(ros_image, desired_encoding='bgr8')
self._detect_objects(original_frame)
def main():
rclpy.init()
tracker = ColorTargetTracker()
rclpy.spin(tracker)
tracker.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Streamlining Hardware Integration
Custom camera drivers introduce unnecessary maintenance overhead. ROS 2 distributions provide standardized hardware interfaces that abstract device specifics. For USB peripherals, the official utility package handles initialization and frame broadcasting automatically.
# Deploy standardized hardware abstraction
$ sudo apt install ros-humble-usb-cam
# Replace custom broadcaster with managed service
$ ros2 run usb_cam usb_cam_node_exe
$ ros2 run learning_topic target_tracker
Diagnostic Command-Line Utilities
ROS 2 ships with built-in terminal tools for runtime inspection and testing. These utilities bypass application-level debugging by interacting directly with the DDS middleware layer.
| Command | Description |
|---|---|
ros2 topic list |
Dumps active channel names across all connected processes |
ros2 topic info <topic_name> |
Displays publisher count, subscriber count, and message type metadata |
ros2 topic hz <topic_name> |
Measures throughput frequency over a sliding time window |
ros2 topic bw <topic_name> |
Evaluates network bandwidth consumption per second |
ros2 topic echo <topic_name> |
Dumps raw serialized payloads to stdout in real-time |
ros2 topic pub <topic_name> <msg_type> "<args>" |
Injects test messages directly from the shell for quick validation |
Topic-based messaging enforces unidirectional data flow, making it the preferred architecture for continuous streams, telemetry logging, and distributed sensor fusion pipelines.