高吞吐SFTP连接池设计方案

2024-03-10 13:04

本文主要是介绍高吞吐SFTP连接池设计方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

在现代的数据驱动环境中,安全文件传输协议(SFTP)扮演着至关重要的角色,它提供了一种安全、可靠的文件传输方式。我们目前项目是一个大型数据集成平台,跟上下游有很多文件对接是通过SFTP协议,当需要处理大量文件或进行大规模数据迁移时,我们常常会遇到因上下游服务器的限制导致连接访问被中断或者文件传输中断,大大影响系统的文件传输效率。常见的报错例子 com.jcraft.jsch.JSchException: Session.connect: java.net.SocketException: Connection resetcom.jcraft.jsch.JSchException: connection is closed by foreign hostcom.jcraft.jsch.JSchException: session is downcom.jcraft.jsch.JSchException: channel is not opened 等。这些连接问题也一直困扰着我们,那我们有没有办法在上下游有限的资源去最大提高文件并行处理能力呢?目前网上的连接池方案只是简单解决了SSH Session复用,但是并没有充分利用同一个SSH Session对象中的SFTP Channel以减少Session的开销,所以当文件量大的时候,还是会因为服务器的SSH Session数量限制从而影响文件并行传输效率。最节省资源又能提高文件并行处理方式的做法应该是同时复用SSH Session以及其SFTP Channel,这样最大并行可以处理的文件数就是 SSH Session数*SFTP Channel数,而不是直接每个文件处理线程开启一个SSH Session,只使用SSH Session的一个Channel

问题原因

所有上下游的SFTP服务器都是连接限制,一般来说默认情况下,常见的Linux服务器默认MaxSessions10MaxStartups10:30:60

这里简单对MaxSessionsMaxStartups两个配置参数解释下,这个对我们理解SFTP很重要。

  • MaxStartups 10:30:60:这里指服务器开始时可以进行的最大未经身份验证的连接数是10,当超过第一个数字的连接数时(10),SSHD 将开始随机地拒绝新的连接,在达到第三个数字时(60),SSHD 将拒绝所有新的连接。注意,这里是拒绝未经身份认证的连接数,已经登陆的不会限制,也就是客户端如果同时创建一堆连接等待认证的话,等待的数量超过这个配置就会被拒绝
  • MaxSessions 10: 这里是指每个SSH连接可以创建多少个通信通道,例如JSCH中的ChannelSftp文件处理通道。

如果上下游不对其SFTP服务器做任何设置,那么其服务器最大允许同时并行10个SSH连接(超过10个连接服务器SSHD服务就会随机中断连接,服务开始不稳定),每个SSH连接最多可以打开10SFTP Channel

这里要说下MaxSessionsMaxStartups参数对Java SFTP客户端的影响

以下以目前常用的SFTP框架JSCH为例子。 MaxStartups 和 MaxSessions 是 SSH 守护进程(SSHD)的配置选项,而 JSch 中的 Session 和 ChannelSftp 是 Java 客户端连接到 SSH 服务的对象。它们之间的关系如下:

  • MaxStartups:这是 SSHD 配置的一部分,用于控制并发未经身份验证的连接的数量。这对于防止暴力破解攻击非常有用,因为它限制了同时尝试身份验证的连接数。这个设置对于 JSch 客户端来说,主要影响的是当你尝试并发创建多个 Session 对象(即多个 SSH 连接)时,服务器端可能会开始拒绝额外的连接。
  • MaxSessions:这也是 SSHD 配置的一部分,用于限制每个网络连接(即每个 SSH Session)可以打开的最大会话(channel)数量。这个设置对于 JSch 客户端来说,主要影响的是在一个 Session 对象上你可以创建多少个 ChannelSftp 对象(或其他类型的 Channel 对象)。如果你尝试超过 MaxSessions 的限制创建更多的 Channel,服务器端可能会拒绝新的 Channel 创建请求。

总的来说,MaxStartupsMaxSessions 是服务器端的限制,它们影响了客户端(例如使用 JSch 的应用程序)可以并行创建多少个 SessionChannel。如果我们要设计一个需要处理大量并发 SFTP 传输的系统,需要考虑这些限制,并在必要时调整服务器的 SSHD 配置。

以文件下载为例,这里用现实中的模型简单说下目前我们数据集成平台下载文件的场景
我们数据集成平台需要从不同的上游的下载文件,A、B、C公司相当于我们的上游系统,某东快递相当于我们的数据集成系统。如下图所示,以其中一个快递公司的视角,某东快递需要对接多个商家收件,如果想提高收件速度,最理想就是安排不同的员工并行到不同的商家去取件,这样才可以同时最快收件;而以其中一个商家的视角,A公司要应对不同快递公司的并行取件请求,必然需要对人员有些限制,不然快递员太多可能会影响公司的正常运作,这也就是为什么SFTP服务器需要限制外部连接。

这里用现实生活中的场景来类比我们数据集成平台和上游系统进行文件下载交互的模型,方便大家理解SFTP服务器的限制,以及面对上游服务器的限制怎么最大化我们的传输效率。P.S:以下内容是以我们数据集成平台的视角去陈述。

