RabbitMQ实现延迟队列的方式

article/2025/10/12 0:50:04

1.背景

         最近在做类似拍卖系统的上架功能,卖家上架物品以后,例如到期时间24小时或者48小时,如果无竞拍者或者购买者,则物品自动下架到用户的邮件中。诸如电商用户下单,30分钟未支付,则自动取消订单,归还库存. 实现这些类似的业务场景,大家想到最简单的方式和我想的差不多如下.看看是否和你想的基本一致.

1.1 MySQL+轮询

        把数据写入到MySQL的一张表,然后程序轮询,例如1分钟或者几分钟轮询一次,看上架单是否已经到期,到期则下架. 

                优点:

                        简单粗暴,可以复用业务中的MySQL,不需要额外维护其他中间件。 数据量小的时候没啥问题 ,数据量大了以后,第一个查询慢,第二对数据库产生不必要的压力.

                缺点:

                        无脑轮询,浪费资源,并且即时性很差.

1.2 Redis+轮询实现

        基于Redis+轮询实现,这种方式显然比MySQL要显得聪明。因为操作Redis的速度要比操作MySQL的速度快很多.上架单到期的message写到zset,利用可以排序的特点,把剩余超时时间最短的排在最前面,程序轮询如果数据达到超时条件则执行业务逻辑.

                优点:

                       简单粗暴,可以复用业务中的Redis中间件,不需要额外维护其他中间件。速度相对MySQL快.

                缺点:

                     1.存储成本高,因为Redis的数据全部是在内存中,内存相对MySQL占用磁盘的成本是昂贵的.数据量太大会对Redis的内存要求高.   

                     2.数据消费zset如果中断,则消息丢失,有些数据则不能完整处理。因为zset没有像MQ的ack机制.

1.3 Redis+订阅Key过期事件

        因为Redis可以key设置超时时间,过期Redis自动删除key.同时Redis支持订阅key的过期事件,拿到事件则再操作业务。

                优缺点与上述Redis+轮询实现,基本差不多。这个事件通知机制也没有ack,如果通知到了,但是业务服务挂了,下次继续监听不会再收到消息。

        那么这几个日常能想到的办法都想了,还有哪种比较靠谱的方式来实现这样的业务场景呢?即时性能得到保证的同时,我们也希望容错性得到保证,不希望因为异常情况导致数据丢失等情况的发送或者说尽量避免这种情况。

        最后我采用的是基于RabbitMQ来实现这个业务场景. 因为MQ第一个有Ack机制,保证异常的时候消息能够被重新消费处理, 第二就是MQ削峰的能力强.

2.RabbitMQ队列设置TTL+死信队列

        在RabbitMQ中,我们可以给某个queue绑定一个exchange来作为这个queue的死信队列。触发消息投递到死信队列有几种情况:

        1.消息被reject或者nack且没有设置requeue重入队列 

        2.消息TTL或者队列设置了TTL,时间过期

        3.消息超过了队列的长度,被丢弃

       我们可以看到第二项,给队列设置TTL或者给消息设置TTL.一旦过期,则投递到死信队列,我们只要消费死信队列即可完成延迟队列业务的需求.

3.RabbitMQ消息设置TTL+死信队列

        与上面设置队列TTL类似,我们可以单独给消息设置TTL.有些时候,我们不想对同一个队列设置相同的TTL,这个时候我们可以采取这种方式.

        如果队列和消息都设置了TTL,那么以两者最小的TTL过期时间为准. 这两种情况,都遵循队列的【先进先出,后进后出】的原则.   

        这种方式都会带来一个问题,那就是过期消息【队头阻塞】.举个栗子,下面的场景大家认为消费顺序是怎样的?

        

                想象的输出结果:    1s、 50s、100s

                实际的输出结果:     100s、50s、1s

       这不不对劲啊!!!怎么和我想象中的延迟队列有出入,明明1s的比100s和50s的快到期,但是最后一个被消费的。那么1s的数据就会伴随延迟很久. 明显不符合我们的业务场景,谁先过期就消费谁,这个才是我们的初衷。因为这过期时间没必要有强制的先后顺序。 

        原理是:  消息队列不会对所有消息做定时器,或者起一个线程/进程查看,哪个消息过期了没.这样对MQ的资源消耗太猛了,撑不住。那怎么实现过期原理了?   数据出队的时候,在进行判断。过期了则丢弃到死新队列,否则就队头阻塞等着。即使你后面有1s的消息其实过期了,但是队头100s的消息还没过期,那后面1s的消息你就还得等着. 

        那么怎么实现我们理解那种延迟队列,谁先过期消费谁呢? RabbitMQ到底能不能实现这种功能。答案是肯定的。  RabbitMQ官方维护了一个插件: rabbitmq-delayed-message-exchange

       Github地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

