Java多线程批量执行sql

article/2025/9/19 3:25:01

当遇到大sql批量导入时几十万上百万数据,使用plsql执行等都是非常的慢。因此开发一套自定义线程池处理sql:

1,线程代码


import java.util.ArrayList;/*** @ClassName: com.ai.order.esb.yulang.tools.handle* @Description: TODO* @version: v1.0.0* @author: yulang* @date: 2019/5/22 10:51* <p>* Modification History:* Date         Author          Version            Description* ------------------------------------------------------------* 2019/5/22      yulang          v1.1.0             第一次创建*/
public class DateHandleThread extends Thread {@Overridepublic void run() {//System.out.println("线程:" + Thread.currentThread().getName());Integer num = BatchImportData.getLine();//获取对应的线程锁,每个线程处理各自对应的数据ArrayList<String> sqlData = BatchImportData.getSqlData(num);//获取每个线程需处理的数据/*for (String sqlDatum : sqlData) {System.out.println("第"+num+"个线程取到的数据:"+sqlDatum);}*/BatchImportData.runSql(sqlData);}
}

2、BatchImportData.java

package com.ai.order.esb.yulang.tools.handle;import org.springframework.jdbc.datasource.init.ScriptUtils;import java.io.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Timer;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** @ClassName: com.ai.order.esb.yulang.tools.handle* @Description: 多线程批量导入数据,导入失败会记录到[sqlFilePath]err.sql* @version: v1.0.0* @author: yulang* @date: 2019/5/22 10:51* <p>* Modification History:* Date         Author          Version            Description* ------------------------------------------------------------* 2019/5/22      yulang          v1.1.0             第一次创建*/
public class BatchImportData {private static AtomicInteger line = new AtomicInteger(0);static int threadNum = 4;//配置需要执行的线程数static String sqlFilePath = "D:\\qq文件\\6.sql";//sql文件的路径static int batchHandleNum = 1;//一次批量导入的条数,批量导入可以大幅度提升性能static String url = "jdbc:oracle:thin:@127.0.0.1:1521:test";static String user = "yulang";static String password = "yulang";static ExecutorService pool = Executors.newFixedThreadPool(threadNum);static ArrayList<String> sqlList = new ArrayList<>();//异常日志记录 TODO String泛型可以改成 日志记录表的对象protected static ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();private static Boolean isInit = Boolean.FALSE;public static Boolean DateThreadisRes = Boolean.FALSE;/* public static void getInstance() {synchronized (isInit) {if (isInit.equals(Boolean.FALSE)) {Timer timer = new Timer(true);timer.schedule(new MsgInfo(), 3000l, 1000l);//默认3秒后,每秒定时扫描isInit = Boolean.TRUE;}}}*/public static void main(String[] args) throws Exception {Timer timer = new Timer(true);//TODO 此方法如果系统更新时间之后,会导致挂起 可使用timer.schedule(new MsgInfo(), 1000l, 1000l);//默认1秒后,每秒定时扫描long starttime = System.currentTimeMillis();BatchImportData.doJob();try {// awaitTermination返回false即超时会继续循环,返回true即线程池中的线程执行完成主线程跳出循环往下执行,每隔10秒循环一次while (!pool.awaitTermination(10, TimeUnit.SECONDS)) ;DateThreadisRes=Boolean.TRUE;//标记线程池处理结束long spendtime = System.currentTimeMillis() - starttime;System.out.println("数据处理线程池执行结束,处理成功,耗时:" + spendtime / 1000 + "秒");//如果当前队列大于0,说明还有消息没有保存if (queue.size()>0){System.out.println("当前队列剩余:"+queue.size()+"。主线程将休眠:"+queue.size()+"s,若执行完毕可手动关闭!");Thread.sleep(queue.size()*1000);//主线程等待队列,最坏的情况一秒保存一条一直出现提示即可关闭}} catch (InterruptedException e) {e.printStackTrace();}}public static int getLine() {return line.addAndGet(1);}public static ArrayList<String> getSqlData(int num) {//获取本次执行的线程数int piece = sqlList.size() / threadNum;//每个线程应该分派的任务数量//每个线程应该取的范围集合int res = num * piece;int start = piece * (num - 1);ArrayList<String> tempSql = new ArrayList();if (num == threadNum) {System.out.println("第" + num + "个线程处理数据范围为:[" + start + "," + sqlList.size() + ")");for (int i = start; i < sqlList.size(); i++) {tempSql.add(sqlList.get(i));}} else {System.out.println("第" + num + "个线程处理数据范围为:[" + start + "," + res + ")");for (int i = start; i < res; i++) {tempSql.add(sqlList.get(i));}}return tempSql;}public static Connection getConnect() {Connection con = null;try {Class.forName("oracle.jdbc.driver.OracleDriver");con = DriverManager.getConnection(url, user, password);} catch (Exception e) {e.printStackTrace();}return con;}public static void runSql(ArrayList<String> stringList) {Connection con = getConnect();try {Statement st = con.createStatement();StringBuilder sb = new StringBuilder();for (int i = 0; i < stringList.size(); i++) {st.addBatch(stringList.get(i));sb.append(stringList.get(i)).append(";").append(System.getProperty("line.separator"));if (i == stringList.size()) {//剩余不足batchHandleNum条数的批量执行dealDate(st, sb);}if (i % batchHandleNum == 0) {//满batchHandleNum批量执行dealDate(st, sb);}}} catch (Exception e) {e.printStackTrace();} finally {try {con.close();} catch (SQLException e) {e.printStackTrace();}}}private static void dealDate(Statement st, StringBuilder sb) {try {st.executeBatch();sb.setLength(0);//清空} catch (Exception e) {System.out.println("执行异常:异常原因:" + e.getMessage());//记录失败sql//logs(sb.toString());//TODO 可使用队列,mq,kafka,异步处理queue.add(sb.toString());sb.setLength(0);//清空}}public static void logs(String text) {try {BufferedWriter out = new BufferedWriter(new FileWriter(sqlFilePath + "err.sql", true));//追加写入out.write(text);out.close();} catch (IOException e) {}}private static String readSql(String filePath) throws IOException {//读取文件File file = new File(filePath);BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file), "GBK"));StringBuilder sql = new StringBuilder();//替换String line = null;int i = 1;//以行为单位进行遍历while ((line = br.readLine()) != null) {sql.append(line);}//关闭输入流br.close();return sql.toString();}public static void doJob() throws Exception {//单线程读取String sql = readSql(sqlFilePath);ScriptUtils.splitSqlScript(sql, ";", sqlList);System.out.println("解析到待处理sql数量:" + sqlList.size());//每个线程只取自己处理的那部分数据for (int i = 0; i < threadNum; i++) {Thread thread = new DateHandleThread();pool.execute(thread);}pool.shutdown();}
}

异步处理定时任务:MsgInfo.java

package com.ai.order.esb.yulang.tools.handle;import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;/*** 日志消息异步处理*/
public class MsgInfo extends TimerTask {public void run() {ConcurrentLinkedQueue<String> value = BatchImportData.queue;try {while (!value.isEmpty()) {String logInfo = value.poll();//TODO 存入数据库或者写入文件BatchImportData.logs(logInfo);//System.out.println("错误sql记录完成!");}} catch (Exception e) {System.err.println("save messagelogs error:" + e);}if (BatchImportData.DateThreadisRes){System.out.println("当前队列剩余数量:"+value.size()+",异步保存完毕,可手动关闭主线程!");}}
}

执行结果展示:

当前执行,一次批量执行一条效果展示。

 

不足的地方,欢迎指正,谢谢!

 

评测效果:

批量导入249665条数据,数据大小为238M,耗时238s:

线程数:4

单次批量导入:2000

执行失败的sql记录错误日志:如图1.sqlerr.sql 

 

后期扩展:

