ElasticSearch常用的增删查改操作

2024-08-28 02:38

本文主要是介绍ElasticSearch常用的增删查改操作,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ElasticSearch常用的增删查改操作

使用Java对ElasticSearch增删查改操作,分为两个步骤:
1.拼接sql语句
2.执行增删查改操作
以下提供了一些常用的轮子。

sql拼接

1.最普通的sql拼接

	/*** Get query DSL* @param queryString* @return Query DSL*/public String getQueryDSL(String queryString) {String dsl = "{" + " \"query\": { " + " \"query_string\": { " + " \"query\":\"" + queryString + "\" " + "}" + " } "+ " } ";return dsl;}

2.查询并根据字段排序

	/*** Get query DSL with sort* @param filter* @param sortByASCOrDesc : key:sort field, value: asc or desc* @return query DSL with sort*/public String getQueryDSL(String queryString, Map<String, String> sortByASCOrDesc) {String dsl = "{" + " \"query\": { " + " \"query_string\": { " + " \"query\":\"" + queryString + "\" " + "}" + " } ";StringBuilder builder = new StringBuilder(dsl);builder.append(", \"sort\": [");int index = 0;for (Map.Entry<String, String> pars : sortByASCOrDesc.entrySet()) {if (index == 0) {builder.append(String.format("{\"%s\":\"%s\"}", pars.getKey(), pars.getValue()));} else {builder.append(String.format(",{\"%s\":\"%s\"}", pars.getKey(), pars.getValue()));}index++;}builder.append("]");builder.append(" } ");return builder.toString();}

3.查询区间并排序

	/*** Get query DSL with range* <p>queryString</p>* <p>rangeMap: Key = range field, value = key = gte or gt or lte or lt, value = value </p>* <p> sortByASCOrDesc: key:sort field, value: asc or desc. input null if no need sort </p>* @return query DSL with range*/public String getRangeQueryDSL(String queryString, Map<String, Map<String, String>> rangeMap, Map<String, String> sortByASCOrDesc) {StringBuilder strBuilder = new StringBuilder();strBuilder.append("{").append("\"query\": {").append("\"bool\": {").append("\"must\": [").append("{").append("\"query_string\": {").append(" \"query\":\"" + queryString + "\" ").append("}").append("},");int jj = 0;for (Map.Entry<String, Map<String, String>> pair : rangeMap.entrySet()) {String field = pair.getKey();Map<String, String> range = pair.getValue();if (jj == 0) {strBuilder.append("{").append("\"range\": {").append(" \"" + field + "\": { ");int ii = 0;for (Map.Entry<String, String> map : range.entrySet()) {if (ii == 0 && range.size() > 1) {strBuilder.append(" \"" + map.getKey() + "\": \"" + map.getValue() + "\" , ");} else {strBuilder.append(" \"" + map.getKey() + "\": \"" + map.getValue() + "\"  ");}ii++;}strBuilder.append("}").append("}").append("}");} else {strBuilder.append(",{").append("\"range\": {").append(" \"" + field + "\": { ");int ii = 0;for (Map.Entry<String, String> map : range.entrySet()) {if (ii == 0 && range.size() > 1) {strBuilder.append(" \"" + map.getKey() + "\": \"" + map.getValue() + "\" , ");} else {strBuilder.append(" \"" + map.getKey() + "\": \"" + map.getValue() + "\"  ");}ii++;}strBuilder.append("}").append("}").append("}");}jj++;}strBuilder.append("]").append("}").append("}");if(sortByASCOrDesc !=null){strBuilder.append(", \"sort\": [");jj = 0;for (Map.Entry<String, String> pars : sortByASCOrDesc.entrySet()) {if (jj == 0) {strBuilder.append(String.format("{\"%s\":\"%s\"}", pars.getKey(), pars.getValue()));} else {strBuilder.append(String.format(",{\"%s\":\"%s\"}", pars.getKey(), pars.getValue()));}jj++;}strBuilder.append("]");}strBuilder.append("}");return strBuilder.toString();}

4.查所有

	/*** @return*/public String getQueryAllDSL() {String dsl = "{ " + " \"query\": { " + " \"match_all\": {} " + " } " + " } ";return dsl;}

5.根据字段分组

    private String getAggsQueryDsl(String queryString, String strAggs) {return "{" + "\"query\": {" + "\"bool\": {" + "\"must\": [" + "{" + "\"query_string\": {" + "\"default_field\": \"_all\"," + "\"query\": \"" + queryString + "\"" + "}" + "}" + "]" + "}" + "}," +"\"size\": 0," +"\"aggs\": {" + strAggs +"}"+ "}";}

