[ROS 系列学习教程] rosbag Python API

2024-03-18 09:36

本文主要是介绍[ROS 系列学习教程] rosbag Python API,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ROS 系列学习教程(总目录)

本文目录

  • 1. 构造函数与关闭文件
  • 2. 属性值
  • 3. 写bag文件内容
  • 4. 读bag文件内容
  • 5. 将bag文件缓存写入磁盘
  • 6. 重建 bag 文件索引
  • 7. 获取bag文件的压缩信息
  • 8. 获取bag文件的消息数量
  • 9. 获取bag文件记录的起止时间
  • 10. 获取话题信息与消息类型

rosbag 的 Python API 主要位于 rosbag 包的 Bag 类中,通过 import rosbag 导入。

Bag 类中的常用接口如下:

1. 构造函数与关闭文件

class Bag(f: Any,mode: str = 'r',compression: str = Compression.NONE,chunk_threshold: int = 768 * 1024,allow_unindexed: bool = False,options: Any | None = None,skip_index: bool = False
)class Compression:NONE = 'none'BZ2  = 'bz2'LZ4  = 'lz4'close(self)

其中,

  • f:bag文件

  • mode:文件操作模式(r, w, a)

  • compression:文件压缩模式,见如上 Compression ,默认Compression.NONE

  • chunk_threshold:Bag 文件中每个块的最大字节数,默认 768 * 1024

  • allow_unindexed:是否允许打开未建立索引的bag文件。说明:在实际使用中,如果你只是想查看或播放bag文件中的所有消息,而不需要基于时间戳进行精确查询,那么你可以将 allow_unindexed 设置为 True。但是,如果你打算对bag文件进行基于时间的查询或其他高级操作,最好先使用 rosbag index 命令建立索引,并确保在打开bag文件时 allow_unindexedFalse(或者简单地省略该参数,因为它默认为 False)。

  • options:字典格式,用于统一设置 Bag 的参数,目前只支持 compressionchunk_threshold ,源码处理如下:

    •   if options is not None:if type(options) is not dict:raise ValueError('options must be of type dict')                if 'compression' in options:compression = options['compression']if 'chunk_threshold' in options:chunk_threshold = options['chunk_threshold']
      
  • skip_index:打开bag文件时是否跳过文件的索引,这可以节省一些内存和加载时间,特别是在处理非常大的bag文件时。然而,这也意味着将无法使用基于索引的高级查询功能,比如按时间戳搜索特定的消息。

2. 属性值

# 只能获取
options # 同上options
filename # bag文件名
version # rosbag的版本号
mode # 文件操作模式(r, w, a)
size # Bag文件的大小(bytes)# 可设置可获取
compression # 文件压缩模式
chunk_threshold # Bag 文件中每个块的最大字节数

3. 写bag文件内容

write(self, topic, msg, t=None, raw=False, connection_header=None)

其中,

  • topic:写入的topic名称
  • msg:写入的msg
  • t:时间戳,默认None以当前时间为时间戳
  • raw:是否已原始数据格式写入,通常不推荐这样做。
  • connection_header:连接头信息,通常不需要手动设置。通常用于内部处理,不是常规用法的一部分。

代码示例:

import rosbag
import rospkg
from std_msgs.msg import Int32, Stringrospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"bag = rosbag.Bag(bags_path+'/pytest.bag', 'w')try:s = String()s.string = 'hello'i = Int32()i.int = 42bag.write('/chatter', s)bag.write('/number', i)finally:bag.close()

4. 读bag文件内容

read_messages(self, topics=None, start_time=None, end_time=None, connection_filter=None, raw=False, return_connection_header=False)

其中,

  • topics:指定读取的topic,可以是一个topic列表,如果不指定,默认读取全部topic(可选)
  • start_time:通过时间戳过滤消息(消息的最早时间戳,rospy.Time对象)(可选)
  • end_time:通过时间戳过滤消息(消息的最晚时间戳,rospy.Time对象)(可选)
  • connection_filter:一个函数,用于过滤连接。它应该接受一个连接对象并返回一个布尔值,以决定是否保留该连接的消息。如果为 None,则不过滤连接。
  • raw:一个布尔值,指定是否以原始字节形式返回消息。如果为 True,则返回原始字节数据;如果为 False(默认值),则返回解析后的 ROS 消息对象。
  • return_connection_header: 一个布尔值,如果为 True,则返回的每条消息都会是一个元组,其中第二个元素是连接头信息。如果为 False(默认值),则只返回消息本身。
  • 返回值:以(topic, message, timestamp)格式返回

