多线程(callable+futureTask)去组装数据,并批量入库

2024-06-19 02:32

本文主要是介绍多线程(callable+futureTask)去组装数据,并批量入库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

      商城项目,收货地址会用到4级地址(省,市,县,镇),我们只用到了特定城市的。 但是我想通过京东的接口把全部的数据拿出来。于是就有 ------多线程(callable+futureTask)去组装数据。

---------------------------

    先贴下controller的代码:

package com.truelore.xunjia.wssc.test.controller;


import com.alibaba.fastjson.JSON;
import com.truelore.common.util.WsscHttpClientUtils;
import com.truelore.xunjia.wssc.dao.ProvinceDao;
import com.truelore.xunjia.wssc.entity.WsscArea;
import com.truelore.xunjia.wssc.entity.WsscCity;
import com.truelore.xunjia.wssc.entity.WsscProvince;
import com.truelore.xunjia.wssc.service.AreaService;
import com.truelore.xunjia.wssc.service.CityService;
import com.truelore.xunjia.wssc.service.ProvinceService;
import com.truelore.xunjia.wssc.service.TownService;
import com.truelore.xunjia.wssc.vo.JdaddressVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * 拿到京东全部的4级地址
 *  fujf
 *  201853114:49:49
 */
@RequestMapping("/wssc/test")
@Controller("testGetallJdaddress")
public class TestGetallJdaddress extends SuperToken {@Autowired
    private ProvinceService provinceService;

    @Autowired
    private ProvinceDao provinceDao;

    @Autowired
    private CityService cityService;

    @Autowired
    private AreaService areaService;

    @Autowired
    private TownService townService;

    @RequestMapping("/saveAddress")public void saveAddress(String level, HttpServletResponse resp){//level ="2";
        Long start_time = System.currentTimeMillis();   //开始时间

        List<String> parentids = new ArrayList<String>();
         if("1".equals(level)){//获取省的时候,没有parentid.  直接保存好了
             List<JdaddressVo> jdaddressVos = getaddressList(level, null);
            //save(jdaddressVos,level);
             batsave(jdaddressVos,level);

        }else if("2".equals(level)){List<WsscProvince> ProvinceList = provinceService.getLocalProvince(99);
             for (WsscProvince wsscProvince : ProvinceList) {parentids.add(wsscProvince.getLocalProvinceId());
             }}else if("3".equals(level)){List<WsscCity> cityList = cityService.queryCityByCondition(null,null,null,99);
             for (WsscCity wsscCity : cityList) {parentids.add(wsscCity.getLocalCityId());
             }}else if("4".equals(level)){List<WsscArea> areaList = areaService.queryAreaByCondition(null,null,null,99);
             for (WsscArea wsscArea : areaList) {parentids.add(wsscArea.getLocalAreaId());
             }}if(!("1".equals(level))){         //不是省级的地址获取,我们就用下面的多线程方式
      List<FutureTask<List<JdaddressVo>>> futureTasks = new ArrayList<FutureTask<List<JdaddressVo>>>();
            ExecutorService executorService = Executors.newFixedThreadPool(50);
            MycallableForaddress callable = null;


            System.out.println("****************");
            for (String parentid : parentids) {callable = new MycallableForaddress(level,parentid);
                FutureTask<List<JdaddressVo>> futureTask = new FutureTask<List<JdaddressVo>>(callable);
                futureTasks.add(futureTask);
                executorService.submit(futureTask);
              while(futureTasks.size()==500){for (FutureTask<List<JdaddressVo>> task : futureTasks) {try {List<JdaddressVo> addrlist = task.get();
                          if(null!=addrlist){//save(addrlist,level);
                              batsave(addrlist,level);   //换成批量保存
                          }} catch (Exception e) {e.printStackTrace();
                      }}futureTasks.clear();
              }}//循环结束,最后不满futureTasks.size()的也要保存起来
            while (futureTasks.size() > 0) {for (FutureTask<List<JdaddressVo>> task : futureTasks) {try {List<JdaddressVo> addrlist = task.get();
                        batsave(addrlist,level);
                    } catch (Exception e) {e.printStackTrace();
                    }}futureTasks.clear();
            }executorService.shutdown();
        }try {Long end_time = System.currentTimeMillis();
            resp.setHeader("Content-type", "text/html;charset=UTF-8");
            resp.getWriter().write("ok");
            resp.getWriter().write("共用时:"+(end_time - start_time)+"毫秒");
        } catch (IOException e) {e.printStackTrace();
        }}//批量保存
    private void batsave(List<JdaddressVo> addrlist, String level) {if("1".equals(level)){for (JdaddressVo jdaddressVo : addrlist) {WsscProvince p = new WsscProvince();
                p.setGuid(UUID.randomUUID().toString());
                p.setLocalProvinceId(jdaddressVo.getAddressId());
                p.setProvinceName(jdaddressVo.getAddressName());
                p.setTarget(99);
                p.setTargetProvinceId(jdaddressVo.getAddressId());
                provinceService.save(p);
            }}else if("2".equals(level)){cityService.batsave(addrlist);
        }else if("3".equals(level)){areaService.batsave(addrlist);
        }else{townService.batsave(addrlist);
      }}//内部线程类    根据上级id返回下级的地址list<JdaddressVo>
    class MycallableForaddress implements Callable{private String level;