注意:
1).使用aggs聚合函数可以根据字段去重分组,如果有多字段需要去重分组,可以嵌套使用aggs;
2).实际项目中,建议尽量少的使用aggs,会比较吃性能,建议将数据查询后,在后台代码中进行去重分组

查询(Query)

1.执行查询操作,返回结果用实体类封装

	/*** Get Documents by query DSL* @param index* @param type* @param dsl variable sample: { "query":{ "query_string":{ "query":"field:value" } } }* @param classOfT sample: xxxVO.class* @return List<xxxVO>*/public <T> List<T> getDocListByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {String scrollId = "";List<T> list;try {list = new ArrayList<T>();SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0).setSize(2000).setScroll(new TimeValue(60000)).execute().actionGet();scrollId = scrollResp.getScrollId();Gson gson = new Gson();do {for (SearchHit searchHit : scrollResp.getHits().getHits()) {list.add(gson.fromJson(searchHit.getSourceAsString(), classOfT));}scrollResp = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute().actionGet();} while (scrollResp.getHits().getHits().length > 0);return list;} catch (Exception ex) {throw ex;} finally {this.clearESScrollId(scrollId);}}

2.执行查询操作,返回结果用map封装

	/*** Get document map by query DSL *  * @param index* @param type* @param dsl sample: { "query":{ "query_string":{ "query":"field:value" } } }* @param classOfT* @return Map<String, T> Key = document ID , value = Document VO*/public <T> Map<String, T> getDocMapByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {String scrollId = "";Map<String, T> retMap;try {retMap = new HashMap<String, T>();SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0).setSize(2000).setScroll(new TimeValue(60000)).execute().actionGet();scrollId = scrollResp.getScrollId();Gson gson = new Gson();do {for (SearchHit searchHit : scrollResp.getHits().getHits()) {retMap.put(searchHit.getId(), gson.fromJson(searchHit.getSourceAsString(), classOfT));}scrollResp = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute().actionGet();} while (scrollResp.getHits().getHits().length > 0);return retMap;} catch (Exception ex) {throw ex;} finally {this.clearESScrollId(scrollId);}}

3.根据ID查询

	/*** Get one document by document ID* * @param index* @param type* @param docID sample: AAA_BBB_CCC* @param classOfT * @return xxxVO*/public <T> T getDocByDocID(String index, String type, String docID, Class<T> classOfT) {GetResponse response = this.esClient.prepareGet(index, type, docID).get();Gson gson = new Gson();return gson.fromJson(response.getSourceAsString(), classOfT);}

4.根据ID的List查询

	/*** Get documents by their ID* * @param index* @param type* @param docIDs sample:List<AAA_BBB_CCC  >* @param classOfT* @return* @throws Exception*/public <T> List<T> getDocListByDocID(String index, String type, List<String> docIDs, Class<T> classOfT)throws Exception {String scrollId = "";List<T> list;try {list = new ArrayList<T>();SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type).setQuery(QueryBuilders.idsQuery().ids(docIDs)).setScroll(new TimeValue(60000)).setFrom(0).setSize(1000).execute().actionGet();Gson gson = new Gson();do {for (SearchHit searchHit : scrollResp.getHits().getHits()) {list.add(gson.fromJson(searchHit.getSourceAsString(), classOfT));}scrollId = scrollResp.getScrollId();scrollResp = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute().actionGet();} while (scrollResp.getHits().getHits().length > 0);return list;} catch (Exception ex) {throw ex;} finally {this.clearESScrollId(scrollId);}}

