本文主要是介绍RxJava 2.x 之图解创建、订阅、发射流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
- 从一个例子开始
- 创建过程
- 订阅过程
- 发射过程
- 小结
从一个例子开始
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {for (int i = 0; i < 3; i++) {emitter.onNext(i);}emitter.onComplete();Log.d(TAG, "subscribe " + Thread.currentThread().getName());}}).subscribeOn(Schedulers.newThread()).map(new Function<Integer, String>() {@Overridepublic String apply(Integer value) throws Exception {Log.d(TAG, "apply " + Thread.currentThread().getName());return "apply " + value;}}).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new ResourceObserver<String>() {@Overridepublic void onNext(String value) {Log.d(TAG, "onNext " + value);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete " + Thread.currentThread().getName());}});
来看看输出:
10-26 16:55:17.418 32696-561/com.onzhou.study D/MainActivity: apply RxNewThreadScheduler-1
10-26 16:55:17.418 32696-561/com.onzhou.study D/MainActivity: apply RxNewThreadScheduler-1
10-26 16:55:17.418 32696-561/com.onzhou.study D/MainActivity: create RxNewThreadScheduler-1
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onNext apply 0
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onNext apply 1
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onNext apply 2
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onComplete main
可以看到创建
,发送
,转换
过程都在子线程中
,而最后的回调是在主线程中
整个过程笔者整理成一张图,一步一步来跟进分析
创建过程
- 第一步:通过
create操作符
创建了一个ObservableCreate
类型的Observable
,由于是基于匿名内部类
创建的,因此持有的是实现了ObservableOnSubscribe
接口的HomeActivity实例
- 第二步:通过
subscribeOn操作符
创建了一个ObservableSubscribeOn
类型的Observable
,且其内部的source
持有上个步骤的ObservableCreate实例
- 第三步:通过
map操作符
创建了一个ObservableMap
类型的Observable
,且其内部持有上个步骤传入的ObservableSubscribeOn实例
- 第四步:通过
observeOn操作符
创建了一个ObservableObserveOn
类型的Observable
,且其内部持有上个步骤的ObservableMap实例
- 第五步:通过
subscribeWith方法
完成订阅,由于是基于匿名内部类创建的,因此传入的实际上是实现了ResourceObserver
的HomeActivity实例
订阅过程
上述的几个步骤其实已经完成的基本的创建过程了,最后我们拿到的实际是ObservableObserveOn
的实例,下面开始订阅
流程。
- 第一步:
subscribeWith方法
,传入的observer
是实现了ResourceObserver接口
的HomeActivity实例
,通过subscribeActual
发起订阅,内部实际调用的是source.subscribe
方法,由于source
持有的是上面传入的ObservableMap实例
,因此这一步骤实际调用的是,ObservableMap实例
中的subscribe
方法,传入的参数就是ObserveOnObserver实例(构造参数主要是实现了ResourceObserver的实例即:HomeActivity)
。
- 第二步:进入
ObservableMap实例
的subscribe
方法中,通过subscribeActual
发起订阅,实际调用的是source.subscribe
方法,传入的是MapObserver实例(构造参数为之前传递的ObserveOnObserver实例)
,由于source
持有的是ObservableSubscribeOn的实例
,因此最终调用的其实是ObservableSubscribeOn实例
中的subscribe
方法
- 第三步:进入
ObservableSubscribeOn实例
的subscribe
方法中,通过subscribeActual
发起订阅,完成MapObserver实例对SubscribeOnObserver的订阅
,接着由NewThreadScheduler线程调度器
完成对应的任务(该任务的执行是在线程中执行的),SubscribeTask实现了Runnable接口
,最终会回调run
方法,执行source.subscribe
方法,这里的source
持有的就是最开始的ObservableCreate实例
@Overridepublic void subscribeActual(final Observer<? super T> s) {final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);//这里的s就是上个步骤的MapObserver实例s.onSubscribe(parent);//这里的scheduler就是我们最开始指定的Schedulers.newThread 即NewThreadScheduler线程调度器parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
- 第四步:进入
ObservableCreate实例
的subscribe
方法中,通过subscribeActual
发起订阅,这里的source
持有的是HomeActivity实例
,直接调用subscribe
方法,传入参数是构建的最顶层的发射器CreateEmitter实例
- 第五步:上述的几个过程实际已经完成了订阅的过程,最后经过层层传递,持有的最顶层的是
CreateEmitter实例
,即我们最终的被观察者
发射过程
上述的过程已经完成了订阅过程,在最后订阅完成之后,最终会通过source.subscribe
方法,其实就是调用HomeActivity实例的subscribe方法
,完成元素发射
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {for (int i = 0; i < 3; i++) {emitter.onNext(i);}emitter.onComplete();Log.d(TAG, "subscribe " + Thread.currentThread().getName());
}
我们在最顶层的被观察者
里通过ObservableEmitter实例
的onNext
方法完成元素的发射
,最终又会通过一层一层的Observer
转发到最原始的实现了ResourceObserver接口
的观察者中来
注意:
- 这里的
被观察者
里的所有发射过程实际上都是在NewThreadScheduler线程调度器
分配的线程里完成的 - 当
发射的元素
被传递到下层的ObservableObserveOn类
中的ObserveOnObserver实例
的onNext方法
,实际执行的是HandlerScheduler.HandlerWorker
的schedule
方法,最终就是通过我们持有的主线程的handler
来切换到主线程中
小结
整个创建过程
,订阅过程
,发射过程
看起来山路十八弯
,但是如果你一步一步跟进查看,会发现整个流程实际上是很清晰的,整个过程起点
和终点
很明确,
而中间产生的一系列Observable
和Observer
你都可以看作是代理类
,用来转发订阅
以及最终的元素发射
这篇关于RxJava 2.x 之图解创建、订阅、发射流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!