前言:redis实现延迟队列该怎么做?在这里我分享一下
redis实现延迟队列
- 一、Redis实现延迟队列
- 二、redis失效监听事件
- 三、此种实现面临的问题
- 四、开发准备
- 五、基础实现
- 六、使用redisson实现延迟队列
- 七、redisson实现延迟队列的原理
- 八、延迟队列配置
- 九、疑问解答与加群交流学习
一、Redis实现延迟队列
- 失效监听
- redisson实现发布订阅延迟
二、redis失效监听事件
集成KeyExpirationEventMessageListener类实现redis失效监听事件
三、此种实现面临的问题
- redis的失效监听事件会存在一定的时间差,并且当数据量越大时,误差会越大。
- redis的失效监听事件会将所有key失效都会通知到onMessage,如果针对一个key,分布式业务的场景下,会出现重复消费的问题。(可以增加分布式锁的实现,但是redisson分布式锁提供了另一种延迟队列的实现方式)
四、开发准备
redis需要在服务端开启配置,打开redis服务的配置文件 添加notify-keyspace-events Ex
- 相关参数如下:
K:keyspace事件,事件以__keyspace@<db>__为前缀进行发布;
E:keyevent事件,事件以__keyevent@<db>__为前缀进行发布;
g:一般性的,非特定类型的命令,比如del,expire,rename等;
$:字符串特定命令;
l:列表特定命令;
s:集合特定命令;
h:哈希特定命令;
z:有序集合特定命令;
x:过期事件,当某个键过期并删除时会产生该事件;
e:驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件;
A:g$lshzxe的别名,因此”AKE”意味着所有事件。
五、基础实现
- 加入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- 可正常连接存取redis数据之后,创建监听类
RedisKeyExpirationListener
继承KeyExpirationEventMessageListener
,重写onMessage
方法。(key失效之后,会发出onMessage方法,之呢个获取失效的key值,不能获取key对应的value值)。
import com.test01.scrm.service.member.api.common.MemberStatusEnum;
import com.test01.scrm.service.member.provider.service.base.IBaseMemberService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;/*** @author lwl*/
@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {private final IBaseMemberService baseMemberService;private final static String MEMBER_LOCK_ACCOUNT_SUFFIX = ".lock_account";private final static String MEMBER_LOCK_ACCOUNT_DOMAIN_SUFFIX = "T";private final static String MEMBER_LOCK_ACCOUNT_MEMBER_SUFFIX = "M";private final static String MEMBER_REDISSON_LOCK = ".member_lock_redisson";private final static int WAIT_TIME = 5;private final static int LEASE_TIME = 10;public RedisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer, IBaseMemberService baseMemberService) {super(redisMessageListenerContainer);this.baseMemberService = baseMemberService;}@Overridepublic void onMessage(Message message, byte[] pattern) {//获取失效的keyString expiredKey = message.toString();log.info("================================get on message:{}====================", expiredKey);if (expiredKey.endsWith(MEMBER_LOCK_ACCOUNT_SUFFIX)) {log.info("================================on message:{}====================", expiredKey);try {log.info("=======待解锁账号解锁======expiredKey:{}", expiredKey);String tenantId = expiredKey.substring(expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_DOMAIN_SUFFIX) + 1, expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_MEMBER_SUFFIX));String memberId = expiredKey.substring(expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_MEMBER_SUFFIX) + 1, expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_SUFFIX));baseMemberService.updateAccount(Integer.parseInt(tenantId), Long.parseLong(memberId), MemberStatusEnum.NORMAL.getCode(), null);} catch (Exception exception) {log.info("auto unlock fail,expired key:{},exception:{}", expiredKey, exception.getMessage());}}}
}
- 创建一个配置类
RedisConfig
/*** @author lwl*/
@Configuration
public class RedisConfig {@Value("${redis.dbIndex}")private Integer dbIndex;private final String TOPIC = "__keyevent@" + dbIndex + "__:expired";private final RedisConnectionFactory redisConnectionFactory;public RedisConfig(RedisConnectionFactory redisConnectionFactory) {this.redisConnectionFactory = redisConnectionFactory;}@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer() {RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);//keyevent事件,事件以__keyevent@<db>__为前缀进行发布//db为redis第几个库 db2...
// redisMessageListenerContainer.addMessageListener(redisKeyExpirationListener, new PatternTopic(TOPIC));return redisMessageListenerContainer;}
}
六、使用redisson实现延迟队列
由于延时队列持久化在redis中,所以机器宕机数据不会异常丢失,机器重启后,会正常消费队列中积累的任务
七、redisson实现延迟队列的原理
使用redis的zset有序性,轮询zset中的每个元素,到点后将内容迁移至待消费的队列
八、延迟队列配置
package com.test01.scrm.service.member.provider.config.redisson.delay;import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author lwl* redisson延迟队列*/
@Configuration
public class RedissonQueueConfig {private final String queueName = "queue";@Beanpublic RBlockingQueue<String> rBlockingQueue(@Qualifier("redissonSingle") RedissonClient redissonClient) {return redissonClient.getBlockingQueue(queueName);}@Bean(name = "rDelayedQueue")public RDelayedQueue<String> rDelayedQueue(@Qualifier("redissonSingle") RedissonClient redissonClient,@Qualifier("rBlockingQueue") RBlockingQueue<String> blockQueue) {return redissonClient.getDelayedQueue(blockQueue);}
}
定义队列使用接口
package com.test01.scrm.service.member.provider.config.redisson.delay;import java.util.concurrent.TimeUnit;/*** @author lwl*/
public interface DelayQueue {/*** 发布** @param object* @return*/Boolean offer(Object object);/*** 带延迟功能的队列** @param object* @param time* @param timeUnit*/void offer(Object object, Long time, TimeUnit timeUnit);void offerAsync(Object object, Long time, TimeUnit timeUnit);Boolean offerAsync(Object object);
}
延迟队列实现
package com.test01.scrm.service.member.provider.config.redisson.delay;import org.redisson.api.RDelayedQueue;
import org.redisson.api.RFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** @author lwl*/
@Component
public class RedissonDelayQueue implements DelayQueue {private static Logger log = LoggerFactory.getLogger(RedissonDelayQueue.class);@Resource(name = "rDelayedQueue")private RDelayedQueue<Object> rDelayedQueue;@Overridepublic Boolean offer(Object object) {return rDelayedQueue.offer(object);}@Overridepublic void offer(Object object, Long time, TimeUnit timeUnit) {rDelayedQueue.offer(object, time, timeUnit);}@Overridepublic void offerAsync(Object object, Long time, TimeUnit timeUnit) {rDelayedQueue.offerAsync(object, time, timeUnit);}@Overridepublic Boolean offerAsync(Object object) {boolean flag = false;RFuture<Boolean> rFuture = rDelayedQueue.offerAsync(object);try {flag = rFuture.get();} catch (InterruptedException | ExecutionException e) {log.info("offerAsync exception:{}", e.getMessage());e.printStackTrace();}return flag;}
}
启动一个后台监控线程
package com.test01.scrm.service.member.provider.config.redisson.delay;import org.redisson.api.RBlockingQueue;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/*** @author test01*/
@Component
public class RedissonTask {@Resource(name = "rBlockingQueue")private RBlockingQueue<Object> rBlockingQueue;@PostConstructpublic void take() {new Thread(() -> {while (true) {try {System.out.println("=========================" + rBlockingQueue.take());} catch (InterruptedException e) {e.printStackTrace();}}}).start();}
}
使用延迟队列发送
package com.test01.scrm.service.member.provider.impl;import org.junit.Test;
import org.junit.runner.RunWith;
import org.mybatis.spring.annotation.MapperScan;
import org.redisson.api.RDelayedQueue;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles(value = "llh")
@MapperScan("com.test01.scrm.service.member.provider.mapper")
public class RDelayQueueTests {@Resource(name = "rDelayedQueue")private RDelayedQueue<Object> rDelayedQueue;@Testpublic void offerAsync() {rDelayedQueue.offerAsync("llh send message", 20, TimeUnit.SECONDS);}
}