一、背景
最近承接了一个需求,背景是用户当天可以参与比赛竞猜,当天20点前参与竞猜,第二天上午10点出结果。系统的实现思路是用户参与竞猜时增加竞猜记录,第二天早上9点开始进行跑批,跑批依赖业务的配置,要保证在9点前配置完成,跑批结果也要在10点前跑完,否则影响用户的第二天竞猜。
那么上面这个设计过程存在以下几个问题:
- 依赖业务的配置,如果9点配置没完成,会影响跑批执行
- 如果用户数据量大,10点跑批没结束,用户的竞猜结果出不来
- 如何让跑批执行效率高,100万数据在5分钟时间内跑完
二、在开发中常见的跑批应用场景如下
- 定时更新数据状态:比如用户的竞猜结果胜或负
- 业务数据的计算:比如在金融场景计算罚息
- 文件处理:生成统计文件发送给某个邮箱
- 定时执行某个动作:比如到期给用户扣除月还款额
三、跑批应该关注哪些数据
在跑批的过程中我们应该关心哪些数据?对于业务来说需要知道跑批的量级,对于研发需要知道跑批是否正常进行,数据是否都执行完成,跑批用时多长时间,还有对整个跑批过程的监控。
四、我遇到的一次实际跑批案例
案例是我在22年开发的一个竞猜活动,给竞猜用户开奖,预估跑批量级在1000W,9点开始跑批,要保证在10点之前跑批完成。
因为跑批的数据量级较大,当时根据用户id做的分表,所以要考虑跑批时数据查询问题。
因为要求执行的时间和时长有限,我们只能极限的压缩代码执行效率,所以每台机器要开启多线程跑批。
最后要保证不会重复执行同一条数据,单台机器CPU过高后需要使用多台机器跑批等多种问题
总结需要考虑的问题:
1、分表查询问题
2、跑批时线程安全问题
3、控制数据重复执行问题
4、多台机器并行跑批
设计方案:希望在保证执行效率的同时,还要能动态扩缩容机器,动态扩缩容线程,动态扩缩容表。
1、在内容配置平台增加如下配置文件:
{"machineIpMap": {"127.0.0.1": 1,"127.0.0.2": 2,"127.0.0.3": 3},"tableCount": 128,"threads": 5
}
machineIpMap
是“机器ip集合”,主要控制执行跑批的机器数量和计算分表用
tableCount
是“分表数量”,结合机器数量可计算出每台机器需要操作的分表数,例如:机器ip有3个,分表数量128张,那么用128➗3是42余2,那么第一台机器处理0-41索引的表,第二台机器处理42-83索引的表,其余的表就都交给第三台机器执行,第三台会比前面两台多处理两个表都数据。
threads
是每台机器开启的线程数量,需要根据系统性能做调节,合理的线程数能使机器发挥最大性能。
参考代码如下:
public class DrawCodeJob implements ScheduleFlowTask {// 完成跑批的用户数private AtomicInteger finishCount = new AtomicInteger(0);// 竞猜异常用户数private AtomicInteger errorCount = new AtomicInteger(0);private CountDownLatch countDownLatch;@Overridepublic void doTask(Map<String, Object> params) throws Exception {/****************** 第一步:执行前准备工作 ******************/String startTime = DateUtil.now();// 获取统一配置内容Map<String, Object> configMap = configService.getConfig();// 从配置内容找到机器ip集合Map<String, Integer> machineIpMap = (HashMap)configMap.get("machineIpMap");// 分表数量Integer tableCount = (Integer)configMap.get("tableCount");// 工作线程数Integer threads = (Integer)configMap.get("threads");// 初始化报警数据为0finishCount.set(0);errorCount.set(0);// 创建线程池ExecutorService executorService = new ThreadPoolExecutor(threads,threads,60,TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadFactoryBuilder().setNameFormat("game-pool-%d").build(),new ThreadPoolExecutor.AbortPolicy());/****************** 第二步:获取本机Ip地址,计算分表范围 ******************/String localIp = IPUtils.getLocalIp();// 找到该机器所属序号Integer number = machineIpMap.get(localIp);// 计算分表范围List<Integer> tableRange = calculateTableRange(number, tableCount);/****************** 第三步:查询分表范围内参与用户数量 ******************/// 参与用户数量,在报警、多线程并发控制中使用int countQuizUsers = countQuizUsers();// 控制所有线程执行完,再发出跑批报告countDownLatch = new CountDownLatch(countQuizUsers);/****************** 第四步:遍历分表,因为每个机器序号获得的分表范围不同,会防止数据重复查询 ******************/for (int tableIndex = 0; tableIndex < tableRange.size(); tableIndex++) {log.info("查询第{}张表", tableIndex);// 查询单表数量Integer tableUsers = queryTableUsers(tableIndex);// 计算分页数量int pageNum = tableUsers / 2000;if (tableUsers % 2000 > 0) {pageNum += 1;}// 每张表分页查询for (int i = 0; i < pageNum; i++) {List<GameRecord> gameRecordList = getGameRecordList(tableIndex, i);// 多线程处理竞猜结果(伪代码)executorService.execute(() -> {try {doHandler(gameRecordList);} catch (Exception e) {// 增加跑批异常数量errorCount.incrementAndGet();log.error("跑批出现异常", e);} finally {// 增加跑批完成数量finishCount.incrementAndGet();countDownLatch.countDown();}});doHandler(gameRecordList);}}/****************** 第五步:跑批结束上报信息 ******************/// 最多等待10分钟countDownLatch.await(10, TimeUnit.MINUTES);// 跑批结束时间String endTime = DateUtil.now();// 需要报告都信息String msg = String.format("竞猜跑批结束\n" +"跑批开始时间:%s\n" +"跑批结束时间:%s\n" +"完成数据:%s\n" +"异常用户数:%s\n"startTime, endTime, finishCount.get(), winCount.get(), failCount.get(), errorCount.get(), countQuizUsers, pageSize, pageNum);// 发送报告,邮件,短信等就可以收到信息Profiler.businessReport(msg);}
}