莫方教程网

专业程序员编程教程与实战案例分享

ROS系统实现Canny边缘检测

ROS系统介绍

机器人运行系统(Robot Operating System,ROS)是机器人领域的开源开发框架,提供通信中间件、工具链和算法库,支持模块化开发,适用于无人机、机械臂等平台。其核心优势包括分布式通信(话题/服务)、仿真工具(Gazebo/Rviz)及全球开发者生态,成为科研和工业原型开发的主流工具,被称为机器人开发领域的“安卓系统”。ROS并非完整操作系统,需依赖Linux/Windows运行,标准版实时性有限,而ROS 2通过DDS通信提升了实时性和安全性。

添加图片注释,不超过 140 字(可选)

基于ROS的Canny边缘检测

本案例分析一个在ROS系统中实现边缘检测的案例,案例旨在帮助初学者学会部署ROS,并实现基本的开发。下面从案例项目背景、需求、基本要素、详细实现过程进行具体展示。

Ros 系统Noetic版本中部署Canny边缘检测算法的Rviz显示

项目需求

在ROS系统的Noetic版本中基于Python部署Canny边缘检测,对KITTI车载摄像头视角的视频图像进行检测,并将结果显示在Rviz窗口里面,要求左边为摄像头采集的图像,右侧为边缘检测结果,同时能够实时的调整检测算法的参数。

基础条件

  1. Linux系统:ROS系统是基于Linux系统开发,因此首先需要解决操作系统配置。Linux系统可以直接部署ROS,Windos系统可通过安装VMware软件安装虚拟机,在虚拟机中安装Linux系统。具体安装过程可参照机器人阿杰的指导视频,本项目中VMware软件使用17.5版本,linux使用的是Ubuntu 20.04 LTS。
  2. Conda环境:需要提前部署好Conda环境,Conda环境使用Python==3.6版本。
  3. KITTI数据集:本案例所使用的验证数据KITTI是由德国卡尔斯鲁厄理工学院推出的自动驾驶领域权威数据集,包含车载摄像头、激光雷达和GPS采集的真实道路场景数据,涵盖立体视觉、光流、3D检测等任务,是算法评测的基准平台。
  4. Rviz显示平台Rviz是ROS的可视化工具,支持机器人传感器数据、模型和算法的3D实时显示与调试,实现检测结果的展示。
  5. CV2检测模块:(OpenCV-Python)是计算机视觉核心库,提供图像处理、特征提取和机器学习算法接口,基于OpenCV-Python实现Canny边缘检测。

实现路径:

基于先完整再完美的解决思路。在本案例第一步,搭建基础框架,安装ROS系统,配置基础环境;第二步,贯通功能模块,联动数据,明确结果存储;第三步,叠加功能,优化性能,提高效率。

  • 安装基础包:在创建好的Conda虚拟环境中安装如下功能包。
opencv-python==4.11.0.86 
kitti2bag==1.5  
  • 数据转换:KITTI转rosbag适配ROS生态,实现多传感器同步与算法验证。
# Open terminal.
# 先在Kitti官网上下载kitti数据集,将其解压到指定位置
cd kitti
kitti2bag -t 2011_09_26 -r 0011 raw_synced
  • 安装ROS并创建项目路径
# Open the first terminal.
cd ros_yanzc
catkin_make 
  • 切换到ROS工作空间目录并运行roscore.
# Open the second terminal.
cd ros_yanzc
roscore
  • 切换到ROS工作空间目录并运行边缘检测的Python脚本
# Open the third terminal.
source ~/ros_yanzc/devel/setup.bash
rosrun detros dete_cte.py -i ~/ros_yanzc/src/detros/kitti_data/kitti_2011_09_26_drive_0011_synced.bag -o ~/ros_yanzc/src/detros/kitti_output/
  • 进入工作空间目录并启动RViz可视化工具
# Open the fouth terminal.
source ~/ros_yanzc/devel/setup.bash
rosrun rviz rviz -d ~/ros_yanzc/src/detros/config/image_show.rviz

边缘检测的代码实现过程:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import rospy
import numpy as np
import cv2
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
import rosbag
import os
import argparse


