RabbitMQ 详解

article/2025/10/22 4:52:07

RabbitMQ 详解

  • MQ 的相关概念
  • RabbitMQ 四大核心概念
  • RabbitMQ 的工作原理
  • RabbitMQ 六大核心部分(模式)
  • 简单模式
  • 工作模式
    • 工作模式案例
    • 消息确认(消息应答)
    • 消息持久化
  • 发布确认模式
  • 交换机(Exchange)
    • Exchange 概念
    • 临时队列
    • 绑定(bindings)
    • 扇出交换机(fanout exchange)
    • 直连交换机(direct exchange)
    • 主题交换机(topic exchange)
  • 死信队列
    • 概念及应用场景
    • 死信来源
    • 死信实例
      • 消息 TTL 过期
      • 队列达到最大长度
      • 消息被拒绝,并且 requeue = false
  • 延时队列
    • 概念
    • 应用场景
    • RabbitMQ 中的 TTL
  • 发布确认高级
  • RabbitMQ的其他知识点
    • 幂等性
    • 优先队列
    • 惰性队列

注意:本文以Java实现为主:Springboot 整合 RabbitMQ可参考这篇文章:https://blog.csdn.net/qq_35387940/article/details/100514134
准备工作:
1、Linux 环境
2、Linux 环境下安装 RabbitMQ :https://blog.csdn.net/qq_36763419/article/details/122152767

MQ 的相关概念

1、什么是 MQ ?
MQ(Message Queue),从字面意思上看,本质是个队列,FIFO先进先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游 逻辑解耦 + 物理解耦 的消息通信服务。使用了 MQ 之后,消息发送上游只需依赖 MQ,不用依赖其他的服务。

2、为什么要使用 MQ ?MQ的三大作用。
(1)流量削峰:MQ 中可以对请求进行排队处理,避免系统宕机。
在这里插入图片描述
(2)应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
在这里插入图片描述
(3)异步处理
案例一:
在这里插入图片描述
案例二:
在这里插入图片描述

RabbitMQ 四大核心概念

1、生产者产生数据发送消息的程序是生产者。
2、交换机:交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面他讲消息推送到队列中。交换机必须确切的知道如何处理它接受的消息,是将这些消息推送到特定队列还是送到多个队列,亦或者是把消息丢弃,这个得由交换机类型决定交换分为如下及中直连交换机(direct exchange)主题交换机(topic exchange)标题交换机(headers exchange)扇出交换机(fanout exchange)
3、队列:队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束。本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列中,许多消费者可以从队列中接收数据,这就是我们使用队列的方式。
4、消费者:消费者和接受具有相似的含义。消费者大多时候是一个等待接受消息的程序,请注意生产者、消费者、消息中间件大多时候并不在统一机器上,统一个应用既可以是生产者也可以是消费者。

RabbitMQ 的工作原理

在这里插入图片描述
Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
Connection:publisher/consumer 和 broker 之间的 TCP 连接。
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point)、topic (publish-subscribe) and fanout (multicast)。
Queue消息存储的数据结构,消息最终被送到这里等待 consumer 取走。
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。

RabbitMQ 六大核心部分(模式)

1、简单模式
2、工作模式
3、发布/订阅模式
4、路由模式
5、主题模式
6、消息确认模式
在这里插入图片描述

简单模式

在下图中,P是我们的生产者, C 是我们的消费者。中间的框是一个队列 RabbitMQ 代表使用者保留的消息缓冲区。
在这里插入图片描述
maven依赖:

        <!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.0</version></dependency>

一、消息生产生产者

package com.mq.rabbitmq.simplemode;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {//队列名称(路由键)private static final String SIMPLE_MODE_QUEUE_NAME = "routeKey";private static final String PRODUCER = "生产者发送";//发送消息public static void main(String[] args) {//创建一个连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.0.92");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("123456");//使用try...with...resources可自动关闭资源/*https://javaguide.cn/java/basis/java%E5%9F%BA%E7%A1%80%E7%9F%A5%E8%AF%86%E6%80%BB%E7%BB%93/#%E4%BD%BF%E7%94%A8-try-with-resources-%E6%9D%A5%E4%BB%A3%E6%9B%BFtry-catch-finally*/try (//从连接工厂获取连接实例Connection connection = connectionFactory.newConnection();//创建一个通道Channel channel = connection.createChannel()) {/*定义一个队列1、队列名称2、队列里面的消息是否持久化,默认消息存储在内存中(即:不是持久化)3、该队列是否只提供一个消费者进行消费,是否进行共享,true表示可以多个消费者共享消费4、是否自动删除 最后一个消费者端开始连接以后,该队列是否自动删除,true表示自动删除5、其他参数*/channel.queueDeclare(SIMPLE_MODE_QUEUE_NAME, false, false, false, null);//定义要消费的消息String message = "Hello World!";/*发送消息1、发送到指定交换机2、route-key(路由键),可以理解为指定队列3、其他的参数信息4、发送消息的消息体*/channel.basicPublish("", SIMPLE_MODE_QUEUE_NAME, null, message.getBytes());System.out.println(PRODUCER + message);} catch (Exception e) {e.printStackTrace();}}
}

二、消息消费者

