【ES实战】ES创建Transports客户端时间过长分析

2023-10-22 06:36

本文主要是介绍【ES实战】ES创建Transports客户端时间过长分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ES创建Transports客户端时间过长分析

2023年10月19日

文章目录

  • ES创建Transports客户端时间过长分析
    • 问题描述
    • 问题重现
    • 问题分析
      • 是否可以配置链接超时时间
      • 节点建立连接超时逻辑
      • 为啥超时间会出现翻倍
    • 优化方案

在创建ES Transport客户端的时,当出现以下场景时,影响连接速度。

问题描述

  1. 使用ES Transport 客户端创建与集群的链接。
  2. 连接地址里面有不存在的IP
  3. 在增加ES节点时,采用逐个增加的方式

整个建立链接的过程会非常耗时。

问题重现

采用jar依赖如下

        <dependency><groupId>org.elasticsearch.client</groupId><artifactId>transport</artifactId><version>5.6.16</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>x-pack-transport</artifactId><version>5.6.1</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>sniffer</artifactId><version>5.4.2</version></dependency>

创建连接代码如下

        final Settings settings = Settings.builder().put("cluster.name", "common-es").put("client.transport.sniff", true).build();final TransportClient transportClient = new PreBuiltXPackTransportClient(settings);long t1 = System.currentTimeMillis();transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.37"), 9800));logger.info("第1个错误节点耗时:" + (System.currentTimeMillis() - t1) / 1000);long t2 = System.currentTimeMillis();transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.38"), 9800));logger.info("第2个错误节点耗时:" + (System.currentTimeMillis() - t2) / 1000);long t3 = System.currentTimeMillis();transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.39"), 9800));logger.info("第3个错误节点耗时:" + (System.currentTimeMillis() - t3) / 1000);

输出结果

[2023-10-19 15:31:31,398] [main] [INFO ] xxx.Client - 第1个错误节点耗时:21
[2023-10-19 15:32:13,414] [main] [INFO ] xxx.Client - 第2个错误节点耗时:42
[2023-10-19 15:32:55,436] [main] [INFO ] xxx.Client - 第3个错误节点耗时:42

问题分析

是否可以配置链接超时时间

通过new PreBuiltXPackTransportClient()方法创建客户端,跟踪源码发现其会在TransportClient.buildTemplate进行建立网络模块服务,在继续debug,会发现会在TcpTransport中方法buildDefaultConnectionProfile构建链接的配置文件。发现其TCP_CONNECT_TIMEOUT默认的配置是30s,起对应的配置参数是transport.tcp.connect_timeout

    static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings);int connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings);int connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings);int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);ConnectionProfile.Builder builder = new ConnectionProfile.Builder();// 链接的超时时间builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings));builder.setHandshakeTimeout(TCP_CONNECT_TIMEOUT.get(settings));builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);// if we are not master eligible we don't need a dedicated channel to publish the statebuilder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);// if we are not a data-node we don't need any dedicated channels for recoverybuilder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);return builder.build();}
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =timeSetting("transport.tcp.connect_timeout", NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope);

节点建立连接超时逻辑

TcpTransport.openConnection(DiscoveryNode node, ConnectionProfile connectionProfile)方法建立通信管道时,在通信之前重组连接的默认配置和自定义配置。在Netty4Transport.connectToChannels()方法内具体生效,future.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));

增加节点的方式

TransportClient类提供了数组方式增加节点和单个节点增加的方式,

    public TransportClient addTransportAddress(TransportAddress transportAddress) {nodesService.addTransportAddresses(transportAddress);return this;}public TransportClient addTransportAddresses(TransportAddress... transportAddress) {nodesService.addTransportAddresses(transportAddress);return this;}

