MQ2

article/2025/8/28 1:20:05

死信队列

什么是死信队列

一般来说,producer将消息投递到queue中,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信(Dead Letter),所有的死信都会放到死信队列中。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

消费者消费消息

​ 1)正常消费–>手动ack–>MQ从队列中删除消息

​ 2)消费者报错–>没有ack–>消息是待应答状态–>channel断开后–>消费恢复为待分配状态

​ 3)消费者报错–>手动nack–>

​ 1、如果配置了死信队列消息会被发送到死信队列中,

​ 2、如果没有配置会被放入队列首部,如果消费者设置了requeue=false,则消息被丢弃。

1600357615151

死信队列的来源

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)

配置死信队列

@Configuration
public class RabbitMQConfig {// 声明业务Exchange@Beanpublic TopicExchange businessExchange(){return new TopicExchange("businessExchange");}// 声明业务队列A@Beanpublic Queue businessQueue(){Map<String, Object> args = new HashMap<>();
//       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", "deadLetterExchange");
//       x-dead-letter-routing-key  这里声明当前队列的死信路由keyargs.put("x-dead-letter-routing-key", "dle.err");return new Queue("businessQueue",true,false,false,args);}// 声明业务队列A绑定关系@Beanpublic Binding businessBinding(Queue businessQueue, TopicExchange businessExchange){return BindingBuilder.bind(businessQueue).to(businessExchange).with("emp.*");}//声明死信Exchange@Beanpublic TopicExchange deadLetterExchange(){return new TopicExchange("deadLetterExchange");}// 声明死信队列A@Beanpublic Queue deadLetterQueue(){return new Queue("dle-queue");}@Beanpublic Binding deadLetterQueueBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange){return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dle.*");}}

YML配置

spring:rabbitmq:host: 192.168.193.88port: 5672username: guestpassword: guestvirtual-host: /listener:simple:acknowledge-mode: manual # 设置手动ack

设置消费者

@Component
public class DedaLetterListener {// 监听业务队列@RabbitListener(queues = "businessQueue")public void businessQueue(String msg, Channel channel, Message message) throws IOException {if ("error".equals(msg)) {System.out.println("业务消费者出现问题:" + msg);try {throw new RuntimeException();}catch (Exception e){// 无法消费消息,nackchannel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}} else {System.out.println("正常消费消息:" + msg);// 正常消费了消息,手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}// 监听死信队列@RabbitListener(queues = "dle-queue")public void deadLetterQueue(String msg, Channel channel, Message message) throws IOException {System.out.println("死信队列消费消息:" + msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}
参数说明
// deliveryTag:该消息的index
// multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
public void basicAck(long deliveryTag, boolean multiple)//deliveryTag: 可以看作消息的编号,它是一个 64 位的长整型值。
//multiple:是否批量.// true表示nack编号之前(小于)所有未被当前消费者确认的消息。// false表示nack这一条消息。
//requeue:是否重新入队列,// 设置为false消息不会重新入队列,会直接从队列中删除,// 设置为true消息会重新添加都消息的头部,分配给其他消费者(如果只有一个还会分配给当前消费者)
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) 

设置提供者

    @AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/send")public void send(String msg){System.out.println("msg = [" + msg + "]");rabbitTemplate.convertAndSend("businessExchange","emp.add",msg);}

死信消息的变化

​ 如果队列配置了参数 x-dead-letter-routing-key 的话,“死信”的路由key将会被替换成该参数对应的值。如果没有设置,则保留该消息原有的路由key。

比如:

​ 如果原有消息的路由key是testA,被发送到业务Exchage中,然后被投递到业务队列QueueA中,如果该队列没有配置参数x-dead-letter-routing-key,则该消息成为死信后,将保留原有的路由keytestA,如果配置了该参数,并且值设置为testB,那么该消息成为死信后,路由key将会被替换为testB,然后被抛到死信交换机中。

死信队列的应用场景

  一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息。通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了 。

延时队列

什么是延时队列

​ 延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理 。

延时队列的设置

RbbitMQ中存在TTL机制,一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。

给消息设置TTL时间
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);// 但是毫秒
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

每条消息的超时时间是6s,如果6s内没有被消费者消费,该消息就会变成死信。