1对1模型(对接一个上游下载文件)

因为我们目的是提升我们数据集成平台的并行文件传输效率,所以这里是以我们数据集成平台为参照物,假设只跟一个上游交互的场景。(这里为了简单化说明,假设上游服务器每个用户允许最大并发连接是1,打开最大会话数量是2)。

用例说明: 某东快递类比我们的数据集成平台,我们平台要去上游收件,而A公司类比我们要对接的其中一个上游。A公司的门禁类比上游服务器的限制规则。某东快递的快递员类比数据集成平台需要创建的SSH连接,每个快递员可以同时搬运几个货物类比每个SSH连接打开的SFTP Channel数量。


场景:
某东快递需要上门到A公司取件,A公司有三个件在仓库,A公司有以下门禁要求,不满足就会被拒绝访问:
1.同一个时间允许同一家物流公司的3个快递员进入
2.每个工人不能取超过一个货物

某东快递如果想取件时效性高,在快递员充足的情况下,最理想是派3个快递员同时取件,每人拿一个就可以一次拿完。但是由于A公司有门禁要求,3名快递员同时取取件,会有一位快递员会被保安拒绝,造成人员浪费。因此只需要同时派2名快递员,一个人拿2个,另一个人拿一个就可以。这样子就能一次拿完,也不会人员浪费,时效性也高。

但是真实情况,在对接上游时,对方一般都不会告诉我们有什么限制,通常都是被对方拦截才知道,而且快递员也是有成本的。所以我们作为某东快递,如果一直派快递员去A公司取件,除了可能派出去的快递员会被拦截,还会造成人员浪费。因此我们需要一种灵活配置的方式,可以基于对接的上游,灵活分配符合需求的快递员,这样可以在符合上游限制范围内,最大化我们的取件效率。

我们需要有一种可配置的池化技术,除了可以重复利用快递员,还需要在对方限制的范围基于以下原则合理分配:

  1. 并行派出的快递员数<=对方限制同一时间允许相同公司进入的快递员数
  2. 每位快递员并行取件的数量<=对方限制每个快递员单次同时取件的数量

用上面的例子转化成计算机术语,就是需要我们数据集成平台满足以下条件:

  1. 为A上游同一时间不能建立超过2SSH Session
  2. 每个SSH Session最多只能建立一个Sftp Channel

1对多模型(对接多个上游下载文件)

1对多模型是1对1模型的延伸,因为我们目的是提升我们数据集成平台的并行文件传输效率,所以这里是以我们数据集成平台为参照物,假设只跟多个上游交互的场景。(这里为了简单化说明,假设上游服务器每个用户允许最大并发连接是1,打开最大会话数量是2)。

用例说明: 跟以上1对1模型类似,某东快递类比我们的数据集成平台,我们平台要去多个上游收件,而A、B公司类比我们要对接的两个上游。A、B公司的门禁类比上游服务器的限制规则。某东快递的快递员类比数据集成平台需要创建的SSH连接,每个快递员可以同时搬运几个货物类比每个SSH连接打开的SFTP Channel数量。


场景:
某东快递需要上门到A、B公司取件,A、B公司各有三个件在仓库,A、B公司均有以下门禁要求,不满足就会被拒绝访问:
1.同一个时间允许同一家物流公司的3个快递员进入
2.每个工人不能取超过一个货物

跟1对1模型类似,某东快递如果想取件时效性高,在快递员充足的情况下,最理想是同时为两家公司各派3名快递员到A、B公司取件,每人拿一个就可以一次拿完。但是由于A、B公司有门禁要求,3名快递员同时取取件,会有一位快递员会被保安拒绝,造成人员浪费。因此只需要同时各派2名快递员,一个人拿2个,另一个人拿一个就可以。这样子就能一次拿完,也不会人员浪费,时效性也高。

因为这里是一对多模型,跟上面会有一点不一样的地方,我们除了需要上面提到的可配置的池化技术,还需要针对不同上下游做资源隔离,基本原则如下:

  1. 并行派出的快递员数<=对方限制同一时间允许相同公司进入的快递员数
  2. 每位快递员并行取件的数量<=对方限制每个快递员单次同时取件的数量
  3. 同一时间为每个公司分配的快递员是独立的,不会资源竞争

用上面的例子转化成计算机术语,就是需要我们数据集成平台:

  1. 为A、B上游同一时间不能超过2SSH Session
  2. 每个SSH Session最多只能建立一个Sftp Channel
  3. A、B的SSH Session连接池是隔离的,不会互相竞争

解决方案

基于前文提到的思路,我们已经有了可配置连接池的雏形,抽象出来的连接池模型需要符合如下条件:

  1. 为每台对接的SFTP服务器同一时间不能分配超过XSSH Session
  2. 为每台对接的SFTP服务器分配的每个SSH Session最多只能建立YSFTP Channel
  3. 为每台对接的SFTP服务器分配的SSH Session连接池是隔离的,不会互相竞争

以上X代表的我们需要配置的最大SSH Session连接数,Y代表我们需要配置的每个SSH Session最大能创建的Channel数,这两个值都需要根据对接的SFTP服务器的限制和自身服务器情况去调整。