package com.mq.rabbitmq.simplemode;import com.rabbitmq.client.*;public class Consumer {//队列名称(路由键)private static final String SIMPLE_MODE_QUEUE_NAME = "routeKey";private static final String CONSUMER = "消费者收到";public static void main(String[] args) {//创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.0.92");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("123456");//使用try...with...resources可自动关闭资源try (//从连接工厂中获取连接实例Connection connection = connectionFactory.newConnection();//创建一个通道Channel channel = connection.createChannel()) {System.out.println("等待接收消息......");//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback = (consumerTag, deliver) -> {byte[] byteMsg = deliver.getBody();String message = new String(byteMsg);System.out.println(CONSUMER + message);};//取消消费的一个回调接口,如在消费的时候队列被删除掉了CancelCallback cancelCallback = consumerTag -> System.out.println("消息消费被中断");/*消费者消费消息1、消费者绑定的队列2、消费成功之后是否要自动应答,true表示自动应答,false表示手动应答3、消费者未成功消费的回调*/channel.basicConsume(SIMPLE_MODE_QUEUE_NAME, true, deliverCallback, cancelCallback);} catch (Exception e) {e.printStackTrace();}}
}

工作模式

工作模式案例

工作队列(又称任务队列)的主要思想就是避免立即执行资源密集型任务,而不得不等待它完成,相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。类比负载均衡。
在这里插入图片描述
一、RabbitMQ连接工具类

