文件归集系统

2023-11-10 09:10
文章标签 系统 归集

本文主要是介绍文件归集系统,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文件归集

监控客户端指定目录,当指定目录有文件创建时自动将其上传至服务器

使用到的技术:

​ 1.监控目录中文件的创建使用的是 commons-io 的FileAlterationMonitor

​ 2.文件传输使用的是Netty

系统架构图

文件归集_系统原理图

文件监控器

总体说明

在这里插入图片描述

  1. 文件监控器注册FileCreateListener,用以响应文件创建事件
  2. FileCreateListener 使用 FileCreateHandler接口来具体处理创建的文件(由FileCreateSimlpleHandler实现FileCreateHandler 具体实现创建的文件的处理逻辑–调用Netty客户端上传至服务器)。
  3. 文件监控器使用FileMonitorContext上下文来初始化配置信息
  4. FileMonitorContext通过FilePoolClientPorperties配置信息(file.pool.client)来初始化
  5. FilePoolClientPorperties 中的 FileMonitorProperties来配置(file.pool.client.fileMonitor)文件监控器相关的信息

代码片段

FileMonitor
@Component
@Slf4j
public class FileMonitor {@Resource // 注入上下文信息private FileMonitorContext context;@Resource // 注入容器中的监听器,即FileCreateListenerprivate List<FileAlterationListener> fileAlterationListenerList; private FileAlterationMonitor monitor; // 文件监控器@PostConstructpublic void init(){// 使用上下文来初始化文件监控器this.monitor = new FileAlterationMonitor(context.getMonitorInterval());FileAlterationObserver fileObserver = new FileAlterationObserver(context.getMonitorDir());fileAlterationListenerList.forEach(fileObserver::addListener);this.monitor.addObserver(fileObserver);}public void start() throws Exception{//启动文件监听器if (context.isMonitorAsDaemonThread()) {// 以守护进程方式启动文件监听器---非守护进程关闭后,守护进程自动关闭monitor.setThreadFactory(r -> {Thread th = new Thread(r);th.setDaemon(true);return th;});}monitor.start();}public void stop() throws Exception {// 停止文件监听器monitor.stop();}
}
FileCreateListener
public class FileCreateListener extends FileAlterationListenerAdaptor {@Autowired // 注入FileCreateHandler接口的实现类,即FileCreateSimlpleHandlerprivate FileCreateHandler fileCreateHandler;@Overridepublic void onFileCreate(File file) {// 新创建的文件交由FileCreateHandler来处理fileCreateHandler.handler(file);}
}
FileCreateSimlpleHandler
public class FileCreateSimlpleHandler implements FileCreateHandler {@Autowired // 注入Netty客户端的文件上传服务private UploadService uploadService;private ScheduledExecutorService scheduledExecutorService;@Autowiredprivate FileMonitorContext context;@PostConstructpublic void init() {scheduledExecutorService = Executors.newScheduledThreadPool(2);}@Overridepublic void handler(File file) {// 文件创建后,延迟3秒再上传scheduledExecutorService.schedule((Runnable) () -> {uploadFile(file);}, 3000, TimeUnit.MILLISECONDS);}public void uploadFile(File file) {try {String localFileName = file.getAbsolutePath();String fileName = localFileName.substring(context.getScheduleBaseDir().length());// 调用上传服务将文件上传至服务器,若上传失败则在3秒后再次重试uploadService.execute(localFileName, fileName,(status) -> {if (status == NettyTransferStatusConstants.OK) {FileUtil.del(file);} else {scheduledExecutorService.schedule((Runnable) () -> {uploadFile(file);}, 3000, TimeUnit.MILLISECONDS);}});} catch (Exception e) {log.error(e.getMessage(), e);}}
}

Netty客户端

总体说明

文件归集_Netty客户端

  1. 客户端对外提供文件上传服务(UploadService)
  2. UploadService实现FileTransferService接口,此接口借助于FileTransferable接口(FileTransferClient实现了此接口)提供上传服务
  3. FileTransferClient 调用 NettyClient提供文件传输服务
  4. NettyClient 即为真正的Netty客户端(Bootstrap),NettyClient 中注册了(编码、业务处理、解码)三个ChannelHandler
  5. 业务处理(NettyTransferHanler)注册了若干个NettyTransferProcessor来处理各种类型的文件信息(如文件上传)

