如何设计一个多线程处理跑批功能

article/2025/10/16 5:42:58

一、背景

最近承接了一个需求,背景是用户当天可以参与比赛竞猜,当天20点前参与竞猜,第二天上午10点出结果。系统的实现思路是用户参与竞猜时增加竞猜记录,第二天早上9点开始进行跑批,跑批依赖业务的配置,要保证在9点前配置完成,跑批结果也要在10点前跑完,否则影响用户的第二天竞猜。
那么上面这个设计过程存在以下几个问题:

  1. 依赖业务的配置,如果9点配置没完成,会影响跑批执行
  2. 如果用户数据量大,10点跑批没结束,用户的竞猜结果出不来
  3. 如何让跑批执行效率高,100万数据在5分钟时间内跑完

二、在开发中常见的跑批应用场景如下

  1. 定时更新数据状态:比如用户的竞猜结果胜或负
  2. 业务数据的计算:比如在金融场景计算罚息
  3. 文件处理:生成统计文件发送给某个邮箱
  4. 定时执行某个动作:比如到期给用户扣除月还款额

三、跑批应该关注哪些数据

在跑批的过程中我们应该关心哪些数据?对于业务来说需要知道跑批的量级,对于研发需要知道跑批是否正常进行,数据是否都执行完成,跑批用时多长时间,还有对整个跑批过程的监控。

四、我遇到的一次实际跑批案例

案例是我在22年开发的一个竞猜活动,给竞猜用户开奖,预估跑批量级在1000W,9点开始跑批,要保证在10点之前跑批完成。
因为跑批的数据量级较大,当时根据用户id做的分表,所以要考虑跑批时数据查询问题。
因为要求执行的时间和时长有限,我们只能极限的压缩代码执行效率,所以每台机器要开启多线程跑批。
最后要保证不会重复执行同一条数据,单台机器CPU过高后需要使用多台机器跑批等多种问题

总结需要考虑的问题:
1、分表查询问题
2、跑批时线程安全问题
3、控制数据重复执行问题
4、多台机器并行跑批

设计方案:希望在保证执行效率的同时,还要能动态扩缩容机器,动态扩缩容线程,动态扩缩容表。
1、在内容配置平台增加如下配置文件:

{"machineIpMap": {"127.0.0.1": 1,"127.0.0.2": 2,"127.0.0.3": 3},"tableCount": 128,"threads": 5
}

machineIpMap是“机器ip集合”,主要控制执行跑批的机器数量和计算分表用
tableCount是“分表数量”,结合机器数量可计算出每台机器需要操作的分表数,例如:机器ip有3个,分表数量128张,那么用128➗3是42余2,那么第一台机器处理0-41索引的表,第二台机器处理42-83索引的表,其余的表就都交给第三台机器执行,第三台会比前面两台多处理两个表都数据。
threads是每台机器开启的线程数量,需要根据系统性能做调节,合理的线程数能使机器发挥最大性能。跑批执行流程
参考代码如下:

public class DrawCodeJob implements ScheduleFlowTask {// 完成跑批的用户数private AtomicInteger finishCount = new AtomicInteger(0);// 竞猜异常用户数private AtomicInteger errorCount = new AtomicInteger(0);private CountDownLatch countDownLatch;@Overridepublic void doTask(Map<String, Object> params) throws Exception {/****************** 第一步:执行前准备工作 ******************/String startTime = DateUtil.now();// 获取统一配置内容Map<String, Object> configMap = configService.getConfig();// 从配置内容找到机器ip集合Map<String, Integer> machineIpMap = (HashMap)configMap.get("machineIpMap");// 分表数量Integer tableCount = (Integer)configMap.get("tableCount");// 工作线程数Integer threads = (Integer)configMap.get("threads");// 初始化报警数据为0finishCount.set(0);errorCount.set(0);// 创建线程池ExecutorService executorService = new ThreadPoolExecutor(threads,threads,60,TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadFactoryBuilder().setNameFormat("game-pool-%d").build(),new ThreadPoolExecutor.AbortPolicy());/****************** 第二步:获取本机Ip地址,计算分表范围 ******************/String localIp = IPUtils.getLocalIp();// 找到该机器所属序号Integer number = machineIpMap.get(localIp);// 计算分表范围List<Integer> tableRange = calculateTableRange(number, tableCount);/****************** 第三步:查询分表范围内参与用户数量 ******************/// 参与用户数量,在报警、多线程并发控制中使用int countQuizUsers = countQuizUsers();// 控制所有线程执行完,再发出跑批报告countDownLatch = new CountDownLatch(countQuizUsers);/****************** 第四步:遍历分表,因为每个机器序号获得的分表范围不同,会防止数据重复查询 ******************/for (int tableIndex = 0; tableIndex < tableRange.size(); tableIndex++) {log.info("查询第{}张表", tableIndex);// 查询单表数量Integer tableUsers = queryTableUsers(tableIndex);// 计算分页数量int pageNum = tableUsers / 2000;if (tableUsers % 2000 > 0) {pageNum += 1;}// 每张表分页查询for (int i = 0; i < pageNum; i++) {List<GameRecord> gameRecordList = getGameRecordList(tableIndex, i);// 多线程处理竞猜结果(伪代码)executorService.execute(() -> {try {doHandler(gameRecordList);} catch (Exception e) {// 增加跑批异常数量errorCount.incrementAndGet();log.error("跑批出现异常", e);} finally {// 增加跑批完成数量finishCount.incrementAndGet();countDownLatch.countDown();}});doHandler(gameRecordList);}}/****************** 第五步:跑批结束上报信息 ******************/// 最多等待10分钟countDownLatch.await(10, TimeUnit.MINUTES);// 跑批结束时间String endTime = DateUtil.now();// 需要报告都信息String msg = String.format("竞猜跑批结束\n" +"跑批开始时间:%s\n" +"跑批结束时间:%s\n" +"完成数据:%s\n" +"异常用户数:%s\n"startTime, endTime, finishCount.get(), winCount.get(), failCount.get(), errorCount.get(), countQuizUsers, pageSize, pageNum);// 发送报告,邮件,短信等就可以收到信息Profiler.businessReport(msg);}
}