package com.mq.rabbitmq.workmode;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMQUtils {public static Channel getChannel() throws Exception {//创建一个连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.31.91");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("123456");Connection connection = connectionFactory.newConnection();return connection.createChannel();}
}

二、消息生产者

package com.mq.rabbitmq.workmode;import com.rabbitmq.client.Channel;import java.util.Scanner;public class WorkModeProducer {//路由键:routeKeyprivate static final String WORK_MODE_QUEUE_NAME = "routeKey";private static final String PRODUCER = "生产者发送 ";public static void main(String[] args) {//获取连接通道try (Channel channel = RabbitMQUtils.getChannel()) {//创建队列channel.queueDeclare(WORK_MODE_QUEUE_NAME, false, false, false, null);//发送从控制台输入的消息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();//发布消息channel.basicPublish("", WORK_MODE_QUEUE_NAME, null, message.getBytes());System.out.println(PRODUCER + message);}} catch (Exception e) {e.printStackTrace();}}
}

三、消息消费者(两份)

package com.mq.rabbitmq.workmode;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class WorkModeConsumerOne {//定义路由键private static final String WORK_MODE_QUEUE_NAME = "routeKey";//消费者oneprivate static final String CONSUMER = "消费者 one 收到 ";public static void main(String[] args) {//获取连接通道try {Channel channel = RabbitMQUtils.getChannel();//消息回调,确认回复等信息DeliverCallback deliverCallback = (consumerTag, delivery) -> {long deliveryTag = delivery.getEnvelope().getDeliveryTag();try {byte[] body = delivery.getBody();String receiveMessage = new String(body);System.out.println(CONSUMER + receiveMessage);/*手动确认:deliveryTag:队列位置multiple:true 表示确认指定队列deliveryTag位置之前的全部消息,false 表示确认指定位置消息*/channel.basicAck(deliveryTag, true);} catch (Exception e) {e.printStackTrace();//消费者消费消息出现异常时,消息重回队列/*requeue:true 表示重回队列false 表示放弃消息*/channel.basicNack(deliveryTag, true, true);/*与 Channel.basicNack 相比少一个参数不处理该消息了直接拒绝,可以将其丢弃了*///channel.basicReject(deliveryTag, true);}};//取消消费的一个回调接口,如在消费的时候队列被删除掉了CancelCallback cancelCallback = consumerTag -> {System.out.println(consumerTag + "消息消费被中断");};System.out.println("等待消费者 one 消费");//开启手动确认后,autoAck要设置成falsechannel.basicConsume(WORK_MODE_QUEUE_NAME, false, deliverCallback, cancelCallback);} catch (Exception e) {e.printStackTrace();}}
}

由以上可知,rabbtimq在所有消费者中轮询分发消息,把消息均匀发送给所有消费者。以上代码实现了消息确认、消费异常消息重回队列

消息确认(消息应答)

一、自动确认(应答)
消息在发送后被认为已经发送成功(可能导致消息丢失),这种模式需要在高吞吐量和数据传输安全性方面做权衡,另一方面,该模式没有对传递的消息数量进行限制,消费者来不及处理是,可能会导致消息积压所以自动应答的模式仅适用于消费者可以高效并以某种速率能够处理这些消息的情况下使用。

二、手动确认(应答):手动应答的好处是可以批量应答并且减少网络拥堵。
在这里插入图片描述
具体代码及解释,查看【工作模式 - 消费者

消息持久化

消息持久化是为了保证 RabbitMQ服务 down 掉之后重启,能够保证生产者发送的消息不丢失。为了确保消息不丢失,需要做两件事:将 队列消息 都标记为持久化。

一、队列持久化:RabbitMQ 重启后,队列任然存在
在这里插入图片描述
但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误。

二、消息持久化:将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是
这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没
有真正写入磁盘。持久性保证并不强。
在这里插入图片描述
三、不公平分发
RabbitMQ 分发消息的默认采用的使用轮询方式。为了在不同场景下设置不同的分发策略。可以通过如下参数设置:
在这里插入图片描述

发布确认模式

发布确认分为:
1、单个确认发布
2、批量确认发布
3、异步确认发布

//开启确认发布模式
channel.confirmSelect()

异步确认发布
在这里插入图片描述
一、生产者

package com.mq.rabbitmq.publishconfirm;import com.mq.rabbitmq.workmode.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.ConfirmListener;import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;public class PublishConfirmModeProducer {//路由键:routeKeyprivate static final String WORK_MODE_QUEUE_NAME = "routeKey";private static final String PRODUCER = "生产者发送 ";public static void main(String[] args) {//获取连接通道try (Channel channel = RabbitMQUtils.getChannel()) {//创建队列//持久化队列:durable 设置为 true。//channel.queueDelete(WORK_MODE_QUEUE_NAME);//final boolean durable = true;/*在消息生产者中使用:prefetchCount 表示预取值,指通道 channel 中最多允许4条未确认的消息,当满足 unAckCount = prefetchCount 时,RabbitMQ 不会将消息分发到 channel 通道。否则可以正常分发到该通道。*///channel.basicQos(4);channel.queueDeclare(WORK_MODE_QUEUE_NAME, false, false, false, null);//开启发布确认模式channel.confirmSelect();/*线程安全有序的一个哈希表,适用于高并发的情况1.轻松的将序号与消息进行关联2.轻松批量删除条目 只要给到序列号3.支持并发访问*/ConcurrentSkipListMap<Long, String> unConfirmsListMap = new ConcurrentSkipListMap<>();//            /*
//             确认收到消息的回调方法
//                 1、deliveryTag:消息序列号
//                 2、multiple:批量确认,true 可以确认小于等于当前序列号的消息。false 可以确认当前序列号的消息
//             */
//            ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
//                //删除确认的消息
//                if (multiple) {
//                    System.out.println("确认收到消息");
//
//                    //返回的是小于等于当前序列号的确认的消息,是一个Map,清除之后剩下的就是未被确认的消息
//                    ConcurrentNavigableMap<Long, String> confirmed = unConfirmsListMap.headMap(deliveryTag, true);
//                    //清除该部分未确认的消息
//                    confirmed.clear();
//                } else {
//                    //只清除当前序列的消息
//                    unConfirmsListMap.remove(deliveryTag);
//                }
//            };
//            /*
//             未确认收到(确认失败)消息的回调方法
//             */
//            ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
//                //打印未确认的消息
//                String message = unConfirmsListMap.get(deliveryTag);
//                System.out.println("发布的消息" + message + "未确认,序列号:" + deliveryTag);
//            };
//               /*
//             添加一个异步确认的监听器
//                1、确认收到消息的回调 ackCallback
//                2、未收到消息的回调 nackCallback
//             */
//            channel.addConfirmListener(ackCallback,nackCallback);ConfirmListener confirmListener = new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(multiple);//删除确认的消息if (multiple) {System.out.println("确认收到消息");//返回的是小于等于当前序列号的确认的消息,是一个Map,清除之后剩下的就是未被确认的消息ConcurrentNavigableMap<Long, String> confirmed = unConfirmsListMap.headMap(deliveryTag, true);//清除该部分未确认的消息confirmed.clear();} else {//只清除当前序列的消息unConfirmsListMap.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {//打印未确认的消息String message = unConfirmsListMap.get(deliveryTag);System.out.println("发布的消息" + message + "未确认,序列号:" + deliveryTag);}};channel.addConfirmListener(confirmListener);//发送从控制台输入的消息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();/*channel.getNextPublishSeqNo()获取下一个消息的序列号通过序列号与消息体进行一个关联全部都是未确认的消息体*/long nextPublishSeqNo = channel.getNextPublishSeqNo();unConfirmsListMap.put(nextPublishSeqNo, message);//发布消息//当 durable 为 true 的时候channel.basicPublish("", WORK_MODE_QUEUE_NAME, null, message.getBytes());System.out.println(PRODUCER + message);}} catch (Exception e) {e.printStackTrace();}}
}

二、消费者

package com.mq.rabbitmq.publishconfirm;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;public class PublishConfirmModeConsumerOne {//定义路由键private static final String WORK_MODE_QUEUE_NAME = "routeKey";//消费者oneprivate static final String CONSUMER = "消费者 one 收到 ";public static void main(String[] args) {AtomicReference<String> receiveMessage = new AtomicReference<>("");//获取连接通道try {Channel channel = RabbitMQUtils.getChannel();//消息回调,确认回复等信息DeliverCallback deliverCallback = (consumerTag, delivery) -> {long deliveryTag = delivery.getEnvelope().getDeliveryTag();try {byte[] body = delivery.getBody();receiveMessage.set(new String(body));System.out.println(CONSUMER + receiveMessage);int num= Integer.valueOf(receiveMessage.get());int a = 10 / num;/*FdeliveryTag:队列位置multiple:true 表示确认指定队列deliveryTag位置之前的全部消息,false 表示确认指定位置消息*/channel.basicAck(deliveryTag, false);} catch (Exception e) {e.printStackTrace();//消费者消费消息出现异常时,消息重回队列(用于否定确认)/*requeue:true 表示重回队列false 表示放弃消息*/
//                    channel.basicNack(deliveryTag, true, true);if (Objects.equals(receiveMessage.get(),"0")) {channel.basicNack(deliveryTag, true, false);}else {channel.basicNack(deliveryTag, true, true);}/*与 Channel.basicNack 相比少一个参数不处理该消息了直接拒绝,可以将其丢弃了*///channel.basicReject(deliveryTag, true);}};//取消消费的一个回调接口,如在消费的时候队列被删除掉了CancelCallback cancelCallback = consumerTag -> {System.out.println(consumerTag + "消息消费被中断");};System.out.println("等待消费者 one 消费");//开启手动确认后,autoAck要设置成falsechannel.basicConsume(WORK_MODE_QUEUE_NAME, false, deliverCallback, cancelCallback);} catch (Exception e) {e.printStackTrace();}}
}

交换机(Exchange)

通过交换机将同一个消息转发给多个消费者消费,这种模式成为 发布/订阅 模式

Exchange 概念

一、为什么要有交换机呢?

之前的简单模式和工作模式中,我们不难看出,消费者绑定的都是同一个队列,多个消费者之间存在竞争关系,也就是说同一个队列的消息消费者消费一次,不能重复消费。
在这里插入图片描述
现在这样一个需求,一个消息需要被消费多次。例如(日志系统):我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘,另外一个消费者接收到消息后把消息打印在屏幕上,事实上第一个程序发出的日志消息将广播给所有消费者。那么该如何设计处理呢?可以使用交换机来解决此问题。设计理念图如下:
在这里插入图片描述
二、Exchange 的概念及分类

1、RabbitMQ 消息传递模型的核心思想是:生产者的消息从不会直接发送到队列。实际上,通常生产者升值都不知道这些消息传递到哪些队列中。相反,生产者只能将消息发送到交换机(Exchange),由交换机将消息转发并推送到队列当中。交换机必须确切的知道如何处理收到的消息,将消息放入特定的队列或者丢弃消息,这就由交换机的类型来决定。

2、交换机的类型如下:
(1)直连交换机(direct exchange)
(2)主题交换机(topic exchange)
(3)标题交换机(headers exchange)
(4)扇出交换机(fanout excahnge)
其实还有一个默认的交换机,通常情况下不命名都会使用默认的交换机。

临时队列

每当我们连接 RabbitMQ 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦断开了消费者的连接,队列兼备自动删除。 创建临时队列的方式如下:

String queueName = channel.queueDeclare().getQueue();

临时队列创建成功后:
在这里插入图片描述

绑定(bindings)

binding 的概念其实就是 exchangequeue 之间的桥梁,交换机(exchange)与队列(queue)之间是绑定(binding)关系。
在这里插入图片描述

扇出交换机(fanout exchange)

fanout exchange 非常简单,它是将收到的所有消息 广播 给所有绑定它的 queue 中。

实际案例: 生产者发送日志,将日志消息发送到 fanout exchange 中,再由 fanout exchange 将消息转发到绑定在改交换机的 临时queue,消费者 C1 控制台输出日志,消费者 C2 则将日志存储到磁盘中。
在这里插入图片描述
一、生产者

package com.mq.rabbitmq.fanoutexchange;import com.rabbitmq.client.Channel;import java.util.Scanner;public class FanoutExchangeProducer {private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) {try {//获取信道Channel channel = RabbitMQUtils.getChannel();//定义交换机:扇出交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);Scanner scanner = new Scanner(System.in);System.out.println("请输入日志信息");while (scanner.hasNext()) {String logMsg = scanner.nextLine();channel.basicPublish(EXCHANGE_NAME, "", null, logMsg.getBytes("UTF-8"));}} catch (Exception e) {e.printStackTrace();}}
}

二、消费者
1、控制台输出日志

package com.mq.rabbitmq.fanoutexchange;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ConsumerOne {private static final String EXCHANGE_NAME = "fanout_exchange";//定义路由键 routeKey(binding)private static final String ROUTE_KEY = "binding";public static void main(String[] args) {try {//获取信道Channel channel = RabbitMQUtils.getChannel();//定义交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//通过系统生成随机的临时队列String queueName = channel.queueDeclare().getQueue();//将临时队列与交换机绑定channel.queueBind(queueName, EXCHANGE_NAME, ROUTE_KEY);//输出日志到控制台System.out.println("等待接收消息, 把接收到的消息打印在屏幕上......");DeliverCallback deliverCallback = (consumerTag, delivery) -> {//获取序列long deliveryTag = delivery.getEnvelope().getDeliveryTag();try {byte[] bytes = delivery.getBody();String message = new String(bytes, "UTF-8");System.out.println("控制台打印接收到的消息:" + message);channel.basicAck(deliveryTag, true);} catch (Exception e) {e.printStackTrace();channel.basicNack(deliveryTag, true, true);}};CancelCallback cancelCallback = consumerTag -> {System.out.println("消息消费被中断");};channel.basicConsume(queueName, false, deliverCallback, cancelCallback);} catch (Exception e) {e.printStackTrace();}}
}

2、将日志保存到文件

package com.mq.rabbitmq.fanoutexchange;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;import java.io.File;public class ConsumerTwo {private static final String EXCHANGE_NAME = "fanout_exchange";//定义路由键 routeKey(binding)private static final String ROUTE_KEY = "binding";public static void main(String[] args) {try {//获取信道Channel channel = RabbitMQUtils.getChannel();//定义交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//通过系统生成随机的临时队列String queueName = channel.queueDeclare().getQueue();//将临时队列与交换机绑定channel.queueBind(queueName, EXCHANGE_NAME, ROUTE_KEY);//输出日志到控制台System.out.println("等待接收消息, 把接收到的消息保存到磁盘......");DeliverCallback deliverCallback = (consumerTag, delivery) -> {//获取序列long deliveryTag = delivery.getEnvelope().getDeliveryTag();try {String message = new String(delivery.getBody(), "UTF-8");File file = new File("D:\\rabbitmq_log.txt");//此方式为横向拼接
//                    FileOutputStream fileOutputStream = new FileOutputStream(file, true);
//                    fileOutputStream.write(message.getBytes());FileUtils.writeStringToFile(file, message, "UTF-8");System.out.println("日志保存磁盘成功");channel.basicAck(deliveryTag, true);} catch (Exception e) {e.printStackTrace();channel.basicNack(deliveryTag, true, true);}};CancelCallback cancelCallback = consumerTag -> {System.out.println("消息消费被中断");};channel.basicConsume(queueName, false, deliverCallback, cancelCallback);} catch (Exception e) {e.printStackTrace();}}
}

直连交换机(direct exchange)

一、direct excahnge 概念

我们知道交换机 exchange 与 queue 之间的关系叫做 binding ,绑定用参数 routeKey(路由键)表示。首先 routeKey 是交换机与队列之前的桥梁,那么简单的可以将其作为交换机只能通过特定的 routeKey 才能向特定的队列中转发消息。例如,日志按照日志级别来划分,error 级别的日志保存到磁盘,warning、info 级别日志在控制台输出。这个时候,routeKey 起到了一个消息分类的作用。
在这里插入图片描述
二、多重绑定

当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如下图所示。
在这里插入图片描述
三、案例说明
在这里插入图片描述
1、生产者

package com.mq.rabbitmq.directexchange;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;
import java.util.Set;public class DirectProducer {private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) {//初始化 routeKey(bindingKey)-message 的MapMap<String, Object> routeKeyMessageMap = new HashMap<>();routeKeyMessageMap.put("routeKey_info", "普通 info 日志信息");routeKeyMessageMap.put("routeKey_warning", "警告 warning 日志信息");routeKeyMessageMap.put("routeKey_error", "错误 error 日志信息");//debug没有消费者接受这个消息,所以就丢失了routeKeyMessageMap.put("routeKey_debug", "调试 debug 日志信息");try {//获取连接通道Channel channel = RabbitMQUtils.getChannel();//定义交换机名称及其类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//发送消息Set<Map.Entry<String, Object>> entries = routeKeyMessageMap.entrySet();for (Map.Entry<String, Object> entry : entries) {String routeKey = entry.getKey();Object message = entry.getValue();//发送消息channel.basicPublish(EXCHANGE_NAME, routeKey, null, (message == null ? "" : message.toString()).getBytes("UTF-8"));System.out.println("生产者发送消息:" + message);}} catch (Exception e) {e.printStackTrace();}}
}

2、消费者
(1)写入 error 日志信息

package com.mq.rabbitmq.directexchange;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;import java.io.File;//将错误值保存到文件
public class DirectErrorConsumer {//定义交换机名称private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) {try {//获取连接通道Channel channel = RabbitMQUtils.getChannel();//定义队列名称:file_queue 的队列String queueName = "file_queue";channel.queueDeclare(queueName, false, false, false, null);//定义交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//将交换机与 queueName 绑定,绑定键(路由键) routeKey_errorchannel.queueBind(queueName, EXCHANGE_NAME, "routeKey_error");//消费回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {long deliveryTag = delivery.getEnvelope().getDeliveryTag();try {String message = new String(delivery.getBody(), "UTF-8");File file = new File("D:\\rabbitmq_log.txt");FileUtils.writeStringToFile(file, message, "UTF-8");System.out.println("消费者一收到消息:" + message + ", 并将该消息写入磁盘");//手动确认channel.basicAck(deliveryTag, false);} catch (Exception e) {//消费出错消息重会队列channel.basicNack(deliveryTag, false, true);}};//中断回调CancelCallback cancelCallback = consumerTag -> {System.out.println("消息消费被中断");};channel.basicConsume(queueName, false, deliverCallback, cancelCallback);} catch (Exception e) {e.printStackTrace();}}
}

(2)控制台输出 info 、warning 日志信息

package com.mq.rabbitmq.directexchange;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;//将info、warning等日志级别输出
public class DirectConsoleConsumer {//定义交换机名称private static final String EXChANGE_NAME = "direct_exchange";public static void main(String[] args) {try {//获取连接通道Channel channel = RabbitMQUtils.getChannel();//定义队列名称:console_queue 的队列String queueName = "console_queue";channel.queueDeclare(queueName, false, false, false, null);//定义交换机及其类型channel.exchangeDeclare(EXChANGE_NAME, BuiltinExchangeType.DIRECT);//将 queue_name 列绑定交换机,其中路由键为:routeKey_info、routeKey_warningchannel.queueBind(queueName, EXChANGE_NAME, "routeKey_info");channel.queueBind(queueName, EXChANGE_NAME, "routeKey_warning");//消息消费回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {long deliveryTag = delivery.getEnvelope().getDeliveryTag();try {String message = new String(delivery.getBody(), "UTF-8");System.out.println("消费者二收到消息:" + message);//手动确认channel.basicAck(deliveryTag, false);} catch (Exception e) {e.printStackTrace();//消费失败消息重回队列channel.basicNack(deliveryTag, false, true);}};//消费者消费中断CancelCallback cancelCallback = consumerTag -> {System.out.println("消息消费被中断");};channel.basicConsume(queueName,false,deliverCallback,cancelCallback);} catch (Exception e) {e.printStackTrace();}}
}

主题交换机(topic exchange)

在日志系统中,我们不仅需要根据日志级别订阅日志,还希望从发出日志的源订阅日志。这时候,direct exchange 就办不到了,这个时候只能采用 topic exchange

topic exchange 的要求: 发送类型是 topic exchange 的消息的 路由键 route_key 不能随便写,必须是一个单词列表,以点号隔开。这些单词可以是任意单词,比如说:stock.usd.nysenyse.vmwquick.orange.rabbit 这种类型的。单词列表的长度不能超过 255 个字节。在这个规则列表中,其中有两个替换符:
1、* 可以代替一个单词
2、# 可以代替零个或多个
交换机类型是 topic 的时候,值得注意的是,①当一个队列的绑定键是 # 时,那么队列将接收所有的数据,有点像 fanout 。②当队列绑定键没有 #* 时,那么该对列就像 direct
在这里插入图片描述
一、生产者

package com.mq.rabbitmq.topicexchange;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;
import java.util.Set;//主题交换机-生产者
public class TopicProducer {//定义交换机名称private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) {try {//获取连接通道Channel channel = RabbitMQUtils.getChannel();//定义创建交换机及类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false, true, null);//定义路由键及其消息Map<String, Object> routeKeyMessage = new HashMap<>();routeKeyMessage.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");routeKeyMessage.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");routeKeyMessage.put("quick.orange.fox", "被队列 Q1 接收到");routeKeyMessage.put("lazy.brown.fox", "被队列 Q2 接收到");routeKeyMessage.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");routeKeyMessage.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");routeKeyMessage.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");routeKeyMessage.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");//发送消息Set<Map.Entry<String, Object>> entries = routeKeyMessage.entrySet();for (Map.Entry<String, Object> entry : entries) {//路由键(绑定键)String routeKey = entry.getKey();Object message = entry.getValue();channel.basicPublish(EXCHANGE_NAME, routeKey, null, (message == null ? "" : message.toString()).getBytes("UTF-8"));System.out.println("生产者发出消息" + message);}} catch (Exception e) {e.printStackTrace();}}
}

二、消费者
1、接收路由键为 *.orange.* 的消息

package com.mq.rabbitmq.topicexchange;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;//消费者一
public class TopicConsumerOne {//定义交换机名称private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) {try {//获取连接通道Channel channel = RabbitMQUtils.getChannel();//定义交换机及其类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,false,true,null);//定义队列 Q1String queueName = "Q1";channel.queueDeclare(queueName, false, false, false, null);//绑定队列 routeKey = *.orange.*channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");System.out.println("消费者一等待接收消息......");//消费回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {long deliveryTag = delivery.getEnvelope().getDeliveryTag();try {String message = new String(delivery.getBody(), "UTF-8");System.out.println("接收队列:" + queueName + "绑定键(路由键):" + deliveryTag + ", 消息:" + message);//手动确认channel.basicAck(deliveryTag, false);} catch (Exception e) {e.printStackTrace();channel.basicNack(deliveryTag, false, true);}};//消费中断回调CancelCallback cancelCallback = consumerTag -> {System.out.println("消费消息被中断");};channel.basicConsume(queueName, false, deliverCallback, cancelCallback);} catch (Exception e) {e.printStackTrace();}}
}

2、接受路由键为 *.*.rabbitlazy.# 的消息

package com.mq.rabbitmq.topicexchange;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;//消费者二
public class TopicConsumerTwo {//定义交换机名称private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) {try {//获取连接通道Channel channel = RabbitMQUtils.getChannel();//定义交换机及其类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,false,true,null);//定义队列 Q1String queueName = "Q2";channel.queueDeclare(queueName, false, false, false, null);//绑定队列 routeKey = *.orange.* / lazy.#channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");System.out.println("消费者二等待接收消息......");//消费回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {long deliveryTag = delivery.getEnvelope().getDeliveryTag();try {String message = new String(delivery.getBody(), "UTF-8");System.out.println("接收队列:" + queueName + "绑定键(路由键):" + deliveryTag + ", 消息:" + message);//手动确认channel.basicAck(deliveryTag, false);} catch (Exception e) {e.printStackTrace();channel.basicNack(deliveryTag, false, true);}};//消费中断回调CancelCallback cancelCallback = consumerTag -> {System.out.println("消费消息被中断");};channel.basicConsume(queueName, false, deliverCallback, cancelCallback);} catch (Exception e) {e.printStackTrace();}}
}

死信队列

概念及应用场景

死信,顾名思义就是无法被消费的消息。一般来说说 producer 将消息发送到 broker 或者直接到 queue 里,consumer 从 queue 取出消息进行消费,但是由于某种特定原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列应用场景如下:

(1)为了保证订单业务的消息数据不丢失,需要用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
(2)用户在商城下单成功并点击去支付后,在指定的时间段内未支付时,自动失效。

死信来源

1、消息 TTL(Time to live【生存时间值】)过期,即消息的生存时间过期
2、队列达到最大长度(队列满了,无法在添加消息到队列 mq 中)。
3、消息被拒绝(channel.basicReject() 或channel.basicNack() )并且 requeue = false

死信实例

一、架构图
在这里插入图片描述

消息 TTL 过期

一、生产者

package com.mq.rabbitmq.deadmq.msgttl;import com.mq.rabbitmq.deadmq.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;//消息生产者
public class MsgTTLProducer {//定义交换机名称private static final String EXCHANGE_NAME = "normal_exchange";//定义路由键private static final String ROUTE_KEY = "normal_route_key";public static void main(String[] args) {try {//获取连接通道Channel channel = RabbitMQUtils.getChannel();//设置交换机名称及其类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//设置消息的TTL时间,等待 10 秒后发送到 normal_route_key 路由键的消息过期AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();//该信息是用作演示队列个数限制for (int i = 0; i < 10; i++) {String message = "info" + i;//发送消息到交换机channel.basicPublish(EXCHANGE_NAME, ROUTE_KEY, properties, message.getBytes("UTF-8"));System.out.println("生产者发送消息:" + message);}} catch (Exception e) {e.printStackTrace();}}
}

二、消费者
1、正常消费

package com.mq.rabbitmq.deadmq.msgttl;import com.mq.rabbitmq.deadmq.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;//正常消息消费者
public class MsgTTLNormalConsumer {//定义正常交换机的名称private static final String NORMAL_EXCHANGE = "normal_exchange";//定义死信队列交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";//定义正常队列名称private static final String NORMAL_QUEUE = "normal_queue";//定义失败队列名称private static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) {try {//获取连接通道Channel channel = RabbitMQUtils.getChannel();//定义正常(死信)交换机及其类型channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//定义(死信)队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);//死信队列与死信交换机绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead_route_key");//正常队列绑定死信队列消息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机 参数 key 为固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key 参数 key 为固定值params.put("x-dead-letter-routing-key", "dead_route_key");//定义正常队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);//正常队列绑定正常交换机channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal_route_key");System.out.println("等待接收消息......");//确认回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {long deliveryTag = delivery.getEnvelope().getDeliveryTag();try {String message = new String(delivery.getBody(), "UTF-8");System.out.println("消费者 1 接收到消息:" + message);channel.basicAck(deliveryTag, false);} catch (Exception e) {e.printStackTrace();channel.basicNack(deliveryTag, false, true);}};//中断回调CancelCallback cancelCallback = consumerTag -> {System.out.println("消息消费被中断");};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);} catch (Exception e) {e.printStackTrace();}}}

2、死信消费

package com.mq.rabbitmq.deadmq.msgttl;import com.mq.rabbitmq.deadmq.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;//正常消息消费者
public class MsgTTLDeadConsumer {//定义死信队列交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";//定义失败队列名称private static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) {try {//获取连接通道Channel channel = RabbitMQUtils.getChannel();//(死信)交换机及其类型channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//定义(死信)队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);//死信队列与死信交换机绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead_route_key");System.out.println("等待接收死信队列的消息......");//确认回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {long deliveryTag = delivery.getEnvelope().getDeliveryTag();try {String message = new String(delivery.getBody(), "UTF-8");System.out.println("消费者 2 接收到消息:" + message);channel.basicAck(deliveryTag, false);} catch (Exception e) {e.printStackTrace();channel.basicNack(deliveryTag, false, true);}};//中断回调CancelCallback cancelCallback = consumerTag -> {System.out.println("消息消费被中断");};channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);} catch (Exception e) {e.printStackTrace();}}
}

以下案例只展示关键概念和关键代码。

队列达到最大长度

参考 消息 TTL 过期 ,在 normal 消费者中加入如下代码可控制,正常消费队列的最大长度:
在这里插入图片描述

消息被拒绝,并且 requeue = false

消息确认的时候,加入如下代码,消息加入死信队列:
在这里插入图片描述

延时队列

概念

延时队列的内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望指定时间到了以后或之前去除和处理。简单说,延时队列就是用来存放指定时间被处理的元素的队列。

应用场景

1、订单在十分钟之内未支付自动取消。
2、新创建的店铺如果在十天之内都没有上传过商品,则自动发送消息提醒。
3、用户注册成功后没如果三天内没有登录则短信提醒。
4、用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5、预定会议后们需要在预定时间点前十分钟通知各个会议人员参加会议。

RabbitMQ 中的 TTL

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有
消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

消息设置 TTL: 指的是针对每条消息设置消息的最大存活时间。
队列设置 TTL: 指的是在创建队列的时候设置队列的 x-message-ttl 属性。
二者的区别: 如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

延时队列的详细介绍:https://blog.csdn.net/Zero_dot_degree/article/details/107521638

发布确认高级

在这里插入图片描述

在这里插入图片描述
代码地址:https://github.com/1914526816lhw/cloud-microservice/tree/master/cloud-stream-rabbitmq-provider8801/src/main/java/com/atguigu/springcloud/messagequeue

RabbitMQ的其他知识点

幂等性

1、 概念
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误
立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等

2、消息重复消费
消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

3、 解决思路
MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。
4、 消费端的幂等性保障
在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:a.唯一 ID+指纹码机制,利用数据库主键去重, b.利用 redis 的原子性去实现

5、 唯一 ID+指纹码机制
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。

6、 Redis 原子性
利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费

优先队列

1、使用场景
在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。

2、如何添加
(1)队列添加优先级

Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);

(2)消息添加优先级

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();

惰性队列

1、使用场景
RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的
时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。

2、两种模式
队列具备两种模式:default 和 lazy。默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args)

3、内存开销对比
在这里插入图片描述
在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB 。


http://chatgpt.dhexx.cn/article/XMTgGpSF.shtml

相关文章

RabbitMq原理及应用

一、简介 MQ(Message Queue),即消息队列&#xff0c;是一种实现应用级别之间的通信手段。不同应用之间可以通过读写消息&#xff0c;以消息为媒介传递应用数据&#xff0c;不需要应用之间建立强连接。此方式与远程调用&#xff08;RPC&#xff09;是应用通信的常见方式。在这个…

RabbitMQ原理解析

场景模拟 在介绍RabbitMQ之前&#xff0c;我们先来看下面一个电商项目的场景&#xff1a; 商品的原始数据保存在数据库中&#xff0c;增删改查都在数据库中完成。搜索服务数据来源是索引库&#xff08;Elasticsearch&#xff09;&#xff0c;如果数据库商品发生变化&#xff0…

Rabbitmq基本原理和架构

全栈工程师开发手册 &#xff08;作者&#xff1a;栾鹏&#xff09; 架构系列文章 rabbitmq官网:https://www.rabbitmq.com/rabbitmqctl.8.html MQ全称为Message Queue, 是一种分布式应用程序的的通信方法&#xff0c;它是消费-生产者模型的一个典型的代表&#xff0c;produc…

RabbitMQ — RabbitMQ使用以及原理解析

RabbitMQ使用以及原理解析 RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现;在RabbitMQ官网上主要有这样的模块信息, Work queues消息队列,Publish/Subscribe发布订阅服务,Routing, Topics, RPC等主要应用的模块功能. 几个概念说明: Broker:它提供一种传…

RabbitMQ介绍及工作原理

RabbitMQ介绍及工作原理 一&#xff0c;什么是RabbitMQ ​ RabbitMQ是一种称为消息代理或队列管理器的消息队列软件。它是一个可以定义队列的软件&#xff0c;应用程序可以连接到队列并将消息传输到它们。消息队列的基本体系结构很简单&#xff1a;存在称为生产者的客户端应用…

彻底理解RabbitMQ底层原理

1.RabbitMQ概念 RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。 AMQP &#xff1a;Advanced Message Queue&#xff0c;高级消息队列协议。它是应用层协议的一个开放标准 &#xff0c;为 面向消息的中间件设计&#xff0c;基于此协议的客户端与消息中间件可传递消息&…

rabbitmq消息队列原理

一、rabbitmq架构 RabbitMQ是一个流行的开源消息队列系统&#xff0c;是AMQP&#xff08;高级消息队列协议&#xff09;标准的实现&#xff0c;由以高性能、健壮、可伸缩性出名的Erlang语言开发&#xff0c;并继承了这些优点。rabbitmq简单架构如下&#xff1a; 上图简单展…

SpringBoot整合RabbitMQ及其原理分析

上一篇&#xff1a;RabbitMQ基础知识 1、相关依赖 这里无需指定版本号&#xff0c;让其跟着SpringBoot版本走。本示例使用SpringBoot版本号为2.7.10。 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-…

rabbitmq详解

rabbitmq 一、简介二、业务场景1、异步2、应用解耦3、流量削峰 三、下载四、界面认识五、五种模型示例0、springboot依赖配置1、Hello World简单模型2、Work queues工作队列3、Publish/Subscribe发布订阅模型4、Routing路由模型5、Topics主题模型6、消息转换器 六、进阶1、消费…

[RabbitMQ--1] MQ简介

目录 1.MQ 的相关概念 1.1.什么是 MQ&#xff1f; 1.2.为什么要用MQ&#xff1f;MQ的应用场景 1.2.1.流量消峰&#xff1a; 1.2.2.任务异步处理&#xff1a; 1.2.3.应用解耦 2.AMQP和JMS 3.MQ 的分类 1.ActiveMQ 2.Kafka 3..RocketMQ 4..RabbitMQ 4.RabbitMQ 1.四…

什么是RabbitMq?其原理?

什么是RabbitMq&#xff1f; RabbitMQ是一个实现了AMQP&#xff08;Advanced Message Queuing Protocol&#xff09;高级消息队列协议的消息队列服务&#xff0c;用Erlang语言。 rabbitmq原理 1.Producer&#xff1a;即数据的发送方。创建消息并将其发布(发送)到代理服务器 一…

RabbitMQ原理详解

RabbitMQ&#xff1a;我们通常谈到消息队列&#xff0c;就会联想到这其中的三者&#xff1a;生产者、消费者和消息队列&#xff0c;生产者将消息发送到消息队列&#xff0c;消费者从消息队列中获取消息进行处理。对于RabbitMQ&#xff0c;它在此基础上做了一层抽象&#xff0c;…

你应该知道的 9 种 前端设计模式

本篇文章给大家介绍 9 种 前端设计模式。有一定的参考价值&#xff0c;有需要的朋友可以参考一下&#xff0c;希望对大家有所帮助。 什么是设计模式&#xff1f; 设计模式是对软件设计开发过程中反复出现的某类问题的通用解决方案。设计模式更多的是指导思想和方法论&#xff…

前端需要了解的设计模式

什么是设计模式&#xff1f; 设计模式是对软件设计开发过程中反复出现的某类问题的通用解决方案。设计模式更多的是指导思想和方法论&#xff0c;而不是现成的代码&#xff0c;当然每种设计模式都有每种语言中的具体实现方式。学习设计模式更多的是理解各种模式的内在思想和解…

前端设计模式应用

前端设计模式应用 什么是设计模式 软件设计中常见问题的解决方案模型: 历史经验的总结与特定语言无关 设计模式背景 模式语言:城镇、建筑、建造 (A Pattern Language:Towns, Buildings,Construction)1977设计模式:可复用面向对象软件的基础 (Design Patterns: Elements of …

web端设计和web前端开发的区别

Web前端开发技术主要包括三个要素&#xff1a;HTML、CSS和JavaScript&#xff01; 它要求前端开发工程师不仅要掌握基本的Web前端开发技术&#xff0c;网站性能优化、SEO和服务器端的基础知识&#xff0c;而且要学会运用各种工具进行辅助开发以及理论层面的知识&#xff0c;包括…

前端开发中常用设计模式-总结篇

本文是向大家介绍前端开发中常用的设计模式&#xff0c;它使我们编写的代码更容易被复用&#xff0c;也更容易被人理解&#xff0c;并且保证代码的稳定可靠性。 1.什么是设计模式 通俗来讲&#xff0c;就是日常使用设计的一种惯性思维。 因为对应的这种思维&#xff0c;以及对…

前端的设计模式有哪些呢

谈谈设计模式~ 文章目录 什么是设计模式设计模式分类1. 结构型模式2. 创建型模式3. 行为型模式具体使用 什么是设计模式 设计模式&#xff0c;是对软件设计开发过程中反复出现的某类问题的通用解决方案。设计模式是指一种思想和方法论&#xff0c;先有设计思想&#xff0c;才能…

【面试】最新web前端经典面试题试题及答案(持续更新)-html/css、js、vue、http、web安全、前端性能、浏览器、js算法

author: aSuncat JavaScript知识点大全&#xff1a;https://www.yuque.com/webfront/js 所有最新最全面试题&#xff0c;持续更新在语雀。见 语雀-前端面试题&#xff0c;欢迎点击关注~ 阅读目录 html/ css&#xff1a;https://blog.csdn.net/aSuncat/article/details/88789368…

为什么说前端一定要学好设计模式?

设计模式&#xff08;Design Pattern&#xff09;大家一定不陌生。通俗地讲&#xff0c;它是前辈们对代码开发经验的总结&#xff0c;让你写出可扩展、可读、可维护的高质量代码&#xff0c;还能让你在遇到相似的问题、场景时&#xff0c;快速找到更优的解决方案。 也许你会说&…