DCPS Infrastructure
- Infrastructure Module
- Entity
- Entity Identifier
- QoS policy
- Listener
- Status
- StatusCondition
- Enabling Entities
- QosPolicy
- Status
- Status definition
- StatusMask definition
- Listener callback definition
- Listener callback implementation
- Condition and WaitSet
Infrastructure Module
As the class diagram of the Infrastructure module shows, the module consists mainly of the following parts:
- Entity
- QosPolicy
- Status
- Condition
- WaitSet
The Entity class is the base of the DomainEntity and DomainParticipant classes; the Condition class is the base of the StatusCondition, GuardCondition, and ReadCondition classes.
Entity
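An Entity can be understood as an object that takes part in the data-centric publish-subscribe model defined by the DDS standard. An Entity holds QoS policies, a Listener, and a StatusCondition, and its behavior is controlled by changing them.
From a C++ point of view, Entity is the abstract base class from which all DDS entities inherit.
All DDS entities share the following features: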
- Entity Identifier
- QoS policy
- Listener
- Status
- StatusCondition
- Enabling Entities
Each of them is explained in detail below:
Entity Identifier
Every Entity has a unique identifier, which can be obtained through its get_instance_handle method; the identifier is returned as an InstanceHandle_t.
inline const InstanceHandle_t &get_instance_handle() const
QoS policy
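The most distinctive feature of DDS is its rich set of QoS policies for controlling communication behavior. Every DDS Entity lets you set and retrieve its QoS through set_qos and get_qos.
Taking the DomainParticipant Entity as an example: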
ReturnCode_t set_qos(const DomainParticipantQos &qos) const
Parameters:qos – DomainParticipantQos to be set
Returns:RETCODE_IMMUTABLE_POLICY if any of the Qos cannot be changed, RETCODE_INCONSISTENT_POLICY if the Qos is not self consistent and RETCODE_OK if the qos is changed correctly.
ReturnCode_t get_qos(DomainParticipantQos &qos) const
Parameters:qos – DomainParticipantQos reference where the qos is going to be returned
Returns:RETCODE_OK
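The three outcomes of set_qos can be illustrated with a small stand-alone model. This is not Fast DDS code: `ToyQos`, `ToyEntity`, and the two rules they enforce (the history depth may not exceed the sample limit, and a `durable` flag cannot change once the entity is enabled) are hypothetical stand-ins chosen only to exercise the return codes.

```cpp
#include <cstdint>

// Simplified stand-ins for the DDS return codes mentioned above.
enum class RetCode { OK, IMMUTABLE_POLICY, INCONSISTENT_POLICY };

// Hypothetical QoS bundle with two interacting settings.
struct ToyQos
{
    int32_t history_depth = 1;
    int32_t max_samples = 100;
    bool durable = false;  // treated as immutable once the entity is enabled
};

class ToyEntity
{
public:
    RetCode set_qos(const ToyQos& qos)
    {
        // Self-consistency check: depth may not exceed the sample limit.
        if (qos.history_depth > qos.max_samples)
        {
            return RetCode::INCONSISTENT_POLICY;
        }
        // Immutable settings cannot change after enabling.
        if (enabled_ && qos.durable != qos_.durable)
        {
            return RetCode::IMMUTABLE_POLICY;
        }
        qos_ = qos;
        return RetCode::OK;
    }

    RetCode get_qos(ToyQos& qos) const
    {
        qos = qos_;
        return RetCode::OK;
    }

    void enable()
    {
        enabled_ = true;
    }

private:
    ToyQos qos_;
    bool enabled_ = false;
};
```

The usual calling pattern is read, modify, write: fetch the current QoS with get_qos, change the fields of interest, then pass the result to set_qos and check the return code.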
Listener
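A Listener defines a set of callback functions. These interfaces are all virtual functions with empty default bodies; the user overrides one in the Listener only when some action has to be performed under a specific condition (when the corresponding status is enabled in the StatusMask).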
Status
Status here means the communication status of an Entity; changes in these statuses are what trigger the Listener callbacks.
StatusCondition
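A StatusCondition also represents the status changes of an Entity. The difference is that a StatusCondition is mainly used together with a WaitSet, which gives DDS a communication style in which the application waits for specific conditions and then fetches the data.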
Enabling Entities
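This corresponds to the enable method of the Entity class. By default an Entity is enabled as soon as it is created, but this can be changed by modifying the EntityFactoryQosPolicy of the factory that creates it.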
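A minimal sketch of this behavior, using toy classes rather than the Fast DDS ones: `ToyFactory`, `ToyChild`, and `ToyEntityFactoryQos` are hypothetical names, and the only part taken from DDS is the `autoenable_created_entities` flag of EntityFactoryQosPolicy with its default of true.

```cpp
// Toy model (not the Fast DDS classes): the factory's
// autoenable_created_entities flag decides whether children start enabled.
struct ToyEntityFactoryQos
{
    bool autoenable_created_entities = true;  // mirrors the DDS default
};

class ToyChild
{
public:
    explicit ToyChild(bool enabled)
        : enabled_(enabled)
    {
    }

    // Enabling is one-way: there is no disable operation.
    void enable()
    {
        enabled_ = true;
    }

    bool is_enabled() const
    {
        return enabled_;
    }

private:
    bool enabled_;
};

class ToyFactory
{
public:
    ToyEntityFactoryQos qos;

    // The child inherits its initial enabled state from the factory QoS.
    ToyChild create_child() const
    {
        return ToyChild(qos.autoenable_created_entities);
    }
};
```

Turning the flag off lets an application create an entity, finish configuring it, and only then call enable explicitly.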
QosPolicy
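QoS is arguably the defining feature of DDS communication and the main thing that sets DDS apart from SOME/IP. Users rely on QoS to configure communication behavior flexibly, and what they configure are the individual QoS policies.
The official DDS standard defines a set of QoS policies, whose relationships are captured in a UML diagram; in addition, individual DDS vendors define some QoS policies of their own.
The QoS policies of the official DDS standard are summarized below: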
QosPolicy | Data Member Name | Type | Description | Applicable Entities |
---|---|---|---|---|
DeadlineQosPolicy | period | Duration_t | Default: c_TimeInfinite. Maximum time allowed between consecutive samples sent by a DataWriter or received by a DataReader | Topic, DataReader and DataWriter |
DestinationOrderQosPolicy | kind | DestinationOrderQosPolicyKind | BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS (default), BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS | Topic, DataReader and DataWriter |
DurabilityQosPolicy | kind | DurabilityQosPolicyKind (4 values) | Default: VOLATILE_DURABILITY_QOS for DataReaders, TRANSIENT_LOCAL_DURABILITY_QOS for DataWriters | Topic, DataReader and DataWriter |
DurabilityServiceQosPolicy | | | | Topic and DataWriter |
GroupDataQosPolicy | collection | std::vector | Default: empty vector. Used together with DataWriter and DataReader listeners to implement a matching policy | Publisher and Subscriber |
HistoryQosPolicy | kind, depth | HistoryQosPolicyKind, int32_t | kind: KEEP_LAST_HISTORY_QOS (default) or KEEP_ALL_HISTORY_QOS. Controls how many changes of an instance are kept before the DataReader reads them | Topic, DataWriter and DataReader |
LatencyBudgetQosPolicy | duration | Duration_t | Default: c_TimeZero (0). Maximum acceptable time from when data is written until it enters the DataReader history | Topic, DataWriter and DataReader |
LifespanQosPolicy | duration | Duration_t | Default: c_TimeInfinite. How long data remains valid after the DataWriter writes it | Topic, DataReader and DataWriter |
LivelinessQosPolicy | kind, lease_duration, announcement_period | LivelinessQosPolicyKind, Duration_t, Duration_t | Defaults: AUTOMATIC_LIVELINESS_QOS, c_TimeInfinite, c_TimeInfinite | Topic, DataReader and DataWriter |
OwnershipQosPolicy | kind | OwnershipQosPolicyKind | SHARED_OWNERSHIP_QOS (default) | Topic, DataReader and DataWriter |
OwnershipStrengthQosPolicy | value | uint32_t | Default: 0 | DataWriter |
PartitionQosPolicy | max_size, names (the partition names) | uint32_t, SerializedPayload_t | Defaults: 0 (length unlimited), empty list | Publisher and Subscriber |
PresentationQosPolicy | | | | |
ReaderDataLifecycleQosPolicy | | | | |
ReliabilityQosPolicy | | | | |
ResourceLimitsQosPolicy | | | | |
TimeBasedFilterQosPolicy | | | | |
TopicDataQosPolicy | | | | |
TransportPriorityQosPolicy | | | | |
UserDataQosPolicy | | | | |
WriterDataLifecycleQosPolicy | | | | |
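To make the HistoryQosPolicy row more concrete, here is a sketch of what KEEP_LAST with a given depth means for a single instance: only the most recent `depth` samples are retained. `KeepLastHistory` is a hypothetical toy class, samples are plain `int` values, and none of this is the Fast DDS history implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Toy KEEP_LAST history for one instance (not Fast DDS code).
class KeepLastHistory
{
public:
    explicit KeepLastHistory(std::size_t depth)
        : depth_(depth)
    {
        assert(depth_ > 0);
    }

    // Store a new sample; once depth is reached the oldest one is dropped.
    void add(int sample)
    {
        if (samples_.size() == depth_)
        {
            samples_.pop_front();
        }
        samples_.push_back(sample);
    }

    // Samples currently retained, oldest first.
    const std::deque<int>& samples() const
    {
        return samples_;
    }

private:
    std::size_t depth_;
    std::deque<int> samples_;
};
```

With a depth of 2, adding the samples 1, 2, 3 leaves only 2 and 3 in the history. KEEP_ALL would correspond to never dropping samples, subject to resource limits.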
Status
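DDS defines different Status objects for the different kinds of Entity; each one is used to determine the state of that Entity.
Status definition
In practice each status is simply a struct. Take the definition of LivelinessChangedStatus as an example: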
struct LivelinessChangedStatus
{
    //! @brief The total number of currently active publishers that write the topic read by the subscriber
    //! @details This count increases when a newly matched publisher asserts its liveliness for the first time
    //! or when a publisher previously considered to be not alive reasserts its liveliness. The count decreases
    //! when a publisher considered alive fails to assert its liveliness and becomes not alive, whether because
    //! it was deleted normally or for some other reason
    int32_t alive_count = 0;

    //! @brief The total count of current publishers that write the topic read by the subscriber that are no longer
    //! asserting their liveliness
    //! @details This count increases when a publisher considered alive fails to assert its liveliness and becomes
    //! not alive for some reason other than the normal deletion of that publisher. It decreases when a previously
    //! not alive publisher either reasserts its liveliness or is deleted normally
    int32_t not_alive_count = 0;

    //! @brief The change in the alive_count since the last time the listener was called or the status was read
    int32_t alive_count_change = 0;

    //! @brief The change in the not_alive_count since the last time the listener was called or the status was read
    int32_t not_alive_count_change = 0;

    //! @brief Handle to the last publisher whose change in liveliness caused this status to change
    InstanceHandle_t last_publication_handle;
};
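The counting rules described in the struct comments can be traced with a small model. `ToyLivelinessTracker` and its method names are hypothetical; the sketch covers only three transitions (first assertion, lost liveliness, recovery) and leaves out normal deletion and the publication handle.

```cpp
#include <cstdint>

// Same counters as LivelinessChangedStatus, minus the handle.
struct ToyLivelinessStatus
{
    int32_t alive_count = 0;
    int32_t not_alive_count = 0;
    int32_t alive_count_change = 0;
    int32_t not_alive_count_change = 0;
};

class ToyLivelinessTracker
{
public:
    // A newly matched publisher asserts its liveliness for the first time.
    void publisher_became_alive()
    {
        ++status_.alive_count;
        ++status_.alive_count_change;
    }

    // An alive publisher fails to assert liveliness (not a normal deletion).
    void publisher_lost_liveliness()
    {
        --status_.alive_count;
        --status_.alive_count_change;
        ++status_.not_alive_count;
        ++status_.not_alive_count_change;
    }

    // A publisher considered not alive reasserts its liveliness.
    void publisher_recovered()
    {
        --status_.not_alive_count;
        --status_.not_alive_count_change;
        ++status_.alive_count;
        ++status_.alive_count_change;
    }

    // Return the status and reset the *_change fields, as happens when
    // the status is read or the listener is called.
    ToyLivelinessStatus read()
    {
        ToyLivelinessStatus snapshot = status_;
        status_.alive_count_change = 0;
        status_.not_alive_count_change = 0;
        return snapshot;
    }

private:
    ToyLivelinessStatus status_;
};
```

The `*_change` fields are deltas since the previous read, which is why a callback can test `alive_count_change` or `not_alive_count_change` to tell a recovery from a loss.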
StatusMask definition
The StatusMask class definition in turn specifies which bit of the StatusMask represents each Status:
/**
 * @brief
 * StatusMask is a bitmap or bitset field.
 *
 * This bitset is used to:
 * - determine which listener functions to call
 * - set conditions in dds::core::cond::StatusCondition
 * - indicate status changes when calling dds::core::Entity::status_changes
 */
class RTPS_DllAPI StatusMask : public std::bitset<FASTDDS_STATUS_COUNT>
{
public:

    /**
     * Get the StatusMask associated with dds::core::status::LivelinessChangedStatus
     *
     * @return StatusMask liveliness_changed
     */
    inline static StatusMask liveliness_changed()
    {
        return StatusMask(0x00000001 << 12u);
    }

    ...

    /**
     * Get the statusmask associated with dds::core::status::PublicationMatchedStatus
     *
     * @return StatusMask publication_matched
     */
    inline static StatusMask publication_matched()
    {
        return StatusMask(0x00000001 << 13u);
    }

    ...
};
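The bit-per-status idea can be tried out with a plain std::bitset. The sketch below is not the Fast DDS class: `ToyStatusMask` and the free functions are hypothetical, the width of 32 bits is an assumption, and only the bit positions 12 and 13 are taken from the excerpt above.

```cpp
#include <bitset>

// Toy version of the StatusMask idea (not the Fast DDS class).
// The width of 32 bits is an assumption made for this sketch.
using ToyStatusMask = std::bitset<32>;

// Bit 12 represents the liveliness-changed status, as in the excerpt.
inline ToyStatusMask liveliness_changed()
{
    return ToyStatusMask(1u << 12u);
}

// Bit 13 represents the publication-matched status, as in the excerpt.
inline ToyStatusMask publication_matched()
{
    return ToyStatusMask(1u << 13u);
}

// True when every bit set in `status` is also set in `mask`.
inline bool is_active(const ToyStatusMask& mask, const ToyStatusMask& status)
{
    return (mask & status) == status;
}
```

Masks combine with the usual bitwise operators, so `liveliness_changed() | publication_matched()` selects both statuses at once.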
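Listener callback definition
A Status is passed to a Listener callback as one of its arguments, so that the callback can decide which action to take based on the reported state. Because the liveliness_changed status applies to DataReader objects, its callback is declared in the DataReaderListener class, as shown below: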
/**
 * Class DataReaderListener, it should be used by the end user to implement specific callbacks to certain actions.
 * @ingroup FASTDDS_MODULE
 */
class DataReaderListener
{
public:

    ...

    /**
     * @brief Method called when the liveliness status associated to a subscriber changes
     *
     * @param reader The DataReader
     * @param status The liveliness changed status
     */
    RTPS_DllAPI virtual void on_liveliness_changed(
            DataReader* reader,
            const fastrtps::LivelinessChangedStatus& status)
    {
        (void)reader;
        (void)status;
    }

    ...
};
As this shows, on_liveliness_changed is a virtual function with an empty default body; the user has to supply the actual implementation.
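How an empty virtual default, a user override, and a status mask fit together can be sketched without any DDS dependency. Everything here is a simplified model: `ToyReaderListener`, `LoggingListener`, `ToyReader`, and `kLivelinessChanged` are hypothetical names, the callback takes a bare `int` instead of a status struct, and the dispatch rule (call the listener only if the status bit is in the mask) is an assumption about the general mechanism, not a copy of the Fast DDS internals.

```cpp
#include <bitset>
#include <string>
#include <vector>

using ToyMask = std::bitset<32>;
const ToyMask kLivelinessChanged(1u << 12u);

// Base listener: the callback is virtual with an empty default body.
class ToyReaderListener
{
public:
    virtual ~ToyReaderListener() = default;

    virtual void on_liveliness_changed(int alive_count_change)
    {
        (void)alive_count_change;
    }
};

// User listener: overrides only the callback it cares about.
class LoggingListener : public ToyReaderListener
{
public:
    std::vector<std::string> log;

    void on_liveliness_changed(int alive_count_change) override
    {
        log.push_back(alive_count_change > 0 ? "recovered" : "lost");
    }
};

// Toy reader: invokes the callback only when the status bit is in the mask.
class ToyReader
{
public:
    void set_listener(ToyReaderListener* listener, const ToyMask& mask)
    {
        listener_ = listener;
        mask_ = mask;
    }

    void notify_liveliness_changed(int alive_count_change)
    {
        if (listener_ != nullptr && (mask_ & kLivelinessChanged).any())
        {
            listener_->on_liveliness_changed(alive_count_change);
        }
    }

private:
    ToyReaderListener* listener_ = nullptr;
    ToyMask mask_;
};
```

Installing the listener with an empty mask silences it entirely, while a mask containing the liveliness bit lets the overridden callback run.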
Listener callback implementation
To have on_liveliness_changed do something when it is called, derive a class from DataReaderListener and provide an implementation of the callback:
class CustomDataReaderListener : public DataReaderListener
{
public:

    ...

    /**
     * @brief Method called when the liveliness status associated to a subscriber changes
     *
     * @param reader The DataReader
     * @param status The liveliness changed status
     */
    void on_liveliness_changed(
            DataReader* reader,
            const fastrtps::LivelinessChangedStatus& status) override;

    ...
};

// run_ is assumed to be a member declared in the elided part of the class.
void CustomDataReaderListener::on_liveliness_changed(
        DataReader* reader,
        const fastrtps::LivelinessChangedStatus& status)
{
    (void)reader;
    if (status.alive_count_change == 1)
    {
        std::cout << "Publisher recovered liveliness" << std::endl;
    }
    else if (status.not_alive_count_change == 1)
    {
        std::cout << "Publisher lost liveliness" << std::endl;
        run_ = false;
    }
}
Condition and WaitSet
In DDS, Conditions and WaitSets work together to give an application a way to wait on several Entities at once and act when any of them reaches a particular state:
An example:
class ApplicationJob
{
    WaitSet wait_set_;
    GuardCondition terminate_condition_;
    std::thread thread_;

    void main_loop()
    {
        // Main loop is repeated until the terminate condition is triggered
        while (false == terminate_condition_.get_trigger_value())
        {
            // Wait for any of the conditions to be triggered
            ReturnCode_t ret_code;
            ConditionSeq triggered_conditions;
            ret_code = wait_set_.wait(triggered_conditions, eprosima::fastrtps::c_TimeInfinite);
            if (ReturnCode_t::RETCODE_OK != ret_code)
            {
                // ... handle error
                continue;
            }

            // Process triggered conditions
            for (Condition* cond : triggered_conditions)
            {
                StatusCondition* status_cond = dynamic_cast<StatusCondition*>(cond);
                if (nullptr != status_cond)
                {
                    Entity* entity = status_cond->get_entity();
                    StatusMask changed_statuses = entity->get_status_changes();

                    // Process status. Liveliness changed and data available are depicted as an example
                    if (changed_statuses.is_active(StatusMask::liveliness_changed()))
                    {
                        std::cout << "Liveliness changed reported for entity " << entity->get_instance_handle() <<
                            std::endl;
                    }

                    if (changed_statuses.is_active(StatusMask::data_available()))
                    {
                        std::cout << "Data available on reader " << entity->get_instance_handle() << std::endl;

                        FooSeq data_seq;
                        SampleInfoSeq info_seq;
                        DataReader* reader = static_cast<DataReader*>(entity);

                        // Process all the samples until no one is returned
                        while (ReturnCode_t::RETCODE_OK == reader->take(data_seq, info_seq,
                                LENGTH_UNLIMITED, ANY_SAMPLE_STATE,
                                ANY_VIEW_STATE, ANY_INSTANCE_STATE))
                        {
                            // Both info_seq.length() and data_seq.length() will have the number of samples returned
                            for (FooSeq::size_type n = 0; n < info_seq.length(); ++n)
                            {
                                // Only samples for which valid_data is true should be accessed
                                if (info_seq[n].valid_data)
                                {
                                    // Process sample on data_seq[n]
                                }
                            }

                            // must return the loaned sequences when done processing
                            reader->return_loan(data_seq, info_seq);
                        }
                    }
                }
            }
        }
    }

public:

    ApplicationJob(
            const std::vector<DataReader*>& readers,
            const std::vector<DataWriter*>& writers)
    {
        // Add a GuardCondition, so we can signal the processing thread to stop
        // Step 0: the constructor attaches terminate_condition_ and the status conditions to the WaitSet
        wait_set_.attach_condition(terminate_condition_);

        // Add the status condition of every reader and writer
        for (DataReader* reader : readers)
        {
            wait_set_.attach_condition(reader->get_statuscondition());
        }
        for (DataWriter* writer : writers)
        {
            wait_set_.attach_condition(writer->get_statuscondition());
        }

        thread_ = std::thread(&ApplicationJob::main_loop, this);
    }

    ~ApplicationJob()
    {
        // Signal the GuardCondition to force the WaitSet to wake up
        terminate_condition_.set_trigger_value(true);
        // Wait for the thread to finish
        thread_.join();
    }
};

// Application initialization
ReturnCode_t ret_code;
std::vector<DataReader*> application_readers;
std::vector<DataWriter*> application_writers;

// Create the participant, topics, readers, and writers.
ret_code = create_dds_application(application_readers, application_writers);
if (ReturnCode_t::RETCODE_OK != ret_code)
{
    // ... handle error
    return;
}

{
    ApplicationJob main_loop_thread(application_readers, application_writers);

    // ... wait for application termination signaling (signal handler, user input, etc)
    // ... Destructor of ApplicationJob takes care of stopping the processing thread
}

// Destroy readers, writers, topics, and participant
destroy_dds_application();
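The example above follows this flow:
- Attach a Condition object to the WaitSet
- The WaitSet blocks until the trigger value of one or more of its attached Conditions becomes TRUE
- Once a condition is met, take the data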
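The attach, wait, and wake-up steps can be modelled with the C++ standard library alone. The sketch below is a deliberately small analogue, not the Fast DDS WaitSet: `ToyCondition` and `ToyWaitSet` are hypothetical classes, there is no timeout or detach, and it assumes each condition is attached to one wait set before it is shared with other threads.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <vector>

class ToyWaitSet;

// Toy condition with a trigger value, loosely modelled on GuardCondition.
class ToyCondition
{
public:
    bool get_trigger_value() const
    {
        return triggered_.load();
    }

    void set_trigger_value(bool value);  // defined after ToyWaitSet

private:
    friend class ToyWaitSet;
    std::atomic<bool> triggered_{false};
    ToyWaitSet* wait_set_ = nullptr;  // set once, in attach_condition
};

class ToyWaitSet
{
public:
    void attach_condition(ToyCondition& cond)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        cond.wait_set_ = this;
        conditions_.push_back(&cond);
    }

    // Block until at least one attached condition is triggered,
    // then return every condition whose trigger value is true.
    std::vector<ToyCondition*> wait()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        std::vector<ToyCondition*> active;
        cv_.wait(lock, [&]
                {
                    active.clear();
                    for (ToyCondition* c : conditions_)
                    {
                        if (c->get_trigger_value())
                        {
                            active.push_back(c);
                        }
                    }
                    return !active.empty();
                });
        return active;
    }

    void notify()
    {
        // Taking the lock avoids losing a wake-up that arrives between
        // the predicate check and the actual wait.
        std::lock_guard<std::mutex> lock(mutex_);
        cv_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::vector<ToyCondition*> conditions_;
};

inline void ToyCondition::set_trigger_value(bool value)
{
    triggered_.store(value);
    if (value && wait_set_ != nullptr)
    {
        wait_set_->notify();
    }
}
```

A processing thread would call `wait()` in a loop and inspect the returned conditions, while any other thread can wake it by calling `set_trigger_value(true)` on an attached condition, which is the role the GuardCondition plays in the destructor of the example.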