ActiveMQ:消息中心基本介绍

article/2025/9/1 12:48:39

Redis其实也可以做消息队列,但是更多的企业选择了ActiveMQ,为什么,因为Redis的消息队列比较简单,无法做到像ActiveMQ,那样做做到点对点的消息订阅与发送

首先是哪些情况需要用到消息中心?

1.需要解耦出来的业务

比如淘宝中业务的处理就是使用发布/监听的方式,此处不展开,后面会有详细说明

2.耗时比较久的业务:MQ

在这里插入图片描述

比如订单服务,整套订单流水很长,而RPC调用(比如Dubbo)是同步请求的,在发出请求的时候,客户端就要TM一直等订单系统回应结果!

在这个过程中,在等的过程中就要占用服务器的CPU和内存资源,这还不算最糟糕的情况,如果遇到高并发的情况下,有些订单延迟过高,用户会以为出问题了,会反复提交订单,造成状况进一步恶化,和资源的进一步被占用

在这里插入图片描述

此时如果采用消息中心进行通讯,那么客户端可以不用去管后台的情况,直接通过消息中心返回的消息,用户拿到消息后,认为OK,但是实际上请求仍然在后台排队等待处理

优点:消息中心避免了同步请求带来的问题,既前台应用根本不用等到后台Service非要将业务处理完毕才返回请求,也同时解决了高并发产生的问题;

PS:但是如果订单建立失败,则会涉及到分布式事务,这个后面解释

简单总结:

对于链条比较长且还是高并发的事务可以采用消息中心来处理,这样比RPC同步调用效率会高出不少

3.存在高并发的业务:MQ + Redis

比如常见的秒杀抢购
在这里插入图片描述

假如没有消息中心,采用RPC进行通讯,那么常规的方法可以加乐观锁解决高并发 + 万能的验证码削个峰

另一种方案就是用队列,就是将所有的请求全部放在队列中,这样就可以不用处理高并发产生的问题

在这里插入图片描述

但是这样做,虽然解决了高并发的问题,仍然有其他的问题,问题就是在于客户太多,请求会很多,比如10W个请求,如果商品有有限(比如1),如何解决超卖的问题?

如果要解决超卖的问题,那么我这边的客户端就一定要读取到商品的总数量

这里会出现的问题就是,A客户端从数据库读取到数据怎么能保证不是脏数据?

要做秒杀系统的另一个关键就是:读操作必须是原子性的,不然没有办法解决超卖的问题!

解决的思路就是利用Redis的读写操作原子性的特点来进行改造

在这里插入图片描述

具体步骤是:

1.到达了商品抢购的时间,把抢购的商品的数量通过数据库读取到Redis中

2.用户点击商品抢购的按钮,在controller中,使用Redis的decr,让商品的库存数做减法操作,并且接收到减减之后的结果,判断该结果是否大于等于0 (0~99,合计100)

3.如果不是大于等于0,提示用户,商品抢购完毕,并且让抢购的活动结束

4.如果是大于等于0,说明该商品还有库存可以抢,发送关于商品抢购的消息到消息中心

5.商品系统以订阅的方式获取消息中心的消息,最终修改数据库

使用消息中心的大致思路已经明白了,那么我们接下就来看看具体消息中心指的是什么?

什么是消息中心?

消息中心

1.消息异步接收:消息发送者不需要等待接受者的响应,提高整个应用的效率

2.消息可靠接收:消息发送出去以后保存在一个中间容器中,只有消息的接受者收到消息后才能删除消息

3.消息队列接收:消息以队列的形式接收,一个一个排队处理

比较流行的消息中心:

收费:IBM MQService,BEA WebLogic,Oracle MQ

开源:
ActiveMQ(老牌),
RabbitMQ(老牌,Apeach出品),
Kafka(性能很高,单台就能够处理百万级别的并发,一般用在大数据日志的收集),
RocketMQ(阿里开源中间件)

收费:阿里云GTS分布式事务

Kafka例外,并没有实现JMS,原因它并不是Java语言开发的

JMS

JMS就是Java消息服务(Java Message Servcie)应用程序接口,是JavaEE平台关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送信息,进行异步他通信,可以类比为JDBC

JMS规范

JMS定义了Java中间件中访问消息中心的接口,并没有给予实现,实现JMS接口的消息中心成为JMS Provider,例如ActiveMQ

假如想更换消息中心,只需要更换底层的JAR包即可,无需修改代码

