文章目录
- 前言
- 一、技术点
- 二、代码实践
- 1、引入库
- 2、创建启动线程入口
- 3、表结构
- 4、任务解析
- 5、任务拉取
- 三、结果展示
- 四、总结
前言
最近有一个需求:需要实现分布式定时任务。而市面上的定时任务大多数都是基于@Scheduled注解进行实现。不符合需求。所以根据需求整体思路如下:
- 要求接口传入cron表达式,根据表达式进行解析出未来近几次次执行时间,然后将其存入数据库表(taskSchedule)中。
- 启动一个线程一直去取表中的数据进行任务处理。
- 由于是分布式需要实例之间保存心跳。并且要进行抢占式取任务。需要用到redis锁。
一、技术点
1、解析corn表达式
文章链接:
https://blog.csdn.net/qq_43548590/article/details/127424171?spm=1001.2014.3001.5502
https://blog.csdn.net/qq_43548590/article/details/127424630?spm=1001.2014.3001.5502
2、Java集成Redisson分布式锁
文章链接:
https://blog.csdn.net/qq_43548590/article/details/127420314?spm=1001.2014.3001.5502
二、代码实践
1、引入库
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.10</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.3.2</version></dependency><dependency><groupId>com.cronutils</groupId><artifactId>cron-utils</artifactId><version>9.1.5</version></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.16.8</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.8</version></dependency></dependencies>
2、创建启动线程入口
创建StarterRunner 继承CommandLineRunner 接口。
目的:容器启动之后,加载实现类的逻辑资源,已达到完成资源初始化的任务
这里初始化了两个线程TaskManager为拉取任务,TimeManager为解析cron线程
import au.com.koalaclass.timer.state.InstanceState;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 启动线程入口*/
@Component
@Slf4j
public class StarterRunner implements CommandLineRunner {@ResourceInstanceState instanceState;@ResourceTimeManager timeManager;@ResourceTaskManager taskManager;Thread timeManagerThread;Thread taskManagerThread;/*** 启动线程*/private void startThreads() {timeManagerThread = new Thread(timeManager);timeManagerThread.start();taskManagerThread = new Thread(taskManager);taskManagerThread.start();}@Overridepublic void run(String... args) throws Exception {log.info("启动了, uuid=" + instanceState.getUuid());startThreads();}
}
3、表结构
这里使用的是Jpa ,BaseEntity为公司内部模块(id,CreateTime,UpdateTime)此次自行更改
1.task_history 任务记录表,用于记录已发送的任务
import au.com.koalaclass.framework.entity.BaseEntity;
import lombok.Data;import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Table;@Data
@Entity
@Table(name = "task_history")
public class TaskHistoryEntity extends BaseEntity {private long scheduleTime;private String CallUrl;@Column(length = 4000)private String callParams;private String taskUuid;private String triggerUuid;private String taskName;private long dispatchTime;private boolean executeStatus;private String executeResult;private long executeTime;
}
2.task_schedule 根据cron解析后的表
import au.com.koalaclass.framework.entity.BaseEntity;
import lombok.Data;
import lombok.experimental.Accessors;import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Table;@Data
@Entity
@Table(name = "task_schedule")
@Accessors(chain = true)
public class TaskScheduleEntity extends BaseEntity {/*** 计划时间*/private long scheduleTime;/*** 回调地址*/private String CallUrl;/*** 回调参数*/@Column(length = 4000)private String callParams;/*** 任务uuid*/private String taskUuid;/*** 任务名称*/private String taskName;/*** 触发器UUID*/private String triggerUuid;/*** 实例UUid*/private String instanceUuid;/*** 发送时间*/private long dispatchTime;}
3.trigger_expression 任务表
import au.com.koalaclass.framework.entity.BaseEntity;
import lombok.Data;
import lombok.experimental.Accessors;import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Table;@Data
@Entity
@Table(name = "trigger_expression")
@Accessors(chain = true)
public class TriggerExpressionEntity extends BaseEntity {/*** cron表达式*/private String expression;/*** 约束开始时间*/private long fromTime;/*** 约束结束时间*/private long endTime;/*** 约束最大产生的计划时间数*/private long maxTimes;/*** 以产生的计划时间数*/private long scheduleTimes;/*** 上一次生成task的最后一位时间*/private long lastTimeGenerated;/*** 任务地址*/private String callUrl;/*** 任务参数*/@Column(length = 4000)private String callParams;/*** 任务名称*/private String name;/*** uuid*/private String uuid;/*** 状态* 0 开启生成* 1 关闭*/private int status;/*** 开启*/public static int OPEN=0;/*** 关闭*/public static int CLOSE=1;}
4、任务解析
TimeManager
使用@Scheduled进行定时处理每一分钟从TriggerExpression表中获取一次任务。根据最后一次获取任务的时间进行重新解析cron。
package au.com.koalaclass.timer.thread;import au.com.koalaclass.timer.service.RedissonService;
import au.com.koalaclass.timer.service.TriggerExpressionService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
@Data
@Slf4j
public class TimeManager implements Runnable {private final String LOCK = "schedule_generator";@Resourceprivate TriggerExpressionService service;@Resourceprivate RedissonService redissonService;private boolean stopFlag = false;/*** 启动*/@Override@Scheduled(cron = "0 0/1 * * * ?") //每十分钟执行一次public void run() {if (!redissonService.tryLock(LOCK)) {log.warn("拉起任务,获取锁失败");try {Thread.sleep(10*1000);} catch (InterruptedException e) {e.printStackTrace();}}try {log.info("扫描trigger expression 生成task");service.generateTaskSchedule();}catch (Exception e) {log.error("生成任务出错,原因:"+e.getMessage());} finally {redissonService.unlock(LOCK);}}
}
Service层
此处每次生成未来要执行的30条日期
package au.com.koalaclass.timer.service.impl;import au.com.koalaclass.timer.dao.TriggerExpressionDao;
import au.com.koalaclass.timer.entity.TaskScheduleEntity;
import au.com.koalaclass.timer.entity.TriggerExpressionEntity;
import au.com.koalaclass.timer.form.CreateExpressionForm;
import au.com.koalaclass.timer.form.CreateSingleExpressionForm;
import au.com.koalaclass.timer.service.TaskScheduleService;
import au.com.koalaclass.timer.service.TriggerExpressionService;
import au.com.koalaclass.timer.utils.CronUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.UUID;@Slf4j
@Service
public class TriggerExpressionServiceImpl implements TriggerExpressionService {@Resourceprivate TriggerExpressionDao dao;@Resourceprivate TaskScheduleService taskScheduleService;@Overridepublic void createExpression(CreateExpressionForm form) {TriggerExpressionEntity expression = formToDto(form);dao.save(expression);this.generateTaskSchedule();}@Overridepublic void generateTaskSchedule() {//查询最后一次转换时间小于当前时间+2分钟long minutes=1000*60*2;long iTime=System.currentTimeMillis()+minutes;log.info("计算后的时间"+iTime);List<TriggerExpressionEntity> list = dao.findAllByLastTimeGeneratedLessThanAndStatus(iTime,TriggerExpressionEntity.OPEN);log.info("查询Trigger结果:"+list.toString());for (TriggerExpressionEntity item : list) {//生成次数long num=30;if(item.getMaxTimes()!=0){num=item.getMaxTimes();}//判断是否可以继续生成if(item.getStatus()==TriggerExpressionEntity.OPEN){//解析30次List<Long> timeList = null;try {timeList = CronUtil.nextTimes(item.getExpression(),item.getLastTimeGenerated()==0?System.currentTimeMillis():item.getLastTimeGenerated(),(int) num);} catch (ParseException e) {e.printStackTrace();}//最后一次生成时间long lastTimeGenerated=0;//生成次数int generatedNumber=0;for (Long time : timeList) {//判断开始时间为0或者开始时间小于等于生成时间并且结束时间为0或者结束时间大于生成时间if(((item.getFromTime()==0||time>=item.getFromTime())&&(item.getEndTime()==0||time<=item.getEndTime()))){TaskScheduleEntity task = new TaskScheduleEntity();task.setScheduleTime(time).setTaskUuid(UUID.randomUUID().toString().replaceAll("-","")).setTaskName(item.getName()).setCallUrl(item.getCallUrl()).setCallParams(item.getCallParams()).setTriggerUuid(item.getUuid());task = taskScheduleService.createTaskSchedule(task);log.info("生成任务:"+task);lastTimeGenerated=time;generatedNumber++;}else{item.setStatus(TriggerExpressionEntity.CLOSE);}}if(item.getMaxTimes()!=0&&item.getMaxTimes()<=item.getScheduleTimes()+generatedNumber)item.setStatus(TriggerExpressionEntity.CLOSE);item.setLastTimeGenerated(lastTimeGenerated);item.setScheduleTimes(item.getScheduleTimes()+generatedNumber);dao.save(item);}}}@Overridepublic TaskScheduleEntity createSingleExpression(CreateSingleExpressionForm form) {return taskScheduleService.createTaskSchedule(formToTaskScheduleEntity(form));}public TaskScheduleEntity formToTaskScheduleEntity(CreateSingleExpressionForm form){TaskScheduleEntity schedule = new TaskScheduleEntity();schedule.setScheduleTime(form.getScheduleTime()).setTaskUuid(UUID.randomUUID().toString().replaceAll("-","")).setTaskName(form.getTaskName()).setCallUrl(form.getCallUrl()).setCallParams(form.getCallParams());return schedule;}public TriggerExpressionEntity formToDto(CreateExpressionForm form){TriggerExpressionEntity entity = new TriggerExpressionEntity();if(form.getUuid()==null|| StrUtil.isEmpty(form.getUuid())){entity.setUuid(UUID.randomUUID().toString().replaceAll("-",""));}else{entity.setUuid(form.getUuid());}entity.setExpression(form.getExpression()).setCallParams(form.getCallParams()).setFromTime(form.getFromTime()).setEndTime(form.getEndTime()).setCallUrl(form.getCallUrl()).setMaxTimes(form.getMaxTimes()).setName(form.getName());return entity;}
}
5、任务拉取
TaskManager
拉取任务将任务进行过滤如果时间不足200毫秒则直接执行否则将任务发布出去。订阅方会自动处理
package au.com.koalaclass.timer.thread;import au.com.koalaclass.timer.entity.TaskScheduleEntity;
import au.com.koalaclass.timer.event.TimerCallerEvent;
import au.com.koalaclass.timer.service.TaskScheduleService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.*;@Component
@Data
@Slf4j
public class TaskManager implements Runnable {@Resourceprivate ApplicationEventPublisher publisher;@Resourceprivate TaskScheduleService taskScheduleService;private boolean stopFlag = false;private Timer timer = new Timer();/*** 拉取任务*/private void pull() {log.info("拉取任务");List<TaskScheduleEntity> tasks = taskScheduleService.pullTask();/******构造假数据,开始**************/
// if (tasks == null) {
// tasks = new ArrayList<>();
// }
//
// TaskScheduleEntity task1 = new TaskScheduleEntity();
// task1.setId(1L);
// task1.setTaskUuid(UUID.randomUUID().toString());
// task1.setScheduleTime(System.currentTimeMillis());
// //TaskQueue.push(ts);
// tasks.add(task1);
//
// TaskScheduleEntity task2 = new TaskScheduleEntity();
// task2.setTaskUuid(UUID.randomUUID().toString());
// task2.setScheduleTime(System.currentTimeMillis() + 10 * 1000);
// tasks.add(task2);/******构造假数据,完成**************///装载任务if (tasks != null && tasks.size() > 0) {for (TaskScheduleEntity task : tasks) {timeTask(task);}} else {try {Thread.sleep(5*1000);} catch (InterruptedException e) {e.printStackTrace();}}}private void timeTask(TaskScheduleEntity task) {log.info("定时:"+task);long delay = task.getScheduleTime() - System.currentTimeMillis();//如果距离执行时间不足200毫秒,直接触发执行;否则加入定时器,由定时器去触发。if (delay < 200) {callTask(task);} else {TimerTask timerTask = new MyTimerTask(task);timer.schedule(timerTask, delay);}}private void callTask(TaskScheduleEntity task) {TimerCallerEvent event = new TimerCallerEvent(task);event.setTaskSchedule(task);log.info("发送消息:" + event);publisher.publishEvent(event);}@Overridepublic void run() {while (!stopFlag) {try {pull();}catch (Exception e) {log.error("拉取任务异常:"+e.getMessage());}try {Thread.sleep(1000 * 5);} catch (InterruptedException e) {e.printStackTrace();}}}public class MyTimerTask extends TimerTask{private TaskScheduleEntity task;public MyTimerTask(TaskScheduleEntity task) {this.task = task;}@Overridepublic void run() {callTask(task);}}
}
Service层
package au.com.koalaclass.timer.service.impl;import au.com.koalaclass.timer.dao.InstanceDao;
import au.com.koalaclass.timer.dao.TaskScheduleDao;
import au.com.koalaclass.timer.entity.InstanceEntity;
import au.com.koalaclass.timer.entity.TaskScheduleEntity;
import au.com.koalaclass.timer.exception.TaskScheduleException;
import au.com.koalaclass.timer.service.RedissonService;
import au.com.koalaclass.timer.service.TaskScheduleService;
import au.com.koalaclass.timer.state.InstanceState;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;@Slf4j
@Service
public class TaskScheduleServiceImpl implements TaskScheduleService {private final static String LOCK = "lock_pull_task";@ResourceTaskScheduleDao taskScheduleDao;@ResourceInstanceDao instanceDao;@ResourceInstanceState instanceState;@ResourceRedissonService redissonService;@Overridepublic List<TaskScheduleEntity> pullTask() {List<TaskScheduleEntity> tasks=new ArrayList<>();//判断是否上锁if (!redissonService.tryLock(LOCK)) {log.warn("拉取任务,获取锁失败");return tasks;}try {//只拉取2分钟内要执行的long now = System.currentTimeMillis();long time = now + 2 * 60 * 1000;//查询2分钟内需要触发的任务List<TaskScheduleEntity> dbTasks = taskScheduleDao.findAllByScheduleTimeBeforeOrderByScheduleTimeAsc(time);//查询所有存活的实例List<InstanceEntity> dbInstances = instanceDao.findAll();//过滤本次拉取的任务tasks = filterTask(dbTasks, dbInstances);//更新数据库,标记已分配给自己tasks = updateTaskToMe(tasks);log.info("抢到任务:{}条", tasks.size());} catch (Exception e) {log.error("拉取任务异常:{}", e);} finally {redissonService.unlock(LOCK);}return tasks;}@Overridepublic TaskScheduleEntity createTaskSchedule(TaskScheduleEntity entity) {return taskScheduleDao.save(entity);}/*** 更新任务到自己实例** @param tasks* @return*/private List<TaskScheduleEntity> updateTaskToMe(List<TaskScheduleEntity> tasks) {long now = System.currentTimeMillis();if (tasks.size() > 0) {for (TaskScheduleEntity task : tasks) {task.setInstanceUuid(instanceState.getUuid());task.setDispatchTime(now);}tasks = taskScheduleDao.saveAll(tasks);}return tasks;}/*** 筛选没分配的,和已分配但是实例死亡的** @param dbTasks* @param dbInstances* @return*/private List<TaskScheduleEntity> filterTask(List<TaskScheduleEntity> dbTasks, List<InstanceEntity> dbInstances) {List<TaskScheduleEntity> tasks = new ArrayList<>();for (TaskScheduleEntity task : dbTasks) {if (tasks.size() >= 10) {return tasks;}if (task.getDispatchTime() < 1) {log.info("抢到没有被分配的任务:"+task);tasks.add(task);} else if (!hasDispatchedToMe(task) && !instanceLive(dbInstances, task)) {log.info("抢到别人死亡的任务:"+task);tasks.add(task);}}return tasks;}/*** 已分给我自己** @param task* @return*/private boolean hasDispatchedToMe(TaskScheduleEntity task) {return instanceState.getUuid().equalsIgnoreCase(task.getInstanceUuid());}/*** 实例已经死亡** @param instances* @param task* @return*/private boolean instanceLive(List<InstanceEntity> instances, TaskScheduleEntity task) {for (InstanceEntity instance : instances) {if (instance.getUuid() != null && instance.getUuid().equalsIgnoreCase(task.getInstanceUuid())) {return true;}}return false;}
}
TimerCallerEvent
package au.com.koalaclass.timer.event;import au.com.koalaclass.timer.entity.TaskScheduleEntity;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;/*** 定时器叫醒事件。定时器到了时间就产生一个叫醒事件,叫醒事件的侦听器接收该事件。* 参考TimerCallerListener*/
@Getter
@Setter
public class TimerCallerEvent extends ApplicationEvent {private TaskScheduleEntity taskSchedule;public TimerCallerEvent(Object source) {super(source);}
}
TimerCallerListener
package au.com.koalaclass.timer.listener;import au.com.koalaclass.timer.entity.TaskScheduleEntity;
import au.com.koalaclass.timer.event.TimerCallerEvent;
import au.com.koalaclass.timer.service.CallerExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 叫醒事件的侦听器,负责执行具体任务*/
@Slf4j
@Component
public class TimerCallerListener {@ResourceCallerExecutorService callerExecutorService;/*** 由线程池负责执行具体的任务** @param event*/@Async("callerThreadPoolTaskExecutor")@EventListenerpublic void processTimerCallerEvent(TimerCallerEvent event) {log.info("接收消息:" + event);try {TaskScheduleEntity task = event.getTaskSchedule();if (task == null) return;callerExecutorService.execute(task);} catch (Exception e) {}}
}
三、结果展示
可以看到根据cron表达式将任务解析进行调用
四、总结
上边用到了一些公司内部包可以自行过滤。保持呼吸部分代码这里没有贴出来。解决的思路就是解析cron然后进行定时逐个发送。
项目地址:https://download.csdn.net/download/qq_43548590/86796289