JMS,ActiveMQ,Solace和RxJava记录

article/2025/9/24 18:26:01

目录

JMS

ActiveMQ

用Java代码实现收发消息

1. 使用JMS方式发送接收消息

​编辑

2. 在SpringBoot中使用ActiveMQ

Solace

RxJava


除了本人另外一篇博客的 Kafka 记录(https://blog.csdn.net/Beth_Chan/article/details/111189133)外,其他MQ实现还有 RabbitMQ,RocketMQ,ActiveMQ,IBM Websphere MQ,Solace等

公司 Java ETL 除了Active MQ 还使用了 Solace(https://solace.com,是一个 Event-driven Pub/Sub)和 IBM Websphere MQ。

下面简单介绍一下 JMS,Solace,ActiveMQ,JMS集成Spring的应用,IBM MQ详见另一篇文章(https://blog.csdn.net/Beth_Chan/article/details/124698408)。

JMS

JMS是一个Java标准,定义了使用消息代理(message broker)的通用API,最早于2001年提出。长期以来,JMS一直是实现异步消息的首选方案。在JMS出现之前,每个消息代理都有私有的API,这就使得不同代理之间的消息代码很难通用。但是借助JMS,所有遵从规范的实现都使用通用的接口,这就好像JDBC为数据库操作提供了通用的接口一样。JMS是由标准Java规范定义的,所以它得到了众多代理实现的支持,在Java中实现消息时它是常见的可选方案。但是JMS有一些缺点,尤其是作为Java规范,它只能用在Java应用中。RabbitMQ和Kafka等较新的消息传递方案克服了这些缺点,可以用于JVM之外的其他语言和平台。

JMS编码总体架构:

Spring对JMS的支持,包括JmsTemplate和消息驱动POJO。在发送和接收消息之前,首先需要一个消息代理(broker),它能够在消息的生产者和消费者之间传递消息。

需要使用ActiveMQ特定的属性:

使用了一个名为spring.activemq.broker-url的属性来指定代理的地址。URL应该是“tcp://”协议的地址。ActiveMQ的JMS默认端口是61616,管理控制台默认端口是8161。如果是在本地开发运行,那么你都不需要配置这些属性。但是,如果你选择使用ActiveMQ,需要将spring.activemq.in-memory属性设置为false,防止Spring启动内存中运行的代理。内存中运行的代理看起来很有用,但是只有同一个应用发布和消费消息时才能使用它。

启动是官网下载后,解压,进入到对应bin下,./activemq start

使用JmsTemplate发送消息

将JmsTemplate注入到其他bean中,并使用它来发送和接收消息。JmsTemplate实际上只有两大类方法,send()和convertAndSend(),每个方法都有重载形式以支持不同的参数。

  • 3个send()方法都需要MessageCreator来生成Message对象。
  • 3个convertAndSend()方法会接受Object对象,并且会在幕后自动将Object转换为Message。
  • 3个convertAndSend()会自动将Object转换为Message,但同时还能接受一个MessagePostProcessor对象,用来在发送之前对Message进行自定义。

这3种方法分类都分别包含3个重载方法,它们的区别在于如何指定JMS的目的地(队列或主题)。

  • 有1个方法不接受目的地参数,它会将消息发送至默认的目的地。
  • 有1个方法接受Destination对象,该对象指定了消息的目的地。
  • 有1个方法接受String,它通过名字的形式指定了消息的目的地。

接收JMS消息

在消费消息的时候,我们可以选择拉取模式(pull model)和推送模式(pushmodel),前者会在我们的代码中请求消息并一直等待直到消息到达为止,而后者则会在消息可用的时候自动在你的代码中执行。JmsTemplate提供了多种方式来接收消息,但它们使用的都是拉取模式。我们可以调用其中的某个方法来请求消息,而线程会一直阻塞到一个消息抵达为止(这可能马上发生,也可能需要等待一会儿)。另外,我们也可以使用推送模式,在这种情况下,我们会定义一个消息监听器,每当有消息可用时,它就会被调用。这两种方案能够适用于各种用户场景。人们普遍觉得推送模式是更好的方案,因为它不会阻塞线程;但是,在某些场景下,如果消息抵达的速度太快,那么监听器可能会过载。而拉取模式允许消费者声明它们何时才为接收新消息做好准备。

JmsTemplate提供了多个对代理的拉取方法,简直就是JmsTemplate中send()和convertAndSend()方法的镜像。receive()方法接收原始的Message,而receiveAndConvert()则会使用一个配置好的消息转换器将消息转换成领域对象。对于其中的每种方法,我们都可以指定Destination或者包含目的地名称的String值,否则,我们将会从默认目的地拉取消息。

如何通过声明JMS监听器来实现推送模式

要创建能够对JMS消息做出反应的消息监听器,我们需要为组件中的某个方法添加@JmsListener注解。

ActiveMQ

在公司谷歌云的实验中,跟其他组里的人通过MQ处理数据,使用了MQ。为了在本地方便快速测试,采取在本地安装并使用 ActiveMQ。

ActiveMQ 官网下载:

https://mirrors.bfsu.edu.cn/apache/activemq/5.15.14/

Mac下载tar.gz包解压后,cd 到 bin 对应的文件夹中,执行 ./macosx/activemq start(Windows下载zip包解压后,也是bin里执行activemq start的操作)

用户名和密码均为 admin(如果想修改用户名和密码的话,在conf/jetty-realm.properties中修改即可),登录成功后:

  • Queues是队列方式消息
  • Topics是主题方式消息
  • Subscribers消息订阅监控查询
  • Connections可以查看链接数,分别可以查看xmpp、ssl、stomp、openwire、ws和网络链接
  • Network是网络链接数监控
  • Scheduler是定时消息投递
  • Send可以发送消息数据

ActiveMQ:

发送者发送消息到消息服务器,消息服务器将消息存放在队列/主题中。

1. 点对点方式(point-to-point): Destination叫队列;队列生产者,队列消费者

点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sender 发送消息,Receive接收消息。具体点就是Sender Client发送Message Queue ,而 receiver Cliernt从Queue中接收消息和”发送消息已接受”到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行

2. 发布/订阅 方式(publish/subscriber Messaging): Destination叫主题;主题生产者,主题消费者

发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

用Java代码实现收发消息

1. 使用JMS方式发送接收消息

发送方

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsProducer {private static final String BROKEN_URL = "tcp://192.168.0.107:61616";private static final String QUEUE_NAME = "beth-queue";public static void main(String[] args) throws JMSException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BROKEN_URL);Connection connection = activeMQConnectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(QUEUE_NAME);MessageProducer messageProducer = session.createProducer(queue);// 使用MessageProducer生产3条消息发送到MQ的队列里for (int i = 1; i <= 3; i++) {TextMessage textMessage = session.createTextMessage("msg ------- " + i);messageProducer.send(textMessage);}messageProducer.close();session.close();connection.close();System.out.println("消息发布到MQ完成");}
}

运行输出:

控制台:

接收方

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsConsumer {private static final String BROKEN_URL = "tcp://192.168.0.107:61616";private static final String QUEUE_NAME = "beth-queue";public static void main(String[] args) throws JMSException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BROKEN_URL);Connection connection = activeMQConnectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(QUEUE_NAME);MessageConsumer messageConsumer = session.createConsumer(queue);// 使用MessageProducer生产3条消息发送到MQ的队列里while(true) {// receive可以加timeoutTextMessage textMessage = (TextMessage)messageConsumer.receive();if (textMessage != null) {System.out.println("消费者接收到消息:" + textMessage.getText());} else {break;}}messageConsumer.close();session.close();connection.close();System.out.println("消息发布到MQ完成");}
}

运行输出:

控制台:

也可以通过监听Listener方式消费:

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
import java.io.IOException;public class JmsListenerConsumer {private static final String BROKEN_URL = "tcp://192.168.0.107:61616";private static final String QUEUE_NAME = "beth-queue";public static void main(String[] args) throws JMSException, IOException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BROKEN_URL);Connection connection = activeMQConnectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(QUEUE_NAME);MessageConsumer messageConsumer = session.createConsumer(queue);messageConsumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {if (message != null && message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;try {System.out.println(textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}});System.in.read();messageConsumer.close();session.close();connection.close();}
}

