从零开发短视频电商 Low Level Client(推荐)连接OpenSearch进行CRUD

本文主要是介绍从零开发短视频电商 Low Level Client(推荐)连接OpenSearch进行CRUD,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

      • 依赖
      • 初始化客户端
      • 发起请求
      • 请求参数
      • 请求头
      • 设置超时时间
      • 设置线程数
      • 设置用户名密码
      • 结果解析
      • 节点选择器
      • 配置嗅探器
      • 整体示例
      • 问题
      • 参考

OpenSearch开发环境安装Docker和Docker-Compose两种方式

依赖

<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId><version>7.13.4</version>  <!-- 建议就是这个版本 -->
</dependency>
<!-- 或者 -->
<dependency><groupId>org.opensearch.client</groupId><artifactId>opensearch-java</artifactId><version>2.8.1</version>
</dependency>

初始化客户端

// 构建客户端
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http"),new HttpHost("localhost", 9201, "http")).build();

发起请求

  • performRequest: 是同步请求方法: 将阻塞调用线程,并在请求成功时返回响应,或在请求失败时引发异常
  • performRequestAsync: 是异步方法:接收一个ResponseListener对象作为参数。如果请求成功,则该参数使用响应进行调用;如果请求失败,则使用异常进行调用
// 同步请求Request request = new Request("GET","/posts/_search");Response response = restClient.performRequest(request); // 执行同步请求response.toString();// 异步请求Request request = new Request("GET", "/posts/_search");restClient.performRequestAsync(request, new ResponseListener() {@Overridepublic void onSuccess(Response response) {log.info("异步请求成功!" + response.toString());}@Overridepublic void onFailure(Exception e) {log.error("异步请求失败!");e.printStackTrace();}});

请求参数

// 第一种
request.addParameter("pretty","true");
// 第二种
request.setEntity(new NStringEntity("{\"json\":\"text\"}",ContentType.APPLICATION_JSON));
// 第三种
request.setJsonEntity("{\"json\":\"text\"}");

请求头

RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();builder.addHeader("Authorization", "Bearer " + "my-token");builder.setHttpAsyncResponseConsumerFactory(new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));COMMON_OPTIONS = builder.build();
Request request = new Request("GET", "/");
request.setOptions(COMMON_OPTIONS);

设置超时时间

 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {@Overridepublic RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {return builder.setConnectTimeout(50000) // 连接超时默认1s .setSocketTimeout(10000); // 套接字超时默认30s.setConnectionRequestTimeout(10000);}});

设置线程数

Apache HTTP异常客户端默认启动一个调度程序线程,连接管理器使用多个工作线程。

        RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200)).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {return httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadNumber).build());}});

设置用户名密码