代码示例:

import rosbag
import rospkgrospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"bag = rosbag.Bag(bags_path+'/pytest.bag')for topic, msg, t in bag.read_messages(topics=['/chatter', '/number']):print(f"Received message on topic {topic} at time {t}: {msg}")bag.close()

结果如下:

在这里插入图片描述

5. 将bag文件缓存写入磁盘

flush(self)

当你使用 write() 写入数据到 bag 文件时,数据可能不会立即被写入磁盘,而是会先被缓存起来以提高性能。调用 flush() 方法可以强制将这些缓存的数据写入到磁盘中,以确保所有挂起的数据都被写入到 bag 文件中。

它没有参数,并且执行后没有返回值。

在以下情况下,可能需要调用 flush() 方法:

  • 确保数据持久化:当你想要确保所有已经写入 rosbag.Bag 对象的数据都已经持久化到磁盘上时,可以调用 flush()。这在你准备关闭 bag 文件或程序即将退出时特别有用,以确保不会有数据丢失。

  • 实时备份:如果你正在实时记录数据到 bag 文件,并且想要定期备份这些数据,你可以在备份之前调用 flush(),以确保备份时所有需要的数据都已经写入到 bag 文件中。

代码示例:

import rospy
import rosbag
import rospkg
from std_msgs.msg import String, Int32rospy.init_node('bag_writer')  rospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"# 创建一个bag文件用于写入
with rosbag.Bag(bags_path + '/flush.bag', 'w') as bag:# 写入一条消息msg = String(data='Hello, ROSbag!')bag.write('/chatter', msg, rospy.Time.now())# 在写入更多消息之前调用flush()bag.flush()# 这里可以继续写入更多消息msg = Int32(data=25)bag.write('/number', msg, rospy.Time.now())# 在退出with块之前,flush()会被自动调用(如果需要的话)

查看bag文件信息如下:

在这里插入图片描述

6. 重建 bag 文件索引

reindex(self)

当使用 rosbag 记录或播放数据时,rosbag 会维护一个内部索引,以便能够高效地检索和访问数据。然而,有时索引可能会损坏或变得不一致,这时就需要使用 reindex 方法来重新构建索引。

reindex(self) 方法没有参数,它会对当前打开的 bag 文件执行索引重建操作。重建索引可能需要一些时间,具体取决于 bag 文件的大小和内容。

以下是可能需要使用 reindex 方法的一些场景:

  • 索引损坏:如果你怀疑 bag 文件的索引已经损坏或不一致,导致无法正常访问数据,你可以尝试使用 reindex 方法来修复它。

  • 添加新数据:如果你在 bag 文件关闭后向其中添加了新数据,但没有重新构建索引,那么这些新数据将不会被包含在旧的索引中。在这种情况下,你需要调用 reindex 方法来更新索引,以便能够访问这些新数据。

  • 优化性能:有时,即使索引没有损坏,重新构建索引也可能有助于提高访问数据的性能。特别是当 bag 文件非常大或包含大量数据时,重建索引可以帮助优化数据结构,提高检索速度。

代码示例:

import rosbag
import rospkgrospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"with rosbag.Bag(bags_path+'/pytest.bag', 'r') as bag:try:for topic, msg, t in bag.read_messages():print(f"Received message on topic {topic} at time {t}: {msg}")except Exception as e:print(f"An error occurred while reading the bag file: {e}")print("Reindexing the bag file...")bag.reindex()  # 尝试重新索引 bag 文件print("Reindexing completed.")# 现在可以正常使用 bag 文件中的数据了

7. 获取bag文件的压缩信息

get_compression_info(self)

