从零开发短视频电商 Low Level Client(推荐)连接OpenSearch进行CRUD

本文主要是介绍从零开发短视频电商 Low Level Client(推荐)连接OpenSearch进行CRUD,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

      • 依赖
      • 初始化客户端
      • 发起请求
      • 请求参数
      • 请求头
      • 设置超时时间
      • 设置线程数
      • 设置用户名密码
      • 结果解析
      • 节点选择器
      • 配置嗅探器
      • 整体示例
      • 问题
      • 参考

OpenSearch开发环境安装Docker和Docker-Compose两种方式

依赖

<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId><version>7.13.4</version>  <!-- 建议就是这个版本 -->
</dependency>
<!-- 或者 -->
<dependency><groupId>org.opensearch.client</groupId><artifactId>opensearch-java</artifactId><version>2.8.1</version>
</dependency>

初始化客户端

// 构建客户端
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http"),new HttpHost("localhost", 9201, "http")).build();

发起请求

  • performRequest: 是同步请求方法: 将阻塞调用线程,并在请求成功时返回响应,或在请求失败时引发异常
  • performRequestAsync: 是异步方法:接收一个ResponseListener对象作为参数。如果请求成功,则该参数使用响应进行调用;如果请求失败,则使用异常进行调用
// 同步请求Request request = new Request("GET","/posts/_search");Response response = restClient.performRequest(request); // 执行同步请求response.toString();// 异步请求Request request = new Request("GET", "/posts/_search");restClient.performRequestAsync(request, new ResponseListener() {@Overridepublic void onSuccess(Response response) {log.info("异步请求成功!" + response.toString());}@Overridepublic void onFailure(Exception e) {log.error("异步请求失败!");e.printStackTrace();}});

请求参数

// 第一种
request.addParameter("pretty","true");
// 第二种
request.setEntity(new NStringEntity("{\"json\":\"text\"}",ContentType.APPLICATION_JSON));
// 第三种
request.setJsonEntity("{\"json\":\"text\"}");

请求头

RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();builder.addHeader("Authorization", "Bearer " + "my-token");builder.setHttpAsyncResponseConsumerFactory(new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));COMMON_OPTIONS = builder.build();
Request request = new Request("GET", "/");
request.setOptions(COMMON_OPTIONS);

设置超时时间

 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {@Overridepublic RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {return builder.setConnectTimeout(50000) // 连接超时默认1s .setSocketTimeout(10000); // 套接字超时默认30s.setConnectionRequestTimeout(10000);}});

设置线程数

Apache HTTP异常客户端默认启动一个调度程序线程,连接管理器使用多个工作线程。

        RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200)).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {return httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadNumber).build());}});

设置用户名密码

