RabbitMQ真延时队列实现消息提醒功能

article/2025/8/20 14:24:39

RabbitMQ真延时队列实现消息提醒功能

一、需求场景

用户可以制定多个计划,同时可给该计划设置是否需要到点提醒,且中途可以取消提醒或修改提醒时间

二、需要解决的问题

学习过rabbitmq的同学们都知道,通过TTL+死信队列可以实现延时队列的效果,

图片描述

​ TTL+死信队列实现延时队列示意图

但是这个延时队列有个弊端,即里面的消息死亡并非是异步的,举个例子:

消息1设置的死亡时间是5分钟,消息2设置的死亡时间是10分钟,当消息2比消息1先进入队列时,消息2没有死亡即使消息1已经到达了死亡时间也会被消息2所阻塞,导致无法被消费。这样就无法满足上述需求:每条消息的死亡相互独立这种场景了

三、解决方法

那有没有既需要延时触发、也可以满足延时时间不一样的场景的方法呢?

有!

那就是rabbitmq的插件大法,安装方法跟添加管理台插件时大同小异。

插件名字叫rabbitmq_delayed_message_exchange 翻译过来就是延时消息交换机

开启该插件后,就会在原来的交换机类型上又多加了一种类型的交换机:x-delayed-message交换机

之后的消息的延时触发都会交给该交换机完成,而无需再使用两个交换机路,两个队列。

1、如何安装?

我是用docker安装rabbitmq的,所以这里只介绍docker安装插件的方法。

这是插件的下载地址 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

小编用的是手动安装的方式,将安装包拷贝到rabbitmq映射的文件目录下的plugins文件夹

docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq(rabbitmq的容器名称):/plugins
rm rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
docker exec -it rabbitmq bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

2、如何开发?

按照上面的需求,我们结合实际的业务开发,因为涉及一些数据库操作的实体,为避免篇幅过长,推荐配合源码食用。

1)git拉取项目

项目地址:https://github.com/CaiCaiXian/rabbitmq-plan.git

2) 数据库配置

不含创建数据库语句

计划表sql:

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;-- ----------------------------
-- Table structure for qk_plan
-- ----------------------------
DROP TABLE IF EXISTS `qk_plan`;
CREATE TABLE `qk_plan`  (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '唯一标识',`user_id` bigint(20) NOT NULL COMMENT '用户id',`type_id` int(11) NOT NULL COMMENT '类型id',`title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '标题',`content` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '内容',`location` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '地点',`start_time` datetime(0) NOT NULL COMMENT '开始时间',`end_time` datetime(0) NOT NULL COMMENT '结束时间',`create_time` datetime(0) NOT NULL COMMENT '创建时间',`version` int(11) NULL DEFAULT NULL COMMENT '版本号',`status` int(11) NULL DEFAULT NULL COMMENT '是否生效',PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1349648474282508291 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;SET FOREIGN_KEY_CHECKS = 1;

计划类型sql:

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;-- ----------------------------
-- Table structure for qk_plantype
-- ----------------------------
DROP TABLE IF EXISTS `qk_plantype`;
CREATE TABLE `qk_plantype`  (`id` bigint(11) NOT NULL AUTO_INCREMENT,`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;SET FOREIGN_KEY_CHECKS = 1;

注意一个小细节:

我在计划表中加入了version(版本号)和status(是否生效)两个字段,是为了满足需求场景中提到的 消息可以被修改和取消 的需求。

3)配置交换机,绑定队列

@Configuration
public class DelayMQConfig {/*** 交换机名称*/public final static String DELAY_EXCHANGE_NAME = "delay_exchange";/*** 队列名称*/public final static String DELAY_QUEUE_NAME = "delay_queue";/*** 路由key 不是topic不能使用通配符*/public final static String DELAY_ROUTE_KEY = "delay.notify";/*** 延迟交换机*/@Bean("delayExchange")public CustomExchange delayExchange(){Map<String,Object> args = new HashMap<>();args.put("x-delayed-type","direct");return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,true,args);}/*** 延迟队列*/@Bean("delayQueue")public Queue delayQueue(){return new Queue(DELAY_QUEUE_NAME,true,false,false);}@Beanpublic Binding bindingDelayExchangeQueue(@Qualifier("delayExchange") Exchange exchange,@Qualifier("delayQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTE_KEY).noargs();}
}

在这里定义一个类型为x-delayed-message的交换机,注意这里返回的是CustomExchange意思是自定义交换机,交换机名称为delay_exchange

args.put(“x-delayed-type”,“direct”) 将属性 x-delayed-type 设为direct交换机。

同时我们给该交换机绑定一个队列 名称为:delay_queue,路由KEY是 delay.notify

4)配置消费者,绑定队列

