MQ - 如何保证消息不丢失?处理重复消息?消息堆积处理?

article/2025/9/27 3:39:39

什么是消息队列

在百度百科中,消息队列是这么解释的:“消息队列”是在消息的传输过程中保存消息的容器。

消息队列全称为英文 Message Queue 简称(MQ)是一种应用程序对应用程序的通信方法。MQ 是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以取队列中的消息。消息发布者(生产者)只管把消息发布到 MQ 中而不用管谁来取,消息使用者(消费方)只管从 MQ 中取消息而不用管是谁发布的。

目前市面上的消息队列有很多,例如:Kafka、RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ等。

为什么需要消息队列

从本质上来说是因为互联网的快速发展,业务不断扩张,促使技术架构需要不断的演进。

从以前的单体架构到现在的微服务架构,成百上千的服务之间相互调用和依赖。从互联网初期一个服务器上有 100 个在线用户已经很了不得,到现在坐拥 10 亿日活的微信。我们需要有一个东西来解耦服务之间的关系、控制资源合理合时的使用以及缓冲流量洪峰等等。

消息队列就应运而生了。消息队列的应用场景:异步处理、服务解耦、流量控制。

异步处理

随着公司的业务发展会发现项目的请求链路会越来越长,例如刚开始的电商项目,流程就是扣库存、下单。慢慢地又加上积分服务、短信服务等。这一路同步调用下来需要的时间就很长,客户也会等得不耐烦了,有没有更好的方法调用接口减少响应的时间呢,这时候就是消息队列登场的时刻了。

调用链路长、响应就慢了,并且相对于扣库存和下单,积分和短信没必要这么的及时。因此只需要在下单结束那个流程,扔个消息到消息队列中就可以直接返回响应了。而且积分服务和短信服务可以并行的消费这条消息。

可以看出消息队列可以减少请求的等待,还能让服务异步并发处理,提升系统总体性能。

下面我们看张图就明白了,如下所示:

在这里插入图片描述

服务解耦

模拟秒杀场景,现在用户下单需要经过订单服务、和库存服务,如下图:
在这里插入图片描述
如果库存服务出现问题,会导致订单服务下单失败。而且如果库存服务接口修改了,会导致订单服务也无法工作。

使用消息队列可以实现服务与服务之间的解耦,订单服务不再调用库存服务接口,而是把订单消息写入到消息队列。库存服务从消息队列中拉取消息,然后再减库存,从而实现服务的解耦。

流量控制

想必大家都听过「削峰填谷」,后端服务相对而言都是比较脆弱的,因为业务较重,处理时间较长。像一些例如秒杀活动爆发式流量打过来可能就顶不住了。因此需要引入一个中间件来做缓冲,消息队列再适合不过了。

那么消息队列又是如何完成削峰的呢?好比如你现在有两台机器,每台只能处理 1000 个请求。

在这里插入图片描述

那预估业务会来 3000 个请求,那么这时怎么办?削峰!把请求的流量高峰削掉,每台机器处理了1000个请求,剩下1000 个请求先放到消息队列中,等机器根据自己处理请求的能力去消息队列中拿。

在这里插入图片描述
这里需要注意的是:引入消息队列固然会有很多的好处,但是多引入一个中间件系统的稳定性就下降一层,运维的难度抬高一层。因此要权衡利弊,系统是演进的。

消息队列基本概念

消息队列有两种模型:队列模型和发布/订阅模型。

队列模型

生产者往某个队列里面发送消息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费者, 但是消费者之间是竞争关系,即每条消息只能被一个消费者消费。
在这里插入图片描述

发布/订阅模型

为了解决一条消息能被多个消费者消费的问题,我们可以使用发布/订阅模型。该模型是将消息发往一个 Topic 主题中,所有订阅了这个 Topic 的订阅者都能消费这条消息。

