【ES实战】ES创建Transports客户端时间过长分析

2023-10-22 06:36

本文主要是介绍【ES实战】ES创建Transports客户端时间过长分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ES创建Transports客户端时间过长分析

2023年10月19日

文章目录

  • ES创建Transports客户端时间过长分析
    • 问题描述
    • 问题重现
    • 问题分析
      • 是否可以配置链接超时时间
      • 节点建立连接超时逻辑
      • 为啥超时间会出现翻倍
    • 优化方案

在创建ES Transport客户端的时,当出现以下场景时,影响连接速度。

问题描述

  1. 使用ES Transport 客户端创建与集群的链接。
  2. 连接地址里面有不存在的IP
  3. 在增加ES节点时,采用逐个增加的方式

整个建立链接的过程会非常耗时。

问题重现

采用jar依赖如下

        <dependency><groupId>org.elasticsearch.client</groupId><artifactId>transport</artifactId><version>5.6.16</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>x-pack-transport</artifactId><version>5.6.1</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>sniffer</artifactId><version>5.4.2</version></dependency>

创建连接代码如下

        final Settings settings = Settings.builder().put("cluster.name", "common-es").put("client.transport.sniff", true).build();final TransportClient transportClient = new PreBuiltXPackTransportClient(settings);long t1 = System.currentTimeMillis();transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.37"), 9800));logger.info("第1个错误节点耗时:" + (System.currentTimeMillis() - t1) / 1000);long t2 = System.currentTimeMillis();transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.38"), 9800));logger.info("第2个错误节点耗时:" + (System.currentTimeMillis() - t2) / 1000);long t3 = System.currentTimeMillis();transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.39"), 9800));logger.info("第3个错误节点耗时:" + (System.currentTimeMillis() - t3) / 1000);

输出结果

[2023-10-19 15:31:31,398] [main] [INFO ] xxx.Client - 第1个错误节点耗时:21
[2023-10-19 15:32:13,414] [main] [INFO ] xxx.Client - 第2个错误节点耗时:42
[2023-10-19 15:32:55,436] [main] [INFO ] xxx.Client - 第3个错误节点耗时:42

问题分析

是否可以配置链接超时时间

通过new PreBuiltXPackTransportClient()方法创建客户端,跟踪源码发现其会在TransportClient.buildTemplate进行建立网络模块服务,在继续debug,会发现会在TcpTransport中方法buildDefaultConnectionProfile构建链接的配置文件。发现其TCP_CONNECT_TIMEOUT默认的配置是30s,起对应的配置参数是transport.tcp.connect_timeout

    static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings);int connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings);int connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings);int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);ConnectionProfile.Builder builder = new ConnectionProfile.Builder();// 链接的超时时间builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings));builder.setHandshakeTimeout(TCP_CONNECT_TIMEOUT.get(settings));builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);// if we are not master eligible we don't need a dedicated channel to publish the statebuilder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);// if we are not a data-node we don't need any dedicated channels for recoverybuilder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);return builder.build();}
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =timeSetting("transport.tcp.connect_timeout", NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope);

节点建立连接超时逻辑

TcpTransport.openConnection(DiscoveryNode node, ConnectionProfile connectionProfile)方法建立通信管道时,在通信之前重组连接的默认配置和自定义配置。在Netty4Transport.connectToChannels()方法内具体生效,future.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));

增加节点的方式

TransportClient类提供了数组方式增加节点和单个节点增加的方式,

    public TransportClient addTransportAddress(TransportAddress transportAddress) {nodesService.addTransportAddresses(transportAddress);return this;}public TransportClient addTransportAddresses(TransportAddress... transportAddress) {nodesService.addTransportAddresses(transportAddress);return this;}