// 创建凭证提供程序,设置用户名和密码
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("admin", "admin"));// 使用 RestClient 构建器连接到 OpenSearch
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).setHttpClientConfigCallback(httpClientBuilder -> {// 配置连接超时,连接建立后两个节点之间数据传输的套接字超时和连接请求超时// 连接超时:客户端和服务器建立连接的最长时间RequestConfig.Builder requestConfigBuilder = RequestConfig.custom().setConnectTimeout(5000)  // 连接超时为5秒.setSocketTimeout(10000) // 套接字超时为10秒.setConnectionRequestTimeout(10000); // 连接请求超时为10秒httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());// 设置凭证提供程序httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);return httpClientBuilder;}).build();

结果解析

            Response response = restClient.performRequest(new Request("GET", "/"));// 已执行请求的信息RequestLine requestLine = response.getRequestLine();// Host返回的信息HttpHost httpHost = response.getHost();// 响应状态行,从中解析状态代码int statusCode = response.getStatusLine().getStatusCode();// 响应头,可以通过getHeader(string)按名称获取Header[] headers = response.getHeaders();String responseBody = EntityUtils.toString(response.getEntity());

节点选择器

在默认情况下,客户端以轮询的方式将每个请求发送到配置的各个节点中
ES允许用户自由选择要连接的节点,通过初始化客户端来配置节点选择器,以便筛选节点。该功能在启用嗅探器时可以用来防止HTTP请求只命中专用的主节点。
配置后,对于每个请求,客户端都通过节点选择器来筛选备选节点。

        RestClientBuilder builder = RestClient.builder(new HttpHost("localhost",9200,"http"));builder.setNodeSelector(new NodeSelector(){@Overridepublic void select(Iterable<Node> nodes){boolean foundOne = false;for(Node node : nodes){String rackId = node.getAttributes().get("rack_id").get(0);if("targetId".equals(rackId)){foundOne = true;break;}}if(foundOne){Iterator<Node> nodesIt = nodes.iterator();while(nodesIt.hasNext()){Node node = nodesIt.next();String rackId = node.getAttributes().get("rack_id").get(0);if("targetId".equals(rackId) == false){nodesIt.remove();}}}}

配置嗅探器

嗅探器允许自动发现运行中ES集群中的节点,并将其设置为现有的RestClient实例
默认i情况下,嗅探器使用nodes info API检索属于集群的节点并采用jackson解析获得JSON响应

  <dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client-sniffer</artifactId><version>${elasticsearch.version}</version></dependency>

创建RestClient实例就可以采用嗅探器与其互联。嗅探器利用RestClient提供的定期机制(默认定期时间为5min),从集群中获取当前节点的列表,通过调用RestClient类中的setNodes方法来更新。

整体示例

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;import java.io.IOException;public class OpenSearchExample {public static void main(String[] args) throws IOException {// Connect to OpenSearchfinal CredentialsProvider credentialsProvider = new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("admin", "admin"));RestClient restClient = RestClient.builder(new HttpHost("10.12.23.1", 9200, "http")).setHttpClientConfigCallback(httpClientBuilder -> {RequestConfig.Builder requestConfigBuilder = RequestConfig.custom().setConnectTimeout(5000).setSocketTimeout(10000).setConnectionRequestTimeout(10000);httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);return httpClientBuilder;}).build();try {// Delete IndexdeleteIndex(restClient, "my_index");// Create IndexcreateIndex(restClient, "my_index");// Index DocumentindexDocument(restClient, "{\"index\":{\"_index\":\"my_index\",\"_id\":1}}\n{ \"field\": \"value\" }\n");// Get DocumentgetDocument(restClient, "my_index");// Delete DocumentdeleteDocument(restClient, "my_index", "1");// Delete IndexdeleteIndex(restClient, "my_index");} catch (ResponseException e) {e.printStackTrace();// Handle response exceptionSystem.err.println("Error: " + e.getResponse().getStatusLine().getReasonPhrase());} finally {// Close the clientrestClient.close();}}private static void createIndex(RestClient restClient, String index) throws IOException {// Create Index requestRequest request = new Request("PUT", "/" + index);// Execute the requestrestClient.performRequest(request);}private static void indexDocument(RestClient restClient, String s ) throws IOException {// Index Document requestRequest request = new Request("POST", "/_bulk" );request.setJsonEntity(s);// Execute the requestrestClient.performRequest(request);}private static void getDocument(RestClient restClient, String index) throws IOException {// Get Document requestRequest request = new Request("GET", "/" + index +  "/_search");// Execute the requestResponse response = restClient.performRequest(request);// Handle the responseSystem.out.println("Document found: " + EntityUtils.toString(response.getEntity()));}private static void deleteDocument(RestClient restClient, String index,  String id) throws IOException {// Delete Document requestRequest request = new Request("DELETE", "/" + index  + "/_doc/" + id);// Execute the requestrestClient.performRequest(request);}private static void deleteIndex(RestClient restClient, String index) throws IOException {// Delete Index requestRequest request = new Request("DELETE", "/" + index);// Execute the requestrestClient.performRequest(request);}
}

问题

异常如下

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested targetat java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)at java.base/sun.security.validator.Validator.validate(Validator.java:264)at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285)at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335)... 19 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested targetat java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:146)at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:127)

解决方案

因为证书问题,我们用的是测试环境,就不要费劲的去下载私有证书再安装了,直接配置opensearch支持http即可。

opensearch.yml

