文件归集系统

2023-11-10 09:10
文章标签 系统 归集

本文主要是介绍文件归集系统,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文件归集

监控客户端指定目录,当指定目录有文件创建时自动将其上传至服务器

使用到的技术:

​ 1.监控目录中文件的创建使用的是 commons-io 的FileAlterationMonitor

​ 2.文件传输使用的是Netty

系统架构图

文件归集_系统原理图

文件监控器

总体说明

在这里插入图片描述

  1. 文件监控器注册FileCreateListener,用以响应文件创建事件
  2. FileCreateListener 使用 FileCreateHandler接口来具体处理创建的文件(由FileCreateSimlpleHandler实现FileCreateHandler 具体实现创建的文件的处理逻辑–调用Netty客户端上传至服务器)。
  3. 文件监控器使用FileMonitorContext上下文来初始化配置信息
  4. FileMonitorContext通过FilePoolClientPorperties配置信息(file.pool.client)来初始化
  5. FilePoolClientPorperties 中的 FileMonitorProperties来配置(file.pool.client.fileMonitor)文件监控器相关的信息

代码片段

FileMonitor
@Component
@Slf4j
public class FileMonitor {@Resource // 注入上下文信息private FileMonitorContext context;@Resource // 注入容器中的监听器,即FileCreateListenerprivate List<FileAlterationListener> fileAlterationListenerList; private FileAlterationMonitor monitor; // 文件监控器@PostConstructpublic void init(){// 使用上下文来初始化文件监控器this.monitor = new FileAlterationMonitor(context.getMonitorInterval());FileAlterationObserver fileObserver = new FileAlterationObserver(context.getMonitorDir());fileAlterationListenerList.forEach(fileObserver::addListener);this.monitor.addObserver(fileObserver);}public void start() throws Exception{//启动文件监听器if (context.isMonitorAsDaemonThread()) {// 以守护进程方式启动文件监听器---非守护进程关闭后,守护进程自动关闭monitor.setThreadFactory(r -> {Thread th = new Thread(r);th.setDaemon(true);return th;});}monitor.start();}public void stop() throws Exception {// 停止文件监听器monitor.stop();}
}
FileCreateListener
public class FileCreateListener extends FileAlterationListenerAdaptor {@Autowired // 注入FileCreateHandler接口的实现类,即FileCreateSimlpleHandlerprivate FileCreateHandler fileCreateHandler;@Overridepublic void onFileCreate(File file) {// 新创建的文件交由FileCreateHandler来处理fileCreateHandler.handler(file);}
}
FileCreateSimlpleHandler
public class FileCreateSimlpleHandler implements FileCreateHandler {@Autowired // 注入Netty客户端的文件上传服务private UploadService uploadService;private ScheduledExecutorService scheduledExecutorService;@Autowiredprivate FileMonitorContext context;@PostConstructpublic void init() {scheduledExecutorService = Executors.newScheduledThreadPool(2);}@Overridepublic void handler(File file) {// 文件创建后,延迟3秒再上传scheduledExecutorService.schedule((Runnable) () -> {uploadFile(file);}, 3000, TimeUnit.MILLISECONDS);}public void uploadFile(File file) {try {String localFileName = file.getAbsolutePath();String fileName = localFileName.substring(context.getScheduleBaseDir().length());// 调用上传服务将文件上传至服务器,若上传失败则在3秒后再次重试uploadService.execute(localFileName, fileName,(status) -> {if (status == NettyTransferStatusConstants.OK) {FileUtil.del(file);} else {scheduledExecutorService.schedule((Runnable) () -> {uploadFile(file);}, 3000, TimeUnit.MILLISECONDS);}});} catch (Exception e) {log.error(e.getMessage(), e);}}
}

Netty客户端

总体说明

文件归集_Netty客户端

  1. 客户端对外提供文件上传服务(UploadService)
  2. UploadService实现FileTransferService接口,此接口借助于FileTransferable接口(FileTransferClient实现了此接口)提供上传服务
  3. FileTransferClient 调用 NettyClient提供文件传输服务
  4. NettyClient 即为真正的Netty客户端(Bootstrap),NettyClient 中注册了(编码、业务处理、解码)三个ChannelHandler
  5. 业务处理(NettyTransferHanler)注册了若干个NettyTransferProcessor来处理各种类型的文件信息(如文件上传)