5.根据ID查询字段值,返回结果用map封装

	/*** Get extra source of documents by document IDs* @param index* @param type* @param ids* @param fields* @return <p>Map<String, Object> key = id_field, value = document id_field's value.*            sample:Map<AAA_BBB_CCC_field, value></p>* @throws Exception*/public Map<String, Object> getExtraMapByDocID(String index, String type, List<String> ids, String... fields)throws Exception {SearchResponse response = this.esClient.prepareSearch(index).setTypes(type).setQuery(QueryBuilders.idsQuery().ids(ids)).setFetchSource(fields, null).setScroll(new TimeValue(60000)).setFrom(0).setSize(1000).execute().actionGet();Map<String, Object> retMap = Maps.newHashMap();while (true) {for (SearchHit hit : response.getHits().getHits()) {for (String field : fields) {String key = String.format("%s_%s", hit.getId(), field);retMap.put(key, hit.getSource().get(field));}}response = this.esClient.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();// Break condition: No hits are returnedif (response.getHits().getHits().length == 0) {break;}}return retMap;}

6.根据sql查询字段值

	/*** Get Extra _source* * @param index* @param type* @param queryString* @param fields* @return List<Map<String, Object>> Map<String, Object> each document key &*         value List<Map<String, Object>> all document key & value* @throws Exception*/public List<Map<String, Object>> getExtraMapByQueryDSL(String index, String type, String queryDSL, String... fields)throws Exception {List<Map<String, Object>> rtnLst = Lists.newArrayList();String scrollId = "";try {SearchResponse response = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(queryDSL).setFetchSource(fields, null).setScroll(new TimeValue(60000)).setFrom(0).setSize(1000).execute().actionGet();scrollId = response.getScrollId();return this.getHitsResult(response, rtnLst, fields);} catch (Exception ex) {throw ex;} finally {this.clearESScrollId(scrollId);}}/*** @param response* @param rtnLst* @param fields* @return*/private List<Map<String, Object>> getHitsResult(SearchResponse response, List<Map<String, Object>> rtnLst,String... fields) { while (true) {for (SearchHit hit : response.getHits().getHits()) {Map<String, Object> tempMap = Maps.newHashMap();for (String field : fields) {tempMap.put(field, hit.getSource().get(field));}rtnLst.add(tempMap);}response = this.esClient.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();if (response.getHits().getHits().length == 0) {break;}}return rtnLst;}

7.查询排序后的第一笔数据

	/*** Get Top 1 document by sorted DSL* * @param index* @param type* @param dsl*            should by with sort* @param classOfT* @return*/public <T> T getTop1DocByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {String scrollId = "";try {SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0).setSize(1).setScroll(new TimeValue(60000)).execute().actionGet();scrollId = scrollResp.getScrollId();Gson gson = new Gson();for (SearchHit searchHit : scrollResp.getHits().getHits()) {return gson.fromJson(searchHit.getSourceAsString(), classOfT);}return null;} catch (Exception ex) {throw ex;} finally {this.clearESScrollId(scrollId);}}

8.通过sql查询数据是否存在

	/*** Check document exist or not by query DSL* * @param queryDsl* @param esIndex* @param esType* @return* @throws Exception*/public boolean checkDocExistByQueryDSL(String esIndex, String esType, String queryDsl) throws Exception {try {SearchRequestBuilder searchBuilder = this.esClient.prepareSearch(esIndex).setTypes(esType).setFetchSource(new String[] { "_id" }, null).setExtraSource(queryDsl).setFrom(0).setSize(1).setScroll(new TimeValue(60000));SearchResponse scrollResp = searchBuilder.execute().actionGet();if (scrollResp.getHits().getTotalHits() > 0) {return true;}return false;} catch (Exception e) {throw e;}}

9.查询数据笔数

    /*** Return total count by DSL*/protected long getTotalHitsByDsl(String index, String type, String dsl) {String scrollId = "";try {SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0).setSize(0).setScroll(new TimeValue(60000)).execute().actionGet();scrollId = scrollResp.getScrollId();return scrollResp.getHits().getTotalHits();} catch (Exception ex) {throw ex;} finally {this.clearESScrollId(scrollId);}}

新增(Upsert)

注:upsert用法:有记录就更新,没有记录就新增

1.通过ID 执行upsert操作

    /*** Upsert data by VO + document ID.* * @param index* @param type* @param id* @param vo* @return*/public <T> IndexResponse upsert(String index, String type, String docID, T vo) {Gson gson = new Gson();String source = gson.toJson(vo);return this.esClient.prepareIndex(index, type, docID).setRefresh(true).setSource(source).execute().actionGet();}

2.批量upsert

    /*** @param index* @param type* @param voList* @param getDocIDMethodInVo*                sample : public String getDocumentID() { return*               String.format("%s_%s", AAA,BBB); }* @param isAutoRefresh* @throws Exception*/public <T> void bulkProcessUpsert(String index, String type, List<T> voList, String getDocIDMethodInVo, boolean isAutoRefresh) throws Exception {Gson gson = new Gson();BulkRequestBuilder bulkRequest = this.esClient.prepareBulk();bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));for (T vo : voList) {Method addMethod = vo.getClass().getMethod(getDocIDMethodInVo, new Class[] {});Object result = addMethod.invoke(vo, new Object[] {});IndexRequestBuilder indexRequestBuilder = this.esClient.prepareIndex(index, type, result.toString()).setSource(gson.toJson(vo));bulkRequest.add(indexRequestBuilder);}if (bulkRequest.numberOfActions() > 0) {BulkResponse resp = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();if (resp.hasFailures()) {throw new Exception(String.format("Bulk index items failed, message:%s", resp.buildFailureMessage()));}}}

