netty编程之实现断点续传(分片发送)功能

2024-08-30 21:04

本文主要是介绍netty编程之实现断点续传(分片发送)功能,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

写在前面

在我们使用各种网盘的时候,可以随时的暂停上传,然后继续上传,这其实就是断点续传的功能,本文就看下在netty中如何实现断点续传的功能。

1:核心点介绍

1.1:RandomAccessFile

RandomAccessFile类有一个seek方法,通过该方法可以从文件的指定位置开始读取内容,基于此,我们就可以实现从断点处继续上传的效果,其实也就是实现断点续传了。

1.1:client和server交互协议的封装

定义如下的类来封装交互协议:

public class FileTransferProtocol {private Integer transferType; //0请求传输文件、1文件传输指令、2文件传输数据private Object transferObj;   //数据对象;(0)FileDescInfo、(1)FileBurstInstruct、(2)FileBurstDatapublic Integer getTransferType() {return transferType;}public void setTransferType(Integer transferType) {this.transferType = transferType;}public Object getTransferObj() {return transferObj;}public void setTransferObj(Object transferObj) {this.transferObj = transferObj;}}

其中transferType有如下的值:

1:0请求传输文件 客户端请求开始上传文件,对应的信息封装类是FileDescInfo,描述了要上传的文件的名称大小等信息
2:1文件传输指令客户端和服务端共同使用,对应的信息封装类是FileBurstInstruct,通过抽象的指令值来标记当前传输处于哪个阶段
3:2文件传输数据用来封装具体要上传的数据,位置信息等

1.3:protostuff

数据传输的序列化方式采用protostuff,因为其在对象序列化上的性能表现还是比较优秀(序列化的速度以及序列化的大小),并且使用方式也比较简单。

2:正式编码

2.1:server

server main:

package com.dahuyou.netty.transferfile.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {//配置服务端NIO线程组private EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));private EventLoopGroup childGroup = new NioEventLoopGroup();private Channel channel;public ChannelFuture bing(int port) {ChannelFuture channelFuture = null;try {ServerBootstrap b = new ServerBootstrap();b.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)    //非阻塞模式.option(ChannelOption.SO_BACKLOG, 128).childHandler(new MyChannelInitializer());channelFuture = b.bind(port).syncUninterruptibly();this.channel = channelFuture.channel();} catch (Exception e) {e.printStackTrace();} finally {if (null != channelFuture && channelFuture.isSuccess()) {System.out.println("netty server start done. {}");} else {System.out.println("netty server start error. {}");}}return channelFuture;}public void destroy() {if (null == channel) return;channel.close();parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}public Channel getChannel() {return channel;}}

MyChannelInitializer:

package com.dahuyou.netty.transferfile.server;import com.dahuyou.netty.transferfile.codec.ObjDecoder;
import com.dahuyou.netty.transferfile.codec.ObjEncoder;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) {//对象传输处理channel.pipeline().addLast(new ObjDecoder(FileTransferProtocol.class));channel.pipeline().addLast(new ObjEncoder(FileTransferProtocol.class));// 在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new MyServerHandler());}}

这里设置了基于protostuff的编解码器,以及消息处理的handler:

package com.dahuyou.netty.transferfile.server;import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.domain.*;
import com.dahuyou.netty.transferfile.util.CacheUtil;
import com.dahuyou.netty.transferfile.util.FileUtil;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import java.text.SimpleDateFormat;
import java.util.Date;public class MyServerHandler extends ChannelInboundHandlerAdapter {/*** 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("链接报告开始");/*System.out.println("链接报告信息:有一客户端链接到本服务端。channelId:" + channel.id());System.out.println("链接报告IP:" + channel.localAddress().getHostString());System.out.println("链接报告Port:" + channel.localAddress().getPort());System.out.println("链接报告完毕");*/}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//数据格式验证if (!(msg instanceof FileTransferProtocol)) return;FileTransferProtocol fileTransferProtocol = (FileTransferProtocol) msg;//0传输文件'请求'、1文件传输'指令'、2文件传输'数据'switch (fileTransferProtocol.getTransferType()) {case 0:FileDescInfo fileDescInfo = (FileDescInfo) fileTransferProtocol.getTransferObj();//断点续传信息,实际应用中需要将断点续传信息保存到数据库中FileBurstInstruct fileBurstInstructOld = CacheUtil.burstDataMap.get(fileDescInfo.getFileName());if (null != fileBurstInstructOld) {if (fileBurstInstructOld.getStatus() == Constants.FileStatus.COMPLETE) {CacheUtil.burstDataMap.remove(fileDescInfo.getFileName());}//传输完成删除断点信息System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件请求[断点续传]。" + JSON.toJSONString(fileBurstInstructOld));ctx.writeAndFlush(MsgUtil.buildTransferInstruct(fileBurstInstructOld));return;}//发送信息FileTransferProtocol sendFileTransferProtocol = MsgUtil.buildTransferInstruct(Constants.FileStatus.BEGIN, fileDescInfo.getFileUrl(), 0);ctx.writeAndFlush(sendFileTransferProtocol);System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件请求。" + JSON.toJSONString(fileDescInfo));break;case 2:FileBurstData fileBurstData = (FileBurstData) fileTransferProtocol.getTransferObj();FileBurstInstruct fileBurstInstruct = FileUtil.writeFile("E://", fileBurstData);//保存断点续传信息CacheUtil.burstDataMap.put(fileBurstData.getFileName(), fileBurstInstruct);ctx.writeAndFlush(MsgUtil.buildTransferInstruct(fileBurstInstruct));System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件数据。" + JSON.toJSONString(fileBurstData));//传输完成删除断点信息if (fileBurstInstruct.getStatus() == Constants.FileStatus.COMPLETE) {CacheUtil.burstDataMap.remove(fileBurstData.getFileName());}break;default:break;}}/*** 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();System.out.println("异常信息:\r\n" + cause.getMessage());}}

主要看方法channelRead,分为如下几种情况:

0:根据是否是续传返回不同的消息,控制client上传的不同行为
2:如果是上传文件,则保存文件,完成当前文件内容的上传,并返回续传信息给client,client继续上传

2.2:client

client main:

package com.dahuyou.netty.transferfile.client;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {//配置服务端NIO线程组private EventLoopGroup workerGroup = new NioEventLoopGroup();private Channel channel;public ChannelFuture connect(String inetHost, int inetPort) {ChannelFuture channelFuture = null;try {Bootstrap b = new Bootstrap();b.group(workerGroup);b.channel(NioSocketChannel.class);b.option(ChannelOption.AUTO_READ, true);b.handler(new MyChannelInitializer());channelFuture = b.connect(inetHost, inetPort).syncUninterruptibly();this.channel = channelFuture.channel();} catch (Exception e) {e.printStackTrace();} finally {if (null != channelFuture && channelFuture.isSuccess()) {System.out.println("netty client start done. {}");} else {System.out.println("netty client start error. {}");}}return channelFuture;}public void destroy() {if (null == channel) return;channel.close();workerGroup.shutdownGracefully();}}

MyChannelInitializer:

package com.dahuyou.netty.transferfile.client;import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.codec.ObjDecoder;
import com.dahuyou.netty.transferfile.codec.ObjEncoder;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {//对象传输处理channel.pipeline().addLast(new ObjDecoder(FileTransferProtocol.class));channel.pipeline().addLast(new ObjEncoder(FileTransferProtocol.class));// 在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new MyClientHandler());}}

同样设置了protostuff的编解码器,以及消息处理类:

package com.dahuyou.netty.transferfile.client;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.domain.Constants;
import com.dahuyou.netty.transferfile.domain.FileBurstData;
import com.dahuyou.netty.transferfile.domain.FileBurstInstruct;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import com.dahuyou.netty.transferfile.util.FileUtil;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import java.text.SimpleDateFormat;
import java.util.Date;public class MyClientHandler extends ChannelInboundHandlerAdapter {/*** 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("链接报告开始");/*System.out.println("链接报告信息:本客户端链接到服务端。channelId:" + channel.id());System.out.println("链接报告IP:" + channel.localAddress().getHostString());System.out.println("链接报告Port:" + channel.localAddress().getPort());System.out.println("链接报告完毕");*/}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("断开链接" + ctx.channel().localAddress().toString());super.channelInactive(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//数据格式验证if (!(msg instanceof FileTransferProtocol)) return;FileTransferProtocol fileTransferProtocol = (FileTransferProtocol) msg;//0传输文件'请求'、1文件传输'指令'、2文件传输'数据'switch (fileTransferProtocol.getTransferType()) {case 1:FileBurstInstruct fileBurstInstruct = (FileBurstInstruct) fileTransferProtocol.getTransferObj();//Constants.FileStatus {0开始、1中间、2结尾、3完成}if (Constants.FileStatus.COMPLETE == fileBurstInstruct.getStatus()) {ctx.flush();ctx.close();System.exit(-1);return;}FileBurstData fileBurstData = FileUtil.readFile(fileBurstInstruct.getClientFileUrl(), fileBurstInstruct.getReadPosition());ctx.writeAndFlush(MsgUtil.buildTransferData(fileBurstData));System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 客户端传输文件信息。 FILE:" + fileBurstData.getFileName() + " SIZE(byte):" + (fileBurstData.getEndPos() - fileBurstData.getBeginPos()));break;default:break;}/**模拟传输过程中断,场景测试可以注释掉*System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " [主动断开链接,模拟断点续传]");ctx.flush();ctx.close();System.exit(-1);*/}/*** 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();System.out.println("异常信息:\r\n" + cause.getMessage());}}

主要看方法channelRead,处理文件传输,根据是首次上传还是续传,从要上传的文件中获取字节码数据写到server,其中,体现续传的代码为FileUtil.readFile:

public class FileUtil {private static final int READ_BYTE_ONCE = 1024;public static FileBurstData readFile(String fileUrl, Integer readPosition) throws IOException {File file = new File(fileUrl);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");//r: 只读模式 rw:读写模式// 这里体现了断点续传的续哦!!!randomAccessFile.seek(readPosition);}}

randomAccessFile.seek(readPosition);这里跳一下子就体现了断点续传的续哦!!!

2.3:测试

server启动类:

package com.dahuyou.netty.transferfile.test;import com.dahuyou.netty.transferfile.server.NettyServer;public class NettyServerTest {public static void main(String[] args) {System.out.println("hi netty server");//启动服务new NettyServer().bing(7397);}}

client启动类:

package com.dahuyou.netty.transferfile.test;import com.dahuyou.netty.transferfile.client.NettyClient;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import io.netty.channel.ChannelFuture;
import java.io.File;public class NettyClientTest {public static void main(String[] args) {//启动客户端ChannelFuture channelFuture = new NettyClient().connect("127.0.0.1", 7397);//文件信息{文件大于1024kb方便测试断点续传}
//        File file = new File("C:\\Users\\fuzhengwei1\\Desktop\\测试传输文件.rar");File file = new File("D:\\xiaofuge_sourcecode\\interview-master\\dahuyou-study-netty\\transferfile\\src\\test\\java\\com\\dahuyou\\netty\\transferfile\\test\\测试传输文件.rar");FileTransferProtocol fileTransferProtocol = MsgUtil.buildRequestTransferFile(file.getAbsolutePath(), file.getName(), file.length());//发送信息;FILE:测试传输文件请求传输文件channelFuture.channel().writeAndFlush(fileTransferProtocol);}}

在client中首次启动发送请求上传文件的协议消息,发起文件上传的流程,我们测试的文件大小为1360字节,而首次上传文件的大小为1024字节,如下代码:

public class FileUtil {private static final int READ_BYTE_ONCE = 1024;public static FileBurstData readFile(String fileUrl, Integer readPosition) throws IOException {File file = new File(fileUrl);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");//r: 只读模式 rw:读写模式randomAccessFile.seek(readPosition);
//        byte[] bytes = new byte[1024 * 100];byte[] bytes = new byte[READ_BYTE_ONCE];    
}

所以第一次上传后文件是打不开的如下:
在这里插入图片描述
再次上传后文件就可以正常打开了。
最后看下日志输出:
在这里插入图片描述
在这里插入图片描述

写在后面

参考文章列表

protostuff序列化方式学习 。

netty编程之使用protostuff作为数据传输载体 。

这篇关于netty编程之实现断点续传(分片发送)功能的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1121928

相关文章

Springboot处理跨域的实现方式(附Demo)

《Springboot处理跨域的实现方式(附Demo)》:本文主要介绍Springboot处理跨域的实现方式(附Demo),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不... 目录Springboot处理跨域的方式1. 基本知识2. @CrossOrigin3. 全局跨域设置4.

Spring Boot 3.4.3 基于 Spring WebFlux 实现 SSE 功能(代码示例)

《SpringBoot3.4.3基于SpringWebFlux实现SSE功能(代码示例)》SpringBoot3.4.3结合SpringWebFlux实现SSE功能,为实时数据推送提供... 目录1. SSE 简介1.1 什么是 SSE?1.2 SSE 的优点1.3 适用场景2. Spring WebFlu

基于SpringBoot实现文件秒传功能

《基于SpringBoot实现文件秒传功能》在开发Web应用时,文件上传是一个常见需求,然而,当用户需要上传大文件或相同文件多次时,会造成带宽浪费和服务器存储冗余,此时可以使用文件秒传技术通过识别重复... 目录前言文件秒传原理代码实现1. 创建项目基础结构2. 创建上传存储代码3. 创建Result类4.

SpringBoot日志配置SLF4J和Logback的方法实现

《SpringBoot日志配置SLF4J和Logback的方法实现》日志记录是不可或缺的一部分,本文主要介绍了SpringBoot日志配置SLF4J和Logback的方法实现,文中通过示例代码介绍的非... 目录一、前言二、案例一:初识日志三、案例二:使用Lombok输出日志四、案例三:配置Logback一

Python如何使用__slots__实现节省内存和性能优化

《Python如何使用__slots__实现节省内存和性能优化》你有想过,一个小小的__slots__能让你的Python类内存消耗直接减半吗,没错,今天咱们要聊的就是这个让人眼前一亮的技巧,感兴趣的... 目录背景:内存吃得满满的类__slots__:你的内存管理小助手举个大概的例子:看看效果如何?1.

Python+PyQt5实现多屏幕协同播放功能

《Python+PyQt5实现多屏幕协同播放功能》在现代会议展示、数字广告、展览展示等场景中,多屏幕协同播放已成为刚需,下面我们就来看看如何利用Python和PyQt5开发一套功能强大的跨屏播控系统吧... 目录一、项目概述:突破传统播放限制二、核心技术解析2.1 多屏管理机制2.2 播放引擎设计2.3 专

一文详解SpringBoot响应压缩功能的配置与优化

《一文详解SpringBoot响应压缩功能的配置与优化》SpringBoot的响应压缩功能基于智能协商机制,需同时满足很多条件,本文主要为大家详细介绍了SpringBoot响应压缩功能的配置与优化,需... 目录一、核心工作机制1.1 自动协商触发条件1.2 压缩处理流程二、配置方案详解2.1 基础YAML

Python实现无痛修改第三方库源码的方法详解

《Python实现无痛修改第三方库源码的方法详解》很多时候,我们下载的第三方库是不会有需求不满足的情况,但也有极少的情况,第三方库没有兼顾到需求,本文将介绍几个修改源码的操作,大家可以根据需求进行选择... 目录需求不符合模拟示例 1. 修改源文件2. 继承修改3. 猴子补丁4. 追踪局部变量需求不符合很

Java的IO模型、Netty原理解析

《Java的IO模型、Netty原理解析》Java的I/O是以流的方式进行数据输入输出的,Java的类库涉及很多领域的IO内容:标准的输入输出,文件的操作、网络上的数据传输流、字符串流、对象流等,这篇... 目录1.什么是IO2.同步与异步、阻塞与非阻塞3.三种IO模型BIO(blocking I/O)NI

idea中创建新类时自动添加注释的实现

《idea中创建新类时自动添加注释的实现》在每次使用idea创建一个新类时,过了一段时间发现看不懂这个类是用来干嘛的,为了解决这个问题,我们可以设置在创建一个新类时自动添加注释,帮助我们理解这个类的用... 目录前言:详细操作:步骤一:点击上方的 文件(File),点击&nbmyHIgsp;设置(Setti