代码片段

UploadService
public class UploadService implements FileTransferService {@Autowired(required = false) // 注入FileTransferable的实现@Getterprivate FileTransferable fileTransferable;// 同步上传public int execute(String localFileName, String remoteFilePath) throws Exception {return execute(localFileName,remoteFilePath,null);}public int execute(String localFileName, String remoteFilePath, Consumer<Integer> callback) throws Exception {// ...if (callback != null) {// 异步上传writeFuture(request, extra, callback);return NettyTransferStatusConstants.OK;} else {return writeAndGetStatus(request, extra);}}
}
FileTransferService 接口
public interface FileTransferService {FileTransferable getFileTransferable();// 异步处理default RequestFuture writeFuture(NettyTransferable request, Map<String, Object> extra,Consumer<Integer> callback) throws Exception {return getFileTransferable().writeFuture(request,extra, callback);}// 同步处理default int writeAndGetStatus(NettyTransferable request, Map<String, Object> extra) throws Exception {return getFileTransferable().writeAndGetStatus(request,extra);}// 同步处理default Object writeAndGetResult(NettyTransferable request, Map<String, Object> extra) throws Exception {return getFileTransferable().writeAndGetResult(request,extra);}
}
FileTransferClient
public class FileTransferClient implements NettyRunable, FileTransferable {// ...private NettyClient nettyClient;// ...@Overridepublic RequestFuture writeFuture(NettyTransferable request, Map<String, Object> extra,Consumer<Integer> callback) {// nettyClient 轮流选择通道处理数据.Channel channel = nettyClient.getChannelList().get(Math.abs(lastChannelIndex.incrementAndGet()) % context.getWorkGroupSize());//...channel.writeAndFlush(request) ;return future;}//...
}
NettyClient
public class NettyClient implements NettyRunable {//....// 创建、启动客户端并连接至服务器private void connect() throws InterruptedException {workerGroup = new NioEventLoopGroup(context.getWorkGroupSize());Bootstrap bootstrap = new Bootstrap().group(workerGroup).channel(NioSocketChannel.class);//...bootstrap = addHandler(bootstrap);}protected Bootstrap addHandler(Bootstrap bootstrap) {return bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 将配置好的ChannelHandlers添加到Netty客户端中.context.getChannelHandlers().forEach(pipeline::addLast);}});}
}
NettyTransferHanler
public class NettyTransferHanler extends ChannelInboundHandlerAdapter implements NettyTransferProcessable {@Autowired // 注入NettyTransferProcessor处理器private List<NettyTransferProcessor> processors = new ArrayList<>();@Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {Channel channel = ctx.channel();NettyTransferable request = (NettyTransferable) msg;// 获取支持当前请求的处理器.NettyTransferProcessor processor = getNettyTransferProcessor(request);try {// 使用处理器处理processor.process(request, channel);} catch (Throwable throwable) {// ...}}@Overridepublic NettyTransferProcessor getNettyTransferProcessor(NettyTransferable request) {int action = request.getAction();// 依据请求类型获取支持的处理器NettyTransferProcessor processor = processorsCache.get(action);if (processor == null) {processor = this.processors.stream().filter(p -> p.isSupport(request)).findFirst().get();if (processor != null) {processorsCache.put(action, processor);}}return processor;}
}
SendFileProcessor
public class SendFileProcessor extends AbstractNettyTransferProcessor {// 当前处理器支持的请求(本地主动上传请求 和 服务端被动下载请求)@Overridepublic boolean isSupport(NettyTransferable request) {int role = request.getRole();int action = request.getAction();return super.isSupport(request)&& ((action == NettyTansferAction.PUT_FILE && role == 1)|| (action == NettyTansferAction.GET_FILE && role == 0));}/*** 下载(服务端处理)*/@Overridepublic boolean doProcessResponse(NettyTransferable request, Channel channel) throws Exception {//..}/*** 上传(本地端处理)*/@Overridepublic boolean doProcessReqeust(NettyTransferable response, Channel channel) {// 1.申请上传不通过,或者接收的片段处理异常,可能需要重试.if(StringUtils.isBlank(response.getResponseId())){getRequestCache().notify(response.getRequestId(), response.getStatus(), null);log.debug("申请审核不通过,错误代码:{}", response.getStatus());return false;}// 2.已经传输完成if (isSendEnd(response)) {// 已经发送完成,响应当前请求。getRequestCache().notify(response.getRequestId(), NettyTransferStatusConstants.OK, null);return true;}try {// 向服务器发送文件.fillFileContentToResponse(response, fileName);writeAndFlush(channel, response, response);return true;} catch (Throwable e) {// 发送失败,响应当前请求getRequestCache().notify(response.getRequestId(), NettyTransferStatusConstants.READ_EXCEPTION, null);}return false;}private void fillFileContentToResponse(NettyTransferable response, String fileName) throws Exception {long actualSendSize = response.getHeader().getLong(FieldConstants.SIZE);long position = response.getHeader().getLong(FieldConstants.POSITION);ByteBuffer buffer = null;try (RandomAccessFile raf = new RandomAccessFile(fileName, "r");FileChannel fileChannel = raf.getChannel();) {buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, position, actualSendSize);response.getHeader().put(FieldConstants.FILE_CRC32, FileUtil.digestCrc32(buffer));response.setBody(Unpooled.wrappedBuffer(buffer)); //将文件内容写入到响应对象中.}catch(Exception e) {throw e;}}

Netyy服务端

总体说明

在这里插入图片描述