运行输出:

Topic类的就可以把:

Queue queue = session.createQueue(QUEUE_NAME);

改成以下即可

Topic topic = session.createTopic(TOPIC_NAME);

发送方也是对应从Queue改成Topic:

2. 在SpringBoot中使用ActiveMQ

pom.xml

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

application.yml (注意broker-url里ip必须改成你自己对应的ip)

server:port: 9090
spring:activemq:broker-url: tcp://192.168.0.107:61616user: adminpassword: admin
queueName: beth-boot-queue

发送方:

@Component
@EnableJms
public class ActiveMQConfig {@Value("${queueName}")private String queueName;@Beanpublic Queue queue(){return new ActiveMQQueue(queueName);}
}
@Component
// @EnableScheduling
public class QueueProducer {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Autowiredprivate Queue queue;// @Scheduled(fixedDelay = 3000)//每3s执行1次,将消息放入队列内public void produceMsg() {this.jmsMessagingTemplate.convertAndSend(queue, UUID.randomUUID().toString().substring(0,6));}}
@SpringBootApplication
// @EnableScheduling
public class MainAppProducer {public static void main(String[] args) {SpringApplication.run(MainAppProducer.class, args);}
}

运行单元测试:

package com.beth.springbootactivemq;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;import javax.annotation.Resource;@SpringBootTest(classes = MainAppProducer.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class TestActiveMQ {@Resourceprivate QueueProducer queueProducer;@Testpublic void testSend() throws Exception {queueProducer.produceMsg();}
}

testSend run一下就会有一条message,或者打开启动类和produceMsg方法的Schedule相关注解运行,就可以每3s自动发送一条msg:

接收方:

@Component
public class QueueConsumer {@JmsListener(destination = "${queueName}")public void receive(TextMessage textMessage) throws JMSException {System.out.println("消费者接收到消息:"+textMessage.getText());}
}

启动运行后的输出:

Topic的话,同样的代码,只需把 XXXQueue 相关的类都换成 XXXTopic 即可。

Solace

  • soladm

其他team的人会把源文件会发送至Solace,solace.provider=smf://<Server Host name>,Server Host name可以有不同的Management Host name,比如Server Host是XXXprodXXX,Management Host可以有XXXprod01,XXXprod02;soladm 工具 里的连接信息需要填Management Host,Managerment Port是默认的80,User Name(个人账号必须要有Admin权限)和账号密码。solace.principal是<acurrent_user>@<Message VPN name>

  • JMQXplorer

公司电脑使用的客户端连接工具是 JMQXplorer,连接MQ时需要输入对应Host,Port,可查看Queue详细情况等; Java ETL 项目代码中的配置文件有MQ的相关配置,包括MQhost,MQport,MQmgr,MQtimeout,Queue的名字。

RxJava

RxJava文档和教程 · ReactiveX文档中文翻译


http://chatgpt.dhexx.cn/article/2uJPWbml.shtml

相关文章

“去中心化”和“分布式”的区别

区块链对于很多人来说&#xff0c;是一个概念性的、未来的事物&#xff0c;经常可以听到区块链有着“分布式、去中心化、可信任、匿名性、信息不可逆”等特点&#xff0c;这些特点看起来相互关联&#xff0c;又有所差异。而以太坊创始人V神近日就在推特上表示&#xff0c;尝试用…

为什么说去中心化很重要

去中心化是与中心化相对的一个概念&#xff0c;简单的来说中心化的意思&#xff0c;是中心决定节点。节点必须依赖中心&#xff0c;节点离开了中心就无法生存。去中心化恰恰相反&#xff0c;在一个分布有众多节点的系统中&#xff0c;每个节点都具有高度自治的特征&#xff0c;…

去中心化金融(DeFi)的发展历史

随着Web3.0的兴起&#xff0c;去中心化金融&#xff08;Decentralized Finance&#xff0c;DeFi&#xff09;正逐渐成为金融领域的热门话题。DeFi旨在通过区块链技术和智能合约&#xff0c;实现无需信任的金融交易和服务&#xff0c;摆脱传统金融中心化的限制。然而&#xff0c…

去中心化及其局限性

去中心化及其局限性 这张表总结了一部分新的 P2P 网络中的去中心化工具。区块链就是其中的一个&#xff01; 本次演讲我将提出三个问题&#xff1a;&#xff08;1&#xff09;去中心化是什么&#xff1f;我们真的知道答案吗&#xff1f;&#xff08;2&#xff09;我们真的想要去…

去中心化究竟是什么意思?

链接&#xff1a; 去中心化究竟是什么意思&#xff1f;怎样能真正实现去中心化&#xff1f; - 知乎https://zhuanlan.zhihu.com/p/39854232 感谢分享&#xff0c;仅供参考。

区块链去中心化和传统去中心化的区别

去中心化在我们生活中其实并不是一个新概念&#xff0c;也许你没有注意&#xff0c;但是我们生活中早已充斥着去中心化的产物。现实中的微博啊&#xff0c;社交媒体啊这些其实都是去中心化的产物。 在了解去中心化之前&#xff0c;首先我们得知道&#xff0c;什么是中心化&…

一文讲明白互联网如何去中心化

本文不是巧立名目&#xff0c;虚设概念&#xff0c;而是在汉语中找了最恰当的一个词来定义互联网的“去中心化”&#xff0c;因为现实的单调&#xff0c;在“去中心化”议题里浸淫久了会发现&#xff0c;如果目标一致&#xff0c;一切表达都会是趋同的&#xff0c;比如说有一天…

关于去中心化技术实现的意义

谈起去中心化&#xff0c;我们首先得知晓何谓中心化&#xff1f;所谓中心化就是一切以中央为转移。古代的皇权社会就是典型的中心化组织&#xff0c;天下以皇帝为权力中心&#xff0c;一切经济、文化、政治等天下大事都以皇帝为转移&#xff0c;才算合法合规&#xff0c;不然就…

去中心化模型

文章目录 前言 一、去中心化是什么&#xff1f; 二、比特币如何实现去中心化 三、去中心化优点及意义 总结 前言 比特币引用了一个去中心化的模型&#xff0c;这个模型有何意义&#xff1f; 一、去中心化是什么&#xff1f; 在说“货币”时&#xff0c;我们讨论的是数字世界…

去中心化和非去中心化的区别?

什么是中心化&#xff1f;什么是非中心&#xff1f; 中心化的话如下图&#xff1a; 中心化服务所有的请求都围绕中心节点&#xff0c;然后再进行&#xff0c;一但中心节点出现故障那就都整个服务不可用。可以说导致整体崩溃。 注&#xff1a;现在的nacos、zk、等注中心&#xf…

去中心化结构的相关理解

去中心化结构 前言一、C/S架构二、去中心化架构总结 前言 去中心化结构可以与C/S(Client/Server)架构&#xff0c;客户机与服务器这种衣服武器为中心的架构进行对比学习。 一、C/S架构 C/S架构如下图所示&#xff1a; 将整个应用托辊到云端或者租用的VPS主机上。 用户通过客…

为什么我们需要去中心化存储?

为什么我们需要去中心化存储&#xff1f; 我们的社会正处于前所未有的信息大爆炸时代&#xff0c;未来将是数据成为主要生产要素的数字时代&#xff0c;而 Web3 也不外乎于此&#xff0c;作为数据解决方案——去中心化存储&#xff0c;不仅是区块链技术的三大支柱&#xff08;计…

杂记 去中心化系统介绍

一、去中心化系统概述 去中心化系统&#xff08;Decentralized System&#xff09;是一类没有任何中央协调或管理单元的系统。换句话说&#xff0c;没有一个单一的中央服务器来协调或管理系统。与集中式系统相比&#xff0c;分散式系统既有优点也有缺点&#xff0c;因此您为系统…

什么是去中心化?

去中心化是一种现象或结构&#xff0c;其只能出现在拥有众多用户或众多节点的系统中&#xff0c;每个用户都可连接并影响其他节点。通俗地讲&#xff0c;就是每个人都是中心&#xff0c;每个人都可以连接并影响其他节点&#xff0c;这种扁平化、开源化、平等化的现象或结构&…

“去中心化”到底是什么?

“去中心化”到底是什么&#xff1f; 如今在区块链的世界中&#xff0c;当大家分析一个新项目的时候&#xff0c;必然会提问到&#xff1a;“这个项目是否去中心化&#xff1f;” “去中心化”可以说是区块链中最有代表性的一个词&#xff0c;但不可思议的是&#xff0c;也是…

应用密码学:位移密码极简(凯撒密码)

应用密码学:位移密码极简 位移密码、凯撒密码&#xff08;K3&#xff09; 目录 应用密码学:位移密码极简原理加密解密 原理 根据字母映射表将26个字母转换为整数0-25且逐一对应&#xff0c;如下表 根据映射表将明文转换为数字&#xff0c;再整体向前或向后移动固定位数&…

密码学笔记——凯撒密码

凯撒密码 简介 凯撒密码是使用的单表代换&#xff0c;相信大家在平时接触过凯撒密码的在线加密解密&#xff0c;既然接触过&#xff0c;那肯定见到过”偏移量“这个字眼&#xff0c;下面就通俗说一下&#xff1a; 凯撒密码的基本原理就是把一个字母通过移动一定的位数变成另…

python基础编程小实例4——恺撒密码

编程语言&#xff1a;python3.9 题目 恺撒密码是一种替换加密的技术&#xff0c;明文中的所有字母都在字母表上向后&#xff08;或向前&#xff09;按照一个固定数目进行偏移后被替换成密文。例如&#xff0c;当偏移量是3的时候&#xff0c;所有的字母A将被替换成D&#xff0…

使用java语言实现移位密码加密过程

使用java语言实现移位密码加密过程 一、凯撒密码(移位密码)二、运行软件三、代码1.加密2.解密3.运行四、运行结果1.加密结果2.解密结果一、凯撒密码(移位密码) 在密码学中,恺撒密码(英语:Caesar cipher),或称恺撒加密、恺撒变换、变换加密,是一种最简单且最广为人知…

恺撒加密简记

恺撒加密简记 介绍代码如下参考链接 介绍 恺撒密码&#xff08;英语&#xff1a;Caesar cipher&#xff09;&#xff0c;或称恺撒加密、恺撒变换、变换加密&#xff0c;是一种最简单且最广为人知的加密技术。它是一种替换加密的技术&#xff0c;明文中的所有字母都在字母表上向…