文件归集系统

2023-11-10 09:10
文章标签 系统 归集

本文主要是介绍文件归集系统,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文件归集

监控客户端指定目录,当指定目录有文件创建时自动将其上传至服务器

使用到的技术:

​ 1.监控目录中文件的创建使用的是 commons-io 的FileAlterationMonitor

​ 2.文件传输使用的是Netty

系统架构图

文件归集_系统原理图

文件监控器

总体说明

在这里插入图片描述

  1. 文件监控器注册FileCreateListener,用以响应文件创建事件
  2. FileCreateListener 使用 FileCreateHandler接口来具体处理创建的文件(由FileCreateSimlpleHandler实现FileCreateHandler 具体实现创建的文件的处理逻辑–调用Netty客户端上传至服务器)。
  3. 文件监控器使用FileMonitorContext上下文来初始化配置信息
  4. FileMonitorContext通过FilePoolClientPorperties配置信息(file.pool.client)来初始化
  5. FilePoolClientPorperties 中的 FileMonitorProperties来配置(file.pool.client.fileMonitor)文件监控器相关的信息

代码片段

FileMonitor
@Component
@Slf4j
public class FileMonitor {@Resource // 注入上下文信息private FileMonitorContext context;@Resource // 注入容器中的监听器,即FileCreateListenerprivate List<FileAlterationListener> fileAlterationListenerList; private FileAlterationMonitor monitor; // 文件监控器@PostConstructpublic void init(){// 使用上下文来初始化文件监控器this.monitor = new FileAlterationMonitor(context.getMonitorInterval());FileAlterationObserver fileObserver = new FileAlterationObserver(context.getMonitorDir());fileAlterationListenerList.forEach(fileObserver::addListener);this.monitor.addObserver(fileObserver);}public void start() throws Exception{//启动文件监听器if (context.isMonitorAsDaemonThread()) {// 以守护进程方式启动文件监听器---非守护进程关闭后,守护进程自动关闭monitor.setThreadFactory(r -> {Thread th = new Thread(r);th.setDaemon(true);return th;});}monitor.start();}public void stop() throws Exception {// 停止文件监听器monitor.stop();}
}
FileCreateListener
public class FileCreateListener extends FileAlterationListenerAdaptor {@Autowired // 注入FileCreateHandler接口的实现类,即FileCreateSimlpleHandlerprivate FileCreateHandler fileCreateHandler;@Overridepublic void onFileCreate(File file) {// 新创建的文件交由FileCreateHandler来处理fileCreateHandler.handler(file);}
}
FileCreateSimlpleHandler
public class FileCreateSimlpleHandler implements FileCreateHandler {@Autowired // 注入Netty客户端的文件上传服务private UploadService uploadService;private ScheduledExecutorService scheduledExecutorService;@Autowiredprivate FileMonitorContext context;@PostConstructpublic void init() {scheduledExecutorService = Executors.newScheduledThreadPool(2);}@Overridepublic void handler(File file) {// 文件创建后,延迟3秒再上传scheduledExecutorService.schedule((Runnable) () -> {uploadFile(file);}, 3000, TimeUnit.MILLISECONDS);}public void uploadFile(File file) {try {String localFileName = file.getAbsolutePath();String fileName = localFileName.substring(context.getScheduleBaseDir().length());// 调用上传服务将文件上传至服务器,若上传失败则在3秒后再次重试uploadService.execute(localFileName, fileName,(status) -> {if (status == NettyTransferStatusConstants.OK) {FileUtil.del(file);} else {scheduledExecutorService.schedule((Runnable) () -> {uploadFile(file);}, 3000, TimeUnit.MILLISECONDS);}});} catch (Exception e) {log.error(e.getMessage(), e);}}
}

Netty客户端

总体说明

文件归集_Netty客户端

  1. 客户端对外提供文件上传服务(UploadService)
  2. UploadService实现FileTransferService接口,此接口借助于FileTransferable接口(FileTransferClient实现了此接口)提供上传服务
  3. FileTransferClient 调用 NettyClient提供文件传输服务
  4. NettyClient 即为真正的Netty客户端(Bootstrap),NettyClient 中注册了(编码、业务处理、解码)三个ChannelHandler
  5. 业务处理(NettyTransferHanler)注册了若干个NettyTransferProcessor来处理各种类型的文件信息(如文件上传)

