基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器)

本文主要是介绍基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器)

前言

本文主要讲解RxJava2.0+Retrofit2.0实现下载文件并带进度效果,如果按照传统方法是很容易实现的。但是,发现网上搜索的例子都是通过OkHttpClient的拦截器去拦截Response来实现进度显示(侵入性有点强),个人发现bug不少,问题都是在UI更新方面出了问题,只要记住UI刷新在主线程更新都容易解决,下面介绍两种非修改拦截器实现文件下载的方法:

效果图 

图中下载速度较快,是基于在公司专线环境测试

依赖添加

 /*Retrofit是一款类型安全的网络框架,基于HTTP协议,服务于Android和java语言,集成了okhttp依赖*/compile 'com.squareup.retrofit2:retrofit:2.3.0'compile 'com.squareup.retrofit2:converter-gson:2.3.0'compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'/*RxAndroid一款Android客户端组件间异步通信的框架,1和2差别很大*/compile 'io.reactivex.rxjava2:rxandroid:2.0.1'compile 'io.reactivex.rxjava2:rxjava:2.1.8'

具体实现 

 方法1:使用Handler更新下载进度

/*** 下载文件法1(使用Handler更新UI)** @param observable      下载被观察者* @param destDir         下载目录* @param fileName        文件名* @param progressHandler 进度handler*/public static void downloadFile(Observable<ResponseBody> observable, final String destDir, final String fileName, final DownloadProgressHandler progressHandler) {final DownloadInfo downloadInfo = new DownloadInfo();observable.subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe(new Observer<ResponseBody>() {@Overridepublic void onSubscribe(Disposable d) {addDisposable(d);}@Overridepublic void onNext(ResponseBody responseBody) {InputStream inputStream = null;long total = 0;long responseLength;FileOutputStream fos = null;try {byte[] buf = new byte[1024 * 8];int len;responseLength = responseBody.contentLength();inputStream = responseBody.byteStream();final File file = new File(destDir, fileName);downloadInfo.setFile(file);downloadInfo.setFileSize(responseLength);File dir = new File(destDir);if (!dir.exists()) {dir.mkdirs();}fos = new FileOutputStream(file);int progress = 0;int lastProgress=-1;long startTime = System.currentTimeMillis(); // 开始下载时获取开始时间while ((len = inputStream.read(buf)) != -1) {fos.write(buf, 0, len);total += len;progress = (int) (total * 100 / responseLength);long curTime = System.currentTimeMillis();long usedTime = (curTime - startTime) / 1000;if (usedTime == 0) {usedTime = 1;}long speed = (total / usedTime); // 平均每秒下载速度// 如果进度与之前进度相等,则不更新,如果更新太频繁,则会造成界面卡顿if (progress != lastProgress) {downloadInfo.setSpeed(speed);downloadInfo.setProgress(progress);downloadInfo.setCurrentSize(total);progressHandler.sendMessage(DownloadProgressHandler.DOWNLOAD_PROGRESS, downloadInfo);}lastProgress = progress;}fos.flush();downloadInfo.setFile(file);progressHandler.sendMessage(DownloadProgressHandler.DOWNLOAD_SUCCESS, downloadInfo);} catch (final Exception e) {downloadInfo.setErrorMsg(e);progressHandler.sendMessage(DownloadProgressHandler.DOWNLOAD_FAIL, downloadInfo);} finally {try {if (fos != null) {fos.close();}if (inputStream != null) {inputStream.close();}} catch (Exception e) {e.printStackTrace();}}}@Overridepublic void onError(Throwable e) {//new Consumer<Throwable>downloadInfo.setErrorMsg(e);progressHandler.sendMessage(DownloadProgressHandler.DOWNLOAD_FAIL, downloadInfo);}@Overridepublic void onComplete() {// new Action()}});}

方法2:使用RxJava发射器更新下载进度 

 /*** 下载文件法2(使用RXJava更新UI)** @param observable* @param destDir* @param fileName* @param progressHandler*/public static void downloadFile2(Observable<ResponseBody> observable, final String destDir, final String fileName, final DownloadProgressHandler progressHandler) {final DownloadInfo downloadInfo = new DownloadInfo();observable.subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()).flatMap(new Function<ResponseBody, ObservableSource<DownloadInfo>>() {@Overridepublic ObservableSource<DownloadInfo> apply(final ResponseBody responseBody) throws Exception {return Observable.create(new ObservableOnSubscribe<DownloadInfo>() {@Overridepublic void subscribe(ObservableEmitter<DownloadInfo> emitter) throws Exception {InputStream inputStream = null;long total = 0;long responseLength = 0;FileOutputStream fos = null;try {byte[] buf = new byte[1024 * 8];int len = 0;responseLength = responseBody.contentLength();inputStream = responseBody.byteStream();final File file = new File(destDir, fileName);downloadInfo.setFile(file);downloadInfo.setFileSize(responseLength);File dir = new File(destDir);if (!dir.exists()) {dir.mkdirs();}fos = new FileOutputStream(file);int progress = 0;int lastProgress = -1;long startTime = System.currentTimeMillis(); // 开始下载时获取开始时间while ((len = inputStream.read(buf)) != -1) {fos.write(buf, 0, len);total += len;progress = (int) (total * 100 / responseLength);long curTime = System.currentTimeMillis();long usedTime = (curTime - startTime) / 1000;if (usedTime == 0) {usedTime = 1;}long speed = (total / usedTime); // 平均每秒下载速度// 如果进度与之前进度相等,则不更新,如果更新太频繁,则会造成界面卡顿if (progress != lastProgress) {downloadInfo.setSpeed(speed);downloadInfo.setProgress(progress);downloadInfo.setCurrentSize(total);emitter.onNext(downloadInfo);}lastProgress = progress;}fos.flush();downloadInfo.setFile(file);emitter.onComplete();} catch (Exception e) {downloadInfo.setErrorMsg(e);emitter.onError(e);} finally {try {if (fos != null) {fos.close();}if (inputStream != null) {inputStream.close();}} catch (Exception e) {e.printStackTrace();}}}});}}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<DownloadInfo>() {@Overridepublic void onSubscribe(Disposable d) {addDisposable(d);}@Overridepublic void onNext(DownloadInfo downloadInfo) {progressHandler.onProgress(downloadInfo.getProgress(), downloadInfo.getFileSize(), downloadInfo.getSpeed());}@Overridepublic void onError(Throwable e) {progressHandler.onError(e);}@Overridepublic void onComplete() {LogUtils.i("下载完成");progressHandler.onCompleted(downloadInfo.getFile());}});}

相关代码:

api服务接口类

DownloadApi.java

public interface DownloadApi {/*** 下载Apk1文件**/@Streaming@GET("imtt.dd.qq.com/16891/C527A902F14C1FFD8AA9C13872D5F92F.apk?mkey=5c41136cb711c35d&f=0c2f&fsname=com.tencent.moyu_1.4.0_1.apk&csr=1bbd&cip=183.17.229.168&proto=https")Observable<ResponseBody> downloadApkFile1();/*** 下载Apk2文件**/@Streaming@GET("https://cc849cacb0e96648f8dd4bb35ff8365b.dd.cdntips.com/imtt.dd.qq.com/16891/5BB89032B0755F5922C80DA8C2CAF735.apk?mkey=5c415b9fb711c35d&f=07b4&fsname=com.tencent.mobileqq_7.9.7_994.apk&csr=1bbd&cip=183.17.229.168&proto=https")Observable<ResponseBody> downloadApkFile2();/*** 下载Apk3文件**/@Streaming@GET("https://cc849cacb0e96648f8dd4bb35ff8365b.dd.cdntips.com/imtt.dd.qq.com/16891/BEC5EEF53983300D9F0AB46166EC9EA7.apk?mkey=5c41a20bda11e60f&f=184b&fsname=com.tencent.pao_1.0.61.0_161.apk&csr=1bbd&cip=218.17.192.250&proto=https")Observable<ResponseBody> downloadApkFile3();
}

下载进度Handler类

DownloadProgressHandler.java 

/*** 下载进度Handler** @author Kelly* @version 1.0.0* @filename DownloadProgressHandler.java* @time 2018/7/25 15:25* @copyright(C) 2018 song*/
public abstract class DownloadProgressHandler implements DownloadCallBack {public static final int DOWNLOAD_SUCCESS = 0;public static final int DOWNLOAD_PROGRESS = 1;public static final int DOWNLOAD_FAIL = 2;protected ResponseHandler mHandler = new ResponseHandler(this, Looper.getMainLooper());/*** 发送消息,更新进度** @param what* @param downloadInfo*/public void sendMessage(int what, DownloadInfo downloadInfo) {mHandler.obtainMessage(what, downloadInfo).sendToTarget();}/*** 处理消息* @param message*/protected void handleMessage(Message message) {DownloadInfo progressBean = (DownloadInfo) message.obj;switch (message.what) {case DOWNLOAD_SUCCESS://下载成功onCompleted(progressBean.getFile());removeMessage();break;case DOWNLOAD_PROGRESS://下载中onProgress(progressBean.getProgress(), progressBean.getFileSize(),progressBean.getSpeed());break;case DOWNLOAD_FAIL://下载失败onError(progressBean.getErrorMsg());break;default:removeMessage();break;}}private void removeMessage() {if (mHandler != null){mHandler.removeCallbacksAndMessages(null);}}protected static class ResponseHandler extends Handler {private DownloadProgressHandler mProgressHandler;public ResponseHandler(DownloadProgressHandler mProgressHandler, Looper looper) {super(looper);this.mProgressHandler = mProgressHandler;}@Overridepublic void handleMessage(Message msg) {mProgressHandler.handleMessage(msg);}}
}

 DownloadCallBack.java

/*** 下载回调*/public interface DownloadCallBack {/*** 进度,运行在主线程** @param progress 下载进度* @param total 总大小* @param speed 下载速率*/void onProgress(int progress, long total,long speed);/*** 运行在主线程** @param file*/void onCompleted(File file);/*** 运行在主线程** @param e*/void onError(Throwable e);}

文件下载信息类

DownloadInfo.java

/*** 下载文件信息** @author Kelly* @version 1.0.0* @filename DownloadInfo.java* @time 2018/7/25 14:27* @copyright(C) 2018 song*/
public class DownloadInfo {private File file;private String fileName;private long fileSize;//单位 byteprivate long currentSize;//当前下载大小private int progress;//当前下载进度private long speed;//下载速率private Throwable errorMsg;//下载异常信息public File getFile() {return file;}public void setFile(File file) {this.file = file;}public String getFileName() {return fileName;}public void setFileName(String fileName) {this.fileName = fileName;}public long getFileSize() {return fileSize;}public void setFileSize(long fileSize) {this.fileSize = fileSize;}public long getCurrentSize() {return currentSize;}public void setCurrentSize(long currentSize) {this.currentSize = currentSize;}public int getProgress() {return progress;}public void setProgress(int progress) {this.progress = progress;}public long getSpeed() {return speed;}public void setSpeed(long speed) {this.speed = speed;}public Throwable getErrorMsg() {return errorMsg;}public void setErrorMsg(Throwable errorMsg) {this.errorMsg = errorMsg;}
}

调用方法

 DownloadApi apiService = RetrofitHelper.getInstance().getApiService(DownloadApi.class);FileDownloader.downloadFile(apiService.downloadApkFile1(), DOWNLOAD_APK_PATH, "test.apk", new DownloadProgressHandler() {@Overridepublic void onProgress(int progress, long total, long speed) {LogUtils.i("progress:" + progress + ",speed:" + speed);mProgress.setText(progress + "%");mFileSize.setText(FileUtils.formatFileSize(total));mRate.setText(FileUtils.formatFileSize(speed)+"/s");}@Overridepublic void onCompleted(File file) {LogUtils.i("下载apk文件成功");FileDownloader.clear();}@Overridepublic void onError(Throwable e) {LogUtils.e("下载apk文件异常", e);FileDownloader.clear();}});

下载地址

CSDN:基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器)_rxjava下载文件-Android代码类资源-CSDN下载

GitHub:https://github.com/kellysong/android-blog-demo/tree/master/net-demo

关于下载进度问题:

本文测试的前提是创建OkHttpClient时,没有添加log拦截器。添加HttpLoggingInterceptor会影响下载进度观察,会出现下载出现卡顿一会(时间因文件而异),才触发下载进度回调,导致认为不是下载进度回调,原因是HttpLoggingInterceptor中的下面代码导致

BufferedSource source = responseBody.source();
source.request(Long.MAX_VALUE); // 卡主,运行在子线程
Buffer buffer = source.getBuffer();

上面代码已经在下载文件了。

解决方案:

  1. 取消HttpLoggingInterceptor
  2. HttpLoggingInterceptor设置更低级别的日志拦截,如NONE
  3. 使用修改的HttpLoggingInterceptor(对应okhttp版本 3.14.9)已经修复打开日志拦截器的情况下:文件下载和文件上传oom问题

生产上必须关HttpLoggingInterceptor,除非需要做特别的网络监控

关于Retrofit下载问题:

有人是说通过okhttp拦截器下载文件才是下载(官方例子也是这样实现),通过ResponseBody是写本地文件?事实是真是这样吗?
    
    
简单来说文件下载就是拿到文件的输入流,边读编写,服务端只是返回一个通道inputStream。先说一下,传统文件(这里解释为不用任何第三方封装的框架,这里使用自带的HttpURLConnection)下载使用示例:
    
   

private void downloadFile(String url) {long start = System.currentTimeMillis();InputStream is = null;FileOutputStream fos = null;HttpURLConnection httpConn;try {httpConn = (HttpURLConnection) new URL(url).openConnection();httpConn.setDoOutput(false);// 使用 URL 连接进行输出httpConn.setDoInput(true);// 使用 URL 连接进行输入httpConn.setRequestMethod("GET");// 设置URL请求方法httpConn.setConnectTimeout(40000);httpConn.setReadTimeout(40000);httpConn.setRequestProperty("Content-Type", "application/octet-stream");httpConn.setRequestProperty("Connection", "Keep-Alive");// 维持长连接httpConn.setRequestProperty("Charset", "UTF-8");//获取文件下载输入流is = httpConn.getInputStream();File file = new File(Environment.getExternalStorageDirectory(), "test.apk");fos = new FileOutputStream(file);int b;byte[] byArr = new byte[1024];while ((b = is.read(byArr)) != -1) {//写文件fos.write(byArr, 0, b);}long end = System.currentTimeMillis();System.out.println("下载耗时:" + (end - start) / 1000.0 + "s");} catch (Exception e) {e.printStackTrace();} finally {close(is);close(fos);}}public static void close(Closeable x) {if (x != null) {try {x.close();} catch (Exception e) {// skip}}}

看了上面的例子,是否对下载文件有了更清晰的认识。现在我们再来下看下retrofit文件下载的操作:

      @Streaming@GETCall<ResponseBody> downLoadFile(@Url String url);

上面的ResponseBody是okhttp3.ResponseBody包下下,有一个注解 @Streaming表示直接返回ResponseBody类型的数据,不读取到内存,可以理解为返回一个输入通道inputStream,也就是你可以用过这个返回来的body的bytestream循环写入文件,同时可以做下载进度的回调。

为什么是这样呢,要具体分析两个主要类:转换器类BuiltInConverters类ResponseBody类
    
BuiltInConverters

    

final class BuiltInConverters extends Converter.Factory {@Overridepublic Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations,Retrofit retrofit) {if (type == ResponseBody.class) {return Utils.isAnnotationPresent(annotations, Streaming.class)? StreamingResponseBodyConverter.INSTANCE: BufferingResponseBodyConverter.INSTANCE;}if (type == Void.class) {return VoidResponseBodyConverter.INSTANCE;}return null;}//省略}

如果下载接口加了注解@Streaming就会用StreamingResponseBodyConverter,直接返回ResponseBody,否则就是BufferingResponseBodyConverter,而BufferingResponseBodyConverter是一次性读取到内存中的,实际容易出现OOM,这也就是为什么下载大文件时接口需要加上注解@Streaming。

ResponseBody

    

    public abstract class ResponseBody implements Closeable {/** Multiple calls to {@link #charStream()} must return the same instance. */private Reader reader;public abstract @Nullable MediaType contentType();/*** Returns the number of bytes in that will returned by {@link #bytes}, or {@link #byteStream}, or* -1 if unknown.*/public abstract long contentLength();//输入流public final InputStream byteStream() {return source().inputStream();}public abstract BufferedSource source();//省略}

拿到inputStream就可以进行文件写入,而`source().inputStream()`的InputStream(抽象类)在`RealBufferedSource`已经被重新定义。

 @Override public InputStream inputStream() {return new InputStream() {@Override public int read() throws IOException {if (closed) throw new IOException("closed");if (buffer.size == 0) {long count = source.read(buffer, Segment.SIZE);if (count == -1) return -1;}return buffer.readByte() & 0xff;}@Override public int read(byte[] data, int offset, int byteCount) throws IOException {if (closed) throw new IOException("closed");checkOffsetAndCount(data.length, offset, byteCount);if (buffer.size == 0) {long count = source.read(buffer, Segment.SIZE);if (count == -1) return -1;}return buffer.read(data, offset, byteCount);}//省略...};}

上面分析了@Streaming只是拿到一个下载通道,如果你不读取数据是不会下载的,也就不会回调拦截器进度,即OkHttp没有从输入流读取数据,哪怕下载请求响应已经返回。

下面分析下载进度问题:

当我们通过ResponseBody拿到InputStream ,调用inputStream.read(myBuffer)时,会触发 `read(byte[] data, int offset, int byteCount)`方法调用,
从方法可知okhttp使用`Buffer`(提高IO的处理效率),意思是buffer.size == 0,先通过source把数据读到Buffer,然后在从Buffer中读取myBuffer返回到`inputStream.read(myBuffer)`调用处,再把myBuffer写入文件。而如果重写了`ResponseBody`的话,会调用拦截器下面的ProgressResponseBody的source中的`read(Buffer sink, long byteCount)`方法。

 private  class ProgressResponseBody extends ResponseBody {private final ResponseBody responseBody;private BufferedSource bufferedSource;ProgressResponseBody(ResponseBody responseBody, ProgressListener progressListener) {this.responseBody = responseBody;this.progressListener = progressListener;}private Source source(Source source) {return new ForwardingSource(source) {long totalBytesRead = 0L;@Overridepublic long read(Buffer sink, long byteCount) throws IOException {long bytesRead = super.read(sink, byteCount);// read() returns the number of bytes read, or -1 if this source is exhausted.totalBytesRead += bytesRead != -1 ? bytesRead : 0;progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);return bytesRead;}};}//省略...}}

如果设置的缓冲区较小,会导致先把缓冲区的数据读完,给人感觉是上层循环多次读取,其实buffer已经存在,但是上层读取数据大小进度是跟拦截器中ProgressResponseBody的进度是等价的。如果读完了buffer,不去调用`source.read(buffer, Segment.SIZE)`,数据是没有从服务端返回。缓冲区的数据读取是非常快的。故通过ResponseBody监听文件下载进度是没有问题的。

小结

文件下载,连接过程本身需要一定的时间,然后是文件下载的IO读写,个人觉得通过ResponseBody下载文件和监听下载进度,相比通过拦截器处理更简单,可以更方便处理多任务下载进度问题。如有不对请指出,会立马纠正。

其它文章:

基于RxJava2.0+Retrofit2.0超大文件分块(分片)上传(带进度)

这篇关于基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1014976

相关文章

在Ubuntu上部署SpringBoot应用的操作步骤

《在Ubuntu上部署SpringBoot应用的操作步骤》随着云计算和容器化技术的普及,Linux服务器已成为部署Web应用程序的主流平台之一,Java作为一种跨平台的编程语言,具有广泛的应用场景,本... 目录一、部署准备二、安装 Java 环境1. 安装 JDK2. 验证 Java 安装三、安装 mys

Springboot的ThreadPoolTaskScheduler线程池轻松搞定15分钟不操作自动取消订单

《Springboot的ThreadPoolTaskScheduler线程池轻松搞定15分钟不操作自动取消订单》:本文主要介绍Springboot的ThreadPoolTaskScheduler线... 目录ThreadPoolTaskScheduler线程池实现15分钟不操作自动取消订单概要1,创建订单后

JAVA中整型数组、字符串数组、整型数和字符串 的创建与转换的方法

《JAVA中整型数组、字符串数组、整型数和字符串的创建与转换的方法》本文介绍了Java中字符串、字符数组和整型数组的创建方法,以及它们之间的转换方法,还详细讲解了字符串中的一些常用方法,如index... 目录一、字符串、字符数组和整型数组的创建1、字符串的创建方法1.1 通过引用字符数组来创建字符串1.2

python使用watchdog实现文件资源监控

《python使用watchdog实现文件资源监控》watchdog支持跨平台文件资源监控,可以检测指定文件夹下文件及文件夹变动,下面我们来看看Python如何使用watchdog实现文件资源监控吧... python文件监控库watchdogs简介随着Python在各种应用领域中的广泛使用,其生态环境也

el-select下拉选择缓存的实现

《el-select下拉选择缓存的实现》本文主要介绍了在使用el-select实现下拉选择缓存时遇到的问题及解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的... 目录项目场景:问题描述解决方案:项目场景:从左侧列表中选取字段填入右侧下拉多选框,用户可以对右侧

SpringCloud集成AlloyDB的示例代码

《SpringCloud集成AlloyDB的示例代码》AlloyDB是GoogleCloud提供的一种高度可扩展、强性能的关系型数据库服务,它兼容PostgreSQL,并提供了更快的查询性能... 目录1.AlloyDBjavascript是什么?AlloyDB 的工作原理2.搭建测试环境3.代码工程1.

Java调用Python代码的几种方法小结

《Java调用Python代码的几种方法小结》Python语言有丰富的系统管理、数据处理、统计类软件包,因此从java应用中调用Python代码的需求很常见、实用,本文介绍几种方法从java调用Pyt... 目录引言Java core使用ProcessBuilder使用Java脚本引擎总结引言python

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

springboot整合 xxl-job及使用步骤

《springboot整合xxl-job及使用步骤》XXL-JOB是一个分布式任务调度平台,用于解决分布式系统中的任务调度和管理问题,文章详细介绍了XXL-JOB的架构,包括调度中心、执行器和Web... 目录一、xxl-job是什么二、使用步骤1. 下载并运行管理端代码2. 访问管理页面,确认是否启动成功

Java中的密码加密方式

《Java中的密码加密方式》文章介绍了Java中使用MD5算法对密码进行加密的方法,以及如何通过加盐和多重加密来提高密码的安全性,MD5是一种不可逆的哈希算法,适合用于存储密码,因为其输出的摘要长度固... 目录Java的密码加密方式密码加密一般的应用方式是总结Java的密码加密方式密码加密【这里采用的