为了避免连接池浪费SSH Session资源,我们还需要做进一步优化,就是要限制每个SSH Session需要创建满YSFTP Channel才创建一个新的SSH Session,而不是每次请求都先创建一个新的SSH Session,这样会导致每个SSH Session都没被充分利用,而且也会增加系统的负担。

优化后的连接池模型如下:

  1. 为每台对接的SFTP服务器同一时间不能分配超过XSSH Session
  2. 为每台对接的SFTP服务器分配的每个SSH Session最多只能建立YSFTP Channel
  3. 为每台对接的SFTP服务器分配的SSH Session连接池是隔离的,不会互相竞争
  4. 多个线程请求获取对某台SFTP服务器SSH Session时,如果当前SSH Session池中的第一个Session的SFTP Channel 数还没超过Y,则会返回同一个SSH Session实例,假如当前的SSH Session池的第一个Session创建的SFTP Channel数已经超过配置的Y值,则创建第二个SSH Session,如此类推,直到创建第N个SSH Session,N <=X

对于每个上游分配的连接池请求流程如下:
AI生产那个的流程图

代码实现

SFTP Session Pool

这是一个 SFTP 会话池的实现,它使用了 JSch 库来创建和管理 SFTP 会话(Session)和通道(ChannelSftp)。这个会话池的主要目的是复用已经创建的 SFTP 会话和通道,以提高文件传输的效率。

以下是这个类的主要功能和设计考虑:

  • 会话和通道的创建和管理: 这个类使用 getSession 方法来从池中获取一个会话,如果没有可用的会话,它会创建一个新的会话。然后,它使用 getChannel 方法来从一个会话中打开一个新的 SFTP 通道。

  • 并发控制: 这个类使用了 Semaphore 对象来限制每个主机的最大会话数(maxSessionsPerHostSemaphore)以及每个会话的最大通道数(channelCounts)。这样可以防止过多的并发连接导致系统资源耗尽。

  • 异常处理: 如果在创建会话或打开通道时发生异常,这个类会从池中移除无效的会话,并释放相应的 Semaphore 许可。这样可以确保池中只包含有效的会话和通道。

  • 会话和通道的复用: 如果一个会话的所有通道都已经关闭,这个类会关闭这个会话并从池中移除它。否则,它会将这个会话放回池中,以便后续的请求可以复用它。

  • Key锁: 这是用来防止多个持有相同的(host+port+username)的线程取Session的时候会出现并发,导致创建的Session数比预期多。

  • 会话释放 - 如果当前Session仍有Channel在使用(意味着其他线程在使用该Session),则会放回连接池。否则,会关闭。

这个类的设计提供了一个有效的方式来管理 SFTP 会话和通道,使得它们可以被多个请求复用,从而提高文件传输的效率。