就好比我们在一个微信群里面,我发送一条消息,只要在这个群里面的人都能收到我发送的消息,而队列模型就是一对一聊天,我发给你的消息,只能在你的聊天窗口弹出,是不可能弹出到别人的聊天窗口中的。

这里又有人会问了,那我一对一聊天对每个人都发同样的消息也可以实现一条消息被多个人消费了啊。

这样也对,通过多队列全量存储相同的消息,即数据的冗余可以实现一条消息被多个消费者消费。RabbitMQ 就是采用队列模型,通过 Exchange 模块来将消息发送至多个队列,解决一条消息需要被多个消费者消费问题。

在这里插入图片描述

如何保证消息不丢失

一条消息从产生到被消费,中间会经历三个阶段:生产者、MQ 内部、消费者,消息在这三个阶段中均有可能出现丢失。

就我们市面上常见的消息队列而言,只要配置得当,我们的消息就不会丢。

我们先画张图看看:

在这里插入图片描述
我们从这三个阶段分别入手来看看如何确保消息不会丢失。

生产消息

当生产者往 Broker 写消息,需要处理 Broker 的响应,不论是同步还是异步发送消息,同步和异步回调都需要做好 try-catch 捕获异常,在异常代码块中重试。如果 Broker 返回写入失败等错误消息,需要重试发送。当多次发送失败需要作报警,日志记录等,这样就能保证在生产消息阶段消息不会丢失。

Broker 存储消息

存储消息阶段需要在消息刷盘之后再给生产者响应,假设消息写入缓存中就返回响应,那么机器突然断电消息就没了,而生产者以为已经发送成功了。

如果 Broker 是集群部署,有多副本机制,即消息不仅仅要写入当前 Broker ,还需要写入副本机中。那配置成至少写入两台机子后再给生产者响应。这样基本上就能保证存储的可靠了。一台挂了另一台还在。

消费消息

这里很多人会犯一个错误,就是当消费者获取到消息以后,返回消费成功的状态给 Broker 在执行业务逻辑,如果这时候消费者宕机了怎么办,那么数据不就丢失了?

所以消费端从 Broker 上拉取消息,只要消费端在收到消息后,不立即发送消费确认给 Broker,而是等到执行完业务逻辑后,再发送消费确认,也能保证消息的不丢失。

如果处理重复消息

消息重复有两种情况:

生产者

假设我们发送消息到 Broker,发送成功需要等待 Broker 响应成功给到生产者,有可能存在 Broker 已经写入了,但是由于网络原因,生产者没有收到 Broker 的响应,然后生产者又重发了一次,此时消息就重复了。

消费者

消费者拿到消息消费了,业务逻辑已经走完了,事务提交了,此时需要更新 Consumer offset 了,然后这个消费者挂了,另一个消费者顶上,此时 Consumer offset 还没更新,于是又拿到刚才那条消息,业务又被执行了一遍。于是消息又重复了。

我们可以知道,正常而言消息重复是不可避免的,因此我们只能通过一种解决方案来解决这个问题,

关键点就是幂等。既然我们不能防止重复消息的产生,那么我们只能在业务上处理重复消息所带来的影响。

幂等处理重复消息

[幂等] 我们可以理解为同样的参数多次调用同一个接口和调用一次产生的结果是一致的。

想要解决“消息丢失”和“消息重复消费”的问题,有一个前提条件就是要实现一个全局唯一 ID 生成的技术方案,这也是面试官喜欢考察的问题。

在分布式系统中,全局唯一 ID 生成的实现方法有数据库自增主键、Redis、UUID、Snowflake 算法。

解决问题的方法就是记录唯一的 ID,比如处理订单这种,记录订单 ID,假如有重复的消息过来,先判断下这个 ID 是否已经被处理过了,如果没处理再进行下一步。当然也可以用全局唯一 ID 等等。

处理消息堆积

除了怎么解决消息被重复消费的问题之外,面试官还喜欢问消息积压。原因在于消息积压反映的是性能问题,解决消息积压问题,可以说明候选者有能力处理高并发场景下的消费能力问题。

