Common ElasticSearch CRUD Operations
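Performing create, read, update and delete operations on ElasticSearch from Java takes two steps:
1. Build the query DSL (the JSON request body)
2. Execute the create, read, update or delete operation
Below are some commonly used, reusable helpers. They target the older Java client API (calls such as setExtraSource and setRefresh no longer exist in recent Elasticsearch versions), and this.esClient is the Client instance held by the helper class.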
Building the query DSL
1. Basic query_string DSL
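/**
 * Get query DSL
 * @param queryString Lucene query string, e.g. field:value
 * @return query DSL
 */
public String getQueryDSL(String queryString) {
    // queryString is concatenated as-is: double quotes or backslashes in it will break the JSON
    String dsl = "{"
            + " \"query\": { "
            + " \"query_string\": { "
            + " \"query\":\"" + queryString + "\" "
            + "}"
            + " } "
            + " } ";
    return dsl;
}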
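Because getQueryDSL pastes queryString straight into the JSON, a value such as title:"hello world" yields an invalid request body. A minimal sketch, assuming you keep the string-concatenation approach, of escaping the value first; QueryStringDsl, escapeJson and queryStringDsl are hypothetical names, not part of any Elasticsearch API. A JSON library could do the escaping instead.

```java
/** Hypothetical helper, not part of any Elasticsearch client API. */
final class QueryStringDsl {
    private QueryStringDsl() {
    }

    /** Escape a value so it can be embedded inside a JSON string literal. */
    static String escapeJson(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':
                    sb.append("\\\"");
                    break;
                case '\\':
                    sb.append("\\\\");
                    break;
                case '\n':
                    sb.append("\\n");
                    break;
                case '\r':
                    sb.append("\\r");
                    break;
                case '\t':
                    sb.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters: backslash-u escape with four hex digits
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Same DSL shape as getQueryDSL(String), but with the query string escaped first. */
    static String queryStringDsl(String queryString) {
        return "{\"query\":{\"query_string\":{\"query\":\"" + escapeJson(queryString) + "\"}}}";
    }
}
```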
2. Query with sorting by field
/**
 * Get query DSL with sort
 * @param queryString Lucene query string
 * @param sortByASCOrDesc key: sort field, value: asc or desc.
 *        Sort priority follows map iteration order, so pass a LinkedHashMap.
 * @return query DSL with sort
 */
public String getQueryDSL(String queryString, Map<String, String> sortByASCOrDesc) {
    String dsl = "{"
            + " \"query\": { "
            + " \"query_string\": { "
            + " \"query\":\"" + queryString + "\" "
            + "}"
            + " } ";
    StringBuilder builder = new StringBuilder(dsl);
    builder.append(", \"sort\": [");
    int index = 0;
    for (Map.Entry<String, String> pars : sortByASCOrDesc.entrySet()) {
        // Separate sort clauses with a comma, except before the first one
        if (index > 0) {
            builder.append(",");
        }
        builder.append(String.format("{\"%s\":\"%s\"}", pars.getKey(), pars.getValue()));
        index++;
    }
    builder.append("]");
    builder.append(" } ");
    return builder.toString();
}
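The sort array is the part of this builder where comma bookkeeping tends to go wrong. A sketch, assuming Java 8+, that lets java.util.StringJoiner insert the separators; SortDsl and sortArray are hypothetical helper names, not an Elasticsearch API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper for the "sort" value of a query DSL. */
final class SortDsl {
    private SortDsl() {
    }

    /** One {"field":"order"} object per entry, in map iteration order (use a LinkedHashMap). */
    static String sortArray(Map<String, String> sortByASCOrDesc) {
        // StringJoiner adds "[", "]" and the commas, so no index counter is needed
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        for (Map.Entry<String, String> e : sortByASCOrDesc.entrySet()) {
            joiner.add(String.format("{\"%s\":\"%s\"}", e.getKey(), e.getValue()));
        }
        return joiner.toString();
    }
}
```

The result can be appended after ", \"sort\": " in place of the hand-written loop.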
3. Range query with sorting
/**
 * Get query DSL with range
 * @param queryString Lucene query string
 * @param rangeMap key = range field; value = map of operator (gte, gt, lte or lt) to bound value
 * @param sortByASCOrDesc key: sort field, value: asc or desc; pass null if no sort is needed
 * @return query DSL with range
 */
public String getRangeQueryDSL(String queryString, Map<String, Map<String, String>> rangeMap,
        Map<String, String> sortByASCOrDesc) {
    StringBuilder strBuilder = new StringBuilder();
    strBuilder.append("{\"query\": {\"bool\": {\"must\": [")
            .append("{\"query_string\": {")
            .append(" \"query\":\"").append(queryString).append("\" ")
            .append("}}");
    for (Map.Entry<String, Map<String, String>> pair : rangeMap.entrySet()) {
        // One {"range": {field: {...}}} clause per field, each preceded by a comma
        strBuilder.append(",{\"range\": {")
                .append(" \"").append(pair.getKey()).append("\": { ");
        int ii = 0;
        for (Map.Entry<String, String> bound : pair.getValue().entrySet()) {
            // Comma between bounds, e.g. "gte": "1" , "lte": "9"
            if (ii > 0) {
                strBuilder.append(" , ");
            }
            strBuilder.append(" \"").append(bound.getKey()).append("\": \"")
                    .append(bound.getValue()).append("\" ");
            ii++;
        }
        strBuilder.append("}}}");
    }
    strBuilder.append("]}}");
    if (sortByASCOrDesc != null) {
        strBuilder.append(", \"sort\": [");
        int jj = 0;
        for (Map.Entry<String, String> pars : sortByASCOrDesc.entrySet()) {
            if (jj > 0) {
                strBuilder.append(",");
            }
            strBuilder.append(String.format("{\"%s\":\"%s\"}", pars.getKey(), pars.getValue()));
            jj++;
        }
        strBuilder.append("]");
    }
    strBuilder.append("}");
    return strBuilder.toString();
}
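The comma counters are where this builder is fragile: an empty rangeMap or a field with more than two bounds easily yields invalid JSON. A sketch of the must array under the same string-concatenation assumption, delegating separators to StringJoiner; RangeDsl, rangeClause and mustArray are hypothetical names, and queryString is not escaped here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper for the bool/must part of a range query DSL. */
final class RangeDsl {
    private RangeDsl() {
    }

    /** {"range":{field:{op:"value",...}}}, where op is gte, gt, lte or lt. */
    static String rangeClause(String field, Map<String, String> bounds) {
        StringJoiner ops = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> b : bounds.entrySet()) {
            ops.add(String.format("\"%s\":\"%s\"", b.getKey(), b.getValue()));
        }
        return String.format("{\"range\":{\"%s\":%s}}", field, ops);
    }

    /** The "must" array: the query_string clause followed by one range clause per field. */
    static String mustArray(String queryString, Map<String, Map<String, String>> rangeMap) {
        StringJoiner must = new StringJoiner(",", "[", "]");
        must.add(String.format("{\"query_string\":{\"query\":\"%s\"}}", queryString));
        for (Map.Entry<String, Map<String, String>> r : rangeMap.entrySet()) {
            must.add(rangeClause(r.getKey(), r.getValue()));
        }
        return must.toString();
    }
}
```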
4. Match all documents
/**
 * @return match_all query DSL
 */
public String getQueryAllDSL() {
    String dsl = "{ "
            + " \"query\": { "
            + " \"match_all\": {} "
            + " } "
            + " } ";
    return dsl;
}
5. Group by field (aggregation)
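/**
 * @param queryString Lucene query string
 * @param strAggs body of the "aggs" object
 * @return query DSL that returns only aggregation results
 */
private String getAggsQueryDsl(String queryString, String strAggs) {
    return "{"
            + "\"query\": {"
            + "\"bool\": {"
            + "\"must\": ["
            + "{"
            + "\"query_string\": {"
            + "\"default_field\": \"_all\","
            + "\"query\": \"" + queryString + "\""
            + "}"
            + "}"
            + "]"
            + "}"
            + "},"
            + "\"size\": 0,"   // only the aggregation buckets are needed, not the hits
            + "\"aggs\": {" + strAggs + "}"
            + "}";
}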
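Notes:
1) The aggs aggregation groups documents by the distinct values of a field; to group by several fields, nest one aggs inside another.
2) In real projects, use aggs sparingly because aggregations are relatively expensive; where possible, fetch the data and do the de-duplication and grouping in the application code.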
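Note 1 says multi-field grouping is done by nesting aggs. A sketch of building the strAggs argument of getAggsQueryDsl with one terms aggregation per field, each nested inside the previous one. The group_by_<field> naming, the size parameter and the TermsAggs class are assumptions for illustration, and the grouped fields are assumed to be not-analyzed (keyword) fields.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: nested terms aggregations, outermost field first. */
final class TermsAggs {
    private TermsAggs() {
    }

    static String nestedTerms(List<String> fields, int size) {
        String inner = "";
        // Build from the innermost field outwards, wrapping the previous level in "aggs"
        for (int i = fields.size() - 1; i >= 0; i--) {
            String field = fields.get(i);
            String subAggs = inner.isEmpty() ? "" : ",\"aggs\":{" + inner + "}";
            inner = String.format("\"group_by_%s\":{\"terms\":{\"field\":\"%s\",\"size\":%d}%s}",
                    field, field, size, subAggs);
        }
        return inner;
    }
}
```

For example, nestedTerms(Arrays.asList("city", "gender"), 10) groups by city and, inside each city bucket, by gender.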
Query
1. Run a query and map the results to an entity class
/**
 * Get documents by query DSL
 * @param index
 * @param type
 * @param dsl sample: { "query":{ "query_string":{ "query":"field:value" } } }
 * @param classOfT sample: xxxVO.class
 * @return List<xxxVO>
 */
public <T> List<T> getDocListByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
    String scrollId = "";
    List<T> list = new ArrayList<T>();
    try {
        // Open a scroll context (kept alive 60 s between calls) and read the first page of up to 2000 hits
        SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                .setExtraSource(dsl).setFrom(0).setSize(2000)
                .setScroll(new TimeValue(60000)).execute().actionGet();
        scrollId = scrollResp.getScrollId();
        Gson gson = new Gson();
        do {
            for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                list.add(gson.fromJson(searchHit.getSourceAsString(), classOfT));
            }
            // Fetch the next page, always continuing with the most recent scroll ID
            scrollResp = this.esClient.prepareSearchScroll(scrollId)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();
        } while (scrollResp.getHits().getHits().length > 0);
        return list;
    } finally {
        this.clearESScrollId(scrollId);
    }
}
2. Run a query and return the results as a map
/**
 * Get document map by query DSL
 * @param index
 * @param type
 * @param dsl sample: { "query":{ "query_string":{ "query":"field:value" } } }
 * @param classOfT
 * @return Map<String, T> key = document ID, value = document VO
 */
public <T> Map<String, T> getDocMapByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
    String scrollId = "";
    Map<String, T> retMap = new HashMap<String, T>();
    try {
        SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                .setExtraSource(dsl).setFrom(0).setSize(2000)
                .setScroll(new TimeValue(60000)).execute().actionGet();
        scrollId = scrollResp.getScrollId();
        Gson gson = new Gson();
        do {
            for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                retMap.put(searchHit.getId(), gson.fromJson(searchHit.getSourceAsString(), classOfT));
            }
            // Continue with the most recent scroll ID
            scrollResp = this.esClient.prepareSearchScroll(scrollId)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();
        } while (scrollResp.getHits().getHits().length > 0);
        return retMap;
    } finally {
        this.clearESScrollId(scrollId);
    }
}
3. Get a document by ID
/**
 * Get one document by document ID
 * @param index
 * @param type
 * @param docID sample: AAA_BBB_CCC
 * @param classOfT
 * @return xxxVO, or null if the document does not exist
 */
public <T> T getDocByDocID(String index, String type, String docID, Class<T> classOfT) {
    GetResponse response = this.esClient.prepareGet(index, type, docID).get();
    if (!response.isExists()) {
        return null;
    }
    Gson gson = new Gson();
    return gson.fromJson(response.getSourceAsString(), classOfT);
}
4. Get documents by a list of IDs
/**
 * Get documents by their IDs
 * @param index
 * @param type
 * @param docIDs sample: List<AAA_BBB_CCC>
 * @param classOfT
 * @return List<xxxVO>
 * @throws Exception
 */
public <T> List<T> getDocListByDocID(String index, String type, List<String> docIDs, Class<T> classOfT)
        throws Exception {
    String scrollId = "";
    List<T> list = new ArrayList<T>();
    try {
        SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                .setQuery(QueryBuilders.idsQuery().ids(docIDs))
                .setScroll(new TimeValue(60000)).setFrom(0).setSize(1000)
                .execute().actionGet();
        scrollId = scrollResp.getScrollId();
        Gson gson = new Gson();
        do {
            for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                list.add(gson.fromJson(searchHit.getSourceAsString(), classOfT));
            }
            scrollResp = this.esClient.prepareSearchScroll(scrollId)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();
        } while (scrollResp.getHits().getHits().length > 0);
        return list;
    } finally {
        this.clearESScrollId(scrollId);
    }
}
5. Get field values by IDs, returned as a map
/**
 * Get selected _source fields of documents by document IDs
 * @param index
 * @param type
 * @param ids
 * @param fields
 * @return Map<String, Object> key = id_field, value = that document's field value.
 *         sample: Map<AAA_BBB_CCC_field, value>
 * @throws Exception
 */
public Map<String, Object> getExtraMapByDocID(String index, String type, List<String> ids, String... fields)
        throws Exception {
    SearchResponse response = this.esClient.prepareSearch(index).setTypes(type)
            .setQuery(QueryBuilders.idsQuery().ids(ids))
            .setFetchSource(fields, null)   // return only the requested fields from _source
            .setScroll(new TimeValue(60000)).setFrom(0).setSize(1000)
            .execute().actionGet();
    String scrollId = response.getScrollId();
    Map<String, Object> retMap = Maps.newHashMap();
    try {
        while (true) {
            for (SearchHit hit : response.getHits().getHits()) {
                for (String field : fields) {
                    // Composite key: <document id>_<field name>
                    String key = String.format("%s_%s", hit.getId(), field);
                    retMap.put(key, hit.getSource().get(field));
                }
            }
            response = this.esClient.prepareSearchScroll(scrollId)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = response.getScrollId();
            // Break condition: no hits are returned
            if (response.getHits().getHits().length == 0) {
                break;
            }
        }
        return retMap;
    } finally {
        this.clearESScrollId(scrollId);
    }
}
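The id_field key can collide when IDs or field names themselves contain underscores: document AAA_BBB with field CCC_name and document AAA with field BBB_CCC_name both become AAA_BBB_CCC_name. A sketch of a nested map keyed by document ID, then field name, that avoids this; FieldsById is a hypothetical helper and the layout is a design alternative, not the original method's contract.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: values stored under (document ID, field name) without a composite string key. */
final class FieldsById {
    private FieldsById() {
    }

    static void put(Map<String, Map<String, Object>> byId, String docId, String field, Object value) {
        // Create the per-document map on first use, then store the field value in it
        byId.computeIfAbsent(docId, k -> new HashMap<>()).put(field, value);
    }
}
```

Inside the hit loop, FieldsById.put(retMap, hit.getId(), field, hit.getSource().get(field)) would replace the String.format key, with retMap declared as Map<String, Map<String, Object>>.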
6. Get field values by query DSL
/**
 * Get selected _source fields of the documents matching a query DSL
 * @param index
 * @param type
 * @param queryDSL
 * @param fields
 * @return one map per document, holding the requested fields and their values
 * @throws Exception
 */
public List<Map<String, Object>> getExtraMapByQueryDSL(String index, String type, String queryDSL, String... fields)
        throws Exception {
    List<Map<String, Object>> rtnLst = Lists.newArrayList();
    SearchResponse response = this.esClient.prepareSearch(index).setTypes(type)
            .setExtraSource(queryDSL)
            .setFetchSource(fields, null)
            .setScroll(new TimeValue(60000)).setFrom(0).setSize(1000)
            .execute().actionGet();
    // getHitsResult reads every page and clears the scroll when done
    return this.getHitsResult(response, rtnLst, fields);
}

/**
 * Collect the requested fields of every hit, page by page, until the scroll is exhausted.
 */
private List<Map<String, Object>> getHitsResult(SearchResponse response, List<Map<String, Object>> rtnLst,
        String... fields) {
    String scrollId = response.getScrollId();
    try {
        while (true) {
            for (SearchHit hit : response.getHits().getHits()) {
                Map<String, Object> tempMap = Maps.newHashMap();
                for (String field : fields) {
                    tempMap.put(field, hit.getSource().get(field));
                }
                rtnLst.add(tempMap);
            }
            response = this.esClient.prepareSearchScroll(scrollId)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = response.getScrollId();
            if (response.getHits().getHits().length == 0) {
                break;
            }
        }
        return rtnLst;
    } finally {
        this.clearESScrollId(scrollId);
    }
}
7. Get the first document after sorting
/**
 * Get the top 1 document by a sorted DSL
 * @param index
 * @param type
 * @param dsl query DSL, which should include a sort clause
 * @param classOfT
 * @return the first document, or null if nothing matches
 */
public <T> T getTop1DocByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
    // Only one hit is needed, so a plain search is enough and no scroll context is opened
    SearchResponse resp = this.esClient.prepareSearch(index).setTypes(type)
            .setExtraSource(dsl).setFrom(0).setSize(1)
            .execute().actionGet();
    SearchHit[] hits = resp.getHits().getHits();
    if (hits.length == 0) {
        return null;
    }
    return new Gson().fromJson(hits[0].getSourceAsString(), classOfT);
}
8. Check whether any document matches a query DSL
/**
 * Check whether any document matches a query DSL
 * @param esIndex
 * @param esType
 * @param queryDsl
 * @return true if at least one document matches
 * @throws Exception
 */
public boolean checkDocExistByQueryDSL(String esIndex, String esType, String queryDsl) throws Exception {
    // A single small page is enough; a scroll context would have to be cleared afterwards
    SearchResponse resp = this.esClient.prepareSearch(esIndex).setTypes(esType)
            .setFetchSource(new String[] { "_id" }, null)
            .setExtraSource(queryDsl).setFrom(0).setSize(1)
            .execute().actionGet();
    return resp.getHits().getTotalHits() > 0;
}
9. Count matching documents
/**
 * Return the total number of documents matching a DSL
 */
protected long getTotalHitsByDsl(String index, String type, String dsl) {
    // size 0: only the total hit count is needed, so no documents and no scroll
    SearchResponse resp = this.esClient.prepareSearch(index).setTypes(type)
            .setExtraSource(dsl).setFrom(0).setSize(0)
            .execute().actionGet();
    return resp.getHits().getTotalHits();
}
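Insert (Upsert)
Note: upsert here means update the record if it exists, otherwise create it. The index API used below does this by replacing the whole stored document when the ID already exists, so fields absent from the new VO are not kept; to change only some fields, use the update API in the Update section.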
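The difference between replacing a document (index API) and merging a partial document (update API) can be modelled with plain maps. This is an in-memory model, not Elasticsearch code; WriteSemantics is a hypothetical class, and it merges only top-level fields, whereas Elasticsearch also merges nested objects.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model (hypothetical, not Elasticsearch code) of how index and update treat an existing document. */
final class WriteSemantics {
    private WriteSemantics() {
    }

    /** Index API: the stored source is replaced by the new source. */
    static void index(Map<String, Map<String, Object>> store, String id, Map<String, Object> source) {
        store.put(id, new HashMap<>(source));
    }

    /** Update API with a partial doc: fields are merged in; a missing document is an error. */
    static void update(Map<String, Map<String, Object>> store, String id, Map<String, Object> partialDoc) {
        Map<String, Object> existing = store.get(id);
        if (existing == null) {
            throw new IllegalStateException("document missing: " + id);
        }
        existing.putAll(partialDoc);   // top-level merge only in this model
    }
}
```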
1. Upsert by document ID
/**
 * Upsert data by VO + document ID.
 * @param index
 * @param type
 * @param docID
 * @param vo
 * @return IndexResponse
 */
public <T> IndexResponse upsert(String index, String type, String docID, T vo) {
    Gson gson = new Gson();
    String source = gson.toJson(vo);
    // setRefresh(true) makes the change searchable immediately, at some cost in throughput
    return this.esClient.prepareIndex(index, type, docID)
            .setRefresh(true)
            .setSource(source)
            .execute().actionGet();
}
2. Bulk upsert
/**
 * Bulk upsert
 * @param index
 * @param type
 * @param voList
 * @param getDocIDMethodInVo name of a no-argument method on the VO that returns the document ID,
 *        sample: public String getDocumentID() { return String.format("%s_%s", AAA, BBB); }
 * @param isAutoRefresh
 * @throws Exception
 */
public <T> void bulkProcessUpsert(String index, String type, List<T> voList, String getDocIDMethodInVo,
        boolean isAutoRefresh) throws Exception {
    Gson gson = new Gson();
    BulkRequestBuilder bulkRequest = this.esClient.prepareBulk();
    bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));
    for (T vo : voList) {
        // Look up and call the ID getter by reflection
        Method getIdMethod = vo.getClass().getMethod(getDocIDMethodInVo);
        Object docId = getIdMethod.invoke(vo);
        IndexRequestBuilder indexRequestBuilder = this.esClient
                .prepareIndex(index, type, docId.toString())
                .setSource(gson.toJson(vo));
        bulkRequest.add(indexRequestBuilder);
    }
    if (bulkRequest.numberOfActions() > 0) {
        BulkResponse resp = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();
        // Individual items can fail without an exception; check explicitly
        if (resp.hasFailures()) {
            throw new Exception(String.format("Bulk index items failed, message:%s", resp.buildFailureMessage()));
        }
    }
}
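Passing the ID getter by name (getDocIDMethodInVo) only fails at run time if the name is misspelled. A sketch, assuming Java 8+, of passing a Function such as MyVO::getDocumentID (MyVO is hypothetical) and keying the VOs by document ID before the IndexRequestBuilders are built; DocIds.byDocId is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper: key VOs by document ID with a type-safe getter instead of reflection. */
final class DocIds {
    private DocIds() {
    }

    static <T> Map<String, T> byDocId(List<T> voList, Function<? super T, String> docIdOf) {
        Map<String, T> result = new LinkedHashMap<>();
        for (T vo : voList) {
            // Design choice: if two VOs share an ID, the later one wins
            result.put(docIdOf.apply(vo), vo);
        }
        return result;
    }
}
```

Each entry can then become prepareIndex(index, type, entry.getKey()).setSource(gson.toJson(entry.getValue())).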
Update
1. Update by document ID
/**
 * Update selected fields by document ID. The map is merged into the existing document;
 * the call fails if the document does not exist.
 * @param index
 * @param type
 * @param id
 * @param fieldValMap key = index field, value = new value
 * @return UpdateResponse
 * @throws Exception
 */
public UpdateResponse update(String index, String type, String id, Map<String, Object> fieldValMap)
        throws Exception {
    return this.esClient.update(new UpdateRequest(index, type, id).refresh(true).doc(fieldValMap)).get();
}
2. Bulk update
/**
 * Bulk update. Map key is the document ID; map value is the field-value map to update.
 */
protected void bulkProcessUpdate(String index, String type, Map<String, Map<String, Object>> mapUpdates,
        boolean isAutoRefresh) throws Exception {
    BulkRequestBuilder bulkRequest = this.esClient.prepareBulk();
    bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));
    for (Map.Entry<String, Map<String, Object>> entry : mapUpdates.entrySet()) {
        // Build the partial document {field: value, ...} for this ID
        XContentBuilder jsonBuilder = XContentFactory.jsonBuilder().startObject();
        for (Map.Entry<String, Object> field : entry.getValue().entrySet()) {
            jsonBuilder.field(field.getKey(), field.getValue());
        }
        jsonBuilder.endObject();
        UpdateRequestBuilder updateRequestBuilder = this.esClient
                .prepareUpdate(index, type, entry.getKey())
                .setDoc(jsonBuilder);
        bulkRequest.add(updateRequestBuilder);
    }
    if (bulkRequest.numberOfActions() > 0) {
        BulkResponse resp = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();
        if (resp.hasFailures()) {
            throw new Exception(String.format("Bulk update items failed, message:%s", resp.buildFailureMessage()));
        }
    }
}
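bulkProcessUpdate sends every entry of mapUpdates in a single bulk request, which is why it needs a 20-minute timeout. For large maps, one alternative is to call it once per batch. A sketch of the splitting step; Batches.chunk is a hypothetical helper and the batch size is an assumption to tune for your documents.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: split a map into consecutive batches of at most batchSize entries. */
final class Batches {
    private Batches() {
    }

    static <K, V> List<Map<K, V>> chunk(Map<K, V> updates, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Map<K, V>> batches = new ArrayList<>();
        Map<K, V> current = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : updates.entrySet()) {
            current.put(e.getKey(), e.getValue());
            if (current.size() == batchSize) {
                batches.add(current);
                current = new LinkedHashMap<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);   // last, partially filled batch
        }
        return batches;
    }
}
```

Usage would look like: for each batch in Batches.chunk(mapUpdates, 1000), call bulkProcessUpdate(index, type, batch, false), refreshing once at the end if needed.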
Delete
1. Delete by document ID
/**
 * Delete data by ID
 * @return true if the document was found
 */
public boolean deleteById(String index, String type, String id) {
    return this.esClient.prepareDelete(index, type, id)
            .setRefresh(true)
            .execute().actionGet()
            .isFound();
}
2. Bulk delete
/**
 * Bulk delete by document IDs
 * @param index
 * @param type
 * @param lstId
 * @throws Exception
 */
public void bulkProcessDelete(final String index, String type, List<String> lstId) throws Exception {
    // Flushes every BULK_MAX_SIZE actions, every 1 GB of requests, or every 5 seconds, whichever comes first
    final BulkProcessor bulkProcessor = BulkProcessor.builder(this.esClient, new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // Item-level failures do not throw, so log them here
            if (response.hasFailures()) {
                logger.error("ElasticsearchHelper.bulkProcessDelete. Index is {}. error is {}.", index,
                        response.buildFailureMessage());
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            logger.error("ElasticsearchHelper.bulkProcessDelete. Index is {}.", index, failure);
        }
    }).setBulkActions(BULK_MAX_SIZE)
            .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
            .setFlushInterval(TimeValue.timeValueSeconds(5))
            .build();
    try {
        for (String docId : lstId) {
            bulkProcessor.add(new DeleteRequest(index, type, docId));
        }
    } catch (Exception ex) {
        logger.error("ElasticsearchHelper.bulkProcessDelete. Index is {}.", index, ex);
        throw ex;
    } finally {
        // Flush pending requests, wait up to one minute, then refresh so the deletions become visible
        bulkProcessor.awaitClose(1, TimeUnit.MINUTES);
        this.esClient.admin().indices().prepareRefresh(index).get();
    }
}
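3. Delete by query DSL
Two variants: the first collects every matching document into one bulk request; the second gathers the matching IDs and passes them to bulkProcessDelete.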
/**
 * Delete by query. All delete requests are held in one bulk request, so this suits moderate result sizes.
 * @param queryDsl
 * @param index
 * @param type
 * @param isAutoRefresh
 * @throws Exception
 */
public void deleteByQueryDsl(String queryDsl, String index, String type, boolean isAutoRefresh) throws Exception {
    Client client = this.esClient;
    SearchResponse scrollResp = client.prepareSearch(index).setTypes(type)
            .setExtraSource(queryDsl).setFrom(0).setSize(1500)
            .setScroll(new TimeValue(60000))
            .execute().actionGet();
    String scrollId = scrollResp.getScrollId();
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));
    try {
        while (true) {
            for (SearchHit hit : scrollResp.getHits().getHits()) {
                bulkRequest.add(client.prepareDelete(index, type, hit.getId()));
            }
            scrollResp = client.prepareSearchScroll(scrollId)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();
            // Break condition: no hits are returned
            if (scrollResp.getHits().getHits().length == 0) {
                break;
            }
        }
    } finally {
        this.clearESScrollId(scrollId);
    }
    if (bulkRequest.numberOfActions() > 0) {
        BulkResponse response = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();
        if (response.hasFailures()) {
            throw new Exception("deleteByFilter failure:" + response.buildFailureMessage());
        }
    }
}
/**
 * Delete documents by query DSL: collect the matching IDs, then delete them with bulkProcessDelete.
 * @param index
 * @param type
 * @param dsl
 * @throws Exception
 */
public void deleteByQueryDsl(String index, String type, String dsl) throws Exception {
    String scrollId = "";
    List<String> lstId = new ArrayList<String>();
    try {
        SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
                .setExtraSource(dsl).setFrom(0).setSize(2000)
                .setScroll(new TimeValue(60000)).execute().actionGet();
        scrollId = scrollResp.getScrollId();
        do {
            for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                lstId.add(searchHit.getId());
            }
            scrollResp = this.esClient.prepareSearchScroll(scrollId)
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();
        } while (scrollResp.getHits().getHits().length > 0);
    } finally {
        this.clearESScrollId(scrollId);
    }
    this.bulkProcessDelete(index, type, lstId);
}
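Addendum
When running ES operations, we usually set a size so that a single request does not return too much data, and page through the rest with the scroll API; that is where the scrollId comes from. Each scroll keeps a search context open on the server until its keep-alive expires, so the scrollId should be cleared in the method's finally block.
For example: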
/**
 * Release a scroll context on the server; does nothing for a null or empty ID.
 */
public void clearESScrollId(String scrollId) {
    if (StringUtils.isNotEmpty(scrollId)) {
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollId);
        this.esClient.clearScroll(clearScrollRequest).actionGet();
    }
}
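The helpers above repeat one loop: read the first page, scroll until a page comes back empty, always continue with the latest scroll ID, and clear it in finally. A minimal sketch of factoring that loop out once. ScrollSource, Page and ScrollDrain are hypothetical types standing in for prepareSearch, prepareSearchScroll and clearESScrollId so the loop runs without a cluster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the search/scroll calls (not an Elasticsearch interface). */
interface ScrollSource<T> {
    Page<T> first();                 // like prepareSearch(...).setScroll(...)
    Page<T> next(String scrollId);   // like prepareSearchScroll(scrollId)
    void clear(String scrollId);     // like clearESScrollId(scrollId)
}

/** Hypothetical page: the scroll ID returned with it plus its hits. */
final class Page<T> {
    final String scrollId;
    final List<T> hits;

    Page(String scrollId, List<T> hits) {
        this.scrollId = scrollId;
        this.hits = hits;
    }
}

final class ScrollDrain {
    private ScrollDrain() {
    }

    /** Consume every page, continuing with the latest scroll ID, and clear it even on failure. */
    static <T> void drain(ScrollSource<T> source, Consumer<? super T> sink) {
        String scrollId = null;
        try {
            Page<T> page = source.first();
            scrollId = page.scrollId;
            while (!page.hits.isEmpty()) {
                page.hits.forEach(sink);
                page = source.next(scrollId);
                scrollId = page.scrollId;
            }
        } finally {
            if (scrollId != null) {
                source.clear(scrollId);
            }
        }
    }
}
```

Tracking the latest ID matters because Elasticsearch may return a different _scroll_id with each response, and only the most recent one should be used.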