从零开发短视频电商 Low Level Client(推荐)连接OpenSearch进行CRUD

本文主要是介绍从零开发短视频电商 Low Level Client(推荐)连接OpenSearch进行CRUD,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

      • 依赖
      • 初始化客户端
      • 发起请求
      • 请求参数
      • 请求头
      • 设置超时时间
      • 设置线程数
      • 设置用户名密码
      • 结果解析
      • 节点选择器
      • 配置嗅探器
      • 整体示例
      • 问题
      • 参考

OpenSearch开发环境安装Docker和Docker-Compose两种方式

依赖

<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId><version>7.13.4</version>  <!-- 建议就是这个版本 -->
</dependency>
<!-- 或者 -->
<dependency><groupId>org.opensearch.client</groupId><artifactId>opensearch-java</artifactId><version>2.8.1</version>
</dependency>

初始化客户端

// 构建客户端
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http"),new HttpHost("localhost", 9201, "http")).build();

发起请求

  • performRequest: 是同步请求方法: 将阻塞调用线程,并在请求成功时返回响应,或在请求失败时引发异常
  • performRequestAsync: 是异步方法:接收一个ResponseListener对象作为参数。如果请求成功,则该参数使用响应进行调用;如果请求失败,则使用异常进行调用
// 同步请求Request request = new Request("GET","/posts/_search");Response response = restClient.performRequest(request); // 执行同步请求response.toString();// 异步请求Request request = new Request("GET", "/posts/_search");restClient.performRequestAsync(request, new ResponseListener() {@Overridepublic void onSuccess(Response response) {log.info("异步请求成功!" + response.toString());}@Overridepublic void onFailure(Exception e) {log.error("异步请求失败!");e.printStackTrace();}});

请求参数

// 第一种
request.addParameter("pretty","true");
// 第二种
request.setEntity(new NStringEntity("{\"json\":\"text\"}",ContentType.APPLICATION_JSON));
// 第三种
request.setJsonEntity("{\"json\":\"text\"}");

请求头

RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();builder.addHeader("Authorization", "Bearer " + "my-token");builder.setHttpAsyncResponseConsumerFactory(new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));COMMON_OPTIONS = builder.build();
Request request = new Request("GET", "/");
request.setOptions(COMMON_OPTIONS);

设置超时时间

 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {@Overridepublic RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {return builder.setConnectTimeout(50000) // 连接超时默认1s .setSocketTimeout(10000); // 套接字超时默认30s.setConnectionRequestTimeout(10000);}});

设置线程数

Apache HTTP异常客户端默认启动一个调度程序线程,连接管理器使用多个工作线程。

        RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200)).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {return httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadNumber).build());}});

设置用户名密码

