netty编程之实现断点续传(分片发送)功能

2024-08-30 21:04

本文主要是介绍netty编程之实现断点续传(分片发送)功能,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

写在前面

在我们使用各种网盘的时候,可以随时的暂停上传,然后继续上传,这其实就是断点续传的功能,本文就看下在netty中如何实现断点续传的功能。

1:核心点介绍

1.1:RandomAccessFile

RandomAccessFile类有一个seek方法,通过该方法可以从文件的指定位置开始读取内容,基于此,我们就可以实现从断点处继续上传的效果,其实也就是实现断点续传了。

1.1:client和server交互协议的封装

定义如下的类来封装交互协议:

public class FileTransferProtocol {private Integer transferType; //0请求传输文件、1文件传输指令、2文件传输数据private Object transferObj;   //数据对象;(0)FileDescInfo、(1)FileBurstInstruct、(2)FileBurstDatapublic Integer getTransferType() {return transferType;}public void setTransferType(Integer transferType) {this.transferType = transferType;}public Object getTransferObj() {return transferObj;}public void setTransferObj(Object transferObj) {this.transferObj = transferObj;}}

其中transferType有如下的值:

1:0请求传输文件 客户端请求开始上传文件,对应的信息封装类是FileDescInfo,描述了要上传的文件的名称大小等信息
2:1文件传输指令客户端和服务端共同使用,对应的信息封装类是FileBurstInstruct,通过抽象的指令值来标记当前传输处于哪个阶段
3:2文件传输数据用来封装具体要上传的数据,位置信息等

1.3:protostuff

数据传输的序列化方式采用protostuff,因为其在对象序列化上的性能表现还是比较优秀(序列化的速度以及序列化的大小),并且使用方式也比较简单。

2:正式编码

2.1:server

server main:

package com.dahuyou.netty.transferfile.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {//配置服务端NIO线程组private EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));private EventLoopGroup childGroup = new NioEventLoopGroup();private Channel channel;public ChannelFuture bing(int port) {ChannelFuture channelFuture = null;try {ServerBootstrap b = new ServerBootstrap();b.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)    //非阻塞模式.option(ChannelOption.SO_BACKLOG, 128).childHandler(new MyChannelInitializer());channelFuture = b.bind(port).syncUninterruptibly();this.channel = channelFuture.channel();} catch (Exception e) {e.printStackTrace();} finally {if (null != channelFuture && channelFuture.isSuccess()) {System.out.println("netty server start done. {}");} else {System.out.println("netty server start error. {}");}}return channelFuture;}public void destroy() {if (null == channel) return;channel.close();parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}public Channel getChannel() {return channel;}}

MyChannelInitializer:

package com.dahuyou.netty.transferfile.server;import com.dahuyou.netty.transferfile.codec.ObjDecoder;
import com.dahuyou.netty.transferfile.codec.ObjEncoder;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) {//对象传输处理channel.pipeline().addLast(new ObjDecoder(FileTransferProtocol.class));channel.pipeline().addLast(new ObjEncoder(FileTransferProtocol.class));// 在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new MyServerHandler());}}

这里设置了基于protostuff的编解码器,以及消息处理的handler:

package com.dahuyou.netty.transferfile.server;import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.domain.*;
import com.dahuyou.netty.transferfile.util.CacheUtil;
import com.dahuyou.netty.transferfile.util.FileUtil;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import java.text.SimpleDateFormat;
import java.util.Date;public class MyServerHandler extends ChannelInboundHandlerAdapter {/*** 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("链接报告开始");/*System.out.println("链接报告信息:有一客户端链接到本服务端。channelId:" + channel.id());System.out.println("链接报告IP:" + channel.localAddress().getHostString());System.out.println("链接报告Port:" + channel.localAddress().getPort());System.out.println("链接报告完毕");*/}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//数据格式验证if (!(msg instanceof FileTransferProtocol)) return;FileTransferProtocol fileTransferProtocol = (FileTransferProtocol) msg;//0传输文件'请求'、1文件传输'指令'、2文件传输'数据'switch (fileTransferProtocol.getTransferType()) {case 0:FileDescInfo fileDescInfo = (FileDescInfo) fileTransferProtocol.getTransferObj();//断点续传信息,实际应用中需要将断点续传信息保存到数据库中FileBurstInstruct fileBurstInstructOld = CacheUtil.burstDataMap.get(fileDescInfo.getFileName());if (null != fileBurstInstructOld) {if (fileBurstInstructOld.getStatus() == Constants.FileStatus.COMPLETE) {CacheUtil.burstDataMap.remove(fileDescInfo.getFileName());}//传输完成删除断点信息System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件请求[断点续传]。" + JSON.toJSONString(fileBurstInstructOld));ctx.writeAndFlush(MsgUtil.buildTransferInstruct(fileBurstInstructOld));return;}//发送信息FileTransferProtocol sendFileTransferProtocol = MsgUtil.buildTransferInstruct(Constants.FileStatus.BEGIN, fileDescInfo.getFileUrl(), 0);ctx.writeAndFlush(sendFileTransferProtocol);System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件请求。" + JSON.toJSONString(fileDescInfo));break;case 2:FileBurstData fileBurstData = (FileBurstData) fileTransferProtocol.getTransferObj();FileBurstInstruct fileBurstInstruct = FileUtil.writeFile("E://", fileBurstData);//保存断点续传信息CacheUtil.burstDataMap.put(fileBurstData.getFileName(), fileBurstInstruct);ctx.writeAndFlush(MsgUtil.buildTransferInstruct(fileBurstInstruct));System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件数据。" + JSON.toJSONString(fileBurstData));//传输完成删除断点信息if (fileBurstInstruct.getStatus() == Constants.FileStatus.COMPLETE) {CacheUtil.burstDataMap.remove(fileBurstData.getFileName());}break;default:break;}}/*** 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();System.out.println("异常信息:\r\n" + cause.getMessage());}}

主要看方法channelRead,分为如下几种情况:

0:根据是否是续传返回不同的消息,控制client上传的不同行为
2:如果是上传文件,则保存文件,完成当前文件内容的上传,并返回续传信息给client,client继续上传

2.2:client

client main:

package com.dahuyou.netty.transferfile.client;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {//配置服务端NIO线程组private EventLoopGroup workerGroup = new NioEventLoopGroup();private Channel channel;public ChannelFuture connect(String inetHost, int inetPort) {ChannelFuture channelFuture = null;try {Bootstrap b = new Bootstrap();b.group(workerGroup);b.channel(NioSocketChannel.class);b.option(ChannelOption.AUTO_READ, true);b.handler(new MyChannelInitializer());channelFuture = b.connect(inetHost, inetPort).syncUninterruptibly();this.channel = channelFuture.channel();} catch (Exception e) {e.printStackTrace();} finally {if (null != channelFuture && channelFuture.isSuccess()) {System.out.println("netty client start done. {}");} else {System.out.println("netty client start error. {}");}}return channelFuture;}public void destroy() {if (null == channel) return;channel.close();workerGroup.shutdownGracefully();}}

MyChannelInitializer:

package com.dahuyou.netty.transferfile.client;import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.codec.ObjDecoder;
import com.dahuyou.netty.transferfile.codec.ObjEncoder;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {//对象传输处理channel.pipeline().addLast(new ObjDecoder(FileTransferProtocol.class));channel.pipeline().addLast(new ObjEncoder(FileTransferProtocol.class));// 在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new MyClientHandler());}}

同样设置了protostuff的编解码器,以及消息处理类:

package com.dahuyou.netty.transferfile.client;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.domain.Constants;
import com.dahuyou.netty.transferfile.domain.FileBurstData;
import com.dahuyou.netty.transferfile.domain.FileBurstInstruct;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import com.dahuyou.netty.transferfile.util.FileUtil;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import java.text.SimpleDateFormat;
import java.util.Date;public class MyClientHandler extends ChannelInboundHandlerAdapter {/*** 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("链接报告开始");/*System.out.println("链接报告信息:本客户端链接到服务端。channelId:" + channel.id());System.out.println("链接报告IP:" + channel.localAddress().getHostString());System.out.println("链接报告Port:" + channel.localAddress().getPort());System.out.println("链接报告完毕");*/}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("断开链接" + ctx.channel().localAddress().toString());super.channelInactive(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//数据格式验证if (!(msg instanceof FileTransferProtocol)) return;FileTransferProtocol fileTransferProtocol = (FileTransferProtocol) msg;//0传输文件'请求'、1文件传输'指令'、2文件传输'数据'switch (fileTransferProtocol.getTransferType()) {case 1:FileBurstInstruct fileBurstInstruct = (FileBurstInstruct) fileTransferProtocol.getTransferObj();//Constants.FileStatus {0开始、1中间、2结尾、3完成}if (Constants.FileStatus.COMPLETE == fileBurstInstruct.getStatus()) {ctx.flush();ctx.close();System.exit(-1);return;}FileBurstData fileBurstData = FileUtil.readFile(fileBurstInstruct.getClientFileUrl(), fileBurstInstruct.getReadPosition());ctx.writeAndFlush(MsgUtil.buildTransferData(fileBurstData));System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 客户端传输文件信息。 FILE:" + fileBurstData.getFileName() + " SIZE(byte):" + (fileBurstData.getEndPos() - fileBurstData.getBeginPos()));break;default:break;}/**模拟传输过程中断,场景测试可以注释掉*System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " [主动断开链接,模拟断点续传]");ctx.flush();ctx.close();System.exit(-1);*/}/*** 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();System.out.println("异常信息:\r\n" + cause.getMessage());}}

主要看方法channelRead,处理文件传输,根据是首次上传还是续传,从要上传的文件中获取字节码数据写到server,其中,体现续传的代码为FileUtil.readFile:

public class FileUtil {private static final int READ_BYTE_ONCE = 1024;public static FileBurstData readFile(String fileUrl, Integer readPosition) throws IOException {File file = new File(fileUrl);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");//r: 只读模式 rw:读写模式// 这里体现了断点续传的续哦!!!randomAccessFile.seek(readPosition);}}

randomAccessFile.seek(readPosition);这里跳一下子就体现了断点续传的续哦!!!

2.3:测试

server启动类:

package com.dahuyou.netty.transferfile.test;import com.dahuyou.netty.transferfile.server.NettyServer;public class NettyServerTest {public static void main(String[] args) {System.out.println("hi netty server");//启动服务new NettyServer().bing(7397);}}

client启动类:

package com.dahuyou.netty.transferfile.test;import com.dahuyou.netty.transferfile.client.NettyClient;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import io.netty.channel.ChannelFuture;
import java.io.File;public class NettyClientTest {public static void main(String[] args) {//启动客户端ChannelFuture channelFuture = new NettyClient().connect("127.0.0.1", 7397);//文件信息{文件大于1024kb方便测试断点续传}
//        File file = new File("C:\\Users\\fuzhengwei1\\Desktop\\测试传输文件.rar");File file = new File("D:\\xiaofuge_sourcecode\\interview-master\\dahuyou-study-netty\\transferfile\\src\\test\\java\\com\\dahuyou\\netty\\transferfile\\test\\测试传输文件.rar");FileTransferProtocol fileTransferProtocol = MsgUtil.buildRequestTransferFile(file.getAbsolutePath(), file.getName(), file.length());//发送信息;FILE:测试传输文件请求传输文件channelFuture.channel().writeAndFlush(fileTransferProtocol);}}

在client中首次启动发送请求上传文件的协议消息,发起文件上传的流程,我们测试的文件大小为1360字节,而首次上传文件的大小为1024字节,如下代码:

public class FileUtil {private static final int READ_BYTE_ONCE = 1024;public static FileBurstData readFile(String fileUrl, Integer readPosition) throws IOException {File file = new File(fileUrl);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");//r: 只读模式 rw:读写模式randomAccessFile.seek(readPosition);
//        byte[] bytes = new byte[1024 * 100];byte[] bytes = new byte[READ_BYTE_ONCE];    
}

所以第一次上传后文件是打不开的如下:
在这里插入图片描述
再次上传后文件就可以正常打开了。
最后看下日志输出:
在这里插入图片描述
在这里插入图片描述

写在后面

参考文章列表

protostuff序列化方式学习 。

netty编程之使用protostuff作为数据传输载体 。

这篇关于netty编程之实现断点续传(分片发送)功能的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1121928

相关文章

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

使用C++实现链表元素的反转

《使用C++实现链表元素的反转》反转链表是链表操作中一个经典的问题,也是面试中常见的考题,本文将从思路到实现一步步地讲解如何实现链表的反转,帮助初学者理解这一操作,我们将使用C++代码演示具体实现,同... 目录问题定义思路分析代码实现带头节点的链表代码讲解其他实现方式时间和空间复杂度分析总结问题定义给定

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在

如何使用Java实现请求deepseek

《如何使用Java实现请求deepseek》这篇文章主要为大家详细介绍了如何使用Java实现请求deepseek功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.deepseek的api创建2.Java实现请求deepseek2.1 pom文件2.2 json转化文件2.2

python使用fastapi实现多语言国际化的操作指南

《python使用fastapi实现多语言国际化的操作指南》本文介绍了使用Python和FastAPI实现多语言国际化的操作指南,包括多语言架构技术栈、翻译管理、前端本地化、语言切换机制以及常见陷阱和... 目录多语言国际化实现指南项目多语言架构技术栈目录结构翻译工作流1. 翻译数据存储2. 翻译生成脚本

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

Python如何实现PDF隐私信息检测

《Python如何实现PDF隐私信息检测》随着越来越多的个人信息以电子形式存储和传输,确保这些信息的安全至关重要,本文将介绍如何使用Python检测PDF文件中的隐私信息,需要的可以参考下... 目录项目背景技术栈代码解析功能说明运行结php果在当今,数据隐私保护变得尤为重要。随着越来越多的个人信息以电子形

使用 sql-research-assistant进行 SQL 数据库研究的实战指南(代码实现演示)

《使用sql-research-assistant进行SQL数据库研究的实战指南(代码实现演示)》本文介绍了sql-research-assistant工具,该工具基于LangChain框架,集... 目录技术背景介绍核心原理解析代码实现演示安装和配置项目集成LangSmith 配置(可选)启动服务应用场景

使用Python快速实现链接转word文档

《使用Python快速实现链接转word文档》这篇文章主要为大家详细介绍了如何使用Python快速实现链接转word文档功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 演示代码展示from newspaper import Articlefrom docx import

前端原生js实现拖拽排课效果实例

《前端原生js实现拖拽排课效果实例》:本文主要介绍如何实现一个简单的课程表拖拽功能,通过HTML、CSS和JavaScript的配合,我们实现了课程项的拖拽、放置和显示功能,文中通过实例代码介绍的... 目录1. 效果展示2. 效果分析2.1 关键点2.2 实现方法3. 代码实现3.1 html部分3.2