【ES实战】ES创建Transports客户端时间过长分析

2023-10-22 06:36

本文主要是介绍【ES实战】ES创建Transports客户端时间过长分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ES创建Transports客户端时间过长分析

2023年10月19日

文章目录

  • ES创建Transports客户端时间过长分析
    • 问题描述
    • 问题重现
    • 问题分析
      • 是否可以配置链接超时时间
      • 节点建立连接超时逻辑
      • 为啥超时间会出现翻倍
    • 优化方案

在创建ES Transport客户端的时,当出现以下场景时,影响连接速度。

问题描述

  1. 使用ES Transport 客户端创建与集群的链接。
  2. 连接地址里面有不存在的IP
  3. 在增加ES节点时,采用逐个增加的方式

整个建立链接的过程会非常耗时。

问题重现

采用jar依赖如下

        <dependency><groupId>org.elasticsearch.client</groupId><artifactId>transport</artifactId><version>5.6.16</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>x-pack-transport</artifactId><version>5.6.1</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>sniffer</artifactId><version>5.4.2</version></dependency>

创建连接代码如下

        final Settings settings = Settings.builder().put("cluster.name", "common-es").put("client.transport.sniff", true).build();final TransportClient transportClient = new PreBuiltXPackTransportClient(settings);long t1 = System.currentTimeMillis();transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.37"), 9800));logger.info("第1个错误节点耗时:" + (System.currentTimeMillis() - t1) / 1000);long t2 = System.currentTimeMillis();transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.38"), 9800));logger.info("第2个错误节点耗时:" + (System.currentTimeMillis() - t2) / 1000);long t3 = System.currentTimeMillis();transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.39"), 9800));logger.info("第3个错误节点耗时:" + (System.currentTimeMillis() - t3) / 1000);

输出结果

[2023-10-19 15:31:31,398] [main] [INFO ] xxx.Client - 第1个错误节点耗时:21
[2023-10-19 15:32:13,414] [main] [INFO ] xxx.Client - 第2个错误节点耗时:42
[2023-10-19 15:32:55,436] [main] [INFO ] xxx.Client - 第3个错误节点耗时:42

问题分析

是否可以配置链接超时时间

通过new PreBuiltXPackTransportClient()方法创建客户端,跟踪源码发现其会在TransportClient.buildTemplate进行建立网络模块服务,在继续debug,会发现会在TcpTransport中方法buildDefaultConnectionProfile构建链接的配置文件。发现其TCP_CONNECT_TIMEOUT默认的配置是30s,起对应的配置参数是transport.tcp.connect_timeout

    static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings);int connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings);int connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings);int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);ConnectionProfile.Builder builder = new ConnectionProfile.Builder();// 链接的超时时间builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings));builder.setHandshakeTimeout(TCP_CONNECT_TIMEOUT.get(settings));builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);// if we are not master eligible we don't need a dedicated channel to publish the statebuilder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);// if we are not a data-node we don't need any dedicated channels for recoverybuilder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);return builder.build();}
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =timeSetting("transport.tcp.connect_timeout", NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope);

节点建立连接超时逻辑

TcpTransport.openConnection(DiscoveryNode node, ConnectionProfile connectionProfile)方法建立通信管道时,在通信之前重组连接的默认配置和自定义配置。在Netty4Transport.connectToChannels()方法内具体生效,future.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));

增加节点的方式

TransportClient类提供了数组方式增加节点和单个节点增加的方式,

    public TransportClient addTransportAddress(TransportAddress transportAddress) {nodesService.addTransportAddresses(transportAddress);return this;}public TransportClient addTransportAddresses(TransportAddress... transportAddress) {nodesService.addTransportAddresses(transportAddress);return this;}

