redis实现延迟队列

article/2025/10/12 10:21:51

前言:redis实现延迟队列该怎么做?在这里我分享一下

redis实现延迟队列

    • 一、Redis实现延迟队列
    • 二、redis失效监听事件
    • 三、此种实现面临的问题
    • 四、开发准备
    • 五、基础实现
    • 六、使用redisson实现延迟队列
    • 七、redisson实现延迟队列的原理
    • 八、延迟队列配置
    • 九、疑问解答与加群交流学习

一、Redis实现延迟队列

  1. 失效监听
  2. redisson实现发布订阅延迟

二、redis失效监听事件

集成KeyExpirationEventMessageListener类实现redis失效监听事件

三、此种实现面临的问题

  1. redis的失效监听事件会存在一定的时间差,并且当数据量越大时,误差会越大。
  2. redis的失效监听事件会将所有key失效都会通知到onMessage,如果针对一个key,分布式业务的场景下,会出现重复消费的问题。(可以增加分布式锁的实现,但是redisson分布式锁提供了另一种延迟队列的实现方式)

四、开发准备

redis需要在服务端开启配置,打开redis服务的配置文件 添加notify-keyspace-events Ex

  • 相关参数如下:
K:keyspace事件,事件以__keyspace@<db>__为前缀进行发布;        
E:keyevent事件,事件以__keyevent@<db>__为前缀进行发布;        
g:一般性的,非特定类型的命令,比如del,expire,rename等;       
$:字符串特定命令;        
l:列表特定命令;        
s:集合特定命令;        
h:哈希特定命令;        
z:有序集合特定命令;        
x:过期事件,当某个键过期并删除时会产生该事件;        
e:驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件;        
A:g$lshzxe的别名,因此”AKE”意味着所有事件。

五、基础实现

  1. 加入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  1. 可正常连接存取redis数据之后,创建监听类RedisKeyExpirationListener继承KeyExpirationEventMessageListener,重写onMessage方法。(key失效之后,会发出onMessage方法,之呢个获取失效的key值,不能获取key对应的value值)。
import com.test01.scrm.service.member.api.common.MemberStatusEnum;
import com.test01.scrm.service.member.provider.service.base.IBaseMemberService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;/*** @author lwl*/
@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {private final IBaseMemberService baseMemberService;private final static String MEMBER_LOCK_ACCOUNT_SUFFIX = ".lock_account";private final static String MEMBER_LOCK_ACCOUNT_DOMAIN_SUFFIX = "T";private final static String MEMBER_LOCK_ACCOUNT_MEMBER_SUFFIX = "M";private final static String MEMBER_REDISSON_LOCK = ".member_lock_redisson";private final static int WAIT_TIME = 5;private final static int LEASE_TIME = 10;public RedisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer, IBaseMemberService baseMemberService) {super(redisMessageListenerContainer);this.baseMemberService = baseMemberService;}@Overridepublic void onMessage(Message message, byte[] pattern) {//获取失效的keyString expiredKey = message.toString();log.info("================================get on message:{}====================", expiredKey);if (expiredKey.endsWith(MEMBER_LOCK_ACCOUNT_SUFFIX)) {log.info("================================on message:{}====================", expiredKey);try {log.info("=======待解锁账号解锁======expiredKey:{}", expiredKey);String tenantId = expiredKey.substring(expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_DOMAIN_SUFFIX) + 1, expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_MEMBER_SUFFIX));String memberId = expiredKey.substring(expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_MEMBER_SUFFIX) + 1, expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_SUFFIX));baseMemberService.updateAccount(Integer.parseInt(tenantId), Long.parseLong(memberId), MemberStatusEnum.NORMAL.getCode(), null);} catch (Exception exception) {log.info("auto unlock fail,expired key:{},exception:{}", expiredKey, exception.getMessage());}}}
}
  1. 创建一个配置类RedisConfig