4.RabbitMQ的Delay插件

        RabbitMQ开启此插件可以定义一个Delay的Exchange交换机,用户发送的消息message,发送到这个交换机上,该插件会延迟时间将数据message投递到Queue中。只要有message超时则进行投递到Queue。用户最后从队列中将消息进行消费即可,不需要使用TTl+死信队列的方式进行延迟队列的实现了。

1.定义一个Exchange, type类型为=>x-delayed-message .且Header头添加一个标签=>x-delayed-type(direct)

2.投递消息的时候,在消息的header中传递TTL过期时间:  x-delay => 60000 (ms,单位  毫秒)

 此时消费的顺序是:  1s、50s、100s

注意事项:    

1.该插件虽然能够实现延迟队列的效果,但是官方说明了该插件的局限性。 就是队列的数据量不能太大, 例如堆积了几十W或者上百万的数据。这个插件存在局限性,用在生产环境需要谨慎判断自己消息的堆积数据量. 

2.可以使用Promethues监控当前堆积的消息数量(消息还未超时,没投递到Queue的消息的数量)  
指标名称:

erlang_mnesia_tablewise_size{table="rabbit_delayed_messagerabbit@rabbitmq-server"}

 3.如果数据量比较大,那么可以考虑使用RocketMQ原生支持的延迟队列. 


http://chatgpt.dhexx.cn/article/4FWrXiDf.shtml

相关文章

接口的实现详解

接口 接口就是定义的规则,规范。 声明类时需要使用的关键字时class,声明接口的关键字时interface; 接口本身就是抽象的,需要一个实现类去实现接口中定义的内容。 接口当中不能定义方法: 接口本身就是抽象的 所在我们…

Qt实现简易计算器

目录 写在前面 一、设计思路 效果展示: 二、功能实现 三、设计代码 1.mainwindow.cpp 2.calculate.cpp 写在前面 上篇文章写了C中如何实现简单的计算器,先用C写看来我的选择是正确的,明白了其中的原理再用Qt实现是水到渠成的事&#…

vue实现复制功能

目录 一、vue实现复制功能 1.功能实现 2.模板结构 3.js行为 4.样式 二.延伸扩展 一、vue实现复制功能 1.功能实现 点击复制弹出复制成功信息,粘贴即可获得复制数据。 2.模板结构 在自己想要复制的内容所在标签上添加一个类名和一个点击事件方法,…

多态的实现

多态实现条件 在Java中要实现多态,那么必须要满足以下几个条件,缺一不可: 必须在继承体系下 子类必须要对父类中的方法进行重写 通过父类的引用调用重写的方法 多态体现:在代码运行时,当传递不同类对象时,…

java实现接口

接口的简述: 1、在java编程语言中,接口是属于抽象类型的,是抽象方法的结合,用interface来定义接口,一个类通过继承接口的方式,从而来继承接口的抽象方法;类可以继承接口,但是接口不可…

java实现链表

一、链表定义 链表通过指针将一组零散的内存块串联在一起进行使用。 数据格式: 根据上面的图展示,每个内存块可以称为链条的一个“结点”,结点包含了数据和下一个结点的地址;同时有2个结点特殊:第一个结点和最后一个…

redis实现延迟队列

前言:redis实现延迟队列该怎么做?在这里我分享一下 redis实现延迟队列 一、Redis实现延迟队列二、redis失效监听事件三、此种实现面临的问题四、开发准备五、基础实现六、使用redisson实现延迟队列七、redisson实现延迟队列的原理八、延迟队列配置九、疑…