// 创建凭证提供程序,设置用户名和密码
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("admin", "admin"));// 使用 RestClient 构建器连接到 OpenSearch
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).setHttpClientConfigCallback(httpClientBuilder -> {// 配置连接超时,连接建立后两个节点之间数据传输的套接字超时和连接请求超时// 连接超时:客户端和服务器建立连接的最长时间RequestConfig.Builder requestConfigBuilder = RequestConfig.custom().setConnectTimeout(5000)  // 连接超时为5秒.setSocketTimeout(10000) // 套接字超时为10秒.setConnectionRequestTimeout(10000); // 连接请求超时为10秒httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());// 设置凭证提供程序httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);return httpClientBuilder;}).build();

结果解析

            Response response = restClient.performRequest(new Request("GET", "/"));// 已执行请求的信息RequestLine requestLine = response.getRequestLine();// Host返回的信息HttpHost httpHost = response.getHost();// 响应状态行,从中解析状态代码int statusCode = response.getStatusLine().getStatusCode();// 响应头,可以通过getHeader(string)按名称获取Header[] headers = response.getHeaders();String responseBody = EntityUtils.toString(response.getEntity());

节点选择器

在默认情况下,客户端以轮询的方式将每个请求发送到配置的各个节点中
ES允许用户自由选择要连接的节点,通过初始化客户端来配置节点选择器,以便筛选节点。该功能在启用嗅探器时可以用来防止HTTP请求只命中专用的主节点。
配置后,对于每个请求,客户端都通过节点选择器来筛选备选节点。

        RestClientBuilder builder = RestClient.builder(new HttpHost("localhost",9200,"http"));builder.setNodeSelector(new NodeSelector(){@Overridepublic void select(Iterable<Node> nodes){boolean foundOne = false;for(Node node : nodes){String rackId = node.getAttributes().get("rack_id").get(0);if("targetId".equals(rackId)){foundOne = true;break;}}if(foundOne){Iterator<Node> nodesIt = nodes.iterator();while(nodesIt.hasNext()){Node node = nodesIt.next();String rackId = node.getAttributes().get("rack_id").get(0);if("targetId".equals(rackId) == false){nodesIt.remove();}}}}

配置嗅探器

嗅探器允许自动发现运行中ES集群中的节点,并将其设置为现有的RestClient实例
默认i情况下,嗅探器使用nodes info API检索属于集群的节点并采用jackson解析获得JSON响应

  <dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client-sniffer</artifactId><version>${elasticsearch.version}</version></dependency>

创建RestClient实例就可以采用嗅探器与其互联。嗅探器利用RestClient提供的定期机制(默认定期时间为5min),从集群中获取当前节点的列表,通过调用RestClient类中的setNodes方法来更新。

整体示例

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;import java.io.IOException;public class OpenSearchExample {public static void main(String[] args) throws IOException {// Connect to OpenSearchfinal CredentialsProvider credentialsProvider = new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("admin", "admin"));RestClient restClient = RestClient.builder(new HttpHost("10.12.23.1", 9200, "http")).setHttpClientConfigCallback(httpClientBuilder -> {RequestConfig.Builder requestConfigBuilder = RequestConfig.custom().setConnectTimeout(5000).setSocketTimeout(10000).setConnectionRequestTimeout(10000);httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);return httpClientBuilder;}).build();try {// Delete IndexdeleteIndex(restClient, "my_index");// Create IndexcreateIndex(restClient, "my_index");// Index DocumentindexDocument(restClient, "{\"index\":{\"_index\":\"my_index\",\"_id\":1}}\n{ \"field\": \"value\" }\n");// Get DocumentgetDocument(restClient, "my_index");// Delete DocumentdeleteDocument(restClient, "my_index", "1");// Delete IndexdeleteIndex(restClient, "my_index");} catch (ResponseException e) {e.printStackTrace();// Handle response exceptionSystem.err.println("Error: " + e.getResponse().getStatusLine().getReasonPhrase());} finally {// Close the clientrestClient.close();}}private static void createIndex(RestClient restClient, String index) throws IOException {// Create Index requestRequest request = new Request("PUT", "/" + index);// Execute the requestrestClient.performRequest(request);}private static void indexDocument(RestClient restClient, String s ) throws IOException {// Index Document requestRequest request = new Request("POST", "/_bulk" );request.setJsonEntity(s);// Execute the requestrestClient.performRequest(request);}private static void getDocument(RestClient restClient, String index) throws IOException {// Get Document requestRequest request = new Request("GET", "/" + index +  "/_search");// Execute the requestResponse response = restClient.performRequest(request);// Handle the responseSystem.out.println("Document found: " + EntityUtils.toString(response.getEntity()));}private static void deleteDocument(RestClient restClient, String index,  String id) throws IOException {// Delete Document requestRequest request = new Request("DELETE", "/" + index  + "/_doc/" + id);// Execute the requestrestClient.performRequest(request);}private static void deleteIndex(RestClient restClient, String index) throws IOException {// Delete Index requestRequest request = new Request("DELETE", "/" + index);// Execute the requestrestClient.performRequest(request);}
}

问题

异常如下

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested targetat java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)at java.base/sun.security.validator.Validator.validate(Validator.java:264)at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285)at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335)... 19 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested targetat java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:146)at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:127)