不过根据代码,发现其都是调用的TransportClientNodesService类的addTransportAddresses(TransportAddress... transportAddresses)方法

    public TransportClientNodesService addTransportAddresses(TransportAddress... transportAddresses) {// 竞争对象锁mutexsynchronized (mutex) {if (closed) {throw new IllegalStateException("transport client is closed, can't add an address");}List<TransportAddress> filtered = new ArrayList<>(transportAddresses.length);for (TransportAddress transportAddress : transportAddresses) {boolean found = false;for (DiscoveryNode otherNode : listedNodes) {// 方式连接地址值重复,会自动过滤if (otherNode.getAddress().equals(transportAddress)) {found = true;logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);break;}}if (!found) {filtered.add(transportAddress);}}if (filtered.isEmpty()) {return this;}List<DiscoveryNode> builder = new ArrayList<>(listedNodes);for (TransportAddress transportAddress : filtered) {DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(),transportAddress, Collections.emptyMap(), Collections.emptySet(), minCompatibilityVersion);logger.debug("adding address [{}]", node);builder.add(node);}// listNodes里面存放的是配置的连接节点列表listedNodes = Collections.unmodifiableList(builder);// 调用不同的节点采集-里面也对mutex锁进行竞争nodesSampler.sample();}return this;}

NodeSampler.sample()

		public void sample() {synchronized (mutex) {if (closed) {return;}doSample();}}

NodesSampler有两个具体的继承实现类

  • SniffNodesSampler:开启嗅探属性的客户端
  • SimpleNodeSampler:简单客户端

这边对SniffNodesSamplersample()方法进行分析。

        @Overrideprotected void doSample() {Set<DiscoveryNode> nodesToPing = new HashSet<>();// 最新要进行连接的一组节点列表for (DiscoveryNode node : listedNodes) {nodesToPing.add(node);}// nodes代表已经连接上的节点列表for (DiscoveryNode node : nodes) {nodesToPing.add(node);}// 并发控制辅助类final CountDownLatch latch = new CountDownLatch(nodesToPing.size());final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();try {for (final DiscoveryNode nodeToPing : nodesToPing) {// 采用线程池的方式去连接节点threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {Transport.Connection connectionToClose = null;void onDone() {try {IOUtils.closeWhileHandlingException(connectionToClose);} finally {latch.countDown();}}@Overridepublic void onFailure(Exception e) {onDone();......}@Overrideprotected void doRun() throws Exception {Transport.Connection pingConnection = null;if (nodes.contains(nodeToPing)) {try {pingConnection = transportService.getConnection(nodeToPing);} catch (NodeNotConnectedException e) {// will use a temp connection}}if (pingConnection == null) {logger.trace("connecting to cluster node [{}]", nodeToPing);// 尝试去连接节点,超时会抛出异常connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);pingConnection = connectionToClose;}// 若有一个节点连接成功会进行集群状态查询,返回值里面包含了全部可用节点transportService.sendRequest(pingConnection, ClusterStateAction.NAME,Requests.clusterStateRequest().clear().nodes(true).local(true),TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),new TransportResponseHandler<ClusterStateResponse>() {@Overridepublic ClusterStateResponse newInstance() {return new ClusterStateResponse();}@Overridepublic String executor() {return ThreadPool.Names.SAME;}@Overridepublic void handleResponse(ClusterStateResponse response) {clusterStateResponses.put(nodeToPing, response);onDone();}@Overridepublic void handleException(TransportException e) {logger.info((Supplier<?>) () -> new ParameterizedMessage("failed to get local cluster state for {}, disconnecting...", nodeToPing), e);try {hostFailureListener.onNodeDisconnected(nodeToPing, e);} finally {onDone();}}});}});}latch.await();} catch (InterruptedException e) {Thread.currentThread().interrupt();return;}HashSet<DiscoveryNode> newNodes = new HashSet<>();HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {logger.warn("node {} not part of the cluster {}, ignoring...",entry.getValue().getState().nodes().getLocalNode(), clusterName);newFilteredNodes.add(entry.getKey());continue;}for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {newNodes.add(cursor.value);}}// 验证新节点是否可连接nodes = validateNewNodes(newNodes);filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));}

通过代码发现,其实用了线程池并发连接节点,但是也使用了CountDownLatch,这就导致了,如果有一个节点超时,那整个批次都需要等待这么长的时间。典型的长尾效应

为啥超时间会出现翻倍

建立TransportClientNodesService服务时,构造函数中增加了对NodeSampler的调度。

    TransportClientNodesService(Settings settings, TransportService transportService,ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {.........this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());}

ScheduledNodeSampler

当调度触发之后,也会去执行nodesSampler.sample();,也就对mutex锁有了竞争,当调用增加连接方法之后,就会有两次调用 nodesSampler.sample();也就会将超时时间翻倍。

class ScheduledNodeSampler implements Runnable {@Overridepublic void run() {try {nodesSampler.sample();if (!closed) {nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this);}} catch (Exception e) {logger.warn("failed to sample", e);}}
}

优化方案

  1. Settings增加超时transport的tcp超时配置。

     final Settings settings = Settings.builder().put("cluster.name", "common-es").put("transport.tcp.connect_timeout", "5s").put("client.transport.sniff", true).build();
    

    注意此配置的参数名不同版本之间存在差异。

  2. 使用数组方式增加连接节点,减少反复调用TransportClientNodesService addTransportAddresses次数,就是在减少分批次的产生阻塞耗时

这篇关于【ES实战】ES创建Transports客户端时间过长分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/259722

相关文章

Window Server创建2台服务器的故障转移群集的图文教程

《WindowServer创建2台服务器的故障转移群集的图文教程》本文主要介绍了在WindowsServer系统上创建一个包含两台成员服务器的故障转移群集,文中通过图文示例介绍的非常详细,对大家的... 目录一、 准备条件二、在ServerB安装故障转移群集三、在ServerC安装故障转移群集,操作与Ser

Golang操作DuckDB实战案例分享

《Golang操作DuckDB实战案例分享》DuckDB是一个嵌入式SQL数据库引擎,它与众所周知的SQLite非常相似,但它是为olap风格的工作负载设计的,DuckDB支持各种数据类型和SQL特性... 目录DuckDB的主要优点环境准备初始化表和数据查询单行或多行错误处理和事务完整代码最后总结Duck

Window Server2016 AD域的创建的方法步骤

《WindowServer2016AD域的创建的方法步骤》本文主要介绍了WindowServer2016AD域的创建的方法步骤,文中通过图文介绍的非常详细,对大家的学习或者工作具有一定的参考学习价... 目录一、准备条件二、在ServerA服务器中常见AD域管理器:三、创建AD域,域地址为“test.ly”

Python手搓邮件发送客户端

《Python手搓邮件发送客户端》这篇文章主要为大家详细介绍了如何使用Python手搓邮件发送客户端,支持发送邮件,附件,定时发送以及个性化邮件正文,感兴趣的可以了解下... 目录1. 简介2.主要功能2.1.邮件发送功能2.2.个性签名功能2.3.定时发送功能2. 4.附件管理2.5.配置加载功能2.6.

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

Python中的随机森林算法与实战

《Python中的随机森林算法与实战》本文详细介绍了随机森林算法,包括其原理、实现步骤、分类和回归案例,并讨论了其优点和缺点,通过面向对象编程实现了一个简单的随机森林模型,并应用于鸢尾花分类和波士顿房... 目录1、随机森林算法概述2、随机森林的原理3、实现步骤4、分类案例:使用随机森林预测鸢尾花品种4.1

Redis主从复制的原理分析

《Redis主从复制的原理分析》Redis主从复制通过将数据镜像到多个从节点,实现高可用性和扩展性,主从复制包括初次全量同步和增量同步两个阶段,为优化复制性能,可以采用AOF持久化、调整复制超时时间、... 目录Redis主从复制的原理主从复制概述配置主从复制数据同步过程复制一致性与延迟故障转移机制监控与维

Python在固定文件夹批量创建固定后缀的文件(方法详解)

《Python在固定文件夹批量创建固定后缀的文件(方法详解)》文章讲述了如何使用Python批量创建后缀为.md的文件夹,生成100个,代码中需要修改的路径、前缀和后缀名,并提供了注意事项和代码示例,... 目录1. python需求的任务2. Python代码的实现3. 代码修改的位置4. 运行结果5.

使用IntelliJ IDEA创建简单的Java Web项目完整步骤

《使用IntelliJIDEA创建简单的JavaWeb项目完整步骤》:本文主要介绍如何使用IntelliJIDEA创建一个简单的JavaWeb项目,实现登录、注册和查看用户列表功能,使用Se... 目录前置准备项目功能实现步骤1. 创建项目2. 配置 Tomcat3. 项目文件结构4. 创建数据库和表5.

Python 标准库time时间的访问和转换问题小结

《Python标准库time时间的访问和转换问题小结》time模块为Python提供了处理时间和日期的多种功能,适用于多种与时间相关的场景,包括获取当前时间、格式化时间、暂停程序执行、计算程序运行时... 目录模块介绍使用场景主要类主要函数 - time()- sleep()- localtime()- g