Mastering ROS 2 Topic-Based Inter-Process Communication

Core Concept: The Topic Abstraction

In robotic software architectures, individual nodes handle distinct responsibilities such as perception, planning, or actuation. These components rarely operate in isolation; they rely on robust communication channels to exchange state and commands. Within the Robot Operating System 2 (ROS 2) ecosystem, topics serve as the foundational mechanism for this asynchronous, decoupled data exchange.

Publish-Subscribe Architecture

Topic communication adheres to a publish-subscribe (pub/sub) paradigm. Nodes that generate data act as publishers, while those requiring information operate as subscribers. Each channel is identified by a unique string name and carries payloads conforming to strictly defined message types.

  • Many-to-Many Topology: A single topic can support multiple concurernt publishers and subscribers. Developers must implement arbitration strategies to handle potential conflicts when multiple sources broadcast conflicting state updates.
  • Asynchronous Execution: Publishers transmit data without blocking or awaiting acknowledgment from subscribers. This fire-and-forget characteristic makes topics ideal for high-frequency telemetry, sensor streams, and continuous control loops. Synchronous operations are typically handled via ROS 2 services instead.
  • Message Contracts: Data serialization relies on intermediate representation files (typically ending in .msg). These definitions abstract away language boundaries, enabling heterogeneous nodes written in C++, Python, or Rust to interoperate seamlessly. This contract-driven approach fosters modular system design.

Implementaiton Example I: Fundamental String Broadcasting

The following demonstration establishes a basic pub/sub pair exchanging standardized string payloads at regular intervals.

Broadcaster Node Logic

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class StatusBroadcaster(Node):
    def __init__(self):
        super().__init__('status_broadcaster')
        qos_profile = rclpy.qos.QoSProfile(depth=10)
        self.publisher_ = self.create_publisher(String, 'system_status', qos_profile)
        self._timer = self.create_timer(1.0, self._broadcast_tick)
        self._counter = 0

    def _broadcast_tick(self):
        msg = String()
        msg.data = f'Heartbeat sequence {self._counter}'
        self.publisher_.publish(msg)
        self.get_logger().info(f'Sent: {msg.data}')
        self._counter += 1

def main():
    rclpy.init()
    broadcaster = StatusBroadcaster()
    try:
        rclpy.spin(broadcaster)
    except KeyboardInterrupt:
        pass
    finally:
        broadcaster.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

Listener Node Logic

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class StatusListener(Node):
    def __init__(self):
        super().__init__('status_listener')
        qos_profile = rclpy.qos.QoSProfile(depth=10)
        self.subscription = self.create_subscription(
            String, 'system_status', self._process_update, qos_profile
        )

    def _process_update(self, msg):
        self.get_logger().info(f'Received payload: {msg.data}')

def main():
    rclpy.init()
    listener = StatusListener()
    try:
        rclpy.spin(listener)
    except KeyboardInterrupt:
        pass
    finally:
        listener.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

Implementation Example II: Real-Time Computer Vision Pipeline

Decoupling hardware abstraction from algorithmic processing enhances maintainability. By routing raw camera frames over a dedicated topic, the perception module remains independent of sensor specifications.

Custom Frame Publisher

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2

class CameraFrameBroadcaster(Node):
    def __init__(self):
        super().__init__('camera_streamer')
        self._pub = self.create_publisher(Image, 'raw_video_feed', 10)
        self._bridge = CvBridge()
        self._capture = cv2.VideoCapture(0)
        self._timer = self.create_timer(0.08, self._read_frame)

    def _read_frame(self):
        success, frame = self._capture.read()
        if success:
            ros_msg = self._bridge.cv2_to_imgmsg(frame, encoding='bgr8')
            self._pub.publish(ros_msg)

def main():
    rclpy.init()
    streamer = CameraFrameBroadcaster()
    rclpy.spin(streamer)
    streamer.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Color-Threshold Detection Listener

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np

TARGET_LOWER = np.array([0, 90, 128])
TARGET_UPPER = np.array([180, 255, 255])

class ColorTargetTracker(Node):
    def __init__(self):
        super().__init__('target_tracker')
        self._sub = self.create_subscription(
            Image, 'raw_video_feed', self._analyze_stream, 10
        )
        self._bridge = CvBridge()

    def _detect_objects(self, bgr_frame):
        hsv_space = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2HSV)
        mask = cv2.inRange(hsv_space, tuple(TARGET_LOWER), tuple(TARGET_UPPER))
        
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        for region in contours:
            area = cv2.contourArea(region)
            if area < 150:
                continue
            
            x, y, w, h = cv2.boundingRect(region)
            cv2.drawContours(bgr_frame, [region], -1, (0, 255, 0), 2)
            center_x, center_y = int(x + w / 2), int(y + h / 2)
            cv2.circle(bgr_frame, (center_x, center_y), 5, (0, 255, 0), -1)
        
        cv2.imshow('Detection Output', bgr_frame)
        cv2.waitKey(1)

    def _analyze_stream(self, ros_image):
        original_frame = self._bridge.imgmsg_to_cv2(ros_image, desired_encoding='bgr8')
        self._detect_objects(original_frame)

def main():
    rclpy.init()
    tracker = ColorTargetTracker()
    rclpy.spin(tracker)
    tracker.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Streamlining Hardware Integration

Custom camera drivers introduce unnecessary maintenance overhead. ROS 2 distributions provide standardized hardware interfaces that abstract device specifics. For USB peripherals, the official utility package handles initialization and frame broadcasting automatically.

# Deploy standardized hardware abstraction
$ sudo apt install ros-humble-usb-cam

# Replace custom broadcaster with managed service
$ ros2 run usb_cam usb_cam_node_exe
$ ros2 run learning_topic target_tracker

Diagnostic Command-Line Utilities

ROS 2 ships with built-in terminal tools for runtime inspection and testing. These utilities bypass application-level debugging by interacting directly with the DDS middleware layer.

Command Description
ros2 topic list Dumps active channel names across all connected processes
ros2 topic info <topic_name> Displays publisher count, subscriber count, and message type metadata
ros2 topic hz <topic_name> Measures throughput frequency over a sliding time window
ros2 topic bw <topic_name> Evaluates network bandwidth consumption per second
ros2 topic echo <topic_name> Dumps raw serialized payloads to stdout in real-time
ros2 topic pub <topic_name> <msg_type> "<args>" Injects test messages directly from the shell for quick validation

Topic-based messaging enforces unidirectional data flow, making it the preferred architecture for continuous streams, telemetry logging, and distributed sensor fusion pipelines.

Tags: ROS2 topics pub-sub rclpy sensor-msgs

Posted on Sun, 24 May 2026 18:41:24 +0000 by Teddy B.