        private String parentId;

       public MycallableForaddress(String level, String parentId) {this.level = level;
           this.parentId = parentId;
       }@Override
        public Object call() throws Exception {List<JdaddressVo> jdaddressVos = getaddressList(level, parentId);
            return jdaddressVos;
        }}//返回map<地区名,编号>
      private List<JdaddressVo> getaddressList(String level,String parentId) {String url = null;
          Map maps = new HashMap<String, String>();
          maps.put("token", token);

          if ("1" .equals(level)) {                      //获取省
              url="https://bizapi.jd.com/api/area/getProvince";
          } else if ("2" .equals(level)) {               //获取市
              url="https://bizapi.jd.com/api/area/getCity";
              maps.put("id", parentId);
          } else if ("3" .equals(level)) {                //获取县
              url="https://bizapi.jd.com/api/area/getCounty";
              maps.put("id", parentId);
          } else if ("4".equals(level)) {                 //获取乡
              url="https://bizapi.jd.com/api/area/getTown";
              maps.put("id", parentId);
          } else {return null;
          }String rev = WsscHttpClientUtils.post(url, maps, null);

          if (null != rev) {Map resultmaps = (Map) JSON.parse(rev);
              System.out.println(resultmaps.get("success"));
              boolean isSuccess = (boolean) resultmaps.get("success");
              if (isSuccess) {Map<String, Integer> resultmap = (Map) resultmaps.get("result");
                  //遍历map,方法1
                  List<JdaddressVo> JdaddressList = new ArrayList<>();
                  for (Object key : resultmap.keySet()) {System.out.println(key + "---->" + resultmap.get(key));
                      JdaddressVo jdaddress = new JdaddressVo();
                      jdaddress.setAddressId(resultmap.get(key).toString());
                      jdaddress.setAddressName(key.toString());
                      jdaddress.setParentAddressId(parentId);
                      JdaddressList.add(jdaddress);
                  }return JdaddressList;
              }}else{return null;
          }return null;

      }//-------------------以下是单元测试,不用理会-----------------------------------------
   // @Test
    public void getProvinceList(){String url = "https://bizapi.jd.com/api/area/getProvince";
        Map maps = new HashMap<String,String>();
        maps.put("token", token);
        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);
    }// @Test
    public void getCityList(){String url = "https://bizapi.jd.com/api/area/getCity";
        String parentId ="6";
        Map maps = new HashMap<String,String>();
        maps.put("id",parentId);
        maps.put("token", token);

        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);
    }// @Test
    public void getCountyList(){String url = "https://bizapi.jd.com/api/area/getCounty";
        String parentId ="318";
        Map maps = new HashMap<String,String>();
        maps.put("id",parentId);
        maps.put("token", token);

        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);
    }// @Test
    public void getTownList() {String url = "https://bizapi.jd.com/api/area/getTown";
        String parentId = "319";
        Map maps = new HashMap<String, String>();
        maps.put("id", parentId);
        maps.put("token", token);

        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);

        if (null != rev) {Map resultmaps = (Map) JSON.parse(rev);
            System.out.println(resultmaps.get("success"));
            boolean isSuccess = (boolean) resultmaps.get("success");
            if (isSuccess) {Map<String,Integer> resultmap = (Map) resultmaps.get("result");

               //遍历map,方法1
                for (Object key : resultmap.keySet()){System.out.println(key+"---->"+resultmap.get(key));
                }//遍历map,方法2
               /* for (Map.Entry<String,Integer> entry : resultmap.entrySet()){
                    System.out.println(entry.getKey()+"---->"+entry.getValue());
                }*/

                //遍历map,方法3   迭代器
               /* Iterator keys = resultmap.keySet().iterator();
                while (keys.hasNext()){
                   String key = (String) keys.next();
                   System.out.println(key+"--->"+resultmap.get(key));
                }*/

            }}}
}
 

这个是内部线程类的定义。

***DaoImpl中的批量保存代码。