plugins.security.ssl.http.enabled: false

或者直接禁用安全插件。

参考

  • https://www.cnblogs.com/openmind-ink/p/13951767.html

这篇关于从零开发短视频电商 Low Level Client(推荐)连接OpenSearch进行CRUD的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/476597

相关文章

使用Python开发一个带EPUB转换功能的Markdown编辑器

《使用Python开发一个带EPUB转换功能的Markdown编辑器》Markdown因其简单易用和强大的格式支持,成为了写作者、开发者及内容创作者的首选格式,本文将通过Python开发一个Markd... 目录应用概览代码结构与核心组件1. 初始化与布局 (__init__)2. 工具栏 (setup_t

通过Spring层面进行事务回滚的实现

《通过Spring层面进行事务回滚的实现》本文主要介绍了通过Spring层面进行事务回滚的实现,包括声明式事务和编程式事务,具有一定的参考价值,感兴趣的可以了解一下... 目录声明式事务回滚:1. 基础注解配置2. 指定回滚异常类型3. ​不回滚特殊场景编程式事务回滚:1. ​使用 TransactionT

Spring Shell 命令行实现交互式Shell应用开发

《SpringShell命令行实现交互式Shell应用开发》本文主要介绍了SpringShell命令行实现交互式Shell应用开发,能够帮助开发者快速构建功能丰富的命令行应用程序,具有一定的参考价... 目录引言一、Spring Shell概述二、创建命令类三、命令参数处理四、命令分组与帮助系统五、自定义S

Java中使用Hutool进行AES加密解密的方法举例

《Java中使用Hutool进行AES加密解密的方法举例》AES是一种对称加密,所谓对称加密就是加密与解密使用的秘钥是一个,下面:本文主要介绍Java中使用Hutool进行AES加密解密的相关资料... 目录前言一、Hutool简介与引入1.1 Hutool简介1.2 引入Hutool二、AES加密解密基础

MySQL中的交叉连接、自然连接和内连接查询详解

《MySQL中的交叉连接、自然连接和内连接查询详解》:本文主要介绍MySQL中的交叉连接、自然连接和内连接查询,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、引入二、交php叉连接(cross join)三、自然连接(naturalandroid join)四

mysql的基础语句和外键查询及其语句详解(推荐)

《mysql的基础语句和外键查询及其语句详解(推荐)》:本文主要介绍mysql的基础语句和外键查询及其语句详解(推荐),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋... 目录一、mysql 基础语句1. 数据库操作 创建数据库2. 表操作 创建表3. CRUD 操作二、外键

python连接本地SQL server详细图文教程

《python连接本地SQLserver详细图文教程》在数据分析领域,经常需要从数据库中获取数据进行分析和处理,下面:本文主要介绍python连接本地SQLserver的相关资料,文中通过代码... 目录一.设置本地账号1.新建用户2.开启双重验证3,开启TCP/IP本地服务二js.python连接实例1.

基于Python和MoviePy实现照片管理和视频合成工具

《基于Python和MoviePy实现照片管理和视频合成工具》在这篇博客中,我们将详细剖析一个基于Python的图形界面应用程序,该程序使用wxPython构建用户界面,并结合MoviePy、Pill... 目录引言项目概述代码结构分析1. 导入和依赖2. 主类:PhotoManager初始化方法:__in

SpringSecurity6.0 如何通过JWTtoken进行认证授权

《SpringSecurity6.0如何通过JWTtoken进行认证授权》:本文主要介绍SpringSecurity6.0通过JWTtoken进行认证授权的过程,本文给大家介绍的非常详细,感兴趣... 目录项目依赖认证UserDetailService生成JWT token权限控制小结之前写过一个文章,从S

Python通过模块化开发优化代码的技巧分享

《Python通过模块化开发优化代码的技巧分享》模块化开发就是把代码拆成一个个“零件”,该封装封装,该拆分拆分,下面小编就来和大家简单聊聊python如何用模块化开发进行代码优化吧... 目录什么是模块化开发如何拆分代码改进版:拆分成模块让模块更强大:使用 __init__.py你一定会遇到的问题模www.