  1. FileTransferServer为文件传输服务器,其由NettyServer实现文件接收功能
  2. NettyServer为实现的Netty服务端(ServerBootstrap),其添加了(编码、业务处理、解码)三个ChannelHandler
  3. 编码、解码由NettySerializer提供将传输对象与字节间的转换。
  4. 业务处理(NettyTransferHanler)注册了若干个NettyTransferProcessor来处理各种类型的文件信息(如接收文件)
代码片段
@Component
public class NettySerializer implements Serializer {@Autowiredprivate ByteBufDecoder decoder;@Autowiredprivate ByteBufEncoder encoder;// 对象-->字节@Overridepublic <T extends Serializable> void encode(T serializable, ByteBuf buf) throws Exception {NettyTransferable data = (NettyTransferable) serializable;buf.writeInt(data.getStatus());buf.writeInt(data.getAction());encoder.writeString(buf, data.getRequestId());encoder.writeString(buf, data.getResponseId());buf.writeInt(data.getRole());encoder.writeObject(buf, data.getHeader());if (data.getBody() != null) {buf.writeBytes(data.getBody());}}// 字节-->对象@SuppressWarnings("unchecked")@Overridepublic <T extends Serializable> T decode(ByteBuf buf) throws Exception {NettyTransferable response = new DefaultNettyTransferFileData();response.setStatus(buf.readInt());response.setAction(buf.readInt());response.setRequestId(decoder.readString(buf));response.setResponseId(decoder.readString(buf));response.setRole(buf.readInt());try {response.setHeader(decoder.readObject(buf));} catch (IOException e) {response.setHeader(new MapNettyTransferObject());}if (buf.isReadable()) {response.setBody(buf.slice().retain());}return (T) response;}}

文件传输时序图

上传

UploadService(client) FileTransferClient(client) VerificationApplyProcessor(Server) SendFileProcessor(client) ReceiveProcessor(Server) 1.申请上传文件 2.发送申请信息至NettyServer 3.审核并返回到客户端 4.发送文件(N轮) 5.接收文件并返回(N轮) 6.文件上传结束,返回状态(异步) 7.返回上传状态(异步) UploadService(client) FileTransferClient(client) VerificationApplyProcessor(Server) SendFileProcessor(client) ReceiveProcessor(Server)

1.申请上传文件:

请求类型(PUT_FILE_APPLY)、上传文件名称、文件总大小、起始位置(0)、传输大小(0)、角色(请求方)、请求ID

3.审核并返回到客户端:

请求审核通过时,增加返回响应ID,修改请求类型为PUT_FILE、角色为响应方

4.发送文件

开始发送文件,增加发送文件的字节内容,修改实际发送的起始位置、实际传输大小、角色为请求方

5.接收文件并返回

将传输过来的文件保存到本地,并修改起始位置为下一个起始位置、角色为响应方返回至客户端(第4步)

6.上传文件结束,返回状态

第4步:文件传输完成,且服务器成功接收,则返回结果至调用方(最终至UploadService)

使用说明

项目地址:https://gitee.com/llfsnial/proj-rep.git 中的 file-pool、commons-proj中的 file-transfer、commons-lang

下载file-pool源码构建

服务端

在配置file-pool-server中的application.yml中配置服务器端口、根目录(接收到的文件存在在此目录中)等

启动ServerApplication

客户端

在配置file-pool-client中的application.yml中配置服务器端口、监控根目录(监控此目录中的文件新增情况)等

启动ClientApplication

这篇关于文件归集系统的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/381723

相关文章

不懂推荐算法也能设计推荐系统

本文以商业化应用推荐为例,告诉我们不懂推荐算法的产品,也能从产品侧出发, 设计出一款不错的推荐系统。 相信很多新手产品,看到算法二字,多是懵圈的。 什么排序算法、最短路径等都是相对传统的算法(注:传统是指科班出身的产品都会接触过)。但对于推荐算法,多数产品对着网上搜到的资源,都会无从下手。特别当某些推荐算法 和 “AI”扯上关系后,更是加大了理解的难度。 但,不了解推荐算法,就无法做推荐系

基于人工智能的图像分类系统

目录 引言项目背景环境准备 硬件要求软件安装与配置系统设计 系统架构关键技术代码示例 数据预处理模型训练模型预测应用场景结论 1. 引言 图像分类是计算机视觉中的一个重要任务,目标是自动识别图像中的对象类别。通过卷积神经网络(CNN)等深度学习技术,我们可以构建高效的图像分类系统,广泛应用于自动驾驶、医疗影像诊断、监控分析等领域。本文将介绍如何构建一个基于人工智能的图像分类系统,包括环境

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

嵌入式QT开发:构建高效智能的嵌入式系统

摘要: 本文深入探讨了嵌入式 QT 相关的各个方面。从 QT 框架的基础架构和核心概念出发,详细阐述了其在嵌入式环境中的优势与特点。文中分析了嵌入式 QT 的开发环境搭建过程,包括交叉编译工具链的配置等关键步骤。进一步探讨了嵌入式 QT 的界面设计与开发,涵盖了从基本控件的使用到复杂界面布局的构建。同时也深入研究了信号与槽机制在嵌入式系统中的应用,以及嵌入式 QT 与硬件设备的交互,包括输入输出设

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

【区块链 + 人才服务】可信教育区块链治理系统 | FISCO BCOS应用案例

伴随着区块链技术的不断完善,其在教育信息化中的应用也在持续发展。利用区块链数据共识、不可篡改的特性, 将与教育相关的数据要素在区块链上进行存证确权,在确保数据可信的前提下,促进教育的公平、透明、开放,为教育教学质量提升赋能,实现教育数据的安全共享、高等教育体系的智慧治理。 可信教育区块链治理系统的顶层治理架构由教育部、高校、企业、学生等多方角色共同参与建设、维护,支撑教育资源共享、教学质量评估、

软考系统规划与管理师考试证书含金量高吗?

2024年软考系统规划与管理师考试报名时间节点: 报名时间:2024年上半年软考将于3月中旬陆续开始报名 考试时间:上半年5月25日到28日,下半年11月9日到12日 分数线:所有科目成绩均须达到45分以上(包括45分)方可通过考试 成绩查询:可在“中国计算机技术职业资格网”上查询软考成绩 出成绩时间:预计在11月左右 证书领取时间:一般在考试成绩公布后3~4个月,各地领取时间有所不同

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

计算机毕业设计 大学志愿填报系统 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点赞 👍 收藏 ⭐评论 📝 🍅 文末获取源码联系 👇🏻 精彩专栏推荐订阅 👇🏻 不然下次找不到哟~Java毕业设计项目~热门选题推荐《1000套》 目录 1.技术选型 2.开发工具 3.功能

基于 YOLOv5 的积水检测系统:打造高效智能的智慧城市应用

在城市发展中,积水问题日益严重,特别是在大雨过后,积水往往会影响交通甚至威胁人们的安全。通过现代计算机视觉技术,我们能够智能化地检测和识别积水区域,减少潜在危险。本文将介绍如何使用 YOLOv5 和 PyQt5 搭建一个积水检测系统,结合深度学习和直观的图形界面,为用户提供高效的解决方案。 源码地址: PyQt5+YoloV5 实现积水检测系统 预览: 项目背景