这个方法返回一个tuple(str, int, int),其中包含了关于 bag 文件压缩的详细信息,每一位表示如下:

  • tuple[0]:压缩格式,例如 none(表示没有压缩)或 bz2(表示使用了 bzip2 压缩)。
  • tuple[1]:未压缩的数据大小(以字节为单位)
  • tuple[2]:压缩后的数据大小(以字节为单位)

代码示例:

import rosbag
import rospkg rospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"# 打开一个存在的 bag 文件
with rosbag.Bag(bags_path+'/pytest.bag', 'r') as bag:# 获取压缩信息compression_info = bag.get_compression_info()# 打印压缩信息if compression_info:print("Compression Type:", compression_info[0])print("UnCompressed Size:", compression_info[1])print("compressed Size:", compression_info[2])else:print("The bag file is not compressed.")

8. 获取bag文件的消息数量

get_message_count(self, topic_filters=None)

这个方法允许你指定一个或多个话题过滤器(topic_filters),以便只计算特定话题的消息数。该参数接收一个字符串或字符串列表,用于指定要计算消息数的话题。如果未提供或设置为 None,则计算 bag 文件中所有话题的消息数。

代码示例:

import rosbag
import rospkg rospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"# 打开一个存在的 bag 文件
with rosbag.Bag(bags_path+'/pytest.bag', 'r') as bag:# 定义我们想要计算消息数的话题过滤器列表topic_filters = ['/chatter', '/number']# 获取指定话题的消息数量message_counts = bag.get_message_count(topic_filters=topic_filters)# 打印话题的消息数量print(f"Message Count: {message_counts}")

9. 获取bag文件记录的起止时间

get_start_time(self)
get_end_time(self)

get_start_time 函数的返回类型是 float,表示以秒为单位的时间戳,其中的小数部分,表示秒的分数。

代码示例:

import rosbag
import rospkg
from datetime import datetimerospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"# 打开一个存在的 bag 文件
with rosbag.Bag(bags_path+'/pytest.bag', 'r') as bag:# 获取 bag 文件的开始结束时间start_time = bag.get_start_time()end_time = bag.get_end_time()# 将时间转换为更易读的字符串格式start_time_str = datetime.fromtimestamp(start_time)end_time_str = datetime.fromtimestamp(end_time)# 打印开始结束时间print(f"Bag file start time: {start_time_str}")print(f"Bag file end   time: {end_time_str}")

运行结果如下:

在这里插入图片描述

10. 获取话题信息与消息类型

get_type_and_topic_info(self, topic_filters=None)

其中,topic_filters 用于过滤指定的话题,如果没有提供,则分析所有话题。

函数返回值类型如下:

TypesAndTopicsTuple(dict(str, str), dict(str, TopicTuple(str, int, int, float)))

其中各值说明如下:

TypesAndTopicsTuple(msg_types{key:type name, val: md5hash}, topics{key: topic name, value: TopicTuple(msg_type: msg type (Ex. "std_msgs/String"),message_count: the number of messages of the particular type,connections: the number of connections,frequency: the data frequency)})

其中,

  • msg_types:是一个字典,key为msg类型,value为msgMD5值。
  • topics:是一个字典,key为topic名称,value是一个元组,其中:
    • msg_type:该topic的消息类型
    • message_count:该topic的消息数量
    • connections:该topic的连接数量
    • frequency:该topic的数据频率

代码示例:

import rosbag
import rospkg
from datetime import datetimerospack = rospkg.RosPack()
package_path = rospack.get_path('rosbag_learning')
bags_path = package_path + "/bags"# 打开一个存在的 bag 文件
with rosbag.Bag(bags_path+'/pytest.bag', 'r') as bag:topic_filters = ['/chatter', '/number']msg_types, topics = bag.get_type_and_topic_info(topic_filters)print("Message Types:")for type_name, md5_hash in msg_types.items():print(f"  {type_name}: {md5_hash}")print("Topics:")for type_name, topic_info in topics.items():print("  Topic: {}, Type: {}, MessageCount: {}, Connections: {}, Frequency: {}".format(type_name, topic_info.msg_type, topic_info.message_count, topic_info.connections,topic_info.frequency))

