前言
ROS(Robot Operating System)作为机器人领域最流行的开源软件框架,其核心通信机制是学习ROS的必经之路。 话题(Topic)通信 是ROS中最基础也是最常用的通信方式,采用经典的 发布-订阅(Publish-Subscribe)模式 ,实现了节点间的松耦合异步通信。
本文将通过一个完整的实战项目,深入讲解ROS话题通信的原理、实现方法和最佳实践,包含 标准消息类型 和 自定义消息类型 的完整代码实现。
一、ROS话题通信原理
1.1 核心概念
1.2 工作机制
通信流程 :
1. 发布者通过 ros::advertise() 向ROS Master注册话题
2. 订阅者通过 ros::subscribe() 向ROS Master注册订阅
3. ROS Master建立发布者和订阅者之间的连接
4. 发布者通过 publish() 方法发布消息
5. 订阅者通过回调函数接收并处理消息
1.3 特点优势
- 异步通信 :发布者和订阅者无需同步等待
- 松耦合 :发布者和订阅者互相独立,无需知道对方存在
- 多对多 :一个话题可由多个节点发布,也可被多个节点订阅
- 实时性 :适合周期性数据传输,如传感器数据
二、项目环境搭建
2.1 系统环境
2.2 创建工作空间
# 创建目录结构 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src # 初始化工作空间 catkin_init_workspace # 编译工作空间 cd ~/catkin_ws catkin_make # 配置环境变量 source devel/setup.bash2.3 创建功能包
cd ~/catkin_ws/src catkin_create_pkg topic_demo std_msgs rospy roscpp message_generation cd ~/catkin_ws catkin_make source devel/setup.bash功能包依赖说明 :
- std_msgs :标准消息类型库
- rospy :Python ROS客户端库
- roscpp :C++ ROS客户端库
- message_generation :自定义消息生成工具
三、自定义消息类型
3.1 创建消息文件
在 topic_demo/msg/ 目录下创建自定义消息:
3.2 配置package.xml
<?xml version="1.0"?> <package format="2"> <name>topic_demo</name> <version>1.0.0</version> <description>ROS Topic Communication Demonstration</description> <maintainer email="student@robotics.edu">24级人工智能2班</maintainer> <license>BSD</license> <buildtool_depend>catkin</buildtool_depend> <build_depend>roscpp</build_depend> <build_depend>rospy</build_depend> <build_depend>std_msgs</build_depend> <build_depend>message_generation</build_depend> <exec_depend>roscpp</exec_depend> <exec_depend>rospy</exec_depend> <exec_depend>std_msgs</exec_depend> <exec_depend>message_runtime</exec_depend> </package>3.3 配置CMakeLists.txt
cmake_minimum_required(VERSION 3.0.2) project(topic_demo) # 查找依赖包 find_package(catkin REQUIRED COMPONENTS roscpp rospy std_msgs message_generation ) # 添加自定义消息文件 add_message_files( FILES Person.msg RobotState.msg ) # 生成消息 generate_messages( DEPENDENCIES std_msgs ) # 配置catkin包 catkin_package( CATKIN_DEPENDS roscpp rospy std_msgs message_runtime ) # 包含目录 include_directories( ${catkin_INCLUDE_DIRS} ) # 编译C++节点 add_executable(talker src/talker.cpp) target_link_libraries(talker ${catkin_LIBRARIES}) add_dependencies(talker ${${PROJECT_NAME}_EXPORTED_TARGETS}) add_executable(listener src/listener.cpp) target_link_libraries(listener ${catkin_LIBRARIES}) add_dependencies(listener ${${PROJECT_NAME}_EXPORTED_TARGETS})3.4 编译消息
cd ~/catkin_ws catkin_make source devel/setup.bash编译成功后,系统会自动生成消息的头文件和Python模块:
- C++头文件: devel/include/topic_demo/Person.h 、 devel/include/topic_demo/RobotState.h
- Python模块: devel/lib/python3/dist-packages/topic_demo/msg/__init__.py
四、C++实现
4.1 发布者节点(talker.cpp)
#include "ros/ros.h" #include "std_msgs/String.h" #include "std_msgs/Int32.h" #include "std_msgs/Float64.h" #include "topic_demo/Person.h" #include "topic_demo/RobotState.h" #include <sstream> int main(int argc, char **argv) { // 初始化节点,节点名为"talker" ros::init(argc, argv, "talker"); // 创建节点句柄 ros::NodeHandle n; // 创建发布者,向5个话题发布消息 ros::Publisher chatter_pub = n.advertise<std_msgs::String>("chatter", 1000); ros::Publisher counter_pub = n.advertise<std_msgs::Int32>("counter", 1000); ros::Publisher temperature_pub = n.advertise<std_msgs::Float64>("temperature", 1000); ros::Publisher person_pub = n.advertise<topic_demo::Person>("person_info", 1000); ros::Publisher robot_state_pub = n.advertise<topic_demo::RobotState>("robot_state", 1000); // 设置发布频率为10Hz ros::Rate loop_rate(10); int count = 0; // 主循环,持续发布消息 while (ros::ok()) { // 1. 发布字符串消息 std_msgs::String str_msg; std::stringstream ss; ss << "Hello ROS! Message #" << count; str_msg.data = ss.str(); ROS_INFO("[发布] chatter: %s", str_msg.data.c_str()); chatter_pub.publish(str_msg); // 2. 发布整数计数器 std_msgs::Int32 int_msg; int_msg.data = count; ROS_INFO("[发布] counter: %d", int_msg.data); counter_pub.publish(int_msg); // 3. 发布模拟温度数据(正弦波动) std_msgs::Float64 float_msg; float_msg.data = 25.0 + sin(count * 0.5) * 5.0; ROS_INFO("[发布] temperature: %.2f", float_msg.data); temperature_pub.publish(float_msg); // 4. 发布自定义Person消息 topic_demo::Person person_msg; person_msg.name = "Student_" + std::to_string(count % 10); person_msg.age = 18 + (count % 5); person_msg.height = 1.70 + (count % 10) * 0.02; person_msg.address = "Room_" + std::to_string(count % 100); person_msg.gender = count % 2; person_msg.is_student = true; ROS_INFO("[发布] person: name=%s, age=%d, height=%.2f", person_msg.name.c_str(), person_msg.age, person_msg.height); person_pub.publish(person_msg); // 5. 发布自定义RobotState消息 topic_demo::RobotState robot_msg; robot_msg.x = count * 0.1; robot_msg.y = count * 0.05; robot_msg.theta = count * 0.1; robot_msg.linear_velocity = 0.5; robot_msg.angular_velocity = 0.2; robot_msg.is_moving = true; robot_msg.battery_level = "85%"; ROS_INFO("[发布] robot: x=%.2f, y=%.2f, theta=%.2f", robot_msg.x, robot_msg.y, robot_msg.theta); robot_state_pub.publish(robot_msg); // 处理回调 ros::spinOnce(); // 按频率休眠 loop_rate.sleep(); ++count; } return 0; }代码解析 :
- ros::init() :初始化ROS节点
- ros::NodeHandle :创建节点句柄,用于管理节点资源
- ros::Publisher :创建发布者对象,参数为话题名和队列大小
- ros::Rate :控制发布频率
- publish() :发布消息
- ros::spinOnce() :处理一次回调队列
4.2 订阅者节点(listener.cpp)
#include "ros/ros.h" #include "std_msgs/String.h" #include "std_msgs/Int32.h" #include "std_msgs/Float64.h" #include "topic_demo/Person.h" #include "topic_demo/RobotState.h" // 字符串消息回调函数 void chatterCallback(const std_msgs::String::ConstPtr& msg) { ROS_INFO("[订阅] chatter: %s", msg->data.c_str()); } // 整数消息回调函数 void counterCallback(const std_msgs::Int32::ConstPtr& msg) { ROS_INFO("[订阅] counter: %d", msg->data); } // 浮点消息回调函数 void temperatureCallback(const std_msgs::Float64::ConstPtr& msg) { ROS_INFO("[订阅] temperature: %.2f", msg->data); } // Person消息回调函数 void personCallback(const topic_demo::Person::ConstPtr& msg) { ROS_INFO("[订阅] Person:"); ROS_INFO(" 姓名: %s", msg->name.c_str()); ROS_INFO(" 年龄: %d", msg->age); ROS_INFO(" 身高: %.2f m", msg->height); ROS_INFO(" 地址: %s", msg->address.c_str()); ROS_INFO(" 性别: %s", msg->gender == 0 ? "男" : "女"); ROS_INFO(" 学生: %s", msg->is_student ? "是" : "否"); } // RobotState消息回调函数 void robotStateCallback(const topic_demo::RobotState::ConstPtr& msg) { ROS_INFO("[订阅] RobotState:"); ROS_INFO(" 位置: (%.2f, %.2f)", msg->x, msg->y); ROS_INFO(" 航向角: %.2f rad", msg->theta); ROS_INFO(" 线速度: %.2f m/s", msg->linear_velocity); ROS_INFO(" 角速度: %.2f rad/s", msg->angular_velocity); ROS_INFO(" 移动中: %s", msg->is_moving ? "是" : "否"); ROS_INFO(" 电池电量: %s", msg->battery_level.c_str()); } int main(int argc, char **argv) { // 初始化节点,节点名为"listener" ros::init(argc, argv, "listener"); // 创建节点句柄 ros::NodeHandle n; // 创建订阅者,订阅5个话题 ros::Subscriber sub_chatter = n.subscribe("chatter", 1000, chatterCallback); ros::Subscriber sub_counter = n.subscribe("counter", 1000, counterCallback); ros::Subscriber sub_temp = n.subscribe("temperature", 1000, temperatureCallback); ros::Subscriber sub_person = n.subscribe("person_info", 1000, personCallback); ros::Subscriber sub_robot = n.subscribe("robot_state", 1000, robotStateCallback); ROS_INFO("订阅者节点已启动,等待接收消息..."); // 进入事件循环,持续处理回调 ros::spin(); return 0; }代码解析 :
- ros::Subscriber :创建订阅者对象,参数为话题名、队列大小和回调函数
- 回调函数:当收到消息时自动调用,参数为消息的智能指针
- ros::spin() :进入事件循环,持续处理回调,直到节点关闭
五、Python实现
5.1 发布者节点(talker.py)
#!/usr/bin/env python3 import rospy import math from std_msgs.msg import String, Int32, Float64 from topic_demo.msg import Person, RobotState def talker(): # 初始化节点 rospy.init_node('talker', anonymous=True) # 创建发布者 pub_chatter = rospy.Publisher('chatter', String, queue_size=1000) pub_counter = rospy.Publisher('counter', Int32, queue_size=1000) pub_temp = rospy.Publisher('temperature', Float64, queue_size=1000) pub_person = rospy.Publisher('person_info', Person, queue_size=1000) pub_robot = rospy.Publisher('robot_state', RobotState, queue_size=1000) # 设置发布频率 rate = rospy.Rate(10) count = 0 rospy.loginfo("Python发布者节点已启动") while not rospy.is_shutdown(): # 发布字符串消息 pub_chatter.publish(String(data=f"Hello ROS! Message #{count}")) # 发布整数消息 pub_counter.publish(Int32(data=count)) # 发布浮点消息 pub_temp.publish(Float64(data=25.0 + math.sin(count * 0.5) * 5.0)) # 发布Person消息 person = Person() person.name = f"Student_{count % 10}" person.age = 18 + (count % 5) person.height = 1.70 + (count % 10) * 0.02 person.address = f"Room_{count % 100}" person.gender = count % 2 person.is_student = True pub_person.publish(person) # 发布RobotState消息 robot = RobotState() robot.x = count * 0.1 robot.y = count * 0.05 robot.theta = count * 0.1 robot.linear_velocity = 0.5 robot.angular_velocity = 0.2 robot.is_moving = True robot.battery_level = "85%" pub_robot.publish(robot) rospy.loginfo(f"已发布第 {count} 组消息") rate.sleep() count += 1 if __name__ == '__main__': try: talker() except rospy.ROSInterruptException: rospy.loginfo("发布者节点被中断")5.2 订阅者节点(listener.py)
#!/usr/bin/env python3 import rospy from std_msgs.msg import String, Int32, Float64 from topic_demo.msg import Person, RobotState def chatter_callback(msg): rospy.loginfo(f"[订阅] chatter: {msg.data}") def counter_callback(msg): rospy.loginfo(f"[订阅] counter: {msg.data}") def temperature_callback(msg): rospy.loginfo(f"[订阅] temperature: {msg.data:.2f}") def person_callback(msg): rospy.loginfo("[订阅] Person:") rospy.loginfo(f" 姓名: {msg.name}") rospy.loginfo(f" 年龄: {msg.age}") rospy.loginfo(f" 身高: {msg.height:.2f} m") rospy.loginfo(f" 地址: {msg.address}") rospy.loginfo(f" 性别: {'男' if msg.gender == 0 else '女'}") rospy.loginfo(f" 学生: {'是' if msg.is_student else '否'}") def robot_state_callback(msg): rospy.loginfo("[订阅] RobotState:") rospy.loginfo(f" 位置: ({msg.x:.2f}, {msg.y:.2f})") rospy.loginfo(f" 航向角: {msg.theta:.2f} rad") rospy.loginfo(f" 线速度: {msg.linear_velocity:.2f} m/s") rospy.loginfo(f" 角速度: {msg.angular_velocity:.2f} rad/s") rospy.loginfo(f" 移动中: {'是' if msg.is_moving else '否'}") rospy.loginfo(f" 电池电量: {msg.battery_level}") def listener(): rospy.init_node('listener', anonymous=True) rospy.Subscriber('chatter', String, chatter_callback) rospy.Subscriber('counter', Int32, counter_callback) rospy.Subscriber('temperature', Float64, temperature_callback) rospy.Subscriber('person_info', Person, person_callback) rospy.Subscriber('robot_state', RobotState, robot_state_callback) rospy.loginfo("Python订阅者节点已启动,等待接收消息...") rospy.spin() if __name__ == '__main__': try: listener() except rospy.ROSInterruptException: rospy.loginfo("订阅者节点被中断")六、运行与测试
6.1 编译项目
cd ~/catkin_ws catkin_make source devel/setup.bash6.2 方式一:使用launch文件启动
创建 launch/topic_demo.launch :
<launch> <node name="talker" pkg="topic_demo" type="talker" output="screen"/> <node name="listener" pkg="topic_demo" type="listener" output="screen"/> </launch>启动命令:
roslaunch topic_demo topic_demo.launch6.3 方式二:分步运行
终端1 - 启动ROS核心 :
roscore终端2 - 运行C++发布者 :
rosrun topic_demo talker终端3 - 运行C++订阅者 :
rosrun topic_demo listener终端4 - 运行Python发布者 :
rosrun topic_demo talker.py终端5 - 运行Python订阅者 :
rosrun topic_demo listener.py6.4 运行结果
发布者终端输出:
[ INFO] [1620000000.000000]: [发布] chatter: Hello ROS! Message #0 [ INFO] [1620000000.000001]: [发布] counter: 0 [ INFO] [1620000000.000002]: [发布] temperature: 25.00 [ INFO] [1620000000.000003]: [发布] person: name=Student_0, age=18, height=1.70 [ INFO] [1620000000.000004]: [发布] robot: x=0.00, y=0.00, theta=0.00 [ INFO] [1620000000.100000]: [发布] chatter: Hello ROS! Message #1 [ INFO] [1620000000.100001]: [发布] counter: 1 [ INFO] [1620000000.100002]: [发布] temperature: 27.48 [ INFO] [1620000000.100003]: [发布] person: name=Student_1, age=19, height=1.72 [ INFO] [1620000000.100004]: [发布] robot: x=0.10, y=0.05, theta=0.10 ...订阅者终端输出 :
[ INFO] [1620000000.000000]: 订阅者节点已启动,等待接收消息... [ INFO] [1620000000.000001]: [订阅] chatter: Hello ROS! Message #0 [ INFO] [1620000000.000002]: [订阅] counter: 0 [ INFO] [1620000000.000003]: [订阅] temperature: 25.00 [ INFO] [1620000000.000004]: [订阅] Person: [ INFO] [1620000000.000005]: 姓名: Student_0 [ INFO] [1620000000.000006]: 年龄: 18 [ INFO] [1620000000.000007]: 身高: 1.70 m [ INFO] [1620000000.000008]: 地址: Room_0 [ INFO] [1620000000.000009]: 性别: 男 [ INFO] [1620000000.000010]: 学生: 是 [ INFO] [1620000000.000011]: [订阅] RobotState: [ INFO] [1620000000.000012]: 位置: (0.00, 0.00) [ INFO] [1620000000.000013]: 航向角: 0.00 rad [ INFO] [1620000000.000014]: 线速度: 0.50 m/s [ INFO] [1620000000.000015]: 角速度: 0.20 rad/s [ INFO] [1620000000.000016]: 移动中: 是 [ INFO] [1620000000.000017]: 电池电量: 85% ...七、话题管理工具
7.1 查看所有话题
rostopic list输出:
/chatter /counter /temperature /person_info /robot_state /rosout /rosout_agg7.2 查看话题详情
rostopic info /person_info输出:
Type: topic_demo/Person Publishers: * /talker (http://localhost:11311/) Subscribers: * /listener (http://localhost:11311/)7.3 查看消息内容
rostopic echo /chatter7.4 查看消息类型定义
rosmsg show topic_demo/Person输出:
string name uint8 age float32 height string address uint8 gender bool is_student7.5 查看话题发布频率
rostopic hz /chatter输出:
subscribed to [/chatter] average rate: 10.001 min: 0.099s max: 0.101s std dev: 0.00033s window: 10八、技术要点总结
8.1 C++与Python对比
8.2 最佳实践
1. 消息设计原则 :
- 字段命名清晰,语义明确
- 避免过深的嵌套结构
- 优先使用标准消息类型
2. 队列大小设置 :
- 根据发布频率和处理能力设置
- 过小可能导致消息丢失
- 过大可能增加内存占用
3. 命名规范 :
- 话题名使用小写字母和下划线
- 使用命名空间区分功能模块
4. 错误处理 :
- 检查 ros::ok() 状态
- 添加日志输出便于调试
- 处理异常情况
8.3 常见问题
九、总结
通过本文的实战项目,我们系统学习了ROS话题通信的完整流程:
1. 原理理解 :掌握了发布-订阅模式的工作机制
2. 消息定义 :学会了自定义消息类型的创建和配置
3. 代码实现 :掌握了C++和Python两种语言的发布者/订阅者编写
4. 调试技巧 :熟悉了rostopic、rosmsg等工具的使用
5. 最佳实践 :了解了话题通信的设计原则和注意事项
话题通信是ROS中最基础也是最重要的通信方式,广泛应用于传感器数据传输、机器人状态发布、控制指令下发等场景。掌握好这一知识点,对于后续学习服务通信、动作通信等高级内容至关重要。
附录:项目文件结构
catkin_ws/ ├── src/ │ └── topic_demo/ │ ├── CMakeLists.txt │ ├── package.xml │ ├── launch/ │ │ └── topic_demo.launch │ ├── msg/ │ │ ├── Person.msg │ │ └── RobotState.msg │ ├── scripts/ │ │ ├── talker.py │ │ └── listener.py │ └── src/ │ ├── talker.cpp │ └── listener.cpp ├── build/ └── devel/