关于JMS的几个概念

JMS Provider:实现了JMS接口和规范的消息中心,比如ActiveMQ

JSM Producer: 消息生产者,创建和发送JMS消息的客户端应用

JMS Consumer:消息消费者,接收和处理JSM消息的客户端应用

JMS Message :JMS 消息,分3个部分

1)消息头:每个消息头字段都有相应的getter和setter方法
2)消息属性:如果需要消除消息字段以外的值,那么可以使用消息属性
3)消息体:封装具体的消息数据

5.JMS Destination:消息的目的地,包含queue和topic

目的地通常是一串字符串,比如XX要去北京,XX的目的地就是“beijing”,在消息中心就是"message-order"等形式表型

6.JMS Domain :消息传递域,JMS规范定义了两种消息传递域,分别是点对点和发布/订阅传递域

p2p与queue

1)点对点:point to point 简称ptp 或者 p2p 消息传递域,该消息传递域发送的消息地称之为队列(queue)

队列queue特点:

A.每个消息只能有一个消费者或者只能有一种消费者

(为什么称之为一种,后面会有解释)

在这里插入图片描述

B.消息的生产者和消费者没有时间上的相关性,消费者在提取消息的时候,消息的生产者是否处于运行状态,消费者还是可以去提取消息

或者说消费者所在服务器挂了,但重启后消费者仍然能获取到消息

这个就是queue下生产者和消费者没有时间上的相关性的含义

在这里插入图片描述

pub/sub与Topic

publish/subscribe 消息传递域,该消息传递域发送的目的地成为主题(topic)

A.每个消息可以有多个消费者

B.生产者和消费者之间有时间上的相关性,订阅一个主体的消费者只能消费自它订阅之后的消息

简单总结:
简单的来说,
pub/sub可以有N多个消费者,pub/sub发布消息相当于广播,有N个就发送N条消息,

不像p2p只能针对一个或者一种消费者一次性只能发一条消息

pub/sub的消费者和订阅者有时间上的相关性,即消费者挂了,生产者在消费者重启的时候发的消息消费者就拿不到了

而p2p在生产者或者消费者挂了后重启,消费者仍然能拿到消息

JMS Session

与JMS Provider 所建立的会话,可设置事务,消息消费签收方式

这个签名方式就是:

设置事务签名:消费者去拿消息,除非等到该消息被提交之后,消息中心才会删除消息

自定义手动签名:可以设置手动签收,手动签收后才消息中心才会删除消息

ActiveMQ

是Apache推出的,一款开源的,完全支持JMS1.1和JavaEE1.4规范的JMS Provider实现的消息中心(MOM)

ActiveMQ能干什么: 最主要的是用来帮助实现高可用,高性能,可伸缩,易用和安全的消息服务系统

Active MQ 特点:

1.完全支持JMS1.1和JavaEE1.4规范(持久化)

2.支持多种传送协议:TCP,UDP,SSL,NIO

3.可拔插的体系结构,可灵活定制,如:集群,负载均衡,消息存储方式,安全管理等

4.很容易和系统集成使用(和spring整合)

5.多语言使用:Java,C,C++,Ruby,PHP,Python

6.在设计上保证了高性能的集群

7.支持通过JDBC把消息持久化到数据库

安装与启动

到ActiveMQ下载即可,但是注意目前不要下载5.15.x版以后的,原因后面解释

http://activemq.apache.org/download-archives.html

下载后

tar -zxvf xxx-acivemq

mv xxx-activemq /usr/local/activemq 即可

ActiveMQ的运行也是十分方便,直接进入active_home/bin

运行./activemq start 即可

在这里插入图片描述

查看是否成功运行 ps -ef | grep activemq

也许你没有运行成功,但是后面会有相关的解决方案

查看配置

这里介绍 ActiveMQ有哪些配置起作用

进入activemq_home/conf下

vi activemq.xml

在这里插入图片描述

进入activemq.xml后可以进行具体的设置

设置ActiveMQ的监听端口,默认为61616

访问地址是:localhost:8161/admin 这些都可以配置的,包括访问的协议等,如下图

在这里插入图片描述
查看jetty.xml
在这里插入图片描述

进入jetty中可以查看管理端的配置

设置ActiveMQ的管理端口,默认为: 8161

或者设置管理端的用户名和密码,默认为admin,admin

想要修改的可以去查询其他攻略这里不展开

解决无法启动和无法访问的问题

  • 防火墙的问题

