DDS之DCPS Infrastructure模块

article/2025/9/22 11:30:21

DCPS Infrastructure

  • Infrastructure Module
    • Entity
        • Entity Identifier
        • QoS policy
        • Listener
        • Status
        • StatusCondition
        • Enabling Entities
    • QosPolicy
    • Status
      • Status 定义
      • StatusMask 定义
      • Listener callback定义
      • Listener callback的实现
    • Condition and WaitSet

Infrastructure Module

图1. Infrastructure UML图
从Infrastructure模块的类图中可以看出,其主要包含如下几个部分:

  • Entity
  • QosPolicy
  • Status
  • Condition
  • WaitSet

其中Entity类派生出了DomainEntity类和DomainParticipant类;Condition派生出了StatusCondition类,GuardCondition类和ReadCondition类

Entity

Entity可以理解为DDS标准中执行以数据为中心的发布订阅模型的实体,Entity包含QoS 策略,Listener和Status condition,通过改变其包含的这些内容可以控制其行为;
从C++的角度来讲Entity是所有DDS实体的一个抽象基类,其继承关系如下:
Entity继承关系
DDS Entity包含的共同特征如下:

  • Entity Identifier
  • QoS policy
  • Listener
  • Status
  • StatusCondition
  • Enabling Entities

对它们的详细解释如下:

Entity Identifier

每一个Entity都有一个独一无二的Entity ID,可以通过其方法InstanceHandle_t 获取。

inline const InstanceHandle_t &get_instance_handle() const

QoS policy

大家都知道,DDS的最大特点就是其具有丰富的Qos策略来控制通信行为。所有的DDS Entity都可以通过set_qos,get_qos来设置或者获取Qos。
以DomainParticipant Entity为例

ReturnCode_t set_qos(const DomainParticipantQos &qos) const
Parameters:qos – DomainParticipantQos to be set
Returns:RETCODE_IMMUTABLE_POLICY if any of the Qos cannot be changed, RETCODE_INCONSISTENT_POLICY if the Qos is not self consistent and RETCODE_OK if the qos is changed correctly.

ReturnCode_t get_qos(DomainParticipantQos &qos) const
Parameters:qos – DomainParticipantQos reference where the qos is going to be returned
Returns:RETCODE_OK

Listener

Listener定义了一Callback函数,这写接口都是虚函数,默认未定义其内容,只有用户需要在特定条件(StatusMask满足某种条件)下需要实现某些动作的时候可以在Listener中实现。

Listener的继承关系

Status

Status其实就是Communication Status,这些状态变化就是Listener的触发的基础。

StatusCondition

StatusCondition表示的是Entity的状态变化,不一样的是StatusCondition主要与Wait-Set的配合,组成了一套DDS的等待某种特定条件然后获取数据的通信方式。

Enabling Entities

对应的是Entity类下面的enable方法,默认情况下创建的Entity都是enable的,但是这个可以通过修改EntityFactoryQosPolicy来更改。

QosPolicy

Qos可以说是DDS通信方式最大特征,也是DDS区别于SOME/IP的主要内容,用户主要用Qos来进行灵活的通信行为配置,配置对象主要为各种Qos Policy。
DDS官方标准定义了一套Qos Policy,其相互关系如下UML图;另外,不同DDS的实现厂商也定义了一些Qos策略。
QosPolicy

对DDS官方标准 Qos Policy整理如下:

