Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码

本文主要是介绍Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  • 一、main方式入口
  • 二、Elasticsearch初始化第三阶段
    • 1、构造node节点对象时构造restController
    • 2、在node构建对象最后执行初始化RestHanders的操作
  • 三、以注册在hander中的RestGetIndicesAction对象为例介绍
    • 1、继承了BaseRestHandler,routes方法做路由规则,父类调用子类的prepareRequest实现
    • 2、BaseRestHandler实现的是RestHandler接口

一、main方式入口

路径:org.elasticsearch.bootstrap.Elasticsearch

  /*** 启动 elasticsearch 的主入口点。*/public static void main(final String[] args) {Bootstrap bootstrap = initPhase1();assert bootstrap != null;try {initPhase2(bootstrap);initPhase3(bootstrap);} catch (NodeValidationException e) {bootstrap.exitWithNodeValidationException(e);} catch (Throwable t) {bootstrap.exitWithUnknownException(t);}}

这里初始化会有三个初始化阶段。可以直接看initPhase3

二、Elasticsearch初始化第三阶段

 /***初始化的第三阶段*阶段 3 包含初始化安全管理器后的所有内容。到目前为止,该系统一直是单线程的。此阶段可以生成线程、写入日志,并受安全管理器策略的约束。* 在第 3 阶段结束时,系统已准备好接受请求,主线程已准备好终止。这意味着:*    节点组件已构建并启动*    清理已完成(例如安全设置已关闭)*    除主线程外,至少有一个线程处于活动状态,并且在主线程终止后将保持活动状态*    已通知父 CLI 进程系统已准备就绪*/private static void initPhase3(Bootstrap bootstrap) throws IOException, NodeValidationException {//调用checkLucene()函数进行Lucene的检查checkLucene();//创建一个Node对象,并重写validateNodeBeforeAcceptingRequests方法,用于在接受请求之前进行节点验证。Node node = new Node(bootstrap.environment()) {@Overrideprotected void validateNodeBeforeAcceptingRequests(final BootstrapContext context,final BoundTransportAddress boundTransportAddress,List<BootstrapCheck> checks) throws NodeValidationException {BootstrapChecks.check(context, boundTransportAddress, checks);}};//使用bootstrap.spawner()和之前创建的node对象实例化一个Elasticsearch对象,并将其赋值给INSTANCE变量。INSTANCE = new Elasticsearch(bootstrap.spawner(), node);//关闭安全设置IOUtils.close(bootstrap.secureSettings());//启动INSTANCE对象,node会启动,并保持一个存活线程INSTANCE.start();//如果命令行参数指定了daemonize,则移除控制台输出的日志配置。if (bootstrap.args().daemonize()) {LogConfigurator.removeConsoleAppender();}//发送CLI标记,表示服务器已经准备好接受请求。bootstrap.sendCliMarker(BootstrapInfo.SERVER_READY_MARKER);//如果命令行参数指定了daemonize,则关闭流;否则,启动CLI监视线程。if (bootstrap.args().daemonize()) {bootstrap.closeStreams();} else {startCliMonitorThread(System.in);}}

其中INSTANCE.start();如下,代表node启动,并且存活线程运行

private void start() throws NodeValidationException {node.start();keepAliveThread.start();}

1、构造node节点对象时构造restController

public Node(Environment environment) {this(environment, PluginsService.getPluginsServiceCtor(environment), true);}
/*** Constructs a node* 节点的初始化*/protected Node(final Environment initialEnvironment,final Function<Settings, PluginsService> pluginServiceCtor,boolean forbidPrivateIndexSettings) {//省略代码。。。。//里面会初始化restControllerActionModule actionModule = new ActionModule(settings,clusterModule.getIndexNameExpressionResolver(),settingsModule.getIndexScopedSettings(),settingsModule.getClusterSettings(),settingsModule.getSettingsFilter(),threadPool,pluginsService.filterPlugins(ActionPlugin.class),client,circuitBreakerService,usageService,systemIndices,tracer,clusterService,reservedStateHandlers);modules.add(actionModule);//restController存入到networkModule,而NetworkModule是用于处理注册和绑定所有网络相关类的模块//末尾有 actionModule.initRestHandlers初始化handerfinal RestController restController = actionModule.getRestController();final NetworkModule networkModule = new NetworkModule(settings,pluginsService.filterPlugins(NetworkPlugin.class),threadPool,bigArrays,pageCacheRecycler,circuitBreakerService,namedWriteableRegistry,xContentRegistry,networkService,restController,actionModule::copyRequestHeadersToThreadContext,clusterService.getClusterSettings(),tracer);//省略代码。。。。//初始化Rest的HandleractionModule.initRestHandlers(() -> clusterService.state().nodesIfRecovered());}

2、在node构建对象最后执行初始化RestHanders的操作

  public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {//省略代码。。。这里只选几个经常用到的registerHandler.accept(new RestGetIndicesAction());registerHandler.accept(new RestIndicesStatsAction());registerHandler.accept(new RestCreateIndexAction());    registerHandler.accept(new RestDeleteIndexAction());registerHandler.accept(new RestGetIndexTemplateAction());registerHandler.accept(new RestPutIndexTemplateAction());registerHandler.accept(new RestDeleteIndexTemplateAction());registerHandler.accept(new RestPutMappingAction());registerHandler.accept(new RestGetMappingAction());registerHandler.accept(new RestGetFieldMappingAction());registerHandler.accept(new RestIndexAction());registerHandler.accept(new RestSearchAction(restController.getSearchUsageHolder()));//省略代码}

三、以注册在hander中的RestGetIndicesAction对象为例介绍


/*** The REST handler for get index and head index APIs.* 用于获取索引和头索引 API 的 REST 处理程序。*/
@ServerlessScope(Scope.PUBLIC)
public class RestGetIndicesAction extends BaseRestHandler {//代表路由匹配规则,通过这个规则知道要调用这个实例,每一个实例路由规则都是不一样的@Overridepublic List<Route> routes() {return List.of(new Route(GET, "/{index}"), new Route(HEAD, "/{index}"));}@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {// starting with 7.0 we don't include types by default in the response to GET requestsif (request.getRestApiVersion() == RestApiVersion.V_7&& request.hasParam(INCLUDE_TYPE_NAME_PARAMETER)&& request.method().equals(GET)) {deprecationLogger.compatibleCritical("get_indices_with_types", TYPES_DEPRECATION_MESSAGE);}String[] indices = Strings.splitStringByCommaToArray(request.param("index"));final GetIndexRequest getIndexRequest = new GetIndexRequest();getIndexRequest.indices(indices);getIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, getIndexRequest.indicesOptions()));getIndexRequest.local(request.paramAsBoolean("local", getIndexRequest.local()));getIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getIndexRequest.masterNodeTimeout()));getIndexRequest.humanReadable(request.paramAsBoolean("human", false));getIndexRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));getIndexRequest.features(GetIndexRequest.Feature.fromRequest(request));final var httpChannel = request.getHttpChannel();return channel -> new RestCancellableNodeClient(client, httpChannel).admin().indices().getIndex(getIndexRequest, new RestChunkedToXContentListener<>(channel));}}

1、继承了BaseRestHandler,routes方法做路由规则,父类调用子类的prepareRequest实现

public abstract class BaseRestHandler implements RestHandler {/*** {@inheritDoc}*/@Overridepublic abstract List<Route> routes();@Overridepublic final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {//调用prepareRequest方法,准备请求以供执行,并且会对请求参数进行处理。final RestChannelConsumer action = prepareRequest(request, client);//过滤未使用的参数,将未使用的参数收集到一个有序集合中。final SortedSet<String> unconsumedParams = request.unconsumedParams().stream().filter(p -> responseParams(request.getRestApiVersion()).contains(p) == false).collect(Collectors.toCollection(TreeSet::new));//验证未使用的参数是否有效,如果存在无效参数,则抛出IllegalArgumentException异常。if (unconsumedParams.isEmpty() == false) {final Set<String> candidateParams = new HashSet<>();candidateParams.addAll(request.consumedParams());candidateParams.addAll(responseParams(request.getRestApiVersion()));throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));}//验证请求是否包含请求体,并且请求体是否已被消耗,如果不满足条件,则抛出IllegalArgumentException异常。if (request.hasContent() && request.isContentConsumed() == false) {throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");}//增加使用计数usageCount.increment();//执行action,将结果传递给channel。action.accept(channel);}/***准备要执行的请求。* 实现应在返回可运行对象以进行实际执行之前使用所有请求参数。* 未使用的参数将立即终止请求的执行。* 但是,某些参数仅用于处理响应;实现可以覆盖 {@link BaseRestHandlerresponseParams()} 来指示此类参数。*/protected abstract RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException;
}    

2、BaseRestHandler实现的是RestHandler接口

/*** Handler for REST requests*/
@FunctionalInterface
public interface RestHandler {/*** 处理rest请求*/void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception;/*** 此 RestHandler 负责处理的 {@link 路由} 的列表。*/default List<Route> routes() {return Collections.emptyList();}}

其中调用RestHandler接口的handerRequest的上游是

 @Overridepublic void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {//使用 OPTIONS 方法的请求应在其他地方处理,而不是通过调用 {@code RestHandlerhandleRequest} 使用 OPTIONS 方法的 HTTP 请求绕过 authn,因此此健全性检查可防止调度未经身份验证的请求if (request.method() == Method.OPTIONS) {handleException(request,channel,new ElasticsearchSecurityException("Cannot dispatch OPTIONS request, as they are not authenticated"));return;}if (enabled == false) {doHandleRequest(request, channel, client);return;}}private void doHandleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {threadContext.sanitizeHeaders();// operator privileges can short circuit to return a non-successful responseif (operatorPrivilegesService.checkRest(restHandler, request, channel, threadContext)) {try {restHandler.handleRequest(request, channel, client);} catch (Exception e) {logger.debug(() -> format("Request handling failed for REST request [%s]", request.uri()), e);throw e;}}}

其他注册在hander中的API和RestGetIndicesAction类似

这篇关于Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/260062

相关文章

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

作业提交过程之HDFSMapReduce

作业提交全过程详解 (1)作业提交 第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。 第2步:Client向RM申请一个作业id。 第3步:RM给Client返回该job资源的提交路径和作业id。 第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。 第5步:Client提交完资源后,向RM申请运行MrAp

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

嵌入式QT开发:构建高效智能的嵌入式系统

摘要: 本文深入探讨了嵌入式 QT 相关的各个方面。从 QT 框架的基础架构和核心概念出发,详细阐述了其在嵌入式环境中的优势与特点。文中分析了嵌入式 QT 的开发环境搭建过程,包括交叉编译工具链的配置等关键步骤。进一步探讨了嵌入式 QT 的界面设计与开发,涵盖了从基本控件的使用到复杂界面布局的构建。同时也深入研究了信号与槽机制在嵌入式系统中的应用,以及嵌入式 QT 与硬件设备的交互,包括输入输出设

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

springboot3打包成war包,用tomcat8启动

1、在pom中,将打包类型改为war <packaging>war</packaging> 2、pom中排除SpringBoot内置的Tomcat容器并添加Tomcat依赖,用于编译和测试,         *依赖时一定设置 scope 为 provided (相当于 tomcat 依赖只在本地运行和测试的时候有效,         打包的时候会排除这个依赖)<scope>provided

内核启动时减少log的方式

内核引导选项 内核引导选项大体上可以分为两类:一类与设备无关、另一类与设备有关。与设备有关的引导选项多如牛毛,需要你自己阅读内核中的相应驱动程序源码以获取其能够接受的引导选项。比如,如果你想知道可以向 AHA1542 SCSI 驱动程序传递哪些引导选项,那么就查看 drivers/scsi/aha1542.c 文件,一般在前面 100 行注释里就可以找到所接受的引导选项说明。大多数选项是通过"_

Retrieval-based-Voice-Conversion-WebUI模型构建指南

一、模型介绍 Retrieval-based-Voice-Conversion-WebUI(简称 RVC)模型是一个基于 VITS(Variational Inference with adversarial learning for end-to-end Text-to-Speech)的简单易用的语音转换框架。 具有以下特点 简单易用:RVC 模型通过简单易用的网页界面,使得用户无需深入了

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss