本文主要是介绍ROS2探索(四)DDS与RTPS,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
引言
朋友们新年好啊,春节假期不讲武德,让人整天想摸鱼,感觉今天才勉强进入工作状态…
前面了解了ROS2的publisher、subscriber以及service,他们在节点中由executor执行且一个进程可以拥有多个节点。ROS2中引入了DDS中间件层(rmw),下面考察一下DDS相关的内容。
本文主要参考DDS-RTPS的2.2版本(?RTPS是啥玩意),详细的协议文档取自OMG官网:http://www.omg.org/spec/DDSI-RTPS/2.2。主要内容分为以下部分:
- DDS与RTPS简介
- RTPS的传输构成
- RTPS的数据格式
- RTPS通信的过程
- RTPS的发现机制
DDS与RTPS简介
DDS指数据分发服务(Data-Distribution Service),是由OMG组织定义的一种以数据为中心的通信模型,支持订阅发布模式(Data-Centric Publish-Subscribe)。DDS的协议标准规定了以下内容:
- 应用的通信数据模型
- 应用与DCPS通信中间件的交互的数据格式,其中包括服务质量要求(QoS)
- 数据如何根据QoS进行收发
- 应用如何访问通信数据
- 通信中间件的状态反馈
RTPS表示实时发布-订阅传输协议(Real-Time Publish Subscribe),该协议已经成为了工业以太网的一个标准(IEC-PAS-62030)。RTPS十分符合DDS模型的要求,于是就被DDS拿去作为它的传输协议了。这里我们就能理解为什么引言里面提到了大量的RTPS。
RTPS的传输构成
RTPS传输由多个Participant实体参与,每个Participant拥有若干Endpoint,Endpoint又分为Reader和Writer两大类型。Endpoint是RTPS最基本的通信单元。在应用程序里用户操作的是DDS实体(DataWriter与DataReader),每个DDS实体对应RTPS的实体,RTPS与DDS实体通过HistoryCache桥梁进行沟通。HistoryCache是一个数组,里面包含若干个CacheChange。当用户要发布某个数据,就把某个CacheChange告诉RTPS实体Writer,Writer把这个CacheChange塞进HistoryCache里然后将CacheChange分发给需要接收数据的Reader们。类似的,Reader的HistoryCache收到CacheChange后用户可以选择从DataReader对应的Reader中读取数据。下图表示了RTPS传输的构成内容:
总结:
- RTPS传输协议由Entity、Participant、Endpoint、Writer、Reader、HistoryCache以及CacheChange组成
- 应用通过DDS层实体来访问对应的RTPS实体,HistoryCache是他们之间的桥梁
RTPS的数据格式
Endpoint是RTPS的通信基本单元,他们之间传输固定格式的Message数据,Message整体格式如下图所示:
可以看到一个Message有一个Header,Header包含RTPS的标识、RTPS协议版本、实现版本以及消息的GUID前缀。这里讲一下GUID前缀字段(guidPrefix),RTPS中每个实体拥有一个全局唯一标识符(GUID),GUID由前缀和实体ID两部分构成,其中前缀就在每个消息的Header中,后面的每个Submessage只需要实体ID(EntityID)即可。
Header后面是Submessage,每个Submessage包含SubmessageHeader和SubmessageElement。SubmessageHeader包含此Submessage的类型,大小端类型以及数据长度等信息。
SubmessageElement表示各种消息的字段,不同的消息类型具有相应的字段,我们不用关心,以后具体实现再看。下面看一下各种Submessage的作用是什么。
Submessage有多种类型,他们具有不同的作用,总结如下表:
Submessage种类 | 主要字段 | 作用 |
---|---|---|
AckNack | +@readerId : EntityId +@writerId : EntityId +@readerSNState : SequenceNumberSet +@count : Count | Reader用这个类型的消息告诉Writer它还没有收到某些消息,这些消息的序列号在readerSNState,用来实现可靠传输 |
Data | +@extraFlags : Flags +@octetsToInlineQos : short +@readerId : EntityId +@writerId : EntityId +@writerSN : SequenceNumber +@inlineQos : ParameterList +@serializedData : SerializedPayload | 用来发送数据类型的CacheChange给Reader,可能是数值也可能是某些数据的生命周期变化信息,具体是哪种信息需要根据extraFlags来判断 |
DataFrag | +@extraFlags : Flags +@octetsToInlineQos : short +@readerId : EntityId +@writerId : EntityId +@writerSN : SequenceNumber +@inlineQos : ParameterList +@serializedData : SerializedPayload +@fragmentStartingNum : FragmentNumber +@fragmentsInSubmessage : ushort +@dataSize : unsigned_long +@fragmentSize : ushort | 表示分片的Data数据,由Reader进行拼接 |
Gap | +@readerId : EntityId +@writerId : EntityId +@gapStart : SequenceNumber +@gapList : SequenceNumberSet | 通知Reader某些序列号的message不再相关,这个目前我也不清楚是有什么用,后面懂的话回头再详细解释下 |
Heartbeat | +@readerId : EntityId +@writerId : EntityId +@firstSN : SequenceNumber +@lastSN : FragmentNumber +@count : Count | 通过此消息告诉Reader方 Writer那边的HistoryCache中有哪些序列号的消息,如果Reader发现自己缺了可以请求缺少的消息。如果Heartbeat没有设置FinalFlag,那么Reader端必须反馈AckNack来告诉Writer自己还缺少哪些序列号的消息;如果设置了FinalFlag,那么Reader沉默即可 |
HeartbeatFrag | +@readerId : EntityId +@writerId : EntityId +@firstSN : SequenceNumber +@lastSN : SequenceNumber +@count : Count | 当发送分片数据且没全部发完时,使用这个消息;分片发送完毕时用上面的Heartbeat |
InfoDestination | +@guidPrefix : GuidPrefix | Writer发给Reader来修改GUID前缀 |
InfoReply | +@multicastLocatorList : LocatorList +@unicastLocatorList : LocatorList | Reader反馈Writer自己的地址 |
InfoSource | +@protocolVersion : ProtocolVersion +@vendorId : VendorId +@guidPrefix : GuidPrefix | 区分Writer所属的Participant |
InfoTimestamp | +@timestamp : Timestamp | 时间戳 |
NackFrag | +@readerId : EntityId +@writerId : EntityId +@fragmentNumberState : FragmentNumberSet +@count : Count +@writerSN : SequenceNumber | 分片时,Reader通知Writer缺少哪些序列号 |
Pad | 空 | 暂时不知道啥用 |
表中可以看出,通过AckNack、Heartbeat以及NackFrag可以实现可靠传输,接下来看一下数据的通信过程。
RTPS通信的过程
这张图描述了user(比如一个ROS2发布者和订阅者)之间通信的大致过程:
图中的步骤描述如下
- 用户调用DDS中间件DataWriter实体,执行write操作
- DataWriter通知RTPS实体Writer创建一个CacheChange
- DataWriter调用add_change将该Cachechange放到Writer的HistoryCache中
- 用户操作结束
- RTPS的Writer将CacheChange发送给RTPS Reader,使用的消息是上一章节的Data类型,并且发送HeartBeat给Reader用来获取Reader接收状态
- RTPS reader收到Data消息后,将CacheChange塞到HistoryCache里面
- DDS通知用户有新的Cachechange,ROS2里面用DDS的WaitSet接口,然后用户使用take操作来获取数据
- DDS DataReader从HistoryCache访问CacheChange
- RTPS Reader发送AckNack表明确认收到消息,这和用户的take操作是独立的,可能同时也可能先发生
- Writer(stateful)记录下确收的CacheChange
- reader这一方的用户调用return_loan操作,表明用户不再使用之前take的该消息
- DDS DataReader调用remove_change操作将该CacheChange从Reader的HistoryCache中移除
- DDS DataWriter根据is_acked_by_all判断CacheChange是否被所有期望接收的Reader确收,如果是的话就将该序号的CacheChange从Writer的HistoryCache中删除,HistoryCache的保留数量由QoS的DURABILITY确定
Writer在上面的例子中采用的StatefulWriter,实际中还有StatelessWriter,这需要根据具体的网络规模、硬件资源等进行选择。
RTPS的发现机制
DDS的另一个特征就是支持自动发现,协议规定了两种发现协议:
- Participant发现协议(PDP)
- Endpoint发现协议(EDP)
协议要求必须实现基本的两种基本发现协议:SPDP和SEDP,他们适合中小型网络场景。
RTPS预留了内置DataReaders和DataWriters以及相应的话题和QoS配置,内置话题包括:“DCPSParticipant,” “DCPSSubscription,” “DCPSPublication”和“DCPSTopic”。内置DataWriters用来向网络广播其本地的DDS Participant、Entities以及相关QoS配置;内置的DataReaders则收集远端发来的这些信息并区分那些Entities。
SPDP
SPDP对每个Participant预留两个内置Endpoint:SPDPbuiltinParticipantWriter 和 SPDPbuiltinParticipantReader。
SPDPbuiltinParticipantWriter是一个尽力交付的 StatelessWriter,不进行可靠传输且不维护Reader的接收状态。它周期性地向预先配置好的一系列locators(目标地址)交换其HistoryCache里的SPDPdiscoveredParticipantData数据。locators可以是多播或者单播地址。SPDPdiscoveredParticipantData的定义见下图:
SPDPbuiltinParticipantWriter不断发布自己当前发现了的Participant,然后将自己发现的信息和其他人交换,这样每个Participant逐渐就都知道对方了。
上面提到SPDPbuiltinParticipantWriter往预先配置好的locators(网络地址)分享信息,这些locators其实是一些SPDP保留的熟知端口,根据平台不同定义成了两个宏:SPDP_WELL_KNOWN_UNICAST_PORT和SPDP_WELL_KNOWN_MULTICAST_PORT。
SEDP
一旦通过SPDP发现了另一个Participant,那么就认为对方Participant存在内置Endpoint并与对方的内置Endpoint进行配对即可,SEDP的内置Endpoint配对应当采用的可靠传输了。
SEDP预留的内置Endpoint和数据类型如上图所示,其中TopicWriter和TopicReader是可选的。
移除
SPDPdiscoveredParticipantData中包含leaseDuration字段,这个字段表示一个Participant在这个时间周期内发布一次SPDPdiscoveredParticipantData则认为这个Participant还是或者的,如果超过leaseDuration没发送SPDPdiscoveredParticipantData则认为这个Participant已经下线,相关的资源以及Endpoint可以被释放掉。
总结
以上总结了DDS的传输协议,介绍了RTPS构成、消息、传输以及发现机制,对ROS2中如何与DDS中间件交互应该有了大致的轮廓。后面再深入看一下ROS2具体的eProsimaDDS实现。
这篇关于ROS2探索(四)DDS与RTPS的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!