QosPolicyData Member NameTypeDescriptionConcern to Entity
DeadlineQosPolicyperiodDuration_tc_TimeInfinite(默认)
其实就是定义了DataWriter/DataReader的发送/接收最大时间
Topic, DataReader and DataWriter
DestinationOrderQosPolicykindDestinationOrderQosPolicyKindBY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS
BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS(非默认)
Topic, DataReader and DataWriter
DurabilityQosPolicykindDurabilityQosPolicyKind(4个值)默认情况下:VOLATILE_DURABILITY_QOS for DataReaders
TRANSIENT_LOCAL_DURABILITY_QOS for DataWriters
Topic, DataReader and DataWriter
DurabilityServiceQosPolicyTopic and DataWriter
GroupDataQosPolicycollectionstd::vectorEmpty vector(默认)
配合DataWriter and DataReader listeners 使用来实现配对策略
Publisher and Subscriber
HistoryQosPolicykind
depth
HistoryQosPolicyKind
int32_t
HistoryQosPolicyKind:(KEEP_LAST_HISTORY_QOS(默认),KEEP_ALL_HISTORY_QOS)控制Instance被DataReader读取之前的变化次数Topic, DataWriter and DataReader
LatencyBudgetQosPolicydurationDuration_tc_TimeZero(默认=0)
用于指定数据从被写到进入到DataReader History的最大可接受时间
Topic, DataWriter and DataReader
LifespanQosPolicydurationDuration_tc_TimeInfinite(默认)
表示数据被DataWriter写之后的存活时间
Topic, DataReader and DataWriter
LivelinessQosPolicykind
lease_duration
announcement_period
LivelinessQosPolicyKind
Duration_t
Duration_t
默认情况下:AUTOMATIC_LIVELINESS_QOS
c_TimeInfinite
c_TimeInfinite
Topic, DataReader and DataWriter
OwnershipQosPolicykindOwnershipQosPolicyKindSHARED_OWNERSHIP_QOSTopic, DataReader and DataWriter
OwnershipStrengthQosPolicyvalueuint32_t默认=0DataWriter
PartitionQosPolicymax_size
names(各个partition name)
uint32_t
SerializedPayload_t
0 (Length Unlimited)
Empty List
Publisher and Subscriber
PresentationQosPolicy
ReaderDataLifecycleQosPolicy
ReliabilityQosPolicy
ResourceLimitsQosPolicy
TimeBasedFilterQosPolicy
TopicDataQosPolicy
TransportPriorityQosPolicy
UserDataQosPolicy
WriterDataLifecycleQosPolicy

Status

DDS针对于不同Entity定义了不同的用于判断其状态的对象Status。

Status 定义

实际上它就是定义了不同的结构体,以LivelinessChangedStatus的定义为例:

struct LivelinessChangedStatus
{//! @brief The total number of currently active publishers that write the topic read by the subscriber//! @details This count increases when a newly matched publisher asserts its liveliness for the first time//! or when a publisher previously considered to be not alive reasserts its liveliness. The count decreases//! when a publisher considered alive fails to assert its liveliness and becomes not alive, whether because//! it was deleted normally or for some other reasonint32_t alive_count = 0;//! @brief The total count of current publishers that write the topic read by the subscriber that are no longer//! asserting their liveliness//! @details This count increases when a publisher considered alive fails to assert its liveliness and becomes//! not alive for some reason other than the normal deletion of that publisher. It decreases when a previously//! not alive publisher either reasserts its liveliness or is deleted normallyint32_t not_alive_count = 0;//! @brief The change in the alive_count since the last time the listener was called or the status was readint32_t alive_count_change = 0;//! @brief The change in the not_alive_count since the last time the listener was called or the status was readint32_t not_alive_count_change = 0;//! @brief Handle to the last publisher whose change in liveliness caused this status to changeInstanceHandle_t last_publication_handle;
};

StatusMask 定义

以及在StatusMask的类定义中会定义每一个Status具体由StatusMask中哪一个位表示:

/*** @brief* StatusMask is a bitmap or bitset field.** This bitset is used to:* - determine which listener functions to call* - set conditions in dds::core::cond::StatusCondition* - indicate status changes when calling dds::core::Entity::status_changes*/class RTPS_DllAPI StatusMask : public std::bitset<FASTDDS_STATUS_COUNT>
{
public:/*** Get the StatusMask associated with dds::core::status::LivelinessChangedStatus** @return StatusMask liveliness_changed*/inline static StatusMask liveliness_changed(){return StatusMask(0x00000001 << 12u);}.../*** Get the statusmask associated with dds::core::status::PublicationMatchedStatus** @return StatusMask publication_matched*/inline static StatusMask publication_matched(){return StatusMask(0x00000001 << 13u);}...
};

Listener callback定义

Status作为Listener callback的传入参数之一,其作用是特定执行动作的状态判断,因为liveliness_changed这个状态是针对于DataReader对象的,所以其定义在DataReaderListener的类定义中,如下所示:

