Common CRUD Operations in ElasticSearch

2024-08-28 02:38

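Performing CRUD operations on ElasticSearch from Java takes two steps:
1. Build the query DSL string (the JSON query body).
2. Execute the create, delete, query, or update operation.
The sections below collect some commonly used helper methods. The code uses the legacy Java transport client with document types and calls such as setExtraSource and setRefresh(true), so it appears to target an older ElasticSearch release.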

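Building the query DSL

1. Basic query DSL

    /**
     * Get query DSL.
     *
     * @param queryString query_string expression, e.g. field:value
     * @return query DSL
     */
    public String getQueryDSL(String queryString) {
        // queryString is concatenated as-is, so it must not contain
        // unescaped double quotes or backslashes.
        String dsl = "{"
                + " \"query\": { "
                + " \"query_string\": { "
                + " \"query\":\"" + queryString + "\" "
                + "}"
                + " } "
                + " } ";
        return dsl;
    }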
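Because the method above splices queryString directly into a JSON string literal, a query that contains a double quote or a backslash (a phrase query, for instance) would yield invalid JSON. The following is a minimal sketch of one way to guard against that. The class QueryDslEscaping and the helper escapeJson are hypothetical names introduced for illustration and are not part of the original code.

```java
final class QueryDslEscaping {

    /** Hypothetical helper: escapes characters that are special inside a JSON string literal. */
    static String escapeJson(String raw) {
        StringBuilder sb = new StringBuilder(raw.length() + 8);
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become a four-digit hex escape.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Same shape as getQueryDSL above, but with the query text escaped first. */
    static String getQueryDSL(String queryString) {
        return "{ \"query\": { \"query_string\": { \"query\": \""
                + escapeJson(queryString) + "\" } } }";
    }

    public static void main(String[] args) {
        // A phrase query: the inner quotes must be escaped to keep the JSON valid.
        System.out.println(getQueryDSL("title:\"quick fox\""));
    }
}
```

In a project that already depends on Gson, serializing a Map or a small request object would likely be a simpler way to get correct escaping than hand-written concatenation.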

2. Query with sorting on fields

    /**
     * Get query DSL with sort.
     *
     * @param queryString query_string expression
     * @param sortByASCOrDesc key: sort field, value: asc or desc
     * @return query DSL with sort
     */
    public String getQueryDSL(String queryString, Map<String, String> sortByASCOrDesc) {
        String dsl = "{"
                + " \"query\": { "
                + " \"query_string\": { "
                + " \"query\":\"" + queryString + "\" "
                + "}"
                + " } ";
        StringBuilder builder = new StringBuilder(dsl);
        builder.append(", \"sort\": [");
        int index = 0;
        for (Map.Entry<String, String> pars : sortByASCOrDesc.entrySet()) {
            // Separate sort clauses with a comma after the first one.
            if (index > 0) {
                builder.append(",");
            }
            builder.append(String.format("{\"%s\":\"%s\"}", pars.getKey(), pars.getValue()));
            index++;
        }
        builder.append("]");
        builder.append(" } ");
        return builder.toString();
    }
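The sort clauses are emitted in the iteration order of the map, and a plain HashMap does not guarantee any order. When sorting on more than one field, passing a LinkedHashMap keeps the primary sort key first. A small sketch of that idea follows; buildSort is a hypothetical standalone version of the loop above, and the field names createTime and name are made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class SortClauseDemo {

    /** Hypothetical standalone version of the sort-building loop shown above. */
    static String buildSort(Map<String, String> sortByAscOrDesc) {
        // StringJoiner inserts the commas, so no index counter is needed.
        StringJoiner joiner = new StringJoiner(",", "\"sort\": [", "]");
        for (Map.Entry<String, String> e : sortByAscOrDesc.entrySet()) {
            joiner.add(String.format("{\"%s\":\"%s\"}", e.getKey(), e.getValue()));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves insertion order: createTime is the primary sort key.
        Map<String, String> sort = new LinkedHashMap<>();
        sort.put("createTime", "desc");
        sort.put("name", "asc");
        System.out.println(buildSort(sort));
    }
}
```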

3. Range query with sorting

    /**
     * Get query DSL with range.
     * <p>rangeMap: key = range field, value = map of operator (gte, gt, lte or lt) to bound value</p>
     * <p>sortByASCOrDesc: key = sort field, value = asc or desc; pass null if no sorting is needed</p>
     *
     * @return query DSL with range
     */
    public String getRangeQueryDSL(String queryString, Map<String, Map<String, String>> rangeMap,
            Map<String, String> sortByASCOrDesc) {
        StringBuilder strBuilder = new StringBuilder();
        strBuilder.append("{").append("\"query\": {").append("\"bool\": {").append("\"must\": [")
                .append("{").append("\"query_string\": {")
                .append(" \"query\":\"" + queryString + "\" ")
                .append("}").append("}");
        for (Map.Entry<String, Map<String, String>> pair : rangeMap.entrySet()) {
            // Every range clause follows an earlier must clause, so the comma goes in front.
            // This also keeps the JSON valid when rangeMap is empty.
            strBuilder.append(",{").append("\"range\": {").append(" \"" + pair.getKey() + "\": { ");
            int ii = 0;
            for (Map.Entry<String, String> bound : pair.getValue().entrySet()) {
                if (ii > 0) {
                    strBuilder.append(" , ");
                }
                strBuilder.append(" \"" + bound.getKey() + "\": \"" + bound.getValue() + "\" ");
                ii++;
            }
            strBuilder.append("}").append("}").append("}");
        }
        strBuilder.append("]").append("}").append("}");
        if (sortByASCOrDesc != null) {
            strBuilder.append(", \"sort\": [");
            int jj = 0;
            for (Map.Entry<String, String> pars : sortByASCOrDesc.entrySet()) {
                if (jj > 0) {
                    strBuilder.append(",");
                }
                strBuilder.append(String.format("{\"%s\":\"%s\"}", pars.getKey(), pars.getValue()));
                jj++;
            }
            strBuilder.append("]");
        }
        strBuilder.append("}");
        return strBuilder.toString();
    }
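The nested shape of rangeMap (field, then operator, then bound) is easy to get wrong, so here is a small sketch of how the argument might be assembled. buildRangeClauses is a hypothetical helper that mirrors only the range loop of the method above, and the field name price and its bounds are invented for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class RangeClauseDemo {

    /** Hypothetical helper mirroring the range loop above: one range clause per field. */
    static String buildRangeClauses(Map<String, Map<String, String>> rangeMap) {
        StringJoiner clauses = new StringJoiner(",");
        for (Map.Entry<String, Map<String, String>> field : rangeMap.entrySet()) {
            // Inner map: operator (gte, gt, lte, lt) mapped to its bound value.
            StringJoiner bounds = new StringJoiner(",", "{", "}");
            for (Map.Entry<String, String> bound : field.getValue().entrySet()) {
                bounds.add("\"" + bound.getKey() + "\":\"" + bound.getValue() + "\"");
            }
            clauses.add("{\"range\":{\"" + field.getKey() + "\":" + bounds + "}}");
        }
        return clauses.toString();
    }

    public static void main(String[] args) {
        // 10 <= price < 100
        Map<String, String> priceBounds = new LinkedHashMap<>();
        priceBounds.put("gte", "10");
        priceBounds.put("lt", "100");

        Map<String, Map<String, String>> rangeMap = new LinkedHashMap<>();
        rangeMap.put("price", priceBounds);

        System.out.println(buildRangeClauses(rangeMap));
    }
}
```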

4. Match all documents

    /**
     * @return query DSL that matches all documents
     */
    public String getQueryAllDSL() {
        String dsl = "{ " + " \"query\": { " + " \"match_all\": {} " + " } " + " } ";
        return dsl;
    }

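5. Grouping by field (aggregations)

    /**
     * Get query DSL with aggregations.
     *
     * @param queryString query_string expression
     * @param strAggs body of the "aggs" object
     */
    private String getAggsQueryDsl(String queryString, String strAggs) {
        return "{"
                + "\"query\": {"
                + "\"bool\": {"
                + "\"must\": ["
                + "{"
                + "\"query_string\": {"
                + "\"default_field\": \"_all\","
                + "\"query\": \"" + queryString + "\""
                + "}"
                + "}"
                + "]"
                + "}"
                + "},"
                // size 0: return only the aggregation results, no hits
                + "\"size\": 0,"
                + "\"aggs\": {" + strAggs + "}"
                + "}";
    }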

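Notes:
1) An aggs (aggregation) clause can group results by the distinct values of a field; to group by several fields, nest one aggs inside another.
2) In real projects, use aggs sparingly because they can be expensive; where practical, fetch the data first and do the de-duplication and grouping in application code.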
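The strAggs argument of getAggsQueryDsl is the body of the "aggs" object, and note 1 mentions nesting for multi-field grouping. As a rough sketch of what such a nested string could look like, the hypothetical helper nestedTermsAggs below builds one terms aggregation per field, each nested inside the previous one. The aggregation names (by_ plus the field name) and the fields country and city are arbitrary choices for the example.

```java
import java.util.Arrays;
import java.util.List;

final class NestedAggsDemo {

    /**
     * Hypothetical helper: builds the body of an "aggs" object that groups by the
     * first field, then by the second field within each bucket, and so on.
     */
    static String nestedTermsAggs(List<String> fields) {
        if (fields.isEmpty()) {
            return "";
        }
        String field = fields.get(0);
        // Recurse on the remaining fields to build the inner aggregation.
        String inner = nestedTermsAggs(fields.subList(1, fields.size()));
        StringBuilder sb = new StringBuilder();
        sb.append("\"by_").append(field).append("\": {")
          .append("\"terms\": {\"field\": \"").append(field).append("\"}");
        if (!inner.isEmpty()) {
            sb.append(", \"aggs\": {").append(inner).append("}");
        }
        sb.append("}");
        return sb.toString();
    }

    public static void main(String[] args) {
        // Group by country, then by city inside each country bucket.
        System.out.println(nestedTermsAggs(Arrays.asList("country", "city")));
    }
}
```

The returned string can be passed as strAggs; the buckets then come back nested in the same way in the aggregations part of the response.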

Query

1. Run a query and map the results to an entity class

    /**
     * Get documents by query DSL.
     *
     * @param index
     * @param type
     * @param dsl sample: { "query":{ "query_string":{ "query":"field:value" } } }
     * @param classOfT sample: xxxVO.class
     * @return List<xxxVO>
     */
    public <T> List<T> getDocListByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
        String scrollId = "";
        try {
            List<T> list = new ArrayList<T>();
            SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                    .setExtraSource(dsl).setFrom(0).setSize(2000)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            Gson gson = new Gson();
            do {
                for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                    list.add(gson.fromJson(searchHit.getSourceAsString(), classOfT));
                }
                // The scroll ID may change between pages, so always use the latest one.
                scrollId = scrollResp.getScrollId();
                scrollResp = this.esClient.prepareSearchScroll(scrollId)
                        .setScroll(new TimeValue(60000)).execute().actionGet();
            } while (scrollResp.getHits().getHits().length > 0);
            return list;
        } finally {
            this.clearESScrollId(scrollId);
        }
    }
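The query helpers in this section share one control flow: fetch the first page, keep requesting pages until one comes back empty, and clear the scroll in a finally block. The sketch below isolates that flow without any ElasticSearch dependency. FakeScrollSource and Page are hypothetical stand-ins for the client and its response, and the fake scroll ID simply encodes an offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ScrollLoopDemo {

    /** One page of results plus the ID needed to request the next page. */
    static final class Page {
        final String scrollId;
        final List<String> hits;

        Page(String scrollId, List<String> hits) {
            this.scrollId = scrollId;
            this.hits = hits;
        }
    }

    /** Hypothetical stand-in for the search client; not an ElasticSearch API. */
    static final class FakeScrollSource {
        private final List<String> docs;
        private final int pageSize;
        boolean cleared = false;

        FakeScrollSource(List<String> docs, int pageSize) {
            this.docs = docs;
            this.pageSize = pageSize;
        }

        Page firstPage() {
            return pageFrom(0);
        }

        Page nextPage(String scrollId) {
            return pageFrom(Integer.parseInt(scrollId));
        }

        void clear(String scrollId) {
            cleared = true;
        }

        // The fake scroll ID encodes the offset at which the next page starts.
        private Page pageFrom(int offset) {
            int end = Math.min(offset + pageSize, docs.size());
            List<String> hits = offset >= end
                    ? Collections.<String>emptyList()
                    : docs.subList(offset, end);
            return new Page(String.valueOf(end), hits);
        }
    }

    /** Drains every page, then clears the scroll even if an exception is thrown. */
    static List<String> fetchAll(FakeScrollSource source) {
        String scrollId = "";
        try {
            List<String> all = new ArrayList<>();
            Page page = source.firstPage();
            do {
                all.addAll(page.hits);
                scrollId = page.scrollId;   // always keep the most recent ID
                page = source.nextPage(scrollId);
            } while (!page.hits.isEmpty());
            return all;
        } finally {
            source.clear(scrollId);
        }
    }

    public static void main(String[] args) {
        FakeScrollSource source = new FakeScrollSource(Arrays.asList("a", "b", "c", "d", "e"), 2);
        System.out.println(fetchAll(source));
    }
}
```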

2. Run a query and return the results as a map

    /**
     * Get document map by query DSL.
     *
     * @param index
     * @param type
     * @param dsl sample: { "query":{ "query_string":{ "query":"field:value" } } }
     * @param classOfT
     * @return Map<String, T> key = document ID, value = document VO
     */
    public <T> Map<String, T> getDocMapByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
        String scrollId = "";
        try {
            Map<String, T> retMap = new HashMap<String, T>();
            SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                    .setExtraSource(dsl).setFrom(0).setSize(2000)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            Gson gson = new Gson();
            do {
                for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                    retMap.put(searchHit.getId(), gson.fromJson(searchHit.getSourceAsString(), classOfT));
                }
                // The scroll ID may change between pages, so always use the latest one.
                scrollId = scrollResp.getScrollId();
                scrollResp = this.esClient.prepareSearchScroll(scrollId)
                        .setScroll(new TimeValue(60000)).execute().actionGet();
            } while (scrollResp.getHits().getHits().length > 0);
            return retMap;
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

3. Query by ID

    /**
     * Get one document by document ID.
     *
     * @param index
     * @param type
     * @param docID sample: AAA_BBB_CCC
     * @param classOfT
     * @return xxxVO
     */
    public <T> T getDocByDocID(String index, String type, String docID, Class<T> classOfT) {
        GetResponse response = this.esClient.prepareGet(index, type, docID).get();
        Gson gson = new Gson();
        return gson.fromJson(response.getSourceAsString(), classOfT);
    }

4. Query by a list of IDs

    /**
     * Get documents by their IDs.
     *
     * @param index
     * @param type
     * @param docIDs sample: List<AAA_BBB_CCC>
     * @param classOfT
     * @return
     * @throws Exception
     */
    public <T> List<T> getDocListByDocID(String index, String type, List<String> docIDs, Class<T> classOfT)
            throws Exception {
        String scrollId = "";
        try {
            List<T> list = new ArrayList<T>();
            SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                    .setQuery(QueryBuilders.idsQuery().ids(docIDs))
                    .setScroll(new TimeValue(60000)).setFrom(0).setSize(1000)
                    .execute().actionGet();
            Gson gson = new Gson();
            do {
                for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                    list.add(gson.fromJson(searchHit.getSourceAsString(), classOfT));
                }
                scrollId = scrollResp.getScrollId();
                scrollResp = this.esClient.prepareSearchScroll(scrollId)
                        .setScroll(new TimeValue(60000)).execute().actionGet();
            } while (scrollResp.getHits().getHits().length > 0);
            return list;
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

5. Fetch field values by ID, returned as a map

    /**
     * Get selected _source fields of documents by document IDs.
     *
     * @param index
     * @param type
     * @param ids
     * @param fields
     * @return Map<String, Object> key = id_field, value = that field's value in the document.
     *         sample: Map<AAA_BBB_CCC_field, value>
     * @throws Exception
     */
    public Map<String, Object> getExtraMapByDocID(String index, String type, List<String> ids, String... fields)
            throws Exception {
        String scrollId = "";
        try {
            SearchResponse response = this.esClient.prepareSearch(index).setTypes(type)
                    .setQuery(QueryBuilders.idsQuery().ids(ids)).setFetchSource(fields, null)
                    .setScroll(new TimeValue(60000)).setFrom(0).setSize(1000)
                    .execute().actionGet();
            Map<String, Object> retMap = Maps.newHashMap();
            while (true) {
                for (SearchHit hit : response.getHits().getHits()) {
                    for (String field : fields) {
                        String key = String.format("%s_%s", hit.getId(), field);
                        retMap.put(key, hit.getSource().get(field));
                    }
                }
                scrollId = response.getScrollId();
                response = this.esClient.prepareSearchScroll(scrollId)
                        .setScroll(new TimeValue(60000)).execute().actionGet();
                // Break condition: no hits are returned
                if (response.getHits().getHits().length == 0) {
                    break;
                }
            }
            return retMap;
        } finally {
            // Release the scroll context, as the other query helpers do.
            this.clearESScrollId(scrollId);
        }
    }

6. Fetch field values by query DSL

    /**
     * Get selected _source fields by query DSL.
     *
     * @param index
     * @param type
     * @param queryDSL
     * @param fields
     * @return List<Map<String, Object>>: one map per document, key = field name, value = field value
     * @throws Exception
     */
    public List<Map<String, Object>> getExtraMapByQueryDSL(String index, String type, String queryDSL,
            String... fields) throws Exception {
        List<Map<String, Object>> rtnLst = Lists.newArrayList();
        String scrollId = "";
        try {
            SearchResponse response = this.esClient.prepareSearch(index).setTypes(type)
                    .setExtraSource(queryDSL).setFetchSource(fields, null)
                    .setScroll(new TimeValue(60000)).setFrom(0).setSize(1000)
                    .execute().actionGet();
            scrollId = response.getScrollId();
            return this.getHitsResult(response, rtnLst, fields);
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

    /**
     * Collect the requested fields from every page of the scroll.
     *
     * @param response
     * @param rtnLst
     * @param fields
     * @return
     */
    private List<Map<String, Object>> getHitsResult(SearchResponse response, List<Map<String, Object>> rtnLst,
            String... fields) {
        while (true) {
            for (SearchHit hit : response.getHits().getHits()) {
                Map<String, Object> tempMap = Maps.newHashMap();
                for (String field : fields) {
                    tempMap.put(field, hit.getSource().get(field));
                }
                rtnLst.add(tempMap);
            }
            response = this.esClient.prepareSearchScroll(response.getScrollId())
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            if (response.getHits().getHits().length == 0) {
                break;
            }
        }
        return rtnLst;
    }

7. Get the first document of a sorted query

    /**
     * Get the top 1 document by sorted DSL.
     *
     * @param index
     * @param type
     * @param dsl should include a sort clause
     * @param classOfT
     * @return the first hit, or null if nothing matches
     */
    public <T> T getTop1DocByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
        String scrollId = "";
        try {
            SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                    .setExtraSource(dsl).setFrom(0).setSize(1)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();
            Gson gson = new Gson();
            for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                return gson.fromJson(searchHit.getSourceAsString(), classOfT);
            }
            return null;
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

8. Check whether any document matches a query DSL

    /**
     * Check whether a document exists by query DSL.
     *
     * @param esIndex
     * @param esType
     * @param queryDsl
     * @return true if at least one document matches
     * @throws Exception
     */
    public boolean checkDocExistByQueryDSL(String esIndex, String esType, String queryDsl) throws Exception {
        // A single hit is enough to decide existence, so no scroll context is opened here
        // (the original opened one and never cleared it).
        SearchRequestBuilder searchBuilder = this.esClient.prepareSearch(esIndex).setTypes(esType)
                .setFetchSource(new String[] { "_id" }, null)
                .setExtraSource(queryDsl).setFrom(0).setSize(1);
        SearchResponse resp = searchBuilder.execute().actionGet();
        return resp.getHits().getTotalHits() > 0;
    }

9. Count matching documents

    /**
     * Return total count by DSL.
     */
    protected long getTotalHitsByDsl(String index, String type, String dsl) {
        String scrollId = "";
        try {
            SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                    .setExtraSource(dsl).setFrom(0).setSize(0)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();
            return scrollResp.getHits().getTotalHits();
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

Insert (Upsert)

Note: upsert means that a record is updated if it exists and created if it does not. The helpers in this section use an index request with an explicit ID, so an existing document is replaced as a whole.

1. Upsert by ID

    /**
     * Upsert data by VO + document ID.
     *
     * @param index
     * @param type
     * @param docID
     * @param vo
     * @return
     */
    public <T> IndexResponse upsert(String index, String type, String docID, T vo) {
        Gson gson = new Gson();
        String source = gson.toJson(vo);
        return this.esClient.prepareIndex(index, type, docID).setRefresh(true).setSource(source)
                .execute().actionGet();
    }

2. Bulk upsert

    /**
     * @param index
     * @param type
     * @param voList
     * @param getDocIDMethodInVo name of the VO method that returns the document ID.
     *            sample: public String getDocumentID() { return String.format("%s_%s", AAA, BBB); }
     * @param isAutoRefresh
     * @throws Exception
     */
    public <T> void bulkProcessUpsert(String index, String type, List<T> voList, String getDocIDMethodInVo,
            boolean isAutoRefresh) throws Exception {
        Gson gson = new Gson();
        BulkRequestBuilder bulkRequest = this.esClient.prepareBulk();
        bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));
        for (T vo : voList) {
            // Look up the ID getter by name and call it reflectively.
            Method getDocIdMethod = vo.getClass().getMethod(getDocIDMethodInVo, new Class[] {});
            Object result = getDocIdMethod.invoke(vo, new Object[] {});
            IndexRequestBuilder indexRequestBuilder = this.esClient
                    .prepareIndex(index, type, result.toString()).setSource(gson.toJson(vo));
            bulkRequest.add(indexRequestBuilder);
        }
        if (bulkRequest.numberOfActions() > 0) {
            BulkResponse resp = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();
            if (resp.hasFailures()) {
                throw new Exception(
                        String.format("Bulk index items failed, message:%s", resp.buildFailureMessage()));
            }
        }
    }
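bulkProcessUpsert finds the document ID getter by its name at run time, which means a misspelled method name only fails when the code runs. The sketch below shows that reflective lookup in isolation, next to a type-safe alternative that takes the getter as a function. ItemVO, docIdByReflection and docIdByFunction are hypothetical names for illustration; only the getter name getDocumentID comes from the sample in the Javadoc above.

```java
import java.lang.reflect.Method;
import java.util.function.Function;

final class DocIdLookupDemo {

    /** Hypothetical value object; the getter name follows the sample in the Javadoc above. */
    public static final class ItemVO {
        private final String category;
        private final String code;

        public ItemVO(String category, String code) {
            this.category = category;
            this.code = code;
        }

        public String getDocumentID() {
            return String.format("%s_%s", category, code);
        }
    }

    /** Reflection-based lookup: the getter must be public and take no arguments. */
    static String docIdByReflection(Object vo, String getterName) {
        try {
            Method getter = vo.getClass().getMethod(getterName);
            return String.valueOf(getter.invoke(vo));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot read document ID via " + getterName, e);
        }
    }

    /** Alternative: pass the getter as a function so the compiler checks that it exists. */
    static <T> String docIdByFunction(T vo, Function<T, String> idGetter) {
        return idGetter.apply(vo);
    }

    public static void main(String[] args) {
        ItemVO vo = new ItemVO("AAA", "BBB");
        System.out.println(docIdByReflection(vo, "getDocumentID"));
        System.out.println(docIdByFunction(vo, ItemVO::getDocumentID));
    }
}
```

Both calls yield the same ID. The function-based form needs Java 8 or later, which may be why the original code, written against an older client, relies on reflection.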

Update

1. Update by ID

    /**
     * Update selected fields by document ID.
     *
     * @param index
     * @param type
     * @param id
     * @param fieldValMap key = index field, value = value to be updated
     * @return
     * @throws Exception
     */
    public UpdateResponse update(String index, String type, String id, Map<String, Object> fieldValMap)
            throws Exception {
        return this.esClient.update(new UpdateRequest(index, type, id).refresh(true).doc(fieldValMap)).get();
    }
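The upsert helpers send an index request, which replaces the stored document, whereas update with doc(...) changes only the given fields and requires the document to exist. The in-memory sketch below illustrates that difference with plain maps. It is a simplified model, not ElasticSearch code: IndexVersusUpdateDemo and its methods are hypothetical, and nested objects are not handled.

```java
import java.util.HashMap;
import java.util.Map;

final class IndexVersusUpdateDemo {

    /** In-memory stand-in for an index: document ID mapped to its fields. */
    private final Map<String, Map<String, Object>> store = new HashMap<>();

    /** Index semantics: the stored document is replaced as a whole (or created). */
    void index(String id, Map<String, Object> doc) {
        store.put(id, new HashMap<>(doc));
    }

    /** Partial-update semantics: only the given fields change; the document must exist. */
    void update(String id, Map<String, Object> partialDoc) {
        Map<String, Object> existing = store.get(id);
        if (existing == null) {
            throw new IllegalStateException("document missing: " + id);
        }
        existing.putAll(partialDoc);
    }

    Map<String, Object> get(String id) {
        return store.get(id);
    }

    public static void main(String[] args) {
        IndexVersusUpdateDemo es = new IndexVersusUpdateDemo();

        Map<String, Object> full = new HashMap<>();
        full.put("name", "widget");
        full.put("price", 10);
        es.index("1", full);

        Map<String, Object> change = new HashMap<>();
        change.put("price", 12);

        // Partial update: name is kept, price changes.
        es.update("1", change);
        System.out.println(es.get("1"));

        // Re-indexing with the same map drops every field that is not in it.
        es.index("1", change);
        System.out.println(es.get("1"));
    }
}
```

So when only a few fields change, update is the safer choice; calling upsert with a partially filled VO would drop the other fields.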

2. Bulk update

    /**
     * Update documents. Map key is the document ID, map value is the field-value map to update.
     */
    protected void bulkProcessUpdate(String index, String type, Map<String, Map<String, Object>> mapUpdates,
            boolean isAutoRefresh) throws Exception {
        BulkRequestBuilder bulkRequest = this.esClient.prepareBulk();
        bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));
        for (Map.Entry<String, Map<String, Object>> update : mapUpdates.entrySet()) {
            // Build the partial document for this ID.
            XContentBuilder jsonBuilder = XContentFactory.jsonBuilder().startObject();
            for (Map.Entry<String, Object> fieldValue : update.getValue().entrySet()) {
                jsonBuilder.field(fieldValue.getKey(), fieldValue.getValue());
            }
            jsonBuilder.endObject();
            UpdateRequestBuilder updateRequestBuilder = this.esClient
                    .prepareUpdate(index, type, update.getKey()).setDoc(jsonBuilder);
            bulkRequest.add(updateRequestBuilder);
        }
        if (bulkRequest.numberOfActions() > 0) {
            BulkResponse resp = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();
            if (resp.hasFailures()) {
                throw new Exception(
                        String.format("Bulk update items failed, message:%s", resp.buildFailureMessage()));
            }
        }
    }

Delete

1. Delete by ID

    /**
     * Delete data by ID.
     */
    public boolean deleteById(String index, String type, String id) {
        return this.esClient.prepareDelete(index, type, id).setRefresh(true).execute().actionGet().isFound();
    }

2. Bulk delete

    /**
     * Bulk delete by document IDs.
     *
     * @param index
     * @param type
     * @param lstId
     * @throws Exception
     */
    public void bulkProcessDelete(String index, String type, List<String> lstId) throws Exception {
        final BulkProcessor bulkProcessor = BulkProcessor.builder(this.esClient, new BulkProcessor.Listener() {
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            }

            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                failure.printStackTrace();
                logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
                        failure.getMessage());
            }
        }).setBulkActions(BULK_MAX_SIZE).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                .setFlushInterval(TimeValue.timeValueSeconds(5)).build();
        try {
            for (String docId : lstId) {
                bulkProcessor.add(new DeleteRequest(index, type, docId));
            }
        } catch (Exception ex) {
            // Log message corrected: it named another method and passed one argument for two placeholders.
            logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
                    ex.getMessage());
            throw ex;
        } finally {
            // Flush pending deletes, then refresh so they become visible to searches.
            bulkProcessor.awaitClose(1, TimeUnit.MINUTES);
            this.esClient.admin().indices().prepareRefresh(index).get();
        }
    }
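With setBulkActions(BULK_MAX_SIZE) the BulkProcessor flushes on its own once that many requests are queued. The helpers that use a plain BulkRequestBuilder have no such limit and send everything in one request. Where that becomes too large, the IDs could be split into batches by hand, roughly as in this sketch. The partition helper is a hypothetical utility and not part of the original class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BatchingDemo {

    /** Splits ids into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> ids, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < ids.size(); start += batchSize) {
            int end = Math.min(start + batchSize, ids.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(ids.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Each inner list would become one bulk request.
        System.out.println(partition(Arrays.asList("a", "b", "c", "d", "e"), 2));
    }
}
```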

3. Delete by query DSL

    /**
     * Delete by query: scroll over the matching documents and delete them in one bulk request.
     * Note that this overload takes queryDsl first, unlike the overload below.
     *
     * @param queryDsl
     * @param index
     * @param type
     * @param isAutoRefresh
     * @throws Exception
     */
    public void deleteByQueryDsl(String queryDsl, String index, String type, boolean isAutoRefresh)
            throws Exception {
        Client client = this.esClient;
        String scrollId = "";
        try {
            SearchRequestBuilder searchBuilder = client.prepareSearch(index).setTypes(type)
                    .setExtraSource(queryDsl).setFrom(0).setSize(1500).setScroll(new TimeValue(60000));
            SearchResponse scrollResp = searchBuilder.execute().actionGet();
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));
            while (true) {
                for (SearchHit hit : scrollResp.getHits().getHits()) {
                    bulkRequest.add(client.prepareDelete(index, type, hit.getId()));
                }
                scrollId = scrollResp.getScrollId();
                scrollResp = client.prepareSearchScroll(scrollId)
                        .setScroll(new TimeValue(60000)).execute().actionGet();
                // Break condition: no hits are returned
                if (scrollResp.getHits().getHits().length == 0) {
                    break;
                }
            }
            if (bulkRequest.numberOfActions() > 0) {
                BulkResponse response = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();
                if (response.hasFailures()) {
                    throw new Exception("deleteByFilter failure:" + response.buildFailureMessage());
                }
            }
        } finally {
            // Release the scroll context, as the query helpers do.
            this.clearESScrollId(scrollId);
        }
    }

    /**
     * Delete documents by query DSL: collect the matching IDs, then delegate to bulkProcessDelete.
     *
     * @param index
     * @param type
     * @param dsl
     * @throws Exception
     */
    public void deleteByQueryDsl(String index, String type, String dsl) throws Exception {
        String scrollId = "";
        try {
            List<String> lstId = new ArrayList<String>();
            SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                    .setExtraSource(dsl).setFrom(0).setSize(2000)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            do {
                for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                    lstId.add(searchHit.getId());
                }
                scrollId = scrollResp.getScrollId();
                scrollResp = this.esClient.prepareSearchScroll(scrollId)
                        .setScroll(new TimeValue(60000)).execute().actionGet();
            } while (scrollResp.getHits().getHits().length > 0);
            this.bulkProcessDelete(index, type, lstId);
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

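Supplement

To avoid handling too much data in a single ES request, we usually set a size and page through the results with a scroll, which is why the methods above use a scrollId. The scrollId should be cleared in each method's finally block.
For example:

    public void clearESScrollId(String scrollId) {
        // Nothing to clear if the search failed before a scroll ID was returned.
        if (StringUtils.isNotEmpty(scrollId)) {
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(scrollId);
            this.esClient.clearScroll(clearScrollRequest).actionGet();
        }
    }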
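As an alternative to repeating the finally block in every method, the cleanup could be tied to a try-with-resources statement. The sketch below shows the idea under the assumption of a small wrapper class; ScrollGuard and its track method are hypothetical names, and the clear action is passed in as a Consumer so the example runs without an ElasticSearch client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ScrollGuardDemo {

    /** Hypothetical wrapper: remembers the latest scroll ID and clears it on close. */
    static final class ScrollGuard implements AutoCloseable {
        private final Consumer<String> clearAction;
        private String scrollId = "";

        ScrollGuard(Consumer<String> clearAction) {
            this.clearAction = clearAction;
        }

        void track(String latestScrollId) {
            this.scrollId = latestScrollId;
        }

        @Override
        public void close() {
            // Mirror clearESScrollId: skip the call when no scroll was ever opened.
            if (scrollId != null && !scrollId.isEmpty()) {
                clearAction.accept(scrollId);
            }
        }
    }

    public static void main(String[] args) {
        List<String> cleared = new ArrayList<>();
        // In real code the action would be a method reference such as this::clearESScrollId.
        try (ScrollGuard guard = new ScrollGuard(cleared::add)) {
            guard.track("scroll-1");
            guard.track("scroll-2");
        }
        // Only the most recent scroll ID is cleared.
        System.out.println(cleared);
    }
}
```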

http://www.chinasem.cn/article/1113490
