项目代码基于:MySql数据,开发框架为:SpringBoot、Mybatis 开发语言为:Java8前言 公司业务中遇到一个需求,需要同时修改最多约5万条数据,而且还不支持批量或异步修改操作。于是只能写个for循环操作,但操作耗时太长,只能一步一步寻找其他解决方案。 具体操作如下:一、循环操作的代码 先写一个最简单的for循环代码,看看耗时情况怎么样。一条一条依次对50000条数据进行更新操作耗时:2m27s,1m54sTestvoidupdateStudent(){ListStudentallStudentsstudentMapper。getAll();allStudents。forEach(s{更新教师信息Stringteachers。getTeacher();StringnewTeacherTNOnewRandom()。nextInt(100);s。setTeacher(newTeacher);studentMapper。update(s);});} 循环修改整体耗时约1分54秒,且代码中没有手动事务控制应该是自动事务提交,所以每次操作事务都会提交所以操作比较慢,我们先对代码中添加手动事务控制,看查询效率怎样。 最新面试题整理:https:www。javastack。cnmst二、使用手动事务的操作代码 修改后的代码如下:AutowiredprivateDataSourceTransactionManagerdataSourceTransactionManager;AutowiredprivateTransactionDefinitiontransactionDefinition;由于希望更新操作一次性完成,需要手动控制添加事务耗时:24s从测试结果可以看出,添加事务后插入数据的效率有明显的提升TestvoidupdateStudentWithTrans(){ListStudentallStudentsstudentMapper。getAll();TransactionStatustransactionStatusdataSourceTransactionManager。getTransaction(transactionDefinition);try{allStudents。forEach(s{更新教师信息Stringteachers。getTeacher();StringnewTeacherTNOnewRandom()。nextInt(100);s。setTeacher(newTeacher);studentMapper。update(s);});dataSourceTransactionManager。commit(transactionStatus);}catch(Throwablee){dataSourceTransactionManager。rollback(transactionStatus);throwe;}} 添加手动事务操控制后,整体耗时约24秒,这相对于自动事务提交的代码,快了约5倍,对于大量循环数据库提交操作,添加手动事务可以有效提高操作效率。三、尝试多线程进行数据修改 添加数据库手动事务后操作效率有明细提高,但还是比较长,接下来尝试多线程提交看是不是能够再快一些。 先添加一个Service将批量修改操作整合一下,具体代码如下:StudentServiceImpl。javaServicepublicclassStudentServiceImplimplementsStudentService{AutowiredprivateStudentMapperstudentMapper;AutowiredprivateDataSourceTransactionManagerdataSourceTransactionManager;AutowiredprivateTransactionDefinitiontransactionDefinition;OverridepublicvoidupdateStudents(ListStudentstudents,CountDownLatchthreadLatch){TransactionStatustransactionStatusdataSourceTransactionManager。getTransaction(transactionDefinition);System。out。println(子线程:Thread。currentThread()。getName());try{students。forEach(s{更新教师信息Stringteachers。getTeacher();StringnewTeacherTNOnewRandom()。nextInt(100);s。setTeacher(newTeacher);studentMapper。update(s);});dataSourceTransactionManager。commit(transactionStatus);threadLatch。countDown();}catch(Throwablee){e。printStackTrace();dataSourceTransactionManager。rollback(transactionStatus);}}} 批量测试代码,我们采用了多线程进行提交,修改后测试代码如下:AutowiredprivateDataSourceTransactionManagerdataSourceTransactionManager;AutowiredprivateTransactionDefinitiontransactionDefinition;AutowiredprivateStudentServicestudentService;对用户而言,27s任是一个较长的时间,我们尝试用多线程的方式来经行修改操作看能否加快处理速度预计创建10个线程,每个线程进行5000条数据修改操作耗时统计1线程数:1耗时:25s2线程数:2耗时:14s3线程数:5耗时:15s4线程数:10耗时:15s5线程数:100耗时:15s6线程数:200耗时:15s7线程数:500耗时:17s8线程数:1000耗时:19s8线程数:2000耗时:23s8线程数:5000耗时:29sTestvoidupdateStudentWithThreads(){查询总数据ListStudentallStudentsstudentMapper。getAll();线程数量finalIntegerthreadCount100;每个线程处理的数据量finalIntegerdataPartionLength(allStudents。size()threadCount1)threadCount;创建多线程处理任务ExecutorServicestudentThreadPoolExecutors。newFixedThreadPool(threadCount);CountDownLatchthreadLatchsnewCountDownLatch(threadCount);for(inti0;ithreadCount;i){每个线程处理的数据ListStudentthreadDatasallStudents。stream()。skip(idataPartionLength)。limit(dataPartionLength)。collect(Collectors。toList());studentThreadPool。execute((){studentService。updateStudents(threadDatas,threadLatchs);});}try{倒计时锁设置超时时间30sthreadLatchs。await(30,TimeUnit。SECONDS);}catch(Throwablee){e。printStackTrace();}System。out。println(主线程完成);} 多线程提交修改时,我们尝试了不同线程数对提交速度的影响,具体可以看下面表格, 多线程修改50000条数据时不同线程数耗时对比(秒) 根据表格,我们线程数增大提交速度并非一直增大,在当前情况下约在25个线程数时,提交速度最快(实际线程数还是需要根据服务器配置实际测试)。 另外,MySQL系列面试题和答案全部整理好了,微信搜索Java技术栈,在后台发送:面试,可以在线阅读。四、基于两个CountDownLatch控制多线程事务提交 由于多线程提交时,每个线程事务时单独的,无法保证一致性,我们尝试给多线程添加事务控制,来保证每个线程都是在插入数据完成后在提交事务, 这里我们使用两个CountDownLatch来控制主线程与子线程事务提交,并设置了超时时间为30秒。我们对代码进行了一点修改:OverridepublicvoidupdateStudentsThread(ListStudentstudents,CountDownLatchthreadLatch,CountDownLatchmainLatch,StudentTaskErrortaskStatus){TransactionStatustransactionStatusdataSourceTransactionManager。getTransaction(transactionDefinition);System。out。println(子线程:Thread。currentThread()。getName());try{students。forEach(s{更新教师信息Stringteachers。getTeacher();StringnewTeacherTNOnewRandom()。nextInt(100);s。setTeacher(newTeacher);studentMapper。update(s);});}catch(Throwablee){taskStatus。setIsError();}finally{threadLatch。countDown();切换到主线程执行}try{mainLatch。await();等待主线程执行}catch(Throwablee){taskStatus。setIsError();}判断是否有错误,如有错误就回滚事务if(taskStatus。getIsError()){dataSourceTransactionManager。rollback(transactionStatus);}else{dataSourceTransactionManager。commit(transactionStatus);}}由于每个线程都是单独的事务,需要添加对线程事务的统一控制我们这边使用两个CountDownLatch对子线程的事务进行控制TestvoidupdateStudentWithThreadsAndTrans(){查询总数据ListStudentallStudentsstudentMapper。getAll();线程数量finalIntegerthreadCount4;每个线程处理的数据量finalIntegerdataPartionLength(allStudents。size()threadCount1)threadCount;创建多线程处理任务ExecutorServicestudentThreadPoolExecutors。newFixedThreadPool(threadCount);CountDownLatchthreadLatchsnewCountDownLatch(threadCount);用于计算子线程提交数量CountDownLatchmainLatchnewCountDownLatch(1);用于判断主线程是否提交StudentTaskErrortaskStatusnewStudentTaskError();用于判断子线程任务是否有错误for(inti0;ithreadCount;i){每个线程处理的数据ListStudentthreadDatasallStudents。stream()。skip(idataPartionLength)。limit(dataPartionLength)。collect(Collectors。toList());studentThreadPool。execute((){studentService。updateStudentsThread(threadDatas,threadLatchs,mainLatch,taskStatus);});}try{倒计时锁设置超时时间30sbooleanawaitthreadLatchs。await(30,TimeUnit。SECONDS);if(!await){等待超时,事务回滚taskStatus。setIsError();}}catch(Throwablee){e。printStackTrace();taskStatus。setIsError();}mainLatch。countDown();切换到子线程执行studentThreadPool。shutdown();关闭线程池System。out。println(主线程完成);} 本想再次测试一下不同线程数对执行效率的影响时,发现当线程数超过10个时,执行时就报错。具体错误内容如下:Exceptioninthreadpool1thread2org。springframework。transaction。CannotCreateTransactionException:CouldnotopenJDBCConnectionfortransaction;nestedexceptionisjava。sql。SQLTransientConnectionException:HikariPool1Connectionisnotavailable,requesttimedoutafter30055ms。atorg。springframework。jdbc。datasource。DataSourceTransactionManager。doBegin(DataSourceTransactionManager。java:309)atorg。springframework。transaction。support。AbstractPlatformTransactionManager。startTransaction(AbstractPlatformTransactionManager。java:400)atorg。springframework。transaction。support。AbstractPlatformTransactionManager。getTransaction(AbstractPlatformTransactionManager。java:373)atcom。example。springbootmybatis。service。Impl。StudentServiceImpl。updateStudentsThread(StudentServiceImpl。java:58)atcom。example。springbootmybatis。StudentTest。lambdaupdateStudentWithThreadsAndTrans3(StudentTest。java:164)atjava。util。concurrent。ThreadPoolExecutor。runWorker(ThreadPoolExecutor。java:1149)atjava。util。concurrent。ThreadPoolExecutorWorker。run(ThreadPoolExecutor。java:624)atjava。lang。Thread。run(Thread。java:748)Causedby:java。sql。SQLTransientConnectionException:HikariPool1Connectionisnotavailable,requesttimedoutafter30055ms。atcom。zaxxer。hikari。pool。HikariPool。createTimeoutException(HikariPool。java:696)atcom。zaxxer。hikari。pool。HikariPool。getConnection(HikariPool。java:197)atcom。zaxxer。hikari。pool。HikariPool。getConnection(HikariPool。java:162)atcom。zaxxer。hikari。HikariDataSource。getConnection(HikariDataSource。java:128)atorg。springframework。jdbc。datasource。DataSourceTransactionManager。doBegin(DataSourceTransactionManager。java:265)。。。7more 错误的大致意思时,不能为数据库事务打开jdbcConnection,连接在30s的时候超时了。由于前面启动的十个线程需要等待主线程完成后才能提交,所以一直占用连接未释放,造成后面的进程创建连接超时。 看错误日志中错误的来源是HikariPool,我们来重新配置一下这个连接池的参数,将最大连接数修改为100,具体配置如下:连接池中允许的最小连接数。缺省值:10spring。datasource。hikari。minimumidle10连接池中允许的最大连接数。缺省值:10spring。datasource。hikari。maximumpoolsize100自动提交spring。datasource。hikari。autocommittrue一个连接idle状态的最大时长(毫秒),超时则被释放(retired),缺省:10分钟spring。datasource。hikari。idletimeout30000一个连接的生命时长(毫秒),超时而且没被使用则被释放(retired),缺省:30分钟,建议设置比数据库超时时长少30秒spring。datasource。hikari。maxlifetime1800000等待连接池分配连接的最大时长(毫秒),超过这个时长还没可用的连接则发生SQLException,缺省:30秒 再次执行测试发现没有报错,修改线程数为20又执行了一下,同样执行成功了。另外,关注公众号Java技术栈,在后台回复:面试,可以获取我整理的Java系列面试题和答案,非常齐全。五、基于TransactionStatus集合来控制多线程事务提交 在同事推荐下我们使用事务集合来进行多线程事务控制,主要代码如下ServicepublicclassStudentsTransactionThread{AutowiredprivateStudentMapperstudentMapper;AutowiredprivateStudentServicestudentService;AutowiredprivatePlatformTransactionManagertransactionManager;ListTransactionStatustransactionStatusesCollections。synchronizedList(newArrayListTransactionStatus());Transactional(propagationPropagation。REQUIRED,rollbackFor{Exception。class})publicvoidupdateStudentWithThreadsAndTrans()throwsInterruptedException{查询总数据ListStudentallStudentsstudentMapper。getAll();线程数量finalIntegerthreadCount2;每个线程处理的数据量finalIntegerdataPartionLength(allStudents。size()threadCount1)threadCount;创建多线程处理任务ExecutorServicestudentThreadPoolExecutors。newFixedThreadPool(threadCount);CountDownLatchthreadLatchsnewCountDownLatch(threadCount);AtomicBooleanisErrornewAtomicBoolean(false);try{for(inti0;ithreadCount;i){每个线程处理的数据ListStudentthreadDatasallStudents。stream()。skip(idataPartionLength)。limit(dataPartionLength)。collect(Collectors。toList());studentThreadPool。execute((){try{try{studentService。updateStudentsTransaction(transactionManager,transactionStatuses,threadDatas);}catch(Throwablee){e。printStackTrace();isError。set(true);}finally{threadLatchs。countDown();}}catch(Exceptione){e。printStackTrace();isError。set(true);}});}倒计时锁设置超时时间30sbooleanawaitthreadLatchs。await(30,TimeUnit。SECONDS);判断是否超时if(!await){isError。set(true);}}catch(Throwablee){e。printStackTrace();isError。set(true);}if(!transactionStatuses。isEmpty()){if(isError。get()){transactionStatuses。forEach(stransactionManager。rollback(s));}else{transactionStatuses。forEach(stransactionManager。commit(s));}}System。out。println(主线程完成);}}OverrideTransactional(propagationPropagation。REQUIRED,rollbackFor{Exception。class})publicvoidupdateStudentsTransaction(PlatformTransactionManagertransactionManager,ListTransactionStatustransactionStatuses,ListStudentstudents){使用这种方式将事务状态都放在同一个事务里面DefaultTransactionDefinitiondefnewDefaultTransactionDefinition();def。setPropagationBehavior(TransactionDefinition。PROPAGATIONREQUIRESNEW);事物隔离级别,开启新事务,这样会比较安全些。TransactionStatusstatustransactionManager。getTransaction(def);获得事务状态transactionStatuses。add(status);students。forEach(s{更新教师信息Stringteachers。getTeacher();StringnewTeacherTNOnewRandom()。nextInt(100);s。setTeacher(newTeacher);studentMapper。update(s);});System。out。println(子线程:Thread。currentThread()。getName());} 由于这个中方式去前面方式相同,需要等待线程执行完成后才会提交事务,所有任会占用Jdbc连接池,如果线程数量超过连接池最大数量会产生连接超时。所以在使用过程中任要控制线程数量,六、使用union连接多个select实现批量update 有些情况写不支持,批量update,但支持insert多条数据,这个时候可尝试将需要更新的数据拼接成多条select语句,然后使用union连接起来,再使用update关联这个数据进行update,具体代码演示如下:updatestudent,((select1asid,teacherAasteacher)union(select2asid,teacherAasteacher)union(select3asid,teacherAasteacher)union(select4asid,teacherAasteacher)。。。。moredata。。。)asnewteachersetstudent。teachernewteacher。teacherwherestudent。idnewteacher。id 这种方式在Mysql数据库没有配置allowMultiQueriestrue也可以实现批量更新。总结对于大批量数据库操作,使用手动事务提交可以很多程度上提高操作效率多线程对数据库进行操作时,并非线程数越多操作时间越快,按上述示例大约在25个线程时操作时间最快。对于多线程阻塞事务提交时,线程数量不能过多。如果能有办法实现批量更新那是最好 版权声明:本文为博主圣心的原创文章。原文链接:https:blog。csdn。netqq273766764articledetails119972911