/*** @author lwl*/
@Configuration
public class RedisConfig {@Value("${redis.dbIndex}")private Integer dbIndex;private final String TOPIC = "__keyevent@" + dbIndex + "__:expired";private final RedisConnectionFactory redisConnectionFactory;public RedisConfig(RedisConnectionFactory redisConnectionFactory) {this.redisConnectionFactory = redisConnectionFactory;}@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer() {RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);//keyevent事件,事件以__keyevent@<db>__为前缀进行发布//db为redis第几个库 db2...
//        redisMessageListenerContainer.addMessageListener(redisKeyExpirationListener, new PatternTopic(TOPIC));return redisMessageListenerContainer;}
}

六、使用redisson实现延迟队列

由于延时队列持久化在redis中,所以机器宕机数据不会异常丢失,机器重启后,会正常消费队列中积累的任务

七、redisson实现延迟队列的原理

使用redis的zset有序性,轮询zset中的每个元素,到点后将内容迁移至待消费的队列

八、延迟队列配置

package com.test01.scrm.service.member.provider.config.redisson.delay;import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author lwl* redisson延迟队列*/
@Configuration
public class RedissonQueueConfig {private final String queueName = "queue";@Beanpublic RBlockingQueue<String> rBlockingQueue(@Qualifier("redissonSingle") RedissonClient redissonClient) {return redissonClient.getBlockingQueue(queueName);}@Bean(name = "rDelayedQueue")public RDelayedQueue<String> rDelayedQueue(@Qualifier("redissonSingle") RedissonClient redissonClient,@Qualifier("rBlockingQueue") RBlockingQueue<String> blockQueue) {return redissonClient.getDelayedQueue(blockQueue);}
}

定义队列使用接口

package com.test01.scrm.service.member.provider.config.redisson.delay;import java.util.concurrent.TimeUnit;/*** @author lwl*/
public interface DelayQueue {/*** 发布** @param object* @return*/Boolean offer(Object object);/*** 带延迟功能的队列** @param object* @param time* @param timeUnit*/void offer(Object object, Long time, TimeUnit timeUnit);void offerAsync(Object object, Long time, TimeUnit timeUnit);Boolean offerAsync(Object object);
}

延迟队列实现

package com.test01.scrm.service.member.provider.config.redisson.delay;import org.redisson.api.RDelayedQueue;
import org.redisson.api.RFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** @author lwl*/
@Component
public class RedissonDelayQueue implements DelayQueue {private static Logger log = LoggerFactory.getLogger(RedissonDelayQueue.class);@Resource(name = "rDelayedQueue")private RDelayedQueue<Object> rDelayedQueue;@Overridepublic Boolean offer(Object object) {return rDelayedQueue.offer(object);}@Overridepublic void offer(Object object, Long time, TimeUnit timeUnit) {rDelayedQueue.offer(object, time, timeUnit);}@Overridepublic void offerAsync(Object object, Long time, TimeUnit timeUnit) {rDelayedQueue.offerAsync(object, time, timeUnit);}@Overridepublic Boolean offerAsync(Object object) {boolean flag = false;RFuture<Boolean> rFuture = rDelayedQueue.offerAsync(object);try {flag = rFuture.get();} catch (InterruptedException | ExecutionException e) {log.info("offerAsync exception:{}", e.getMessage());e.printStackTrace();}return flag;}
}

启动一个后台监控线程

package com.test01.scrm.service.member.provider.config.redisson.delay;import org.redisson.api.RBlockingQueue;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/*** @author test01*/
@Component
public class RedissonTask {@Resource(name = "rBlockingQueue")private RBlockingQueue<Object> rBlockingQueue;@PostConstructpublic void take() {new Thread(() -> {while (true) {try {System.out.println("=========================" + rBlockingQueue.take());} catch (InterruptedException e) {e.printStackTrace();}}}).start();}
}

使用延迟队列发送

