ThreadPoolTaskExecutor多线程跑任务时数据未跑完

2023-10-18 08:28

本文主要是介绍ThreadPoolTaskExecutor多线程跑任务时数据未跑完,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.问题描述:循环查数据,然后用多线程去更新查到的数据

代码如下:

建表语句

CREATE TABLE `tb_user` (`id` int NOT NULL AUTO_INCREMENT COMMENT 'id',`name` varchar(50) DEFAULT NULL COMMENT '名称',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

实体类

@Data
@TableName("tb_user")
public class TbUser {@TableId(type = IdType.AUTO)private int id;@TableField("name")private String name;
}

mapper

@Mapper
@Repository
public interface TbUserMapper extends BaseMapper<TbUser> {int updateByIds(List<TbUser> list);
}

xml文件

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.test.netty.mapper.TbUserMapper"><update id="updateByIds"><foreach collection="list" item="user" separator=";">update tb_user set name = #{user.name} where id = #{user.id}</foreach></update>
</mapper>

service

package com.test.netty.service;import com.baomidou.mybatisplus.extension.service.IService;
import com.test.netty.pojo.TbUser;public interface UserService extends IService<TbUser> {void testThread();
}

实现类

package com.test.netty.service.impl;import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.test.netty.mapper.TbUserMapper;
import com.test.netty.pojo.TbUser;
import com.test.netty.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;import static com.baomidou.mybatisplus.extension.toolkit.Db.updateBatchById;@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<TbUserMapper, TbUser> implements UserService {@Resourceprivate TbUserMapper userMapper;@Autowired@Qualifier("taskExecutor")private ThreadPoolTaskExecutor taskExecutor;@Overridepublic void testThread() {log.info("开始批量更新流程");long start = System.currentTimeMillis();Long count = userMapper.selectCount(null);Integer pageSize = 500;int totalPages = count.intValue()/pageSize;if (count.intValue() % pageSize ==0){totalPages ++;}for (int i=1;i<totalPages;i++){Page<TbUser> page = userMapper.selectPage(new Page<TbUser>(i, 500), null);List<TbUser> list = page.getRecords();List<TbUser> users = new ArrayList<>();list.stream().forEach(user->{user.setName("test6");users.add(user);});int finalI = i;taskExecutor.execute(new Runnable() {@Overridepublic void run() {log.info("第{}次更新", finalI);updateBatchById(users,500);//userMapper.updateByIds(users);log.info("第{}次更新完成", finalI);}});}log.info("所有数据更新完成,耗时:{}",System.currentTimeMillis()-start);}
}

线程池配置

package com.test.netty.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.Currency;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
@Slf4j
public class TaskExecutorPoolConfig {private static final int workQueue = 5000;private static final int keepActiveTime = 30;private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();private static final int corePoolSize = 2 * CPU_COUNT;private static final int maxPoolSize = CPU_COUNT * 5;@Bean("taskExecutor")public ThreadPoolTaskExecutor myTaskExecutor(){ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setThreadNamePrefix("poolTest-");executor.setCorePoolSize(corePoolSize);executor.setMaxPoolSize(maxPoolSize);executor.setQueueCapacity(workQueue);executor.setKeepAliveSeconds(keepActiveTime);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}
}

mybatisPlus分页配置

package com.test.netty.config;import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MybatisConfig {@Beanpublic MybatisPlusInterceptor mybatisPlusInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new PaginationInnerInterceptor());return interceptor;}
}

测试类

单元测试需要添加test依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
package com.test.netty.controller;import com.test.netty.mapper.DataMapper;
import com.test.netty.mapper.TbUserMapper;
import com.test.netty.pojo.DataDto;
import com.test.netty.pojo.TbUser;
import com.test.netty.service.DataService;
import com.test.netty.service.TestService;
import com.test.netty.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.ArrayList;
import java.util.List;import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
@Slf4j
class TestControllerTest {@Autowiredprivate TbUserMapper userMapper;@Autowiredprivate UserService userService;@Testvoid test1() {}@Testpublic void insertBatch(){for (int i=1;i<=10000;i++){TbUser user = new TbUser();user.setName(String.valueOf(i));userMapper.insert(user);log.info("第{}次插入",i);}}@Testpublic void test(){userService.testThread();}}

application配置文件

server:port: 8081
spring:application:name: netty-test-01datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai&characterEncoding=utf-8&allowMultiQueries=trueusername: rootpassword: root

步骤:

1.先运行测试类insertBatch方法,插入数据10000条。
2.运行test方法进行更新

问题

发现执行完成后更新的数据不完整,只有五千左右
在这里插入图片描述
然后自己写sql:
将实现类的方法注解放开
在这里插入图片描述
运行后发现数据全部更新完成
在这里插入图片描述
注意:test7或test8是我每次运行都会改一下修改的值,这个随便写,不用在意。

解决方法:使用ExecutorService线程池

未找到出现这个问题的原因,但是换了一个线程池后问题解决
在实现类中创建线程池
在这里插入图片描述
在这里插入图片描述
完成代码如下:

package com.test.netty.service.impl;import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.test.netty.mapper.TbUserMapper;
import com.test.netty.pojo.TbUser;
import com.test.netty.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;import static com.baomidou.mybatisplus.extension.toolkit.Db.updateBatchById;@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<TbUserMapper, TbUser> implements UserService {@Resourceprivate TbUserMapper userMapper;@Autowired@Qualifier("taskExecutor")private ThreadPoolTaskExecutor taskExecutor;@Overridepublic void testThread() {ExecutorService executorService = Executors.newFixedThreadPool(30);log.info("开始批量更新流程");long start = System.currentTimeMillis();Long count = userMapper.selectCount(null);Integer pageSize = 500;int totalPages = count.intValue()/pageSize;if (count.intValue() % pageSize ==0){totalPages ++;}for (int i=1;i<totalPages;i++){Page<TbUser> page = userMapper.selectPage(new Page<TbUser>(i, 500), null);List<TbUser> list = page.getRecords();List<TbUser> users = new ArrayList<>();list.stream().forEach(user->{user.setName("test6");users.add(user);});int finalI = i;executorService.execute(new Runnable() {@Overridepublic void run() {log.info("第{}次更新", finalI);updateBatchById(users,500);userMapper.updateByIds(users);log.info("第{}次更新完成", finalI);}});}executorService.shutdown();try {executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);}catch (Exception e){log.error("error");}log.info("所有数据更新完成,耗时:{}",System.currentTimeMillis()-start);}
}

结果

无论使用哪一种方法,数据都能全部更新,ExecutorService会等待所有线程都结束后再去执行其他业务。

这篇关于ThreadPoolTaskExecutor多线程跑任务时数据未跑完的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/231393

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

烟火目标检测数据集 7800张 烟火检测 带标注 voc yolo

一个包含7800张带标注图像的数据集,专门用于烟火目标检测,是一个非常有价值的资源,尤其对于那些致力于公共安全、事件管理和烟花表演监控等领域的人士而言。下面是对此数据集的一个详细介绍: 数据集名称:烟火目标检测数据集 数据集规模: 图片数量:7800张类别:主要包含烟火类目标,可能还包括其他相关类别,如烟火发射装置、背景等。格式:图像文件通常为JPEG或PNG格式;标注文件可能为X

pandas数据过滤

Pandas 数据过滤方法 Pandas 提供了多种方法来过滤数据,可以根据不同的条件进行筛选。以下是一些常见的 Pandas 数据过滤方法,结合实例进行讲解,希望能帮你快速理解。 1. 基于条件筛选行 可以使用布尔索引来根据条件过滤行。 import pandas as pd# 创建示例数据data = {'Name': ['Alice', 'Bob', 'Charlie', 'Dav

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者