代码片段

UploadService
public class UploadService implements FileTransferService {@Autowired(required = false) // 注入FileTransferable的实现@Getterprivate FileTransferable fileTransferable;// 同步上传public int execute(String localFileName, String remoteFilePath) throws Exception {return execute(localFileName,remoteFilePath,null);}public int execute(String localFileName, String remoteFilePath, Consumer<Integer> callback) throws Exception {// ...if (callback != null) {// 异步上传writeFuture(request, extra, callback);return NettyTransferStatusConstants.OK;} else {return writeAndGetStatus(request, extra);}}
}
FileTransferService 接口
public interface FileTransferService {FileTransferable getFileTransferable();// 异步处理default RequestFuture writeFuture(NettyTransferable request, Map<String, Object> extra,Consumer<Integer> callback) throws Exception {return getFileTransferable().writeFuture(request,extra, callback);}// 同步处理default int writeAndGetStatus(NettyTransferable request, Map<String, Object> extra) throws Exception {return getFileTransferable().writeAndGetStatus(request,extra);}// 同步处理default Object writeAndGetResult(NettyTransferable request, Map<String, Object> extra) throws Exception {return getFileTransferable().writeAndGetResult(request,extra);}
}
FileTransferClient
public class FileTransferClient implements NettyRunable, FileTransferable {// ...private NettyClient nettyClient;// ...@Overridepublic RequestFuture writeFuture(NettyTransferable request, Map<String, Object> extra,Consumer<Integer> callback) {// nettyClient 轮流选择通道处理数据.Channel channel = nettyClient.getChannelList().get(Math.abs(lastChannelIndex.incrementAndGet()) % context.getWorkGroupSize());//...channel.writeAndFlush(request) ;return future;}//...
}
NettyClient
public class NettyClient implements NettyRunable {//....// 创建、启动客户端并连接至服务器private void connect() throws InterruptedException {workerGroup = new NioEventLoopGroup(context.getWorkGroupSize());Bootstrap bootstrap = new Bootstrap().group(workerGroup).channel(NioSocketChannel.class);//...bootstrap = addHandler(bootstrap);}protected Bootstrap addHandler(Bootstrap bootstrap) {return bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 将配置好的ChannelHandlers添加到Netty客户端中.context.getChannelHandlers().forEach(pipeline::addLast);}});}
}
NettyTransferHanler
public class NettyTransferHanler extends ChannelInboundHandlerAdapter implements NettyTransferProcessable {@Autowired // 注入NettyTransferProcessor处理器private List<NettyTransferProcessor> processors = new ArrayList<>();@Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {Channel channel = ctx.channel();NettyTransferable request = (NettyTransferable) msg;// 获取支持当前请求的处理器.NettyTransferProcessor processor = getNettyTransferProcessor(request);try {// 使用处理器处理processor.process(request, channel);} catch (Throwable throwable) {// ...}}@Overridepublic NettyTransferProcessor getNettyTransferProcessor(NettyTransferable request) {int action = request.getAction();// 依据请求类型获取支持的处理器NettyTransferProcessor processor = processorsCache.get(action);if (processor == null) {processor = this.processors.stream().filter(p -> p.isSupport(request)).findFirst().get();if (processor != null) {processorsCache.put(action, processor);}}return processor;}
}
SendFileProcessor
public class SendFileProcessor extends AbstractNettyTransferProcessor {// 当前处理器支持的请求(本地主动上传请求 和 服务端被动下载请求)@Overridepublic boolean isSupport(NettyTransferable request) {int role = request.getRole();int action = request.getAction();return super.isSupport(request)&& ((action == NettyTansferAction.PUT_FILE && role == 1)|| (action == NettyTansferAction.GET_FILE && role == 0));}/*** 下载(服务端处理)*/@Overridepublic boolean doProcessResponse(NettyTransferable request, Channel channel) throws Exception {//..}/*** 上传(本地端处理)*/@Overridepublic boolean doProcessReqeust(NettyTransferable response, Channel channel) {// 1.申请上传不通过,或者接收的片段处理异常,可能需要重试.if(StringUtils.isBlank(response.getResponseId())){getRequestCache().notify(response.getRequestId(), response.getStatus(), null);log.debug("申请审核不通过,错误代码:{}", response.getStatus());return false;}// 2.已经传输完成if (isSendEnd(response)) {// 已经发送完成,响应当前请求。getRequestCache().notify(response.getRequestId(), NettyTransferStatusConstants.OK, null);return true;}try {// 向服务器发送文件.fillFileContentToResponse(response, fileName);writeAndFlush(channel, response, response);return true;} catch (Throwable e) {// 发送失败,响应当前请求getRequestCache().notify(response.getRequestId(), NettyTransferStatusConstants.READ_EXCEPTION, null);}return false;}private void fillFileContentToResponse(NettyTransferable response, String fileName) throws Exception {long actualSendSize = response.getHeader().getLong(FieldConstants.SIZE);long position = response.getHeader().getLong(FieldConstants.POSITION);ByteBuffer buffer = null;try (RandomAccessFile raf = new RandomAccessFile(fileName, "r");FileChannel fileChannel = raf.getChannel();) {buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, position, actualSendSize);response.getHeader().put(FieldConstants.FILE_CRC32, FileUtil.digestCrc32(buffer));response.setBody(Unpooled.wrappedBuffer(buffer)); //将文件内容写入到响应对象中.}catch(Exception e) {throw e;}}

Netyy服务端

总体说明

在这里插入图片描述