不过根据代码,发现其都是调用的TransportClientNodesService类的addTransportAddresses(TransportAddress... transportAddresses)方法

    public TransportClientNodesService addTransportAddresses(TransportAddress... transportAddresses) {// 竞争对象锁mutexsynchronized (mutex) {if (closed) {throw new IllegalStateException("transport client is closed, can't add an address");}List<TransportAddress> filtered = new ArrayList<>(transportAddresses.length);for (TransportAddress transportAddress : transportAddresses) {boolean found = false;for (DiscoveryNode otherNode : listedNodes) {// 方式连接地址值重复,会自动过滤if (otherNode.getAddress().equals(transportAddress)) {found = true;logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);break;}}if (!found) {filtered.add(transportAddress);}}if (filtered.isEmpty()) {return this;}List<DiscoveryNode> builder = new ArrayList<>(listedNodes);for (TransportAddress transportAddress : filtered) {DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(),transportAddress, Collections.emptyMap(), Collections.emptySet(), minCompatibilityVersion);logger.debug("adding address [{}]", node);builder.add(node);}// listNodes里面存放的是配置的连接节点列表listedNodes = Collections.unmodifiableList(builder);// 调用不同的节点采集-里面也对mutex锁进行竞争nodesSampler.sample();}return this;}

NodeSampler.sample()

		public void sample() {synchronized (mutex) {if (closed) {return;}doSample();}}

NodesSampler有两个具体的继承实现类

  • SniffNodesSampler:开启嗅探属性的客户端
  • SimpleNodeSampler:简单客户端

这边对SniffNodesSamplersample()方法进行分析。

        @Overrideprotected void doSample() {Set<DiscoveryNode> nodesToPing = new HashSet<>();// 最新要进行连接的一组节点列表for (DiscoveryNode node : listedNodes) {nodesToPing.add(node);}// nodes代表已经连接上的节点列表for (DiscoveryNode node : nodes) {nodesToPing.add(node);}// 并发控制辅助类final CountDownLatch latch = new CountDownLatch(nodesToPing.size());final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();try {for (final DiscoveryNode nodeToPing : nodesToPing) {// 采用线程池的方式去连接节点threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {Transport.Connection connectionToClose = null;void onDone() {try {IOUtils.closeWhileHandlingException(connectionToClose);} finally {latch.countDown();}}@Overridepublic void onFailure(Exception e) {onDone();......}@Overrideprotected void doRun() throws Exception {Transport.Connection pingConnection = null;if (nodes.contains(nodeToPing)) {try {pingConnection = transportService.getConnection(nodeToPing);} catch (NodeNotConnectedException e) {// will use a temp connection}}if (pingConnection == null) {logger.trace("connecting to cluster node [{}]", nodeToPing);// 尝试去连接节点,超时会抛出异常connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);pingConnection = connectionToClose;}// 若有一个节点连接成功会进行集群状态查询,返回值里面包含了全部可用节点transportService.sendRequest(pingConnection, ClusterStateAction.NAME,Requests.clusterStateRequest().clear().nodes(true).local(true),TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),new TransportResponseHandler<ClusterStateResponse>() {@Overridepublic ClusterStateResponse newInstance() {return new ClusterStateResponse();}@Overridepublic String executor() {return ThreadPool.Names.SAME;}@Overridepublic void handleResponse(ClusterStateResponse response) {clusterStateResponses.put(nodeToPing, response);onDone();}@Overridepublic void handleException(TransportException e) {logger.info((Supplier<?>) () -> new ParameterizedMessage("failed to get local cluster state for {}, disconnecting...", nodeToPing), e);try {hostFailureListener.onNodeDisconnected(nodeToPing, e);} finally {onDone();}}});}});}latch.await();} catch (InterruptedException e) {Thread.currentThread().interrupt();return;}HashSet<DiscoveryNode> newNodes = new HashSet<>();HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {logger.warn("node {} not part of the cluster {}, ignoring...",entry.getValue().getState().nodes().getLocalNode(), clusterName);newFilteredNodes.add(entry.getKey());continue;}for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {newNodes.add(cursor.value);}}// 验证新节点是否可连接nodes = validateNewNodes(newNodes);filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));}

通过代码发现,其实用了线程池并发连接节点,但是也使用了CountDownLatch,这就导致了,如果有一个节点超时,那整个批次都需要等待这么长的时间。典型的长尾效应

为啥超时间会出现翻倍

建立TransportClientNodesService服务时,构造函数中增加了对NodeSampler的调度。

    TransportClientNodesService(Settings settings, TransportService transportService,ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {.........this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());}

ScheduledNodeSampler

当调度触发之后,也会去执行nodesSampler.sample();,也就对mutex锁有了竞争,当调用增加连接方法之后,就会有两次调用 nodesSampler.sample();也就会将超时时间翻倍。

class ScheduledNodeSampler implements Runnable {@Overridepublic void run() {try {nodesSampler.sample();if (!closed) {nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this);}} catch (Exception e) {logger.warn("failed to sample", e);}}
}

优化方案

  1. Settings增加超时transport的tcp超时配置。

     final Settings settings = Settings.builder().put("cluster.name", "common-es").put("transport.tcp.connect_timeout", "5s").put("client.transport.sniff", true).build();
    

    注意此配置的参数名不同版本之间存在差异。

  2. 使用数组方式增加连接节点,减少反复调用TransportClientNodesService addTransportAddresses次数,就是在减少分批次的产生阻塞耗时

这篇关于【ES实战】ES创建Transports客户端时间过长分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/259722

相关文章

Go标准库常见错误分析和解决办法

《Go标准库常见错误分析和解决办法》Go语言的标准库为开发者提供了丰富且高效的工具,涵盖了从网络编程到文件操作等各个方面,然而,标准库虽好,使用不当却可能适得其反,正所谓工欲善其事,必先利其器,本文将... 目录1. 使用了错误的time.Duration2. time.After导致的内存泄漏3. jsO

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

MyBatis 动态 SQL 优化之标签的实战与技巧(常见用法)

《MyBatis动态SQL优化之标签的实战与技巧(常见用法)》本文通过详细的示例和实际应用场景,介绍了如何有效利用这些标签来优化MyBatis配置,提升开发效率,确保SQL的高效执行和安全性,感... 目录动态SQL详解一、动态SQL的核心概念1.1 什么是动态SQL?1.2 动态SQL的优点1.3 动态S

Pandas使用SQLite3实战

《Pandas使用SQLite3实战》本文主要介绍了Pandas使用SQLite3实战,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学... 目录1 环境准备2 从 SQLite3VlfrWQzgt 读取数据到 DataFrame基础用法:读

Java实现时间与字符串互相转换详解

《Java实现时间与字符串互相转换详解》这篇文章主要为大家详细介绍了Java中实现时间与字符串互相转换的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、日期格式化为字符串(一)使用预定义格式(二)自定义格式二、字符串解析为日期(一)解析ISO格式字符串(二)解析自定义

Spring事务中@Transactional注解不生效的原因分析与解决

《Spring事务中@Transactional注解不生效的原因分析与解决》在Spring框架中,@Transactional注解是管理数据库事务的核心方式,本文将深入分析事务自调用的底层原理,解释为... 目录1. 引言2. 事务自调用问题重现2.1 示例代码2.2 问题现象3. 为什么事务自调用会失效3

idea中创建新类时自动添加注释的实现

《idea中创建新类时自动添加注释的实现》在每次使用idea创建一个新类时,过了一段时间发现看不懂这个类是用来干嘛的,为了解决这个问题,我们可以设置在创建一个新类时自动添加注释,帮助我们理解这个类的用... 目录前言:详细操作:步骤一:点击上方的 文件(File),点击&nbmyHIgsp;设置(Setti

找不到Anaconda prompt终端的原因分析及解决方案

《找不到Anacondaprompt终端的原因分析及解决方案》因为anaconda还没有初始化,在安装anaconda的过程中,有一行是否要添加anaconda到菜单目录中,由于没有勾选,导致没有菜... 目录问题原因问http://www.chinasem.cn题解决安装了 Anaconda 却找不到 An

Spring定时任务只执行一次的原因分析与解决方案

《Spring定时任务只执行一次的原因分析与解决方案》在使用Spring的@Scheduled定时任务时,你是否遇到过任务只执行一次,后续不再触发的情况?这种情况可能由多种原因导致,如未启用调度、线程... 目录1. 问题背景2. Spring定时任务的基本用法3. 为什么定时任务只执行一次?3.1 未启用

Java时间轮调度算法的代码实现

《Java时间轮调度算法的代码实现》时间轮是一种高效的定时调度算法,主要用于管理延时任务或周期性任务,它通过一个环形数组(时间轮)和指针来实现,将大量定时任务分摊到固定的时间槽中,极大地降低了时间复杂... 目录1、简述2、时间轮的原理3. 时间轮的实现步骤3.1 定义时间槽3.2 定义时间轮3.3 使用时