RabbitMQ
- (一)、中间件简述
- 1.中间件概述
- (1).什么是中间件
- (2).为什么使用中间件?
- (3).中间件的特点
- (4).在项目中什么时候使用中间件技术 (成本!!)
- 2. 中间件技术及架构的概述
- (1).中间件全家福
- (2).学习中间件的方式和技巧
- (3).单体架构
- (4).分布式架构
- 3.基于消息中间件的分布式系统的架构
- (1).消息中间件应用的场景
- (2).常见的消息中间件
- (3).消息中间件的本质及设计
- (4).消息中间件的核心组成部分 💖
- (5).小结
- 4.消息队列协议
- (1).什么是协议
- (2).网络协议的三要素
- (3).**面试题:为什么消息中间件不直接使用 http协议**
- (4).AMQP协议
- (5).MQTT协议
- (6).OpenMessage协议 (国产)
- (7).Kafka协议
- (8).小结
- 5.消息队列持久化
- (1).持久化
- (2).常见的持久化
- 6.消息的分发策略
- (1).消息的分发策略
- (2).场景分析一
- (3).场景分析二
- (4).消息分发策略的机制和对比
- 7.消息队列高可用和高可靠
- (1).什么是高可用机制
- (2).集群模式1 - Master-slave主从共享数据的部署方式
- (3).集群模式2 - Master-slave主从同步部署方式
- (4).集群模式3 - 多主集群同步部署模式
- (5).集群模式4 - 多主集群转发部署模式
- (6).集群模式5 Master-slave与 Broker-cluster组合的方案
- (8).什么是高可靠机制
- (二)、RabbitMQ入门及安装
- 1. RabbitMQ入门及安装
- (1). 概述
- (2).下载RabbitMQ
- (3) 安装Erlang
- (4). 安装socat
- (5). 安装rabbitmq
- 2.RabbitMQ可视化界面
- (1).安装操作
- (2). 授权账号和密码
- 3. RabbitMQ之Docker安装
- (1). Dokcer安装RabbitMQ
- 4.RabbitMQ的角色分类
- (1).RabbitMQ的角色分类
- (三)、RabbitMQ入门案例
- 1. RabbitMQ入门案例 - Simple 简单模式
- (1).构建一个Maven项目
- (2).导入依赖
- (3).第一种Simple模型
- 2. 什么是AMQP
- (1). 什么是AMQP
- (2) AMQP生产者流转过程
- (3). AMQP消费者流转过程
- 3.RabbitMQ的核心组成部分
- (1).RabbitMQ的核心组成部分
- (2).RabbitMQ整体架构是什么样子的?
- (3).RabbitMQ的运行流程
- (4).RabbitMQ支持的消息模型
- 4. RabbitMQ入门案例 -Work模式
- (1).轮询分发模式 (Polling)
- (2).公平分发模式 (Fair)
- 5.RabbitMQ入门案列-Fanout (发布订阅方式-关注)
- (1).创建生产者
- (2).创建消费者
- 3.简单总结
- 6.RabbitMQ入门案列-Direct(路由-过滤)
- (1).创建生产者
- (2).创建消费者
- (3).简单总结
- 7.RabbitMQ入门案列-Topic(模糊匹配)
- (1).创建生产者
- (2).创建消费者
- (3).简单总结
- 8.RabbitMQ入门案列-Headers
- (1).创建生产者
- (2).创建消费者
- 9.RabbitMQ使用场景
- (1).解耦、销峰、异步
- (2).高内聚、低耦合
- (四)、SpringBoot整合RabbitMQ
- 1.准备工作
- (1).搭建一个SpringBoot项目
- (2).引入配置文件
- 2.订阅与发布 (Faout)
- (1).生产者
- (2).配置类
- (3).消费者
- 3.路由模式(Direct)
- (1).生产者
- (2).配置文件
- (3).消费者
- 4.主题模式 (Topic)
- (1).生产者
- (2).消费者
- (五)、RabbitMQ高级
- 1.过期时间TTL (队列)
- (1).生产者
- (2).配置文件
- (3).消费者
- 2.过期时间TTL (消息)
- (1).生产者
- (2).配置文件
- (3).消费者
- 3.死信队列 (接盘侠) DLX
- (1).概述
- (2).生产者
- (3).配置文件-(非死信配置)
- (4).配置文件-(死信配置)
- (5).消费者
- 4. 内存磁盘的监控
- (1).RabbitMQ内存警告
- (2).RabbitMQ的内存控制
- (3).RabbitMQ的内存换页
- (4).RabbitMQ的磁盘预警
- 5.集群
- (1).集群搭建
- (2).单机多实例搭建
- (3).Web监控
- (4).小结
- (六).分布式事务💘
- 1.基本概述
- (1).两阶段提交(2PC)需要数据库产商的支持,java组件有atomikos等。
- (2).补偿事务(TCC) 严选,阿里,蚂蚁金服。
- (3).本地消息表(异步确保)比如:支付宝、微信支付主动查询支付状态,对账单的形式
- (4).MQ 事务消息 异步场景,通用性较强,拓展性较高。💚
- (5).总结
- (七)、Springboot整合rabbitmq集群配置详解
- 1.引入starter
- 2.详细配置如下
- 3.Spring AMQP的主要对象
- 4.使用:
(一)、中间件简述
1.中间件概述
加入我们创建了一个队列,然后后面对这个队列进行了属性的修改,那么我们修改后的队列不会对原有的队列进行覆盖或者修改,只会报错。
(1).什么是中间件
我国企业从20世纪80年代开始就逐渐进行信息化建设,由于方法和体系的不成熟,以及企业业务的市场需求的不断变化,一个企业可能同时运行着多个不同的业务系统,这些系统可能基于不同的操作系统、不同的数据库、异构的网络环境。现在的问题是,如何把这些信息系统结合成一个有机地协同工作的整体,真正实现企业跨平台、分布式应用。中间件便是解决之道,它用自己的复杂换取了企业应用的简单。
中间件
(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件+平台+通信
,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和使用软件区分开来
(2).为什么使用中间件?
- 可以屏蔽底层操作系统的复杂性
使程序员面对一个简单而统一的开发环境,减少程序设计的复杂性,将注意力主要集中在我们的业务上,不必在为程序在不同的系统软件上重复移植工作
- 使开发变得更加简便,开发周期缩短,减少了系统的维护和运营成本
(3).中间件的特点
为解决分布异构问题,人们提出了中间件(middleware)的概念。中间件位于平台(硬件和操作系统)和应用之间的通用服务,如下图所示,这些服务具有标准的程序接口和协议。针对不同的操作系统和硬件平台,它们可以有符合接口的协议规范的多种实现。
也很难给中间件一个严格的定义,但中间件应具有如下的一些特点:
(1)满足大量应用的需要
(2)运行于多种硬件和 OS平台
(3)支持分布计算,提供跨网络、硬件和 OS平台的透明性的应用或服务的交互
(4)支持标准的协议 (TCP ICP协议)
(5)支持标准的接口
由于标准接口
对于可移植性和标准协议对于互操作性的重要性,中间件已成为许多标准化工作的主要部分。
简单说: 中间件有一个很大的特点: 是脱离于具体设计目标,而具备提供普遍独立功能需求的模块。
(4).在项目中什么时候使用中间件技术 (成本!!)
在项目的架构和重构中,使用任何技术和架构的改变我们都需要谨慎斟酌和思考,因为任何技术的融入和变化都可能人员,技术,和成本的增加,中间件的技术一般现在一些互联网公司或者项目中使用比较多,如果你仅仅还只是一个初创公司建议还是使用单体架构,最多加个缓存中间件即可,不要盲目追求新或者所谓的高性能
,而追求的背后一定是业务的驱动和项目的驱动,因为一旦追求就意味着你的学习成本,公司的人员结构以及服务器成本,维护和运维的成本都会增加,所以需要谨慎选择和考虑
。
但是作为一个开放人员,一定要有学习中间件技术的能力和思维,否则很容易当项目发展到一个阶段在去掌握估计或者在面试中提及,就会给自己带来不小的困扰,在当今这个时代这些技术也并不是什么新鲜的东西,如果去掌握和挖掘最关键的还是自己花时间和经历去探讨和研究。
2. 中间件技术及架构的概述
(1).中间件全家福
(2).学习中间件的方式和技巧
- 理解中间件在项目架构中的作用,以及各中间件的底层实现
- 可以使用一些类比的生活概念去理解中间件
- 使用一些流程图或者脑图的方式去梳理各个中间件在架构中的作用
- 尝试用 java技术去实现中间件的原理
- 静下来去思考中间件在项目中设计的和使用的原因
- 如果找到对应的代替总结方案
- 尝试编写博文总结类同中间件技术的对比和使用场景
- 学会查看中间件的源码以及开源项目和博文
(3).单体架构
在企业开发当中,大部分的初期架构都采用的是单体架构的模式进行架构,而这种架构的典型的特点:就是把所有的业务和模块,源代码,静态资源文件等都放在一个工程中,如果其中的一个模块升级或者迭代发生一个很小的变动都会重新编译和重新部署项目。这种这狗存在的问题是:
- 耦合度太高
- 不易维护
- 服务器的成本高
- 以及升级架构的复杂度也会增大
这样就有后续的分布式架构系统。如下
比如我们现在做的 : 村委会管理项目就是单体架构。如果哪里有错误,就需要关闭服务器重新运行和打包,然后再部署到服务器上。
(4).分布式架构
何谓分布式系统:
通俗一点:就是一个请求由服务器端的多个服务(服务或者系统)协同处理完成
和单体架构不同的是,单体架构是一个请求发起 jvm调度线程(确切的是 tomcat线程池)分配线程 Thread来处理请求直到释放,而分布式系统是:一个请求是由多个系统共同来协同完成,jvm和环境都可能是独立。如果生活中的比喻的话,单体架构就像建设一个小房子很快就能够搞定,如果你要建设一个鸟巢或者大型的建筑,你就必须是各个环节的协同和分布,这样目的也是项目发展到后期的时候要去部署和思考的问题。我们也不难看出来:分布式架构系统存在的特点和问题如下:
存在问题:
- 学习成本高,技术栈过多
- 运维成本和服务器成本增高
- 人员的成本也会增高
- 项目的负载度也会上升
- 面临的错误和容错性也会成倍增加
- 占用的服务器端口和通讯的选择的成本高
- 安全性的考虑和因素逼迫可能选择 RMI/MQ相关的服务器端通讯
好处:
- 服务系统的独立,占用的服务器资源减少和占用的硬件成本减少,确切的说是:可以合理的分配服务资源,不造成服务器资源的浪费
- 系统的独立维护和部署,耦合度降低,可插拔性
- 系统的架构和技术栈的选择可以变的灵活(而不是单纯地选择 java)
- 弹性的部署,不会造成平台因部署造成的瘫痪和停服的状态
类似于王者荣耀 不停服更新的操作
3.基于消息中间件的分布式系统的架构
从上图中可以看出来,消息中间件的是
- 利用可靠的消息传递机制进行系统和系统直接的通讯
- 通过提供消息传递和消息的派对机制,它可以在分布式系统环境下扩展进程间的通讯
(1).消息中间件应用的场景
1**. 跨系统数据传递**
2. 高并发的流量削峰(降低部署时间消耗)
3. 数据的并发和异步处理
4. 大数据分析与传递
5. 分布式事务,比如你有一个数据要进行迁移或者请求并发过多的时候,
比如你有10 W的并发请求下订单,我们可以在这些订单入库之前,我们可以把订单请求堆积到消息队列中,让它稳健可靠的入库和执行.
串行执行: 阻塞的
并行执行: 非阻塞的
(2).常见的消息中间件
ActiveMQ、RabbitMQ、Kafka、RocketMQ等
(3).消息中间件的本质及设计
它是一种接受数据、接受请求、存储数据、发送数据等功能的技术服务
MQ消息队列:负责数据的传接受,存储和传递,所以性能要高于普通服务和技术.
谁来生产消息,存储消息和消费消息呢?
(4).消息中间件的核心组成部分 💖
- 消息的协议
- 消息的持久化机制
- 消息的分发策略
- 消息的高可用,高可靠
- 消息的容错机制
(5).小结
其实不论选择单体架构还是分布式架构
都是项目开发的一个阶段,在什么阶段选择合适的架构方式,而不能盲目追求,最后造成的后果和问题都需要自己买单。但作为一个开发人员学习和探讨新的技术使我们每个程序开发者都应该去保持和思考的问题。当我们没办法去改变社会和世界的时候,我们为了生活和生存那就必须要迎合企业和市场的需求,发挥你的价值和所学的才能,创造价值和实现自我
4.消息队列协议
(1).什么是协议
我们知道消息中间件负责数据的传递、存储和分发消费三个部分,数据的存储和分发的过程中肯定要遵循某种约定成俗的规范,你是采用底层的TCP/IP,UDP协议还是其他的自己去构建等,而这些约定成俗的规范就成为: 协议。
所谓协议是指:
- 计算机底层操作系统和应用程序通讯时共同遵守的一组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流
- 和一般的网络应用程序的不同它主要负责数据的接受和传递,所以性能比较的高
- 协议对数据格式和计算机之间交换数据都必须严格遵守规范
(2).网络协议的三要素
- 语法:语法是用户数据与控制信息的结构与格式,以及数据出现的顺序
- 语义:语义是解释控制信息每个部分的意义,它规定了需要发出何种控制信息,以及完成的动作与做出什么样的响应
- 时序:时序是对事件发生顺序的详细说明
比如我 MQ发送一个信息,是以什么数据格式发送到队列中
,然后每个部分的含义是什么
,发送完毕以后的执行的动作
,以及消费者消费消息的动作
,消费完毕的相应结构和反馈是什么
,然后按照对应的执行顺序进行处理
。如果你还是不理解:大家每天都在接触的 http请求协议:
- 语法:http规定了请求报文和响应报文的格式
- 语义:客户端主动发起请求称之为请求(这是一种定义,同时你发起的是 post/get请求)
- 时序:一个请求对应一个响应(一定先有请求在有响应,这个是时序)
而消息中间件采用的并不是 http协议,而常见的消息中间件协议有有:OpenWire、AMQP、MQTT、Kafka,OpenMessage协议
(3).面试题:为什么消息中间件不直接使用 http协议
- 因为
http请求报文头和响应报文头是比较复杂
的,包含了Cookie
,数据的加密解密,窗台码,响应码等附加的功能
,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就行,一定要追求的是高性能。尽量简洁,快速 - 大部分情况下 http大部分都是短链接,在实际的交互过程中,一个请求到响应都很有可能会中断,中断以后就不会执行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取信息的过程,
出现问题和故障要对数据或消息执行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。
(4).AMQP协议
AMQP:(全称:Advanced Message Queuing Protocol)是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现由 RabbitMQ等
特性:
- 分布式事务支持
- 消息的持久化支持
- 高性能和高可靠的消息处理优势
(5).MQTT协议
MQTT协议(Message Queueing Telemetry Transport)消息队列是 IBM开放的及时通讯协议,物联网系统架构中的重要组成部分
特点:
- 轻量
- 结构简单
- 传输快,不支持事务
- 没有持久化设计
应用场景:
- 适用于计算能力有限
- 低带宽
- 网络不稳定的场景
支持者:
(6).OpenMessage协议 (国产)
是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式信息中间件、流处理等领域的应用开发标准
特点:
- 结构简单
- 解析速度快
- 支持事务和持久化设计
(7).Kafka协议
Kafka协议是基于 TCP/IP的二进制协议。消息内部是 通过长度来分割,由一些基本数据类型组成
特点:
- 结构简单
- 解析速度快
- 无事务支持
- 有持久化设计
(8).小结
协议:实质上在 tcp/ip协议基础之上构建的一种约定俗称的规范和机制、它的主要目的可以让客户端(应用程序 java,go)进行沟通和通讯。并且这种写一下规范必须具有持久性,高可用,高可靠的性能
5.消息队列持久化
(1).持久化
简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。
(2).常见的持久化
6.消息的分发策略
(1).消息的分发策略
MQ消息 队列有如下几个角色
- 生产者
- 存储消息
- 消费者
那么生产者生成消息
以后,MQ进行存储
,消费者是如何获取消息的呢?一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的 git就有推拉机制
,我们发送的 http (request respon)请求就是一种典型的拉取数据库数据返回的过程。而消息队列 MQ是一种推送的过程,而这些推机制会使用到很多的业务场景也有很多对应推机制策略.
(2).场景分析一
比如我在 APP上下了一个订单,我们的系统和服务很多,我们如何得知这个消息被哪个系统或者哪些服务器或者系统进行消费,那这个时候就需要一个分发的策略。这就需要消费策略。或者称之为消费的方法论
(3).场景分析二
在发送消息的过程中可能会出现异常,或者网络的抖动,故障等等因为造成消息的无法消费,比如用户在下订单,消费 MQ接受,订单系统出现故障,导致用户支付失败,那么这个时候就需要消息中间件就必须支持消息重试机制策略。也就是支持:出现问题和故障的情况下,消息不丢失还可以进行重发
(4).消息分发策略的机制和对比
发布订阅: 定于公众号
轮询分发: 不看网速的问题,完全公平的
-》抢票机制
公平分发: 会造成数据的倾斜(会收到网速的影响,能者多劳模式)
重发:假如创建了订单,但没有完成。那么就会留存数据(抢票未付钱,预留20分钟)
7.消息队列高可用和高可靠
(1).什么是高可用机制
所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力
当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署,来达到高可用的目的。
(2).集群模式1 - Master-slave主从共享数据的部署方式
(3).集群模式2 - Master-slave主从同步部署方式
解释:这种模式写入消息同样在 Master主节点上,但是主节点会同步数据到 slave节点形成副本,和 zookeeper或者 redis主从机制很雷同。这样可以达到负载均衡的效果,如果消费者有多个这样就可以去不同的节点进行消费,以为消息的拷贝和同步会占用很大的带宽和网络资源。在后去的 rabbitmq中会有使用
(4).集群模式3 - 多主集群同步部署模式
(5).集群模式4 - 多主集群转发部署模式
解释:如果你插入的数据是 broker-1中国,元数据信息会存储数据的相关描述和记录存放的位置(队列)。它会对描述信息也就是元数据信息进行同步,如果消费者在 broker-2中进行消费,发现自己节点没有对应的信息,可以从对应的元数据信息中去查询,然后返回对应的消息信息,场景:比如买火车票或者黄牛买演唱会门票,比如第一个黄牛有顾客说要买的演唱会门票,但是没有但是他回去联系其他的黄牛询问,如果有就返回
(6).集群模式5 Master-slave与 Broker-cluster组合的方案
解释:实现多主多从的热备机制来完成消息的高可用以及数据的热备机制,在生产规模达到一定的阶段的时候,这种使用的频率比较高。
终归三句话:
- 要么消息共享
- 要么消息同步
- 要么元数据共享
(8).什么是高可靠机制
所谓高可靠是指:系统可以无故障持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠
在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的
如何保证中间件消息的可靠性呢,可以从两个方面考虑:
- 消息的传输:通过协议来保证系统间数据解析的正确性
- 消息的存储区可靠:通过持久化来保证消息的可靠性
(二)、RabbitMQ入门及安装
1. RabbitMQ入门及安装
(1). 概述
简单概述:
RabbitMQ是一个开源的遵循 AMQP协议实现的基于 Erlang语言编写,支持多种客户端(语言),用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征
(2).下载RabbitMQ
- 下载地址:https://www.rabbitmq.com/download.html
- 环境准备:CentOS7.x + /Erlang
RabbitMQ是采用 Erlang语言开发的,所以系统环境必须提供 Erlang环境,第一步就是安装 Erlang
(3) 安装Erlang
安装下载
mkdir -p /home/rabbitmq
cd /home/rabbitmq
# 将安装包上传到linux系统中
erlang-22.0.7-1.el7.x86_64.rpm
rabbitmq-server-3.7.18-1.el6.noarch.rpm
# 解压erlang
rpm -Uvh erlang-22.0.7-1.el7.x86_64.rpm
yum install -y erlang
#查看版本号
erl -v
(4). 安装socat
安装下载
yum install -y socat
(5). 安装rabbitmq
安装下载
rpm -Uvh rabbitmq-server-3.7.18-1.el6.noarch.rpm
yum install rabbitmq-server -y
启动服务
# 启动服务
systemctl start rabbitmq-server
# 查看服务状态,如图
systemctl status rabbitmq-server.service
# 开机自启动
systemctl enable rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server
2.RabbitMQ可视化界面
(1).安装操作
默认情况下,是没有安装web端的客户端插件,需要安装才可以生效
rabbitmq-plugins enable rabbitmq_management
利用我们的域名访问即可
说明:rabbitmq有一个默认账号和密码是:guest
默认情况只能在 localhost本计下访问,所以需要添加一个远程登录的用户
安装完毕以后,重启服务即可
systemctl restart rabbitmq-server
一定要记住,在对应服务器(阿里云,腾讯云等)的安全组中开放15672
端口
(2). 授权账号和密码
新增用户
rabbitmqctl add_user admin admin
设置用户分配操作权限
rabbitmqctl set_user_tags admin administrator
用户级别:
- administrator:可以登录控制台、查看所有信息、可以对 rabbitmq进行管理
- monitoring:监控者 登录控制台,查看所有信息
- policymaker:策略制定者 登录控制台,指定策略
- managment 普通管理员 登录控制台
为用户添加资源权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
网页登录成功
修改密码
rabbitmqctl change_password admin xxxx
3. RabbitMQ之Docker安装
(1). Dokcer安装RabbitMQ
(1). 查询rabbitmq
docker search rabbitmq(2).拉取
docker pull rabbitmq:management(3). 安装容器docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 55672:55672 -p 5672:5672 -p 21613:21613 -p 1883:1883 rabbitmq:management
(4). 启动容器
docker start 容器id
4.RabbitMQ的角色分类
层层递进----
(1).RabbitMQ的角色分类
- none:
- 不能访问management plugin (也就是不能访问rabbitMq 的web应用)
- mangement: 查看自己相关节点信息
- 列出自己可以通过AMQP登入的虚拟机
- 查看自己的虚拟机节点 virtual hosts的queues,exchaanges和bindings信息
- 查看和关闭自己的channels和connections
- 查看有关自己的虚拟机节点virtual hotos的统计信息,包括用户在这个节点 virtual hostos中的信息活动。
- Policymaker
- 包含mangement所有权限
- 查看和创建和删除自己的virtual hosts所属的polices和parmeters信息。
- Monitoring
包含management所有权限
- 罗列出所有的virtual hosts 包括不能登入的virtual hostos
- 查看其他用户的 connections和channels信息
- 查看节点级别的数据 如: clustering和memory使用情况
- 查看所有的 virtual hostos的全局统计信息
- Adminitrator
- 最高权限
- 可以创建和删除 virtuak hostos
- 可以查看,创建和删除 uers
- 查看创建 permissions
- 关闭所有用户的connections
(三)、RabbitMQ入门案例
1. RabbitMQ入门案例 - Simple 简单模式
实现步骤
- jdk1.8
- 构建一个 maven工程
- 导入 rabbitmq的 maven依赖
- 启动 rabbitmq-server服务
- 定义生产者
- 定义消费者
- 观察消息的在 rabbitmq-server服务中的进程
(1).构建一个Maven项目
(2).导入依赖
java原生依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version>
</dependency>
(3).第一种Simple模型
在上图的模型中,有以下概念:
- 生产者,也就是要发送消息的程序
- 消费者:消息的接受者,会一直等待消息到来。
- 消息队列:图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
- 生产者
- 假如说我们没有指定交换机,那么系统就会选择默认的交换机。并且在发布信息的时候,我们的第二个参数一定要编写队列名而不是,路由Key。
- 如果我们有指定交换机的话,那么就必须只当路由Key而不是:我们的队列名。
- 接受信息的参数: 第一个是队列名、第二个是: 是否自动确认、第三个是接受监听的信息。
Products.java
package com.jsxs.rabbitmq.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 所有的中间件技术都是基于tcp/ip协议基础之上的协议规范,只不过rabbitmq遵循的是 ampq 协议也就是tcp/ip是基础.//port ip// 1.创建链接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9"); // ipconnectionFactory.setPort(5672); // portconnectionFactory.setUsername("admin"); //usernameconnectionFactory.setPassword("xxxxx"); // passwordconnectionFactory.setVirtualHost("/"); // 消息放在哪Connection connection = null;Channel channel = null;try {// 2: 创建连接Connection Rabbitmq为什么是基于channel去处理而不是链接? 长连接----信道channelconnection = connectionFactory.newConnection("生成者");// 3: 通过连接获取通道Channelchannel = connection.createChannel();// 4: 通过通创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息String queueName = "queue1";channel.queueDeclare(queueName, false, false, false, null);// 5: 准备消息内容String message = "Hello jsxs!!!";// 6: 发送消息给队列queue// @params1: 交换机 @params2 队列、路由key @params 消息的状态控制 @params4 消息主题// 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机。channel.basicPublish("", queueName, null, message.getBytes());System.out.println("消息发送成功!!!");} catch (Exception ex) {ex.printStackTrace();} finally {// 7: 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}// 8: 关闭连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
遇见的问题:
- 假如出现slf4j的错误我们需要再导入一个包
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.32</version></dependency>
- 假如出现错误如下
connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=N
我们需要再次打开操作,然后对其进行set permison即可
- 我们一定要打开端口号 5672 这个端口号!!!
5672: RabbitMQ的通讯端口
25672: RabbitMQ的节点间的CLI通讯端口是
15672: RabbitMQ HTTP_API的端口,管理员用户才能访问,用于管理RabbitMQ,需要启动Management插件。
1883,8883: MQTT插件启动时的端口。
61613、61614: STOMP客户端插件启用的时候的端口。
15674、15675: 基于webscoket的STOMP端口和MOTT端口
- 消费者
package com.jsxs.rabbitmq.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 所有的中间件技术都是基于tcp/ip协议基础之上的协议规范,只不过rabbitmq遵循的是 ampq 协议也就是tcp/ip是基础.//port ip// 1.创建链接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9"); // ipconnectionFactory.setPort(5672); // portconnectionFactory.setUsername("admin"); //usernameconnectionFactory.setPassword("XXXX"); // passwordconnectionFactory.setVirtualHost("/"); // 消息放在哪// 2.创建链接 ConnectionConnection connection = connectionFactory.newConnection("生产者"); // 工厂创建链接// 3.通过链接获取通道ChannelChannel channel = connection.createChannel(); // 链接创建通道// 4.通过创建交换机,声明队列,绑定关系,路由Key,发送消息和接受消息channel.basicConsume("queue1", true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery meassage) throws IOException {System.out.println(new String("收到消息是" + new String(meassage.getBody(),"UTF-8")));}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {System.out.println("接受消息失败!!!");}});System.out.println("开始接受消费");System.in.read(); //进行阻断 阻塞// 7.关闭通道if (channel!=null&&channel.isOpen()){channel.close(); // 先关闭通道connection.close(); //再关闭链接System.out.println("消息发送成功!!!");}}
}
首先我们debug一下生产者让其过渡到MQ缓存,然后MQ缓存存放生产者的信息。然后一直等待消费者来接受。
非持久化队列: 我们消费者消除之后,那么MQ缓存中就会消除。
持久化队列:我们消费之后并不会移除队列,依然会存储。
2. 什么是AMQP
(1). 什么是AMQP
AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计.
(2) AMQP生产者流转过程
面试题: 为什么RabbitMQ是基于channel去处理而不是链接?
因为链接需要走的是"三次握手四次挥手"这个原理,基本效率是比较低的。所以我们使用了信道的方式对其进行运行
(3). AMQP消费者流转过程
3.RabbitMQ的核心组成部分
(1).RabbitMQ的核心组成部分
String message = "Hello jsxs!!!";// 6: 发送消息给队列queue// @params1: 交换机 @params2 队列、路由key @params 消息的状态控制 @params4 消息主题// 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机。channel.basicPublish("", queueName, null, message.getBytes());
核心概念:
Server: 又称Broker,接受客户端的链接,实现AMQP实体服务
,安装Rabbitma-server
Connection: 连接,应用程序与Broker的网络链接 TCP/IP/协议:“三次握手和四次挥手”
Channel: 网络通信,几乎所有的操作都在Channel中进行
,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务
。
Message:消息,服务于应用程序之间的传送的数据,由Properties组成,Properties可以是对消息进行修饰
,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容
Virtual Host: 虚拟地址,用于进行逻辑隔离,最上面的消息路由,一个虚拟主机理由可以有若干个Exchange和Queueu,同一个虚拟机里面不能有相同名字的Exchange
.
Exchange : 交换机,接受消息,根据路由键发送消息到绑定的队列
。
Bindings: Exchange和Queue之间的虚拟链接,Bingding中可以保护多个 Routinue key
。
Routing key: 是一个路由规则
,虚拟机可以用它来确定如何路由一个特定消息。
Queue : 队列也成为Message Queue,消息队列,保存消息并将他们转发给消费者
。
在正式开发中: 我们的交换机一定要指定一个交换机,尽量不要去使用默认的交换机。
(2).RabbitMQ整体架构是什么样子的?
生产者->交互机->消息队列(MQ)->消费者
(3).RabbitMQ的运行流程
(4).RabbitMQ支持的消息模型
总共六种:
https://www.rabbitmq.com/getstarted.html
一、 简单模式
功能:一个生产者P发送消息到队列Q,一个消费者C接收
① 生产者实现思路:
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel向队列中发送消息,关闭通道和连接。
② 消费者实现思路:
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue, 创建消费者并监听队列,从队列中读取消息。
二、 工作模式
功能:一个生产者,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列
任务队列:避免立即做一个资源密集型任务,必须等待它完成,而是把这个任务安排到稍后再做。我们将任务封装为消息并将其发送给队列。后台运行的工作进程将弹出任务并最终执行作业。当有多个worker同时运行时,任务将在它们之间共享。
三、 publish/subscribe发布订阅(共享资源)
功能:一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者。
生产者:可以将消息发送到队列或者是交换机。
消费者:只能从队列中获取消息。
① 生产者实现思路:
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel创建交换机并指定交换机类型为fanout,使用通道向交换机发送消息,关闭通道和连接。
② 消费者实现思路:
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,绑定队列到交换机,设置Qos=1,创建消费者并监听队列,使用手动方式返回完成。可以有多个队列绑定到交换机,多个消费者进行监听。
四、 路由模式
该方式一个路由键对应一个消息队列,一个消息队列可以对应多个路由键,一个消息队列对应一个消费者,当一个队列下有多个消费者时,MQ采用的是轮询机制,选取一个消费者消费该队列下的消息,其他消费者则轮空。该模式给消息指明了准确的路线,告诉消息必须按照我制定的路线规则来走,适合于比较简单的场景,缺点是路由规则不够灵活。
五、 主题模式
基本思想和路由模式是一样的,只不过路由键支持模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词,路由规则变得灵活多变,可拓展性非常的强
应用场景:
② 单发送,单接收的应用场景
③ 多发送,单接收的应用场景(主要)
六、 RPC ->我们不常用的模式
4. RabbitMQ入门案例 -Work模式
- 消息产生者将消息放入队列消费者可以有多个,消费者 1, 消费者 2。同时监听同一个队列,消息被消费?C1 C2 共同争抢当前的消息队列内容,谁先拿到谁负责消费消息 (隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关 (syncronize, 与同步锁的性能不一样) 保证一条消息只能被一个消费者使用)
- 应用场景:红包;大项目中的资源调度 (任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)
一个消息只能被一个消费者获取
工作队列模式的特点有三:
- 一个生产者,一个队列,多个消费者同时竞争消息
- 任务量过高时可以提高工作效率
- 消费者获得的消息是无序的
(1).轮询分发模式 (Polling)
一个消息提供者发送十条消息
package com.jsxs.rabbitmq.work.fair;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;/*** 生产者*/
public class Producer {public static void main(String[] args) {// 1.消息头ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setPort(5672);connectionFactory.setHost("8.130.48.9");connectionFactory.setUsername("admin");connectionFactory.setPassword("xxx");connectionFactory.setVirtualHost("/"); // 虚拟机Connection connection =null;Channel channel=null;try {connection= connectionFactory.newConnection(); // 连接channel=connection.createChannel(); //管道String queName="queue1";channel.queueDeclare(queName,false,false,false,null); //声明String message="工作模式";for (int i = 0; i < 10; i++) {String a=i+"";channel.basicPublish("",queName,null, a.getBytes(StandardCharsets.UTF_8));}System.out.println("信息发送通过");} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {// 关闭通道if (channel!=null && channel.isOpen()){try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}if (connection!=null&&connection.isOpen()){try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}
创建第一个消费者 性能比较差->(Thread.Sleep(2000))
// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息channel.basicQos(1);
package com.jsxs.rabbitmq.work.fair;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("xxx");// 连接Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息channel.basicQos(1);// 4、监听队列,接收消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {// handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息)@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者获取消息:" + new String(body));// 模拟消息处理延时,加个线程睡眠时间try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}// 手动回执消息channel.basicAck(envelope.getDeliveryTag(), false);}};// basicConsume(队列名称, 是否自动确认, 回调对象)channel.basicConsume("queue1", false, defaultConsumer);}
}
创建第二个消费者-》(性能比较好 Thread.Sleep(1000))
// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息channel.basicQos(1);
package com.jsxs.rabbitmq.work.polling;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("xxx");connectionFactory.setVirtualHost("/");// 连接和管道的创建Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息channel.basicQos(1);// 4、监听队列,接收消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {// handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息)@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者获取消息:" + new String(body));// 模拟消息处理延时,加个线程睡眠时间try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 手动回执消息channel.basicAck(envelope.getDeliveryTag(), false);}};// basicConsume(队列名称, 是否自动确认, 回调对象)channel.basicConsume("queue1", false, defaultConsumer);}
}
首先我们先启动生产者生产消息,通过交互机向消息队列中存储十条消息,然后我们分别开启两个消费者,进行对消息的消费。
生产者->消息
两个消费者->消费
上面的代码实现就是轮询分发的方式。现象:消费者1 处理完消息之后,消费者2 才能处理,它两这样轮着来处理消息,直到消息处理完成,这种方式叫轮询分发(round-robin),结果就是不管两个消费者谁忙,「数据总是你一个我一个」,不管消费者处理数据的性能。
假如说我们生产者在设置队列的时候进行配置的是持久化,那么我们消费者就应该在接受的时候进行设置删除消息的配置
(也就是布尔值相同)
注意:autoAck属性设置为true,表示消息自动确认。消费者在消费时消息的确认模式可以分为『自动确认和手动确认』。
// basicConsume(队列名称, 是否自动确认autoAck, 回调对象)channel.basicConsume("queue1", false, defaultConsumer);
自动确认:在队列中的消息被消费者读取之后会自动从队列中删除。不管消息是否被消费者消费成功,消息都会删除。
手动确认:当消费者读取消息后,消费端需要手动发送ACK用于确认消息已经消费成功了(也就是需要自己编写代码发送ACK确认),如果设为手动确认而没有发送ACK确认,那么消息就会一直存在队列中(前提是进行了持久化操作),后续就可能会造成消息重复消费,如果过多的消息堆积在队列中,还可能造成内存溢出,『手动确认消费者在处理完消息之后要及时发送ACK确认给队列』。
// 手动回执消息channel.basicAck(envelope.getDeliveryTag(), false);
使用轮询分发的方式会有一个明显的缺点,例如消费者1 处理数据的效率很慢,消费者2 处理数据的效率很高,正常情况下消费者2处理的数据应该多一点才对,而轮询分发则不管你的性能如何,反正就是每次处理一个消息,对于这种情况可以使用公平分发的方式来解决。
(2).公平分发模式 (Fair)
前提:
- 生产者设置一次只分发一个消息
// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息channel.basicQos(1);
- 如果生产者设置持久化,我们要设置自动提交或者手动提交
创建消费者
// 第二个参数是否持久化,加入持久化我们要进行提交channel.queueDeclare(queName,false,false,false,null); //声明
生产者
// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息channel.basicQos(1);
package com.jsxs.rabbitmq.work.polling;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;/*** 生产者*/
public class Producer {public static void main(String[] args) {// 1.消息头ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setPort(5672);connectionFactory.setHost("8.130.48.9");connectionFactory.setUsername("admin");connectionFactory.setPassword("xxx");connectionFactory.setVirtualHost("/"); // 虚拟机Connection connection =null;Channel channel=null;try {connection= connectionFactory.newConnection(); // 连接channel=connection.createChannel(); //管道String queName="queue1";channel.queueDeclare(queName,false,false,false,null); //声明String message="工作模式";for (int i = 0; i < 10; i++) {String a=i+"";channel.basicPublish("",queName,null, a.getBytes(StandardCharsets.UTF_8));}System.out.println("信息发送通过");} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {// 关闭通道if (channel!=null && channel.isOpen()){try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}if (connection!=null&&connection.isOpen()){try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}
消费者->性能不好的Thrad.Sleep(2000)
// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息channel.basicQos(1);
package com.jsxs.rabbitmq.work.polling;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("xxx");connectionFactory.setVirtualHost("/");// 连接和管道的创建Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息channel.basicQos(1);// 4、监听队列,接收消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {// handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息)@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者获取消息:" + new String(body));// 模拟消息处理延时,加个线程睡眠时间try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 手动回执消息channel.basicAck(envelope.getDeliveryTag(), false);}};// basicConsume(队列名称, 是否自动确认, 回调对象)channel.basicConsume("queue1", false, defaultConsumer);}
}
性能好的->Threadd.Sleep(1000)
// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息channel.basicQos(1);
package com.jsxs.rabbitmq.work.polling;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("xxx");connectionFactory.setVirtualHost("/");// 连接和管道的创建Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息channel.basicQos(1);// 4、监听队列,接收消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {// handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息)@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者获取消息:" + new String(body));// 模拟消息处理延时,加个线程睡眠时间try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 手动回执消息channel.basicAck(envelope.getDeliveryTag(), false);}};// basicConsume(队列名称, 是否自动确认, 回调对象)channel.basicConsume("queue1", false, defaultConsumer);}
}
运行结果: 我们发现会受到消费者的性能的影响
5.RabbitMQ入门案列-Fanout (发布订阅方式-关注)
发布订阅模式(Publish/Subscribe):这种模式需要涉及到交换机了,也可以称它为广播模式,消息通过交换机广播到所有与其绑定的队列中。
详细介绍:一个生产者将消息首先发送到交换机上(这里的交换机类型为fanout),然后交换机绑定到多个队列,这样每个发到fanout类型交换器的消息会被分发到所有的队列中,最后被监听该队列的消费者所接收并消费
。如下图所示:
(1).创建生产者
package com.jsxs.rabbitmq.fanout;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("xxx");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String queueName="queue1";channel.queueDeclare(queueName,false,false,false,null);String message="xxx";//5.准备交换机String exchangeName = "fanout-exchange";//6.定义路由keyString routeKey = "";//7.指定交换机的类型String type = "fanout";for (int i = 0; i < 10; i++) {String s=i+"";channel.basicPublish(exchangeName,routeKey,null,s.getBytes(StandardCharsets.UTF_8));}// 关闭资源channel.close();connection.close();}}
(2).创建消费者
由于从这里开始涉及到交换机了,使用这里介绍一下四种交换机的类型:
- direct(直连):消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式。
- fanout(广播):把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。
- topic(主题):通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。匹配规则:
① RoutingKey 和 BindingKey 为一个 点号 ‘.’ 分隔的字符串。 比如: stock.usd.nyse;可以放任意的key在routing_key中,当然最长不能超过255 bytes。
② BindingKey可使用 * 和 # 用于做模糊匹配:*匹配一个单词,#匹配0个或者多个单词; - headers:不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了。
消费者1:
注意:在发送消息前,RabbitMQ服务器中必须的有队列,否则消息可能会丢失
,如果还涉及到交换机与队列绑定,那么就得先声明交换机、队列并且设置绑定的路由值(Routing Key),以免程序出现异常,由于本例所有的声明都是在消费者中,所以我们首先要启动消费者。如果RabbitMQ服务器中已经存在了声明的队列或者交换机,那么就不在创建,如果没有则创建相应名称的队列或者交换机。
//5.准备交换机String exchangeName = "fanout-exchange";//6.定义路由keyString routeKey = "";//7.指定交换机的类型String type = "fanout";channel.queueDeclare(queueName,false,false,false,null);//8.声明交换机channel.exchangeDeclare(exchangeName,type,true); // 交换机名字 交换机类型 是否持久化//9.绑定队列和交换机 queueBind(队列名, 交换机名, 路由key[交换机的类型为fanout ,routingKey设置为""])channel.queueBind(queueName, exchangeName, "");
package com.jsxs.rabbitmq.fanout;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者模式: 交换机类型是faount和队列名字是queue1*/
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("xxx");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String queueName="queue1";//5.准备交换机String exchangeName = "fanout-exchange";//6.定义路由keyString routeKey = "";//7.指定交换机的类型String type = "fanout";channel.queueDeclare(queueName,false,false,false,null);//8.声明交换机channel.exchangeDeclare(exchangeName,type,true); // 交换机名字 交换机类型 是否持久化//9.绑定队列和交换机 queueBind(队列名, 交换机名, 路由key[交换机的类型为fanout ,routingKey设置为""])channel.queueBind(queueName, exchangeName, "");//10. 监听消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {// handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息)@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//1. 获取交换机信息String exchange = envelope.getExchange();//2. 获取消息信息String s = new String(body, "UTF-8");System.out.println("交换机的名称是: "+exchange+" 消费者获取的信息是: "+s);}};//11. 获取消息channel.basicConsume(queueName,true,defaultConsumer); //队列名字,是否自动消除// 不关闭资源,让消费者一直出去读取状态}
}
消费者2
两个消费者: 除了队列名字不一样之外其他的都一样
String queueName="queue2";String exchangeName="fanout-exchange";String type="fanout";//1. 声明队列channel.queueDeclare(queueName,false,false,false,null);//2. 声明交换机channel.exchangeDeclare(exchangeName,type,true);//3.队列和交换机绑定channel.queueBind(queueName,exchangeName,"");
package com.jsxs.rabbitmq.fanout;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("xxx");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String queueName="queue2";String exchangeName="fanout-exchange";String type="fanout";//1. 声明队列channel.queueDeclare(queueName,false,false,false,null);//2. 声明交换机channel.exchangeDeclare(exchangeName,type,true);//3.队列和交换机绑定channel.queueBind(queueName,exchangeName,"");//4.监听队列DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 1.获取交换机的名字String exchange = envelope.getExchange();// 2.获取信息String s = new String(body, "UTF-8");System.out.println("交换机的名字是: " + exchange + " 获取的信息是: " + s);}};//5. 接受数据String s = channel.basicConsume(queueName, true, defaultConsumer);//6. 资源不关闭}
}
我们启动生产者: 我们发现我们只是生产了十条消息,总条数确实20条,原因就是: 我们一个交换机绑定了两个队列,交换机为每一个队列都分发原本的十条消息。
我们可以通过web页面查看交换机绑定的队列
3.简单总结
发布订阅模式引入了交换机的概念,所以相对前面的类型更加灵活广泛一些。这种模式需要设置类型为fanout的交换机,并且将交换机和队列进行绑定,当消息发送到交换机后,交换机会将消息发送到所有绑定的队列,最后被监听该队列的消费者所接收并消费。发布订阅模式也可以叫广播模式,不需要RoutingKey的判断。
发布订阅模式与工作队列模式的区别:
1、work模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。
6.RabbitMQ入门案列-Direct(路由-过滤)
路由模式(Routing)的特点:
- 该模式的交换机为direct,意思为定向发送,精准匹配。
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息。
详细介绍:生产者将消息发送到direct交换器,同时生产者在发送消息的时候会指定一个路由key,而在绑定队列和交换器的时候又会指定一个路由key,那么消息只会发送到相应routing key相同的队列,然后由监听该队列的消费者进行消费消息。模型如下图所示:
(1).创建生产者
生产者
我们在指定交换机的同时我们需要指定路由Key.
package com.jsxs.rabbitmq.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;/*** 生产者* 交换机要设置路由键和交换机类型*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("xxx");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 定义交换机的名字String exchangeName="routing_exchange";// 设置三个路由键->并发送三个信息for (int i = 0; i < 3; i++) {// 设定三个Routing keyString routingKey="";switch (i){case 0:routingKey="error"; break;case 1:routingKey="info";break;case 2:routingKey="warning";break;}// 开始发送信息String message="Hello Message!!"+routingKey;//发布消息: 交换机名字/路由键/消息属性/发送的信息channel.basicPublish(exchangeName,routingKey,null,message.getBytes(StandardCharsets.UTF_8));}channel.close();connection.close();}
}
(2).创建消费者
- 消费者1
1. 声明队列 queue1
2. 声明交换机并指定交换机的类型为: direct
3. 对列与交换机进行绑定并指定路由Key
package com.jsxs.rabbitmq.direct;import com.rabbitmq.client.*;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;/*** 消费者--*/
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("xxx");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String queName="queue1";String exchangeName="routing_exchange";String exchangeType="direct";// 1.声明队列channel.queueDeclare(queName,false,false,false,null); //第一个是队列名字。持久化,// 2. 声明交换机channel.exchangeDeclare(exchangeName,exchangeType,true); //交换机名字,交换机类型,是否持久化//3.交换机绑定队列channel.queueBind(queName,exchangeName,"error"); // 队列名字 交换机名字 路由// 4.监听消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String exchange = envelope.getExchange(); //交换机String routingKey = envelope.getRoutingKey(); //路由String s = new String(body, "UTF-8");System.out.println("交换机是: " + exchange + " 路由是: " + routingKey + " 接受到信息是:" + s);}};//4. 接受消息channel.basicConsume(queName,true,defaultConsumer); // 队列名字/是否自动撤销///5. 不关闭资源->让其一直进行监听状态}
}
- 消费者2
1. 声明队列 queue2
2. 声明交换机并指定交换机的类型为: direct
3. 对列与交换机进行绑定并指定路由Key
package com.jsxs.rabbitmq.direct;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("xxx");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String queueName="queue2";String exchangeName="routing_exchange";String exchangeType="direct";channel.queueDeclare(queueName,false,false,false,null);channel.exchangeDeclare(exchangeName,exchangeType,true);channel.queueBind(queueName,exchangeName,"error");channel.queueBind(queueName,exchangeName,"info");channel.queueBind(queueName,exchangeName,"warning");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String exchange = envelope.getExchange(); //交换机String routingKey = envelope.getRoutingKey(); //路由String s = new String(body, "UTF-8");System.out.println("交换机是: " + exchange + " 路由是: " + routingKey + " 接受到信息是:" + s);}};channel.basicConsume(queueName,true,defaultConsumer);
// 不关闭资源}
}
运行结果:
队列中有四个消息
通过路由Key过滤之后
(3).简单总结
- Routing模式需要将交换机设置为Direct类型。
- Routing模式要求队列在绑定交换机时要指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列。
7.RabbitMQ入门案列-Topic(模糊匹配)
Topic类型与Direct相比,都是可以根据RoutingKey
把消息路由到不同的队列。但是Topic类型的Exchange可以让队列在绑定Routing key 的时候使用通配符
进行匹配,也就是模糊匹配,这样与之前的模式比起来,它更加的灵活!
Topic主题模式的Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: log.insert ,它的通配符规则如下:
- *:匹配不多不少恰好1个词
- #:匹配0或多个单词
简单举例:
log.*:只能匹配log.error,log.info 等
log.#:能够匹配log.insert,log.insert.abc,log.news.update.abc 等
图解:
- 红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到。
- 黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配
(1).创建生产者
package com.jsxs.rabbitmq.topics;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;/*** 生产者*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("Lwt121788..");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName="topic_exchange";for (int i = 0; i < 4; i++) {String routingKey="";switch (i){case 0: //假设i=0,为select消息routingKey = "log.select";break;case 1: //假设i=1,为info消息routingKey = "log.delete";break;case 2: //假设i=2,为log.news.add消息routingKey = "log.news.add";break;case 3: //假设i=3,为log.news.update消息routingKey = "log.news.update";break;}String message="Hello Message!!"+routingKey;// 交换机名字 路由 消息属性 消息channel.basicPublish(exchangeName,routingKey,null,message.getBytes(StandardCharsets.UTF_8));}channel.close();connection.close();}
}
(2).创建消费者
1.消费者1
1. 声明队列 queue1
2. 声明交换机 (name,tyepe.isConsist)
3. 队列绑定交换机并指明runtingKey
package com.jsxs.rabbitmq.topics;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("Lwt121788..");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String queueName="queue1";String exchangeName="topic_exchange";String exchangeType="topic";channel.queueDeclare(queueName,false,false,false,null);channel.exchangeDeclare(exchangeName,exchangeType,true); // 交换机名字 交换机类型 是否持久化channel.queueBind(queueName,exchangeName,"log.*");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String exchange = envelope.getExchange();String routingKey = envelope.getRoutingKey();String s = new String(body, "UTF-8");System.out.println("交换机是: " + exchange + " 路由是: " + routingKey + " 消息是:" + s);}};channel.basicConsume(queueName,true,defaultConsumer);//资源不关闭}
}
2.消费者2
1. 声明队列 queue1
2. 声明交换机 (name,tyepe.isConsist)
3. 队列绑定交换机并指明runtingKey
package com.jsxs.rabbitmq.topics;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("Lwt121788..");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String queueName="queue2";String exchangeName="topic_exchange";String exchangeType="topic";channel.queueDeclare(queueName,false,false,false,null);channel.exchangeDeclare(exchangeName,exchangeType,true); // 交换机名字 交换机类型 是否持久化channel.queueBind(queueName,exchangeName,"log.#");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String exchange = envelope.getExchange();String routingKey = envelope.getRoutingKey();String s = new String(body, "UTF-8");System.out.println("交换机是: " + exchange + " 路由是: " + routingKey + " 消息是:" + s);}};channel.basicConsume(queueName,true,defaultConsumer);//资源不关闭}
}
我们发送的消息是四条,但是我们web上是六条。原因是:我们利用模糊查询,查询到的数据是六条。所以是六条。
(3).简单总结
- Topic主题模式需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列。
- Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,所以显得更加灵活。
8.RabbitMQ入门案列-Headers
header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。
(1).创建生产者
1.声明两个队列
2. 声明一个交换机并指定交换机的类型为 headers
3. 将队列与交换机进行绑定并 绑定hashMap
package com.jsxs.rabbitmq.header;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Hashtable;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1.设置连接的操作ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("xxx");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String queueName1="queue1";String queueName2="queue2";String exchangeName="exchange_header";String exchangeType="headers";// 声明队列 队列名channel.queueDeclare(queueName1,false,false,false,null);channel.queueDeclare(queueName2,false,false,false,null);// 声明交换机: 交换机的蜜罐子/交换机的类型/是否持久化channel.exchangeDeclare(exchangeName,exchangeType,true); // 类型声明为: header// 进行交换机的绑定Hashtable<String, Object> header_consumer1 = new Hashtable<>();header_consumer1.put("inform_type_consumer1","consumer1");Hashtable<String, Object> header_consumer2 = new Hashtable<>();header_consumer2.put("inform_type_consumer2","consumer2");// 参数: 队列名/交换机名/是否持久化/headerchannel.queueBind(queueName1,exchangeName,"",header_consumer1);channel.queueBind(queueName2,exchangeName,"",header_consumer2);// 发送十条消息for (int i = 0; i < 10; i++) {String message="inform to producer: "+i;Hashtable<String, Object> headers = new Hashtable<>();headers.put("inform_type_consumer1","consumer1");headers.put("inform_type_consumer2","consumer2");AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();builder.headers(headers);channel.basicPublish(exchangeName,"",builder.build(),message.getBytes(StandardCharsets.UTF_8));System.out.println("Send to email: " + message);}// 关闭资源channel.close();connection.close();}
}
(2).创建消费者
- 第一个消费者
package com.jsxs.rabbitmq.header;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.Hashtable;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("xxx");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String queueName1="queue1";String exchangeName="exchange_header";Hashtable<String, Object> headers_consumer1 = new Hashtable<>();headers_consumer1.put("inform_type_consumer1","consumer1");channel.queueBind(queueName1,exchangeName,"",headers_consumer1);channel.queueDeclare(queueName1,false,false,false,null);// 我们开始对其DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String exchange = envelope.getExchange(); // 交换机String routingKey = envelope.getRoutingKey(); //路由long deliveryTag = envelope.getDeliveryTag(); //消息Id mq在channel中用来标识消息的id,可用于确认消息已接受String s = new String(body, "UTF-8");System.out.println("交换机: "+exchange+" 路由:"+routingKey+" 消息ID "+deliveryTag+"---->"+s);}};channel.basicConsume(queueName1,true,defaultConsumer);}
}
- 第二个消费者
package com.jsxs.rabbitmq.header;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.Hashtable;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("8.130.48.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("xxx");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String queueName2="queue2";String exchangeName="exchange_header";Hashtable<String, Object> headers_consumer2 = new Hashtable<>();headers_consumer2.put("inform_type_consumer2","consumer2");channel.queueBind(queueName2,exchangeName,"",headers_consumer2);channel.queueDeclare(queueName2,false,false,false,null);// 我们开始对其DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String exchange = envelope.getExchange(); // 交换机String routingKey = envelope.getRoutingKey(); //路由long deliveryTag = envelope.getDeliveryTag(); //消息Id mq在channel中用来标识消息的id,可用于确认消息已接受String s = new String(body, "UTF-8");System.out.println("交换机: "+exchange+" 路由:"+routingKey+" 消息ID "+deliveryTag+"---->"+s);}};channel.basicConsume(queueName2,true,defaultConsumer);}
}
生产者提供20条
9.RabbitMQ使用场景
(1).解耦、销峰、异步
同步异步的问题(串行)
串行方式:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
总共开销时间-> 1+2+3+4; 串行执行
public void makeOrder(){//1.发送订单//2.发送短信服务//3.发送email服务//4.发送app服务
}
并行方式 异步线程池
并行方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。
public void test(){//异步theadpool.submit(new Callable<Object>{//1.发送短信服务})//异步theadpool.submit(new Callable<Object>{//2.})//异步theadpool.submit(new Callable<Object>{//3.})//异步theadpool.submit(new Callable<Object>{//4.})
}
存在问题
- 耦合度高
- 需要自己写线程池自己维护成本太高
- 出现了消息可能会丢失,需要你自己做消息补偿
- 如何保证消息的可靠性你自己写
- 如果服务器承载不了,你需要自己去写高可用
异步消息队列的方式
好处:
- 完全解耦,用 MQ建立桥接
- 有独立的线程池和运行模型
- 出现了消息可能会丢失,MQ有持久化功能
- 如何保证消息的可靠性,死信队列和消息转移等
- 如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍
- 削峰: 运行效率提高,可以处理更多的请求,减轻服务器压力。
- 解耦: 独立的线程池,灵活性更高一些。
- 异步: 异步执行
(2).高内聚、低耦合
好处:
- 完全解耦,用 MQ建立桥接
- 有独立的线程池和运行模型
- 出现了消息可能会丢失,MQ有持久化功能
- 如何保证消息的可靠性,死信队列和消息转移等
- 如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍.
(四)、SpringBoot整合RabbitMQ
1.准备工作
(1).搭建一个SpringBoot项目
我们选择两个依赖: 第一个是:web 和 第二个是:RabbitMQ
(2).引入配置文件
# 服务端口
server:port: 8080# RabbitMQ配置
spring:rabbitmq:host: 8.130.48.9port: 5672username: adminpassword: xxxvirtual-host: /
2.订阅与发布 (Faout)
我们客户通过下订单,然后像四个队列中发送订单信息,服务接收者分别是: 短信、邮件、SMS短信、微信提示
(1).生产者
1. 交换机的类型为""。
package com.jsxs.service;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.UUID;/*** @Author Jsxs* @Date 2023/4/2 11:24* @PackageName:com.jsxs.service* @ClassName: OrderService* @Description: TODO* @Version 1.0*/@Service
public class OrderService {@Resource // 获取rabbitMQ的服务private RabbitTemplate rabbitTemplate;/**** @param userId* @param productID* @param num*/public void makeOrder(String userId,String productID,int num){//1. 生成订单String orderID = UUID.randomUUID().toString().replace("-","");System.out.println("订单号已经生产成功-"+orderID);//2. 设置交换机名字和路由String exchangeName="fanout_order_producer";String routineKey="";//3. 发送消息// 参数: (交换机、路由key或队列名、消息内容)rabbitTemplate.convertAndSend(exchangeName,routineKey,orderID);}
}
(2).配置类
1. 声明交换机
2. 声明四个队列
3. 将交换机与队列进行绑定
package com.jsxs.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author Jsxs* @Date 2023/4/2 11:52* @PackageName:com.jsxs.config* @ClassName: RabbitMQConfig* @Description: TODO* @Version 1.0*/
@Configuration
public class RabbitMQConfig {// 1. 声明注册fanout模式的交换机@Beanpublic FanoutExchange fanoutExchange(){// (交换机的名字、是否持久化。是否自动删除)return new FanoutExchange("fanout_order_producer",true,false);}// 2. 声明四个队列: 短信、SMS、WeChat、@Beanpublic Queue SmsQueue(){return new Queue("sms.fanout.queue",true);}@Beanpublic Queue MessageQueue(){return new Queue("message.fanout.queue",true);}@Beanpublic Queue EmailQueue(){return new Queue("email.fanout.queue",true);}@Beanpublic Queue WeChatQueue(){return new Queue("wechat.fanout.queue",true);}// 3. 将队列与交换机进行绑定的操作@Beanpublic Binding SmsBind(){return BindingBuilder.bind(SmsQueue()).to(fanoutExchange());}@Beanpublic Binding MessageBind(){return BindingBuilder.bind(MessageQueue()).to(fanoutExchange());}@Beanpublic Binding EmailBind(){return BindingBuilder.bind(EmailQueue()).to(fanoutExchange());}@Beanpublic Binding WechatBind(){return BindingBuilder.bind(WeChatQueue()).to(fanoutExchange());}
}
(3).消费者
我们创建一个和proder同级的springboot目录
package com.jsxs.service.faout;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:45* @PackageName:com.jsxs.service.faout* @ClassName: EmailConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"email.fanout.queue"}) // 这个客户端的队列是哪个?
public class EmailConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("email接收到的信息是:->"+message);}
}
message
package com.jsxs.service.faout;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: MessageConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"message.fanout.queue"}) // 这个客户端的队列是哪个?
public class MessageConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("Message接收到的信息是:->"+message);}
}
sms
package com.jsxs.service.faout;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: SmsConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"sms.fanout.queue"}) // 这个客户端的队列是哪个?
public class SmsConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("sms接收到的信息是:->"+message);}
}
package com.jsxs.service.faout;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: WechatConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"wechat.fanout.queue"}) // 这个客户端的队列是哪个?
public class WechatConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("wechat接收到的信息是:->"+message);}
}
生产者为四个队列各发送1个信息
启动我们的消费者。进行接受消息。每一个客户端接受的消息都是一样的消息。
3.路由模式(Direct)
ctrl+r
: 在Idea中,我们可以选择一个变量名进行替换成另一个变量名。
(1).生产者
1. 指定我们的交换机的名字
2. 指定我们的路由Key
package com.jsxs.service;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.UUID;/*** @Author Jsxs* @Date 2023/4/2 11:24* @PackageName:com.jsxs.service* @ClassName: OrderService* @Description: TODO : 分别给对应的路由Key发送消息.* @Version 1.0*/@Service
public class OrderService {@Resource // 获取rabbitMQ的服务private RabbitTemplate rabbitTemplate;/**** @param userId* @param productID* @param num*/public void makeOrder(String userId,String productID,int num){//1. 生成订单String orderID = UUID.randomUUID().toString().replace("-","");System.out.println("订单号已经生产成功-"+orderID);//2. 设置交换机名字和路由String exchangeName="direct_order_producer";//3. 发送消息// 参数: (交换机、路由key或队列名、消息内容)rabbitTemplate.convertAndSend(exchangeName,"sms","1");rabbitTemplate.convertAndSend(exchangeName,"message","2");rabbitTemplate.convertAndSend(exchangeName,"email","3");rabbitTemplate.convertAndSend(exchangeName,"wechat","4");}
}
(2).配置文件
1. 一个SpringBoot中假如存在多个@Configuration那么我们可以
使用@Order()->进行指定我们的优先级顺序。
2. 声明交换机的类型和名字 (DirectExchange)
3. 声明四个队列
4. 将队列与交换机进行绑定并配置路由Key
package com.jsxs.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;/*** @Author Jsxs* @Date 2023/4/2 14:50* @PackageName:com.jsxs.config* @ClassName: DirectRabbitMQConfig* @Description: TODO : Direct 比 Foaunt 多了一个路由Key,类型在我们的SpringBoot中不用手动的进行设置,我们只需要在交换机声明的时候,进行寻找对应的类即可,* @Version 1.0*/
@Configuration
@Order(1) // 假如说存在多个配置文件,我们可以用这个注解指定优先级顺序
public class DirectRabbitMQConfig {// 1. 声明注册direct模式的交换机@Beanpublic DirectExchange directExchange(){// (交换机的名字、是否持久化。是否自动删除)return new DirectExchange("direct_order_producer",true,false);}// 2. 声明四个队列: 短信、SMS、WeChat、@Beanpublic Queue SmsQueue(){return new Queue("sms.direct.queue",true);}@Beanpublic Queue MessageQueue(){return new Queue("message.direct.queue",true);}@Beanpublic Queue EmailQueue(){return new Queue("email.direct.queue",true);}@Beanpublic Queue WeChatQueue(){return new Queue("wechat.direct.queue",true);}// 3. 将队列与交换机进行绑定的操作@Beanpublic Binding SmsBind(){return BindingBuilder.bind(SmsQueue()).to(directExchange()).with("sms");}@Beanpublic Binding MessageBind(){return BindingBuilder.bind(MessageQueue()).to(directExchange()).with("message");}@Beanpublic Binding EmailBind(){return BindingBuilder.bind(EmailQueue()).to(directExchange()).with("email");}@Beanpublic Binding WechatBind(){return BindingBuilder.bind(WeChatQueue()).to(directExchange()).with("wechat");}
}
(3).消费者
1. 设置监听的那个队列?
2. 监听到的数据,输出在哪?
package com.jsxs.service.direct;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:45* @PackageName:com.jsxs.service.faout* @ClassName: EmailConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"email.direct.queue"}) // 这个客户端的队列是哪个?
public class EmailConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("email接收到的信息是:->"+message);}
}
message
1. 设置监听的那个队列?
2. 监听到的数据,输出在哪?
package com.jsxs.service.direct;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: MessageConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"message.direct.queue"}) // 这个客户端的队列是哪个?
public class MessageConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("Message接收到的信息是:->"+message);}
}
sms
1. 设置监听的那个队列?
2. 监听到的数据,输出在哪?
package com.jsxs.service.direct;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: SmsConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"sms.direct.queue"}) // 这个客户端的队列是哪个?
public class SmsConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("sms接收到的信息是:->"+message);}
}
1. 设置监听的那个队列?
2. 监听到的数据,输出在哪?
package com.jsxs.service.direct;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: WechatConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"wechat.direct.queue"}) // 这个客户端的队列是哪个?
public class WechatConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("wechat接收到的信息是:->"+message);}
}
服务端先提供消息
客户端接收消息: 接受各自的消息,通过路由key进行区分的
4.主题模式 (Topic)
所有的模式都可以使用注解配置和配置类配置,这里我们用注解进行配置
(1).生产者
package com.jsxs.service;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.UUID;/*** @Author Jsxs* @Date 2023/4/2 11:24* @PackageName:com.jsxs.service* @ClassName: OrderService* @Description: TODO : 分别给对应的路由Key发送消息.* @Version 1.0*/@Service
public class OrderService {@Resource // 获取rabbitMQ的服务private RabbitTemplate rabbitTemplate;/**** @param userId* @param productID* @param num*/public void makeOrder(String userId,String productID,int num){//1. 生成订单String orderID = UUID.randomUUID().toString().replace("-","");System.out.println("订单号已经生产成功-"+orderID);//2. 设置交换机名字和路由String exchangeName="topic_order_producer";//3. 发送消息// 参数: (交换机、路由key或队列名、消息内容)rabbitTemplate.convertAndSend(exchangeName,"sms","1");rabbitTemplate.convertAndSend(exchangeName,"sms","2");rabbitTemplate.convertAndSend(exchangeName,"sms","3");rabbitTemplate.convertAndSend(exchangeName,"sms","4");}}
(2).消费者
1. 绑定
2. 生命队列
3. 声明交换机
4. 路由key
package com.jsxs.service.topic;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:45* @PackageName:com.jsxs.service.faout* @ClassName: EmailConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(bindings = @QueueBinding(// 利用注解声明队列value = @Queue(value = "email.topic.queue",durable = "true",autoDelete = "false"),// 利用注解声明交换机exchange = @Exchange(value = "topic_order_producer",type = ExchangeTypes.TOPIC),// 路由key是 "#.sms.#"key = "#.sms.#"
))
public class EmailConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("email接收到的信息是:->"+message);}
}
message
package com.jsxs.service.topic;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: MessageConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(bindings = @QueueBinding(// 利用注解声明队列value = @Queue(value = "message.topic.queue",durable = "true",autoDelete = "false"),// 利用注解声明交换机exchange = @Exchange(value = "topic_order_producer",type = ExchangeTypes.TOPIC),// 路由key是 "#.sms.#"key = "#.sms.#"
))
public class MessageConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("Message接收到的信息是:->"+message);}
}
sms
package com.jsxs.service.topic;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: SmsConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(bindings = @QueueBinding(// 利用注解声明队列value = @Queue(value = "sms.topic.queue",durable = "true",autoDelete = "false"),// 利用注解声明交换机exchange = @Exchange(value = "topic_order_producer",type = ExchangeTypes.TOPIC),// 路由key是 "#.sms.#"key = "#.sms.#"
))
public class SmsConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("sms接收到的信息是:->"+message);}
}
package com.jsxs.service.topic;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: WechatConsumer* @Description: TODO* @Version 1.0*/
@Service
// 监听的同时 我们对其进行绑定
@RabbitListener(bindings = @QueueBinding(// 利用注解声明队列value = @Queue(value = "wechat.topic.queue",durable = "true",autoDelete = "false"),// 利用注解声明交换机exchange = @Exchange(value = "topic_order_producer",type = ExchangeTypes.TOPIC),// 路由key是 "#.sms.#"key = "#.sms.#"
))
public class WechatConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("wechat接收到的信息是:->"+message);}
}
我们模糊查询的是只要包含的有 sms ,就发送信息,
(五)、RabbitMQ高级
1.过期时间TTL (队列)
概述
过期时间 TTl表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置 TTL,目前有两种方法可以设置 x-message-ttl
- 第一种方法是通过
队列属性
设置,队列中所有消息都有相同的过期时间 - 第二种方法是对
消息进行
单独设置,每条消息 TTL可以不同
如果上述两种方法同时使用,则消息的过期时间以两者 TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的 TTL值,就称为 dead message被投递到死信队列,消费者将无法再收到该消息.
1. 设置队列TTL
(1).生产者
package com.jsxs.service;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.UUID;/*** @Author Jsxs* @Date 2023/4/2 11:24* @PackageName:com.jsxs.service* @ClassName: OrderService* @Description: TODO : 分别给对应的路由Key发送消息.* @Version 1.0*/@Service
public class OrderService {@Resource // 获取rabbitMQ的服务private RabbitTemplate rabbitTemplate;/**** @param userId* @param productID* @param num*/public void makeOrder(String userId,String productID,int num){//1. 生成订单String orderID = UUID.randomUUID().toString().replace("-","");System.out.println("订单号已经生产成功-"+orderID);//2. 设置交换机名字和路由String exchangeName="ttl_order_producer";//3. 发送消息// 参数: (交换机、路由key或队列名、消息内容)rabbitTemplate.convertAndSend(exchangeName,"ttl","1");rabbitTemplate.convertAndSend(exchangeName,"ttl","2");rabbitTemplate.convertAndSend(exchangeName,"ttl","3");rabbitTemplate.convertAndSend(exchangeName,"ttl","4");}
}
(2).配置文件
1. 声明交换机: (假如交换机已经被定义了,我们通过代码对其进行修改属性,那么我们的代码一定会报错的。)
2. 我们声明队列的同时: 要通过HashMap定义他的过期时间和传参 (队列名、是否持久化、是否自动删除、是否、参数)
3. 将队列绑定我们的交换机
4. "x-message-ttl 这是key值
package com.jsxs.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author Jsxs* @Date 2023/4/3 16:16* @PackageName:com.jsxs.config* @ClassName: TTLRabbitMQConfig* @Description: TODO* @Version 1.0*/
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;import java.util.HashMap;@Configuration
public class TTLRabbitMQConfig {// 1. 声明注册direct模式的交换机@Beanpublic DirectExchange directExchange(){// (交换机的名字、是否持久化。是否自动删除)return new DirectExchange("ttl_order_producer",true,false);}// 2. 声明队列: 以及过期时间@Beanpublic Queue SmsQueue(){HashMap<String, Object> args = new HashMap<>();args.put("x-message-ttl",5000);return new Queue("sms.ttl.queue",true,false,false,args);}@Beanpublic Queue MessageQueue(){HashMap<String, Object> args = new HashMap<>();args.put("x-message-ttl",5000);return new Queue("message.ttl.queue",true,false,false,args);}@Beanpublic Queue EmailQueue(){HashMap<String, Object> args = new HashMap<>();args.put("x-message-ttl",5000);return new Queue("email.ttl.queue",true,false,false,args);}@Beanpublic Queue WeChatQueue(){HashMap<String, Object> args = new HashMap<>();args.put("x-message-ttl",5000);return new Queue("wechat.ttl.queue",true,false,false,args);}// 3. 将队列与交换机进行绑定的操作@Beanpublic Binding SmsBind(){return BindingBuilder.bind(SmsQueue()).to(directExchange()).with("ttl");}@Beanpublic Binding MessageBind(){return BindingBuilder.bind(MessageQueue()).to(directExchange()).with("ttl");}@Beanpublic Binding EmailBind(){return BindingBuilder.bind(EmailQueue()).to(directExchange()).with("ttl");}@Beanpublic Binding WechatBind(){return BindingBuilder.bind(WeChatQueue()).to(directExchange()).with("ttl");}
}
(3).消费者
1 .email
package com.jsxs.service.ttl;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:45* @PackageName:com.jsxs.service.faout* @ClassName: EmailConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"email.ttl.queue"}) // 这个客户端的队列是哪个?
public class EmailConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("email接收到的信息是:->"+message);}
}
- message
package com.jsxs.service.ttl;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: MessageConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"message.ttl.queue"}) // 这个客户端的队列是哪个?
public class MessageConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("Message接收到的信息是:->"+message);}
}
- sms
package com.jsxs.service.ttl;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: SmsConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"sms.ttl.queue"}) // 这个客户端的队列是哪个?
public class SmsConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("sms接收到的信息是:->"+message);}
}
package com.jsxs.service.ttl;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: WechatConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"wechat.ttl.queue"}) // 这个客户端的队列是哪个?
public class WechatConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("wechat接收到的信息是:->"+message);}
}
我们启动生产者之后,会在消息队列中产生消息..
如果我们消费者没有在五秒内对这个消息进行接收,那么就会自动消失
2.过期时间TTL (消息)
(1).生产者
3.给消息设置过期时间MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000"); // 过期时间message.getMessageProperties().setContentEncoding("UTF-8"); // 字符编码return message;}};
package com.jsxs.service;import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.UUID;/*** @Author Jsxs* @Date 2023/4/2 11:24* @PackageName:com.jsxs.service* @ClassName: OrderService* @Description: TODO : 分别给对应的路由Key发送消息.* @Version 1.0*/@Service
public class OrderService {@Resource // 获取rabbitMQ的服务private RabbitTemplate rabbitTemplate;/**** @param userId* @param productID* @param num*/public void makeOrder(String userId,String productID,int num){//1. 生成订单String orderID = UUID.randomUUID().toString().replace("-","");System.out.println("订单号已经生产成功-"+orderID);//2. 设置交换机名字和路由String exchangeName="ttl_order_producer";// 3.给消息设置过期时间MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000"); // 过期时间message.getMessageProperties().setContentEncoding("UTF-8"); // 字符编码return message;}};//4. 发送消息// 参数: (交换机、路由key或队列名、消息内容)rabbitTemplate.convertAndSend(exchangeName,"ttl","1",messagePostProcessor);rabbitTemplate.convertAndSend(exchangeName,"ttl","2",messagePostProcessor);rabbitTemplate.convertAndSend(exchangeName,"ttl","3",messagePostProcessor);rabbitTemplate.convertAndSend(exchangeName,"ttl","4",messagePostProcessor);}
}
(2).配置文件
我们对前两个队列进行进行队列TTL和消息TTL两个的同时设置;对后两个队列仅仅设置消息TTL。这四个队列的交换机是一致的。
package com.jsxs.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author Jsxs* @Date 2023/4/3 16:16* @PackageName:com.jsxs.config* @ClassName: TTLRabbitMQConfig* @Description: TODO* @Version 1.0*/
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;import java.util.HashMap;@Configuration
public class TTLRabbitMQConfig {// 1. 声明注册direct模式的交换机@Beanpublic DirectExchange directExchange(){// (交换机的名字、是否持久化。是否自动删除)return new DirectExchange("ttl_order_producer",true,false);}// 2. 声明队列: 以及过期时间@Beanpublic Queue SmsQueue(){HashMap<String, Object> args = new HashMap<>();args.put("x-message-ttl",5000);return new Queue("sms.ttl.queue",true,false,false,args);}@Beanpublic Queue MessageQueue(){HashMap<String, Object> args = new HashMap<>();args.put("x-message-ttl",5000);return new Queue("message.ttl.queue",true,false,false,args);}// ---------------上面我们同时设置队列过期时间和消息过期时间-----下面我们设置仅消息过期时间@Beanpublic Queue EmailQueue(){return new Queue("email.message.ttl.queue",true,false,false);}@Beanpublic Queue WeChatQueue(){return new Queue("wechat.message.ttl.queue",true,false,false);}// 3. 将队列与交换机进行绑定的操作@Beanpublic Binding SmsBind(){return BindingBuilder.bind(SmsQueue()).to(directExchange()).with("ttl");}@Beanpublic Binding MessageBind(){return BindingBuilder.bind(MessageQueue()).to(directExchange()).with("ttl");}@Beanpublic Binding EmailBind(){return BindingBuilder.bind(EmailQueue()).to(directExchange()).with("ttl");}@Beanpublic Binding WechatBind(){return BindingBuilder.bind(WeChatQueue()).to(directExchange()).with("ttl");}
}
(3).消费者
package com.jsxs.service.ttl;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:45* @PackageName:com.jsxs.service.faout* @ClassName: EmailConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"email.message.ttl.queue"}) // 这个客户端的队列是哪个?
public class EmailConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("email接收到的信息是:->"+message);}
}
- message
package com.jsxs.service.ttl;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: MessageConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"message.ttl.queue"}) // 这个客户端的队列是哪个?
public class MessageConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("Message接收到的信息是:->"+message);}
}
- sms
package com.jsxs.service.ttl;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: SmsConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"sms.ttl.queue"}) // 这个客户端的队列是哪个?
public class SmsConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("sms接收到的信息是:->"+message);}
}
- wechate
package com.jsxs.service.ttl;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: WechatConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"wechat.message.ttl.queue"}) // 这个客户端的队列是哪个?
public class WechatConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("wechat接收到的信息是:->"+message);}
}
3.死信队列 (接盘侠) DLX
(1).概述
支持队列TTL,不支持消息TTL.
DLX,全称 Dead-Letter-Exchange
,可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX的队列就称之为死信队列。消息变成死信,可能是由于以下原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性,当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的 DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange
指定交换机即可.
(2).生产者
正常生产者
这里我们只负责正常生产者: 配置我们正常的非死信路由key
package com.jsxs.service;import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.UUID;/*** @Author Jsxs* @Date 2023/4/2 11:24* @PackageName:com.jsxs.service* @ClassName: OrderService* @Description: TODO : 分别给对应的路由Key发送消息.* @Version 1.0*/@Service
public class OrderService {@Resource // 获取rabbitMQ的服务private RabbitTemplate rabbitTemplate;/**** @param userId* @param productID* @param num*/public void makeOrder(String userId,String productID,int num){//1. 生成订单String orderID = UUID.randomUUID().toString().replace("-","");System.out.println("订单号已经生产成功-"+orderID);//2. 设置交换机名字和路由String exchangeName="ttl_message_order_producer";//3. 发送消息// 参数: (交换机、路由key或队列名、消息内容)rabbitTemplate.convertAndSend(exchangeName,"one","1");rabbitTemplate.convertAndSend(exchangeName,"two","2");rabbitTemplate.convertAndSend(exchangeName,"three","3");rabbitTemplate.convertAndSend(exchangeName,"four","4");}
}
(3).配置文件-(非死信配置)
我们需要添加死信的交换机与死信的路由key。将非死信交换机与死信交换机做一个连接的操作。
package com.jsxs.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author Jsxs* @Date 2023/4/3 16:16* @PackageName:com.jsxs.config* @ClassName: TTLRabbitMQConfig* @Description: TODO* @Version 1.0*/
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;import java.util.HashMap;@Configuration
public class TTLRabbitMQConfig {// 1. 声明注册direct模式的交换机@Beanpublic DirectExchange directExchange(){// (交换机的名字、是否持久化。是否自动删除)return new DirectExchange("ttl_message_order_producer",true,false);}// 2. 声明队列: 以及过期时间@Beanpublic Queue SmsQueue(){HashMap<String, Object> args = new HashMap<>();// 下面的key需要向web界面去寻找....// 设置队列的过期时间args.put("x-message-ttl",5000);// 死信交换机args.put("x-dead-letter-exchange","dead_order_producer");// 死信路由keyargs.put("x-dead-letter-routing-key","dead_sms"); // fanout 不需要配置路由keyreturn new Queue("sms.ttl.queue",true,false,false,args);}@Beanpublic Queue MessageQueue(){HashMap<String, Object> args = new HashMap<>();// 下面的key需要向web界面去寻找....// 设置队列的过期时间args.put("x-message-ttl",5000);// 死信交换机args.put("x-dead-letter-exchange","dead_order_producer");// 死信路由keyargs.put("x-dead-letter-routing-key","dead_message"); // fanout 不需要配置路由keyreturn new Queue("message.ttl.queue",true,false,false,args);}// ---------------上面我们同时设置队列过期时间和消息过期时间-----下面我们设置仅消息过期时间@Beanpublic Queue EmailQueue(){HashMap<String, Object> args = new HashMap<>();// 下面的key需要向web界面去寻找....// 设置队列的过期时间args.put("x-message-ttl",5000);// 死信交换机args.put("x-dead-letter-exchange","dead_order_producer");// 死信路由keyargs.put("x-dead-letter-routing-key","dead_email"); // fanout 不需要配置路由keyreturn new Queue("email.ttl.queue",true,false,false,args);}@Beanpublic Queue WeChatQueue(){HashMap<String, Object> args = new HashMap<>();// 下面的key需要向web界面去寻找....// 设置队列的过期时间args.put("x-message-ttl",5000);// 死信交换机args.put("x-dead-letter-exchange","dead_order_producer");// 死信路由keyargs.put("x-dead-letter-routing-key","dead_wechat"); // fanout 不需要配置路由keyreturn new Queue("wechat.ttl.queue",true,false,false,args);}// 3. 将队列与交换机进行绑定的操作@Beanpublic Binding SmsBind(){return BindingBuilder.bind(SmsQueue()).to(directExchange()).with("one");}@Beanpublic Binding MessageBind(){return BindingBuilder.bind(MessageQueue()).to(directExchange()).with("two");}@Beanpublic Binding EmailBind(){return BindingBuilder.bind(EmailQueue()).to(directExchange()).with("three");}@Beanpublic Binding WechatBind(){return BindingBuilder.bind(WeChatQueue()).to(directExchange()).with("four");}
}
(4).配置文件-(死信配置)
package com.jsxs.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;/*** 死信队列配置*/
@Configuration
public class DeadRabbitMQConfig {// 1. 声明注册direct模式的交换机@Beanpublic DirectExchange deadDirectExchange(){// (交换机的名字、是否持久化。是否自动删除)return new DirectExchange("dead_order_producer",true,false);}// 2. 声明队列: 以及过期时间@Beanpublic Queue deadSmsQueue(){return new Queue("dead.sms.queue",true);}@Beanpublic Queue deadMessageQueue(){return new Queue("dead.message.queue",true);}@Beanpublic Queue deadEmailQueue(){return new Queue("dead.email.queue",true);}@Beanpublic Queue deadWeChatQueue(){return new Queue("dead.wechat.queue",true);}// 3. 将队列与交换机进行绑定的操作@Beanpublic Binding deadSmsBind(){return BindingBuilder.bind(deadSmsQueue()).to(deadDirectExchange()).with("dead_sms");}@Beanpublic Binding deadMessageBind(){return BindingBuilder.bind(deadMessageQueue()).to(deadDirectExchange()).with("dead_message");}@Beanpublic Binding deadEmailBind(){return BindingBuilder.bind(deadEmailQueue()).to(deadDirectExchange()).with("dead_email");}@Beanpublic Binding deadWechatBind(){return BindingBuilder.bind(deadWeChatQueue()).to(deadDirectExchange()).with("dead_wechat");}
}
(5).消费者
package com.jsxs.service.ttl;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:45* @PackageName:com.jsxs.service.faout* @ClassName: EmailConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"email.ttl.queue"}) // 这个客户端的队列是哪个?
public class EmailConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("email接收到的信息是:->"+message);}
}
message
package com.jsxs.service.ttl;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: MessageConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"message.ttl.queue"}) // 这个客户端的队列是哪个?
public class MessageConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("Message接收到的信息是:->"+message);}
}
sms
package com.jsxs.service.ttl;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: SmsConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"sms.ttl.queue"}) // 这个客户端的队列是哪个?
public class SmsConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("sms接收到的信息是:->"+message);}
}
package com.jsxs.service.ttl;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author Jsxs* @Date 2023/4/2 13:44* @PackageName:com.jsxs.service.faout* @ClassName: WechatConsumer* @Description: TODO* @Version 1.0*/
@Service
@RabbitListener(queues = {"wechat.ttl.queue"}) // 这个客户端的队列是哪个?
public class WechatConsumer {@RabbitHandler // 接收到的消息放在这public void receiveMessage(String message){System.out.println("wechat接收到的信息是:->"+message);}
}
在我们的有效期内会在正常的交换机中
时间过期或者长度上限会进入我们的死信队列
在有效期内,我们消费者能够进行正常的消费...
4. 内存磁盘的监控
(1).RabbitMQ内存警告
当内存使用超过配置或者磁盘空间对于配置的阀值时,RabbitMQ会暂时阻塞客户端发来的消息,以此避免服务器的崩溃,客户端与服务端的心态检测机制也会失效。
(2).RabbitMQ的内存控制
参考帮助文档:https://rabbitmq.com/configure.html
当出现警告的时候,可以通过配置去修改和调整
命令的方式
下面的方式我们选择其一就行了,不是全部选举
rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
fraction/value 为内存阈值。默认情况是:0.4/2GB,代表的含义是:当 RabbitMQ的内存超过40%时,就会产生警告并且会阻塞所有生产者的连接。通过此命令修改阈值在 Broker重启以后将会失效,通过修改配置文件设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启 Broker才会生效.
配置文件Rabbitmq.conf
#默认 : /etc/rabbitmq/rabbitmq.conf ->手动安装
vm_memory_high_watermark.relative=0.4
#使用relative相对值设置fraction,建议在0.4-0.7之间
vm_memory_high_watermark.absolute=2GB
(3).RabbitMQ的内存换页
在某个Broker节点及内存阻赛生产者之前,它会尝试将队列中的消息换页到碰盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在碰盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。
默认情况下,内存到达的阔值是50%时就会换页处理。也就是说,在默认情况下该内存的闻值是0.4的情况下,当内存超过0.4*0.5=0.2时,会进行换页动作,
比如有1000MB内存,当内存的使用率达到了400MB,已经达到了极限,但是因为配置的换页内存0.5,这个时候会在达到极限400mb之前,会把内存中的200MB进行转移到磁盘中,从而达到稳健的运行,
可以通过设置 vm_memory_high_watermark_paging_ratio
来进行调整
vm_memory_high_watermark.relative=0.4
vm_memory_high_watermark_paging_ratio=0.7 (小于1)
因为我们设置1,整个电脑的内存已经全部属于我们的RabbitMQ了,所以在设置分页已经没有什么意义了。
(4).RabbitMQ的磁盘预警
当磁盘的剩余空间低于确定的成值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽E盘空间导致服务器崩清。
默认情况下:磁盘预警为50MB的时候会进行预警。表示当前磁盘空间第50MB的时候会阳塞生产者并且停止内存消息换页到磁盘的过程。
这个闻值可以减小,但是不能完全的消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘空间的检查空隙内,第次检查是:60MB,第二检查可能就是1MB,就会出现警告。
通过命令方式进行修改
rabbitmqctl set_disk_free_limit <disk_limit>
rabbitmqctl set_disk_free_limit memory_limit <fraction>
5.集群
RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持CIustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平广展以达到增加消息吞吐量能力的目的。
在实际便用过程中多采取多机多实例部署方式,为了便于同学们练习搭建,有时候你不得不在一台电脑上去搭建一个rabbitmq集群,本章主要针对单机多实例多种方式来进行开层。
(1).集群搭建
配置的前提是你的 rabbitmq可以运行起来,比如ps aux|grep rabbitmq
你能看到相关进程,又比如运行你可以看到类似如下信息而不报错:
- 查看状态
查看状态
ps aux|grep rabbitmq
正在运行中...
或者用下面的命令查看
systemctl status rabbitmq-server
2.关闭服务
因为集群不需要我们再用传统的方式去 开启服务
systemctl stop rabbitmq-server
注意:确保RabbitMQ可以运行的,确保完成之后,把单机版的RabbitMQ服务停止,后台看不到RabbitMQ的进程为止
(2).单机多实例搭建
场景: 假设有两个rabbitmq节点,分别为: rabbitmq-1、rabbitmq-2. rabbitmq-2作为从节点。
启动命令:RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server -detached
结束命令:rabbitmqctl -n rabbit-1 stop
1、启动第一个节点rabbitmq-1
> sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &
...............省略...................########## Logs: /var/log/rabbitmq/rabbit-1.log###### ## /var/log/rabbitmq/rabbit-1-sasl.log##########Starting broker...completed with 7 plugins.
至此节点rabbit-1启动完成。
2、启动第二个节点rabbit-2
注意:web管理插件端口占用,所以还要指定其web插件占用的端口号
RABBITMQ_SERVER_START_ARGS=”-rabbitmq_management listener [{port,15673}]”
sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &
..............省略..................########## Logs: /var/log/rabbitmq/rabbit-2.log###### ## /var/log/rabbitmq/rabbit-2-sasl.log##########Starting broker...completed with 7 plugins.
至此节点rabbit-2启动完成
3、验证启动 “ps aux|grep rabbitmq”
ps aux|grep rabbitmq
4、rabbit-1操作作为主节点
#停止应用> sudo rabbitmqctl -n rabbit-1 stop_app
#目的是清除节点上的历史数据(如果不清除,无法将节点加入到集群)
> sudo rabbitmqctl -n rabbit-1 reset
#启动应用
> sudo rabbitmqctl -n rabbit-1 start_app
5、rabbit2操作为从节点
# 停止应用> sudo rabbitmqctl -n rabbit-2 stop_app
# 目的是清除节点上的历史数据(如果不清除,无法将节点加入到集群)
> sudo rabbitmqctl -n rabbit-2 reset
# 将rabbit2节点加入到rabbit1(主节点)集群当中【Server-node服务器的主机名】
> sudo rabbitmqctl -n rabbit-2 join_cluster rabbit-1@Jsxs
# 启动应用
> sudo rabbitmqctl -n rabbit-2 start_app
6、验证集群状态
> sudo rabbitmqctl cluster_status -n rabbit-1
//集群有两个节点:rabbit-1@Server-node、rabbit-2@Server-node
[{nodes,[{disc,['rabbit-1@Server-node','rabbit-2@Server-node']}]},{running_nodes,['rabbit-2@Server-node','rabbit-1@Server-node']},{cluster_name,<<"rabbit-1@Server-node.localdomain">>},{partitions,[]},{alarms,[{'rabbit-2@Server-node',[]},{'rabbit-1@Server-node',[]}]}]
(3).Web监控
默认是关闭的web界面,我们需要打开.
rabbitmq-plugins enable rabbitmq_management
注意在访问的时候:web结面的管理需要给15672 node-1 和15673的node-2 设置用户名和密码。如下:
主人认证了 从人不用认证了
# 15672 端口
rabbitmqctl -n rabbit-1 add_user admin admin
rabbitmqctl -n rabbit-1 set_user_tags admin administrator
rabbitmqctl -n rabbit-1 set_permissions -p / admin ".*" ".*" ".*"
# 15673 端口
rabbitmqctl -n rabbit-2 add_user admin admin
rabbitmqctl -n rabbit-2 set_user_tags admin administrator
rabbitmqctl -n rabbit-2 set_permissions -p / admin ".*" ".*" ".*"
我们在主机中添加一个队列,发现从机里面的也跟着被添加了。
# 停止掉 从机服务2
rabbitmqctl -n rabbit-2 stop_app
# 停止掉 主机服务1
rabbitmqctl -n rabbit-1 stop_app
# 开启掉 从机服务2
rabbitmqctl -n rabbit-2 start_app
我们停掉从机2,就会发现我们的15673web页面访问不到了,而且主机那边会爆红。队列和集群依然存在。如果主机挂了,那么从节点就无法启动和运行了了。除非主节点重新复活....
(4).小结
Tips:
如果采用多机部署方式,需读取其中一个节点的cookie, 并复制到其他节点(节点之间通过cookie确定相互是否可通信)。cookie存放在/var/lib/rabbitmq/.erlang.cookie。
例如:主机名分别为rabbit-1、rabbit-2
1、逐个启动各节点
2、配置各节点的hosts文件( vim /etc/hosts)
ip1:rabbit-1
ip2:rabbit-2
其它步骤雷同单机部署方式
(六).分布式事务💘
1.基本概述
分布式事务指事务的操作位于不同的节点上,需要保证事务的 AICD 特性。
例如在下单场景下,库存和订单如果不在同一个节点上,就涉及分布式事务。
分布式事务的方式
在分布式系统中,要实现分布式事务,无外乎那几种解决方案。
(1).两阶段提交(2PC)需要数据库产商的支持,java组件有atomikos等。
两阶段提交(Two-phase Commit,2PC),通过引入协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。
1.准备阶段
协调者询问参与者事务是否执行成功
,参与者发回事务执行结果
。
2 提交阶段
如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务。
需要注意的是,在准备阶段,参与者执行了事务,但是还未提交
。只有在提交阶段接收到协调者发来的通知后,才进行提交或者回滚。
存在的问题
- 2.1 同步阻塞 所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其它操作。
- 2.2 单点问题 协调者在 2PC 中起到非常大的作用,发生故障将会造成很大影响。特别是在阶段二发生故障,所有参与者会一直等待状态,无法完成其它操作。
- 2.3 数据不一致 在阶段二,如果协调者只发送了部分 Commit 消息,此时网络发生异常,那么只有部分参与者接收到 Commit 消息,也就是说只有部分参与者提交了事务,使得系统数据不一致。
- 2.4 太过保守 任意一个节点失败就会导致整个事务失败,没有完善的容错机制。
(2).补偿事务(TCC) 严选,阿里,蚂蚁金服。
TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作
。它分为三个阶段:
- Try 阶段主要是对业务系统做检测及资源预留.
- Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行 Confirm阶段时,默认 - - - Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
- Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。
举个例子,假入 Bob 要向 Smith 转账,思路大概是: 我们有一个本地方法,里面依次调用
1:首先在 Try 阶段,要先调用远程接口把 Smith 和 Bob 的钱给冻结起来。
2:在 Confirm 阶段,执行远程调用的转账的操作,转账成功进行解冻。
3:如果第2步执行成功,那么转账成功,如果第二步执行失败,则调用远程冻结接口对应的解冻方法 (Cancel)。
优点: 跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些。
缺点: 缺点还是比较明显的,在2,3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理(代码冗余!)。
(3).本地消息表(异步确保)比如:支付宝、微信支付主动查询支付状态,对账单的形式
本地消息表与业务数据表处于同一个数据库中
,这样就能利用本地事务来保证在对这两个表的操作满足事务特性
,并且使用了消息队列来保证最终一致性。
- 在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息,
本地事务能保证这个消息一定会被写入本地消息表中
。 - 之后将本地消息表中的消息转发到 Kafka 等消息队列中,
如果转发成功则将消息从本地消息表中删除,否则继续重新转发
。 - 在分布式事务操作的另一方从
消息队列中读取一个消息,并执行消息中的操作
。
优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。
缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。
(4).MQ 事务消息 异步场景,通用性较强,拓展性较高。💚
有一些第三方的MQ是支持事务消息的
,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 Kafka 不支持。
以阿里的 RabbitMQ 中间件为例,其思路大致为:
第一阶段Prepared消息
,会拿到消息的地址。第二阶段
执行本地事务,第三阶段
通过第一阶段拿到的地址去访问消息,并修改状态。- 也就是说在业务方法内要想消息队列提交两次请求,
一次发送消息和一次确认消息
。如果确认消息发送失败了RabbitMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RabbitMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
优点: 实现了最终一致性,不需要依赖本地数据库事务。
缺点: 实现难度大,主流MQ不支持,RocketMQ事务消息部分代码也未开源。
(5).总结
通过本文我们总结并对比了几种分布式分解方案的优缺点,分布式事务本身是一个技术难题,是没有一种完美的方案应对所有场景的,具体还是要根据业务场景去抉择吧。阿里RocketMQ去实现的分布式事务,现在也有除了很多分布式事务的协调器,比如LCN等,大家可以多去尝试。
(七)、Springboot整合rabbitmq集群配置详解
pringboot整合rabbitmq
集群创建方式这里省略
整合开始
1.引入starter
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.6.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.详细配置如下
rabbitmq:addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
# port:##集群配置 addresses之间用逗号隔开# addresses: ip:port,ip:portpassword: adminusername: 123456virtual-host: / # 连接到rabbitMQ的vhostrequested-heartbeat: #指定心跳超时,单位秒,0为不指定;默认60spublisher-confirms: #是否启用 发布确认publisher-reurns: # 是否启用发布返回connection-timeout: #连接超时,单位毫秒,0表示无穷大,不超时cache:channel.size: # 缓存中保持的channel数量channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channelconnection.size: # 缓存的连接数,只有是CONNECTION模式时生效connection.mode: # 连接工厂缓存模式:CHANNEL 和 CONNECTIONlistener:simple.auto-startup: # 是否启动时自动启动容器simple.acknowledge-mode: # 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认autosimple.concurrency: # 最小的消费者数量simple.max-concurrency: # 最大的消费者数量simple.prefetch: # 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.simple.transaction-size: # 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.simple.default-requeue-rejected: # 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)simple.idle-event-interval: # 多少长时间发布空闲容器时间,单位毫秒simple.retry.enabled: # 监听重试是否可用simple.retry.max-attempts: # 最大重试次数simple.retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔simple.retry.multiplier: # 应用于上一重试间隔的乘数simple.retry.max-interval: # 最大重试时间间隔simple.retry.stateless: # 重试是有状态or无状态template:mandatory: # 启用强制信息;默认falsereceive-timeout: # receive() 操作的超时时间reply-timeout: # sendAndReceive() 操作的超时时间retry.enabled: # 发送重试是否可用retry.max-attempts: # 最大重试次数retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔retry.multiplier: # 应用于上一重试间隔的乘数retry.max-interval: #最大重试时间间隔
注:相关配置很多,大家只需要关注一些常用的配置即可
对于发送方而言,需要做以下配置:
1 配置CachingConnectionFactory
2 配置Exchange/Queue/Binding
3 配置RabbitAdmin创建上一步的Exchange/Queue/Binding
4 配置RabbitTemplate用于发送消息,RabbitTemplate通过CachingConnectionFactory获取到Connection,然后想指定Exchange发送
对于消费方而言,需要做以下配置:
1 配置CachingConnectionFactory
2 配置Exchange/Queue/Binding
3 配置RabbitAdmin创建上一步的Exchange/Queue/Binding
4 配置RabbitListenerContainerFactory
5 配置@RabbitListener/@RabbitHandler用于接收消息
在默认情况下主要的配置如下:
3.Spring AMQP的主要对象
注:如果不了解AMQP请前往官网了解.
4.使用:
通过配置类加载的方式:
package com.yd.demo.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);public static final String RECEIVEDLXEXCHANGE="spring-ex";public static final String RECEIVEDLXQUEUE="spring-qu1";public static final String RECEIVEDLXROUTINGKEY="aa";public static final String DIRECTEXCHANGE="spring-ex";public static final String MDMQUEUE="mdmQueue";public static final String TOPICEXCHANGE="spring-top";@Value("${spring.rabbitmq.addresses}")private String hosts;@Value("${spring.rabbitmq.username}")private String userName;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;/* @Value("${rabbit.channelCacheSize}")private int channelCacheSize;*/
// @Value("${rabbit.port}")
// private int port;
/* @Autowiredprivate ConfirmCallBackListener confirmCallBackListener;@Autowiredprivate ReturnCallBackListener returnCallBackListener;*/@Beanpublic ConnectionFactory connectionFactory(){CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();cachingConnectionFactory.setAddresses(hosts);cachingConnectionFactory.setUsername(userName);cachingConnectionFactory.setPassword(password);
// cachingConnectionFactory.setChannelCacheSize(channelCacheSize);//cachingConnectionFactory.setPort(port);cachingConnectionFactory.setVirtualHost(virtualHost);//设置连接工厂缓存模式:cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);//缓存连接数cachingConnectionFactory.setConnectionCacheSize(3);//设置连接限制cachingConnectionFactory.setConnectionLimit(6);logger.info("连接工厂设置完成,连接地址{}"+hosts);logger.info("连接工厂设置完成,连接用户{}"+userName);return cachingConnectionFactory;}@Beanpublic RabbitAdmin rabbitAdmin(){RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());rabbitAdmin.setAutoStartup(true);rabbitAdmin.setIgnoreDeclarationExceptions(true);rabbitAdmin.declareBinding(bindingMdmQueue());//声明topic交换器rabbitAdmin.declareExchange(directExchange());logger.info("管理员设置完成");return rabbitAdmin;}@Beanpublic RabbitListenerContainerFactory listenerContainerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory());factory.setMessageConverter(new Jackson2JsonMessageConverter());//最小消费者数量factory.setConcurrentConsumers(10);//最大消费者数量factory.setMaxConcurrentConsumers(10);//一个请求最大处理的消息数量factory.setPrefetchCount(10);//factory.setChannelTransacted(true);//默认不排队factory.setDefaultRequeueRejected(true);//手动确认接收到了消息factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);logger.info("监听者设置完成");return factory;}@Beanpublic DirectExchange directExchange(){return new DirectExchange(DIRECTEXCHANGE,true,false);}@Beanpublic Queue mdmQueue(){Map arguments = new HashMap<>();// 绑定该队列到私信交换机arguments.put("x-dead-letter-exchange",RECEIVEDLXEXCHANGE);arguments.put("x-dead-letter-routing-key",RECEIVEDLXROUTINGKEY);logger.info("队列交换机绑定完成");return new Queue(RECEIVEDLXQUEUE,true,false,false,arguments);}@BeanBinding bindingMdmQueue() {return BindingBuilder.bind(mdmQueue()).to(directExchange()).with("");}@Beanpublic RabbitTemplate rabbitTemplate(){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());rabbitTemplate.setMandatory(true);//发布确认
// rabbitTemplate.setConfirmCallback(confirmCallBackListener);// 启用发布返回
// rabbitTemplate.setReturnCallback(returnCallBackListener);logger.info("连接模板设置完成");return rabbitTemplate;}/* @Beanpublic TopicExchange topicExchange(){return new TopicExchange(TOPICEXCHANGE,true,false);}*//*
*//*** @return DirectExchange*//*@Beanpublic DirectExchange dlxExchange() {return new DirectExchange(RECEIVEDLXEXCHANGE,true,false);}
*//*
** @return Queue
*//*@Beanpublic Queue dlxQueue() {return new Queue(RECEIVEDLXQUEUE,true);}
*//** @return Binding*//*@Beanpublic Binding binding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(RECEIVEDLXROUTINGKEY);}*/
}
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setAddresses(hosts);
cachingConnectionFactory.setUsername(userName);
cachingConnectionFactory.setPassword(password);
// cachingConnectionFactory.setChannelCacheSize(channelCacheSize);
//cachingConnectionFactory.setPort(port);
cachingConnectionFactory.setVirtualHost(virtualHost);
//设置连接工厂缓存模式:
cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
//缓存连接数
cachingConnectionFactory.setConnectionCacheSize(3);
//设置连接限制
cachingConnectionFactory.setConnectionLimit(6);
logger.info(“连接工厂设置完成,连接地址{}”+hosts);
logger.info(“连接工厂设置完成,连接用户{}”+userName);
return cachingConnectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
rabbitAdmin.setAutoStartup(true);
rabbitAdmin.setIgnoreDeclarationExceptions(true);
rabbitAdmin.declareBinding(bindingMdmQueue());
//声明topic交换器
rabbitAdmin.declareExchange(directExchange());
logger.info(“管理员设置完成”);
return rabbitAdmin;
}
@Bean
public RabbitListenerContainerFactory listenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//最小消费者数量
factory.setConcurrentConsumers(10);
//最大消费者数量
factory.setMaxConcurrentConsumers(10);
//一个请求最大处理的消息数量
factory.setPrefetchCount(10);
//
factory.setChannelTransacted(true);
//默认不排队
factory.setDefaultRequeueRejected(true);
//手动确认接收到了消息
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
logger.info(“监听者设置完成”);
return factory;
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECTEXCHANGE,true,false);
}
@Bean
public Queue mdmQueue(){
Map arguments = new HashMap<>();
// 绑定该队列到私信交换机
arguments.put(“x-dead-letter-exchange”,RECEIVEDLXEXCHANGE);
arguments.put(“x-dead-letter-routing-key”,RECEIVEDLXROUTINGKEY);
logger.info(“队列交换机绑定完成”);
return new Queue(RECEIVEDLXQUEUE,true,false,false,arguments);
}
@Bean
Binding bindingMdmQueue() {
return BindingBuilder.bind(mdmQueue()).to(directExchange()).with("");
}
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMandatory(true);
//发布确认
// rabbitTemplate.setConfirmCallback(confirmCallBackListener);
// 启用发布返回
// rabbitTemplate.setReturnCallback(returnCallBackListener);
logger.info(“连接模板设置完成”);
return rabbitTemplate;
}
/ @Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPICEXCHANGE,true,false);
}/
/
//*
* @return DirectExchange
//
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(RECEIVEDLXEXCHANGE,true,false);
}
//
*
* @return Queue
//
@Bean
public Queue dlxQueue() {
return new Queue(RECEIVEDLXQUEUE,true);
}
//
* @return Binding
//
@Bean
public Binding binding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(RECEIVEDLXROUTINGKEY);
}*/
}