Common Create, Read, Update, and Delete Operations in Elasticsearch

2024-08-28 02:38

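Working with Elasticsearch from Java takes two steps:
1. Build the query DSL (a JSON string, not SQL)
2. Execute the create, read, update, or delete operation
Below are some reusable helper methods for both steps. The code appears to target the TransportClient API of Elasticsearch 1.x/2.x: calls such as setExtraSource, setRefresh(boolean), and isFound() were removed in later versions.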

Building the query DSL

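1. Basic query

    /**
     * Get query DSL.
     *
     * @param queryString query_string expression, e.g. field:value
     * @return query DSL
     */
    public String getQueryDSL(String queryString) {
        // queryString is concatenated as-is, so it must not contain
        // unescaped double quotes or backslashes.
        return "{"
                + " \"query\": { "
                + " \"query_string\": { "
                + " \"query\":\"" + queryString + "\" "
                + "}"
                + " } "
                + " } ";
    }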
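The method above inserts queryString into the JSON text unchanged, so a double quote or backslash in the input produces invalid DSL. A minimal sketch of one way to guard against that, assuming a hypothetical helper class QueryDslEscaper (not part of the original code) that escapes the value according to JSON string rules before concatenating:

```java
final class QueryDslEscaper {

    private QueryDslEscaper() {
    }

    /** Escapes a value so it can sit inside a double-quoted JSON string. */
    static String escapeJson(String value) {
        StringBuilder out = new StringBuilder(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\n':
                    out.append("\\n");
                    break;
                case '\r':
                    out.append("\\r");
                    break;
                case '\t':
                    out.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the \ uXXXX form.
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    /** Same JSON shape as getQueryDSL above, with the query text escaped first. */
    static String getQueryDsl(String queryString) {
        return "{\"query\":{\"query_string\":{\"query\":\""
                + escapeJson(queryString) + "\"}}}";
    }
}
```

This only protects the JSON layer. The query_string syntax has its own reserved characters, which still have to be handled by whoever writes the query expression.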

2. Query with sorting

    /**
     * Get query DSL with sort.
     *
     * @param queryString query_string expression
     * @param sortByASCOrDesc key: sort field, value: asc or desc
     * @return query DSL with sort
     */
    public String getQueryDSL(String queryString, Map<String, String> sortByASCOrDesc) {
        String dsl = "{"
                + " \"query\": { "
                + " \"query_string\": { "
                + " \"query\":\"" + queryString + "\" "
                + "}"
                + " } ";
        StringBuilder builder = new StringBuilder(dsl);
        builder.append(", \"sort\": [");
        int index = 0;
        for (Map.Entry<String, String> pars : sortByASCOrDesc.entrySet()) {
            // Every entry after the first is preceded by a comma.
            if (index > 0) {
                builder.append(",");
            }
            builder.append(String.format("{\"%s\":\"%s\"}", pars.getKey(), pars.getValue()));
            index++;
        }
        builder.append("]");
        builder.append(" } ");
        return builder.toString();
    }
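The sort clause is emitted in the map's iteration order, and the first sort field takes precedence, so the caller should pass an ordered map such as LinkedHashMap rather than HashMap. A small sketch of just the sort part, assuming a hypothetical class SortClauseSketch and made-up field names createTime and name:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class SortClauseSketch {

    private SortClauseSketch() {
    }

    /** Builds "sort":[{field:order},...] in the map's iteration order. */
    static String buildSortClause(Map<String, String> sortByAscOrDesc) {
        StringJoiner joiner = new StringJoiner(",", "\"sort\":[", "]");
        for (Map.Entry<String, String> entry : sortByAscOrDesc.entrySet()) {
            joiner.add(String.format("{\"%s\":\"%s\"}", entry.getKey(), entry.getValue()));
        }
        return joiner.toString();
    }

    /** Hypothetical input: newest first, ties broken by name. */
    static Map<String, String> exampleSort() {
        Map<String, String> sort = new LinkedHashMap<>();
        sort.put("createTime", "desc");
        sort.put("name", "asc");
        return sort;
    }
}
```

StringJoiner handles the separator, which removes the need for the index counter used in the method above.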

3. Range query with sorting

    /**
     * Get query DSL with range.
     *
     * @param queryString query_string expression
     * @param rangeMap key: range field, value: map of operator (gte, gt, lte or lt) to bound
     * @param sortByASCOrDesc key: sort field, value: asc or desc; pass null if no sorting is needed
     * @return query DSL with range
     */
    public String getRangeQueryDSL(String queryString, Map<String, Map<String, String>> rangeMap,
            Map<String, String> sortByASCOrDesc) {
        StringBuilder strBuilder = new StringBuilder();
        strBuilder.append("{\"query\": {\"bool\": {\"must\": [")
                .append("{\"query_string\": { \"query\":\"" + queryString + "\" }}");
        // Each range clause follows the query_string clause, so it always gets a
        // leading comma. This also keeps the JSON valid when rangeMap is empty.
        for (Map.Entry<String, Map<String, String>> pair : rangeMap.entrySet()) {
            strBuilder.append(",{\"range\": { \"" + pair.getKey() + "\": { ");
            int ii = 0;
            for (Map.Entry<String, String> bound : pair.getValue().entrySet()) {
                if (ii > 0) {
                    strBuilder.append(" , ");
                }
                strBuilder.append(" \"" + bound.getKey() + "\": \"" + bound.getValue() + "\" ");
                ii++;
            }
            strBuilder.append("}}}");
        }
        strBuilder.append("]}}");
        if (sortByASCOrDesc != null) {
            strBuilder.append(", \"sort\": [");
            int jj = 0;
            for (Map.Entry<String, String> pars : sortByASCOrDesc.entrySet()) {
                if (jj > 0) {
                    strBuilder.append(",");
                }
                strBuilder.append(String.format("{\"%s\":\"%s\"}", pars.getKey(), pars.getValue()));
                jj++;
            }
            strBuilder.append("]");
        }
        strBuilder.append("}");
        return strBuilder.toString();
    }
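The nested rangeMap argument is easy to get wrong, so here is a sketch of how it might be filled in and what a single range clause looks like. The class RangeClauseSketch and the price field are assumptions for illustration only:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class RangeClauseSketch {

    private RangeClauseSketch() {
    }

    /** Renders one {"range":{field:{operator:bound,...}}} clause. */
    static String buildRangeClause(String field, Map<String, String> bounds) {
        StringJoiner ops = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> bound : bounds.entrySet()) {
            ops.add("\"" + bound.getKey() + "\":\"" + bound.getValue() + "\"");
        }
        return "{\"range\":{\"" + field + "\":" + ops + "}}";
    }

    /** Hypothetical input meaning 10 <= price < 100. */
    static Map<String, Map<String, String>> exampleRangeMap() {
        Map<String, String> priceBounds = new LinkedHashMap<>();
        priceBounds.put("gte", "10");
        priceBounds.put("lt", "100");
        Map<String, Map<String, String>> rangeMap = new LinkedHashMap<>();
        rangeMap.put("price", priceBounds);
        return rangeMap;
    }
}
```

The outer map key is the field, and the inner map holds one entry per bound, which matches the shape getRangeQueryDSL expects.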

4. Match all

    /**
     * @return query DSL that matches every document
     */
    public String getQueryAllDSL() {
        return "{ "
                + " \"query\": { "
                + " \"match_all\": {} "
                + " } "
                + " } ";
    }

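5. Grouping by field

    /**
     * @param queryString query_string expression
     * @param strAggs body of the "aggs" object, without the surrounding braces
     * @return query DSL that returns only aggregation results (size 0)
     */
    private String getAggsQueryDsl(String queryString, String strAggs) {
        return "{"
                + "\"query\": {"
                + "\"bool\": {"
                + "\"must\": ["
                + "{"
                + "\"query_string\": {"
                + "\"default_field\": \"_all\","
                + "\"query\": \"" + queryString + "\""
                + "}"
                + "}"
                + "]"
                + "}"
                + "},"
                + "\"size\": 0,"
                + "\"aggs\": {" + strAggs + "}"
                + "}";
    }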

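Notes:
1) An aggregation (aggs) can group documents by the distinct values of a field. To group by several fields, nest one aggs inside another.
2) In real projects the author recommends using aggs sparingly because they can be expensive, and instead fetching the data and doing the deduplication and grouping in application code.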
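Both notes can be sketched in plain Java. The example below assumes a hypothetical class GroupingSketch: termsAggs builds a (possibly nested) terms aggregation body that could be passed as strAggs, and groupDistinct shows the application-side alternative of grouping rows that have already been fetched. Field names such as city and type are made up.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class GroupingSketch {

    private GroupingSketch() {
    }

    /** Builds a terms aggregation on one field; nestedAggs may be null. */
    static String termsAggs(String name, String field, String nestedAggs) {
        String nested = nestedAggs == null ? "" : ",\"aggs\":{" + nestedAggs + "}";
        return "\"" + name + "\":{\"terms\":{\"field\":\"" + field + "\"}" + nested + "}";
    }

    /** Groups fetched rows by groupField and keeps the distinct values of valueField. */
    static Map<String, Set<String>> groupDistinct(List<Map<String, String>> rows,
            String groupField, String valueField) {
        Map<String, Set<String>> grouped = new LinkedHashMap<>();
        for (Map<String, String> row : rows) {
            grouped.computeIfAbsent(row.get(groupField), key -> new LinkedHashSet<>())
                    .add(row.get(valueField));
        }
        return grouped;
    }

    /** Convenience for building a row from alternating keys and values. */
    static Map<String, String> row(String... keyValues) {
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            row.put(keyValues[i], keyValues[i + 1]);
        }
        return row;
    }
}
```

Whether grouping in the application is actually cheaper depends on how many documents have to be transferred, so it is worth measuring both approaches on real data.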

Query

1. Run a query and map the results to an entity class

    /**
     * Get documents by query DSL.
     *
     * @param index index name
     * @param type type name
     * @param dsl sample: { "query":{ "query_string":{ "query":"field:value" } } }
     * @param classOfT sample: xxxVO.class
     * @return List<xxxVO>
     */
    public <T> List<T> getDocListByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
        String scrollId = "";
        try {
            List<T> list = new ArrayList<T>();
            SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                    .setExtraSource(dsl).setFrom(0).setSize(2000)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();
            Gson gson = new Gson();
            do {
                for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                    list.add(gson.fromJson(searchHit.getSourceAsString(), classOfT));
                }
                scrollResp = this.esClient.prepareSearchScroll(scrollId)
                        .setScroll(new TimeValue(60000)).execute().actionGet();
                // The scroll ID may change between pages, so keep the latest one.
                scrollId = scrollResp.getScrollId();
            } while (scrollResp.getHits().getHits().length > 0);
            return list;
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

2. Run a query and return the results as a map

    /**
     * Get document map by query DSL.
     *
     * @param index index name
     * @param type type name
     * @param dsl sample: { "query":{ "query_string":{ "query":"field:value" } } }
     * @param classOfT sample: xxxVO.class
     * @return Map<String, T> key = document ID, value = document VO
     */
    public <T> Map<String, T> getDocMapByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
        String scrollId = "";
        try {
            Map<String, T> retMap = new HashMap<String, T>();
            SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                    .setExtraSource(dsl).setFrom(0).setSize(2000)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();
            Gson gson = new Gson();
            do {
                for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                    retMap.put(searchHit.getId(), gson.fromJson(searchHit.getSourceAsString(), classOfT));
                }
                scrollResp = this.esClient.prepareSearchScroll(scrollId)
                        .setScroll(new TimeValue(60000)).execute().actionGet();
                // The scroll ID may change between pages, so keep the latest one.
                scrollId = scrollResp.getScrollId();
            } while (scrollResp.getHits().getHits().length > 0);
            return retMap;
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

3. Get by ID

    /**
     * Get one document by document ID.
     *
     * @param index index name
     * @param type type name
     * @param docID sample: AAA_BBB_CCC
     * @param classOfT sample: xxxVO.class
     * @return xxxVO, or null if the document does not exist
     */
    public <T> T getDocByDocID(String index, String type, String docID, Class<T> classOfT) {
        GetResponse response = this.esClient.prepareGet(index, type, docID).get();
        Gson gson = new Gson();
        return gson.fromJson(response.getSourceAsString(), classOfT);
    }

4. Get by a list of IDs

    /**
     * Get documents by their IDs.
     *
     * @param index index name
     * @param type type name
     * @param docIDs sample: List<AAA_BBB_CCC>
     * @param classOfT sample: xxxVO.class
     * @return matching documents
     * @throws Exception if the search fails
     */
    public <T> List<T> getDocListByDocID(String index, String type, List<String> docIDs, Class<T> classOfT)
            throws Exception {
        String scrollId = "";
        try {
            List<T> list = new ArrayList<T>();
            SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                    .setQuery(QueryBuilders.idsQuery().ids(docIDs))
                    .setScroll(new TimeValue(60000)).setFrom(0).setSize(1000).execute().actionGet();
            Gson gson = new Gson();
            do {
                for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                    list.add(gson.fromJson(searchHit.getSourceAsString(), classOfT));
                }
                scrollId = scrollResp.getScrollId();
                scrollResp = this.esClient.prepareSearchScroll(scrollId)
                        .setScroll(new TimeValue(60000)).execute().actionGet();
            } while (scrollResp.getHits().getHits().length > 0);
            return list;
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

5. Get field values by ID, returned as a map

    /**
     * Get selected _source fields of documents by document IDs.
     *
     * @param index index name
     * @param type type name
     * @param ids document IDs
     * @param fields _source fields to fetch
     * @return Map<String, Object> key = id_field, value = that field's value in the document.
     *         sample: Map<AAA_BBB_CCC_field, value>
     * @throws Exception if the search fails
     */
    public Map<String, Object> getExtraMapByDocID(String index, String type, List<String> ids, String... fields)
            throws Exception {
        String scrollId = "";
        try {
            SearchResponse response = this.esClient.prepareSearch(index).setTypes(type)
                    .setQuery(QueryBuilders.idsQuery().ids(ids)).setFetchSource(fields, null)
                    .setScroll(new TimeValue(60000)).setFrom(0).setSize(1000).execute().actionGet();
            scrollId = response.getScrollId();
            Map<String, Object> retMap = Maps.newHashMap();
            // Stop when a page comes back with no hits.
            while (response.getHits().getHits().length > 0) {
                for (SearchHit hit : response.getHits().getHits()) {
                    for (String field : fields) {
                        String key = String.format("%s_%s", hit.getId(), field);
                        retMap.put(key, hit.getSource().get(field));
                    }
                }
                response = this.esClient.prepareSearchScroll(scrollId)
                        .setScroll(new TimeValue(60000)).execute().actionGet();
                scrollId = response.getScrollId();
            }
            return retMap;
        } finally {
            // The original version never released the scroll context.
            this.clearESScrollId(scrollId);
        }
    }

6. Get field values by query DSL

    /**
     * Get selected _source fields by query DSL.
     *
     * @param index index name
     * @param type type name
     * @param queryDSL query DSL
     * @param fields _source fields to fetch
     * @return one Map<String, Object> of field name to value per document
     * @throws Exception if the search fails
     */
    public List<Map<String, Object>> getExtraMapByQueryDSL(String index, String type, String queryDSL,
            String... fields) throws Exception {
        List<Map<String, Object>> rtnLst = Lists.newArrayList();
        String scrollId = "";
        try {
            SearchResponse response = this.esClient.prepareSearch(index).setTypes(type)
                    .setExtraSource(queryDSL).setFetchSource(fields, null)
                    .setScroll(new TimeValue(60000)).setFrom(0).setSize(1000).execute().actionGet();
            scrollId = response.getScrollId();
            return this.getHitsResult(response, rtnLst, fields);
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

    /**
     * Scrolls through all pages and collects the requested fields of every hit.
     */
    private List<Map<String, Object>> getHitsResult(SearchResponse response, List<Map<String, Object>> rtnLst,
            String... fields) {
        // Stop when a page comes back with no hits.
        while (response.getHits().getHits().length > 0) {
            for (SearchHit hit : response.getHits().getHits()) {
                Map<String, Object> tempMap = Maps.newHashMap();
                for (String field : fields) {
                    tempMap.put(field, hit.getSource().get(field));
                }
                rtnLst.add(tempMap);
            }
            response = this.esClient.prepareSearchScroll(response.getScrollId())
                    .setScroll(new TimeValue(60000)).execute().actionGet();
        }
        return rtnLst;
    }

7. Get the first document of a sorted query

    /**
     * Get the top 1 document by sorted DSL.
     *
     * @param index index name
     * @param type type name
     * @param dsl query DSL; should include a sort clause
     * @param classOfT sample: xxxVO.class
     * @return the first hit, or null if nothing matches
     */
    public <T> T getTop1DocByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
        String scrollId = "";
        try {
            SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                    .setExtraSource(dsl).setFrom(0).setSize(1)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();
            SearchHit[] hits = scrollResp.getHits().getHits();
            if (hits.length == 0) {
                return null;
            }
            return new Gson().fromJson(hits[0].getSourceAsString(), classOfT);
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

8. Check whether any document matches a query DSL

    /**
     * Check whether a document exists by query DSL.
     *
     * @param esIndex index name
     * @param esType type name
     * @param queryDsl query DSL
     * @return true if at least one document matches
     * @throws Exception if the search fails
     */
    public boolean checkDocExistByQueryDSL(String esIndex, String esType, String queryDsl) throws Exception {
        // Only one page is read, so no scroll is opened here. The original opened
        // a scroll context and never cleared it.
        SearchResponse resp = this.esClient.prepareSearch(esIndex).setTypes(esType)
                .setFetchSource(new String[] { "_id" }, null)
                .setExtraSource(queryDsl).setFrom(0).setSize(1).execute().actionGet();
        return resp.getHits().getTotalHits() > 0;
    }

9. Count matching documents

    /**
     * Return total count by DSL.
     */
    protected long getTotalHitsByDsl(String index, String type, String dsl) {
        String scrollId = "";
        try {
            SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                    .setExtraSource(dsl).setFrom(0).setSize(0)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();
            return scrollResp.getHits().getTotalHits();
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

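Insert (Upsert)

Note: "upsert" here means the document is updated if it already exists and created if it does not. These helpers use the index API, so an existing document with the same ID is replaced as a whole rather than merged field by field.

1. Upsert by ID

    /**
     * Upsert data by VO + document ID.
     *
     * @param index index name
     * @param type type name
     * @param docID document ID
     * @param vo object to serialize as the document source
     * @return index response
     */
    public <T> IndexResponse upsert(String index, String type, String docID, T vo) {
        Gson gson = new Gson();
        String source = gson.toJson(vo);
        return this.esClient.prepareIndex(index, type, docID)
                .setRefresh(true).setSource(source).execute().actionGet();
    }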

2. Bulk upsert

    /**
     * @param index index name
     * @param type type name
     * @param voList objects to index
     * @param getDocIDMethodInVo name of the no-argument method on the VO that returns the document ID,
     *            sample: public String getDocumentID() { return String.format("%s_%s", AAA, BBB); }
     * @param isAutoRefresh whether to refresh the index after the bulk request
     * @throws Exception if reflection fails or any bulk item fails
     */
    public <T> void bulkProcessUpsert(String index, String type, List<T> voList, String getDocIDMethodInVo,
            boolean isAutoRefresh) throws Exception {
        Gson gson = new Gson();
        BulkRequestBuilder bulkRequest = this.esClient.prepareBulk();
        bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));
        for (T vo : voList) {
            // Look up the ID getter by name and call it on the VO.
            Method getIdMethod = vo.getClass().getMethod(getDocIDMethodInVo, new Class[] {});
            Object docId = getIdMethod.invoke(vo, new Object[] {});
            IndexRequestBuilder indexRequestBuilder = this.esClient
                    .prepareIndex(index, type, docId.toString()).setSource(gson.toJson(vo));
            bulkRequest.add(indexRequestBuilder);
        }
        if (bulkRequest.numberOfActions() > 0) {
            BulkResponse resp = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();
            if (resp.hasFailures()) {
                throw new Exception(
                        String.format("Bulk index items failed, message:%s", resp.buildFailureMessage()));
            }
        }
    }
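bulkProcessUpsert finds the ID getter by its name through reflection, so a misspelled method name only fails at run time. The sketch below contrasts that lookup with a type-safe alternative where the caller passes the getter as a Function. DocIdSketch, OrderVO, and its fields are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class DocIdSketch {

    private DocIdSketch() {
    }

    /** Hypothetical VO whose document ID is derived from two fields. */
    static final class OrderVO {
        private final String shop;
        private final String orderNo;

        OrderVO(String shop, String orderNo) {
            this.shop = shop;
            this.orderNo = orderNo;
        }

        public String getDocumentID() {
            return String.format("%s_%s", shop, orderNo);
        }
    }

    /** Resolves the ID by method name, the way bulkProcessUpsert does. */
    static String idByReflection(Object vo, String getterName) {
        try {
            return vo.getClass().getMethod(getterName).invoke(vo).toString();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot call " + getterName, e);
        }
    }

    /** Type-safe alternative: the compiler checks that the getter exists. */
    static <T> Map<String, T> indexById(List<T> voList, Function<T, String> idOf) {
        Map<String, T> byId = new LinkedHashMap<>();
        for (T vo : voList) {
            byId.put(idOf.apply(vo), vo);
        }
        return byId;
    }
}
```

With the Function variant a call looks like indexById(orders, OrderVO::getDocumentID), and renaming the getter becomes a compile error instead of a run-time failure.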

Update

1. Update by ID

    /**
     * Update selected fields by document ID.
     *
     * @param index index name
     * @param type type name
     * @param id document ID
     * @param fieldValMap key = index field, value = new value
     * @return update response
     * @throws Exception if the update fails
     */
    public UpdateResponse update(String index, String type, String id, Map<String, Object> fieldValMap)
            throws Exception {
        return this.esClient.update(new UpdateRequest(index, type, id).refresh(true).doc(fieldValMap)).get();
    }

2. Bulk update

    /**
     * Bulk update. Map key is the document ID, map value is the field-to-value map to apply.
     */
    protected void bulkProcessUpdate(String index, String type, Map<String, Map<String, Object>> mapUpdates,
            boolean isAutoRefresh) throws Exception {
        BulkRequestBuilder bulkRequest = this.esClient.prepareBulk();
        bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));
        for (Map.Entry<String, Map<String, Object>> update : mapUpdates.entrySet()) {
            XContentBuilder jsonBuilder = XContentFactory.jsonBuilder().startObject();
            for (Map.Entry<String, Object> fieldValue : update.getValue().entrySet()) {
                jsonBuilder.field(fieldValue.getKey(), fieldValue.getValue());
            }
            jsonBuilder.endObject();
            UpdateRequestBuilder updateRequestBuilder = this.esClient
                    .prepareUpdate(index, type, update.getKey()).setDoc(jsonBuilder);
            bulkRequest.add(updateRequestBuilder);
        }
        if (bulkRequest.numberOfActions() > 0) {
            BulkResponse resp = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();
            if (resp.hasFailures()) {
                throw new Exception(
                        String.format("Bulk update items failed, message:%s", resp.buildFailureMessage()));
            }
        }
    }

Delete

1. Delete by ID

    /**
     * Delete data by ID.
     *
     * @return true if the document existed and was deleted
     */
    public boolean deleteById(String index, String type, String id) {
        return this.esClient.prepareDelete(index, type, id).setRefresh(true).execute().actionGet().isFound();
    }

2. Bulk delete

    /**
     * Bulk delete by document IDs.
     *
     * @param index index name
     * @param type type name
     * @param lstId document IDs to delete
     * @throws Exception if adding requests or closing the processor fails
     */
    public void bulkProcessDelete(String index, String type, List<String> lstId) throws Exception {
        final BulkProcessor bulkProcessor = BulkProcessor.builder(this.esClient, new BulkProcessor.Listener() {
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // Item-level failures arrive here, not in the Throwable callback.
                if (response.hasFailures()) {
                    logger.error("ElasticsearchHelper.bulkProcessDelete. Index is {}. error is {}. ", index,
                            response.buildFailureMessage());
                }
            }

            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                logger.error("ElasticsearchHelper.bulkProcessDelete. Index is {}. error is {}. ", index,
                        failure.getMessage(), failure);
            }
        }).setBulkActions(BULK_MAX_SIZE).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                .setFlushInterval(TimeValue.timeValueSeconds(5)).build();
        try {
            for (String docId : lstId) {
                bulkProcessor.add(new DeleteRequest(index, type, docId));
            }
        } catch (Exception ex) {
            // The original message named the wrong method and passed one argument for two placeholders.
            logger.error("ElasticsearchHelper.bulkProcessDelete. Index is {}. error is {}. ", index,
                    ex.getMessage(), ex);
            throw ex;
        } finally {
            bulkProcessor.awaitClose(1, TimeUnit.MINUTES);
            this.esClient.admin().indices().prepareRefresh(index).get();
        }
    }

3. Delete by DSL

Two variants follow. Note that they take their arguments in a different order: the first starts with the query DSL, the second with the index.

    /**
     * Delete by query: collects every hit into a single bulk request.
     *
     * @param queryDsl query DSL
     * @param index index name
     * @param type type name
     * @param isAutoRefresh whether to refresh the index after the bulk request
     * @throws Exception if any bulk item fails
     */
    public void deleteByQueryDsl(String queryDsl, String index, String type, boolean isAutoRefresh)
            throws Exception {
        Client client = this.esClient;
        String scrollId = "";
        try {
            SearchResponse scrollResp = client.prepareSearch(index).setTypes(type)
                    .setExtraSource(queryDsl).setFrom(0).setSize(1500)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));
            // Stop when a page comes back with no hits.
            while (scrollResp.getHits().getHits().length > 0) {
                for (SearchHit hit : scrollResp.getHits().getHits()) {
                    bulkRequest.add(client.prepareDelete(index, type, hit.getId()));
                }
                scrollResp = client.prepareSearchScroll(scrollId)
                        .setScroll(new TimeValue(60000)).execute().actionGet();
                scrollId = scrollResp.getScrollId();
            }
            if (bulkRequest.numberOfActions() > 0) {
                BulkResponse response = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();
                if (response.hasFailures()) {
                    throw new Exception("deleteByFilter failure:" + response.buildFailureMessage());
                }
            }
        } finally {
            // The original version never released the scroll context.
            this.clearESScrollId(scrollId);
        }
    }

    /**
     * Delete documents by query DSL: collects the IDs, then delegates to bulkProcessDelete.
     *
     * @param index index name
     * @param type type name
     * @param dsl query DSL
     * @throws Exception if the bulk delete fails
     */
    public void deleteByQueryDsl(String index, String type, String dsl) throws Exception {
        String scrollId = "";
        try {
            List<String> lstId = new ArrayList<String>();
            SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                    .setExtraSource(dsl).setFrom(0).setSize(2000)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();
            do {
                for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                    lstId.add(searchHit.getId());
                }
                scrollResp = this.esClient.prepareSearchScroll(scrollId)
                        .setScroll(new TimeValue(60000)).execute().actionGet();
                // The scroll ID may change between pages, so keep the latest one.
                scrollId = scrollResp.getScrollId();
            } while (scrollResp.getHits().getHits().length > 0);
            this.bulkProcessDelete(index, type, lstId);
        } finally {
            this.clearESScrollId(scrollId);
        }
    }
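The first deleteByQueryDsl variant puts every matching document into one bulk request, which can grow very large. One possible mitigation, sketched here with a hypothetical BatchSketch helper that is not part of the original code, is to split the collected IDs into fixed-size batches and send one bulk request per batch:

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSketch {

    private BatchSketch() {
    }

    /** Splits ids into consecutive sublists of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> ids, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < ids.size(); from += batchSize) {
            int to = Math.min(from + batchSize, ids.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(ids.subList(from, to)));
        }
        return batches;
    }
}
```

Each batch could then be handed to a bulk delete call. The second variant avoids the problem differently, because BulkProcessor flushes on its own once BULK_MAX_SIZE actions have accumulated.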

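Supplement

To avoid handling too much data in a single operation, the searches above set a page size and walk through the results with a scroll ID. Each scroll keeps a search context open on the cluster, so the scroll ID should be cleared in the method's finally block.
For example:

    public void clearESScrollId(String scrollId) {
        if (StringUtils.isNotEmpty(scrollId)) {
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(scrollId);
            this.esClient.clearScroll(clearScrollRequest).actionGet();
        }
    }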
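The scroll pattern used throughout this article (fetch a page, keep scrolling until a page is empty, always clear the scroll at the end) can be tried without a cluster. The sketch below is a simulation under stated assumptions: ScrollSource is a hypothetical stand-in for the search, scroll, and clear-scroll calls, and FakeSource is an in-memory implementation that serves two pages.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ScrollLoopSketch {

    private ScrollLoopSketch() {
    }

    /** One page of hits plus the scroll ID to use for the next request. */
    static final class Page<T> {
        final String scrollId;
        final List<T> hits;

        Page(String scrollId, List<T> hits) {
            this.scrollId = scrollId;
            this.hits = hits;
        }
    }

    /** Hypothetical stand-in for the search, scroll, and clear-scroll calls. */
    interface ScrollSource<T> {
        Page<T> first();

        Page<T> next(String scrollId);

        void clear(String scrollId);
    }

    /** Reads every page, always scrolls with the newest ID, and clears it at the end. */
    static <T> List<T> drain(ScrollSource<T> source) {
        String scrollId = null;
        List<T> all = new ArrayList<>();
        try {
            Page<T> page = source.first();
            scrollId = page.scrollId;
            while (!page.hits.isEmpty()) {
                all.addAll(page.hits);
                page = source.next(scrollId);
                scrollId = page.scrollId;
            }
            return all;
        } finally {
            if (scrollId != null) {
                source.clear(scrollId);
            }
        }
    }

    /** In-memory source: two pages with one hit each, then an empty page. */
    static final class FakeSource implements ScrollSource<Integer> {
        final List<String> cleared = new ArrayList<>();
        private int served = 0;

        private Page<Integer> page() {
            served++;
            List<Integer> hits = served <= 2
                    ? Collections.singletonList(served)
                    : Collections.<Integer>emptyList();
            return new Page<>("scroll-" + served, hits);
        }

        public Page<Integer> first() {
            return page();
        }

        public Page<Integer> next(String scrollId) {
            return page();
        }

        public void clear(String scrollId) {
            cleared.add(scrollId);
        }
    }
}
```

Because the clear call sits in finally, the scroll is released even when reading a page throws, which is the point the paragraph above makes.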

http://www.chinasem.cn/article/1113490
