Java调用OpenDDS(2)-理解OpenDDS自带的Messager示例

article/2025/10/4 7:58:58

OpenDDS安装好之后,下一步就是利用OpenDDS来开发通信项目了。不过在项目中应用OpenDDS之前,先消化一下OpenDDS安装包中自带的示例项目messenger,通过阅读messenger的源代码来熟悉一下OpenDDS提供的用来开发Java项目的类。



提纲
1、准备工作
2、发送消息:TestPublisher
3、接收消息:TestSubscriber & DataReaderListenerImpl
4、在IDEA中运行示例



1、准备工作

在开始看OpenDDS的示例项目之前,需要安装好OpenDDS,并且在编译OpenDDS的时候要开启Java支持,这是前提条件。

第一步,在IDEA创建空的maven项目ddstest,用这个项目来看OpenDDS自带的messenger项目的示例代码。因为Idea无法直接打开messenger项目,所以才要创建一个ddstest新项目,把messenger的源码拷贝到ddstest中去边运行边看。

第二步,ddstest项目中引入4个必要的jar包。具体方法:File - Project Structure -Libraries - 点击"+"号 - Java ,然后定位到 %DDS_ROOT%\lib,选中3个jar包:i2jrt.jar、OpenDDS_DCPS.jar、tao_java.jar,定位到%DDS_ROOT%\java\tests\messenger下\messenger_idl下,选中messenger_idl_test.jar。
这样以来项目建好了,所需要的的4个jar包也都加入到项目的libraries中了。详情见下图:

注:由于OpenDDS需要CORBA,而Java 9开始移除了CORBA,所以请在Java 8环境下开发。

然后去%DDS_ROOT%\java\tests\messenger下,将publisher和subscriber两个目录下的TestPublisher、TestSubscriber、DataReaderListenerImpl三个java文件复制到ddstest项目的src目录下。

经过以上2步,项目结构如上图所示,源代码都在src下,jar都在external libraries下面。



2、利用TestPublisher发送消息

DomainParticipantFactory dpf = TheParticipantFactory.WithArgs(new StringSeqHolder(args));
DomainParticipant dp = dpf.create_participant(4, PARTICIPANT_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value);

第一行用来创建域参与者工厂,需要读取参数(实际上就是读取repo.ior文件)
第二行是创建具体的域参与者,create_participant方法的定义(实际在DomainParticipantFactoryImpl.cpp文件中)为:

DDS::DomainParticipant_ptr DomainParticipantFactoryImpl::create_participant(DDS::DomainId_t domainId,const DDS::DomainParticipantQos & qos,DDS::DomainParticipantListener_ptr a_listener,DDS::StatusMask mask)
{DDS::DomainParticipantQos par_qos = qos;if (par_qos == PARTICIPANT_QOS_DEFAULT) {get_default_participant_qos(par_qos);}if (!Qos_Helper::valid(par_qos)) {ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t) ERROR: ")ACE_TEXT("DomainParticipantFactoryImpl::create_participant, ")ACE_TEXT("invalid qos.\n")));return DDS::DomainParticipant::_nil();}if (!Qos_Helper::consistent(par_qos)) {ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t) ERROR: ")ACE_TEXT("DomainParticipantFactoryImpl::create_participant, ")ACE_TEXT("inconsistent qos.\n")));return DDS::DomainParticipant::_nil();}RcHandle<DomainParticipantImpl> dp =make_rch<DomainParticipantImpl>(this, domainId, par_qos, a_listener, mask);if (qos_.entity_factory.autoenable_created_entities) {if (dp->enable() != DDS::RETCODE_OK) {ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t) ERROR: ")ACE_TEXT("DomainParticipantFactoryImpl::create_participant, ")ACE_TEXT("unable to enable DomainParticipant.\n")));return DDS::DomainParticipant::_nil();}}ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,tao_mon,this->participants_protector_,DDS::DomainParticipant::_nil());participants_[domainId].insert(dp);return dp._retn();
}

主要就是进行Qos的验证,然后创建DomainParticipantImpl对象,调用其enable方法以激活,然后添加到参与者组中。参与者组实质是个Map,以域ID作为key。

接下来是创建主题:

MessageTypeSupportImpl servant = new MessageTypeSupportImpl();
Topic top = dp.create_topic("Movie Discussion List",servant.get_type_name(),TOPIC_QOS_DEFAULT.get(),null,DEFAULT_STATUS_MASK.value);

需要注意的是,OpenDDS的主题和发送的消息,必须在发送时就全部确定,不能用Scanner接受用户输入然后动态创建

create_topic方法实际调用的是create_topic_i方法,添加了一个topic_mask参数(值为0),整个方法有150+行,仅挑些重点:

首先还是Qos检查,略过。

接下来是根据主题名检查主题有没有创建过(假如开启了主题过滤,并且不允许重复主题,则直接退出):