给队列设置超时时间
    @Beanpublic Queue businessQueue1(){Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 5000);  // 这个队列中的所有的消息最多能活6sreturn new Queue("5-queue",true,false,false,args);}

但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间

1600502409459

​ 为什么这两种方法处理的方式不一样?因为第二种方法里,队列中己过期的消息肯定在队列头部, RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可。而第一种方法里,每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。

​ RabbitMQ 会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时 。在RabbitM 重启后,持久化的队列的过期时间会被重新计算。

配置延时队列

@Configuration
public class RabbitMQConfigTTL {// 声明业务Exchange@Beanpublic TopicExchange businessExchange(){return new TopicExchange("ttl-Exchange");}// 创建延时队列1@Beanpublic Queue businessQueue1(){Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "deadLetterExchange");args.put("x-dead-letter-routing-key", "dle.err");args.put("x-message-ttl", 5000);   // 超时时间是5sreturn new Queue("5-queue",true,false,false,args);}// 创建延时队列2@Beanpublic Queue businessQueue2(){Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "deadLetterExchange");args.put("x-dead-letter-routing-key", "dle.err");args.put("x-message-ttl", 20000); //  // 超时时间是20sreturn new Queue("20-queue",true,false,false,args);}// 延时队列绑定关系@Beanpublic Binding businessBinding1(Queue businessQueue1, TopicExchange businessExchange){return BindingBuilder.bind(businessQueue1).to(businessExchange).with("emp.*");}// 延时队列绑定@Beanpublic Binding businessBinding2(Queue businessQueue2, TopicExchange businessExchange){return BindingBuilder.bind(businessQueue2).to(businessExchange).with("user.*");}//声明死信Exchange@Beanpublic TopicExchange deadLetterExchange(){return new TopicExchange("deadLetterExchange");}// 声明死信队列@Beanpublic Queue deadLetterQueue(){return new Queue("dle-queue",true,false,false,null);}// 死信队列绑定交换机@Beanpublic Binding deadLetterQueueBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange){return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dle.*");}
}

YAML配置

spring:rabbitmq:host: 192.168.193.88port: 5672username: guestpassword: guestvirtual-host: /listener:simple:acknowledge-mode: manual # 设置手动ack