/*** Class DataReaderListener, it should be used by the end user to implement specific callbacks to certain actions.* @ingroup FASTDDS_MODULE*/
class DataReaderListener
{
public:
.../*** @brief Method called when the liveliness status associated to a subscriber changes** @param reader The DataReader* @param status The liveliness changed status*/RTPS_DllAPI virtual void on_liveliness_changed(DataReader* reader,const fastrtps::LivelinessChangedStatus& status){(void)reader;(void)status;}
...
}

由此可见,on_liveliness_changed定义的是一个虚函数,具体的实现需要User去自己去定义实现的内容。

Listener callback的实现

我们在实例化一个DataReaderListener之后,如果需要on_liveliness_changed的调用,需要将其实现定义出来:

class CustmDataReaderListener :DataReaderListener
{
public:
.../*** @brief Method called when the liveliness status associated to a subscriber changes** @param reader The DataReader* @param status The liveliness changed status*/virtual void on_liveliness_changed(DataReader* reader,const fastrtps::LivelinessChangedStatus& status){(void)reader;(void)status;}
...
}
void CustmDataReaderListener::on_liveliness_changed(DataReader* reader,const fastrtps::LivelinessChangedStatus& status) override{(void)reader;if (status.alive_count_change == 1){std::cout << "Publisher recovered liveliness" << std::endl;}else if (status.not_alive_count_change == 1){std::cout << "Publisher lost liveliness" << std::endl;run_ = false;}}

Condition and WaitSet

DDS中Condition和WaiSet配合使用,提供了一种允许Application同时等待多个Entity满足特定状态之后执行特定动作的机制:
其示例如下:

