简单粗暴的分布式定时任务解决方案

article/2025/10/10 2:34:53

分布式定时任务

  • 1.为什么需要定时任务?
  • 2.数据库实现分布式定时任务
  • 3.基于redis实现

1.为什么需要定时任务?

因为有时候我们需要定时的执行一些操作,比如业务中产生的一些临时文件,临时文件不能立即删除,因为不清楚用户是否操作完毕,不能立即删除,需要隔一段时间然后定时清楚,还有像是一些电商项目,每月进行数据清算。比如某些业务的排行榜,实时性不是高的也可以使用定时任务去统计,然后在做更新。但是我们现在大多数的应用都是分布式的?相当于你写的一个定时任务会在多个子系统中运行,而且是同时的,我们只需要其中一个任务运行就可以了,如果多次运行不仅会无故消耗系统资源,还会导致任务执行出现意外,那么怎么保证这个任务只执行一次呢?其实解决方案有很多。

分布式任务执行出现的问题,如下图所示:

image-20230310211627527

  1. 使用数据库唯一约束加锁
  2. 使用redis的setNX命令
  3. 使用分布式框架Quartz,TBSchedule,elastic-job,Saturn,xxl-job等

当然技术是为业务服务的,我们怎么选择合适的技术,还得是看业务场景,比如一些任务的执行频率不高,也不是特别要求效率,也不复杂,我们完全用不上为了一个定时任务去引入一些第三方的框架作为定时任务实现,我们来介绍两种方式来实现分布式定时任务。

2.数据库实现分布式定时任务

数据库实现定时任务的核心思路:

  1. 需要两张表,一张定时任务配置表,还有一张定时任务运行记录表
  2. 任务配置表有一个唯一约束字段,运行记录表由运行日期+任务名称作为唯一约束,这是实现的核心思路
  3. 使用注解+aop对定时任务进行代理,统一进行加锁操作,避免多次运行
  4. 这种适合任务不频繁,一天在某个时间点执行,对性能要求不高的业务场景,实现起来也比较简单

表SQL语句:

