数据批量导入时,加入队列,分批处理,只是个笔记

2024-05-01 09:58

本文主要是介绍数据批量导入时,加入队列,分批处理,只是个笔记,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1,用到技术点:队列,线程,单例模式,分批处理

2,添加笔记代码:

入口:

if(null != set && set.size() > 0){//异步,加入队列logger.info(String.format("加入队列,总共  %s 条数据", set.size()));TrackBusinessRunner trackBusinessRunner	= TrackBusinessRunner.getInstance();//获取单例TrackDataDTO trackDataDTO = new TrackDataDTO();//放入处理好的数据trackDataDTO.setParam(param);trackDataDTO.setSets(set);trackBusinessRunner.putQueueOnload(trackDataDTO);//放入队列if(trackBusinessRunner.getThreadTrackBusinessService()==null){trackBusinessRunner.setThreadTrackBusinessService(threadTrackBusinessService);}if(!trackBusinessRunner.isAlive()){trackBusinessRunner.start();//判断是否启动状态,如果不是就启动}}

单例模式和队列的核心类


import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;import org.apache.commons.collections.CollectionUtils;
import org.apache.log4j.Logger;import com.sf.iec.common.util.BatchHandlerInterface;
import com.sf.iec.common.util.BatchHandlerList;
import com.sf.iec.customerbusiness.inquiryorder.dto.TrackDataDTO;
import com.sf.iec.customerbusiness.inquiryorder.service.ThreadTrackBusinessService;public class TrackBusinessRunner extends Thread {private final static Logger LOGGER = Logger.getLogger(TrackBusinessRunner.class);private ThreadTrackBusinessService threadTrackBusinessService;public ThreadTrackBusinessService getThreadTrackBusinessService() {return threadTrackBusinessService;}private LinkedBlockingQueue<TrackDataDTO> blockingQueue = new LinkedBlockingQueue<TrackDataDTO>(300);//队列长度300,非常推荐该队列(put和take好好用)private volatile boolean running = true;//开启一个线程private TrackBusinessRunner(){}private static TrackBusinessRunner trackBusinessRunner;private static Object obj = new Object();//单例模式public static TrackBusinessRunner getInstance(){if(trackBusinessRunner==null){synchronized (obj) {if(trackBusinessRunner==null){trackBusinessRunner = new TrackBusinessRunner();}}}return trackBusinessRunner;}public void putQueueOnload(TrackDataDTO trackDataDTO){int i= 0;try {blockingQueue.put(trackDataDTO);//加入队列i = 0;} catch (InterruptedException e) {LOGGER.error("加入队列信息异常");e.printStackTrace();if(i < 2){putQueueOnload(trackDataDTO);i++;}}}@Overridepublic void run() {while (running) {try {TrackDataDTO trackDataDTO = blockingQueue.take();Set<Map<String,Object>> set = trackDataDTO.getSets();final Map<String, String> param = trackDataDTO.getParam();//处理 插入  	List<Map<String,Object>> lst = new ArrayList<Map<String,Object>>();CollectionUtils.addAll(lst, set.iterator());
//分批处理,每次取200条BatchHandlerList<Map<String, Object>> handler = new BatchHandlerList<Map<String,Object>>(200,lst) {@Overridepublic void handler(List<Map<String, Object>> subList) {// TODO Auto-generated method stub						threadTrackBusinessService.saveMainTainTrajectory(subList, param);//休眠	try {Thread.sleep(12000);} catch (InterruptedException e) {LOGGER.error("batch track handler thread interrupt excption",e);}//12秒}};handler.handlerList();} catch (Exception e) {LOGGER.error("获取队列信息异常",e);e.printStackTrace();}//取数据,没有的话会等待}}		public void setThreadTrackBusinessService(ThreadTrackBusinessService threadTrackBusinessService) {this.threadTrackBusinessService = threadTrackBusinessService;}}

分批接口

import java.util.List;public interface BatchHandlerInterface<T> {public void handler(List<T> subList);
}

分批处理工具类


import java.util.List;import org.apache.log4j.Logger;/*** @author  * @description 分批调用方法接口* */
public abstract class BatchHandlerList<T> implements BatchHandlerInterface<T> {private static final Logger LOGGER = Logger.getLogger(BatchHandlerList.class);//每次处理条数private Integer perNum;private List<T> aylist;public BatchHandlerList(Integer perNum, List<T> aylist) {super();this.perNum = perNum;this.aylist = aylist;}/*** 分批调用方法* */public void handlerList(){try{if(aylist!=null && aylist.size() > 0){int size = aylist.size();int startIndex = 0;int endIndex = 1;int num = 1;if (size > perNum) {num = size / perNum;}for (int i = 1; i <= num; i++) {endIndex = (i) * perNum > size ? size : (i) * perNum;List<T> subList = aylist.subList(startIndex, endIndex);startIndex = perNum * i;if (subList!=null && subList.size() > 0) {handler(subList);}if (num == i && perNum * num < size) {//最后一批处理subList = aylist.subList(perNum * num, size);if (subList.size() > 0) {handler(subList);}}}}}catch(Throwable e){LOGGER.error("batchHandlerList handler exception",e);//错误回调方法可以重写errorHandler();}}public void errorHandler(){};
}

 

这篇关于数据批量导入时,加入队列,分批处理,只是个笔记的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/951385

相关文章

MySQL 中查询 VARCHAR 类型 JSON 数据的问题记录

《MySQL中查询VARCHAR类型JSON数据的问题记录》在数据库设计中,有时我们会将JSON数据存储在VARCHAR或TEXT类型字段中,本文将详细介绍如何在MySQL中有效查询存储为V... 目录一、问题背景二、mysql jsON 函数2.1 常用 JSON 函数三、查询示例3.1 基本查询3.2

SpringBatch数据写入实现

《SpringBatch数据写入实现》SpringBatch通过ItemWriter接口及其丰富的实现,提供了强大的数据写入能力,本文主要介绍了SpringBatch数据写入实现,具有一定的参考价值,... 目录python引言一、ItemWriter核心概念二、数据库写入实现三、文件写入实现四、多目标写入

SpringKafka错误处理(重试机制与死信队列)

《SpringKafka错误处理(重试机制与死信队列)》SpringKafka提供了全面的错误处理机制,通过灵活的重试策略和死信队列处理,下面就来介绍一下,具有一定的参考价值,感兴趣的可以了解一下... 目录引言一、Spring Kafka错误处理基础二、配置重试机制三、死信队列实现四、特定异常的处理策略五

使用Python将JSON,XML和YAML数据写入Excel文件

《使用Python将JSON,XML和YAML数据写入Excel文件》JSON、XML和YAML作为主流结构化数据格式,因其层次化表达能力和跨平台兼容性,已成为系统间数据交换的通用载体,本文将介绍如何... 目录如何使用python写入数据到Excel工作表用Python导入jsON数据到Excel工作表用

Mysql如何将数据按照年月分组的统计

《Mysql如何将数据按照年月分组的统计》:本文主要介绍Mysql如何将数据按照年月分组的统计方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql将数据按照年月分组的统计要的效果方案总结Mysql将数据按照年月分组的统计要的效果方案① 使用 DA

resultMap如何处理复杂映射问题

《resultMap如何处理复杂映射问题》:本文主要介绍resultMap如何处理复杂映射问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录resultMap复杂映射问题Ⅰ 多对一查询:学生——老师Ⅱ 一对多查询:老师——学生总结resultMap复杂映射问题

鸿蒙中Axios数据请求的封装和配置方法

《鸿蒙中Axios数据请求的封装和配置方法》:本文主要介绍鸿蒙中Axios数据请求的封装和配置方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1.配置权限 应用级权限和系统级权限2.配置网络请求的代码3.下载在Entry中 下载AxIOS4.封装Htt

利用Python快速搭建Markdown笔记发布系统

《利用Python快速搭建Markdown笔记发布系统》这篇文章主要为大家详细介绍了使用Python生态的成熟工具,在30分钟内搭建一个支持Markdown渲染、分类标签、全文搜索的私有化知识发布系统... 目录引言:为什么要自建知识博客一、技术选型:极简主义开发栈二、系统架构设计三、核心代码实现(分步解析

Python实现AVIF图片与其他图片格式间的批量转换

《Python实现AVIF图片与其他图片格式间的批量转换》这篇文章主要为大家详细介绍了如何使用Pillow库实现AVIF与其他格式的相互转换,即将AVIF转换为常见的格式,比如JPG或PNG,需要的小... 目录环境配置1.将单个 AVIF 图片转换为 JPG 和 PNG2.批量转换目录下所有 AVIF 图

详解如何通过Python批量转换图片为PDF

《详解如何通过Python批量转换图片为PDF》:本文主要介绍如何基于Python+Tkinter开发的图片批量转PDF工具,可以支持批量添加图片,拖拽等操作,感兴趣的小伙伴可以参考一下... 目录1. 概述2. 功能亮点2.1 主要功能2.2 界面设计3. 使用指南3.1 运行环境3.2 使用步骤4. 核