package com.test01.scrm.service.member.provider.impl;import org.junit.Test;
import org.junit.runner.RunWith;
import org.mybatis.spring.annotation.MapperScan;
import org.redisson.api.RDelayedQueue;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles(value = "llh")
@MapperScan("com.test01.scrm.service.member.provider.mapper")
public class RDelayQueueTests {@Resource(name = "rDelayedQueue")private RDelayedQueue<Object> rDelayedQueue;@Testpublic void offerAsync() {rDelayedQueue.offerAsync("llh send message", 20, TimeUnit.SECONDS);}
}

九、疑问解答与加群交流学习

在这里插入图片描述


http://chatgpt.dhexx.cn/article/09YkclAW.shtml

相关文章

如何设置时间格式

如何设置时间格式 开发工具与关键技术&#xff1a;VisualStudio C# 作者&#xff1a;落白 撰写时间&#xff1a;2019/06/26时间格式的转换是一个令人头疼的存在&#xff0c;格式稍微出了一点差错就保存不进数据库&#xff0c;或者在类型为时间类型的input标签中显示不出来。搞…

html如何修改时间,html怎么设置时间

html设置时间的方法&#xff1a;首先创建一个div&#xff1b;然后给该div添加一个class属性&#xff1b;最后通过“function realSysTime(clock){...}”方法设置时间即可。 本文操作环境&#xff1a;windows7系统、HTML5&&CSS3版、Dell G3电脑。 HTML设置一个当前时间s…

心跳检测时间设置

Eureka Client 向 Eureka Server 发送心跳的频率&#xff08;默认 30 秒&#xff09; client eureka:instance:# 如果x秒内&#xff0c;依然没有收到续约请求&#xff0c;判定服务过期,默认90slease-expiration-duration-in-seconds: 50# 每隔x秒钟&#xff0c;向服务中心发送…

window下时间设置

用了蛮长时间搞出点东西&#xff0c;还被上面鄙视了一番&#xff0c;算了不抱怨了&#xff0c;在鄙视中成长&#xff0c;在鄙视中强大。 我主要是完成两个两个功能:第一个是设置系统时间将其转换成十六进制精确到分钟显示出来&#xff08;这个VC中有现成的api&#xff0c;以前…

Date、Time、DateTimes设置

撰写时间&#xff1a;2019年06月16日 在数据库中设置的Date、Time、datetime类型&#xff0c;在Vs中Date、datetime便会自动转化为DataTIme类型&#xff0c;而Time类型则会转化为TimeSpan类型 常常在查出时间、日期的数据时都会乱码&#xff0c;就像下图一样&#xff0c;那么…

如何更改Windows服务器时间

Windows操作系统自带时间同步功能&#xff0c;它会自动从互联网时间服务器获取时间&#xff0c;以保证系统时间的准确性。但是&#xff0c;有时候我们需要更改时间服务器&#xff0c;以获得更准确的时间同步。小编将为大家介绍如何更改Windows时间服务器&#xff0c;以及Window…

Windows将时间设置到秒的方法

使用win10系统的用户应该都发现了&#xff0c;Win10系统右下角时间只显示到分&#xff0c;不会显示到秒&#xff0c;如果想要看到秒的话&#xff0c;需要点击一下时间才会弹出。那么&#xff0c;Win10系统时间怎么显示到秒? 默认情况下&#xff0c;需要点击后方可查看时间详情…

设置linux的时间

目录 一、什么是时间 &#xff08;1&#xff09;例子1 &#xff08;2&#xff09;例子2 二、什么是本地时间 三、linux设置本地时间的方法 &#xff08;1&#xff09;方式一&#xff1a;通过互联网自动同步 1.修改时间同步服务器 2.查看时间同步情况 &#xff08;2&…

win10电脑时间同步设置方法

win10系统的时间与日期默认是跟网络时间同步的&#xff0c;这样可以保持电脑时间的准确。不过有网友反映自己的win10电脑时间不对&#xff0c;调准时间后过段时间又是如此&#xff0c;不知道如何设置win10电脑时间同步。下面小编就教下大家win10电脑时间同步设置方法。 具体的…

简单3招教你设置电脑时间