消息的堆积往往是因为生产者的生产速度与消费者的消费速度不匹配。有可能是因为消息消费失败反复重试造成的,也有可能就是消费者消费能力弱,渐渐地消息就积压了。

因为消息发送之后才会出现积压的问题,所以和消息生产者没有关系,又因为绝大部分的消息队列单节点都能达到每秒钟几万的处理能力,相对于业务逻辑来说,性能不会出现在中间件的消息存储上面。所以出问题的肯定是消息消费阶段。

那么我们应该怎么入手解决问题呢?主要有如下方案:

  • 如果是线上突发问题,要临时扩容,增加消费端的数量,与此同时,降级一些非核心的业务。 通过扩容和降级承担流量,表明了对应急问题的处理能力。
  • 其次排查解决异常问题,如通过监控,日志等手段分析是否消费端的业务逻辑代码出现了问题,优化消费端的业务处理逻辑。
  • 如果是消费端的处理能力不足,可以通过水平扩容来提供消费端的并发处理能力。 注意在kafka中在扩容消费者的实例数的同时,必须同步扩容主题 Topic 的分区数量,确保消费者的实例数和分区数相等。如果消费者的实例数超过了分区数,由于分区是单线程消费,所以这样的扩容就没有效果。

最后

上面的问题都是面试关于消息队列必问的考点之一,我们在使用消息队列时也经常会遇到这种问题。
通过上面三点,我们知道了如何保证消息不丢失、处理重复消息、处理消息堆积,重要的是如何保证消息不丢失。首先是将消息推送到 Broker 时我们要保证消息的确是到了 Broker 。然后就是存在 Broker 中的消息要保证持久化,这样能解决 Broker 重启导致的内存中的消息不会被丢失。最后就是消费者在消费消息时,我们通过手动 ack 来告诉 Broker 是不是应该将消息移除队列。


http://chatgpt.dhexx.cn/article/MysrwasG.shtml

相关文章

IBM MQ通道常用知识列举(一)

MQ的几个基本组件: 1. 什么是通道 通道是分布式队列管理器在IBM MQ MQI 客户端和IBM MQ服务器之间或两个IBM MQ服务器之间使用的逻辑通信链路。通道用于将消息从一个队列管理器移动到另一个队列管理器。 2, 启动通道 对发送方、服务器和请求方通道使用 MQSC 命令…

微服务 消息中间件MQ

微服务 消息中间件MQ 1. MQ的定义2. MQ的作用3. MQ的特点4. MQ消费方式5. 常用MQ对比分析 1. MQ的定义 MQ就是消息中间件。面向消息的中间件(message-oriented middleware)MOM能够很好的解决以上问题。是指利用高效可靠的消息传递机制与平台无关的数据交…

MQ消息队列

MQ MQ全称Message Queue(消息队列), 实在消息传输过程中保存消息的容器。多用于分布式系统之间进行通信两种 通信方式: MQ优势 消息到达MQ后直接给出响应,然后服务去消费相应的消息,用户体验极好 应用解耦:提高系…

IBMMQ linux版命令创建队列管理器、队列、通道、window连接(六)

1. 队列、通道介绍 1.1 本地队列 本地队列又分为普通本地队列和传输队列,普通本地队列是应用程序通过API对其进行读写操作的队列;传输队列可以理解为存储-转发队列,比如:我们将某个消 息交给MQ系统发送到远程主机,而此…

IBM MQ 通道

一,定义 通道是分布式队列管理器在IBM MQ MQI 客户端和IBM MQ服务器之间或两个IBM MQ服务器之间使用的逻辑通信链接。 通道是提供从一个队列管理器到另一个队列管理器的通信路径的对象。通道在分布式队列中用于将消息从一个队列管理器移动到另一个队列管理器&#x…

MQ135