如何设置时间格式

如何设置时间格式 开发工具与关键技术:VisualStudio C# 作者:落白 撰写时间:2019/06/26时间格式的转换是一个令人头疼的存在,格式稍微出了一点差错就保存不进数据库,或者在类型为时间类型的input标签中显示不出来。搞…

html如何修改时间,html怎么设置时间

html设置时间的方法:首先创建一个div;然后给该div添加一个class属性;最后通过“function realSysTime(clock){...}”方法设置时间即可。 本文操作环境:windows7系统、HTML5&&CSS3版、Dell G3电脑。 HTML设置一个当前时间s…

心跳检测时间设置

Eureka Client 向 Eureka Server 发送心跳的频率(默认 30 秒) client eureka:instance:# 如果x秒内,依然没有收到续约请求,判定服务过期,默认90slease-expiration-duration-in-seconds: 50# 每隔x秒钟,向服务中心发送…

window下时间设置

用了蛮长时间搞出点东西,还被上面鄙视了一番,算了不抱怨了,在鄙视中成长,在鄙视中强大。 我主要是完成两个两个功能:第一个是设置系统时间将其转换成十六进制精确到分钟显示出来(这个VC中有现成的api,以前…

Date、Time、DateTimes设置

撰写时间:2019年06月16日 在数据库中设置的Date、Time、datetime类型,在Vs中Date、datetime便会自动转化为DataTIme类型,而Time类型则会转化为TimeSpan类型 常常在查出时间、日期的数据时都会乱码,就像下图一样,那么…

如何更改Windows服务器时间

Windows操作系统自带时间同步功能,它会自动从互联网时间服务器获取时间,以保证系统时间的准确性。但是,有时候我们需要更改时间服务器,以获得更准确的时间同步。小编将为大家介绍如何更改Windows时间服务器,以及Window…

Windows将时间设置到秒的方法

使用win10系统的用户应该都发现了,Win10系统右下角时间只显示到分,不会显示到秒,如果想要看到秒的话,需要点击一下时间才会弹出。那么,Win10系统时间怎么显示到秒? 默认情况下,需要点击后方可查看时间详情…

设置linux的时间

目录 一、什么是时间 (1)例子1 (2)例子2 二、什么是本地时间 三、linux设置本地时间的方法 (1)方式一:通过互联网自动同步 1.修改时间同步服务器 2.查看时间同步情况 (2&…

win10电脑时间同步设置方法

win10系统的时间与日期默认是跟网络时间同步的,这样可以保持电脑时间的准确。不过有网友反映自己的win10电脑时间不对,调准时间后过段时间又是如此,不知道如何设置win10电脑时间同步。下面小编就教下大家win10电脑时间同步设置方法。 具体的…

简单3招教你设置电脑时间

案例:电脑时间怎么设置? 【我使用电脑时,电脑显示的时间一直不对,这导致我非常不方便,想问下大家平常使用电脑时有什么设置电脑时间比较简单的方法吗?】 电脑的时间设置很重要,不仅可以保证电…

超越函数e^(-x^2)收敛

超越函数e(-x2)收敛 Γ ( z ) ∫ 0 ∞ e − x 2 d t . \Gamma(z) \int_0^\infty e^{-x^2}dt\,. Γ(z)∫0∞​e−x2dt. 原函数不能直接积分所以采用极坐标变换的方式求解,本人自用码住

积分抵现

积分抵现设置教程 积分抵现 积分抵现包括积分抵现比率、是否开启积分抵现和积分说明。 其中,积分抵现比率为 1积分可抵多少元现金 转载于:https://my.oschina.net/u/4103048/blog/3099967

常见不可积积分

from sympy import * x symbols(x) a sin(x)/x b exp(x**2) c cos(x)/x d sin(x**2) e cos(x**2) f x/ln(x) g ln(x)/(x1) A integrate(a,x) # Si(x)-->超越函数 B integrate(b,x) # sqrt(pi)*erfi(x)/2-->erfi(x)误差函数 C integrate(c,x) # -log…