TopicMap::mapped_type* entry = 0;bool found = false;{ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,tao_mon,this->topics_protector_,DDS::Topic::_nil());#if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)if (topic_descrs_.count(topic_name)) {if (DCPS_debug_level > 3) {…… //一些报错信息}return 0;}
#endifif (Util::find(topics_, topic_name, entry) == 0) {found = true;}}

假如同名主题已经创建过了,则进行type和qos的比较,如果有一个不符合,则返回空,创建失败,如果都符合,说明主题已经存在,就直接返回主题对象

假如同名主题不存在,则创建新主题:

首先要检查欲创建的主题类型有没有注册过:

    if (0 == topic_mask) {// creating a topic with compile time typetype_support = Registered_Data_Types->lookup(this, type_name);if (CORBA::is_nil(type_support)) {if (DCPS_debug_level >= 1) {…… //报错}return DDS::Topic::_nil();}has_keys = type_support->has_dcps_key();}

接下来就是对topic信息的检查,以及初始化Topic对象的过程,如果设置了TopicListener的话,创建完毕后会进行回调

接下来是创建Publisher对象;

Publisher pub = dp.create_publisher(PUBLISHER_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value);

其C++代码流程和create_participant类似,不同的是,保存Publisher对象的是个集合而非Map,还有Publisher的ID是用一个内置的ID生成器自动产生的

接下来是创建DataWriter对象, 为之配置了非常复杂的Qos策略,并等待Publisher匹配成功

到这里准备工作才刚刚结束,下面开始正式的数据发送

        MessageDataWriter mdw = MessageDataWriterHelper.narrow(dw);Message msg = new Message();msg.subject_id = 99;int handle = mdw.register_instance(msg);msg.from = "OpenDDS-Java";msg.subject = "Review";msg.text = "Worst. Movie. Ever.";msg.count = 0;int ret = RETCODE_TIMEOUT.value;for (; msg.count < N_MSGS; ++msg.count) {while ((ret = mdw.write(msg, handle)) == RETCODE_TIMEOUT.value) {}if (ret != RETCODE_OK.value) {System.err.println("ERROR " + msg.count +" write() returned " + ret);}try {Thread.sleep(100);} catch(InterruptedException ie) {}}

MessageDataWriter是messenger_idl_test包含的DataWriter子类,实际执行Message发送任务,Message的格式由Messenger.idl定义。for循环一共发送40条消息。

假如启动时加了-w参数,则会等待响应,否则等待1秒后直接进行清理、退出



3、接收消息:TestSubscriber & DataReaderListenerImpl

TestSubscriber的整个流程和TestPublisher几乎完全一致,不同的是,创建DataReader时,会将DataReaderListenerImpl实例传入。

当Publisher向域中输入消息后,就会触发DATA_AVAILABLE事件,通知DataReaderListenerImpl处理

    public synchronized void on_data_available(DataReader reader) {initialize_counts();MessageDataReader mdr = MessageDataReaderHelper.narrow(reader);if (mdr == null) {System.err.println("ERROR: read: narrow failed.");return;}MessageHolder mh = new MessageHolder(new Message());SampleInfoHolder sih = new SampleInfoHolder(new SampleInfo(0, 0, 0,new Time_t(), 0, 0, 0, 0, 0, 0, 0, false, 0));int status = mdr.take_next_sample(mh, sih);if (status == RETCODE_OK.value) {System.out.println("SampleInfo.sample_rank = "+ sih.value.sample_rank);System.out.println("SampleInfo.instance_state = "+ sih.value.instance_state);if (sih.value.valid_data) {String prefix = "";boolean invalid_count = false;if (mh.value.count < 0 || mh.value.count >= counts.size()) {invalid_count = true;}else {if (counts.get(mh.value.count) == false){counts.set(mh.value.count, true);}else {prefix = "ERROR: Repeat ";}}…… //消息内容输出}…… //一些报错} else if (status == RETCODE_NO_DATA.value) {System.err.println("ERROR: reader received DDS::RETCODE_NO_DATA!");} else {System.err.println("ERROR: read Messenger.Message: Error: " + status);}}

程序使用MessageHolder存储读到的信息,每次读取一条。



4、在IDEA中运行TestPublisher、TestSubscriber

运行run_test.pl脚本时,可以看到类似:

“C:\Program Files\Java\bin\java.exe” -Xcheck:jni -ea -cp classes;E:\OpenDDS-3.13/lib/i2jrt.jar;E:\OpenDDS-3.13/lib/OpenDDS_DCPS.jar;E:\OpenDDS-3.13/java/tests/messenger/messenger_idl/messenger_idl_test.jar;publisher/classes -Dopendds.native.debug=true TestPublisher -DCPSBit 0 -DCPSConfigFile tcp.ini -r -w
的命令输出,可以参照这个配置运行条件。

1)首先配置虚拟机选项:

Run菜单 - Edit Configurations,在 vm options 一栏填写:

-ea -Dopendds.native.debug=true -Djava.library.path=E:\dds\OpenDDS-3.13/java/tests/messenger/messenger_idl;E:\OpenDDS-3.13\lib

请按照自己的实际情况修改路径。由于已经配置了jar包,所以不需要再写-cp选项。

-Djava.library.path是用来配置JNI库路径,不写的话会提示找不到dll

去除 -Xcheck:jni 是为了屏蔽掉过多的JNI Warning

2)然后配置主程序参数:

在同一个窗口的 Program arguments一栏填写:

-DCPSBit 0 -DCPSConfigFile E:\dds\OpenDDS-3.13/java/tests/messenger/tcp.ini

同样按照自己的实际情况修改路径。这一行如果写在vm options中,就会提示”ERROR: Domain Participant Factory not found“,因为参数传递给了虚拟机而不是程序,导致无法创建DomainParticipantFactory

3)运行实例

按照以上配置,分别配置TestPublisher的run configuration和TestSubscriber的run configuration,配置两个,因为运行的时候,它们两个会同时运行,各自使用自己的run configuration。

单独启动一个Visual Studio开发人员命令行,在其中运行:
%DDS_ROOT%/bin/DCPSInfoRepo -o repo.ior

接着修改tcp.ini,将common块DCPSInfoRepo项的值修改成repo.ior所在位置,不要去掉"file://"前缀

然后按照先subscriber,后publisher的顺序启动程序即可。



参考资料:
1、https://blog.csdn.net/u010670411/article/details/86552739




http://chatgpt.dhexx.cn/article/vdwfJpCl.shtml

相关文章

OpenDDS

OpenDDS简介 Don Busch&#xff0c;首席软件工程师兼合作伙伴 Object Computing&#xff0c;Inc.&#xff08;OCI&#xff09; 介绍 分布式实时应用程序有时以数据为中心而不是以服务为中心&#xff0c;这意味着分布式系统中参与者的主要目标是分发应用程序数据&#xff0c;而…

OpenDDS系列(3) —— 第一个OpenDDS程序

文章目录 [toc]3.1 发送数据3.2 项目3.2.1 主题3.2.2 Publisher&#xff08;发布者&#xff09;3.2.3 Subscriber&#xff08;订阅者&#xff09; 3.3 在Windows上构建3.4 在Linux上构建3.4.1 运行 3.5 结论 3.1 发送数据 我们将创建一个主题&#xff0c;这是一个通过DDS交换数…

OpenDDS学习笔记(2):DDS概述

文章目录 一、DDS体系结构1.1 DLRL层1.2 DCPS层 二、DDS通信过程三、DDS通信特点四、DDS标准实现4.1 RTI DDS软件4.2 OpenSplice DDS软件4.3 OpenDDS软件 一、DDS体系结构 DDS采用DCPS通信机制&#xff0c;提供一个与平台无关的数据模型。它允许应用程序实时发布拥有的信息&am…

OpenDDS系列(1) —— OpenDDS 简介

1. OpenDDS简要介绍 1.1 简介 1.1.1 DDS是什么1.1.2 DDS通信的基本要素1.1.3 DDS架构的主要优点1.1.4 DDS产品种类1.1.5 OpenDDS 1.2 DDS的应用领域 美国海上战争中心(NSWC)高性能分布式计算系统&#xff08;HiPer-D&#xff09; 1.3 结论 1. OpenDDS简要介绍 1.1 简介 1.1.…

IDEA中查找与替换快捷键(项目全局替换、该文件下替换)

该文件下查找&#xff08;CtrlF&#xff09; 项目全局查找&#xff08;CtrlShiftF 或【Edit】——>【Find】——>【Find in Path…】&#xff09; 注意&#xff1a;本人电脑上的IDEA版本不支持该快捷键&#xff08;CtrlShiftF&#xff09;&#xff0c;有可能是快捷键冲突…

idea实现快捷批量修改替换

1. 在当前文件内容中替换 idea替换快捷键&#xff0c;批量处理对象 ctrl r: 当前文件内容替换&#xff0c;指的是在当前打开的文件中替换匹配的字符&#xff0c;只操作一个文件。 2. 在路径中替换(可替换不同文件夹中的内容) ctrl shift r: 在路径中替换&#xff0c;指的是…

idea 查找与替换

查找当前文件内容&#xff1a;ctrlF 如上图片 查找全局文件&#xff1a;ctrlshiftF 或double shift&#xff08;按两下&#xff09;或ctrlshiftN替换当前文件内容 &#xff1a;ctrlR 如上图片 你想通过编辑器快速的将所有的’29’&#xff0c;变为29&#xff0c;你可以 ctrl…

IDEA全局替换

在做项目时&#xff0c;有时会在整个项目里或指定文件夹下进行全局搜索和替换&#xff0c;这是一个很方便功能。使用方法如下&#xff1a; 一、全局搜索 1、使用快捷键CtrlShiftF打开搜索窗口&#xff0c;或者通过点击Edit–>Find–>Find in path打开搜索窗口&#xff0…

idea 替换

idea 替换功能说明 快捷键&#xff1a; ctrl R界面说明

idea全局查找和替换

原文 https://blog.csdn.net/fanrenxiang/article/details/80168215 全局查找 通过快捷键 CtrlShiftf 快速进入全局查找页面&#xff0c;或者通过 Edit 》Find 》Find In Path 1、你要检索的内容; 2、如何匹配内容&#xff0c;分别表示 区分大小写、单个单词、正则、过滤查找…

JDK更换IDEA如何修改

一、.打开idea设置。 1、点击file里面的settings... 二、取消默认javac编译 2、然后点开 Build, Execution, Deployment找到里面的compiler&#xff0c;再点开Java compiler 取消勾选。点击&#x1f197; 如图演示&#xff1a; 三、检查项目jdk配置 3、点击file里面的…

总结idea全局搜索和替换的快捷键

文章目录 1. 文章引言2. 按文本内容查找(替换)3. 按文件名称搜索文件4. 查看类的继承关系5. 查找类(方法)在哪使用6. 搜索任何东西7. 按名字搜索类 1. 文章引言 我们在使用idea做java开发时&#xff0c;通常会执行如下查找&#xff1a; 在页面内查找或替换 全局查找或替换 按…

IDEA代码替换

IDEA代码替换 快捷键 当前文件内容 C t r l R CtrlR CtrlR 全局替换 C t r l S h i f t R CtrlShiftR CtrlShiftR 使用 第一行输入栏&#xff1a;输入被替换内容 第二行输入栏&#xff1a;输入替换内容 详细使用 第一行输入栏后第一个图标&#xff1a;换行 第一行输…

IntelliJ IDEA中怎么全局搜索替换(整个项目)(Eclipse)

IntelliJ IDEA使用教程 &#xff08;总目录篇&#xff09; 我们用Eclipse或者IntelliJ IDEA编程&#xff0c;有时候需要将整个项目的某个字符串替换成其他的。 全局搜索我会&#xff0c;我还给调成ctrlg了呢&#xff0c;但是遇到要全局(整个项目)替换字符串。哎哟&#xff0c…

idea利用正则表达式快速替换

idea利用正则表达式快速替换 需求&#xff1a;要将link标签中的href属性替换成用thymeleaf表达的形式&#xff0c;让其资源寻址是从从项目名开始寻找的 如将<link rel"stylesheet" href"assets/css/bootstrap.min.css"> 换成<link rel"styl…

intellij idea全局查找和替换

点击这里查看 <intellij idea使用教程汇总篇> 全局查找 通过快捷键 CtrlShiftf 快速进入全局查找页面&#xff0c;或者通过 Edit 》Find 》Find In Path 1、你要检索的内容; 2、如何匹配内容&#xff0c;分别表示 区分大小写、单个单词、正则、过滤查找文件; 3、查找…

IDEA全局替换快捷键

Intellij IDEA使用教程相关系列 目录 Intellij IDEA 强大的搜索能力 IDEA全局替换 通过快捷键 CtrlShiftr 或这点击 Edit 》Find 》Replace In Path 有些IDEA版本按了快捷键也没效果&#xff0c;经常遇到的原因如下&#xff1a; 1、与其他软件快捷键冲突了&#xff0c;特别是…

idea替换所有文件中的内容

通过快捷键CtrlShiftR打开窗口&#xff0c;或者通过点击Edit–>Find–>Replace in path打开窗口。如下图&#xff1a; 输入被替换和要替换的内容后&#xff0c;点击replace all即可替换全部。

IDEA 批量修改变量名、批量替换代码快捷键

平时学习过程中经常遇到一些变量名需要依照规范来修改的&#xff0c;这里记录Idea批量修改变量名&#xff0c;或者批量修改代码的快捷键&#xff0c;希望对大家有所帮助吧。1. 使用Shift F6批量修改变量名 选中要修改的变量名按ShiftF6&#xff0c;然后就可以直接编辑 编辑完…

idea的替换快捷键

替换&#xff1a;有全局替换 和 当前文件内容替换 全局&#xff1a;ctrl shift r: 在路径中替换,指的是在选定的目录下或者类包下,查找要被替换的字符。 当前文件内容&#xff1a;ctrl r: 当前文件内容替换,指的是在当前打开的文件中替换匹配的字符,只操作一个文件。 一、…