更新(Update)

1.根据ID执行update操作

	/*** update Extra fields by document ID* * @param index* @param type* @param id* @param fieldValMap:key=index field, value = value to be update* @return* @throws Exception*/public UpdateResponse update(String index, String type, String id, Map<String, Object> fieldValMap) throws Exception {return this.esClient.update(new UpdateRequest(index, type, id).refresh(true).doc(fieldValMap)).get();}

2.批量update

    /*** Update datas. Map key is doc_id, Map Value is update field-value map.*/protected void bulkProcessUpdate(String index, String type, Map<String, Map<String, Object>> mapUpdates, boolean isAutoRefresh)throws Exception {BulkRequestBuilder bulkRequest = this.esClient.prepareBulk();bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));for (String docId : mapUpdates.keySet()) {Map<String, Object> mapUpdateFieldValue = mapUpdates.get(docId);XContentBuilder jsonBuilder = XContentFactory.jsonBuilder().startObject();for (String key : mapUpdateFieldValue.keySet()) {jsonBuilder.field(key, mapUpdateFieldValue.get(key));}jsonBuilder.endObject();UpdateRequestBuilder updateRequestBuilder = this.esClient.prepareUpdate(index, type, docId).setDoc(jsonBuilder);bulkRequest.add(updateRequestBuilder);}if (bulkRequest.numberOfActions() > 0) {BulkResponse resp = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();if (resp.hasFailures()) {throw new Exception(String.format("Bulk update items failed, message:%s", resp.buildFailureMessage()));}}}

删除(Delete)

1.根据ID 删除

	/*** Delete data by id*/public boolean deleteById(String index, String type, String id) {return this.esClient.prepareDelete(index, type, id).setRefresh(true).execute().actionGet().isFound();}

2.批量删除

	/*** Bulk delete by document IDs* * @param index* @param type* @param lstId* @throws Exception*/public void bulkProcessDelete(String index, String type, List<String> lstId) throws Exception {final BulkProcessor bulkProcessor = BulkProcessor.builder(this.esClient, new BulkProcessor.Listener() {public void beforeBulk(long executionId, BulkRequest request) {}public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}public void afterBulk(long executionId, BulkRequest request, Throwable failure) {failure.printStackTrace();logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,failure.getMessage());}}).setBulkActions(BULK_MAX_SIZE).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).setFlushInterval(TimeValue.timeValueSeconds(5)).build();try {for (String docId : lstId) {bulkProcessor.add(new DeleteRequest(index, type, docId));}} catch (Exception ex) {logger.error("ElasticsearchHelper.bulkProcessUpdateItems.Index is {}. error is {}. ", ex);throw ex;} finally {bulkProcessor.awaitClose(1, TimeUnit.MINUTES);this.esClient.admin().indices().prepareRefresh(index).get();}}

3.根据DSL删除

	/*** Delete by query* * @param queryDsl* @param index* @param type* @throws Exception*/public void deleteByQueryDsl(String queryDsl , String index, String type, boolean isAutoRefresh) throws Exception {Client client = null;try{client =this.esClient;SearchRequestBuilder searchBuilder = client.prepareSearch(index).setTypes(type).setExtraSource(queryDsl).setFrom(0).setSize(1500).setScroll(new TimeValue(60000));SearchResponse scrollResp = searchBuilder.execute().actionGet();BulkRequestBuilder bulkRequest = client.prepareBulk();bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));while (true) {for (SearchHit hit : scrollResp.getHits().getHits()) {bulkRequest.add(client.prepareDelete(index, type, hit.getId()));}scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();// Break condition: No hits are returnedif (scrollResp.getHits().getHits().length == 0) {break;}}if (bulkRequest.numberOfActions() > 0) {BulkResponse response = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();if (response.hasFailures()) {throw new Exception("deleteByFilter failure:" + response.buildFailureMessage());}}}catch(Exception ex) {throw ex;}}
	/*** delete documents By Query DSL* * @param index* @param type* @param dsl* @throws Exception*/public void deleteByQueryDsl(String index, String type, String dsl) throws Exception {String scrollId = "";List<String> lstId;try {lstId = new ArrayList<String>();SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0).setSize(2000).setScroll(new TimeValue(60000)).execute().actionGet();scrollId = scrollResp.getScrollId();do {for (SearchHit searchHit : scrollResp.getHits().getHits()) {lstId.add(searchHit.getId());}scrollResp = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute().actionGet();} while (scrollResp.getHits().getHits().length > 0);this.bulkProcessDelete(index, type, lstId);} catch (Exception ex) {throw ex;} finally {this.clearESScrollId(scrollId);}}

