本文主要是介绍多线程(callable+futureTask)去组装数据,并批量入库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
商城项目的收货地址会用到4级地址(省、市、县、镇),而我们之前只用到了特定城市的数据。但是我想通过京东的接口把全部数据都拿出来,于是就有了下面的方案:用多线程(Callable + FutureTask)去组装数据。
---------------------------
先贴下controller的代码:
package com.truelore.xunjia.wssc.test.controller;

import com.alibaba.fastjson.JSON;
import com.truelore.common.util.WsscHttpClientUtils;
import com.truelore.xunjia.wssc.dao.ProvinceDao;
import com.truelore.xunjia.wssc.entity.WsscArea;
import com.truelore.xunjia.wssc.entity.WsscCity;
import com.truelore.xunjia.wssc.entity.WsscProvince;
import com.truelore.xunjia.wssc.service.AreaService;
import com.truelore.xunjia.wssc.service.CityService;
import com.truelore.xunjia.wssc.service.ProvinceService;
import com.truelore.xunjia.wssc.service.TownService;
import com.truelore.xunjia.wssc.vo.JdaddressVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * Fetches the four-level JD address tree (province, city, county, town) and stores it locally.
 *
 * <p>One level is imported per request. Provinces are fetched with a single call; every other
 * level issues one remote call per parent id, fanned out over a fixed thread pool, and the
 * results are saved in batches.
 *
 * @author fujf
 * @since 2018-05-31
 */
@RequestMapping("/wssc/test")
@Controller("testGetallJdaddress")
public class TestGetallJdaddress extends SuperToken {

    /** Number of worker threads used to call the remote API in parallel. */
    private static final int POOL_SIZE = 50;

    /** Number of submitted tasks accumulated before their results are collected and saved. */
    private static final int BATCH_SIZE = 500;

    /**
     * Target code passed to the query services and stored on saved provinces.
     * NOTE(review): presumably identifies JD as the data source - confirm.
     */
    private static final int TARGET = 99;

    @Autowired
    private ProvinceService provinceService;
    @Autowired
    private ProvinceDao provinceDao;
    @Autowired
    private CityService cityService;
    @Autowired
    private AreaService areaService;
    @Autowired
    private TownService townService;

