大型文件数据读取并持久化到数据库

2024-03-06 19:36

本文主要是介绍大型文件数据读取并持久化到数据库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

产品经理今天给了一个上亿数据的文本文件给我,让我把导入到mysql数据库。
文本的内容很简单,只有一个字段,但有1亿行。
在这里插入图片描述
我拿到文件后最开始直接用navicat工具直接导入,但发现效率极慢,跑了一分多钟,才导进去10W+数据进去,算下来要跑完至少需要20多个小时,时间不允许。
看来只能自己写代码来提升效率了。
常规的做法肯定是把文件内容按行读取出来,然后每N条拆分一批,再插入到数据库中。但这个文件太大,一次性全部读取到内存中,对机器有点压力。所以只能按批来读取,一边读一边写,已经持久化的数据就及时释放掉,避免一直占用内存。哎!LinkedBlockingQueue 就很适合干这个事。

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.LineIter;
import cn.hutool.core.io.FileUtil;
import com.yc.kfpt.oversea.dao.entity.SourceCode;
import com.yc.kfpt.oversea.dao.repository.SourceCodeRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;import java.io.BufferedReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;/*** @author 敖癸* @formatter:on* @since 2024/3/6*/
@Slf4j
@Service
@RequiredArgsConstructor
public class ImportDataService {private final SourceCodeRepository sourceCodeRepository;private final Executor asyncExecutor;@Asyncpublic void importData() {LinkedBlockingQueue<String> codeQueue = new LinkedBlockingQueue<>(500000);// 监听器线程queueListener(codeQueue).start();readFile("D:\\91-240305-j000.txt", codeQueue);}/*** 创建队列监听器** @param codeQueue* @return java.lang.Thread* @author 敖癸* @since 2024/3/6 - 16:41*/private Thread queueListener(LinkedBlockingQueue<String> codeQueue) {return new Thread(() -> {long index = 0;List<String> codes = new ArrayList<>();while (true) {try {String code = codeQueue.poll(5, TimeUnit.SECONDS);// 如果超过5秒从队列中还没获取到数据,就认为已经没有数据了if (code == null) {if (CollUtil.isNotEmpty(codes)) {log.info("入库处理: {}", index);List<SourceCode> entities = convertToEntity(codes);asyncExecutor.execute(() -> sourceCodeRepository.saveBatch("GENERAL", entities));codes.clear();}break;}index++;codes.add(code);// 5000一个批次if (codes.size() == 5000) {log.info("入库处理: {}", index);List<SourceCode> entities = convertToEntity(codes);// 持久化操作扔到线程池中异步去执行,可以多开点线程数量。asyncExecutor.execute(() -> sourceCodeRepository.saveBatch("GENERAL", entities));codes.clear();}} catch (InterruptedException e) {throw new RuntimeException(e);}}});}/*** 文件读取** @param codeQueue* @author 敖癸* @since 2024/3/6 - 16:41*/private static void readFile(String filePath, LinkedBlockingQueue<String> codeQueue) {BufferedReader reader = FileUtil.getReader(filePath, Charset.defaultCharset());int readCount = 0;  // 读取行数计数try (LineIter lineIter = new LineIter(reader)) {while (lineIter.hasNext()) {readCount++;// 如果codeQueue中的元素个数已达上限,这里会阻塞codeQueue.put(lineIter.next());if (readCount % 50000 == 0) {log.info("已读取{}行", readCount);}}} catch (Exception e) {log.error("文件读取异常", e);}log.info("读取完成,供{}行", readCount);}/*** 将行数据转换成数据库对象** @param codes* @return java.util.List<com.yc.kfpt.oversea.dao.entity.SourceCode>* @author 敖癸* @since 2024/3/6 - 16:43*/private static List<SourceCode> convertToEntity(List<String> codes) {return codes.stream().map(SourceCode::new).collect(Collectors.toList());}
}

实测,1亿数据量,大概花了20分钟导入完成。
这里需要注意的知识点:
LinkedBlockingQueue 的 put 方法,如果队列已满,会阻塞等待,直到队列中腾出空位。
LinkedBlockingQueue 的 poll 方法,可以设置超时时间,在等待超时后如果在队列中还是没有拿到数据,就返回null。
注意 take, add, offer, remove,poll,put的使用区别
注意 take, add, offer, remove,poll,put的使用区别

关于 LinkedBlockingQueue 的详解,可以参考一下这位博主的文章
深入理解Java系列 | LinkedBlockingQueue用法详解

这篇关于大型文件数据读取并持久化到数据库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/781094

相关文章

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

透彻!驯服大型语言模型(LLMs)的五种方法,及具体方法选择思路

引言 随着时间的发展,大型语言模型不再停留在演示阶段而是逐步面向生产系统的应用,随着人们期望的不断增加,目标也发生了巨大的变化。在短短的几个月的时间里,人们对大模型的认识已经从对其zero-shot能力感到惊讶,转变为考虑改进模型质量、提高模型可用性。 「大语言模型(LLMs)其实就是利用高容量的模型架构(例如Transformer)对海量的、多种多样的数据分布进行建模得到,它包含了大量的先验