不过根据代码,发现其都是调用的TransportClientNodesService类的addTransportAddresses(TransportAddress... transportAddresses)方法

    public TransportClientNodesService addTransportAddresses(TransportAddress... transportAddresses) {// 竞争对象锁mutexsynchronized (mutex) {if (closed) {throw new IllegalStateException("transport client is closed, can't add an address");}List<TransportAddress> filtered = new ArrayList<>(transportAddresses.length);for (TransportAddress transportAddress : transportAddresses) {boolean found = false;for (DiscoveryNode otherNode : listedNodes) {// 方式连接地址值重复,会自动过滤if (otherNode.getAddress().equals(transportAddress)) {found = true;logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);break;}}if (!found) {filtered.add(transportAddress);}}if (filtered.isEmpty()) {return this;}List<DiscoveryNode> builder = new ArrayList<>(listedNodes);for (TransportAddress transportAddress : filtered) {DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(),transportAddress, Collections.emptyMap(), Collections.emptySet(), minCompatibilityVersion);logger.debug("adding address [{}]", node);builder.add(node);}// listNodes里面存放的是配置的连接节点列表listedNodes = Collections.unmodifiableList(builder);// 调用不同的节点采集-里面也对mutex锁进行竞争nodesSampler.sample();}return this;}

NodeSampler.sample()

		public void sample() {synchronized (mutex) {if (closed) {return;}doSample();}}

NodesSampler有两个具体的继承实现类

  • SniffNodesSampler:开启嗅探属性的客户端
  • SimpleNodeSampler:简单客户端

这边对SniffNodesSamplersample()方法进行分析。

        @Overrideprotected void doSample() {Set<DiscoveryNode> nodesToPing = new HashSet<>();// 最新要进行连接的一组节点列表for (DiscoveryNode node : listedNodes) {nodesToPing.add(node);}// nodes代表已经连接上的节点列表for (DiscoveryNode node : nodes) {nodesToPing.add(node);}// 并发控制辅助类final CountDownLatch latch = new CountDownLatch(nodesToPing.size());final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();try {for (final DiscoveryNode nodeToPing : nodesToPing) {// 采用线程池的方式去连接节点threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {Transport.Connection connectionToClose = null;void onDone() {try {IOUtils.closeWhileHandlingException(connectionToClose);} finally {latch.countDown();}}@Overridepublic void onFailure(Exception e) {onDone();......}@Overrideprotected void doRun() throws Exception {Transport.Connection pingConnection = null;if (nodes.contains(nodeToPing)) {try {pingConnection = transportService.getConnection(nodeToPing);} catch (NodeNotConnectedException e) {// will use a temp connection}}if (pingConnection == null) {logger.trace("connecting to cluster node [{}]", nodeToPing);// 尝试去连接节点,超时会抛出异常connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);pingConnection = connectionToClose;}// 若有一个节点连接成功会进行集群状态查询,返回值里面包含了全部可用节点transportService.sendRequest(pingConnection, ClusterStateAction.NAME,Requests.clusterStateRequest().clear().nodes(true).local(true),TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),new TransportResponseHandler<ClusterStateResponse>() {@Overridepublic ClusterStateResponse newInstance() {return new ClusterStateResponse();}@Overridepublic String executor() {return ThreadPool.Names.SAME;}@Overridepublic void handleResponse(ClusterStateResponse response) {clusterStateResponses.put(nodeToPing, response);onDone();}@Overridepublic void handleException(TransportException e) {logger.info((Supplier<?>) () -> new ParameterizedMessage("failed to get local cluster state for {}, disconnecting...", nodeToPing), e);try {hostFailureListener.onNodeDisconnected(nodeToPing, e);} finally {onDone();}}});}});}latch.await();} catch (InterruptedException e) {Thread.currentThread().interrupt();return;}HashSet<DiscoveryNode> newNodes = new HashSet<>();HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {logger.warn("node {} not part of the cluster {}, ignoring...",entry.getValue().getState().nodes().getLocalNode(), clusterName);newFilteredNodes.add(entry.getKey());continue;}for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {newNodes.add(cursor.value);}}// 验证新节点是否可连接nodes = validateNewNodes(newNodes);filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));}

通过代码发现,其实用了线程池并发连接节点,但是也使用了CountDownLatch,这就导致了,如果有一个节点超时,那整个批次都需要等待这么长的时间。典型的长尾效应

为啥超时间会出现翻倍

建立TransportClientNodesService服务时,构造函数中增加了对NodeSampler的调度。

    TransportClientNodesService(Settings settings, TransportService transportService,ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {.........this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());}

ScheduledNodeSampler

当调度触发之后,也会去执行nodesSampler.sample();,也就对mutex锁有了竞争,当调用增加连接方法之后,就会有两次调用 nodesSampler.sample();也就会将超时时间翻倍。

class ScheduledNodeSampler implements Runnable {@Overridepublic void run() {try {nodesSampler.sample();if (!closed) {nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this);}} catch (Exception e) {logger.warn("failed to sample", e);}}
}

优化方案

  1. Settings增加超时transport的tcp超时配置。

     final Settings settings = Settings.builder().put("cluster.name", "common-es").put("transport.tcp.connect_timeout", "5s").put("client.transport.sniff", true).build();
    

    注意此配置的参数名不同版本之间存在差异。

  2. 使用数组方式增加连接节点,减少反复调用TransportClientNodesService addTransportAddresses次数,就是在减少分批次的产生阻塞耗时

这篇关于【ES实战】ES创建Transports客户端时间过长分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/259722

相关文章

Java MQTT实战应用

《JavaMQTT实战应用》本文详解MQTT协议,涵盖其发布/订阅机制、低功耗高效特性、三种服务质量等级(QoS0/1/2),以及客户端、代理、主题的核心概念,最后提供Linux部署教程、Sprin... 目录一、MQTT协议二、MQTT优点三、三种服务质量等级四、客户端、代理、主题1. 客户端(Clien

MySQL中的表连接原理分析

《MySQL中的表连接原理分析》:本文主要介绍MySQL中的表连接原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、环境3、表连接原理【1】驱动表和被驱动表【2】内连接【3】外连接【4编程】嵌套循环连接【5】join buffer4、总结1、背景

在Spring Boot中集成RabbitMQ的实战记录

《在SpringBoot中集成RabbitMQ的实战记录》本文介绍SpringBoot集成RabbitMQ的步骤,涵盖配置连接、消息发送与接收,并对比两种定义Exchange与队列的方式:手动声明(... 目录前言准备工作1. 安装 RabbitMQ2. 消息发送者(Producer)配置1. 创建 Spr

Golang如何对cron进行二次封装实现指定时间执行定时任务

《Golang如何对cron进行二次封装实现指定时间执行定时任务》:本文主要介绍Golang如何对cron进行二次封装实现指定时间执行定时任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录背景cron库下载代码示例【1】结构体定义【2】定时任务开启【3】使用示例【4】控制台输出总结背景

深度解析Spring Boot拦截器Interceptor与过滤器Filter的区别与实战指南

《深度解析SpringBoot拦截器Interceptor与过滤器Filter的区别与实战指南》本文深度解析SpringBoot中拦截器与过滤器的区别,涵盖执行顺序、依赖关系、异常处理等核心差异,并... 目录Spring Boot拦截器(Interceptor)与过滤器(Filter)深度解析:区别、实现

深度解析Spring AOP @Aspect 原理、实战与最佳实践教程

《深度解析SpringAOP@Aspect原理、实战与最佳实践教程》文章系统讲解了SpringAOP核心概念、实现方式及原理,涵盖横切关注点分离、代理机制(JDK/CGLIB)、切入点类型、性能... 目录1. @ASPect 核心概念1.1 AOP 编程范式1.2 @Aspect 关键特性2. 完整代码实

python如何创建等差数列

《python如何创建等差数列》:本文主要介绍python如何创建等差数列的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录python创建等差数列例题运行代码回车输出结果总结python创建等差数列import numpy as np x=int(in

python中Hash使用场景分析

《python中Hash使用场景分析》Python的hash()函数用于获取对象哈希值,常用于字典和集合,不可变类型可哈希,可变类型不可,常见算法包括除法、乘法、平方取中和随机数哈希,各有优缺点,需根... 目录python中的 Hash除法哈希算法乘法哈希算法平方取中法随机数哈希算法小结在Python中,

MySQL中的索引结构和分类实战案例详解

《MySQL中的索引结构和分类实战案例详解》本文详解MySQL索引结构与分类,涵盖B树、B+树、哈希及全文索引,分析其原理与优劣势,并结合实战案例探讨创建、管理及优化技巧,助力提升查询性能,感兴趣的朋... 目录一、索引概述1.1 索引的定义与作用1.2 索引的基本原理二、索引结构详解2.1 B树索引2.2

Java Stream的distinct去重原理分析

《JavaStream的distinct去重原理分析》Javastream中的distinct方法用于去除流中的重复元素,它返回一个包含过滤后唯一元素的新流,该方法会根据元素的hashcode和eq... 目录一、distinct 的基础用法与核心特性二、distinct 的底层实现原理1. 顺序流中的去重