@Component
public class NotifyListener {@AutowiredPlanService planService;//此处绑定要监听的队列@RabbitListener(queues = DelayMQConfig.DELAY_QUEUE_NAME)@RabbitHandlerpublic void onMessage(PlanDTO msg, Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//通知用户planService.notifyUser(msg.getId(),msg.getVersion());channel.basicAck(deliveryTag, false);//System.out.println("消息被确认!");} catch (IOException e) {channel.basicNack(deliveryTag, false, false);//System.out.println("消息被否定确认!");}}
}

我们通过队列名给刚刚配置好的队列绑定上消费者,这样就实现了交换机—>队列—>消费者的模型了。

开发者只需要完善notifyUser方法的代码,如发送到邮箱,就可以实现发送提醒的效果了。

注意

channel.basicAck(deliveryTag, false);中,是否多条确认要设置为false,不能一次确认多条消息,否则会把时间还没到的消息也一块确认了。

5)notifyUser(发送消息提醒)方法

因为计划可能被修改过,也可能被取消,所以我们在发送提醒时要确保版本号和我们消息中的计划的版本号要一致,且消息并没有被取消提醒才可以发送提醒。通过上述提到的version和status字段判断即可。

  @Overridepublic void notifyUser(Long planId,Integer version) {PlanDTO planDTO = planDao.selectPlanById(planId);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String now = sdf.format(new Date());//保证计划是生效的且和通知时是同一个版本if(planDTO != null && PlanConstant.CAN_USE.equals(planDTO.getStatus()) && planDTO.getVersion().equals(version)){//发送通知System.out.println(now + " 你消息提醒:"+planDTO.toString());}else{System.out.println(now + " " + planDTO.getTitle()+": 该消息已取消提醒");}}

6)编写生产者

