本文主要是介绍文件归集系统,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文件归集
监控客户端指定目录,当指定目录有文件创建时自动将其上传至服务器
使用到的技术:
1.监控目录中文件的创建使用的是 commons-io 的FileAlterationMonitor
2.文件传输使用的是Netty
系统架构图
文件监控器
总体说明
- 文件监控器注册FileCreateListener,用以响应文件创建事件
- FileCreateListener 使用 FileCreateHandler接口来具体处理创建的文件(由FileCreateSimlpleHandler实现FileCreateHandler 具体实现创建的文件的处理逻辑–调用Netty客户端上传至服务器)。
- 文件监控器使用FileMonitorContext上下文来初始化配置信息
- FileMonitorContext通过FilePoolClientPorperties配置信息(file.pool.client)来初始化
- FilePoolClientPorperties 中的 FileMonitorProperties来配置(file.pool.client.fileMonitor)文件监控器相关的信息
代码片段
FileMonitor
@Component
@Slf4j
public class FileMonitor {@Resource // 注入上下文信息private FileMonitorContext context;@Resource // 注入容器中的监听器,即FileCreateListenerprivate List<FileAlterationListener> fileAlterationListenerList; private FileAlterationMonitor monitor; // 文件监控器@PostConstructpublic void init(){// 使用上下文来初始化文件监控器this.monitor = new FileAlterationMonitor(context.getMonitorInterval());FileAlterationObserver fileObserver = new FileAlterationObserver(context.getMonitorDir());fileAlterationListenerList.forEach(fileObserver::addListener);this.monitor.addObserver(fileObserver);}public void start() throws Exception{//启动文件监听器if (context.isMonitorAsDaemonThread()) {// 以守护进程方式启动文件监听器---非守护进程关闭后,守护进程自动关闭monitor.setThreadFactory(r -> {Thread th = new Thread(r);th.setDaemon(true);return th;});}monitor.start();}public void stop() throws Exception {// 停止文件监听器monitor.stop();}
}
FileCreateListener
public class FileCreateListener extends FileAlterationListenerAdaptor {@Autowired // 注入FileCreateHandler接口的实现类,即FileCreateSimlpleHandlerprivate FileCreateHandler fileCreateHandler;@Overridepublic void onFileCreate(File file) {// 新创建的文件交由FileCreateHandler来处理fileCreateHandler.handler(file);}
}
FileCreateSimlpleHandler
public class FileCreateSimlpleHandler implements FileCreateHandler {@Autowired // 注入Netty客户端的文件上传服务private UploadService uploadService;private ScheduledExecutorService scheduledExecutorService;@Autowiredprivate FileMonitorContext context;@PostConstructpublic void init() {scheduledExecutorService = Executors.newScheduledThreadPool(2);}@Overridepublic void handler(File file) {// 文件创建后,延迟3秒再上传scheduledExecutorService.schedule((Runnable) () -> {uploadFile(file);}, 3000, TimeUnit.MILLISECONDS);}public void uploadFile(File file) {try {String localFileName = file.getAbsolutePath();String fileName = localFileName.substring(context.getScheduleBaseDir().length());// 调用上传服务将文件上传至服务器,若上传失败则在3秒后再次重试uploadService.execute(localFileName, fileName,(status) -> {if (status == NettyTransferStatusConstants.OK) {FileUtil.del(file);} else {scheduledExecutorService.schedule((Runnable) () -> {uploadFile(file);}, 3000, TimeUnit.MILLISECONDS);}});} catch (Exception e) {log.error(e.getMessage(), e);}}
}
Netty客户端
总体说明
- 客户端对外提供文件上传服务(UploadService)
- UploadService实现FileTransferService接口,此接口借助于FileTransferable接口(FileTransferClient实现了此接口)提供上传服务
- FileTransferClient 调用 NettyClient提供文件传输服务
- NettyClient 即为真正的Netty客户端(Bootstrap),NettyClient 中注册了(编码、业务处理、解码)三个ChannelHandler
- 业务处理(NettyTransferHanler)注册了若干个NettyTransferProcessor来处理各种类型的文件信息(如文件上传)
代码片段
UploadService
public class UploadService implements FileTransferService {@Autowired(required = false) // 注入FileTransferable的实现@Getterprivate FileTransferable fileTransferable;// 同步上传public int execute(String localFileName, String remoteFilePath) throws Exception {return execute(localFileName,remoteFilePath,null);}public int execute(String localFileName, String remoteFilePath, Consumer<Integer> callback) throws Exception {// ...if (callback != null) {// 异步上传writeFuture(request, extra, callback);return NettyTransferStatusConstants.OK;} else {return writeAndGetStatus(request, extra);}}
}
FileTransferService 接口
public interface FileTransferService {FileTransferable getFileTransferable();// 异步处理default RequestFuture writeFuture(NettyTransferable request, Map<String, Object> extra,Consumer<Integer> callback) throws Exception {return getFileTransferable().writeFuture(request,extra, callback);}// 同步处理default int writeAndGetStatus(NettyTransferable request, Map<String, Object> extra) throws Exception {return getFileTransferable().writeAndGetStatus(request,extra);}// 同步处理default Object writeAndGetResult(NettyTransferable request, Map<String, Object> extra) throws Exception {return getFileTransferable().writeAndGetResult(request,extra);}
}
FileTransferClient
public class FileTransferClient implements NettyRunable, FileTransferable {// ...private NettyClient nettyClient;// ...@Overridepublic RequestFuture writeFuture(NettyTransferable request, Map<String, Object> extra,Consumer<Integer> callback) {// nettyClient 轮流选择通道处理数据.Channel channel = nettyClient.getChannelList().get(Math.abs(lastChannelIndex.incrementAndGet()) % context.getWorkGroupSize());//...channel.writeAndFlush(request) ;return future;}//...
}
NettyClient
public class NettyClient implements NettyRunable {//....// 创建、启动客户端并连接至服务器private void connect() throws InterruptedException {workerGroup = new NioEventLoopGroup(context.getWorkGroupSize());Bootstrap bootstrap = new Bootstrap().group(workerGroup).channel(NioSocketChannel.class);//...bootstrap = addHandler(bootstrap);}protected Bootstrap addHandler(Bootstrap bootstrap) {return bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 将配置好的ChannelHandlers添加到Netty客户端中.context.getChannelHandlers().forEach(pipeline::addLast);}});}
}
NettyTransferHanler
public class NettyTransferHanler extends ChannelInboundHandlerAdapter implements NettyTransferProcessable {@Autowired // 注入NettyTransferProcessor处理器private List<NettyTransferProcessor> processors = new ArrayList<>();@Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {Channel channel = ctx.channel();NettyTransferable request = (NettyTransferable) msg;// 获取支持当前请求的处理器.NettyTransferProcessor processor = getNettyTransferProcessor(request);try {// 使用处理器处理processor.process(request, channel);} catch (Throwable throwable) {// ...}}@Overridepublic NettyTransferProcessor getNettyTransferProcessor(NettyTransferable request) {int action = request.getAction();// 依据请求类型获取支持的处理器NettyTransferProcessor processor = processorsCache.get(action);if (processor == null) {processor = this.processors.stream().filter(p -> p.isSupport(request)).findFirst().get();if (processor != null) {processorsCache.put(action, processor);}}return processor;}
}
SendFileProcessor
public class SendFileProcessor extends AbstractNettyTransferProcessor {// 当前处理器支持的请求(本地主动上传请求 和 服务端被动下载请求)@Overridepublic boolean isSupport(NettyTransferable request) {int role = request.getRole();int action = request.getAction();return super.isSupport(request)&& ((action == NettyTansferAction.PUT_FILE && role == 1)|| (action == NettyTansferAction.GET_FILE && role == 0));}/*** 下载(服务端处理)*/@Overridepublic boolean doProcessResponse(NettyTransferable request, Channel channel) throws Exception {//..}/*** 上传(本地端处理)*/@Overridepublic boolean doProcessReqeust(NettyTransferable response, Channel channel) {// 1.申请上传不通过,或者接收的片段处理异常,可能需要重试.if(StringUtils.isBlank(response.getResponseId())){getRequestCache().notify(response.getRequestId(), response.getStatus(), null);log.debug("申请审核不通过,错误代码:{}", response.getStatus());return false;}// 2.已经传输完成if (isSendEnd(response)) {// 已经发送完成,响应当前请求。getRequestCache().notify(response.getRequestId(), NettyTransferStatusConstants.OK, null);return true;}try {// 向服务器发送文件.fillFileContentToResponse(response, fileName);writeAndFlush(channel, response, response);return true;} catch (Throwable e) {// 发送失败,响应当前请求getRequestCache().notify(response.getRequestId(), NettyTransferStatusConstants.READ_EXCEPTION, null);}return false;}private void fillFileContentToResponse(NettyTransferable response, String fileName) throws Exception {long actualSendSize = response.getHeader().getLong(FieldConstants.SIZE);long position = response.getHeader().getLong(FieldConstants.POSITION);ByteBuffer buffer = null;try (RandomAccessFile raf = new RandomAccessFile(fileName, "r");FileChannel fileChannel = raf.getChannel();) {buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, position, actualSendSize);response.getHeader().put(FieldConstants.FILE_CRC32, FileUtil.digestCrc32(buffer));response.setBody(Unpooled.wrappedBuffer(buffer)); //将文件内容写入到响应对象中.}catch(Exception e) {throw e;}}
Netyy服务端
总体说明
- FileTransferServer为文件传输服务器,其由NettyServer实现文件接收功能
- NettyServer为实现的Netty服务端(ServerBootstrap),其添加了(编码、业务处理、解码)三个ChannelHandler
- 编码、解码由NettySerializer提供将传输对象与字节间的转换。
- 业务处理(NettyTransferHanler)注册了若干个NettyTransferProcessor来处理各种类型的文件信息(如接收文件)
代码片段
@Component
public class NettySerializer implements Serializer {@Autowiredprivate ByteBufDecoder decoder;@Autowiredprivate ByteBufEncoder encoder;// 对象-->字节@Overridepublic <T extends Serializable> void encode(T serializable, ByteBuf buf) throws Exception {NettyTransferable data = (NettyTransferable) serializable;buf.writeInt(data.getStatus());buf.writeInt(data.getAction());encoder.writeString(buf, data.getRequestId());encoder.writeString(buf, data.getResponseId());buf.writeInt(data.getRole());encoder.writeObject(buf, data.getHeader());if (data.getBody() != null) {buf.writeBytes(data.getBody());}}// 字节-->对象@SuppressWarnings("unchecked")@Overridepublic <T extends Serializable> T decode(ByteBuf buf) throws Exception {NettyTransferable response = new DefaultNettyTransferFileData();response.setStatus(buf.readInt());response.setAction(buf.readInt());response.setRequestId(decoder.readString(buf));response.setResponseId(decoder.readString(buf));response.setRole(buf.readInt());try {response.setHeader(decoder.readObject(buf));} catch (IOException e) {response.setHeader(new MapNettyTransferObject());}if (buf.isReadable()) {response.setBody(buf.slice().retain());}return (T) response;}}
文件传输时序图
上传
1.申请上传文件:
请求类型(PUT_FILE_APPLY)、上传文件名称、文件总大小、起始位置(0)、传输大小(0)、角色(请求方)、请求ID
3.审核并返回到客户端:
请求审核通过时,增加返回响应ID,修改请求类型为PUT_FILE、角色为响应方
4.发送文件
开始发送文件,增加发送文件的字节内容,修改实际发送的起始位置、实际传输大小、角色为请求方
5.接收文件并返回
将传输过来的文件保存到本地,并修改起始位置为下一个起始位置、角色为响应方返回至客户端(第4步)
6.上传文件结束,返回状态
第4步:文件传输完成,且服务器成功接收,则返回结果至调用方(最终至UploadService)
使用说明
项目地址:https://gitee.com/llfsnial/proj-rep.git 中的 file-pool、commons-proj中的 file-transfer、commons-lang
下载file-pool源码构建
服务端
在配置file-pool-server中的application.yml中配置服务器端口、根目录(接收到的文件存在在此目录中)等
启动ServerApplication
客户端
在配置file-pool-client中的application.yml中配置服务器端口、监控根目录(监控此目录中的文件新增情况)等
启动ClientApplication
这篇关于文件归集系统的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!