     可以解析的使用单独的工程进行解析,将解析后的数据放到MQ中,以及执行出错的sql记录也可使用mq。

 


http://chatgpt.dhexx.cn/article/fFKW01JU.shtml

相关文章

一条SQL语句是如何执行的?

大家六一儿童节好呀&#xff01; 接下来的一段时间内&#xff0c;将带领大家一同探索MySQL的奥妙&#xff0c;加油吧&#xff01;我们。 下面进入正题&#xff1a;一条SQL语句是如何进行的&#xff1f; 对于这个问题&#xff0c;我想将其分为两个问题来回答&#xff0c;分别是…

mysql 查看线程状态

show full PROCESSLIST 打开两个查询窗口&#xff0c;在A窗口执行一个查询时间较长的sql&#xff0c;在B窗口使用show full PROCESSLIST&#xff0c;可以看到A中执行的sql时间。 sleep表示没有操作&#xff0c;query表示正在查询。

线程状态总结

目录 文章目录 前言 一、线程状态图解 二、线程的几种状态 及线程过程 1.线程的几种状态 2.线程过程 总结 前言 总结了在华清远见这段时间所学的线程相关的知识点&#xff0c;文章记录了线程的几种状态及线程的过程 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案…

线程状态

原文&#xff1a;https://mp.weixin.qq.com/s/GsxeFM7QWuR--Kbpb7At2w 人类为了利用好自己的时间&#xff0c;经常会同时做多件事情&#xff0c;比如上厕所时刷手机&#xff0c;开车时听新闻... 对于自己尚且如此&#xff0c;对计算机也不能闲着。为了最大化的提升机器利用率&…

一条SQL语句的执行过程

摘要 本文站在后端开发的角度&#xff0c;讲述一条SQL从客户端发送到MySQL服务器进行处理&#xff0c;并将结果返回给客户端的过程。这个过程中涉及的操作会在后面的文章中做详细的分析。 连接建立 我们通常使用ORM框架来生成SQL语句&#xff0c;在发送SQL语句给MySQL服务器…

多线程执行sql报错处理

pymysql多线程访问数据库报错&#xff1a;Packet sequence number wrong - got 7 expected 2 原文&#xff1a;https://www.cnblogs.com/heiao10duan/p/9373237.html参考&#xff1a;https://www.jianshu.com/p/60c8e0e440ea原因&#xff1a; 使用了多线程&#xff0c;多线程共…

MySQL - 一条 SQL 语句是如何执行的(SQL执行详解)

前言 天天和数据库打交道&#xff0c;一天能写上几十条 SQL 语句&#xff0c;但你知道我们的系统是如何和数据库交互的吗&#xff1f;MySQL 如何帮我们存储数据、又是如何帮我们管理事务&#xff1f;....是不是感觉真的除了写几个 「select * from dual」外基本脑子一片空白&a…

一、SQL语句执行过程

一、MySQL架构图 MySQL逻辑架构图&#xff0c;可以分为 Server 层和存储引擎层两部分。 Server 层包括连接器、查询缓存、分析器、优化器、执行器等&#xff0c;涵盖 MySQL 的大多数核心服务功能&#xff0c;以及所有的内置函数&#xff08;如日期、时间、数学和加密函数等&…

线程状态详解

线程的生命周期 new的时候即为创建状态。 调用start即为启动了&#xff0c;启动线程后就变为就绪状态&#xff0c; 就绪之后等待CPU的调度&#xff0c;CPU调度完之后&#xff0c;就进入了运行状态。 运行状态运行sleep方法时会进入阻塞状态&#xff0c;进入阻塞状态有非常多…

一条 SQL 语句是如何执行的

1. select 语句执行过程 一条 select 语句的执行过程如上图所示 1、建立连接 连接器会校验你输入的用户名和密码是否正确&#xff0c;如果错误会返回提示&#xff0c;如果正确&#xff0c;连接器会查询当前用户对于的权限。连接器的作用就是校验用户权限 2、查询缓存 MySQL…

SQL线程状态分析:processlist

老哥哔哔叨 我们已经写了很多 MySQL 的文章了&#xff0c;比如索引优化、数据库锁、主从复制等等。今天在来和大家学习一个优化方法&#xff1a;show processlist——查看当前所有数据库连接的 session 状态。帮助我们查看每个 SQL 线程的运行状态&#xff0c;是运行正常呀&…

线程的状态

1、线程有5种状态&#xff1a;新建&#xff08;new Thread&#xff09;、就绪&#xff08;runnable&#xff09;&#xff0c;运行&#xff08;running&#xff09;、阻塞&#xff08;blocked&#xff09;、结束&#xff08;dead&#xff09; 主要方法&#xff1a; setPriorit…

基于TCP和HTTP协议的RPC简单实现

一、RPC基本概念 1、基本概念 &#xff08;1&#xff09;RPC&#xff08;Remote Procedure Call Protocol&#xff09;——远程过程调用协议&#xff0c;它是一种通过网络从远程计算机程序上请求服务&#xff0c;而不需要了解底层网络技术的协议&#xff1b; &#xff08;2&…

RPC协议及常用框架

https://www.jianshu.com/p/8ba4b7b834aa RPC协议 RPC:远程过程调用&#xff0c;原则上来说系统间跨进程的调用都属于RPC范畴 RMI/HTTP/dubbo/Spring Cloud/thrift RPC框架如何实现分布式环境下的远程调用 在一个典型的RPC的使用场景中&#xff0c;包含了服务发现&#xf…

【RPC】RPC基础(二)RPC协议

文章目录 RPC核心原理1. RPC基础1.2 RPC协议为什么设计RPC协议如何设计RPC协议可扩展协议的设计思考 RPC核心原理 1. RPC基础 1.2 RPC协议 RPC协议和HTTP协议一样都属于应用层协议 协议的作用&#xff1a; ​ 协议就像是文章中的标点符号、段落格式等规定&#xff0c;有了…

Nodejs 之 RPC 协议简介

背景 随着 Nodejs 的兴起&#xff0c;越来越多的 Web 服务中间层被搭建起来。如 Node 服务端渲染&#xff0c;BFF(Backend For Frontend))层&#xff0c;而 RPC 是远端过程调用&#xff0c;经常用于 BFF 层。最近&#xff0c;我打算写一个中间层&#xff0c;用 Nodejs 调用 Go…

RPC协议底层原理与实现

RPC协议基本组成 在一个典型RPC的使用场景中&#xff0c;包含了服务发现、负载、容错、 网络传输 、 序列化 等组件&#xff0c;其中RPC协议就指明了程序如何进行网络传输和序列化 。也就是说一个RPC协议的实现就等于一个非透明的RPC调用&#xff0c;如何做到的的呢&#xff1f…

RPC详解

RPC是什么 RPC&#xff08;Remote Procedure Call&#xff09;远程过程调用协议&#xff0c;一种通过网络从远程计算机上请求服务&#xff0c;而不需要了解底层网络技术的协议。RPC它假定某些协议的存在&#xff0c;例如TPC/UDP等&#xff0c;为通信程序之间携带信息数据。在O…

深入理解RPC—协议

协议 一提到协议&#xff0c;你最先想到的可能是 TCP 协议、UDP 协议等等&#xff0c;并且这些网络传输协议的实现有点晦涩难懂。虽然在 RPC 中我们也会用到这些协议&#xff0c;但这些协议更多的是对我们上层应用是透明的&#xff0c;我们 RPC 在使用过程中并不太需要关注他们…

浅谈RPC协议

RPC协议 RPC简介为啥需要RPCRPC的调用过程gRPCProtoBuffergRPC实战 RPC简介 RPC&#xff08;Remote Procedure Call Protocol&#xff09;远程过程调用协议&#xff0c;目标就是让远程服务调用更加简单、透明。RPC 框架负责屏蔽底层的传输方式&#xff08;TCP 或者 UDP&#x…