    /**
     * Imports all addresses of one level and writes "ok" plus the elapsed time to the response.
     *
     * @param level "1" province, "2" city, "3" county, "4" town; any other value imports nothing
     * @param resp  servlet response that receives the plain status text
     */
    @RequestMapping("/saveAddress")
    public void saveAddress(String level, HttpServletResponse resp) {
        long startTime = System.currentTimeMillis();

        if ("1".equals(level)) {
            // Provinces have no parent id, so a single call is enough.
            List<JdaddressVo> provinces = getaddressList(level, null);
            if (provinces != null) { // null means the remote call failed; nothing to save
                batsave(provinces, level);
            }
        } else {
            // Every other level needs one remote call per parent id, so fan out over threads.
            fetchAndSaveChildren(level, collectParentIds(level));
        }

        try {
            long endTime = System.currentTimeMillis();
            resp.setHeader("Content-type", "text/html;charset=UTF-8");
            resp.getWriter().write("ok");
            resp.getWriter().write("共用时:" + (endTime - startTime) + "毫秒");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Loads the locally stored ids of the level directly above {@code level}.
     *
     * @param level level about to be imported
     * @return parent ids to query; empty for an unknown level
     */
    private List<String> collectParentIds(String level) {
        List<String> parentIds = new ArrayList<String>();
        if ("2".equals(level)) {
            for (WsscProvince province : provinceService.getLocalProvince(TARGET)) {
                parentIds.add(province.getLocalProvinceId());
            }
        } else if ("3".equals(level)) {
            for (WsscCity city : cityService.queryCityByCondition(null, null, null, TARGET)) {
                parentIds.add(city.getLocalCityId());
            }
        } else if ("4".equals(level)) {
            for (WsscArea area : areaService.queryAreaByCondition(null, null, null, TARGET)) {
                parentIds.add(area.getLocalAreaId());
            }
        }
        return parentIds;
    }

    /**
     * Queries the children of every parent id on a thread pool and saves the results.
     * Results are collected each time {@link #BATCH_SIZE} tasks have been submitted, and once
     * more at the end for the remainder.
     *
     * @param level     level being imported
     * @param parentIds ids whose children are fetched
     */
    private void fetchAndSaveChildren(String level, List<String> parentIds) {
        List<FutureTask<List<JdaddressVo>>> pending =
                new ArrayList<FutureTask<List<JdaddressVo>>>(BATCH_SIZE);
        ExecutorService executorService = Executors.newFixedThreadPool(POOL_SIZE);
        System.out.println("****************");
        try {
            for (String parentId : parentIds) {
                FutureTask<List<JdaddressVo>> task =
                        new FutureTask<List<JdaddressVo>>(new MycallableForaddress(level, parentId));
                pending.add(task);
                executorService.submit(task);
                if (pending.size() >= BATCH_SIZE) {
                    drainAndSave(pending, level);
                }
            }
            // Save whatever is left after the last full batch.
            drainAndSave(pending, level);
        } finally {
            // Release the worker threads even if submitting or saving throws unexpectedly.
            executorService.shutdown();
        }
    }

    /**
     * Waits for every pending task, saves each non-null result, then empties the list.
     * A failing task or save is reported and skipped so one bad parent id does not abort the
     * whole import.
     *
     * @param pending tasks already submitted to the pool; cleared on return
     * @param level   level being imported
     */
    private void drainAndSave(List<FutureTask<List<JdaddressVo>>> pending, String level) {
        for (FutureTask<List<JdaddressVo>> task : pending) {
            try {
                List<JdaddressVo> addresses = task.get();
                if (addresses != null) { // null means the remote call for this parent failed
                    batsave(addresses, level);
                }
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still see the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        pending.clear();
    }

    /**
     * Saves one list of addresses through the service that matches the level.
     *
     * @param addrlist addresses to save; must not be null
     * @param level    "1" saves provinces one by one, "2" cities, "3" counties; any other value
     *                 is saved as towns
     */
    private void batsave(List<JdaddressVo> addrlist, String level) {
        if ("1".equals(level)) {
            for (JdaddressVo address : addrlist) {
                WsscProvince province = new WsscProvince();
                province.setGuid(UUID.randomUUID().toString());
                province.setLocalProvinceId(address.getAddressId());
                province.setProvinceName(address.getAddressName());
                province.setTarget(TARGET);
                province.setTargetProvinceId(address.getAddressId());
                provinceService.save(province);
            }
        } else if ("2".equals(level)) {
            cityService.batsave(addrlist);
        } else if ("3".equals(level)) {
            areaService.batsave(addrlist);
        } else {
            townService.batsave(addrlist);
        }
    }

    /**
     * Task that returns the child addresses of one parent id.
     * It is a non-static inner class because it calls the controller's
     * {@code getaddressList} instance method.
     */
    class MycallableForaddress implements Callable<List<JdaddressVo>> {
        private final String level;
        private final String parentId;

        public MycallableForaddress(String level, String parentId) {
            this.level = level;
            this.parentId = parentId;
        }

        @Override
        public List<JdaddressVo> call() throws Exception {
            return getaddressList(level, parentId);
        }
    }

    /**
     * Calls the remote area API for one level and converts the name-to-id map in the response
     * into a list of {@link JdaddressVo}.
     *
     * @param level    "1" province, "2" city, "3" county, "4" town
     * @param parentId id of the parent address; not sent for level "1"
     * @return the addresses, or {@code null} when the level is unknown, the call returns nothing,
     *         or the response is not a successful, well-formed result
     */
    private List<JdaddressVo> getaddressList(String level, String parentId) {
        String url;
        if ("1".equals(level)) {
            url = "https://bizapi.jd.com/api/area/getProvince";
        } else if ("2".equals(level)) {
            url = "https://bizapi.jd.com/api/area/getCity";
        } else if ("3".equals(level)) {
            url = "https://bizapi.jd.com/api/area/getCounty";
        } else if ("4".equals(level)) {
            url = "https://bizapi.jd.com/api/area/getTown";
        } else {
            return null;
        }

        // Raw Map kept on purpose: the parameter type of WsscHttpClientUtils.post is not visible here.
        // `token` is not declared in this class; presumably inherited from SuperToken.
        Map maps = new HashMap<String, String>();
        maps.put("token", token);
        if (!"1".equals(level)) {
            maps.put("id", parentId);
        }

        String rev = WsscHttpClientUtils.post(url, maps, null);
        if (rev == null) {
            return null;
        }

        Object parsed = JSON.parse(rev);
        if (!(parsed instanceof Map)) {
            return null; // empty or non-object body
        }
        Map<?, ?> response = (Map<?, ?>) parsed;

        Object success = response.get("success");
        System.out.println(success);
        // Boolean.TRUE.equals tolerates a missing or non-boolean "success" field.
        if (!Boolean.TRUE.equals(success)) {
            return null;
        }

        Object result = response.get("result");
        if (!(result instanceof Map)) {
            return null;
        }

        List<JdaddressVo> addresses = new ArrayList<JdaddressVo>();
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) result).entrySet()) {
            System.out.println(entry.getKey() + "---->" + entry.getValue());
            if (entry.getValue() == null) {
                continue; // an entry without an id cannot be stored
            }
            JdaddressVo address = new JdaddressVo();
            address.setAddressId(entry.getValue().toString());
            address.setAddressName(entry.getKey().toString());
            address.setParentAddressId(parentId);
            addresses.add(address);
        }
        return addresses;
    }

    //------------------- manual test helpers below; not part of the import flow -------------------

    /**
     * Posts the token, plus the parent id when given, to {@code url} and prints the raw response.
     *
     * @param url      endpoint to call
     * @param parentId parent address id, or {@code null} to send only the token
     * @return the raw response body as returned by the HTTP helper
     */
    private String postAndPrint(String url, String parentId) {
        Map maps = new HashMap<String, String>();
        if (parentId != null) {
            maps.put("id", parentId);
        }
        maps.put("token", token);
        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);
        return rev;
    }