class ApplicationJob
{WaitSet wait_set_;GuardCondition terminate_condition_;std::thread thread_;void main_loop(){// Main loop is repeated until the terminate condition is triggeredwhile (false == terminate_condition_.get_trigger_value()){// Wait for any of the conditions to be triggeredReturnCode_t ret_code;ConditionSeq triggered_conditions;ret_code = wait_set_.wait(triggered_conditions, eprosima::fastrtps::c_TimeInfinite);if (ReturnCode_t::RETCODE_OK != ret_code){// ... handle errorcontinue;}// Process triggered conditionsfor (Condition* cond : triggered_conditions){StatusCondition* status_cond = dynamic_cast<StatusCondition*>(cond);if (nullptr != status_cond){Entity* entity = status_cond->get_entity();StatusMask changed_statuses = entity->get_status_changes();// Process status. Liveliness changed and data available are depicted as an exampleif (changed_statuses.is_active(StatusMask::liveliness_changed())){std::cout << "Liveliness changed reported for entity " << entity->get_instance_handle() <<std::endl;}if (changed_statuses.is_active(StatusMask::data_available())){std::cout << "Data avilable on reader " << entity->get_instance_handle() << std::endl;FooSeq data_seq;SampleInfoSeq info_seq;DataReader* reader = static_cast<DataReader*>(entity);// Process all the samples until no one is returnedwhile (ReturnCode_t::RETCODE_OK == reader->take(data_seq, info_seq,LENGTH_UNLIMITED, ANY_SAMPLE_STATE,ANY_VIEW_STATE, ANY_INSTANCE_STATE)){// Both info_seq.length() and data_seq.length() will have the number of samples returnedfor (FooSeq::size_type n = 0; n < info_seq.length(); ++n){// Only samples for which valid_data is true should be accessedif (info_seq[n].valid_data){// Process sample on data_seq[n]}}// must return the loaned sequences when done processingreader->return_loan(data_seq, info_seq);}}}}}}public:ApplicationJob(const std::vector<DataReader*>& readers,const std::vector<DataWriter*>& writers){// Add a GuardCondition, so we can signal the processing thread to stop// 00.构造函数中将terminate_condition和status_condition添加到Waitset中 wait_set_.attach_condition(terminate_condition_);// Add the status condition of every reader and writerfor (DataReader* reader : readers){wait_set_.attach_condition(reader->get_statuscondition());}for (DataWriter* writer : writers){wait_set_.attach_condition(writer->get_statuscondition());}thread_ = std::thread(&ApplicationJob::main_loop, this);}~ApplicationJob(){// Signal the GuardCondition to force the WaitSet to wake upterminate_condition_.set_trigger_value(true);// Wait for the thread to finishthread_.join();}};
// Application initialization
ReturnCode_t ret_code;
std::vector<DataReader*> application_readers;
std::vector<DataWriter*> application_writers;// Create the participant, topics, readers, and writers.
ret_code = create_dds_application(application_readers, application_writers);
if (ReturnCode_t::RETCODE_OK != ret_code)
{// ... handle errorreturn;
}{ApplicationJob main_loop_thread(application_readers, application_writers);// ... wait for application termination signaling (signal handler, user input, etc)// ... Destructor of ApplicationJob takes care of stopping the processing thread
}// Destroy readers, writers, topics, and participant
destroy_dds_application();

由以上示例可以看出,其对应的流程如下:

  1. 将一个Condition对象与wait-set关联
  2. 当wait-set会持续等待直到其关联的其中一个或者多个Condition的trigger value为TRUE
  3. 当条件满足的时候,取值

http://chatgpt.dhexx.cn/article/Rd3PvdA1.shtml

相关文章

Oracle 19.3 Grid Infrastructure 软件安装详细教程

更多文章&#xff0c;欢迎关注作者公众号&#xff0c;欢迎一起交流。 1 安装环境 CentOS 7.9Oracle Database 19.3 - Enterprise Edition 2 安装配置 2.1 内存要求 1&#xff09;数据库安装&#xff1a;至少1GB&#xff0c;推荐2G以上&#xff1b; 2&#xff09;Grid安装&…

Oracle 19c Grid Infrastructure安装

概述 本文描述在单个主机上&#xff08;不是RAC&#xff09;GI 19c的安装。 Oracle数据库软件19c已安装&#xff0c;但未创建任何数据库。参见这篇文章。 主机为Oracle Linux 7&#xff0c;主机上已安装先决条件包(oracle-database-preinstall-19c)&#xff0c;数据库软件用户…

infra-structure Ad Hoc

“infrastructure”模式&#xff1a; 所谓infrastructure是在一种整合有线与无线局域网架构的应用模式&#xff0c;与ad- hoc不同的是配备无线网卡的设备必须通过ap来进行无线通讯&#xff0c;设置后&#xff0c;无线网络设备就必须有AP&#xff08;Access Pointer&#xff09;…

DDD(八)【基础设施层】

最近被&#xff24;&#xff24;&#xff24;吸引了阿&#xff0c;在这里感谢一下小佟&#xff0c;呵呵&#xff0c;领域驱动设计是个不错的东西&#xff0c;帮助我们把问题清晰化&#xff0c;这候对于复杂业务逻辑是很重要的&#xff0c;今天这一讲主要说一下&#xff24;&…

DB2初步使用

DB2初步使用 1.安装完成后开始菜单栏里会有一个 DB2 Command Window - Administrator 打开这个命令窗口 2.db2  db2&#xff0c;启动 3.list databse directory 会列出所有数据库 create databse XXX  创建新的数据库XXX  drop database XXX  删除已有数据库XXX 4.conn…

db2 时间戳相减返回天数

db2时间戳类型时间相减返回天数&#xff0c;先用timestampdiff函数让时间戳相减得到秒&#xff0c;然后利用cast()函数转换成天数。timestampdiff()函数中的时间一定是时间戳timestamp类型。 select cast(timestampdiff(2,(CURRENT_TIMESTAMP - 2021-08-10 11:00:00)) as doub…

Linux 命令行进入DB2

1. docker ps 2.docker exec -it 813cb755c14c /bin/bash 3.su - db2inst1 进入DB2账户下 4.db2 list db directory 列出目前所有数据库 5. db2 直接进入db2数据库命令行模式 6.db2 connect to testdb&#xff08;testdb数据库名称&#xff09; 如果需要对某一个数据库操作…

DB2安装到卸载一套龙服务

DB2安装到卸载一套龙服务 1.DB2数据库下载 下载地址: 点击这里下载 点击链接保存&#xff0c;或者复制本段内容&#xff0c;打开「阿里云盘」APP 下载下来是 3 号 双击3解压 会有一个弹框&#xff0c;为四号 &#xff0c;点击Browse 这里选择解压地址&#xff0c;我是解压…

db2获取日志路径

db2 get db cfg for bpfdb3 | grep log

db2的启动命令

想重启db2库&#xff0c;直接使用db2stop命令&#xff0c;执行报错 这样停不掉&#xff0c;因为有连接在上面 查看连接 db2 list applications show detail 发现有很多连接&#xff0c;连接在bpfdb5这个库上面 杀掉所有的连接 db2 force application all 再次检查连接 发现没…

db2top操作手册

本手册译自: https://www.ibm.com/developerworks/data/library/techarticle/dm-0812wang/ 目录 1.db2top命令语法... 4 2.db2top运行模式... 5 2.1 交互模式... 5 2.2 批量模式... 6 3.db2top监控模式... 8 3.1 数据库监控 (d). 8 3.2 表空间监控 (t). 9 3.3 动态SQL监…

DB2基础语法与简单使用

1、启动 [rootMyClone hadoop]# su db2inst1[db2inst1MyClone hadoop]$ db2start2、创建数据库 [db2inst1MyClone hadoop]$ db2 create database hiber;3、 列出数据库 [db2inst1MyClone hadoop]$ db2 list db directory4、连接数据库 [db2inst1MyClone hadoop]$ db2 conne…

awk的使用

awk的使用 一、awk的作用 1.用来从文本中截取字符串 2.用来匹配过滤文本&#xff0c;起到grep的作用 二、awk里面的分隔符 1、输入分隔符&#xff1a; -F -FS":" FS&#xff08;input field separater&#xff09;2、输出分隔符&#xff1a; 定义OFS“#” OFS&a…

AWK基础教程

前言 之前针对WorkerHub小程序做的数据分析文章 互联网卷王花落谁家&#xff1f; 收到了一些小伙伴的回复&#xff0c;点名要学习数据分析&#xff0c;其实我也是一知半解&#xff0c;想着来写几篇文章简单聊下我分析的过程。 首先是数据清洗和统计了&#xff0c;这块我并没有…

详解AWK的用法

Awk工具介绍 AwK是一种处理文本文件的语言&#xff0c;是一个强大的文本分析工具。 它是专门为文本处理设计的编程语言&#xff0c;也是行处理软件&#xff0c;通常用于扫描、过滤、统计汇总工作。数据可以来自标准输入也可以是管道或文件 20世纪70年代诞生于贝尔实验室&#…

Shell脚本文本三剑客之AWK

目录 一、AWK工具介绍 1.1AWK命令的基本格式 1.2AWK工作原理 1.3常见的内建变量&#xff08;可直接用&#xff09; 二、实例 2.1打印文本内容 案例1&#xff1a;打印磁盘已经使用情况 案例2:打印字符串 案例3&#xff1a;打印字符串确定文件有多少行 2.2根据$n提取字…

AWK常用技巧

1.1 介绍 awk其名称得自于它的创始人 Alfred Aho 、Peter Weinberger 和 Brian Kernighan 姓氏的首个字母。实际上 AWK 的确拥有自己的语言&#xff1a; AWK 程序设计语言 &#xff0c; 三位创建者已将它正式定义为“样式扫描和处理语言”。 它允许您创建简短的程序&#xff…

AWK用法全解

一、awk介绍 awk是Linux自带的一个逐行扫描的文本处理工具&#xff0c;支持正则表达式、循环控制、条件判断、格式化输出。AWK自身带有一些变量&#xff0c;可以在书写脚本时调用。 二、基本语法格式 2.1、在shell中使用awk awk [option] 代码块 文件名 option的选项及含义…

awk文本工具

awk 一、什么是awk&#xff1f;二、awk的工作原理三、命令格式四、管道符号调用四、getline 一、什么是awk&#xff1f; awk命令是一种编程语言&#xff0c;用于在linux/unix下对文本和数据进行处理。 而且它支持用户自定义函数和动态正则表达式等先进功能&#xff0c;是linux…

shell之三剑客awk(基础用法)

文章目录 一、awk概述1、awk工作原理 二、awk的格式三、内置变量演示1、【$n】进行演示2、【$0】的演示3、【NF】(多少列) 和 【NR】(多少行)的演示4、面试题5、BEGIN开始和END结尾6、模糊匹配7、关于数值与字符串的比较 四、总结 一、awk概述 AWK是一种处理文本文件的语言&am…