http://chatgpt.dhexx.cn/article/esRENclk.shtml

相关文章

跑批利器--批处理应用程序

目前笔者正在进行直销银行互联网核心的设计和研发,在银行相关系统中有一块内容比较关键,那就是跑批.因此接触到了SpringBatch的相关内容,作为学习和记录,有必要将SpringBatch的相关技术点和实际项目中遇到的问题记录下来,同时也跟各位来进行分享和学习. 首先先了解一下什么是批…

跑批 流程、代码梳理

权限系统 全部——ETL服务——计划维护 日程表&#xff1a;到时间触发跑批任务 复制任务编号 ETL设计——作业设计——自定义java——插入语句 找到类所在位置 org.isscloud.portal.agent.scf.batch.FinaResultBatch base下的agent下的scf.batch包下的FinaResultBatch类 S…

跑批为什么这么难

文章目录 问题分析SPL用于跑批应用效果SPL资料 业务系统产生的明细数据通常要经过加工处理&#xff0c;按照一定逻辑计算成需要的结果&#xff0c;用以支持企业的经营活动。这类数据加工任务一般会有很多个&#xff0c;需要批量完成计算&#xff0c;在银行和保险行业常常被称为…

Java开源专业计算引擎:跑批真的这么难吗?

业务系统产生的明细数据通常要经过加工处理&#xff0c;按照一定逻辑计算成需要的结果&#xff0c;用以支持企业的经营活动。这类数据加工任务一般会有很多个&#xff0c;需要批量完成计算&#xff0c;在银行和保险行业常常被称为跑批&#xff0c;其它像石油、电力等行业也经常…

银行跑批业务 的 初步理解(批量批量.....流水账)

一、初步理解 白天的柜台交易, 实时的 对帐户进行操作。 晚上 批量 , 比如 一些报表的生成 , 定期储蓄到期的自动转存 , 行内行外业务清分清算 &#xff0c; 有时还可能赶上利息计算....... 当然 不是所有的数据都是实时操作 , 因此跑批就是为此诞生。 二、逐渐深入 批量…

跑批设计-如何才能让跑批更加高速

跑批的应用场景 在开发过程中跑批经常使用的地方&#xff1a; 消息类&#xff1a;到期失效以及到期批量通知客户计算类&#xff1a;在财务中的罚息、计提、计息文件类&#xff1a;对账信息、还款信息同步以及报表生成 跑批数据特点 数据量非常大实时性并不是特别高&#xf…

跑批bat、shell

“跑批”也叫“批量处理”。批处理&#xff0c;也称为批处理脚本。顾名思义&#xff0c;批处理就是对某对象进行批量的处理&#xff0c;通常被认为是一种简化的脚本语言&#xff0c;它应用于DOS和Windows系统中。批处理文件的扩展名为bat 。 批处理定义&#xff1a;顾名思义&a…

gPRC基础教程

1.什么是RPC? RPC 远过程调用.在理解远程调用之前,首先我们来了解一下本地调用,只有更好的理解了本地调用,才能更好的理解RPC. 1.1 本地调用ex:本地的函数调用在函数调用的时候,一般会经过几个步骤 返回地址入栈参数入栈提升堆栈空间函数参数的复制执行函数调用清空堆栈 1.…

GPRM/GNRMC定位信息的读取与解析