  1. FileTransferServer为文件传输服务器,其由NettyServer实现文件接收功能
  2. NettyServer为实现的Netty服务端(ServerBootstrap),其添加了(编码、业务处理、解码)三个ChannelHandler
  3. 编码、解码由NettySerializer提供将传输对象与字节间的转换。
  4. 业务处理(NettyTransferHanler)注册了若干个NettyTransferProcessor来处理各种类型的文件信息(如接收文件)
代码片段
@Component
public class NettySerializer implements Serializer {@Autowiredprivate ByteBufDecoder decoder;@Autowiredprivate ByteBufEncoder encoder;// 对象-->字节@Overridepublic <T extends Serializable> void encode(T serializable, ByteBuf buf) throws Exception {NettyTransferable data = (NettyTransferable) serializable;buf.writeInt(data.getStatus());buf.writeInt(data.getAction());encoder.writeString(buf, data.getRequestId());encoder.writeString(buf, data.getResponseId());buf.writeInt(data.getRole());encoder.writeObject(buf, data.getHeader());if (data.getBody() != null) {buf.writeBytes(data.getBody());}}// 字节-->对象@SuppressWarnings("unchecked")@Overridepublic <T extends Serializable> T decode(ByteBuf buf) throws Exception {NettyTransferable response = new DefaultNettyTransferFileData();response.setStatus(buf.readInt());response.setAction(buf.readInt());response.setRequestId(decoder.readString(buf));response.setResponseId(decoder.readString(buf));response.setRole(buf.readInt());try {response.setHeader(decoder.readObject(buf));} catch (IOException e) {response.setHeader(new MapNettyTransferObject());}if (buf.isReadable()) {response.setBody(buf.slice().retain());}return (T) response;}}

文件传输时序图

上传

UploadService(client) FileTransferClient(client) VerificationApplyProcessor(Server) SendFileProcessor(client) ReceiveProcessor(Server) 1.申请上传文件 2.发送申请信息至NettyServer 3.审核并返回到客户端 4.发送文件(N轮) 5.接收文件并返回(N轮) 6.文件上传结束,返回状态(异步) 7.返回上传状态(异步) UploadService(client) FileTransferClient(client) VerificationApplyProcessor(Server) SendFileProcessor(client) ReceiveProcessor(Server)

1.申请上传文件:

请求类型(PUT_FILE_APPLY)、上传文件名称、文件总大小、起始位置(0)、传输大小(0)、角色(请求方)、请求ID

3.审核并返回到客户端:

请求审核通过时,增加返回响应ID,修改请求类型为PUT_FILE、角色为响应方

4.发送文件

开始发送文件,增加发送文件的字节内容,修改实际发送的起始位置、实际传输大小、角色为请求方

5.接收文件并返回

将传输过来的文件保存到本地,并修改起始位置为下一个起始位置、角色为响应方返回至客户端(第4步)

6.上传文件结束,返回状态

第4步:文件传输完成,且服务器成功接收,则返回结果至调用方(最终至UploadService)

使用说明

项目地址:https://gitee.com/llfsnial/proj-rep.git 中的 file-pool、commons-proj中的 file-transfer、commons-lang

下载file-pool源码构建

服务端

在配置file-pool-server中的application.yml中配置服务器端口、根目录(接收到的文件存在在此目录中)等

启动ServerApplication

客户端

在配置file-pool-client中的application.yml中配置服务器端口、监控根目录(监控此目录中的文件新增情况)等

启动ClientApplication

这篇关于文件归集系统的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/381723

相关文章

C#实现系统信息监控与获取功能

《C#实现系统信息监控与获取功能》在C#开发的众多应用场景中,获取系统信息以及监控用户操作有着广泛的用途,比如在系统性能优化工具中,需要实时读取CPU、GPU资源信息,本文将详细介绍如何使用C#来实现... 目录前言一、C# 监控键盘1. 原理与实现思路2. 代码实现二、读取 CPU、GPU 资源信息1.

在C#中获取端口号与系统信息的高效实践

《在C#中获取端口号与系统信息的高效实践》在现代软件开发中,尤其是系统管理、运维、监控和性能优化等场景中,了解计算机硬件和网络的状态至关重要,C#作为一种广泛应用的编程语言,提供了丰富的API来帮助开... 目录引言1. 获取端口号信息1.1 获取活动的 TCP 和 UDP 连接说明:应用场景:2. 获取硬

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

2.1/5.1和7.1声道系统有什么区别? 音频声道的专业知识科普

《2.1/5.1和7.1声道系统有什么区别?音频声道的专业知识科普》当设置环绕声系统时,会遇到2.1、5.1、7.1、7.1.2、9.1等数字,当一遍又一遍地看到它们时,可能想知道它们是什... 想要把智能电视自带的音响升级成专业级的家庭影院系统吗?那么你将面临一个重要的选择——使用 2.1、5.1 还是

高效管理你的Linux系统: Debian操作系统常用命令指南

《高效管理你的Linux系统:Debian操作系统常用命令指南》在Debian操作系统中,了解和掌握常用命令对于提高工作效率和系统管理至关重要,本文将详细介绍Debian的常用命令,帮助读者更好地使... Debian是一个流行的linux发行版,它以其稳定性、强大的软件包管理和丰富的社区资源而闻名。在使用

Ubuntu系统怎么安装Warp? 新一代AI 终端神器安装使用方法

《Ubuntu系统怎么安装Warp?新一代AI终端神器安装使用方法》Warp是一款使用Rust开发的现代化AI终端工具,该怎么再Ubuntu系统中安装使用呢?下面我们就来看看详细教程... Warp Terminal 是一款使用 Rust 开发的现代化「AI 终端」工具。最初它只支持 MACOS,但在 20

windows系统下shutdown重启关机命令超详细教程

《windows系统下shutdown重启关机命令超详细教程》shutdown命令是一个强大的工具,允许你通过命令行快速完成关机、重启或注销操作,本文将为你详细解析shutdown命令的使用方法,并提... 目录一、shutdown 命令简介二、shutdown 命令的基本用法三、远程关机与重启四、实际应用

Debian如何查看系统版本? 7种轻松查看Debian版本信息的实用方法

《Debian如何查看系统版本?7种轻松查看Debian版本信息的实用方法》Debian是一个广泛使用的Linux发行版,用户有时需要查看其版本信息以进行系统管理、故障排除或兼容性检查,在Debia... 作为最受欢迎的 linux 发行版之一,Debian 的版本信息在日常使用和系统维护中起着至关重要的作

什么是cron? Linux系统下Cron定时任务使用指南

《什么是cron?Linux系统下Cron定时任务使用指南》在日常的Linux系统管理和维护中,定时执行任务是非常常见的需求,你可能需要每天执行备份任务、清理系统日志或运行特定的脚本,而不想每天... 在管理 linux 服务器的过程中,总有一些任务需要我们定期或重复执行。就比如备份任务,通常会选在服务器资

TP-LINK/水星和hasivo交换机怎么选? 三款网管交换机系统功能对比

《TP-LINK/水星和hasivo交换机怎么选?三款网管交换机系统功能对比》今天选了三款都是”8+1″的2.5G网管交换机,分别是TP-LINK水星和hasivo交换机,该怎么选呢?这些交换机功... TP-LINK、水星和hasivo这三台交换机都是”8+1″的2.5G网管交换机,我手里的China编程has