设置提供者

    @RequestMapping("/ttl")public void test1(String msg) {System.out.println("p:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));if ("5".equals(msg)) { // 添加到5s队列rabbitTemplate.convertAndSend("ttl-Exchange", "emp.add", msg);} else if ("20".equals(msg)) { // 添加到20s队列中rabbitTemplate.convertAndSend("ttl-Exchange", "user.add", msg);}}

设置消费者

    @RabbitListener(queues = "dle-queue")public void dleQueue(String msg, Channel channel, Message message) throws IOException {System.out.println("dleQueue1:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}

消费者消息确认机制

为了保证消息从队列可 地达到消费者, RabbitMQ 提供了消 息确认机制 messageacknowledgement 消费者在订阅队列时,可以指定 aut oAck 参数,当 autoAck 等于 false时, RabbitMQ 会等待消费者显式地回复确认信号后才从内存 (或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。

当 utoAck 等于 true 时, RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息采用消息确认机制后,只要设置 autoAck 参数为 false ,消费者就有足够的时间处理消息(任务〉 ,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题 因为 RabbitMQ 会一直等待持有消息直到消费者显式调 Basic.Ack 命令为止

当autoAck 参数置为 false ,对于 RabbitMQ 服务端而 ,队列中的消息分成了两个部分部分是等待投递给消费者的消息: 部分是己经投递给消费者,但是还没有收到消费者确认信号的消息。如果 RabbitMQ 直没有收到消费者的确认信号,并且消费此消息的消费者己经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下 个消费者,当然也有可能还是原来的那个消费者。

RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的 唯一依据是消费该消息的消费者连接是否己经断开,这么设计的原因是 RabbitMQ 许消费者消费一条消息的时间可以很久很久。

自动ACK

ack分为自动ack和手动ack两种
如果是自动ack,有两个弊端:

  1. MQ 只需要确认消息发送成功,无需等待应答就会丢弃消息,这样导致如果消费者客户端还未处理完消息,出现异常或者断电时消息丢失的后果。
  2. 自动ack没有qos控制,可能消费者客户端因为瞬间收到太多消息导致服务挂掉

所以,常用的是手动ack应答

手动ACK

手动ack存在弊端:

如果消费者存在Bug的话,就会导致所有的消息都抛出异常,然后队列的Unacked消息数暴涨,导致MQ响应越来越慢,然后down掉 。

原因:因为上面消费者抛出异常,所以MQ没有得到ack响应,注意:这些消息会堆积在Unacked消息里,不会抛弃,即使另外打开一个消费者也不会被消费,直到原来的消费者客户端断开重连时,才会变成ready,这时如果通过qos设置了prefetch,没有ack响应的话,Broker不会再分配新的消息下来,就导致了阻塞

NACK

nack是什么呢?其实就是会通知MQ把消息塞回的队列头部(不是尾部),而不是变成Unacked,这样消费者客户端可以直接获取到这条消息。但是问题又来了,如果消费者有问题,那就算放回队列头部了,下次取出消费,还是会报错,又被送回队首,这样就陷入死循环了 。

消费者消息拒绝

在消费者接收到消息后,如果想明确拒绝当前的消息而不是确认,那么应该怎么做呢?可以使用Basic.Reject 这个命令,消费者客户端可以调用与其对应的 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。
Channel 类中的 basicReject 方法定义如下:

void basicReject(long deliveryTag, boolean requeue) throws IOException; 

deliveryTag:消息的编号。

requeue:是否把拒绝后的消息重新入队列

​ 如果参数设置为 true ,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;

如果 requeue 参数设置为 false ,则 RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。

也可以使用basicNack方法来拒绝

注意点

channel.basicReject 或者 channel.basicNack 中的 requeue 设直为 false ,可以启用“死信队列”的功能。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。

消费者消费模式

​ RabbitMQ的消费模式分为两种 推( Pu )模式和拉( Pull )模式,推模式采用 Basic Consume 进行消费,而拉模式则是调用 Basic Get 进行消费。

  public String basicConsume(String queue, boolean autoAck, Consumer callback) // 推模式public GetResponse basicGet(String queue, boolean autoAck)  // 拉模式

Basic Consume 将信道( Channel )直为接收模式,直到取消队列的订阅为止。在接收 模式期间, RabbitMQ 会不断地推送消息给消费者,当然推送消息的个数还是会受到 Basic.Qos 的限制。

如果只想从队列获得单条消息而不是持续订阅,建议还是使用 Basic.Get 进行消费.但 是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume ,这样做会严重影响 RabbitMQ 的性能。如果要实现高吞吐量,消费者理应使用 Basic.Consume 方法。

消息持久化

提供者

1、事务 --》确认消息已经到了队列

2、Confirm --》确认消息已经到了队列

3、return --》消息没有路由到队列 (在某些情况下,如果我们发送消息的时候,当前的exchange或者routeKey路由不到的时候,这个时候如果我们需要监听这种不可到达的消息,就要使用Return Listener)

MQServer

1、交换机,队列,消息全部持久化

2、镜像队列机制(解决MQ把消息持久化到磁盘时MQ宕机)

消费者

1、自动ack

2、手动ack

消息的重复消费

消息的重复消费场景如下:

消息已经投递了消费者,消费者正常消费后准备给MQ应答ACK,此时网络出现了闪断。channel断开连接,消息再次被放入到MQ的头部,为了保证消息至少被消费一次,当网络恢复正常后MQ再次发送消息给消费者。此时同一个消息就会被消费两次。、

消息的幂等性: 使用同样的条件,一次请求和重复的多次请求对系统资源的影响是一致的

大量消息堆积如何处理

出现的原因

​ 1、消费者网络故障

​ 2、消费者出现异常,没有ack

1603624551770

解决方案

1603624821007

MQ优点和缺点

1、优点

​ 1、异步

​ 2、解耦

​ 3、削锋

2、缺点

​ 1、系统的可用性降低(MQ挂了。。。)

​ 2、复杂度提高

​ 3、一致性问题


http://chatgpt.dhexx.cn/article/BNNDcOqy.shtml

相关文章

MQ-2学习笔记

1.工作原理 MQ-2型烟雾传感器属于二氧化锡半导体气敏材料&#xff0c;属于表面离子式N型半导体。处于200~300摄氏度时&#xff0c;二氧化锡吸附空气中的氧&#xff0c;形成氧的负离子吸附&#xff0c;使半导体中的电子密度减少&#xff0c;从而使其电阻值增加。当与烟雾接触时…

01、RabbitMQ入门

目录 1.、什么是MQ 2、应用场景 3、主流MQ框架 4、Docker安装部署RabbitMQ 5、RabbitMQ管理平台 6、MQ的核心概念 单一生产者和单一消费者 7、springboot整合rabbitmq 执行测试方法testRabbitmq&#xff0c;控制台输出&#xff1a;receive msg : test rabbitmq messag…

MQ2烟雾传感器

1、MQ-2气体传感器所使用的气敏材料是在清洁空气中电导率较低的二氧化锡(SnO2)。当传感器所处环境中存在可燃气体时&#xff0c;传感器的电导率随空气中可燃气体浓度的增加而增大。使用简单的电路即可将电导率的变化转换为与该气体浓度相对应的输出信号。MQ-2气体传感器可用于家…

MQ-2烟雾传感器

一、MQ-2烟雾传感器简介 MQ-2常用于家庭和工厂的气体泄漏监测装置&#xff0c;适宜于液化气、苯、烷、酒精、氢气、烟雾等的探测。故因此&#xff0c;MQ-2可以准确来说是一个多种气体探测器。 MQ-2的探测范围极其的广泛。它的优点&#xff1a;灵敏度高、响应快、稳定性好、寿…

MQ-2烟雾浓度传感器(STM32F103)

本实验是通过串口调试助手显示STM32F103C8T6采集到MQ-2传感器的电压值。 一、 概述 1. 简介 MQ-2可用于家庭和工厂的气体泄漏监装置&#xff0c;适宜于液化气、丁烷、丙烷、甲烷、酒精、烟雾等的探测。它的优点是灵敏度高、响应快、稳定性好。寿命长、驱动电路简单以及方便安…

MQ-2烟雾传感器的原理及使用教程

一、MQ-2烟雾传感器简介 MQ-2常用于家庭和工厂的气体泄漏监测装置&#xff0c;适宜于液化气、苯、烷、酒精、氢气、烟雾等的探测。故因此&#xff0c;MQ-2可以准确来说是一个多种气体探测器。 MQ-2的探测范围极其的广泛。它的优点&#xff1a;灵敏度高、响应快、稳定性好、寿命…

MQ-2烟雾传感器的使用

一、MQ-2烟雾传感器简介 MQ-2烟雾传感器采用在清洁空气中电导率较低的二氧化锡(SnO2)&#xff0c;属于表面离子式N型半导体。当MQ-2烟雾传感器在200到300摄氏度环境时&#xff0c;二氧化锡吸附空气中的氧&#xff0c;形成氧的负离子吸附&#xff0c;使半导体中的电子密度减少&a…

MQ-2烟雾传感器模块功能实现(STM32)

认识MQ-2模块与其工作原理 MQ-2型烟雾传感器属于二氧化锡半导体气敏材料&#xff0c;属于表面离子式N型半导体。当处于200~300摄氏度时&#xff0c;二氧化锡吸附空气中的氧&#xff0c;形成氧的负离子吸附&#xff0c;使半导体中的电子密度减少&#xff0c;从而使其电阻值增加。…

MQ-2烟雾浓度传感器

文章目录 一、模块简介二、工作原理三、程序设计 本实验将采集到的传感器数据利用ADC转换&#xff0c;将转换后的电压值显示在串口调试助手上 一、模块简介 MQ-2烟雾传感器所使用的气敏材料是在清洁空气中电导率较低的二氧化锡(SnO2)。当烟雾传感器所处环境中存在可燃气体时&a…

python杨辉三角输出指定行_python杨辉三角输出指定行_使用python打印十行杨辉三角过程详解...

如何用python输出杨辉三角 程序输出需要实现如下效果&#xff1a; [1] [1,1] [1,2,1] [1,3,3,1] .. 方法&#xff1a;迭代&#xff0c;生成器 123456789101112131415161718192021 def triangles() L [1] while True: yiled L L [1] [L[i] L[I1] for i in range(len(L)-1)] …

蓝桥杯 python 杨辉三角

欢迎使用Markdown编辑器 你好&#xff01; 这是你第一次使用 Markdown编辑器 所展示的欢迎页。如果你想学习如何使用Markdown编辑器, 可以仔细阅读这篇文章&#xff0c;了解一下Markdown的基本语法知识。 新的改变 我们对Markdown编辑器进行了一些功能拓展与语法支持&#x…

python杨辉三角 简单方法

何为杨辉三角&#xff0c;杨辉三角就是&#xff0c;第一行与第二行分别为1和1,1 再往后第三行就有规律了&#xff0c;除了开头和结尾的数&#xff08;都是1&#xff09;&#xff0c;每个数都是自己左上角和右上角的和。如图&#xff1a; 如何用编程把它实现呢&#xff1f;我们…

Python 杨辉三角

前言&#xff1a;我在学习Python的时候&#xff0c;正好学到列表推导式&#xff0c;于是这里尝试运用列表推导式来写一个杨辉三角。如果能点出其中不足或提出优化建议&#xff0c;感激不尽。 杨辉三角&#xff1a;杨辉三角左右两侧的数字都是1&#xff0c;而里面的数字等于它肩…

python杨辉三角

题目 杨辉三角形&#xff0c;也称帕斯卡三角&#xff0c;其定义为&#xff1a;顶端是 1,视为(row0).第1行(row1)(1&1)两个1,这两个1是由他们上头左右两数之和 (不在三角形内的数视为0).依此类推产生第2行(row2):011;112;101.第3行(row3):011;123; 213;101. 循此法可以产生以…

Python程序:输出杨辉三角的几种办法

文章目录 一、问题描述二、问题分析三、第一种方法1、具体代码2、运行结果3、程序的改进 四、第二种方法1、具体代码2、运行结果 五、总结分析 一、问题描述 给定一个非负整数 n&#xff0c;生成「杨辉三角」的前 n行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方…

用python实现杨辉三角的几种不同方式

杨辉三角的概念 比较详细的知识可以看这里&#xff0c;在杨辉三角中&#xff0c;每个数是它左上方和右上方的数的和。 1/ \1 1/ \ / \1 2 1/ \ / \ / \1 3 3 1/ \ / \ / \ / \1 4 6 4 1/ \ / \ / \ / \ / \ 1 5 10 10 5 1解法1&#xff1a;动态规…

【杨辉三角python】

文章目录 杨辉三角实现方法&#xff08;Python&#xff09; 一、杨辉三角是什么&#xff1f;二、杨辉三角解法 杨辉三角实现方法&#xff08;Python&#xff09; 一、杨辉三角是什么&#xff1f; 杨辉三角&#xff0c;是二项式系数在三角形中的一种几何排列&#xff0c;中国南…

Python实现杨辉三角(2种实现方案)

杨辉三角形&#xff0c;又称贾宪三角形、帕斯卡三角形&#xff0c;是二项式系数在三角形中的一种几何排列。 下图显示了杨辉三角的前 7 行&#xff1a; 递归打印杨辉三角 杨辉三角形中的数&#xff0c;正是(xy)的 N 次方幂展开式各项的系数&#xff0c;下面以递归的方法来打印…

【Python实现杨辉三角】

目录 什么是杨辉三角 杨辉三角解法 1. 定义法 2. 计算杨辉三角 补0法 3. 杨辉三角&#xff0c;对称法 4. 杨辉三角&#xff0c;单列表方法 5.列表嵌套&#xff08;二维数组&#xff09; 6. 新旧两行&#xff0c;一次性开辟新行 7.yield函数 8.zip函数 参考资料链接&…

c++自定义函数

对于小型应用程序来说不设计自定义函数完全可行&#xff0c;但随着程序越来越大&#xff0c;越来越复杂&#xff0c;实现的功能越来越多&#xff0c;如果不使用函数&#xff0c;main函数将变的越来越复杂越来越雍肿越来越令人难懂&#xff0c;而在更改程序的每一个功能的时候&a…