代码片段

UploadService
public class UploadService implements FileTransferService {@Autowired(required = false) // 注入FileTransferable的实现@Getterprivate FileTransferable fileTransferable;// 同步上传public int execute(String localFileName, String remoteFilePath) throws Exception {return execute(localFileName,remoteFilePath,null);}public int execute(String localFileName, String remoteFilePath, Consumer<Integer> callback) throws Exception {// ...if (callback != null) {// 异步上传writeFuture(request, extra, callback);return NettyTransferStatusConstants.OK;} else {return writeAndGetStatus(request, extra);}}
}
FileTransferService 接口
public interface FileTransferService {FileTransferable getFileTransferable();// 异步处理default RequestFuture writeFuture(NettyTransferable request, Map<String, Object> extra,Consumer<Integer> callback) throws Exception {return getFileTransferable().writeFuture(request,extra, callback);}// 同步处理default int writeAndGetStatus(NettyTransferable request, Map<String, Object> extra) throws Exception {return getFileTransferable().writeAndGetStatus(request,extra);}// 同步处理default Object writeAndGetResult(NettyTransferable request, Map<String, Object> extra) throws Exception {return getFileTransferable().writeAndGetResult(request,extra);}
}
FileTransferClient
public class FileTransferClient implements NettyRunable, FileTransferable {// ...private NettyClient nettyClient;// ...@Overridepublic RequestFuture writeFuture(NettyTransferable request, Map<String, Object> extra,Consumer<Integer> callback) {// nettyClient 轮流选择通道处理数据.Channel channel = nettyClient.getChannelList().get(Math.abs(lastChannelIndex.incrementAndGet()) % context.getWorkGroupSize());//...channel.writeAndFlush(request) ;return future;}//...
}
NettyClient
public class NettyClient implements NettyRunable {//....// 创建、启动客户端并连接至服务器private void connect() throws InterruptedException {workerGroup = new NioEventLoopGroup(context.getWorkGroupSize());Bootstrap bootstrap = new Bootstrap().group(workerGroup).channel(NioSocketChannel.class);//...bootstrap = addHandler(bootstrap);}protected Bootstrap addHandler(Bootstrap bootstrap) {return bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 将配置好的ChannelHandlers添加到Netty客户端中.context.getChannelHandlers().forEach(pipeline::addLast);}});}
}
NettyTransferHanler
public class NettyTransferHanler extends ChannelInboundHandlerAdapter implements NettyTransferProcessable {@Autowired // 注入NettyTransferProcessor处理器private List<NettyTransferProcessor> processors = new ArrayList<>();@Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {Channel channel = ctx.channel();NettyTransferable request = (NettyTransferable) msg;// 获取支持当前请求的处理器.NettyTransferProcessor processor = getNettyTransferProcessor(request);try {// 使用处理器处理processor.process(request, channel);} catch (Throwable throwable) {// ...}}@Overridepublic NettyTransferProcessor getNettyTransferProcessor(NettyTransferable request) {int action = request.getAction();// 依据请求类型获取支持的处理器NettyTransferProcessor processor = processorsCache.get(action);if (processor == null) {processor = this.processors.stream().filter(p -> p.isSupport(request)).findFirst().get();if (processor != null) {processorsCache.put(action, processor);}}return processor;}
}
SendFileProcessor
public class SendFileProcessor extends AbstractNettyTransferProcessor {// 当前处理器支持的请求(本地主动上传请求 和 服务端被动下载请求)@Overridepublic boolean isSupport(NettyTransferable request) {int role = request.getRole();int action = request.getAction();return super.isSupport(request)&& ((action == NettyTansferAction.PUT_FILE && role == 1)|| (action == NettyTansferAction.GET_FILE && role == 0));}/*** 下载(服务端处理)*/@Overridepublic boolean doProcessResponse(NettyTransferable request, Channel channel) throws Exception {//..}/*** 上传(本地端处理)*/@Overridepublic boolean doProcessReqeust(NettyTransferable response, Channel channel) {// 1.申请上传不通过,或者接收的片段处理异常,可能需要重试.if(StringUtils.isBlank(response.getResponseId())){getRequestCache().notify(response.getRequestId(), response.getStatus(), null);log.debug("申请审核不通过,错误代码:{}", response.getStatus());return false;}// 2.已经传输完成if (isSendEnd(response)) {// 已经发送完成,响应当前请求。getRequestCache().notify(response.getRequestId(), NettyTransferStatusConstants.OK, null);return true;}try {// 向服务器发送文件.fillFileContentToResponse(response, fileName);writeAndFlush(channel, response, response);return true;} catch (Throwable e) {// 发送失败,响应当前请求getRequestCache().notify(response.getRequestId(), NettyTransferStatusConstants.READ_EXCEPTION, null);}return false;}private void fillFileContentToResponse(NettyTransferable response, String fileName) throws Exception {long actualSendSize = response.getHeader().getLong(FieldConstants.SIZE);long position = response.getHeader().getLong(FieldConstants.POSITION);ByteBuffer buffer = null;try (RandomAccessFile raf = new RandomAccessFile(fileName, "r");FileChannel fileChannel = raf.getChannel();) {buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, position, actualSendSize);response.getHeader().put(FieldConstants.FILE_CRC32, FileUtil.digestCrc32(buffer));response.setBody(Unpooled.wrappedBuffer(buffer)); //将文件内容写入到响应对象中.}catch(Exception e) {throw e;}}

Netyy服务端

总体说明

在这里插入图片描述