解决方案

因为证书问题,我们用的是测试环境,就不要费劲的去下载私有证书再安装了,直接配置opensearch支持http即可。

opensearch.yml

plugins.security.ssl.http.enabled: false

或者直接禁用安全插件。

参考

  • https://www.cnblogs.com/openmind-ink/p/13951767.html

这篇关于从零开发短视频电商 Low Level Client(推荐)连接OpenSearch进行CRUD的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/476597

相关文章

Qt QCustomPlot库简介(最新推荐)

《QtQCustomPlot库简介(最新推荐)》QCustomPlot是一款基于Qt的高性能C++绘图库,专为二维数据可视化设计,它具有轻量级、实时处理百万级数据和多图层支持等特点,适用于科学计算、... 目录核心特性概览核心组件解析1.绘图核心 (QCustomPlot类)2.数据容器 (QCPDataC

Java内存分配与JVM参数详解(推荐)

《Java内存分配与JVM参数详解(推荐)》本文详解JVM内存结构与参数调整,涵盖堆分代、元空间、GC选择及优化策略,帮助开发者提升性能、避免内存泄漏,本文给大家介绍Java内存分配与JVM参数详解,... 目录引言JVM内存结构JVM参数概述堆内存分配年轻代与老年代调整堆内存大小调整年轻代与老年代比例元空

深度解析Java DTO(最新推荐)

《深度解析JavaDTO(最新推荐)》DTO(DataTransferObject)是一种用于在不同层(如Controller层、Service层)之间传输数据的对象设计模式,其核心目的是封装数据,... 目录一、什么是DTO?DTO的核心特点:二、为什么需要DTO?(对比Entity)三、实际应用场景解析

Go语言中nil判断的注意事项(最新推荐)

《Go语言中nil判断的注意事项(最新推荐)》本文给大家介绍Go语言中nil判断的注意事项,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1.接口变量的特殊行为2.nil的合法类型3.nil值的实用行为4.自定义类型与nil5.反射判断nil6.函数返回的

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

MySQL中的表连接原理分析

《MySQL中的表连接原理分析》:本文主要介绍MySQL中的表连接原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、环境3、表连接原理【1】驱动表和被驱动表【2】内连接【3】外连接【4编程】嵌套循环连接【5】join buffer4、总结1、背景

Golang如何对cron进行二次封装实现指定时间执行定时任务

《Golang如何对cron进行二次封装实现指定时间执行定时任务》:本文主要介绍Golang如何对cron进行二次封装实现指定时间执行定时任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录背景cron库下载代码示例【1】结构体定义【2】定时任务开启【3】使用示例【4】控制台输出总结背景

python 常见数学公式函数使用详解(最新推荐)

《python常见数学公式函数使用详解(最新推荐)》文章介绍了Python的数学计算工具,涵盖内置函数、math/cmath标准库及numpy/scipy/sympy第三方库,支持从基础算术到复杂数... 目录python 数学公式与函数大全1. 基本数学运算1.1 算术运算1.2 分数与小数2. 数学函数

SpringBoot开发中十大常见陷阱深度解析与避坑指南

《SpringBoot开发中十大常见陷阱深度解析与避坑指南》在SpringBoot的开发过程中,即使是经验丰富的开发者也难免会遇到各种棘手的问题,本文将针对SpringBoot开发中十大常见的“坑... 目录引言一、配置总出错?是不是同时用了.properties和.yml?二、换个位置配置就失效?搞清楚加

Python Pillow 库详解文档(最新推荐)

《PythonPillow库详解文档(最新推荐)》Pillow是Python中最流行的图像处理库,它是PythonImagingLibrary(PIL)的现代分支和继承者,本文给大家介绍Pytho... 目录python Pillow 库详解文档简介安装核心模块架构Image 模块 - 核心图像处理基本导入