@Override
public void batsave(final List<JdaddressVo> addrs) {this.getSession().doWork(new Work() {@Override
                         public void execute(Connection connection) throws SQLException {String sql = "insert into WSSC_CITY(GUID,CITY_NAME,LOCAL_CITYID,LOCAL_PROVINCEID,TARGET_CITYID,TARGET_PROVINCEID,TARGET) values(?,?,?,?,?,?,?)";
                            PreparedStatement ps = connection.prepareStatement(sql);
                            for (JdaddressVo addr : addrs) {ps.setString(1, UUID.randomUUID().toString());
                               ps.setString(2,addr.getAddressName());
                               ps.setString(3,addr.getAddressId());
                               ps.setString(4,addr.getParentAddressId());
                               ps.setString(5,addr.getAddressId());
                               ps.setString(6,addr.getParentAddressId());
                               ps.setInt(7,99);
                               ps.addBatch();
                            }ps.executeBatch();
                         }});

}


-----------------

经过测试,这样处理,5万条数据导入需要几分钟,快了不少。

------

感受:

要培养一种“批量”,“缓存”的思想,比如上面代码中 的 futureTasks(满500再处理);保存数据时,jdbc去批处理等。

"满一定量再去做"


这篇关于多线程(callable+futureTask)去组装数据,并批量入库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1073756

相关文章

python处理带有时区的日期和时间数据

《python处理带有时区的日期和时间数据》这篇文章主要为大家详细介绍了如何在Python中使用pytz库处理时区信息,包括获取当前UTC时间,转换为特定时区等,有需要的小伙伴可以参考一下... 目录时区基本信息python datetime使用timezonepandas处理时区数据知识延展时区基本信息

Qt实现网络数据解析的方法总结

《Qt实现网络数据解析的方法总结》在Qt中解析网络数据通常涉及接收原始字节流,并将其转换为有意义的应用层数据,这篇文章为大家介绍了详细步骤和示例,感兴趣的小伙伴可以了解下... 目录1. 网络数据接收2. 缓冲区管理(处理粘包/拆包)3. 常见数据格式解析3.1 jsON解析3.2 XML解析3.3 自定义

SpringMVC 通过ajax 前后端数据交互的实现方法

《SpringMVC通过ajax前后端数据交互的实现方法》:本文主要介绍SpringMVC通过ajax前后端数据交互的实现方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价... 在前端的开发过程中,经常在html页面通过AJAX进行前后端数据的交互,SpringMVC的controll

SpringBoot整合mybatisPlus实现批量插入并获取ID详解

《SpringBoot整合mybatisPlus实现批量插入并获取ID详解》这篇文章主要为大家详细介绍了SpringBoot如何整合mybatisPlus实现批量插入并获取ID,文中的示例代码讲解详细... 目录【1】saveBATch(一万条数据总耗时:2478ms)【2】集合方式foreach(一万条数

Pandas统计每行数据中的空值的方法示例

《Pandas统计每行数据中的空值的方法示例》处理缺失数据(NaN值)是一个非常常见的问题,本文主要介绍了Pandas统计每行数据中的空值的方法示例,具有一定的参考价值,感兴趣的可以了解一下... 目录什么是空值?为什么要统计空值?准备工作创建示例数据统计每行空值数量进一步分析www.chinasem.cn处

如何使用 Python 读取 Excel 数据

《如何使用Python读取Excel数据》:本文主要介绍使用Python读取Excel数据的详细教程,通过pandas和openpyxl,你可以轻松读取Excel文件,并进行各种数据处理操... 目录使用 python 读取 Excel 数据的详细教程1. 安装必要的依赖2. 读取 Excel 文件3. 读

Spring 请求之传递 JSON 数据的操作方法

《Spring请求之传递JSON数据的操作方法》JSON就是一种数据格式,有自己的格式和语法,使用文本表示一个对象或数组的信息,因此JSON本质是字符串,主要负责在不同的语言中数据传递和交换,这... 目录jsON 概念JSON 语法JSON 的语法JSON 的两种结构JSON 字符串和 Java 对象互转

C++如何通过Qt反射机制实现数据类序列化

《C++如何通过Qt反射机制实现数据类序列化》在C++工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作,所以本文就来聊聊C++如何通过Qt反射机制实现数据类序列化吧... 目录设计预期设计思路代码实现使用方法在 C++ 工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作。由于数据类

SpringBoot使用GZIP压缩反回数据问题

《SpringBoot使用GZIP压缩反回数据问题》:本文主要介绍SpringBoot使用GZIP压缩反回数据问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录SpringBoot使用GZIP压缩反回数据1、初识gzip2、gzip是什么,可以干什么?3、Spr

SpringBoot集成Milvus实现数据增删改查功能

《SpringBoot集成Milvus实现数据增删改查功能》milvus支持的语言比较多,支持python,Java,Go,node等开发语言,本文主要介绍如何使用Java语言,采用springboo... 目录1、Milvus基本概念2、添加maven依赖3、配置yml文件4、创建MilvusClient