补充

我们在执行ES操作的时候,为了避免一次操作的数据量过大,通常会设置size;因此,我们用到了scrollId;在方法finally中,需要将scrollId清除。
例如:

public void clearESScrollId(String scrollId) {if (StringUtils.isNoneEmpty(scrollId)) {ClearScrollRequest clearScrollerRequest = new ClearScrollRequest();clearScrollerRequest.addScrollId(scrollId);this.esClient.clearScroll(clearScrollerRequest).actionGet();}}

这篇关于ElasticSearch常用的增删查改操作的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1113490

相关文章

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

Linux使用fdisk进行磁盘的相关操作

《Linux使用fdisk进行磁盘的相关操作》fdisk命令是Linux中用于管理磁盘分区的强大文本实用程序,这篇文章主要为大家详细介绍了如何使用fdisk进行磁盘的相关操作,需要的可以了解下... 目录简介基本语法示例用法列出所有分区查看指定磁盘的区分管理指定的磁盘进入交互式模式创建一个新的分区删除一个存

Golang操作DuckDB实战案例分享

《Golang操作DuckDB实战案例分享》DuckDB是一个嵌入式SQL数据库引擎,它与众所周知的SQLite非常相似,但它是为olap风格的工作负载设计的,DuckDB支持各种数据类型和SQL特性... 目录DuckDB的主要优点环境准备初始化表和数据查询单行或多行错误处理和事务完整代码最后总结Duck

Java 字符数组转字符串的常用方法

《Java字符数组转字符串的常用方法》文章总结了在Java中将字符数组转换为字符串的几种常用方法,包括使用String构造函数、String.valueOf()方法、StringBuilder以及A... 目录1. 使用String构造函数1.1 基本转换方法1.2 注意事项2. 使用String.valu

C# 读写ini文件操作实现

《C#读写ini文件操作实现》本文主要介绍了C#读写ini文件操作实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录一、INI文件结构二、读取INI文件中的数据在C#应用程序中,常将INI文件作为配置文件,用于存储应用程序的

Python使用qrcode库实现生成二维码的操作指南

《Python使用qrcode库实现生成二维码的操作指南》二维码是一种广泛使用的二维条码,因其高效的数据存储能力和易于扫描的特点,广泛应用于支付、身份验证、营销推广等领域,Pythonqrcode库是... 目录一、安装 python qrcode 库二、基本使用方法1. 生成简单二维码2. 生成带 Log

Java操作ElasticSearch的实例详解

《Java操作ElasticSearch的实例详解》Elasticsearch是一个分布式的搜索和分析引擎,广泛用于全文搜索、日志分析等场景,本文将介绍如何在Java应用中使用Elastics... 目录简介环境准备1. 安装 Elasticsearch2. 添加依赖连接 Elasticsearch1. 创

java Stream操作转换方法

《javaStream操作转换方法》文章总结了Java8中流(Stream)API的多种常用方法,包括创建流、过滤、遍历、分组、排序、去重、查找、匹配、转换、归约、打印日志、最大最小值、统计、连接、... 目录流创建1、list 转 map2、filter()过滤3、foreach遍历4、groupingB

Java操作PDF文件实现签订电子合同详细教程

《Java操作PDF文件实现签订电子合同详细教程》:本文主要介绍如何在PDF中加入电子签章与电子签名的过程,包括编写Word文件、生成PDF、为PDF格式做表单、为表单赋值、生成文档以及上传到OB... 目录前言:先看效果:1.编写word文件1.2然后生成PDF格式进行保存1.3我这里是将文件保存到本地后

VUE动态绑定class类的三种常用方式及适用场景详解

《VUE动态绑定class类的三种常用方式及适用场景详解》文章介绍了在实际开发中动态绑定class的三种常见情况及其解决方案,包括根据不同的返回值渲染不同的class样式、给模块添加基础样式以及根据设... 目录前言1.动态选择class样式(对象添加:情景一)2.动态添加一个class样式(字符串添加:情