ROS系统介绍
机器人运行系统(Robot Operating System,ROS)是机器人领域的开源开发框架,提供通信中间件、工具链和算法库,支持模块化开发,适用于无人机、机械臂等平台。其核心优势包括分布式通信(话题/服务)、仿真工具(Gazebo/Rviz)及全球开发者生态,成为科研和工业原型开发的主流工具,被称为机器人开发领域的“安卓系统”。ROS并非完整操作系统,需依赖Linux/Windows运行,标准版实时性有限,而ROS 2通过DDS通信提升了实时性和安全性。
添加图片注释,不超过 140 字(可选)
基于ROS的Canny边缘检测
本案例分析一个在ROS系统中实现边缘检测的案例,案例旨在帮助初学者学会部署ROS,并实现基本的开发。下面从案例项目背景、需求、基本要素、详细实现过程进行具体展示。
Ros 系统Noetic版本中部署Canny边缘检测算法的Rviz显示
项目需求
在ROS系统的Noetic版本中基于Python部署Canny边缘检测,对KITTI车载摄像头视角的视频图像进行检测,并将结果显示在Rviz窗口里面,要求左边为摄像头采集的图像,右侧为边缘检测结果,同时能够实时的调整检测算法的参数。
基础条件
- Linux系统:ROS系统是基于Linux系统开发,因此首先需要解决操作系统配置。Linux系统可以直接部署ROS,Windos系统可通过安装VMware软件安装虚拟机,在虚拟机中安装Linux系统。具体安装过程可参照机器人阿杰的指导视频,本项目中VMware软件使用17.5版本,linux使用的是Ubuntu 20.04 LTS。
- Conda环境:需要提前部署好Conda环境,Conda环境使用Python==3.6版本。
- KITTI数据集:本案例所使用的验证数据KITTI是由德国卡尔斯鲁厄理工学院推出的自动驾驶领域权威数据集,包含车载摄像头、激光雷达和GPS采集的真实道路场景数据,涵盖立体视觉、光流、3D检测等任务,是算法评测的基准平台。
- Rviz显示平台:Rviz是ROS的可视化工具,支持机器人传感器数据、模型和算法的3D实时显示与调试,实现检测结果的展示。
- CV2检测模块:(OpenCV-Python)是计算机视觉核心库,提供图像处理、特征提取和机器学习算法接口,基于OpenCV-Python实现Canny边缘检测。
实现路径:
基于先完整再完美的解决思路。在本案例第一步,搭建基础框架,安装ROS系统,配置基础环境;第二步,贯通功能模块,联动数据,明确结果存储;第三步,叠加功能,优化性能,提高效率。
- 安装基础包:在创建好的Conda虚拟环境中安装如下功能包。
opencv-python==4.11.0.86
kitti2bag==1.5
- 数据转换:KITTI转rosbag适配ROS生态,实现多传感器同步与算法验证。
# Open terminal.
# 先在Kitti官网上下载kitti数据集,将其解压到指定位置
cd kitti
kitti2bag -t 2011_09_26 -r 0011 raw_synced
- 安装ROS并创建项目路径
# Open the first terminal.
cd ros_yanzc
catkin_make
- 切换到ROS工作空间目录并运行roscore.
# Open the second terminal.
cd ros_yanzc
roscore
- 切换到ROS工作空间目录并运行边缘检测的Python脚本。
# Open the third terminal.
source ~/ros_yanzc/devel/setup.bash
rosrun detros dete_cte.py -i ~/ros_yanzc/src/detros/kitti_data/kitti_2011_09_26_drive_0011_synced.bag -o ~/ros_yanzc/src/detros/kitti_output/
- 进入工作空间目录并启动RViz可视化工具
# Open the fouth terminal.
source ~/ros_yanzc/devel/setup.bash
rosrun rviz rviz -d ~/ros_yanzc/src/detros/config/image_show.rviz
边缘检测的代码实现过程:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import rospy
import numpy as np
import cv2
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
import rosbag
import os
import argparse
class EdgeDetectorNode:
def __init__(self):
# ROS 初始化
rospy.init_node('edge_detector_node')
# 创建 CV 桥接
self.bridge = CvBridge()
# 参数初始化
self.gaussian_kernel_size = rospy.get_param('~gaussian_kernel_size', 5)
self.threshold1 = rospy.get_param('~threshold1', 50)
self.threshold2 = rospy.get_param('~threshold2', 150)
# 图像变量
self.original_image = None
self.edges = None
# 创建发布者
self.original_pub = rospy.Publisher('/left_camera/image_raw', Image, queue_size=1)
self.edges_pub = rospy.Publisher('/canny_edge_image', Image, queue_size=1)
# 创建 OpenCV 窗口
cv2.namedWindow("Edge Detection", cv2.WINDOW_NORMAL)
cv2.createTrackbar("Gaussian Kernel", "Edge Detection",
self.gaussian_kernel_size, 31,
self.gaussian_parameter_callback)
cv2.setTrackbarMin("Gaussian Kernel", "Edge Detection", 3)
cv2.createTrackbar("Threshold1", "Edge Detection",
self.threshold1, 255,
self.threshold1_callback)
cv2.createTrackbar("Threshold2", "Edge Detection",
self.threshold2, 255,
self.threshold2_callback)
rospy.loginfo("Edge Detector Node 已启动")
def gaussian_parameter_callback(self, val):
"""高斯核大小回调函数"""
self.gaussian_kernel_size = max(3, val if val % 2 == 1 else val - 1)
self.process_image()
def threshold1_callback(self, val):
"""阈值1回调函数"""
self.threshold1 = val
self.process_image()
def threshold2_callback(self, val):
"""阈值2回调函数"""
self.threshold2 = val
self.process_image()
def process_image(self):
"""处理当前图像"""
if self.original_image is None:
return
# 高斯滤波
blurred = cv2.GaussianBlur(
self.original_image,
(self.gaussian_kernel_size, self.gaussian_kernel_size),
0)
# Canny 边缘检测
self.edges = cv2.Canny(blurred, self.threshold1, self.threshold2)
# 准备显示图像
if len(self.original_image.shape) == 2:
original_display = cv2.cvtColor(self.original_image, cv2.COLOR_GRAY2BGR)
else:
original_display = self.original_image.copy()
edges_display = cv2.cvtColor(self.edges, cv2.COLOR_GRAY2BGR)
# 添加参数信息
font = cv2.FONT_HERSHEY_SIMPLEX
param_text = (f"Gaussian: {self.gaussian_kernel_size}x{self.gaussian_kernel_size}, "
f"Thresholds: {self.threshold1}/{self.threshold2}")
cv2.putText(original_display, param_text, (10, 30),
font, 0.7, (0, 255, 255), 2)
# 合并图像
combined = np.hstack((original_display, edges_display))
cv2.imshow("Edge Detection", combined)
cv2.waitKey(1)
# 发布图像到ROS话题
try:
original_msg = self.bridge.cv2_to_imgmsg(self.original_image, "bgr8")
edges_msg = self.bridge.cv2_to_imgmsg(self.edges, "mono8")
self.original_pub.publish(original_msg)
self.edges_pub.publish(edges_msg)
except Exception as e:
rospy.logerr(f"发布图像时出错: {str(e)}")
def process_bag_file(self, bag_path, output_dir):
"""处理 rosbag 文件带1秒间隔和实时参数调整"""
try:
bag = rosbag.Bag(bag_path)
os.makedirs(output_dir, exist_ok=True)
rate = rospy.Rate(1) # 1Hz,即每秒处理1张
# 添加退出标志
should_exit = False
for topic, msg, t in bag.read_messages():
if should_exit:
break
if "image" in topic.lower():
try:
start_time = rospy.get_time() # 记录开始时间
# 转换图像
cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding="passthrough")
if cv_image.dtype == np.uint16:
cv_image = cv2.normalize(cv_image, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
self.original_image = cv_image
self.process_image()
# 自动保存
timestamp = t.to_nsec()
original_path = os.path.join(output_dir, f"original_{timestamp}.png")
edges_path = os.path.join(output_dir, f"edges_{timestamp}.png")
cv2.imwrite(original_path, self.original_image)
cv2.imwrite(edges_path, self.edges)
rospy.loginfo(f"图像已保存到: {edges_path}")
# 实时参数调整处理
while (rospy.get_time() - start_time) < 0.50: # 确保总共耗时1秒
key = cv2.waitKey(100) & 0xFF # 100ms检测一次
# 检查轨迹栏数值变化
new_gaussian = cv2.getTrackbarPos("Gaussian Kernel", "Edge Detection")
new_threshold1 = cv2.getTrackbarPos("Threshold1", "Edge Detection")
new_threshold2 = cv2.getTrackbarPos("Threshold2", "Edge Detection")
# 如果参数有变化,更新并重新处理
if (new_gaussian != self.gaussian_kernel_size or
new_threshold1 != self.threshold1 or
new_threshold2 != self.threshold2):
self.gaussian_kernel_size = max(3, new_gaussian if new_gaussian % 2 == 1 else new_gaussian - 1)
self.threshold1 = new_threshold1
self.threshold2 = new_threshold2
self.process_image()
cv2.imwrite(edges_path, self.edges)
rospy.loginfo(f"参数已更新并重新保存: {edges_path}")
if key == 27: # ESC键退出
should_exit = True
rospy.loginfo("用户中断处理过程")
break
if should_exit:
break
except Exception as e:
rospy.logerr(f"处理图像时出错: {str(e)}")
continue
except Exception as e:
rospy.logerr(f"打开或读取 bag 文件时出错: {str(e)}")
finally:
bag.close()
cv2.destroyAllWindows()
if __name__ == '__main__':
try:
# 解析命令行参数
parser = argparse.ArgumentParser(description="从 ROS bag 文件中读取图像并执行边缘检测")
parser.add_argument("-i", "--input", required=True, help="输入 ROS bag 文件路径")
parser.add_argument("-o", "--output", required=True, help="输出目录路径")
args = parser.parse_args()
# 创建节点并处理 bag 文件
node = EdgeDetectorNode()
node.process_bag_file(args.input, args.output)
except rospy.ROSInterruptException:
pass
完整项目文件:
具体的项目文件可通过ROS_Canny链接下载。
运行视频
基本的实现过程如上,具体的操作视频如下:
上传视频封面
详细过程请参照我的知乎主页「链接」