无论是更新还是新增计划,我们都需要重新向队列中发送一条消息。

   @Overridepublic boolean saveOrUpdatePlan(PlanDTO planDTO) {PlanEntity planEntity = new PlanEntity();//dto拷贝给数据库操作实体BeanUtil.copyProperties(planDTO,planEntity);try {//判断是更新还是新增Long id = planEntity.getId();if(ObjectUtil.isNotNull(id)){//更新,将数据库的版本号加一Integer old = planDao.selectVersion(id);planEntity.setVersion(old + 1);updateById(planEntity);}else {//新增 初始化版本号planEntity.setVersion(0);save(planEntity);}//获取数据库中最新数据PlanDTO newPlanDTO = planDao.selectPlanById(planEntity.getId());//判断开始时间是不是比当前时间大且消息提醒是生效long x = (planDTO.getStartTime().getTime() - System.currentTimeMillis());if( x >= 0 && planDTO.getStatus().equals(PlanConstant.CAN_USE)){//计划加入消息队列rabbitTemplate.convertAndSend(DelayMQConfig.DELAY_EXCHANGE_NAME, DelayMQConfig.DELAY_ROUTE_KEY,newPlanDTO, msg->{//设置延迟msg.getMessageProperties().setDelay((int)x);return msg;});}return true;}catch (Exception e){e.printStackTrace();return false;}}

其中这段为生产者的核心代码,rabbitTemplate.convertAndSend(交换机名称,路由Key,传递的消息实体,消息配置器)

用lamdba表达式的方式设置消息的延迟时间,单位为毫秒。

rabbitTemplate.convertAndSend(DelayMQConfig.DELAY_EXCHANGE_NAME, DelayMQConfig.DELAY_ROUTE_KEY,newPlanDTO, msg->{//设置延迟msg.getMessageProperties().setDelay((int)x);return msg;
});

四、效果演示

万事具备,我们启动一下项目来看看效果。

postman添加两个计划,一个计划正常执行,一个计划中途取消。

创建计划1:
在这里插入图片描述

创建计划2:
在这里插入图片描述
修改计划1的时间:

在这里插入图片描述
取消计划2:
在这里插入图片描述
最后效果:
在这里插入图片描述
可以看到原本在40分需要被提醒的消息被取消,而45分只发送了计划1的提醒,完全符合我们的效果!
完毕!


http://chatgpt.dhexx.cn/article/ruVRHBim.shtml

相关文章

企业微信 消息 html,企业微信怎么设置消息提醒

企业微信是一款非常不错的办公软件&#xff0c;用户加入企业群就能实时了解企业的动态。而且大家只需设置消息提醒&#xff0c;软件就会在第一时间通知你&#xff0c;不会让你错过任何重要的消息&#xff0c;下面小编为大家带来相关的设置教程。 方法/步骤分享&#xff1a; 1、…

vue websocket 新消息提醒

概述&#xff1a; 不是当前聊天&#xff0c;有其他消息来就通过2种方式接受到提醒。在连接的上下文中判断&#xff0c;符合条件的弹框&#xff0c;显示红点&#xff0c;此处调用了element弹框组件列表点击事件&#xff0c;红点消失列表显示&#xff0c;属性中包含小红点 前提…

html5载入提示音,html5新消息提示声音

【实例简介】 【实例截图】 【核心代码】HTML5手机声音提示 #chatBox{width:400px;border:1px solid #d3d3d3;margin:50px auto;} #chat {max-height:220px;overflow-y:auto;max-width:400px;} #chat > ul > li{padding:3px;clear:both;padding:4px;margin:10px 0px 5px …

【Android】消息提示notification

notification 1、notification消息提示 由Android系统来管理和维护的&#xff0c;因此用户可以随时进入查看。某些信息不需要用户马上处理&#xff0c;可以利用通知&#xff0c;即延迟消息&#xff0c;比如软件的更新、短信、新闻等。 2、消息包含的内容 3、代码 <Button…

消息提醒系统:设计模式与实现方案 (公告(通告)、消息、提醒等基本功能数据库表设计与实现)

参考地址&#xff1a; 公告(通告),消息,提醒等基本功能数据库表设计_DamonREN的博客-CSDN博客 多种消息提醒系统的设计模式、实现方案&#xff08;附功能截图表结构&#xff09;_黑夜的风的博客-CSDN博客_消息提醒 设计一个百万级的消息推送系统 - crossoverJie - 博客园 案…

android开发 app消息提醒功能,APP消息提醒设计:ios和android的最佳设计方案 – 25学堂...

我们都知道APP一项重要功能就是消息推送,那么通知栏的设计极大程度上反应了这个APP是否合理,那如何可以方便地为用户展示各种通知内容。也就将是我们APP设计师跟APP产品经理重点思考的问题?也要关注移动APP布局设计经验之道! 自从去年发布的iOS5中也引入了这一功能,以替代…

vue浏览器消息提示

vue浏览器消息提示 JS部分 //判断浏览器是否支持浏览器消息弹窗 suportNotify() {if (window.Notification) {// 支持console.log("支持" "Web Notifications API");//如果支持Web Notifications API&#xff0c;再判断浏览器是否支持弹出实例this.show…

html如何设置提示收到消息,从零开始实现一个消息提示框

引言 消息提示框在实际应用场景当中比较常见,最常用的就是element ui的消息提示框,我们通常都是直接使用它们,但是我们有没有尝试过去探究其实现原理,并自己动手实现呢?为了提升我们的个人能力和竞争力,我们可以尝试来实现这样一个消息提示框。 实现效果 我们来查看一下最…

多种消息提醒系统的设计模式、实现方案(附功能截图+表结构)

网站需要增加3种消息提醒系统。需要实现的功能如下&#xff1a; 1.评论提醒。 实现功能 他人回复自己后&#xff0c;右上角自动提醒“未阅读的新消息”的数量。 点击后&#xff0c;清空新消息的提示。 思路 这个是最简单的。在数据库查询&#xff1a; select count(id) …

前端实现实时消息提醒消息通知

需求&#xff1a;当用户收到待审批和待处理的消息后状态栏图标闪烁并进行弹窗提醒&#xff0c;点击消息跳转到指定的消息。 实现方式&#xff1a;web端c端。 说明&#xff1a; 客户不需要非常的及时的接收消息&#xff0c;所以未对接websocket协议&#xff0c;使用20秒刷新一…

Win11任务栏消息提醒功能如何开启教学

Win11任务栏消息提醒功能如何开启教学。我们可以将电脑的消息提示功能开启起来&#xff0c;这样我们接收到各种程序的消息通知时&#xff0c;都可以通过这个功能及时的给出提示。那么Win11任务栏消息提醒功能如何开启&#xff0c;我们接下来看看具体的操作方法吧。 设置方法&am…

用户登录登出功能实现

用户登录登出功能实现 一、功能需求分析 1. 登录退出功能分析 流程图 功能 登录页面登录功能退出功能 二、登录页面 1. 接口设计 接口说明 类目说明请求方法GETurl定义/user/login/参数格式无参数 返回结果 登录页面 2.后端代码 user/views.py代码&#xff1a; fro…

xubuntu系统偶发自动登出

项目场景&#xff1a; 系统&#xff1a;xubuntu-16.04.3-desktop 问题描述 使用xubuntu系统期间&#xff0c;在root用户下进行相关开发&#xff0c;突然系统会回到普通用户登录界面&#xff0c;需要输入密码进入到普通用户下   它会终止所有打开的应用程序和进程&#xff0…

Spring Security OAuth2 单点登录和登出

文章目录 1. 单点登录1.1 使用内存保存客户端和用户信息1.1.1 认证中心 auth-server1.1.2 子系统 service-11.1.3 测试 1.2 使用数据库保存客户端和用户信息1.3 单点登录流程1.2.1 请求授权码&#xff0c;判断未登录&#xff0c;重定向登录页1.2.2 登录成功&#xff0c;重定向继…

OneNote的正确登出方式

1. 账号已经是登录状态 点击oneNote右上角账户信息 》 注销按钮 &#xff08;退出登录&#xff0c;注意要确保笔记已经同步到云端&#xff09; 》会弹出以下提示窗口 》点击是 2. 关闭笔记本 点击左上角 》 文件 》信息》选中自己的笔记本 》设置 》关闭&#xff08;可防止…

springboot2.7+springsecurity实现简单的登录登出

由于网上搜索的资料大部分都是使用的springsecurity的低于5.7的版本&#xff0c;springsecurity的配置类是通过继承WebSecurityConfigurerAdapter这个类来重写configure&#xff0c;以此来实现认证和授权。近期学习springsecurity使用的是5.7的版本&#xff0c;所以简单整理一下…

Vue实现登录以及登出

首先先了解一下&#xff0c;我们的效果实现流程 首先登录概述及业务流程和相关技术点 1.登录页面的布局 2.创建两个Vue.js文件 3.一个我们来做登录页和注册页 4.登录页面的布局 5.配置路由 6…

iOS快捷指令:一键登录/登出南京大学校园网

软件版本要求&#xff1a;iOS13及以上 演示机型&#xff1a;iPhone 12 mini 演示系统版本&#xff1a;iOS14.6 文章目录 导言核心步骤&#xff1a;最简单的一键登录指令自动化&#xff1a;连接NJU-WLAN后自动登录修改细节&#xff1a;让指令变得更优雅小练习&#xff1a;创建…

瑞吉外卖 —— 2、后台登录和登出

目录 1、后台登录功能 1.1、接口分析 1.1.2、登录校验逻辑 1.2、代码 1.2.1、统一的返回结果实体类 1.2.2、controller 方法 1.3、测试 2、后台退出功能 2.1、分析 2.2、代码 3、未登录访问首页跳转到登录页面 3.1、分析 3.2、代码 1、后台登录功能 1.1、接口分析…

oc账号无法登出,oc登出后官网还显示登陆状态?

《C4D的十万个为什么》首发于 公众号&#xff1a;苦七君 免费搜索查看更多问题&#xff1a;kuqijun.com 问题&#xff1a; 在本电脑上&#xff0c;在C4D里面登出oc账号后&#xff0c;官网上还是显示登陆的。。导致账号被限制在此电脑上了&#xff0c;无法用其他电脑登陆。 正…