package pool.common.utils;import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import pool.exceptions.NoAvailableSessionException;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;@Slf4j
public class SftpSessionPool {private ConcurrentHashMap<String, ConcurrentLinkedQueue<Session>> sessions;private ConcurrentHashMap<Session, Semaphore> channelCounts;private int maxChannelsPerSession;private Semaphore maxSessionsPerHostSemaphore;ReentrantLockPool lock = new ReentrantLockPool();public SftpSessionPool(int maxSessionsPerHost, int maxChannelsPerSession) {this.sessions = new ConcurrentHashMap<>();this.channelCounts = new ConcurrentHashMap<>();this.maxChannelsPerSession = maxChannelsPerSession;this.maxSessionsPerHostSemaphore = new Semaphore(maxSessionsPerHost, true);}/*** get session with keyLock** @param host* @param port* @param username* @param password* @param timeout* @param unit* @param lockKey* @return* @throws JSchException* @throws InterruptedException*/public Session getSession(String host, int port, String username, String password, long timeout, TimeUnit unit, String lockKey) throws JSchException, InterruptedException {String sessionKey = host + ":" + port + ":" + username;if (StringUtils.hasLength(lockKey)) {// waiting for the available lockwhile (!lock.lock(lockKey)) {Thread.sleep(1000); // wait a bit before retryinglog.debug("Thread {} failed to get lock ", Thread.currentThread().getId());}log.debug("Thread {} get lock successfully", Thread.currentThread().getId());}try {return getSessionFromPool(host, port, username, password, timeout, unit, sessionKey);} finally {if (StringUtils.hasLength(lockKey)) {lock.unlock(lockKey);log.debug("Thread {} release lock successfully", Thread.currentThread().getId());}}}private Session getSessionFromPool(String host, int port, String username, String password, long timeout, TimeUnit unit, String sessionKey) throws JSchException, InterruptedException {ConcurrentLinkedQueue<Session> hostSessions = sessions.computeIfAbsent(sessionKey, k -> new ConcurrentLinkedQueue<>());long endTime = System.nanoTime() + unit.toNanos(timeout);while (System.nanoTime() < endTime) {for (Session session : hostSessions) {if (!session.isConnected()) {// This session is no longer valid, remove it from the poollog.warn("This session is no longer valid, remove it from the pool ,sessionId: {}, session detail: {}", session, sessionKey);hostSessions.remove(session);channelCounts.remove(session);maxSessionsPerHostSemaphore.release();continue;}Semaphore channelSemaphore = channelCounts.get(session);if (channelSemaphore != null && channelSemaphore.tryAcquire()) {return session;}}if (maxSessionsPerHostSemaphore.tryAcquire()) {try {Session session = createNewSession(host, port, username, password);hostSessions.add(session);channelCounts.put(session, new Semaphore(maxChannelsPerSession - 1)); // one channel is already in usereturn session;} catch (JSchException e) {maxSessionsPerHostSemaphore.release();throw e;}}Thread.sleep(100); // wait a bit before retrying}throw new NoAvailableSessionException("Timeout while waiting for a session");}private Session createNewSession(String host, int port, String username, String password) throws JSchException {JSch jsch = new JSch();Session session = jsch.getSession(username, host, port);session.setPassword(password);session.setConfig("StrictHostKeyChecking", "no"); // Enable StrictHostKeyCheckingsession.setTimeout(5000); // Set connection timeout to 5 secondssession.connect();return session;}public ChannelSftp getChannel(Session session) throws JSchException {try {return (ChannelSftp) session.openChannel("sftp");} catch (JSchException e) {if (!session.isConnected()) {// The session is no longer valid, remove it from the poolString key = session.getHost() + ":" + session.getUserName();ConcurrentLinkedQueue<Session> sessions = this.sessions.get(key);sessions.remove(session);channelCounts.remove(session);maxSessionsPerHostSemaphore.release();}throw e;}}/*** Return the session if current session is till used by another thread ,otherwise close it.* @param session*/public synchronized void returnOrCloseSession(Session session) {Semaphore channelSemaphore = channelCounts.get(session);if (channelSemaphore != null) {channelSemaphore.release();if (channelSemaphore.availablePermits() < maxChannelsPerSession) {// still channels left, put back to the poolreturn;}}// close the sessionsession.disconnect();channelCounts.remove(session);sessions.values().forEach(queue -> queue.remove(session));maxSessionsPerHostSemaphore.release();}}

SftpConnectionPoolFactory

为了对接不同的上游,要实现连接池隔离,这里创建一个工厂类给调用者,通过不同的Key(host+port+username)可以复用独立的连接池,这里是因为考虑到有些SFTP Server是多个用户复用的,不同的用户文件传输需求应该是可以并行处理,因此这里连接池要资源隔离。

package pool.common.utils;import java.util.concurrent.ConcurrentHashMap;public class SftpConnectionPoolFactory {private static volatile SftpConnectionPoolFactory instance;private ConcurrentHashMap<String, SftpSessionPool> sessionPools;private SftpConnectionPoolFactory() {this.sessionPools = new ConcurrentHashMap<>();}public static SftpConnectionPoolFactory getInstance() {if (instance == null) {synchronized (SftpConnectionPoolFactory.class) {if (instance == null) {instance = new SftpConnectionPoolFactory();}}}return instance;}public SftpSessionPool getSessionPool(String host, int port, String username, int maxSessionsPerHost, int maxChannelsPerSession) {String key = host + ":" + port + ":" + username;return sessionPools.computeIfAbsent(key, k -> new SftpSessionPool(maxSessionsPerHost, maxChannelsPerSession));}
}

SftpFileProcessThread

这里创建一个文件处理线程来模拟文件处理,这里获取Session对象时使用host + ":" + username作为锁是防止获取Session时并发导致创建的连接数超过预期。

package pool.demo;import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.Session;
import lombok.extern.slf4j.Slf4j;
import pool.common.utils.SftpSessionPool;
import pool.exceptions.NoAvailableSessionException;import java.util.Vector;
import java.util.concurrent.TimeUnit;@Slf4j
public class SftpFileProcessThread extends Thread {private int maxRetries = 3;private SftpSessionPool pool;private String host;private int port;private String username;private String password;private String remoteSftpPath;public SftpFileProcessThread(SftpSessionPool pool, String host, int port, String username, String password, String remoteSftpPath, int maxRetries) {this.pool = pool;this.host = host;this.port = port;this.username = username;this.password = password;this.remoteSftpPath = remoteSftpPath;this.maxRetries = maxRetries;}public void run() {Session session = null;ChannelSftp channelSftp = null;try {int retries = 0;while (true) {try {// session = pool.getSession(host, port, username, password, 10, TimeUnit.SECONDS, "");session = pool.getSession(host, port, username, password, 10, TimeUnit.SECONDS, host + ":" + username); //get session with lockbreak; // if getSession() is successful, break the loop} catch (NoAvailableSessionException e) {if (++retries > this.maxRetries) {throw e; // if exceeded max retries, rethrow the exception}log.info("Thread {}:Failed to get session in this round. Start to retry now. Current retry count is {}. Request session detail: {}", this.getId(), retries, this.host + ":" + this.username);// if not exceeded max retries, sleep for a while and then continue the loop to retryThread.sleep(5000);}}log.info("Thread {} got session {}. Session detail: {}", this.getId(), session, session.getHost() + ":" + session.getUserName());channelSftp = pool.getChannel(session);channelSftp.connect();// ... use the session and channelVector<ChannelSftp.LsEntry> list = channelSftp.ls(this.remoteSftpPath);for (ChannelSftp.LsEntry entry : list) {// System.out.println(entry.getFilename());//Simulate file processThread.sleep(5000);}} catch (Exception e) {log.error("Thread {} failed to process", this.getId(), e);} finally {if (channelSftp != null) {channelSftp.disconnect();}if (session != null) {pool.returnOrCloseSession(session);log.info("Thread {} closed session {}. Session detail: {}", this.getId(), session, this.host + ":" + this.username);}}}
}

SftpService

这里是程序入口,从数据库配置获取连接池配置信息,然后默认开启了10条线程去模拟请求连接池并处理文件任务,后面我们进行测试时,需要改这个线程数来演示。

package pool.services;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import pool.common.utils.SftpConnectionPoolFactory;
import pool.common.utils.SftpSessionPool;
import pool.dataobject.SftpConfig;
import pool.demo.SftpFileProcessThread;
import pool.repositories.SftpConfigRepository;import java.util.List;@Slf4j
@Service
public class SftpService {private final SftpConfigRepository sftpConfigRepository;// final static Map<String, SftpSessionPool> map = new ConcurrentHashMap();public SftpService(SftpConfigRepository sftpConfigRepository) {this.sftpConfigRepository = sftpConfigRepository;}public void initializeConnectionPools() {// Load all SFTP configs from the databaseList<SftpConfig> configs = sftpConfigRepository.findAll();//Indicate how many files need to be processed in the same timeint max_concurrent_opening_files = 10;String testPath = "/";// Initialize a connection pool for each configfor (SftpConfig config : configs) {log.info("config->{}", config);//map.put(config.getHost() + config.getUsername(), new SftpSessionPool(config.getMaxSessions(), config.getMaxChannels()));SftpSessionPool sessionPool = SftpConnectionPoolFactory.getInstance().getSessionPool(config.getHost(), 22, config.getUsername(), config.getMaxSessions(), config.getMaxChannels());//Simulate each sftp profile is being used by multiple threads for file processfor (int i = 0; i < max_concurrent_opening_files; i++) {SftpFileProcessThread thread = new SftpFileProcessThread(sessionPool, config.getHost(), config.getPort(), config.getUsername(), config.getPassword(), testPath, 0);thread.start();}}}
}

SFTP_CONFIG表

这里我创建了一张配置表,用来配置连接池的上限参数,这里JSCH的 MaxSession对应的是服务器MaxStartups参数,JSCH的MaxChannel对应的是服务器的MaxSessions参数

CREATE TABLE SFTP_CONFIG (ID INT AUTO_INCREMENT PRIMARY KEY,HOST VARCHAR(255),PORT INT,USERNAME VARCHAR(255),PASSWORD VARCHAR(255),MAXSESSIONS INT,MAXCHANNELS INT
);

测试连接池

1.不超过服务器限制

1.1 这里我把SFTP服务器的MaxStartups设置成2MaxSession设置成5。这里把SFTP 的限制设低点是方便测试。

parallels@ubuntu-linux-22-04-desktop:~$ grep -i "MaxStartups" /etc/ssh/sshd_config
MaxStartups 2
parallels@ubuntu-linux-22-04-desktop:~$ grep -i "MaxSessions" /etc/ssh/sshd_config
MaxSessions 5
parallels@ubuntu-linux-22-04-desktop:~$ 

1.2 把连接池的数据库配置改成MaxSession=3,MaxChannel =5 与上面服务器配置一致。

INSERT INTO SFTP_CONFIG (HOST, PORT, USERNAME, PASSWORD, MAXSESSIONS, MAXCHANNELS) VALUES
('192.168.50.58', 22,'parallels', 'test8808', 2, 5);

1.3 接下来我们来启动程序,会同时并行开启10个线程持有相同的连接信息去访问同一台SFTP 服务器,我们看看程序是否会符合我们预期,10个线程应该只会打开2SFTP Session,并且连接服务器不会报错。

2024-03-09 01:22:03.536  INFO 58516 --- [       Thread-3] pool.demo.SftpFileProcessThread          : Thread 35 got session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:04.072  INFO 58516 --- [       Thread-4] pool.demo.SftpFileProcessThread          : Thread 36 got session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:04.072  INFO 58516 --- [       Thread-7] pool.demo.SftpFileProcessThread          : Thread 39 got session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:05.075  INFO 58516 --- [       Thread-8] pool.demo.SftpFileProcessThread          : Thread 40 got session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:05.075  INFO 58516 --- [       Thread-6] pool.demo.SftpFileProcessThread          : Thread 38 got session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:05.191  INFO 58516 --- [       Thread-9] pool.demo.SftpFileProcessThread          : Thread 41 got session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:06.083  INFO 58516 --- [      Thread-11] pool.demo.SftpFileProcessThread          : Thread 43 got session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:06.084  INFO 58516 --- [       Thread-5] pool.demo.SftpFileProcessThread          : Thread 37 got session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:06.086  INFO 58516 --- [      Thread-12] pool.demo.SftpFileProcessThread          : Thread 44 got session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:07.084  INFO 58516 --- [      Thread-10] pool.demo.SftpFileProcessThread          : Thread 42 got session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:03.692  INFO 58516 --- [       Thread-3] pool.demo.SftpFileProcessThread          : Thread 35 closed session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:04.157  INFO 58516 --- [       Thread-4] pool.demo.SftpFileProcessThread          : Thread 36 closed session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:04.160  INFO 58516 --- [       Thread-7] pool.demo.SftpFileProcessThread          : Thread 39 closed session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:05.169  INFO 58516 --- [       Thread-8] pool.demo.SftpFileProcessThread          : Thread 40 closed session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:05.198  INFO 58516 --- [       Thread-6] pool.demo.SftpFileProcessThread          : Thread 38 closed session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:05.298  INFO 58516 --- [       Thread-9] pool.demo.SftpFileProcessThread          : Thread 41 closed session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:06.176  INFO 58516 --- [      Thread-11] pool.demo.SftpFileProcessThread          : Thread 43 closed session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:06.182  INFO 58516 --- [       Thread-5] pool.demo.SftpFileProcessThread          : Thread 37 closed session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:06.182  INFO 58516 --- [      Thread-12] pool.demo.SftpFileProcessThread          : Thread 44 closed session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:07.185  INFO 58516 --- [      Thread-10] pool.demo.SftpFileProcessThread          : Thread 42 closed session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels

从以上日志可以看出,10条线程虽然都并行持有相同的SFTP连接信息请求连接池,但是只会获取到2SFTP Session实例而不是10SFTP Session,并且当连接池里的SSH Session持有的Channel还没有达到配置的上限5个时,只会分配同一个SSH Session实例,当前连接池里的SSH Session对象持有的Channel超过5才会分配下一个SFTP Session对象,来确保SFTP Session不会浪费。因此目前连接池是符合我们预期想要的效果。

2.超过服务器MaxSessions

2.1 服务器配置跟上面一样,MaxStartups设置成2MaxSession设置成5

parallels@ubuntu-linux-22-04-desktop:~$ grep -i "MaxStartups" /etc/ssh/sshd_config
MaxStartups 2
parallels@ubuntu-linux-22-04-desktop:~$ grep -i "MaxSessions" /etc/ssh/sshd_config
MaxSessions 5
parallels@ubuntu-linux-22-04-desktop:~$ 

2.2 把连接池的数据库配置改成MaxSession=3,MaxChannel =1 ,目的是测试当连接池配置的最大Channel数大于服务器限制时,程序多个线程并行进行文件处理会不会报错。

INSERT INTO SFTP_CONFIG (HOST, PORT, USERNAME, PASSWORD, MAXSESSIONS, MAXCHANNELS) VALUES
('192.168.50.57', 22,'parallels', 'test8808', 1, 6);

2.3 接下来我们来启动程序,会同时并行开启6个线程持有相同的连接信息去访问同一台SFTP 服务器,我们看看程序是否会符合我们预期。如果程序符合预期的话,6个线程会同时使用1个SSH Session去尝试打开6个Channel,第6个线程开始打开Channel进行文件处理的时候应该会开始报错,因为已经超过了服务器所能承受的 MaxSession=5

2024-03-09 15:30:16.725  INFO 74635 --- [           main] pool.services.SftpService                : config->SftpConfig(id=1, host=192.168.50.57, port=22, username=parallels, password=test8808, maxSessions=1, maxChannels=6)
2024-03-09 15:30:17.180  INFO 74635 --- [       Thread-6] pool.demo.SftpFileProcessThread          : Thread 38 got session com.jcraft.jsch.Session@63c4e36c. Session detail: 192.168.50.57:parallels
2024-03-09 15:30:17.257  INFO 74635 --- [       Thread-3] pool.demo.SftpFileProcessThread          : Thread 35 got session com.jcraft.jsch.Session@63c4e36c. Session detail: 192.168.50.57:parallels
2024-03-09 15:30:17.257  INFO 74635 --- [       Thread-5] pool.demo.SftpFileProcessThread          : Thread 37 got session com.jcraft.jsch.Session@63c4e36c. Session detail: 192.168.50.57:parallels
2024-03-09 15:30:17.259  INFO 74635 --- [       Thread-8] pool.demo.SftpFileProcessThread          : Thread 40 got session com.jcraft.jsch.Session@63c4e36c. Session detail: 192.168.50.57:parallels
2024-03-09 15:30:17.264  INFO 74635 --- [       Thread-7] pool.demo.SftpFileProcessThread          : Thread 39 got session com.jcraft.jsch.Session@63c4e36c. Session detail: 192.168.50.57:parallels
2024-03-09 15:30:17.268  INFO 74635 --- [       Thread-4] pool.demo.SftpFileProcessThread          : Thread 36 got session com.jcraft.jsch.Session@63c4e36c. Session detail: 192.168.50.57:parallels
2024-03-09 15:30:17.278 ERROR 74635 --- [       Thread-4] pool.demo.SftpFileProcessThread          : Thread 36 failed to processcom.jcraft.jsch.JSchException: channel is not opened.at com.jcraft.jsch.Channel.sendChannelOpen(Channel.java:835) ~[jsch-0.2.16.jar:0.2.16]at com.jcraft.jsch.Channel.connect(Channel.java:161) ~[jsch-0.2.16.jar:0.2.16]at com.jcraft.jsch.Channel.connect(Channel.java:155) ~[jsch-0.2.16.jar:0.2.16]at pool.demo.SftpFileProcessThread.run(SftpFileProcessThread.java:54) ~[classes/:na]

从上面日志可以看到,当超过服务器最大Channel配置5时,服务器就会开始拒绝请求,符合我们测试预期。因此我们连接池的配置范围一定要小于等于服务器的配置,否则可能会引起文件传输中断,也就是宁愿少配也不要多配

3.超过服务器MaxStartups

3.1 服务器配置跟上面一样,MaxStartups设置成2MaxSession设置成5

parallels@ubuntu-linux-22-04-desktop:~$ grep -i "MaxStartups" /etc/ssh/sshd_config
MaxStartups 2
parallels@ubuntu-linux-22-04-desktop:~$ grep -i "MaxSessions" /etc/ssh/sshd_config
MaxSessions 5
parallels@ubuntu-linux-22-04-desktop:~$ 

3.2 把连接池的数据库配置改成MaxSession=3,MaxChannel =1 ,目的是测试当连接池配置的最大Session数大于服务器限制时,程序多个线程并发创建SSH Session是否会出错。

INSERT INTO SFTP_CONFIG (HOST, PORT, USERNAME, PASSWORD, MAXSESSIONS, MAXCHANNELS) VALUES
('192.168.50.57', 22,'parallels', 'test8808', 3, 1);

3.3 修改SftpFileProcessThread代码中获取连接方式改成无锁方式,这样可以看到并发创建Session数量超过服务器限制时服务器的报错。因为MaxStartups上面我们配置了2,需要同时多个线程模拟并发创建超过2个等待验证的Session才能看到效果,如果使用有锁模式,创建Session就会变成串行,验证通过才会到下一个Session,服务器就不会同时有多个等待的验证的Session