// 创建凭证提供程序,设置用户名和密码
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("admin", "admin"));// 使用 RestClient 构建器连接到 OpenSearch
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).setHttpClientConfigCallback(httpClientBuilder -> {// 配置连接超时,连接建立后两个节点之间数据传输的套接字超时和连接请求超时// 连接超时:客户端和服务器建立连接的最长时间RequestConfig.Builder requestConfigBuilder = RequestConfig.custom().setConnectTimeout(5000)  // 连接超时为5秒.setSocketTimeout(10000) // 套接字超时为10秒.setConnectionRequestTimeout(10000); // 连接请求超时为10秒httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());// 设置凭证提供程序httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);return httpClientBuilder;}).build();

结果解析

            Response response = restClient.performRequest(new Request("GET", "/"));// 已执行请求的信息RequestLine requestLine = response.getRequestLine();// Host返回的信息HttpHost httpHost = response.getHost();// 响应状态行,从中解析状态代码int statusCode = response.getStatusLine().getStatusCode();// 响应头,可以通过getHeader(string)按名称获取Header[] headers = response.getHeaders();String responseBody = EntityUtils.toString(response.getEntity());

节点选择器

在默认情况下,客户端以轮询的方式将每个请求发送到配置的各个节点中
ES允许用户自由选择要连接的节点,通过初始化客户端来配置节点选择器,以便筛选节点。该功能在启用嗅探器时可以用来防止HTTP请求只命中专用的主节点。
配置后,对于每个请求,客户端都通过节点选择器来筛选备选节点。

        RestClientBuilder builder = RestClient.builder(new HttpHost("localhost",9200,"http"));builder.setNodeSelector(new NodeSelector(){@Overridepublic void select(Iterable<Node> nodes){boolean foundOne = false;for(Node node : nodes){String rackId = node.getAttributes().get("rack_id").get(0);if("targetId".equals(rackId)){foundOne = true;break;}}if(foundOne){Iterator<Node> nodesIt = nodes.iterator();while(nodesIt.hasNext()){Node node = nodesIt.next();String rackId = node.getAttributes().get("rack_id").get(0);if("targetId".equals(rackId) == false){nodesIt.remove();}}}}

配置嗅探器

嗅探器允许自动发现运行中ES集群中的节点,并将其设置为现有的RestClient实例
默认i情况下,嗅探器使用nodes info API检索属于集群的节点并采用jackson解析获得JSON响应

  <dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client-sniffer</artifactId><version>${elasticsearch.version}</version></dependency>

创建RestClient实例就可以采用嗅探器与其互联。嗅探器利用RestClient提供的定期机制(默认定期时间为5min),从集群中获取当前节点的列表,通过调用RestClient类中的setNodes方法来更新。

整体示例

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;import java.io.IOException;public class OpenSearchExample {public static void main(String[] args) throws IOException {// Connect to OpenSearchfinal CredentialsProvider credentialsProvider = new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("admin", "admin"));RestClient restClient = RestClient.builder(new HttpHost("10.12.23.1", 9200, "http")).setHttpClientConfigCallback(httpClientBuilder -> {RequestConfig.Builder requestConfigBuilder = RequestConfig.custom().setConnectTimeout(5000).setSocketTimeout(10000).setConnectionRequestTimeout(10000);httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);return httpClientBuilder;}).build();try {// Delete IndexdeleteIndex(restClient, "my_index");// Create IndexcreateIndex(restClient, "my_index");// Index DocumentindexDocument(restClient, "{\"index\":{\"_index\":\"my_index\",\"_id\":1}}\n{ \"field\": \"value\" }\n");// Get DocumentgetDocument(restClient, "my_index");// Delete DocumentdeleteDocument(restClient, "my_index", "1");// Delete IndexdeleteIndex(restClient, "my_index");} catch (ResponseException e) {e.printStackTrace();// Handle response exceptionSystem.err.println("Error: " + e.getResponse().getStatusLine().getReasonPhrase());} finally {// Close the clientrestClient.close();}}private static void createIndex(RestClient restClient, String index) throws IOException {// Create Index requestRequest request = new Request("PUT", "/" + index);// Execute the requestrestClient.performRequest(request);}private static void indexDocument(RestClient restClient, String s ) throws IOException {// Index Document requestRequest request = new Request("POST", "/_bulk" );request.setJsonEntity(s);// Execute the requestrestClient.performRequest(request);}private static void getDocument(RestClient restClient, String index) throws IOException {// Get Document requestRequest request = new Request("GET", "/" + index +  "/_search");// Execute the requestResponse response = restClient.performRequest(request);// Handle the responseSystem.out.println("Document found: " + EntityUtils.toString(response.getEntity()));}private static void deleteDocument(RestClient restClient, String index,  String id) throws IOException {// Delete Document requestRequest request = new Request("DELETE", "/" + index  + "/_doc/" + id);// Execute the requestrestClient.performRequest(request);}private static void deleteIndex(RestClient restClient, String index) throws IOException {// Delete Index requestRequest request = new Request("DELETE", "/" + index);// Execute the requestrestClient.performRequest(request);}
}

问题

异常如下

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested targetat java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)at java.base/sun.security.validator.Validator.validate(Validator.java:264)at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285)at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335)... 19 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested targetat java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:146)at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:127)

解决方案

因为证书问题,我们用的是测试环境,就不要费劲的去下载私有证书再安装了,直接配置opensearch支持http即可。

opensearch.yml

plugins.security.ssl.http.enabled: false

或者直接禁用安全插件。

参考

  • https://www.cnblogs.com/openmind-ink/p/13951767.html

这篇关于从零开发短视频电商 Low Level Client(推荐)连接OpenSearch进行CRUD的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/476597