GPRM/GNRMC定位信息的读取与解析 参考网址&#xff1a;http://www.cnblogs.com/88223100/p/GPRM_GNRMC_Transform.html 帧头 UTC时间 状态 纬度 北纬/南纬 经度 东经/西经 速度 $GPRMC hhmmss.sss A/V ddmm.mmmm N/S dddmm.mmmm E/W 节 方位角 UTC日期 磁偏角…

GPRMC转经纬度 地理位置

前言 一、GPRMC是什么&#xff1f; 二、GPRMC如何转经纬度 三、python加地图实现定位 1.Python代码 2.地图定位 四、通过python直接输出位置的尝试 1.需要用到的库geocoder 2.需要准备的库folium 总结 前言 在实际汽车路测的时候会出现GPS偏移&#xff0c;这个时候就需要将GPR…

USB转串口 模拟 PPS+GPRMC 进行授时

把 PC的系统时间 通过USB转串口发出来 来模拟 PPSGPRMC 授时, 这大冷天的, 用来在室内测试MCU或者SOC的授时功能, 传感器的授时与线束检测, 测试干扰等等, 还是比较合适的. 如下图, 左边为 USB转9针RS232串口, 右边为 USB转TTL串口 原理: PPS 有 3V3 / 5V / 12V 等规格, 这里…

GPC规范-SCP02

SPC02 流程 SPC02 指令 命令&#xff1a; 响应&#xff1a; 举例回复&#xff1a; 密钥分散数据&#xff1a; 0000FFFFFFFFFFFFFFFF Key Info&#xff1a; 20 02&#xff08;scp02&#xff09; Card挑战数&#xff1a; 001AC6619BE83082 Card加密值&#xff1a; 7…

STM32模拟GPS输出PPS、GPRMC与VLP16时钟同步

这里写目录标题 1.VLP16与GPS相关的管脚&#xff1a;2.利用GPS信息完成时间同步 TimeSynchronization3.查找同步关系4.修改ROS代码结论 1.VLP16与GPS相关的管脚&#xff1a; GPS-RECEIVE 接收GPS的GPRMC语句&#xff0c;注意是RS232电平(high 3-15V&#xff0c;low 1.2V以下)&…

c++处理GPRMC

#include <iostream> #include<string> #include<vector>// A::member 表示A成员中的member // namespace my_vary202234610229 namespace是c的关键字&#xff0c;将变量定义在自己创建的my_vary命名空间 // 访问命名空间中的变量需要作用域分解符 // 命名空…

gPRC基本介绍

1.说明 gRPC英文全名为Google Remote Procedure Call&#xff0c; 即Google远程过程调用&#xff0c; 是Google发布的一个高性能、通用的开源RPC框架&#xff0c; 2.gRPC定义 gRPC是一个现代的开源高性能RPC框架&#xff0c; 可以在任何环境中运行。 它可以高效地连接数据中心内…

自动驾驶时间同步分析概述--PPS/GPRMC/PTP/全域架构时间同步方案

时间同步的重要性在生活中已经充分体现。试想你因一个姑娘在酒吧和别人大打出手&#xff0c;并约定下周六早上九点在后海小树林里进行群体活动。为此你微信召集了在南非、印度、泰国干建筑的好兄弟。可在你如期赴约的时候&#xff0c;发现队友只有河畔的孤影。当你在病床上睁开…

U-BLOX GPS 模块及GPRMC指令解析

受朋友所托&#xff0c;调试一款GPS模块&#xff0c;该模块是UBLOX的NEO-6M GPS模组。想到用这款GPS的人较多&#xff0c;自己日后也有可能在用到这个模块&#xff0c;就写下这份笔记。 一、介绍 基本信息如下&#xff1a; 1、 模块采用U-BLOX NEO-6M模组&#xff0c;体积小巧&…

ROS节点解析GPS数据:GPRMC/GPFDP/HEADINGA

数据解析&#xff0c;肯定是要知道数据格式的&#xff1a; 数据格式参考&#xff1a;&#xff08;前人已经总结的比较齐全了&#xff09; https://blog.csdn.net/u010384390/article/details/78432016?ops_request_misc%257B%2522request%255Fid%2522%253A%2522163031225416…

python各种模块,迭代器,生成器

从逻辑上组织python代码&#xff08;变量&#xff0c;函数&#xff0c;类&#xff0c;逻辑&#xff1a;实现一个功能&#xff09; 本质就是.py结尾的python文件(文件名&#xff1a;test.py,对应的模块名就是test) 包&#xff1a;用来从逻辑上组织模块的,本质就是一个目录&#…

VFS - 虚拟文件系统的加载和导出

VFS - 代码生成器预览功能实现VFS - 虚拟文件系统基本操作方法的封装VFS - 虚拟文件系统的加载和导出 这是 VFS 的最后一篇&#xff0c;前面两篇中的基本方法已经实现了一个简单的虚拟文件系统&#xff0c;可以创建目录和文件&#xff0c;可以读写文件的内容。在这最后一篇中&a…