 session = pool.getSession(host, port, username, password, 10, TimeUnit.SECONDS, "");//session = pool.getSession(host, port, username, password, 10, TimeUnit.SECONDS, host + ":" + username); //get session with lock

3.4 接下来我们来启动程序,会同时并行开启3个线程持有相同的连接信息去访问同一台SFTP 服务器,我们看看程序是否会符合我们预期。如果程序符合预期的话,3个线程会同时创建3个SSH Session,第3个线程尝试创建SSH Session 时应该会报错,因为超过了服务器最大 并发Session数限制MaxStartups=2

2024-03-09 15:55:23.367  INFO 7577 --- [           main] pool.services.SftpService                : config->SftpConfig(id=1, host=192.168.50.57, port=22, username=parallels, password=test8808, maxSessions=3, maxChannels=1)
2024-03-09 15:55:23.455 ERROR 7577 --- [       Thread-5] pool.demo.SftpFileProcessThread          : Thread 37 failed to processcom.jcraft.jsch.JSchException: Session.connect: java.net.SocketException: Connection resetat com.jcraft.jsch.Session.connect(Session.java:570) ~[jsch-0.2.16.jar:0.2.16]at com.jcraft.jsch.Session.connect(Session.java:199) ~[jsch-0.2.16.jar:0.2.16]at pool.common.utils.SftpSessionPool.createNewSession(SftpSessionPool.java:132) ~[classes/:na]at pool.common.utils.SftpSessionPool.getSessionFromPool(SftpSessionPool.java:109) ~[classes/:na]at pool.common.utils.SftpSessionPool.getSession(SftpSessionPool.java:57) ~[classes/:na]at pool.demo.SftpFileProcessThread.run(SftpFileProcessThread.java:39) ~[classes/:na]
Caused by: java.net.SocketException: Connection resetat java.net.SocketInputStream.read(SocketInputStream.java:210) ~[na:1.8.0_231]at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_231]at java.net.SocketInputStream.read(SocketInputStream.java:224) ~[na:1.8.0_231]at com.jcraft.jsch.IO.getByte(IO.java:84) ~[jsch-0.2.16.jar:0.2.16]at com.jcraft.jsch.Session.connect(Session.java:276) ~[jsch-0.2.16.jar:0.2.16]... 5 common frames omitted2024-03-09 15:55:23.900  INFO 7577 --- [       Thread-3] pool.demo.SftpFileProcessThread          : Thread 35 got session com.jcraft.jsch.Session@4777f0b1. Session detail: 192.168.50.57:parallels
2024-03-09 15:55:23.900  INFO 7577 --- [       Thread-4] pool.demo.SftpFileProcessThread          : Thread 36 got session com.jcraft.jsch.Session@326d5e63. Session detail: 192.168.50.57:parallels

从上面日志可以看到,3个线程同一时间只有2个线程能成功创建Session,另外一个线程创建Session就报错,这是因为超过了服务器最大并发Session数限制而被拒绝访问。

总结

我们如果使用这种可配置连接池进行访问,对接上游时最好时最好跟上游确认他们服务器可以承受的Session数量和Channel数量是多少,宁愿少配也不要多配。但是对于一般上游,如果使用的是Linux服务器,默认值就是上面一开始提到的MaxSessions=10,MaxStartups=10:30:60,我们客户端连接池保守配置可以设置成MaxSession=3,MaxChannel=10,这样子最大并行可以处理30个文件,MaxSession一般不建议设置太多,非常消耗系统资源。也有很多上游是使用其他SFTP服务管理工具,但是基本限制参数差不多。以上连接池还有很多需要完善的地方,大家可以根据需要自己进行优化。

Github源代码

https://github.com/EvanLeung08/sftp-session-pool

这篇关于高吞吐SFTP连接池设计方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/794319

相关文章

如何确定 Go 语言中 HTTP 连接池的最佳参数?

确定 Go 语言中 HTTP 连接池的最佳参数可以通过以下几种方式: 一、分析应用场景和需求 并发请求量: 确定应用程序在特定时间段内可能同时发起的 HTTP 请求数量。如果并发请求量很高,需要设置较大的连接池参数以满足需求。例如,对于一个高并发的 Web 服务,可能同时有数百个请求在处理,此时需要较大的连接池大小。可以通过压力测试工具模拟高并发场景,观察系统在不同并发请求下的性能表现,从而

MySQL数据库连接池技术

关于数据库连接池的使用,首先我们要明白我们为什么要用它,对应普通的数据库连接操作,通常会涉及到以下一些操作是比较耗时的: 网络通讯,涉及到网络延时及协议通讯身份验证,涉及安全性检查连接合法性检查,主要是检查所连接的数据库是否存在并发控制机制构造并初始化输出缓冲区连接成功后的信息保存,日志存储服务器性能数据库配置优化系统分配内存资源等等~~~状况,导致数据库连接操作比较耗时,~~~而且每次都得花费

Redis 客户端Jedis使用---连接池

Jedis 是Redis 的Java客户端,通过一段时间的使用,jedis基本实现redis的所有功能,并且jedis在客户端实现redis数据分片功能,Redis本身是没有数据分布功能。 一、下载jedis 代码 jedis 代码地址:https://github.com/xetorthio/jedis 再次感受到开源的强大。呵呵,大家有时间可以看看源码。 二、项目中如何使用Jedi

Hibernate中自带的连接池!!!

<span style="font-size:18px; font-family: Arial, Helvetica, sans-serif;"><?xml version="1.0" encoding="UTF-8"?></span> <span style="font-size:18px;"><!DOCTYPE hibernate-configuration PUBLIC"-//Hibern

C3P0连接池参数配置

<!--acquireIncrement:链接用完了自动增量3个。 --><property name="acquireIncrement">3</property><!--acquireRetryAttempts:链接失败后重新试30次。--><property name="acquireRetryAttempts">30</property><!--acquireRetryDelay;两次连接

基于MicroPython的ESP8266控制七段数码管的设计方案

以下是一个基于MicroPython的ESP8266控制七段数码管的设计方案: 一、硬件准备 1. ESP8266开发板(如NodeMCU)             2. 七段数码管(共阳或共阴型)                      3. 限流电阻(根据数码管的电流要求选择合适的阻值

OkHttp3源码分析[复用连接池]

OkHttp系列文章如下 OkHttp3源码分析[综述]OkHttp3源码分析[复用连接池]OkHttp3源码分析[缓存策略]OkHttp3源码分析[DiskLruCache]OkHttp3源码分析[任务队列] 1. 概述 HTTP中的keepalive连接在网络性能优化中,对于延迟降低与速度提升的有非常重要的作用。 通常我们进行http连接时,首先进行tcp握手,然后传输数据,最后释

tomcat连接池和dbutils使用

1.    配置tomcat下的conf下的context.xml文件,在<Context> </Context>之间添加连接池配置:             <Context>  <Resource name="jdbc/lhy"     <--对应web.xml     <res-ref-name> -->             auth="Container"

17 连接池原理

可以设计一个mysql的连接池,提高效率 提前建立一个连接池,这里面创建线程池,和mysql建立连接,维护一个任务队列。有任务到来时,将任务放入任务队列,任务结构是要执行的sql语句和需要的回调函数,可以将结果返回。连接池组件有hicar之类的 在网页中的mysql,首先请求进行注册,网站提取参数形成命令,mysql的服务器有很多个,中间可以搭一个软件层来选择要访问的数据库在哪个服务器,同时

MySQL 的基础 一 (连接池, SQL接口, 查询解析器, 查询优化器, 存储引擎接口, 执行器)

目录 1  MySQL数据库的连接池 2  网络连接必须让线程来处理 3  SQL接口:负责处理接收到的SQL语句 4  查询解析器:让MySQL能看懂SQL语句 5  查询优化器:选择最优的查询路径 6  调用存储引擎接口,真正执行SQL语句 7  执行器:根据执行计划调用存储引擎的接口 1  MySQL数据库的连接池 当多个系统访问同一个数据库时,每个系统会通过数据库连接