class EdgeDetectorNode:
    def __init__(self):
        # ROS 初始化
        rospy.init_node('edge_detector_node')

        # 创建 CV 桥接
        self.bridge = CvBridge()

        # 参数初始化
        self.gaussian_kernel_size = rospy.get_param('~gaussian_kernel_size', 5)
        self.threshold1 = rospy.get_param('~threshold1', 50)
        self.threshold2 = rospy.get_param('~threshold2', 150)

        # 图像变量
        self.original_image = None
        self.edges = None

        # 创建发布者
        self.original_pub = rospy.Publisher('/left_camera/image_raw', Image, queue_size=1)
        self.edges_pub = rospy.Publisher('/canny_edge_image', Image, queue_size=1)

        # 创建 OpenCV 窗口
        cv2.namedWindow("Edge Detection", cv2.WINDOW_NORMAL)
        cv2.createTrackbar("Gaussian Kernel", "Edge Detection",
                           self.gaussian_kernel_size, 31,
                           self.gaussian_parameter_callback)
        cv2.setTrackbarMin("Gaussian Kernel", "Edge Detection", 3)
        cv2.createTrackbar("Threshold1", "Edge Detection",
                           self.threshold1, 255,
                           self.threshold1_callback)
        cv2.createTrackbar("Threshold2", "Edge Detection",
                           self.threshold2, 255,
                           self.threshold2_callback)

        rospy.loginfo("Edge Detector Node 已启动")

    def gaussian_parameter_callback(self, val):
        """高斯核大小回调函数"""
        self.gaussian_kernel_size = max(3, val if val % 2 == 1 else val - 1)
        self.process_image()

    def threshold1_callback(self, val):
        """阈值1回调函数"""
        self.threshold1 = val
        self.process_image()

    def threshold2_callback(self, val):
        """阈值2回调函数"""
        self.threshold2 = val
        self.process_image()

    def process_image(self):
        """处理当前图像"""
        if self.original_image is None:
            return

        # 高斯滤波
        blurred = cv2.GaussianBlur(
            self.original_image,
            (self.gaussian_kernel_size, self.gaussian_kernel_size),
            0)

        # Canny 边缘检测
        self.edges = cv2.Canny(blurred, self.threshold1, self.threshold2)

        # 准备显示图像
        if len(self.original_image.shape) == 2:
            original_display = cv2.cvtColor(self.original_image, cv2.COLOR_GRAY2BGR)
        else:
            original_display = self.original_image.copy()

        edges_display = cv2.cvtColor(self.edges, cv2.COLOR_GRAY2BGR)

        # 添加参数信息
        font = cv2.FONT_HERSHEY_SIMPLEX
        param_text = (f"Gaussian: {self.gaussian_kernel_size}x{self.gaussian_kernel_size}, "
                      f"Thresholds: {self.threshold1}/{self.threshold2}")
        cv2.putText(original_display, param_text, (10, 30),
                    font, 0.7, (0, 255, 255), 2)

        # 合并图像
        combined = np.hstack((original_display, edges_display))
        cv2.imshow("Edge Detection", combined)
        cv2.waitKey(1)

        # 发布图像到ROS话题
        try:
            original_msg = self.bridge.cv2_to_imgmsg(self.original_image, "bgr8")
            edges_msg = self.bridge.cv2_to_imgmsg(self.edges, "mono8")
            self.original_pub.publish(original_msg)
            self.edges_pub.publish(edges_msg)
        except Exception as e:
            rospy.logerr(f"发布图像时出错: {str(e)}")

    def process_bag_file(self, bag_path, output_dir):
        """处理 rosbag 文件带1秒间隔和实时参数调整"""
        try:
            bag = rosbag.Bag(bag_path)
            os.makedirs(output_dir, exist_ok=True)
            rate = rospy.Rate(1)  # 1Hz,即每秒处理1张

            # 添加退出标志
            should_exit = False

            for topic, msg, t in bag.read_messages():
                if should_exit:
                    break

                if "image" in topic.lower():
                    try:
                        start_time = rospy.get_time()  # 记录开始时间

                        # 转换图像
                        cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding="passthrough")
                        if cv_image.dtype == np.uint16:
                            cv_image = cv2.normalize(cv_image, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)

                        self.original_image = cv_image
                        self.process_image()

                        # 自动保存
                        timestamp = t.to_nsec()
                        original_path = os.path.join(output_dir, f"original_{timestamp}.png")
                        edges_path = os.path.join(output_dir, f"edges_{timestamp}.png")
                        cv2.imwrite(original_path, self.original_image)
                        cv2.imwrite(edges_path, self.edges)
                        rospy.loginfo(f"图像已保存到: {edges_path}")

                        # 实时参数调整处理
                        while (rospy.get_time() - start_time) < 0.50:  # 确保总共耗时1秒
                            key = cv2.waitKey(100) & 0xFF  # 100ms检测一次
                            
                            # 检查轨迹栏数值变化
                            new_gaussian = cv2.getTrackbarPos("Gaussian Kernel", "Edge Detection")
                            new_threshold1 = cv2.getTrackbarPos("Threshold1", "Edge Detection")
                            new_threshold2 = cv2.getTrackbarPos("Threshold2", "Edge Detection")
                            
                            # 如果参数有变化,更新并重新处理
                            if (new_gaussian != self.gaussian_kernel_size or 
                                new_threshold1 != self.threshold1 or 
                                new_threshold2 != self.threshold2):
                                
                                self.gaussian_kernel_size = max(3, new_gaussian if new_gaussian % 2 == 1 else new_gaussian - 1)
                                self.threshold1 = new_threshold1
                                self.threshold2 = new_threshold2
                                
                                self.process_image()
                                cv2.imwrite(edges_path, self.edges)
                                rospy.loginfo(f"参数已更新并重新保存: {edges_path}")

                            if key == 27:  # ESC键退出
                                should_exit = True
                                rospy.loginfo("用户中断处理过程")
                                break
                        
                        if should_exit:
                            break
                    except Exception as e:
                        rospy.logerr(f"处理图像时出错: {str(e)}")
                        continue

        except Exception as e:
            rospy.logerr(f"打开或读取 bag 文件时出错: {str(e)}")
        finally:
            bag.close()
            cv2.destroyAllWindows()

            

if __name__ == '__main__':
    try:
        # 解析命令行参数
        parser = argparse.ArgumentParser(description="从 ROS bag 文件中读取图像并执行边缘检测")
        parser.add_argument("-i", "--input", required=True, help="输入 ROS bag 文件路径")
        parser.add_argument("-o", "--output", required=True, help="输出目录路径")
        args = parser.parse_args()

        # 创建节点并处理 bag 文件
        node = EdgeDetectorNode()
        node.process_bag_file(args.input, args.output)

    except rospy.ROSInterruptException:
        pass

完整项目文件:

具体的项目文件可通过ROS_Canny链接下载。

运行视频

基本的实现过程如上,具体的操作视频如下:

上传视频封面

详细过程请参照我的知乎主页「链接」

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言

    滇ICP备2024046894号-1