ThreadPoolTaskExecutor多线程跑任务时数据未跑完

2023-10-18 08:28

本文主要是介绍ThreadPoolTaskExecutor多线程跑任务时数据未跑完,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.问题描述:循环查数据,然后用多线程去更新查到的数据

代码如下:

建表语句

CREATE TABLE `tb_user` (`id` int NOT NULL AUTO_INCREMENT COMMENT 'id',`name` varchar(50) DEFAULT NULL COMMENT '名称',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

实体类

@Data
@TableName("tb_user")
public class TbUser {@TableId(type = IdType.AUTO)private int id;@TableField("name")private String name;
}

mapper

@Mapper
@Repository
public interface TbUserMapper extends BaseMapper<TbUser> {int updateByIds(List<TbUser> list);
}

xml文件

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.test.netty.mapper.TbUserMapper"><update id="updateByIds"><foreach collection="list" item="user" separator=";">update tb_user set name = #{user.name} where id = #{user.id}</foreach></update>
</mapper>

service

package com.test.netty.service;import com.baomidou.mybatisplus.extension.service.IService;
import com.test.netty.pojo.TbUser;public interface UserService extends IService<TbUser> {void testThread();
}

实现类

package com.test.netty.service.impl;import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.test.netty.mapper.TbUserMapper;
import com.test.netty.pojo.TbUser;
import com.test.netty.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;import static com.baomidou.mybatisplus.extension.toolkit.Db.updateBatchById;@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<TbUserMapper, TbUser> implements UserService {@Resourceprivate TbUserMapper userMapper;@Autowired@Qualifier("taskExecutor")private ThreadPoolTaskExecutor taskExecutor;@Overridepublic void testThread() {log.info("开始批量更新流程");long start = System.currentTimeMillis();Long count = userMapper.selectCount(null);Integer pageSize = 500;int totalPages = count.intValue()/pageSize;if (count.intValue() % pageSize ==0){totalPages ++;}for (int i=1;i<totalPages;i++){Page<TbUser> page = userMapper.selectPage(new Page<TbUser>(i, 500), null);List<TbUser> list = page.getRecords();List<TbUser> users = new ArrayList<>();list.stream().forEach(user->{user.setName("test6");users.add(user);});int finalI = i;taskExecutor.execute(new Runnable() {@Overridepublic void run() {log.info("第{}次更新", finalI);updateBatchById(users,500);//userMapper.updateByIds(users);log.info("第{}次更新完成", finalI);}});}log.info("所有数据更新完成,耗时:{}",System.currentTimeMillis()-start);}
}

线程池配置

package com.test.netty.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.Currency;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
@Slf4j
public class TaskExecutorPoolConfig {private static final int workQueue = 5000;private static final int keepActiveTime = 30;private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();private static final int corePoolSize = 2 * CPU_COUNT;private static final int maxPoolSize = CPU_COUNT * 5;@Bean("taskExecutor")public ThreadPoolTaskExecutor myTaskExecutor(){ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setThreadNamePrefix("poolTest-");executor.setCorePoolSize(corePoolSize);executor.setMaxPoolSize(maxPoolSize);executor.setQueueCapacity(workQueue);executor.setKeepAliveSeconds(keepActiveTime);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}
}

mybatisPlus分页配置

package com.test.netty.config;import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MybatisConfig {@Beanpublic MybatisPlusInterceptor mybatisPlusInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new PaginationInnerInterceptor());return interceptor;}
}

测试类

单元测试需要添加test依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
package com.test.netty.controller;import com.test.netty.mapper.DataMapper;
import com.test.netty.mapper.TbUserMapper;
import com.test.netty.pojo.DataDto;
import com.test.netty.pojo.TbUser;
import com.test.netty.service.DataService;
import com.test.netty.service.TestService;
import com.test.netty.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.ArrayList;
import java.util.List;import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
@Slf4j
class TestControllerTest {@Autowiredprivate TbUserMapper userMapper;@Autowiredprivate UserService userService;@Testvoid test1() {}@Testpublic void insertBatch(){for (int i=1;i<=10000;i++){TbUser user = new TbUser();user.setName(String.valueOf(i));userMapper.insert(user);log.info("第{}次插入",i);}}@Testpublic void test(){userService.testThread();}}

application配置文件

server:port: 8081
spring:application:name: netty-test-01datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai&characterEncoding=utf-8&allowMultiQueries=trueusername: rootpassword: root

步骤:

1.先运行测试类insertBatch方法,插入数据10000条。
2.运行test方法进行更新

问题

发现执行完成后更新的数据不完整,只有五千左右
在这里插入图片描述
然后自己写sql:
将实现类的方法注解放开
在这里插入图片描述
运行后发现数据全部更新完成
在这里插入图片描述
注意:test7或test8是我每次运行都会改一下修改的值,这个随便写,不用在意。

解决方法:使用ExecutorService线程池

未找到出现这个问题的原因,但是换了一个线程池后问题解决
在实现类中创建线程池
在这里插入图片描述
在这里插入图片描述
完成代码如下:

package com.test.netty.service.impl;import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.test.netty.mapper.TbUserMapper;
import com.test.netty.pojo.TbUser;
import com.test.netty.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;import static com.baomidou.mybatisplus.extension.toolkit.Db.updateBatchById;@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<TbUserMapper, TbUser> implements UserService {@Resourceprivate TbUserMapper userMapper;@Autowired@Qualifier("taskExecutor")private ThreadPoolTaskExecutor taskExecutor;@Overridepublic void testThread() {ExecutorService executorService = Executors.newFixedThreadPool(30);log.info("开始批量更新流程");long start = System.currentTimeMillis();Long count = userMapper.selectCount(null);Integer pageSize = 500;int totalPages = count.intValue()/pageSize;if (count.intValue() % pageSize ==0){totalPages ++;}for (int i=1;i<totalPages;i++){Page<TbUser> page = userMapper.selectPage(new Page<TbUser>(i, 500), null);List<TbUser> list = page.getRecords();List<TbUser> users = new ArrayList<>();list.stream().forEach(user->{user.setName("test6");users.add(user);});int finalI = i;executorService.execute(new Runnable() {@Overridepublic void run() {log.info("第{}次更新", finalI);updateBatchById(users,500);userMapper.updateByIds(users);log.info("第{}次更新完成", finalI);}});}executorService.shutdown();try {executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);}catch (Exception e){log.error("error");}log.info("所有数据更新完成,耗时:{}",System.currentTimeMillis()-start);}
}

结果

无论使用哪一种方法,数据都能全部更新,ExecutorService会等待所有线程都结束后再去执行其他业务。

这篇关于ThreadPoolTaskExecutor多线程跑任务时数据未跑完的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/231393

相关文章

Python使用Pandas对比两列数据取最大值的五种方法

《Python使用Pandas对比两列数据取最大值的五种方法》本文主要介绍使用Pandas对比两列数据取最大值的五种方法,包括使用max方法、apply方法结合lambda函数、函数、clip方法、w... 目录引言一、使用max方法二、使用apply方法结合lambda函数三、使用np.maximum函数

SpringBoot中使用 ThreadLocal 进行多线程上下文管理及注意事项小结

《SpringBoot中使用ThreadLocal进行多线程上下文管理及注意事项小结》本文详细介绍了ThreadLocal的原理、使用场景和示例代码,并在SpringBoot中使用ThreadLo... 目录前言技术积累1.什么是 ThreadLocal2. ThreadLocal 的原理2.1 线程隔离2

Java多线程父线程向子线程传值问题及解决

《Java多线程父线程向子线程传值问题及解决》文章总结了5种解决父子之间数据传递困扰的解决方案,包括ThreadLocal+TaskDecorator、UserUtils、CustomTaskDeco... 目录1 背景2 ThreadLocal+TaskDecorator3 RequestContextH

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

Python给Excel写入数据的四种方法小结

《Python给Excel写入数据的四种方法小结》本文主要介绍了Python给Excel写入数据的四种方法小结,包含openpyxl库、xlsxwriter库、pandas库和win32com库,具有... 目录1. 使用 openpyxl 库2. 使用 xlsxwriter 库3. 使用 pandas 库

SpringBoot定制JSON响应数据的实现

《SpringBoot定制JSON响应数据的实现》本文主要介绍了SpringBoot定制JSON响应数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录前言一、如何使用@jsonView这个注解?二、应用场景三、实战案例注解方式编程方式总结 前言

使用Python在Excel中创建和取消数据分组

《使用Python在Excel中创建和取消数据分组》Excel中的分组是一种通过添加层级结构将相邻行或列组织在一起的功能,当分组完成后,用户可以通过折叠或展开数据组来简化数据视图,这篇博客将介绍如何使... 目录引言使用工具python在Excel中创建行和列分组Python在Excel中创建嵌套分组Pyt

在Rust中要用Struct和Enum组织数据的原因解析

《在Rust中要用Struct和Enum组织数据的原因解析》在Rust中,Struct和Enum是组织数据的核心工具,Struct用于将相关字段封装为单一实体,便于管理和扩展,Enum用于明确定义所有... 目录为什么在Rust中要用Struct和Enum组织数据?一、使用struct组织数据:将相关字段绑

在Mysql环境下对数据进行增删改查的操作方法

《在Mysql环境下对数据进行增删改查的操作方法》本文介绍了在MySQL环境下对数据进行增删改查的基本操作,包括插入数据、修改数据、删除数据、数据查询(基本查询、连接查询、聚合函数查询、子查询)等,并... 目录一、插入数据:二、修改数据:三、删除数据:1、delete from 表名;2、truncate