这里写自定义目录标题 以MQ135为例 原文链接 首先声明,公式不是官方给定的,而且有很多的局限性。 这篇文章是个人对MQ系列传感器电压与浓度转换的公式进行一个探索。 以MQ135为例 在氨气曲线上采点,得 x(ppm)[10,2…

MQ简介以及架构图

一、什么是MQ Message Query(MQ),消息队列中间件,很多初学者认为,MQ通过消息的发送和接受来实现程序的异步和解耦,mq主要用于异步操作,这个不是mq的真正目的,只不过是mq的应用&…

IBMMQ java远程访问(四)

当应用程序和MQ不在同一台主机上的时候,我们将要通过通道去访问; 1.创建队列管理器时勾选创建服务器链接通道; 2.创建一个本地队列 命名为Q1 3.创建通道 右击通道->新建->服务器链接通道 命名为SERVERQM2 4.代码示例调用 package com.i…

MQ理论介绍与主流MQ对比

1、什么是MQ? MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然…

什么是MQ

MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进 行通信。 分布式系统之间进行通信: 远程调用:各系统间直接通过远程调用的方式; 借助第三方完成系统…

MQ的概念和RabbitMQ知识点(无代码)

目录 1. MQ的基本概念 1.1 MQ概述 1.2 MQ的优势 1. 应用解耦 2. 异步提速 3. 削峰填谷 1.3 MQ的劣势以及可能引发的问题 1.4 常见的MQ产品 2. RabbitMQ的知识点 2.1 RabbitMQ的工作模式 1. 简单模式 2. 工作队列模式 Work Queue 3. 发布订阅模式 Publish/subscri…

MQ简介

一、何为MQ? MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通…

【MQ基本概念 MQ的工作原理】

一、 MQ 基本概念 1 、 MQ 概述 MQ 全称 Message Queue (消息队列),是在消息的传输过程中保存 消息的容器。多用于分布式系统之间进 行通信。 小结 MQ ,消息 队列,存储消息的中间件 分布式系统通信两种方式&…

mq的基本介绍和基本用法

一 、什么是MQ,有什么用 MQ 是message queue ,消息队列,也叫消息中间件,遵守JMS(java message service)规范的一种软件。(同时还有另一个叫AMQP的应用层协议,语言无关性不受产品 语言等限制,r…

MQ(Message Queue)简介

一、何为MQ? MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通…

MQ消息队列详解、四大MQ的优缺点分析

MQ消息队列详解、四大MQ的优缺点分析 前言面试题切入面试官心理分析面试题剖析①为什么要使用MQ系统解耦异步调用流量削峰消息队列的优缺点四大主流MQ(kafka、ActiveMQ、RabbitMQ、RocketMQ)各自的优缺点 前言 近期有了想跳槽的打算,所以自己…

file和filestream

在C#编程环境中,当我们对电脑文件进行读写、移动、复制、删除等这些操作时,这些都可以在system.IO名称空间下,所以当我们想要通过C#编程语言对其进行操作时,需要添加该名称空间,它内部包含对数据流和文件进行同步或异步…

java filestream_java FileStream文件流操作

直接上代码,函数使用说明详见Java API文档 import java.io.*; public class StreamDemo { public static void main(String[] args) { File fnew File("F:\\workspace\\JavaPrj\\test.txt"); FileOutputStream outnull; try { outnew FileOutputStream(f)…

c#FileStream文件读写

//C#文件流写文件,默认追加FileMode.Append string msg "okffffffffffffffff"; byte[] myByte System.Text.Encoding.UTF8.GetBytes(msg); using (FileStream fsWrite new FileStream("D:\1.txt", FileMode.Append)) {fsWrite.Write(myByte, 0, myByte.…

FileStream类

FileStream类的官方介绍请见&#xff1a; https://msdn.microsoft.com/zh-cn/library/system.io.filestream.aspx 一、使用FileStream来读取文本文件 <pre name"code" class"csharp"> string path "E:\TEMP\TestFileStream\the ol…