运行结果:

在这里插入图片描述

这篇关于[ROS 系列学习教程] rosbag Python API的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/821931

相关文章

python生成随机唯一id的几种实现方法

《python生成随机唯一id的几种实现方法》在Python中生成随机唯一ID有多种方法,根据不同的需求场景可以选择最适合的方案,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来一起学习学习... 目录方法 1:使用 UUID 模块(推荐)方法 2:使用 Secrets 模块(安全敏感场景)方法

Windows环境下解决Matplotlib中文字体显示问题的详细教程

《Windows环境下解决Matplotlib中文字体显示问题的详细教程》本文详细介绍了在Windows下解决Matplotlib中文显示问题的方法,包括安装字体、更新缓存、配置文件设置及编码調整,并... 目录引言问题分析解决方案详解1. 检查系统已安装字体2. 手动添加中文字体(以SimHei为例)步骤

Java JDK1.8 安装和环境配置教程详解

《JavaJDK1.8安装和环境配置教程详解》文章简要介绍了JDK1.8的安装流程,包括官网下载对应系统版本、安装时选择非系统盘路径、配置JAVA_HOME、CLASSPATH和Path环境变量,... 目录1.下载JDK2.安装JDK3.配置环境变量4.检验JDK官网下载地址:Java Downloads

使用Python删除Excel中的行列和单元格示例详解

《使用Python删除Excel中的行列和单元格示例详解》在处理Excel数据时,删除不需要的行、列或单元格是一项常见且必要的操作,本文将使用Python脚本实现对Excel表格的高效自动化处理,感兴... 目录开发环境准备使用 python 删除 Excphpel 表格中的行删除特定行删除空白行删除含指定

Python通用唯一标识符模块uuid使用案例详解

《Python通用唯一标识符模块uuid使用案例详解》Pythonuuid模块用于生成128位全局唯一标识符,支持UUID1-5版本,适用于分布式系统、数据库主键等场景,需注意隐私、碰撞概率及存储优... 目录简介核心功能1. UUID版本2. UUID属性3. 命名空间使用场景1. 生成唯一标识符2. 数

Python办公自动化实战之打造智能邮件发送工具

《Python办公自动化实战之打造智能邮件发送工具》在数字化办公场景中,邮件自动化是提升工作效率的关键技能,本文将演示如何使用Python的smtplib和email库构建一个支持图文混排,多附件,多... 目录前言一、基础配置:搭建邮件发送框架1.1 邮箱服务准备1.2 核心库导入1.3 基础发送函数二、

Python包管理工具pip的升级指南

《Python包管理工具pip的升级指南》本文全面探讨Python包管理工具pip的升级策略,从基础升级方法到高级技巧,涵盖不同操作系统环境下的最佳实践,我们将深入分析pip的工作原理,介绍多种升级方... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

基于Python实现一个图片拆分工具

《基于Python实现一个图片拆分工具》这篇文章主要为大家详细介绍了如何基于Python实现一个图片拆分工具,可以根据需要的行数和列数进行拆分,感兴趣的小伙伴可以跟随小编一起学习一下... 简单介绍先自己选择输入的图片,默认是输出到项目文件夹中,可以自己选择其他的文件夹,选择需要拆分的行数和列数,可以通过

Python中反转字符串的常见方法小结

《Python中反转字符串的常见方法小结》在Python中,字符串对象没有内置的反转方法,然而,在实际开发中,我们经常会遇到需要反转字符串的场景,比如处理回文字符串、文本加密等,因此,掌握如何在Pyt... 目录python中反转字符串的方法技术背景实现步骤1. 使用切片2. 使用 reversed() 函

Python中将嵌套列表扁平化的多种实现方法

《Python中将嵌套列表扁平化的多种实现方法》在Python编程中,我们常常会遇到需要将嵌套列表(即列表中包含列表)转换为一个一维的扁平列表的需求,本文将给大家介绍了多种实现这一目标的方法,需要的朋... 目录python中将嵌套列表扁平化的方法技术背景实现步骤1. 使用嵌套列表推导式2. 使用itert