    // @Test
    public void getProvinceList() {
        postAndPrint("https://bizapi.jd.com/api/area/getProvince", null);
    }

    // @Test
    public void getCityList() {
        postAndPrint("https://bizapi.jd.com/api/area/getCity", "6");
    }

    // @Test
    public void getCountyList() {
        postAndPrint("https://bizapi.jd.com/api/area/getCounty", "318");
    }

    // @Test
    public void getTownList() {
        String rev = postAndPrint("https://bizapi.jd.com/api/area/getTown", "319");
        if (rev == null) {
            return;
        }
        Object parsed = JSON.parse(rev);
        if (!(parsed instanceof Map)) {
            return;
        }
        Map<?, ?> response = (Map<?, ?>) parsed;
        Object success = response.get("success");
        System.out.println(success);
        if (Boolean.TRUE.equals(success) && response.get("result") instanceof Map) {
            Map<?, ?> result = (Map<?, ?>) response.get("result");
            // Map traversal, variant 1: walk the key set and look each value up.
            for (Object key : result.keySet()) {
                System.out.println(key + "---->" + result.get(key));
            }
            // Map traversal, variant 2: walk the entry set.
            /* for (Map.Entry<String,Integer> entry : resultmap.entrySet()){
                System.out.println(entry.getKey()+"---->"+entry.getValue());
            }*/
            // Map traversal, variant 3: explicit iterator.
            /* Iterator keys = resultmap.keySet().iterator();
            while (keys.hasNext()){
                String key = (String) keys.next();
                System.out.println(key+"--->"+resultmap.get(key));
            }*/
        }
    }
}
上面代码中的 MycallableForaddress 就是内部线程类的定义。
下面是 ***DaoImpl 中的批量保存代码:
/**
 * Inserts the given addresses into WSSC_CITY with a single JDBC batch.
 *
 * <p>Each row gets a fresh random GUID. The address id fills both LOCAL_CITYID and
 * TARGET_CITYID, the parent id fills both LOCAL_PROVINCEID and TARGET_PROVINCEID, and TARGET
 * is always 99.
 *
 * @param addrs addresses to insert; a null or empty list is a no-op
 */
@Override
public void batsave(final List<JdaddressVo> addrs) {
    if (addrs == null || addrs.isEmpty()) {
        return; // nothing to insert; also avoids an NPE inside the Work callback
    }
    this.getSession().doWork(new Work() {
        @Override
        public void execute(Connection connection) throws SQLException {
            String sql = "insert into WSSC_CITY(GUID,CITY_NAME,LOCAL_CITYID,LOCAL_PROVINCEID,TARGET_CITYID,TARGET_PROVINCEID,TARGET) values(?,?,?,?,?,?,?)";
            // try-with-resources closes the statement even when executeBatch() throws.
            try (PreparedStatement ps = connection.prepareStatement(sql)) {
                for (JdaddressVo addr : addrs) {
                    ps.setString(1, UUID.randomUUID().toString());
                    ps.setString(2, addr.getAddressName());
                    ps.setString(3, addr.getAddressId());
                    ps.setString(4, addr.getParentAddressId());
                    ps.setString(5, addr.getAddressId());
                    ps.setString(6, addr.getParentAddressId());
                    ps.setInt(7, 99);
                    ps.addBatch();
                }
                ps.executeBatch();
            }
        }
    });
}
-----------------
经过测试,这样处理,5万条数据导入需要几分钟,快了不少。
------
感受:
要培养一种“批量”、“缓存”的思想,比如上面代码中的 futureTasks(攒满500个再统一处理),以及保存数据时使用 JDBC 批处理等。
"满一定量再去做"
这篇关于多线程(callable+futureTask)去组装数据,并批量入库的文章就介绍到这儿,希望我们推荐的文章对程序员们有所帮助!