本文主要是介绍基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器)
前言
本文主要讲解RxJava2.0+Retrofit2.0实现下载文件并带进度效果,如果按照传统方法是很容易实现的。但是,发现网上搜索的例子都是通过OkHttpClient的拦截器去拦截Response来实现进度显示(侵入性有点强),个人发现bug不少,问题都是在UI更新方面出了问题,只要记住UI刷新在主线程更新都容易解决,下面介绍两种非修改拦截器实现文件下载的方法:
效果图
图中下载速度较快,是基于在公司专线环境测试
依赖添加
/*Retrofit是一款类型安全的网络框架,基于HTTP协议,服务于Android和java语言,集成了okhttp依赖*/compile 'com.squareup.retrofit2:retrofit:2.3.0'compile 'com.squareup.retrofit2:converter-gson:2.3.0'compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'/*RxAndroid一款Android客户端组件间异步通信的框架,1和2差别很大*/compile 'io.reactivex.rxjava2:rxandroid:2.0.1'compile 'io.reactivex.rxjava2:rxjava:2.1.8'
具体实现
方法1:使用Handler更新下载进度
/*** 下载文件法1(使用Handler更新UI)** @param observable 下载被观察者* @param destDir 下载目录* @param fileName 文件名* @param progressHandler 进度handler*/public static void downloadFile(Observable<ResponseBody> observable, final String destDir, final String fileName, final DownloadProgressHandler progressHandler) {final DownloadInfo downloadInfo = new DownloadInfo();observable.subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe(new Observer<ResponseBody>() {@Overridepublic void onSubscribe(Disposable d) {addDisposable(d);}@Overridepublic void onNext(ResponseBody responseBody) {InputStream inputStream = null;long total = 0;long responseLength;FileOutputStream fos = null;try {byte[] buf = new byte[1024 * 8];int len;responseLength = responseBody.contentLength();inputStream = responseBody.byteStream();final File file = new File(destDir, fileName);downloadInfo.setFile(file);downloadInfo.setFileSize(responseLength);File dir = new File(destDir);if (!dir.exists()) {dir.mkdirs();}fos = new FileOutputStream(file);int progress = 0;int lastProgress=-1;long startTime = System.currentTimeMillis(); // 开始下载时获取开始时间while ((len = inputStream.read(buf)) != -1) {fos.write(buf, 0, len);total += len;progress = (int) (total * 100 / responseLength);long curTime = System.currentTimeMillis();long usedTime = (curTime - startTime) / 1000;if (usedTime == 0) {usedTime = 1;}long speed = (total / usedTime); // 平均每秒下载速度// 如果进度与之前进度相等,则不更新,如果更新太频繁,则会造成界面卡顿if (progress != lastProgress) {downloadInfo.setSpeed(speed);downloadInfo.setProgress(progress);downloadInfo.setCurrentSize(total);progressHandler.sendMessage(DownloadProgressHandler.DOWNLOAD_PROGRESS, downloadInfo);}lastProgress = progress;}fos.flush();downloadInfo.setFile(file);progressHandler.sendMessage(DownloadProgressHandler.DOWNLOAD_SUCCESS, downloadInfo);} catch (final Exception e) {downloadInfo.setErrorMsg(e);progressHandler.sendMessage(DownloadProgressHandler.DOWNLOAD_FAIL, downloadInfo);} finally {try {if (fos != null) {fos.close();}if (inputStream != null) {inputStream.close();}} catch (Exception e) {e.printStackTrace();}}}@Overridepublic void onError(Throwable e) {//new Consumer<Throwable>downloadInfo.setErrorMsg(e);progressHandler.sendMessage(DownloadProgressHandler.DOWNLOAD_FAIL, downloadInfo);}@Overridepublic void onComplete() {// new Action()}});}
方法2:使用RxJava发射器更新下载进度
/*** 下载文件法2(使用RXJava更新UI)** @param observable* @param destDir* @param fileName* @param progressHandler*/public static void downloadFile2(Observable<ResponseBody> observable, final String destDir, final String fileName, final DownloadProgressHandler progressHandler) {final DownloadInfo downloadInfo = new DownloadInfo();observable.subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()).flatMap(new Function<ResponseBody, ObservableSource<DownloadInfo>>() {@Overridepublic ObservableSource<DownloadInfo> apply(final ResponseBody responseBody) throws Exception {return Observable.create(new ObservableOnSubscribe<DownloadInfo>() {@Overridepublic void subscribe(ObservableEmitter<DownloadInfo> emitter) throws Exception {InputStream inputStream = null;long total = 0;long responseLength = 0;FileOutputStream fos = null;try {byte[] buf = new byte[1024 * 8];int len = 0;responseLength = responseBody.contentLength();inputStream = responseBody.byteStream();final File file = new File(destDir, fileName);downloadInfo.setFile(file);downloadInfo.setFileSize(responseLength);File dir = new File(destDir);if (!dir.exists()) {dir.mkdirs();}fos = new FileOutputStream(file);int progress = 0;int lastProgress = -1;long startTime = System.currentTimeMillis(); // 开始下载时获取开始时间while ((len = inputStream.read(buf)) != -1) {fos.write(buf, 0, len);total += len;progress = (int) (total * 100 / responseLength);long curTime = System.currentTimeMillis();long usedTime = (curTime - startTime) / 1000;if (usedTime == 0) {usedTime = 1;}long speed = (total / usedTime); // 平均每秒下载速度// 如果进度与之前进度相等,则不更新,如果更新太频繁,则会造成界面卡顿if (progress != lastProgress) {downloadInfo.setSpeed(speed);downloadInfo.setProgress(progress);downloadInfo.setCurrentSize(total);emitter.onNext(downloadInfo);}lastProgress = progress;}fos.flush();downloadInfo.setFile(file);emitter.onComplete();} catch (Exception e) {downloadInfo.setErrorMsg(e);emitter.onError(e);} finally {try {if (fos != null) {fos.close();}if (inputStream != null) {inputStream.close();}} catch (Exception e) {e.printStackTrace();}}}});}}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<DownloadInfo>() {@Overridepublic void onSubscribe(Disposable d) {addDisposable(d);}@Overridepublic void onNext(DownloadInfo downloadInfo) {progressHandler.onProgress(downloadInfo.getProgress(), downloadInfo.getFileSize(), downloadInfo.getSpeed());}@Overridepublic void onError(Throwable e) {progressHandler.onError(e);}@Overridepublic void onComplete() {LogUtils.i("下载完成");progressHandler.onCompleted(downloadInfo.getFile());}});}
相关代码:
api服务接口类
DownloadApi.java
public interface DownloadApi {/*** 下载Apk1文件**/@Streaming@GET("imtt.dd.qq.com/16891/C527A902F14C1FFD8AA9C13872D5F92F.apk?mkey=5c41136cb711c35d&f=0c2f&fsname=com.tencent.moyu_1.4.0_1.apk&csr=1bbd&cip=183.17.229.168&proto=https")Observable<ResponseBody> downloadApkFile1();/*** 下载Apk2文件**/@Streaming@GET("https://cc849cacb0e96648f8dd4bb35ff8365b.dd.cdntips.com/imtt.dd.qq.com/16891/5BB89032B0755F5922C80DA8C2CAF735.apk?mkey=5c415b9fb711c35d&f=07b4&fsname=com.tencent.mobileqq_7.9.7_994.apk&csr=1bbd&cip=183.17.229.168&proto=https")Observable<ResponseBody> downloadApkFile2();/*** 下载Apk3文件**/@Streaming@GET("https://cc849cacb0e96648f8dd4bb35ff8365b.dd.cdntips.com/imtt.dd.qq.com/16891/BEC5EEF53983300D9F0AB46166EC9EA7.apk?mkey=5c41a20bda11e60f&f=184b&fsname=com.tencent.pao_1.0.61.0_161.apk&csr=1bbd&cip=218.17.192.250&proto=https")Observable<ResponseBody> downloadApkFile3();
}
下载进度Handler类
DownloadProgressHandler.java
/*** 下载进度Handler** @author Kelly* @version 1.0.0* @filename DownloadProgressHandler.java* @time 2018/7/25 15:25* @copyright(C) 2018 song*/
public abstract class DownloadProgressHandler implements DownloadCallBack {public static final int DOWNLOAD_SUCCESS = 0;public static final int DOWNLOAD_PROGRESS = 1;public static final int DOWNLOAD_FAIL = 2;protected ResponseHandler mHandler = new ResponseHandler(this, Looper.getMainLooper());/*** 发送消息,更新进度** @param what* @param downloadInfo*/public void sendMessage(int what, DownloadInfo downloadInfo) {mHandler.obtainMessage(what, downloadInfo).sendToTarget();}/*** 处理消息* @param message*/protected void handleMessage(Message message) {DownloadInfo progressBean = (DownloadInfo) message.obj;switch (message.what) {case DOWNLOAD_SUCCESS://下载成功onCompleted(progressBean.getFile());removeMessage();break;case DOWNLOAD_PROGRESS://下载中onProgress(progressBean.getProgress(), progressBean.getFileSize(),progressBean.getSpeed());break;case DOWNLOAD_FAIL://下载失败onError(progressBean.getErrorMsg());break;default:removeMessage();break;}}private void removeMessage() {if (mHandler != null){mHandler.removeCallbacksAndMessages(null);}}protected static class ResponseHandler extends Handler {private DownloadProgressHandler mProgressHandler;public ResponseHandler(DownloadProgressHandler mProgressHandler, Looper looper) {super(looper);this.mProgressHandler = mProgressHandler;}@Overridepublic void handleMessage(Message msg) {mProgressHandler.handleMessage(msg);}}
}
DownloadCallBack.java
/*** 下载回调*/public interface DownloadCallBack {/*** 进度,运行在主线程** @param progress 下载进度* @param total 总大小* @param speed 下载速率*/void onProgress(int progress, long total,long speed);/*** 运行在主线程** @param file*/void onCompleted(File file);/*** 运行在主线程** @param e*/void onError(Throwable e);}
文件下载信息类
DownloadInfo.java
/*** 下载文件信息** @author Kelly* @version 1.0.0* @filename DownloadInfo.java* @time 2018/7/25 14:27* @copyright(C) 2018 song*/
public class DownloadInfo {private File file;private String fileName;private long fileSize;//单位 byteprivate long currentSize;//当前下载大小private int progress;//当前下载进度private long speed;//下载速率private Throwable errorMsg;//下载异常信息public File getFile() {return file;}public void setFile(File file) {this.file = file;}public String getFileName() {return fileName;}public void setFileName(String fileName) {this.fileName = fileName;}public long getFileSize() {return fileSize;}public void setFileSize(long fileSize) {this.fileSize = fileSize;}public long getCurrentSize() {return currentSize;}public void setCurrentSize(long currentSize) {this.currentSize = currentSize;}public int getProgress() {return progress;}public void setProgress(int progress) {this.progress = progress;}public long getSpeed() {return speed;}public void setSpeed(long speed) {this.speed = speed;}public Throwable getErrorMsg() {return errorMsg;}public void setErrorMsg(Throwable errorMsg) {this.errorMsg = errorMsg;}
}
调用方法
DownloadApi apiService = RetrofitHelper.getInstance().getApiService(DownloadApi.class);FileDownloader.downloadFile(apiService.downloadApkFile1(), DOWNLOAD_APK_PATH, "test.apk", new DownloadProgressHandler() {@Overridepublic void onProgress(int progress, long total, long speed) {LogUtils.i("progress:" + progress + ",speed:" + speed);mProgress.setText(progress + "%");mFileSize.setText(FileUtils.formatFileSize(total));mRate.setText(FileUtils.formatFileSize(speed)+"/s");}@Overridepublic void onCompleted(File file) {LogUtils.i("下载apk文件成功");FileDownloader.clear();}@Overridepublic void onError(Throwable e) {LogUtils.e("下载apk文件异常", e);FileDownloader.clear();}});
下载地址
CSDN:基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器)_rxjava下载文件-Android代码类资源-CSDN下载
GitHub:https://github.com/kellysong/android-blog-demo/tree/master/net-demo
关于下载进度问题:
本文测试的前提是创建OkHttpClient时,没有添加log拦截器。添加HttpLoggingInterceptor会影响下载进度观察,会出现下载出现卡顿一会(时间因文件而异),才触发下载进度回调,导致认为不是下载进度回调,原因是HttpLoggingInterceptor中的下面代码导致
BufferedSource source = responseBody.source();
source.request(Long.MAX_VALUE); // 卡主,运行在子线程
Buffer buffer = source.getBuffer();
上面代码已经在下载文件了。
解决方案:
- 取消HttpLoggingInterceptor
- HttpLoggingInterceptor设置更低级别的日志拦截,如NONE
- 使用修改的HttpLoggingInterceptor(对应okhttp版本 3.14.9),已经修复打开日志拦截器的情况下:文件下载和文件上传oom问题
生产上必须关HttpLoggingInterceptor,除非需要做特别的网络监控
关于Retrofit下载问题:
有人是说通过okhttp拦截器下载文件才是下载(官方例子也是这样实现),通过ResponseBody是写本地文件?事实是真是这样吗?
简单来说文件下载就是拿到文件的输入流,边读编写,服务端只是返回一个通道inputStream。先说一下,传统文件(这里解释为不用任何第三方封装的框架,这里使用自带的HttpURLConnection)下载使用示例:
private void downloadFile(String url) {long start = System.currentTimeMillis();InputStream is = null;FileOutputStream fos = null;HttpURLConnection httpConn;try {httpConn = (HttpURLConnection) new URL(url).openConnection();httpConn.setDoOutput(false);// 使用 URL 连接进行输出httpConn.setDoInput(true);// 使用 URL 连接进行输入httpConn.setRequestMethod("GET");// 设置URL请求方法httpConn.setConnectTimeout(40000);httpConn.setReadTimeout(40000);httpConn.setRequestProperty("Content-Type", "application/octet-stream");httpConn.setRequestProperty("Connection", "Keep-Alive");// 维持长连接httpConn.setRequestProperty("Charset", "UTF-8");//获取文件下载输入流is = httpConn.getInputStream();File file = new File(Environment.getExternalStorageDirectory(), "test.apk");fos = new FileOutputStream(file);int b;byte[] byArr = new byte[1024];while ((b = is.read(byArr)) != -1) {//写文件fos.write(byArr, 0, b);}long end = System.currentTimeMillis();System.out.println("下载耗时:" + (end - start) / 1000.0 + "s");} catch (Exception e) {e.printStackTrace();} finally {close(is);close(fos);}}public static void close(Closeable x) {if (x != null) {try {x.close();} catch (Exception e) {// skip}}}
看了上面的例子,是否对下载文件有了更清晰的认识。现在我们再来下看下retrofit文件下载的操作:
@Streaming@GETCall<ResponseBody> downLoadFile(@Url String url);
上面的ResponseBody是okhttp3.ResponseBody包下下,有一个注解 @Streaming表示直接返回ResponseBody类型的数据,不读取到内存,可以理解为返回一个输入通道inputStream,也就是你可以用过这个返回来的body的bytestream循环写入文件,同时可以做下载进度的回调。
为什么是这样呢,要具体分析两个主要类:转换器类BuiltInConverters类和 ResponseBody类。
BuiltInConverters
final class BuiltInConverters extends Converter.Factory {@Overridepublic Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations,Retrofit retrofit) {if (type == ResponseBody.class) {return Utils.isAnnotationPresent(annotations, Streaming.class)? StreamingResponseBodyConverter.INSTANCE: BufferingResponseBodyConverter.INSTANCE;}if (type == Void.class) {return VoidResponseBodyConverter.INSTANCE;}return null;}//省略}
如果下载接口加了注解@Streaming就会用StreamingResponseBodyConverter,直接返回ResponseBody,否则就是BufferingResponseBodyConverter,而BufferingResponseBodyConverter是一次性读取到内存中的,实际容易出现OOM,这也就是为什么下载大文件时接口需要加上注解@Streaming。
ResponseBody
public abstract class ResponseBody implements Closeable {/** Multiple calls to {@link #charStream()} must return the same instance. */private Reader reader;public abstract @Nullable MediaType contentType();/*** Returns the number of bytes in that will returned by {@link #bytes}, or {@link #byteStream}, or* -1 if unknown.*/public abstract long contentLength();//输入流public final InputStream byteStream() {return source().inputStream();}public abstract BufferedSource source();//省略}
拿到inputStream就可以进行文件写入,而`source().inputStream()`的InputStream(抽象类)在`RealBufferedSource`已经被重新定义。
@Override public InputStream inputStream() {return new InputStream() {@Override public int read() throws IOException {if (closed) throw new IOException("closed");if (buffer.size == 0) {long count = source.read(buffer, Segment.SIZE);if (count == -1) return -1;}return buffer.readByte() & 0xff;}@Override public int read(byte[] data, int offset, int byteCount) throws IOException {if (closed) throw new IOException("closed");checkOffsetAndCount(data.length, offset, byteCount);if (buffer.size == 0) {long count = source.read(buffer, Segment.SIZE);if (count == -1) return -1;}return buffer.read(data, offset, byteCount);}//省略...};}
上面分析了@Streaming只是拿到一个下载通道,如果你不读取数据是不会下载的,也就不会回调拦截器进度,即OkHttp没有从输入流读取数据,哪怕下载请求响应已经返回。
下面分析下载进度问题:
当我们通过ResponseBody拿到InputStream ,调用inputStream.read(myBuffer)时,会触发 `read(byte[] data, int offset, int byteCount)`方法调用,
从方法可知okhttp使用`Buffer`(提高IO的处理效率),意思是buffer.size == 0,先通过source把数据读到Buffer,然后在从Buffer中读取myBuffer返回到`inputStream.read(myBuffer)`调用处,再把myBuffer写入文件。而如果重写了`ResponseBody`的话,会调用拦截器下面的ProgressResponseBody的source中的`read(Buffer sink, long byteCount)`方法。
private class ProgressResponseBody extends ResponseBody {private final ResponseBody responseBody;private BufferedSource bufferedSource;ProgressResponseBody(ResponseBody responseBody, ProgressListener progressListener) {this.responseBody = responseBody;this.progressListener = progressListener;}private Source source(Source source) {return new ForwardingSource(source) {long totalBytesRead = 0L;@Overridepublic long read(Buffer sink, long byteCount) throws IOException {long bytesRead = super.read(sink, byteCount);// read() returns the number of bytes read, or -1 if this source is exhausted.totalBytesRead += bytesRead != -1 ? bytesRead : 0;progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);return bytesRead;}};}//省略...}}
如果设置的缓冲区较小,会导致先把缓冲区的数据读完,给人感觉是上层循环多次读取,其实buffer已经存在,但是上层读取数据大小进度是跟拦截器中ProgressResponseBody的进度是等价的。如果读完了buffer,不去调用`source.read(buffer, Segment.SIZE)`,数据是没有从服务端返回。缓冲区的数据读取是非常快的。故通过ResponseBody监听文件下载进度是没有问题的。
小结
文件下载,连接过程本身需要一定的时间,然后是文件下载的IO读写,个人觉得通过ResponseBody下载文件和监听下载进度,相比通过拦截器处理更简单,可以更方便处理多任务下载进度问题。如有不对请指出,会立马纠正。
其它文章:
基于RxJava2.0+Retrofit2.0超大文件分块(分片)上传(带进度)
这篇关于基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!