大型文件数据读取并持久化到数据库

2024-03-06 19:36

本文主要是介绍大型文件数据读取并持久化到数据库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

产品经理今天给了一个上亿数据的文本文件给我,让我把导入到mysql数据库。
文本的内容很简单,只有一个字段,但有1亿行。
在这里插入图片描述
我拿到文件后最开始直接用navicat工具直接导入,但发现效率极慢,跑了一分多钟,才导进去10W+数据进去,算下来要跑完至少需要20多个小时,时间不允许。
看来只能自己写代码来提升效率了。
常规的做法肯定是把文件内容按行读取出来,然后每N条拆分一批,再插入到数据库中。但这个文件太大,一次性全部读取到内存中,对机器有点压力。所以只能按批来读取,一边读一边写,已经持久化的数据就及时释放掉,避免一直占用内存。哎!LinkedBlockingQueue 就很适合干这个事。

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.LineIter;
import cn.hutool.core.io.FileUtil;
import com.yc.kfpt.oversea.dao.entity.SourceCode;
import com.yc.kfpt.oversea.dao.repository.SourceCodeRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;import java.io.BufferedReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;/*** @author 敖癸* @formatter:on* @since 2024/3/6*/
@Slf4j
@Service
@RequiredArgsConstructor
public class ImportDataService {private final SourceCodeRepository sourceCodeRepository;private final Executor asyncExecutor;@Asyncpublic void importData() {LinkedBlockingQueue<String> codeQueue = new LinkedBlockingQueue<>(500000);// 监听器线程queueListener(codeQueue).start();readFile("D:\\91-240305-j000.txt", codeQueue);}/*** 创建队列监听器** @param codeQueue* @return java.lang.Thread* @author 敖癸* @since 2024/3/6 - 16:41*/private Thread queueListener(LinkedBlockingQueue<String> codeQueue) {return new Thread(() -> {long index = 0;List<String> codes = new ArrayList<>();while (true) {try {String code = codeQueue.poll(5, TimeUnit.SECONDS);// 如果超过5秒从队列中还没获取到数据,就认为已经没有数据了if (code == null) {if (CollUtil.isNotEmpty(codes)) {log.info("入库处理: {}", index);List<SourceCode> entities = convertToEntity(codes);asyncExecutor.execute(() -> sourceCodeRepository.saveBatch("GENERAL", entities));codes.clear();}break;}index++;codes.add(code);// 5000一个批次if (codes.size() == 5000) {log.info("入库处理: {}", index);List<SourceCode> entities = convertToEntity(codes);// 持久化操作扔到线程池中异步去执行,可以多开点线程数量。asyncExecutor.execute(() -> sourceCodeRepository.saveBatch("GENERAL", entities));codes.clear();}} catch (InterruptedException e) {throw new RuntimeException(e);}}});}/*** 文件读取** @param codeQueue* @author 敖癸* @since 2024/3/6 - 16:41*/private static void readFile(String filePath, LinkedBlockingQueue<String> codeQueue) {BufferedReader reader = FileUtil.getReader(filePath, Charset.defaultCharset());int readCount = 0;  // 读取行数计数try (LineIter lineIter = new LineIter(reader)) {while (lineIter.hasNext()) {readCount++;// 如果codeQueue中的元素个数已达上限,这里会阻塞codeQueue.put(lineIter.next());if (readCount % 50000 == 0) {log.info("已读取{}行", readCount);}}} catch (Exception e) {log.error("文件读取异常", e);}log.info("读取完成,供{}行", readCount);}/*** 将行数据转换成数据库对象** @param codes* @return java.util.List<com.yc.kfpt.oversea.dao.entity.SourceCode>* @author 敖癸* @since 2024/3/6 - 16:43*/private static List<SourceCode> convertToEntity(List<String> codes) {return codes.stream().map(SourceCode::new).collect(Collectors.toList());}
}

实测,1亿数据量,大概花了20分钟导入完成。
这里需要注意的知识点:
LinkedBlockingQueue 的 put 方法,如果队列已满,会阻塞等待,直到队列中腾出空位。
LinkedBlockingQueue 的 poll 方法,可以设置超时时间,在等待超时后如果在队列中还是没有拿到数据,就返回null。
注意 take, add, offer, remove,poll,put的使用区别
注意 take, add, offer, remove,poll,put的使用区别

关于 LinkedBlockingQueue 的详解,可以参考一下这位博主的文章
深入理解Java系列 | LinkedBlockingQueue用法详解

这篇关于大型文件数据读取并持久化到数据库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/781094

相关文章

Qt实现对Word网页的读取功能

《Qt实现对Word网页的读取功能》文章介绍了几种在Qt中实现Word文档(.docx/.doc)读写功能的方法,包括基于QAxObject的COM接口调用、DOCX模板替换及跨平台解决方案,重点讨论... 目录1. 核心实现方式2. 基于QAxObject的COM接口调用(Windows专用)2.1 环境

MySQL数据目录迁移的完整过程

《MySQL数据目录迁移的完整过程》文章详细介绍了将MySQL数据目录迁移到新硬盘的整个过程,包括新硬盘挂载、创建新的数据目录、迁移数据(推荐使用两遍rsync方案)、修改MySQL配置文件和重启验证... 目录1,新硬盘挂载(如果有的话)2,创建新的 mysql 数据目录3,迁移 MySQL 数据(推荐两

Python数据验证神器Pydantic库的使用和实践中的避坑指南

《Python数据验证神器Pydantic库的使用和实践中的避坑指南》Pydantic是一个用于数据验证和设置的库,可以显著简化API接口开发,文章通过一个实际案例,展示了Pydantic如何在生产环... 目录1️⃣ 崩溃时刻:当你的API接口又双叒崩了!2️⃣ 神兵天降:3行代码解决验证难题3️⃣ 深度

MySQL快速复制一张表的四种核心方法(包括表结构和数据)

《MySQL快速复制一张表的四种核心方法(包括表结构和数据)》本文详细介绍了四种复制MySQL表(结构+数据)的方法,并对每种方法进行了对比分析,适用于不同场景和数据量的复制需求,特别是针对超大表(1... 目录一、mysql 复制表(结构+数据)的 4 种核心方法(面试结构化回答)方法 1:CREATE

JavaWeb项目创建、部署、连接数据库保姆级教程(tomcat)

《JavaWeb项目创建、部署、连接数据库保姆级教程(tomcat)》:本文主要介绍如何在IntelliJIDEA2020.1中创建和部署一个JavaWeb项目,包括创建项目、配置Tomcat服务... 目录简介:一、创建项目二、tomcat部署1、将tomcat解压在一个自己找得到路径2、在idea中添加

详解C++ 存储二进制数据容器的几种方法

《详解C++存储二进制数据容器的几种方法》本文主要介绍了详解C++存储二进制数据容器,包括std::vector、std::array、std::string、std::bitset和std::ve... 目录1.std::vector<uint8_t>(最常用)特点:适用场景:示例:2.std::arra

MySQL MHA集群详解(数据库高可用)

《MySQLMHA集群详解(数据库高可用)》MHA(MasterHighAvailability)是开源MySQL高可用管理工具,用于自动故障检测与转移,支持异步或半同步复制的MySQL主从架构,本... 目录mysql 高可用方案:MHA 详解与实战1. MHA 简介2. MHA 的组件组成(1)MHA

MySQL中的DELETE删除数据及注意事项

《MySQL中的DELETE删除数据及注意事项》MySQL的DELETE语句是数据库操作中不可或缺的一部分,通过合理使用索引、批量删除、避免全表删除、使用TRUNCATE、使用ORDERBY和LIMI... 目录1. 基本语法单表删除2. 高级用法使用子查询删除删除多表3. 性能优化策略使用索引批量删除避免

MySQL 数据库进阶之SQL 数据操作与子查询操作大全

《MySQL数据库进阶之SQL数据操作与子查询操作大全》本文详细介绍了SQL中的子查询、数据添加(INSERT)、数据修改(UPDATE)和数据删除(DELETE、TRUNCATE、DROP)操作... 目录一、子查询:嵌套在查询中的查询1.1 子查询的基本语法1.2 子查询的实战示例二、数据添加:INSE

在C#中读取文件的六种主流方法详解

《在C#中读取文件的六种主流方法详解》在C#中读取文件有多种方法,不同方式适用于不同场景(小型文件、大型文件、文本文件或二进制文件),本文给大家介绍了6种主流方法以及其适用场景,需要的朋友可以参考下... 目录方法1:File.ReadAllText(读取整个文本文件)方法2:File.ReadAllLin