数据批量导入时,加入队列,分批处理,只是个笔记

2024-05-01 09:58

本文主要是介绍数据批量导入时,加入队列,分批处理,只是个笔记,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1,用到技术点:队列,线程,单例模式,分批处理

2,添加笔记代码:

入口:

if(null != set && set.size() > 0){//异步,加入队列logger.info(String.format("加入队列,总共  %s 条数据", set.size()));TrackBusinessRunner trackBusinessRunner	= TrackBusinessRunner.getInstance();//获取单例TrackDataDTO trackDataDTO = new TrackDataDTO();//放入处理好的数据trackDataDTO.setParam(param);trackDataDTO.setSets(set);trackBusinessRunner.putQueueOnload(trackDataDTO);//放入队列if(trackBusinessRunner.getThreadTrackBusinessService()==null){trackBusinessRunner.setThreadTrackBusinessService(threadTrackBusinessService);}if(!trackBusinessRunner.isAlive()){trackBusinessRunner.start();//判断是否启动状态,如果不是就启动}}

单例模式和队列的核心类


import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;import org.apache.commons.collections.CollectionUtils;
import org.apache.log4j.Logger;import com.sf.iec.common.util.BatchHandlerInterface;
import com.sf.iec.common.util.BatchHandlerList;
import com.sf.iec.customerbusiness.inquiryorder.dto.TrackDataDTO;
import com.sf.iec.customerbusiness.inquiryorder.service.ThreadTrackBusinessService;public class TrackBusinessRunner extends Thread {private final static Logger LOGGER = Logger.getLogger(TrackBusinessRunner.class);private ThreadTrackBusinessService threadTrackBusinessService;public ThreadTrackBusinessService getThreadTrackBusinessService() {return threadTrackBusinessService;}private LinkedBlockingQueue<TrackDataDTO> blockingQueue = new LinkedBlockingQueue<TrackDataDTO>(300);//队列长度300,非常推荐该队列(put和take好好用)private volatile boolean running = true;//开启一个线程private TrackBusinessRunner(){}private static TrackBusinessRunner trackBusinessRunner;private static Object obj = new Object();//单例模式public static TrackBusinessRunner getInstance(){if(trackBusinessRunner==null){synchronized (obj) {if(trackBusinessRunner==null){trackBusinessRunner = new TrackBusinessRunner();}}}return trackBusinessRunner;}public void putQueueOnload(TrackDataDTO trackDataDTO){int i= 0;try {blockingQueue.put(trackDataDTO);//加入队列i = 0;} catch (InterruptedException e) {LOGGER.error("加入队列信息异常");e.printStackTrace();if(i < 2){putQueueOnload(trackDataDTO);i++;}}}@Overridepublic void run() {while (running) {try {TrackDataDTO trackDataDTO = blockingQueue.take();Set<Map<String,Object>> set = trackDataDTO.getSets();final Map<String, String> param = trackDataDTO.getParam();//处理 插入  	List<Map<String,Object>> lst = new ArrayList<Map<String,Object>>();CollectionUtils.addAll(lst, set.iterator());
//分批处理,每次取200条BatchHandlerList<Map<String, Object>> handler = new BatchHandlerList<Map<String,Object>>(200,lst) {@Overridepublic void handler(List<Map<String, Object>> subList) {// TODO Auto-generated method stub						threadTrackBusinessService.saveMainTainTrajectory(subList, param);//休眠	try {Thread.sleep(12000);} catch (InterruptedException e) {LOGGER.error("batch track handler thread interrupt excption",e);}//12秒}};handler.handlerList();} catch (Exception e) {LOGGER.error("获取队列信息异常",e);e.printStackTrace();}//取数据,没有的话会等待}}		public void setThreadTrackBusinessService(ThreadTrackBusinessService threadTrackBusinessService) {this.threadTrackBusinessService = threadTrackBusinessService;}}

分批接口

import java.util.List;public interface BatchHandlerInterface<T> {public void handler(List<T> subList);
}

分批处理工具类


import java.util.List;import org.apache.log4j.Logger;/*** @author  * @description 分批调用方法接口* */
public abstract class BatchHandlerList<T> implements BatchHandlerInterface<T> {private static final Logger LOGGER = Logger.getLogger(BatchHandlerList.class);//每次处理条数private Integer perNum;private List<T> aylist;public BatchHandlerList(Integer perNum, List<T> aylist) {super();this.perNum = perNum;this.aylist = aylist;}/*** 分批调用方法* */public void handlerList(){try{if(aylist!=null && aylist.size() > 0){int size = aylist.size();int startIndex = 0;int endIndex = 1;int num = 1;if (size > perNum) {num = size / perNum;}for (int i = 1; i <= num; i++) {endIndex = (i) * perNum > size ? size : (i) * perNum;List<T> subList = aylist.subList(startIndex, endIndex);startIndex = perNum * i;if (subList!=null && subList.size() > 0) {handler(subList);}if (num == i && perNum * num < size) {//最后一批处理subList = aylist.subList(perNum * num, size);if (subList.size() > 0) {handler(subList);}}}}}catch(Throwable e){LOGGER.error("batchHandlerList handler exception",e);//错误回调方法可以重写errorHandler();}}public void errorHandler(){};
}

 

这篇关于数据批量导入时,加入队列,分批处理,只是个笔记的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/951385

相关文章

Python将大量遥感数据的值缩放指定倍数的方法(推荐)

《Python将大量遥感数据的值缩放指定倍数的方法(推荐)》本文介绍基于Python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处理,并将所得处理后数据保存为新的遥感影像... 本文介绍基于python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

Window Server2016加入AD域的方法步骤

《WindowServer2016加入AD域的方法步骤》:本文主要介绍WindowServer2016加入AD域的方法步骤,包括配置DNS、检测ping通、更改计算机域、输入账号密码、重启服务... 目录一、 准备条件二、配置ServerB加入ServerA的AD域(test.ly)三、查看加入AD域后的变

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

Linux使用dd命令来复制和转换数据的操作方法

《Linux使用dd命令来复制和转换数据的操作方法》Linux中的dd命令是一个功能强大的数据复制和转换实用程序,它以较低级别运行,通常用于创建可启动的USB驱动器、克隆磁盘和生成随机数据等任务,本文... 目录简介功能和能力语法常用选项示例用法基础用法创建可启动www.chinasem.cn的 USB 驱动

Oracle数据库使用 listagg去重删除重复数据的方法汇总

《Oracle数据库使用listagg去重删除重复数据的方法汇总》文章介绍了在Oracle数据库中使用LISTAGG和XMLAGG函数进行字符串聚合并去重的方法,包括去重聚合、使用XML解析和CLO... 目录案例表第一种:使用wm_concat() + distinct去重聚合第二种:使用listagg,

Go语言使用Buffer实现高性能处理字节和字符

《Go语言使用Buffer实现高性能处理字节和字符》在Go中,bytes.Buffer是一个非常高效的类型,用于处理字节数据的读写操作,本文将详细介绍一下如何使用Buffer实现高性能处理字节和... 目录1. bytes.Buffer 的基本用法1.1. 创建和初始化 Buffer1.2. 使用 Writ

Redis延迟队列的实现示例

《Redis延迟队列的实现示例》Redis延迟队列是一种使用Redis实现的消息队列,本文主要介绍了Redis延迟队列的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习... 目录一、什么是 Redis 延迟队列二、实现原理三、Java 代码示例四、注意事项五、使用 Redi

Python实现将实体类列表数据导出到Excel文件

《Python实现将实体类列表数据导出到Excel文件》在数据处理和报告生成中,将实体类的列表数据导出到Excel文件是一项常见任务,Python提供了多种库来实现这一目标,下面就来跟随小编一起学习一... 目录一、环境准备二、定义实体类三、创建实体类列表四、将实体类列表转换为DataFrame五、导出Da

Python视频处理库VidGear使用小结

《Python视频处理库VidGear使用小结》VidGear是一个高性能的Python视频处理库,本文主要介绍了Python视频处理库VidGear使用小结,文中通过示例代码介绍的非常详细,对大家的... 目录一、VidGear的安装二、VidGear的主要功能三、VidGear的使用示例四、VidGea