案例&#xff1a;电脑时间怎么设置&#xff1f; 【我使用电脑时&#xff0c;电脑显示的时间一直不对&#xff0c;这导致我非常不方便&#xff0c;想问下大家平常使用电脑时有什么设置电脑时间比较简单的方法吗&#xff1f;】 电脑的时间设置很重要&#xff0c;不仅可以保证电…

超越函数e^(-x^2)收敛

超越函数e(-x2)收敛 Γ ( z ) ∫ 0 ∞ e − x 2 d t . \Gamma(z) \int_0^\infty e^{-x^2}dt\,. Γ(z)∫0∞​e−x2dt. 原函数不能直接积分所以采用极坐标变换的方式求解&#xff0c;本人自用码住

积分抵现

积分抵现设置教程 积分抵现 积分抵现包括积分抵现比率、是否开启积分抵现和积分说明。 其中&#xff0c;积分抵现比率为 1积分可抵多少元现金 转载于:https://my.oschina.net/u/4103048/blog/3099967

常见不可积积分

from sympy import * x symbols(x) a sin(x)/x b exp(x**2) c cos(x)/x d sin(x**2) e cos(x**2) f x/ln(x) g ln(x)/(x1) A integrate(a,x) # Si(x)-->超越函数 B integrate(b,x) # sqrt(pi)*erfi(x)/2-->erfi(x)误差函数 C integrate(c,x) # -log…

超越函数e^(-x^2)在(-∞, +∞)上的定积分的两种解法

令 解法一 二重积分极坐标 故 解法二 Γ函数余元公式 又由余元公式&#xff0c;有 于是 故

加速度计求二次积分

我是勤劳的搬运工&#xff1a;https://blog.csdn.net/u011006622/article/details/56286833 摘要 此文档描述并使用MMA7260QT三轴加速计和低功耗的9S08QG8八位单片机实现求解位置的算法 。 在今天先进的电子市场&#xff0c;有不少增加了许多特性和智能的多功能的产品。定…

python绘制积分函数_Python超越函数积分运算以及绘图实现代码

编译环境&#xff1a;ubuntu17.04 Python3.5 所需库&#xff1a;numpy、scipy、matplotlib 下面是理想平面的辐射强度计算(课程大作业&#xff5e;&#xff5e;&#xff5e;) 1、超越函数积分运算 def integral(x,c1,c2,T): return ((c1*0.98)/(x**5))*(1/((np.e**(c2/(x*T)))-…

高数 | 【定积分、变限积分】【一元函数积分学李林880】 及 巧解例题

一、定积分的概念、性质及几何意义 巧解&#xff1a;利用奇偶性&#xff0c;快速选出答案。 利用图形帮助解题。 设出具体函数。 二、定积分的计算 1.利用几何意义 2.换元法巧解 3.区间平移 本题也可用用区间再现 4.绝对值符号讨论 三、变上限定积分 三种变限积分形式 四、李…

Python超越函数积分运算以及绘图实现

编译环境&#xff1a;ubuntu17.04 Python3.5 所需库&#xff1a;numpy、scipy、matplotlib 下面是理想平面的辐射强度计算&#xff08;课程大作业&#xff5e;&#xff5e;&#xff5e;&#xff09; 1、超越函数积分运算 def integral(x,c1,c2,T): return ((c1*0.98)/(x**…

巧用二重积分的积分中值定理

二重积分的积分中值定理&#xff0c;对于部分题目来说&#xff0c;例如极限题目&#xff0c;积分证明题的运用十分便捷。 在解决极限问题中&#xff0c;几分钟值定理能大大简化求解过程&#xff0c;在求极限过程中&#xff0c;包含有二重积分的问题一般都是通过逐渐化简求导来解…

重积分的计算与理解

主要分为二重积分和三重积分。 二重积分 二重积分的基本思想是变成两次积分。物理意义已知面密度f,算质量 即首先把y方向的每一根线段计算出质量(相当于把y的线捏起来了)&#xff0c;然后算x 主要方法如下&#xff1a; 计算 ∬ D f ( x , y ) \iint \limits_Df(x,y) D∬​f(x…