相关文章

SpringBoot 多环境开发实战(从配置、管理与控制)

《SpringBoot多环境开发实战(从配置、管理与控制)》本文详解SpringBoot多环境配置,涵盖单文件YAML、多文件模式、MavenProfile分组及激活策略,通过优先级控制灵活切换环境... 目录一、多环境开发基础(单文件 YAML 版)(一)配置原理与优势(二)实操示例二、多环境开发多文件版

使用docker搭建嵌入式Linux开发环境

《使用docker搭建嵌入式Linux开发环境》本文主要介绍了使用docker搭建嵌入式Linux开发环境,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录1、前言2、安装docker3、编写容器管理脚本4、创建容器1、前言在日常开发全志、rk等不同

Python实战之SEO优化自动化工具开发指南

《Python实战之SEO优化自动化工具开发指南》在数字化营销时代,搜索引擎优化(SEO)已成为网站获取流量的重要手段,本文将带您使用Python开发一套完整的SEO自动化工具,需要的可以了解下... 目录前言项目概述技术栈选择核心模块实现1. 关键词研究模块2. 网站技术seo检测模块3. 内容优化分析模

Mac电脑如何通过 IntelliJ IDEA 远程连接 MySQL

《Mac电脑如何通过IntelliJIDEA远程连接MySQL》本文详解Mac通过IntelliJIDEA远程连接MySQL的步骤,本文通过图文并茂的形式给大家介绍的非常详细,感兴趣的朋友跟... 目录MAC电脑通过 IntelliJ IDEA 远程连接 mysql 的详细教程一、前缀条件确认二、打开 ID

Nginx中配置使用非默认80端口进行服务的完整指南

《Nginx中配置使用非默认80端口进行服务的完整指南》在实际生产环境中,我们经常需要将Nginx配置在其他端口上运行,本文将详细介绍如何在Nginx中配置使用非默认端口进行服务,希望对大家有所帮助... 目录一、为什么需要使用非默认端口二、配置Nginx使用非默认端口的基本方法2.1 修改listen指令

基于Java开发一个极简版敏感词检测工具

《基于Java开发一个极简版敏感词检测工具》这篇文章主要为大家详细介绍了如何基于Java开发一个极简版敏感词检测工具,文中的示例代码简洁易懂,感兴趣的小伙伴可以跟随小编一起学习一下... 目录你是否还在为敏感词检测头疼一、极简版Java敏感词检测工具的3大核心优势1.1 优势1:DFA算法驱动,效率提升10

Go语言连接MySQL数据库执行基本的增删改查

《Go语言连接MySQL数据库执行基本的增删改查》在后端开发中,MySQL是最常用的关系型数据库之一,本文主要为大家详细介绍了如何使用Go连接MySQL数据库并执行基本的增删改查吧... 目录Go语言连接mysql数据库准备工作安装 MySQL 驱动代码实现运行结果注意事项Go语言执行基本的增删改查准备工作

MySQL按时间维度对亿级数据表进行平滑分表

《MySQL按时间维度对亿级数据表进行平滑分表》本文将以一个真实的4亿数据表分表案例为基础,详细介绍如何在不影响线上业务的情况下,完成按时间维度分表的完整过程,感兴趣的小伙伴可以了解一下... 目录引言一、为什么我们需要分表1.1 单表数据量过大的问题1.2 分表方案选型二、分表前的准备工作2.1 数据评估

python连接sqlite3简单用法完整例子

《python连接sqlite3简单用法完整例子》SQLite3是一个内置的Python模块,可以通过Python的标准库轻松地使用,无需进行额外安装和配置,:本文主要介绍python连接sqli... 目录1. 连接到数据库2. 创建游标对象3. 创建表4. 插入数据5. 查询数据6. 更新数据7. 删除

Python开发简易网络服务器的示例详解(新手入门)

《Python开发简易网络服务器的示例详解(新手入门)》网络服务器是互联网基础设施的核心组件,它本质上是一个持续运行的程序,负责监听特定端口,本文将使用Python开发一个简单的网络服务器,感兴趣的小... 目录网络服务器基础概念python内置服务器模块1. HTTP服务器模块2. Socket服务器模块