  1. FileTransferServer为文件传输服务器,其由NettyServer实现文件接收功能
  2. NettyServer为实现的Netty服务端(ServerBootstrap),其添加了(编码、业务处理、解码)三个ChannelHandler
  3. 编码、解码由NettySerializer提供将传输对象与字节间的转换。
  4. 业务处理(NettyTransferHanler)注册了若干个NettyTransferProcessor来处理各种类型的文件信息(如接收文件)
代码片段
@Component
public class NettySerializer implements Serializer {@Autowiredprivate ByteBufDecoder decoder;@Autowiredprivate ByteBufEncoder encoder;// 对象-->字节@Overridepublic <T extends Serializable> void encode(T serializable, ByteBuf buf) throws Exception {NettyTransferable data = (NettyTransferable) serializable;buf.writeInt(data.getStatus());buf.writeInt(data.getAction());encoder.writeString(buf, data.getRequestId());encoder.writeString(buf, data.getResponseId());buf.writeInt(data.getRole());encoder.writeObject(buf, data.getHeader());if (data.getBody() != null) {buf.writeBytes(data.getBody());}}// 字节-->对象@SuppressWarnings("unchecked")@Overridepublic <T extends Serializable> T decode(ByteBuf buf) throws Exception {NettyTransferable response = new DefaultNettyTransferFileData();response.setStatus(buf.readInt());response.setAction(buf.readInt());response.setRequestId(decoder.readString(buf));response.setResponseId(decoder.readString(buf));response.setRole(buf.readInt());try {response.setHeader(decoder.readObject(buf));} catch (IOException e) {response.setHeader(new MapNettyTransferObject());}if (buf.isReadable()) {response.setBody(buf.slice().retain());}return (T) response;}}

文件传输时序图

上传

UploadService(client) FileTransferClient(client) VerificationApplyProcessor(Server) SendFileProcessor(client) ReceiveProcessor(Server) 1.申请上传文件 2.发送申请信息至NettyServer 3.审核并返回到客户端 4.发送文件(N轮) 5.接收文件并返回(N轮) 6.文件上传结束,返回状态(异步) 7.返回上传状态(异步) UploadService(client) FileTransferClient(client) VerificationApplyProcessor(Server) SendFileProcessor(client) ReceiveProcessor(Server)

1.申请上传文件:

请求类型(PUT_FILE_APPLY)、上传文件名称、文件总大小、起始位置(0)、传输大小(0)、角色(请求方)、请求ID

3.审核并返回到客户端:

请求审核通过时,增加返回响应ID,修改请求类型为PUT_FILE、角色为响应方

4.发送文件

开始发送文件,增加发送文件的字节内容,修改实际发送的起始位置、实际传输大小、角色为请求方

5.接收文件并返回

将传输过来的文件保存到本地,并修改起始位置为下一个起始位置、角色为响应方返回至客户端(第4步)

6.上传文件结束,返回状态

第4步:文件传输完成,且服务器成功接收,则返回结果至调用方(最终至UploadService)

使用说明

项目地址:https://gitee.com/llfsnial/proj-rep.git 中的 file-pool、commons-proj中的 file-transfer、commons-lang

下载file-pool源码构建

服务端

在配置file-pool-server中的application.yml中配置服务器端口、根目录(接收到的文件存在在此目录中)等

启动ServerApplication

客户端

在配置file-pool-client中的application.yml中配置服务器端口、监控根目录(监控此目录中的文件新增情况)等

启动ClientApplication

这篇关于文件归集系统的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/381723

相关文章

利用Python快速搭建Markdown笔记发布系统

《利用Python快速搭建Markdown笔记发布系统》这篇文章主要为大家详细介绍了使用Python生态的成熟工具,在30分钟内搭建一个支持Markdown渲染、分类标签、全文搜索的私有化知识发布系统... 目录引言:为什么要自建知识博客一、技术选型:极简主义开发栈二、系统架构设计三、核心代码实现(分步解析

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

Linux系统中卸载与安装JDK的详细教程

《Linux系统中卸载与安装JDK的详细教程》本文详细介绍了如何在Linux系统中通过Xshell和Xftp工具连接与传输文件,然后进行JDK的安装与卸载,安装步骤包括连接Linux、传输JDK安装包... 目录1、卸载1.1 linux删除自带的JDK1.2 Linux上卸载自己安装的JDK2、安装2.1

Linux系统之主机网络配置方式

《Linux系统之主机网络配置方式》:本文主要介绍Linux系统之主机网络配置方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、查看主机的网络参数1、查看主机名2、查看IP地址3、查看网关4、查看DNS二、配置网卡1、修改网卡配置文件2、nmcli工具【通用

Linux系统之dns域名解析全过程

《Linux系统之dns域名解析全过程》:本文主要介绍Linux系统之dns域名解析全过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、dns域名解析介绍1、DNS核心概念1.1 区域 zone1.2 记录 record二、DNS服务的配置1、正向解析的配置

Linux系统中配置静态IP地址的详细步骤

《Linux系统中配置静态IP地址的详细步骤》本文详细介绍了在Linux系统中配置静态IP地址的五个步骤,包括打开终端、编辑网络配置文件、配置IP地址、保存并重启网络服务,这对于系统管理员和新手都极具... 目录步骤一:打开终端步骤二:编辑网络配置文件步骤三:配置静态IP地址步骤四:保存并关闭文件步骤五:重

Windows系统下如何查找JDK的安装路径

《Windows系统下如何查找JDK的安装路径》:本文主要介绍Windows系统下如何查找JDK的安装路径,文中介绍了三种方法,分别是通过命令行检查、使用verbose选项查找jre目录、以及查看... 目录一、确认是否安装了JDK二、查找路径三、另外一种方式如果很久之前安装了JDK,或者在别人的电脑上,想

Linux系统之authconfig命令的使用解读

《Linux系统之authconfig命令的使用解读》authconfig是一个用于配置Linux系统身份验证和账户管理设置的命令行工具,主要用于RedHat系列的Linux发行版,它提供了一系列选项... 目录linux authconfig命令的使用基本语法常用选项示例总结Linux authconfi

Nginx配置系统服务&设置环境变量方式

《Nginx配置系统服务&设置环境变量方式》本文介绍了如何将Nginx配置为系统服务并设置环境变量,以便更方便地对Nginx进行操作,通过配置系统服务,可以使用系统命令来启动、停止或重新加载Nginx... 目录1.Nginx操作问题2.配置系统服android务3.设置环境变量总结1.Nginx操作问题

CSS3 最强二维布局系统之Grid 网格布局

《CSS3最强二维布局系统之Grid网格布局》CS3的Grid网格布局是目前最强的二维布局系统,可以同时对列和行进行处理,将网页划分成一个个网格,可以任意组合不同的网格,做出各种各样的布局,本文介... 深入学习 css3 目前最强大的布局系统 Grid 网格布局Grid 网格布局的基本认识Grid 网