-- 任务运行记录表
CREATE TABLE `task_record` (`ID` varchar(20) NOT NULL COMMENT 'ID',`start_time` datetime DEFAULT NULL COMMENT '定时任务开始时间',`ent_time` datetime DEFAULT NULL COMMENT '定时任务结束时间',`is_success` varchar(1) DEFAULT NULL COMMENT '是否执行成功',`error_cause` longtext COMMENT '失败原因',`task_name` varchar(100) NOT NULL COMMENT '任务名称',`run_date` varchar(6) DEFAULT NULL COMMENT '运行日期',PRIMARY KEY (`ID`),UNIQUE KEY `run_date_task_name_idx` (`run_date`,`task_name`) USING BTREE COMMENT '运行日期+任务名称作为唯一约束'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='定时任务运行记录表';-- 任务配置表
CREATE TABLE `task_config` (`id` varchar(32) NOT NULL COMMENT '序号',`task_describe` varchar(225) DEFAULT NULL COMMENT '任务描述',`task_name` varchar(100) DEFAULT NULL COMMENT '任务名称',`task_valid` varchar(1) DEFAULT NULL COMMENT '任务有效标志',`create_time` datetime DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`),UNIQUE KEY `task_index` (`task_name`) COMMENT '唯一性约束'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='定时任务配置表';

1.定时任务标识注解:

/*** 标注在定时任务上,避免多个微服务的情况下定时任务执行重复* @author compass* @date 2023-03-09* @since 1.0**/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DatabaseLock {//定时任务使用的键,千万不要重复String lockName() default "";//定时任务描述String lockDesc() default "";
}

2.使用aop代理定时任务方法进行拦截

/*** 代理具体的定时任务,仅有一个任务会被成功执行** @author compass* @date 2023-03-09* @since 1.0**/
@Aspect
@Slf4j
@Component
public class DatabaseLockAspect {@Resourceprivate TaskService taskService;private static final String TASK_IS_VALID = "1";@Around("@annotation( com.springboot.example.annotation.DatabaseLock)")public Object cacheLockPoint(ProceedingJoinPoint pjp) {Date startTime = new Date();TaskRecord taskRecord = new TaskRecord();String isRunSuccess = "1";String taskConfigId;String errorCause = "";Boolean currentDayRunRecord ;Method cacheMethod = null;for (Method method : pjp.getTarget().getClass().getMethods()) {if (null != method.getAnnotation(DatabaseLock.class)) {cacheMethod = method;break;}}if (cacheMethod != null) {String lockName = cacheMethod.getAnnotation(DatabaseLock.class).lockName();String lockDesc = cacheMethod.getAnnotation(DatabaseLock.class).lockDesc();// 运行主键,避免多次运行核心关键String runDate = DateUtil.format(new Date(), "yyyyMMdd");String taskRecordId = IdUtil.getSnowflakeNextIdStr();taskRecord.setTaskName(lockName);taskRecord.setId(taskRecordId);taskRecord.setRunDate(runDate);if (StringUtils.isBlank(lockName)) {throw new RuntimeException("定时任务锁名称不能为空");}if (StringUtils.isBlank(lockDesc)) {throw new RuntimeException("定时任务锁描述不能为空");}TaskConfig taskConfig = taskService.hasRun(lockName);// 还未运行过,进行初始化处理if (taskConfig == null) {TaskConfig config = new TaskConfig();taskConfigId = IdUtil.getSnowflakeNextIdStr();config.setId(taskConfigId);config.setTaskDescribe(lockDesc);config.setTaskName(lockName);config.setTaskValid("1");config.setCreateTime(new Date());try {// 添加时出现异常,已经运行过该定时任务taskService.addTaskConfig(config);taskConfig = config;} catch (Exception e) {e.printStackTrace();}// 有效标志位0表示无需执行} else if (!TASK_IS_VALID.equals(taskConfig.getTaskValid())) {String message =  "该定时任务已经禁用";log.warn("method:{}未获取锁:{}[运行失败原因:{}]", cacheMethod, lockName, message);throw new RuntimeException(String.format("method:%s未获取锁:%s[运行失败原因:%s]", cacheMethod, lockName, message));}// 添加运行记录,以runKey为唯一标识,插入异常,说明执行过try {currentDayRunRecord = taskService.addCurrentDayRunRecord(taskRecord);} catch (Exception e) {log.warn("method:{}未获取锁:{}[运行失败原因:已经有别的服务进行执行]", cacheMethod, lockName);return  null;}// 没有执行过,开始执行if (currentDayRunRecord) {try {log.warn("method:{}获取锁:{},运行成功!", cacheMethod, lockName);return pjp.proceed();} catch (Throwable e) {e.printStackTrace();isRunSuccess = "0";errorCause = ExceptionUtils.getExceptionDetail(e);} finally {Date endTime = new Date();taskRecord.setStartTime(startTime);taskRecord.setId(IdUtil.getSnowflakeNextIdStr());taskRecord.setEntTime(endTime);taskRecord.setIsSuccess(isRunSuccess);taskRecord.setErrorCause(errorCause);// 修改运行记录taskService.updateTaskRunRecord(taskRecord);}}}return null;}
}

3.TaskService实现操作数据库接口与实现

public interface TaskService {/*** 判断定时任务是否运行过** @param taskName* @return com.springboot.example.bean.task.TaskConfig* @author compass* @date 2023/3/10 21:22* @since 1.0.0**/TaskConfig hasRun(String taskName);/*** 将首次运行的任务添加到任务配置表** @param taskConfig* @return java.lang.Boolean* @author compass* @date 2023/3/10 21:23* @since 1.0.0**/Boolean addTaskConfig(TaskConfig taskConfig);/*** 更新定时任务运行记录** @param taskRecord* @return java.lang.Boolean* @author compass* @date 2023/3/10 21:23* @since 1.0.0**/Boolean updateTaskRunRecord(TaskRecord taskRecord);/*** 新增一条运行记录,只有新增成功的服务才可以得到运行劝* @param taskRecord* @return java.lang.Boolean* @author compass* @date 2023/3/10 21:23* @since 1.0.0**/Boolean addCurrentDayRunRecord(TaskRecord taskRecord);
}@Slf4j
@Service
public class TaskServiceImpl implements TaskService {@Resourceprivate TaskConfigMapper taskConfigMapper;@Resourceprivate TaskRecordMapper taskRecordMapper;@Overridepublic TaskConfig hasRun(String taskName) {QueryWrapper<TaskConfig> wrapper = new QueryWrapper<>();wrapper.eq("task_name",taskName);return taskConfigMapper.selectOne(wrapper);}@Overridepublic Boolean addTaskConfig(TaskConfig taskConfig) {return taskConfigMapper.insert(taskConfig)>0;}@Overridepublic Boolean updateTaskRunRecord(TaskRecord taskRecord ) {QueryWrapper<TaskRecord> wrapper = new QueryWrapper<>();wrapper.eq("task_name",taskRecord.getTaskName());wrapper.eq("run_date",taskRecord.getRunDate());return  taskRecordMapper.update(taskRecord,wrapper)>0;}@Overridepublic Boolean addCurrentDayRunRecord(TaskRecord taskRecord) {return taskRecordMapper.insert(taskRecord)>0;}
}

4.数据库对应的实体类

// 配置类
@Data
@TableName("task_config")
public class TaskConfig {/*** 序号*/@TableId(value = "id", type = IdType.ASSIGN_ID)private String id;/*** 任务描述*/private String taskDescribe;/*** 任务名称*/private String taskName;/*** 任务有效标志*/private String taskValid;/*** 创建时间*/private Date createTime;}
// 运行记录类
@Data
@TableName("task_record")
public class TaskRecord {/*** ID*/@TableId(value = "id", type = IdType.ASSIGN_ID)private String id;/*** 定时任务开始时间*/private Date startTime;/*** 定时任务结束时间*/private Date entTime;/*** 是否执行成功*/private String isSuccess;/*** 失败原因*/private String  errorCause;/*** 运行日期[yyyyMMdd]*/private String  runDate;/*** 任务名称(任务名称+运行日期为唯一索引)*/private String  taskName;}

3.基于redis实现

  1. 主要是基于setNX来实现的,setNX表示这个key存在,则设置value失败,只有这个key不存在的时候,才会set成功
  2. 我们可以给这个key指定过期时间,让他一定会释放锁,不然容易出现死锁的情况

1.操作redis锁的工具类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/*** redis锁工具类,如果redis是集群的话需要考虑数据延时性,这里默认为redis单个节点** @author compass* @date 2023-03-10* @since 1.0**/
@SuppressWarnings(value = {"all"})
@Component
public class RedisLockUtils {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 加锁** @param key* @param value* @param time* @param timeUnit* @return boolean* @author compass* @date 2023/3/10 22:13* @since 1.0.0**/public boolean lock(String key, String value, long time, TimeUnit timeUnit) {return (Boolean)redisTemplate.execute((RedisCallback) connection -> {Boolean setNX = connection.setNX(key.getBytes(), value.getBytes());if (setNX){return   connection.expire(key.getBytes(),time);}return false;});}/*** 立即释放锁,如果任务执行的非常快,可能会导致其他应用获得到锁,二次执行** @param key* @return boolean* @author compass* @date 2023/3/10 22:13* @since 1.0.0**/public boolean fastReleaseLock(String key) {return redisTemplate.delete(key);}/*** 缓慢释放锁(隔离小段时间再释放锁,可以完全避免掉别的应用获取到锁)** @param key* @param time* @param timeUnit* @return boolean* @author compass* @date 2023/3/10 22:13* @since 1.0.0**/public boolean turtleReleaseLock(String key, long time, TimeUnit timeUnit) {return redisTemplate.expire(key, time, timeUnit);}}

2.aop切入,统一管理定时任务

import com.springboot.example.annotation.CacheLock;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;/*** 代理具体的定时任务,仅有一个任务会被成功执行** @author compass* @date 2023-03-09* @since 1.0**/
@Aspect
@Slf4j
@Component
public class CacheLockAspect {@Resourceprivate RedisLockUtils redisLockUtils;/*** 加锁值,可以是任意值**/private static final String LOCK_VALUE = "1";@Around("@annotation(com.springboot.example.annotation.CacheLock)")public Object cacheLockPoint(ProceedingJoinPoint pjp) {Method cacheMethod = null;for (Method method : pjp.getTarget().getClass().getMethods()) {if (null != method.getAnnotation(CacheLock.class)) {cacheMethod = method;break;}}if (cacheMethod!=null){CacheLock cacheLock = cacheMethod.getAnnotation(CacheLock.class);String lockName = cacheLock.lockName();long time = cacheLock.timeOut();boolean successLock = redisLockUtils.lock(lockName,LOCK_VALUE, time, TimeUnit.SECONDS);if (successLock){log.info("method:{}获取锁成功:{}", cacheMethod, lockName);try {// 获得锁调用被代理的定时任务return pjp.proceed();} catch (Throwable throwable) {throwable.printStackTrace();}finally {// 延时5秒再去释放锁redisLockUtils.turtleReleaseLock(lockName,5,TimeUnit.SECONDS);}}else {log.warn("method:{}获取锁失败:{}", cacheMethod, lockName);}}return null;}
}

3.自定义注解


/*** 标注在定时任务上,避免多个微服务的情况下定时任务执行重复* @author compass* @date 2023-03-09* @since 1.0**/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {//定时任务使用的键,千万不要重复String lockName() ;// 占用锁的时间,单位是秒,默认10分钟long timeOut() default 60*10;
}

今天就先介绍这两种方式,后续我再为大家续上使用别的框架进行实现,不过在实现的过程中使用 redisTemplate.opsForValue().setIfAbsent() 出现了一点小肯,他返回的是null值,然后出现空指针,然后我不得不采用execute的方式去执行。


http://chatgpt.dhexx.cn/article/CP3VWPwy.shtml

相关文章

Java 实现分布式定时任务

文章目录 前言一、技术点二、代码实践1、引入库2、创建启动线程入口3、表结构4、任务解析5、任务拉取 三、结果展示四、总结 前言 最近有一个需求&#xff1a;需要实现分布式定时任务。而市面上的定时任务大多数都是基于Scheduled注解进行实现。不符合需求。所以根据需求整体思…

分布式定时任务调度

前言 什么是分布式定时任务? 把分散的&#xff0c;可靠性差的计划任务纳入统一的平台&#xff0c;并实现集群管理调度和分布式部署的一种定时任务的管理方式。叫做分布式定时任务。 为什么要采用分布式定时任务&#xff1f; 单点定时任务的缺点: 功能相对简单&#xff0c…

分布式定时任务-XXL-JOB-教程+实战

一.定时任务概述 1.定时任务认识 1.1.什么是定时任务 定时任务是按照指定时间周期运行任务。使用场景为在某个固定时间点执行&#xff0c;或者周期性的去执行某个任务&#xff0c;比如&#xff1a;每天晚上24点做数据汇总&#xff0c;定时发送短信等。 1.2.常见定时任务方案…

几种常用的分布式定时任务

1. 什么是分布式定时任务 把分散的&#xff0c;可靠性差的计划任务纳入统一的平台&#xff0c;并实现集群管理调度和分布式部署的一种定时任务的管理方式。叫做分布式定时任务。 2. 常见开源方案 elastic-job xxl-job quartz saturn opencron antares elastic-job el…

分布式定时任务

分布式定时任务 1&#xff0c;什么是分布式定时任务&#xff1b;2&#xff0c;为什么要采用分布式定时任务&#xff1b;3&#xff0c;怎么样设计实现一个分布式定时任务&#xff1b;4&#xff0c;当前比较流行的分布式定时任务框架&#xff1b; 1&#xff0c;什么是分布式定时…

python类型转换函数str

str函数&#xff0c;将数字转为字符串&#xff1a;

Python 类型转换(数据类型转换函数大全)

文章目录 虽然 Python 是弱类型编程语言&#xff0c;不需要像 Java 或 C 语言那样还要在使用变量前声明变量的类型&#xff0c;但在一些特定场景中&#xff0c;仍然需要用到类型转换。 比如说&#xff0c;我们想通过使用 print() 函数输出信息“您的身高&#xff1a;”以及浮点…

python怎么转换文件格式_python怎么转换数据类型

在处理数据的时候&#xff0c;经常需要转换数据的格式&#xff0c;来方便数据遍历等操作。下面我们来看一下Python中的几种数据类型转换。 1、字符串转字典&#xff1a;dict_string "{name:linux,age:18}" to_dict eval(dict_string) print(type(to_dict)) 也可以用…

python强制类型转换

1.强转为int string->int a string print(int(a))a 1.2 print(int(a))a 12 print(int(a))string仅在无特殊字符&#xff08;包括小数点&#xff09;且全为数字的情况下可强转为float boolen->int a True b False print(int(a)) print(int(b))float->int a …

python批量转换数据类型_python中数据类型转换

1、list转str 假设有一个名为test_list的list,转换后的str名为test_str 则转换方法: test_str = "".join(test_list) 例子: 需要注意的是该方法需要list中的元素为字符型,若是整型,则需要先转换为字符型后再转为str类型。 2、str转list 假设有一个名为test_str的…

python类型转换

一、int——支持转换为 int 类型的&#xff0c;仅有 float、str、bytes&#xff0c;其他类型均不支持。 1、str与bytes类型是什么&#xff0c;有什么区别 文本总是Unicode&#xff0c;由str类型表示&#xff0c;二进制数据则由bytes类型表示。 字符串是 以字符为单位进行处理…

浅谈Python中的类型转换

目录 &#xff08;一&#xff09;前言 &#xff08;二&#xff09;四种常见转换类型 1. int()函数 &#xff08;1&#xff09;int()函数的格式 &#xff08;2&#xff09;示例 2. float()函数 &#xff08;1&#xff09;float()函数格式&#xff1a; &#xff08;2&…

初学ansys:模态分析及谐响应分析

谐响应为线性系统对简谐激励的稳态响应&#xff0c;当系统含有阻尼或者激励为复数&#xff08;相位不为0或pi&#xff09;,谐响应为复数。 ansys可在模态分析的基础上进行谐响应分析&#xff1a; 2阶频率377.47&#xff0c;振型如下&#xff1a; 当z向加速度10g,右端平面z向位…

关于模态分析

模态分析目的是获得固有频率、模态振型、振型参与系数、有效质量 模态分析是动力学的基础分析,谐响应分析的前提是进行模态分析。 什么是固有频率呢&#xff1f;共振频率呢&#xff1f; 比如一个单摆做好后&#xff0c;他的振动频率等于2Pi&#xff08;l/g&#xff09;^&#x…

Ansys-模态分析基础上的谱分析学习收获

谱分析是一种将模态分析和已知谱联系起来的、计算结构位移和应力的分析方法&#xff0c;主要用于确定结构对随机载荷或时间变化载荷&#xff08;如地震载荷&#xff09;的动力响应。谱是谱值和频率的关系曲线&#xff0c;它反映了时间历程载荷的强度和频率之间的关系。响应谱&a…

什么是模态分析?什么是振型?

模态和振型是两个比较难懂的概念&#xff0c;涉及的理论比较多&#xff0c;我想通过一句话引出&#xff0c;然后通过逐步解释的方法去阐释这两个概念。 以一根梁为例&#xff0c;通过理论计算寻找其固有频率、阻尼比、振型的过程就是解析模态分析&#xff0c;通过实验得到的就…

模态分析实例—斜齿圆柱齿轮的固有频率分析

本例介绍了对一个复杂结构—斜齿圆柱齿轮模型的创建方法&#xff0c;以及利用ANSYS对其进行固有频率和振型研究即模态分析的方法、步骤和过程。 APDL: /CLEAR,NOSTART /FILNAME,EXAMPLE11/PREP7 ET,1,SOLID45 MP,EX,1,2E11 MP,PRXY,1,0.3 MP,DENS,1,7800 K,1,21.87E-3 K,2,22.…

使用ANSYS进行对称边界的模态分析,制作【春节快乐】

这里写自定义目录标题 想法由来1. 建模2. 使用对称边界进行模态分析2.1 处理几何模型2.2 网格划分并设置边界条件进行计算2.3 求解2.4查看结果 延续去年的传统&#xff0c;最近几天一直在想做个什么东西来迎接新年。本来想用keras训练个深度网络&#xff0c;从一大堆图片中识别…

[Ansys Workbench] 平面对称斜拉桥的模态分析

1. 题目 2. 预处理 使用静态结构和模态分析两个模块 2.1 定义材料 2.2 定义几何结构 使用 DesignModeler 不知道 DM 中怎么使用对称轴画图……我就用了笨方法画了 一个主梁 使用 Concept - Lines From Sketches 从草图生成线 得到的线在结构树中如图所示 选择草图中所有的线&…