启动管理端,前提一定要将防火墙开放对应的端口

如果是阿里云ECS一定要重新配置安全组规则,增加对应的端口开放

  • JDK版本问题

注意启动前提之二:必须JDK1.8,假如是1.7将无法启动

在这里插入图片描述
将JDK更换为1.8后

ps -ef | grep activemq
在这里插入图片描述

Java中ActiveMQ的使用

  • 下载Java组件

下载的时候注意,一定是Broker不是Core

并且Java组件版本一定要与自己用的activemq版本一一对应,这就是前面为什么要求一定要下载5.15.X的版本的原因,因为之后的Java组件根本没有对应的版本!

这里我选用的是:5.15.3版本的
在这里插入图片描述

ActiveMQ启动

启动后进入Queue,相关界面如下

在这里插入图片描述

现在开始做Produce和Consumer来对MQ进行操作

DEMO演示-producer

首先是producer

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import javax.jms.*;public class MessageProducer {public static void main(String[] args) throws Exception {//创建连接工厂ActiveMQConnectionFactory factory =new ActiveMQConnectionFactory("tcp://192.168.230.130:61616");//从连接工厂创建连接Connection connection =factory.createConnection();//开启连接connection.start();//这里说明一下,true代表开启事务,一但开启事务,后面的session就必须要提交//不然activeMQ会认为producer这边出了问题,不认生产的消息//至于10是乱填的,因为一但确定事务开启之后,后面填什么都是一样的Session session = connection.createSession(true, 10);//创建消息发送的地点,地点有两种:p2p和topicDestination destination = session.createQueue("message-test");//创建消息发送者javax.jms.MessageProducer producer = session.createProducer(destination);//创建消息对象TextMessage message = session.createTextMessage("测试消息中心");//发送消息producer.send(message);//提交事务session.commit();//关闭会话session.close();//关闭连接connection.close();}
}

执行一次生产消息,结果

在这里插入图片描述

DEMO演示-消费者consumer

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import javax.jms.*;public class MessageConsumer {public static void main(String[] args) throws Exception {//创建连接工厂ActiveMQConnectionFactory factory =new ActiveMQConnectionFactory("tcp://192.168.230.130:61616");//从连接工厂创建连接Connection connection =factory.createConnection();//开启连接connection.start();//不然无法提交事务Session session = connection.createSession(true, Session.SESSION_TRANSACTED);//创建消息发送的地点,地点有两种:p2p和topic//此处要去消费消息,地点一定要相同Destination destination = session.createQueue("message-test");//创建消息的消费者javax.jms.MessageConsumer consumer = session.createConsumer(destination);//接收消息TextMessage receive = (TextMessage) consumer.receive();//获取消息内容String text = receive.getText();System.out.println(text);session.commit();//关闭会话session.close();//关闭连接connection.close();}
}

最后的结果:

在这里插入图片描述

假如消费者将事务提交的语句删除了

 session.commit();

虽然消费者能够拿到消息,但是由于消费者没有提交事务,ActiveMQ会认为Counsumer的消息处理出现了异常,所以并不会删除消息

在这里插入图片描述

当Consumer这边的事务提交以后,这边的消息中心才最终删除了消息

在这里插入图片描述

ActiveMQ消息的持久化

消息中心消息的两种种模式:

一、事务模式

	static final int SESSION_TRANSACTED = 0;//当事务提交为true时,就为此值,但此值在代码是被忽略的,所以true后,填何种int值都无所谓

二、非事务模式

   static final int AUTO_ACKNOWLEDGE = 1;  消息自动签收static final int CLIENT_ACKNOWLEDGE = 2;  客户端必须调用acknowledge签收static final int DUPS_OK_ACKNOWLEDGE = 3; 不签收

持久化消息类型

此处我们使用Windows版本的ActiveMQ来模仿

持久化消息:
使用:DeliveryMode.PERSISTENT
默认就是持久化
特点:发送的消息会持久化到硬盘

非持久化消息
使用:DeliveryMode.NON_PERSISTENT
方法:producer.setPriority(DeliveryMode.NON_PERSISTENT);
特点:发送的消息会保存到内存

具体的配置:

在这里插入图片描述
找到data目录
在这里插入图片描述

在这里插入图片描述
简单的来说就是将内存中的数据通过IO写到硬盘中去

假如将该目录删除,那么重启activeMQ的时候,所有的数据都会丢失

另一种是LevelDB,谷歌开发的数据库,写入效率高

另一种方式写入到关系型数据库中:

在这里插入图片描述

消息消费者的问题

原先的模式,receive()方法,一次性只能拿一条

要反复调用receive()方法,就很烦,虽然我们要换种方法来,就是采用监听器的方法来做消息的处理

先看效果:
在这里插入图片描述
启动带有监听器的消费者:

在这里插入图片描述

而这种方式就是企业中最常见的业务处理方法,即调用监听器来处理业务

具体实现方式是:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import javax.jms.*;public class MessageConsumer {public static void main(String[] args) throws Exception {//创建连接工厂ActiveMQConnectionFactory factory =new ActiveMQConnectionFactory("tcp://192.168.230.130:61616");//从连接工厂创建连接Connection connection =factory.createConnection();//开启连接connection.start();//不然无法提交事务Session session = connection.createSession(true, Session.SESSION_TRANSACTED);//创建消息发送的地点,地点有两种:p2p和topic//此处要去消费消息,地点一定要相同Destination destination = session.createQueue("message-test");//创建消息的消费者javax.jms.MessageConsumer consumer = session.createConsumer(destination);//接收消息//TextMessage receive = (TextMessage) consumer.receive();//获取消息内容//String text = receive.getText();//System.out.println(text);//session.commit();consumer.setMessageListener(new MessageListener() {//使用匿名内部类的方法来实现该接口public void onMessage(Message message) {TextMessage textMessage = (TextMessage) message;//获取消息String text = null;try {text = textMessage.getText();System.out.println(text);} catch (JMSException e) {e.printStackTrace();}}});//关闭会话//session.close();//关闭连接//connection.close();}
}

消息的传递域: Queue

特点是:

一次性只能由一个消费者或一种消费者接受到消息

为什么?因为在有的场景中,有些业务必须而且只能由一个Service来接收,比如订单服务

但是如果有多个相同的消费者在监听的时候,那么必然是轮着来的

所以这个就是一种消费者的含义

为什么?因为采用的时候负载均衡用的

并且没有时间上的关联性,就算消费者挂掉了,后面消费者重启后,也能拿到消息

消息的传递域: Topic

时间的关联性的解释:

消费者,只能在订阅的时候才能拿到消息,如果消费者意外挂了,那么它将不能在挂了之后重启拿到消息


http://chatgpt.dhexx.cn/article/fKSe0NRQ.shtml

相关文章

业务消息中心系统设计与实现(一)

目录 解决问题场景? 那么这款内部业务消息中心需要满足哪些功能呢? 哈喽小伙伴,我是kilde,和有需要的小伙伴分享一个业务消息中心的设计思想与实现,喜欢的小伙伴可以点赞关注博主,觉得有用的也可以打赏博主哦,也希望这个博客能给各位有需要的小伙伴或多或少解决一些场景问题…

仿微博消息中心的系统设计与实现

最近在实现一个类似于微博、网易云的消息中心模块。主要实现的功能是,将系统中的点赞、评论、等消息做汇合。今天跟大家分享下,我们的设计和实现思路。 首先说明,我们目前是微服务的架构。所以本篇文章中对于消息中心的设计也是建立在微服务的…

消息中心构架设计说明书

目录 1 1. 文档介绍.... 4 1.1 文档目的... 4 1.2 文档范围... 4 1.3 读者对象... 4 1.4 参考文献... 4 1.5 术语与缩写解释... 4 2 系统概述.... 5 3 设计约束.... 6 4 设计策略.... 7 5 系统总体结构.... 8 6 …

04_消息中心(MessageCenter)

一介绍 该消息中心是基于委托和事件(观察者模式)设计的,是 MxFramework框架 的一个子模块。 主要是为了处理消息的收发。 二、消息处理中心(代码如下) /**** * Title: MXFramework* 主题: 消息中心* …

实际项目中的消息中心

前一篇文章讲到我们项目的工作流,这一篇我们扒一扒项目中的消息中心,msgcenter。消息可以分成很多种消息:留存可重复查看的DB消息,短暂保存在redis的comet消息,短信形式的msg消息,推送到手机的push消息等等…

消息中心

1 系统结构 消息中心体系结构如下图所示: 图中红色线表示移动消息的推送路径。 此结构适用于企业消息中心,也适用于平台,消息推送代理的消息推送服务接口(Web Service)可以作为开放服务。 本地服务器是消息源。…

消息中心(系统消息)实现

需求 用户能即时的收到来自系统或者其他用户发来的消息,在web界面右下角弹窗提醒,用户还能标记消息是否已阅状态。 即时通讯 概念:即时通讯(实时通信,Instant Messaging,简称IM)是一个实时通…

消息中心设计

1 参考文档 产品参考:消息通知系统设计 | 人人都是产品经理 (woshipm.com) 2 消息中心目标职责 消息中心仅作为消息发送使用,跟业务没有任何关系,涉及到业务部分有业务系统自行处理;消息中心的功能只有消息生产、消息展示、消息推…

如何设计一个消息中心

如今的内容型产品,不管提供的是什么类型的内容,在其主功能之外,不可避免的会有另一个十分重要的功能——消息中心。 而无论是信息流、论坛、信箱,还是私聊、群聊、通知,推拉模型是内容型(包括:社…

聊聊消息中心的设计与实现逻辑

厌烦被消息打扰,又怕突然间的安静; 一、业务背景 微服务的架构体系中,会存在很多基础服务,提供一些大部分服务都可能需要的能力,比如文件管理、MQ队列、缓存机制、消息中心等等,这些服务需要提供各种可以复…

4. 消息中心的设计与实现

消息中心的设计与实现 一、引言 运用场景: 1、消息的主动提醒(客户端被动接收) 2、客户模块(及时通讯) 3、单一登录(一个账号只能在一个设备登录) 消息中心的实现方案: 1、客户端轮…

mysql格式化数字去掉千分位

前言 使用format格式化数字时,超过1000就会有千分位,但有时我们不希望有这个千分位,那怎么去掉呢? 1. select format(11111.123,2) 2. select convert(11111.123,decimal(12,2))

upper mysql_MySQL函数

MySQL函数 Lower 转换小写 upper 转换大写 substr 取子串(substr(被截取的字符串,起始下标,截取的长度)) length 取长度 trim 去空格 str_to_date 将字符串转换成日期 date_format 格式化日期 format 设置千分位 round 四舍五入 rand() 生成随机数 Ifnull 可以将null转换成一个…

MySql FORMAT 去掉千位分隔符,

加上墨西哥的地区参数即可 SELECT FORMAT(35555566.8, 2, es_MX) AS value; 结果

MySQL知识总结

目录 知识点条件查询排序常见单行处理函数(可嵌套)多行处理函数分组查询distinct去重 连接查询⭐⭐⭐⭐⭐内连接外连接多表连接(两张表以上) 子查询where子句中的子查询from 子句中的子查询select后面出现的子查询 union合并查询结…

MySQL知识点

总结汇总MySQL数据库面试题(2020最新版)_ThinkWon的博客-CSDN博客_mysql数据库面试题 1. 索引 (1)主键索引 唯一非空,属于聚簇索引 (2)唯一索引 unique 可为空(多个null也可) (3&…

mysql 处理金额_MYSQL处理金额相关函数

1.FORMAT()数字千分位分割 FORMAT(X,D) 1.X需要格式化的数字 2.D保留小数位数 例:SELECT FORMAT(12334555.213,2) 2.ABS() 求绝对值 ABS(X) SELECT ABS(-23); SELECT ABS(21-23); SELECT ABS(23); 3.四舍五入保留小数 ROUND(X,D) 1.X需要格式化的数字 2.D保留小数位数(不写时默…

C语言字符数组的输入和输出

字符数组的输入输出有两种方法&#xff1a; &#xff08;1&#xff09;逐个字符输入输出。用格式符“%c”输入或输出一个字符。例如 int main() {char c[6]; //定义一个字符串for (int i 0; i < 5; i){scanf("%c", &c[i]); //输入字符串}for (int i 0; …

C语言 | 字符数组

C语言字符数组的定义 字符数组是用来存放字符数据的数组&#xff0c;字符数组中的一个元素存放一个字符&#xff0c;定义字符数组的方法和定义数值型数组的方法类似。 //例子&#xff1a;char character[10];C语言字符数组的初始化 C语言对字符数组初始化&#xff0c;最容易理解…

C语言,字符数组与字符串

文章目录 字符数组基本介绍字符串注意事项字符串的访问和遍历字符串的表示形式用字符数组存放一个字符串&#xff1a; 用字符数组存放一个字符串,用字符指针指向一个字符串使用字符指针变量和字符数组两种方法表示字符串的讨论 字符串相关函数常用字符串函数一览字符串函数应用…