用RxJava实现Rxbus替换EventBus事件总线

2024-06-01 07:58

本文主要是介绍用RxJava实现Rxbus替换EventBus事件总线,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

     首先,Rxjava不必多说,可以说和Retrofit是年度最火框架,在GitHub上都已经超过两万star,Eventbus也不必多说,目前大多数开发者大多数项目一定会用到EventBus或者Otto作为事件总线通信库,对于RxJava使用者来说,RxJava也可以轻松实现事件总线,因为它们都依据于观察者模式。本文介绍Rxbus如何完美替换Eventbus,减少APP体积.

不多说,上代码


/*** 作者:JackyZ fenglizizhu* 说明:事件的订阅*/
public class RxBus {private static volatile RxBus mDefaultInstance;private final Subject<Object,Object> mBus;private final Map<Class<?>,Object> mStickEventMap;public RxBus() {//将Subject(PublishSubject)转换为SerializedSubjectmBus = new SerializedSubject<>(PublishSubject.create());mStickEventMap=new ConcurrentHashMap<>();}/*单例*/public static RxBus getDefault(){if(mDefaultInstance==null){synchronized (RxBus.class){if(mDefaultInstance==null){mDefaultInstance=new RxBus();}}}return mDefaultInstance;}/*发送事件*/public void post(Object event){if(mBus == null) {mDefaultInstance=new RxBus();}mBus.onNext(event);}/*订阅事件*/public <T> Observable<T> toObservable(Class<T> eventType){return  mBus.ofType(eventType);//ofType可以根据事件类型发送指定数据}/*发送一个新的Sticky事件*/public void postSticky(Object event){synchronized (mStickEventMap){//线程锁mStickEventMap.put(event.getClass(),event);//将事件类型保存到Map集合}post(event);}/*订阅Sticky事件*/public <T> Observable<T> tObservableStick(final Class<T> eventType){synchronized (mStickEventMap){final Observable<T> observable=mBus.ofType(eventType);//获取发送事件的Observablefinal Object event=mStickEventMap.get(eventType);//根据事件类型作为key查找value对应的valueif(null!=event){return  observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>(){//通过mergeWith合并两个Observable@Overridepublic void call(Subscriber<? super T> subscriber) {//订阅 eventType.cast(event)直接将eventType转换为 T发送subscriber.onNext(eventType.cast(event));}}));}else {return observable;//如果没有sticky 就返回observable}}}/*根据eventType获取事件*/public <T> T getStickEvent(Class<T> eventType){synchronized (mStickEventMap){return  eventType.cast(mStickEventMap.get(eventType));}}/*移除指定类型的eventType的Sticky事件*/public <T> T removeStickyEvent(Class<T> eventType){synchronized (mStickEventMap){return eventType.cast(mStickEventMap.remove(eventType));}}/*移除所有的Sticky事件*/public void removeAllStickyEvents(){synchronized (mStickEventMap){mStickEventMap.clear();}}

开始分析

注:
1、Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,需要将 Subject转换为一个 SerializedSubject ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。

2、PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

3、ofType操作符只发射指定类型的数据,其内部就是filter+cast

public final <R> Observable<R> ofType(final Class<R> klass) {return filter(new Func1<T, Boolean>() {@Overridepublic final Boolean call(T t) {return klass.isInstance(t);}}).cast(klass);
}

filter操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。
cast操作符可以将一个Observable转换成指定类型的Observable。

分析:


RxBus工作流程图


1、首先创建一个可同时充当Observer和Observable的Subject;

2、在需要接收事件的地方,订阅该Subject(此时Subject是作为Observable),在这之后,一旦Subject接收到事件,立即发射给该订阅者;

3、在我们需要发送事件的地方,将事件post至Subject,此时Subject作为Observer接收到事件(onNext),然后会发射给所有订阅该Subject的订阅者。

对于RxBus的使用,就和普通的RxJava订阅事件很相似了。
先看发送事件的代码:

RxBus.getDefault().post(new UserEvent (1, "yoyo"));

userEvent是要发送的事件,如果你用过EventBus, 很容易理解,UserEvent的代码:

public class UserEvent {long id;String name;public UserEvent(long id,String name) {this.id= id;this.name= name;}public long getId() {return id;}public String getName() {return name;}
}

再看接收事件的代码:

// rxSubscription是一个Subscription的全局变量,这段代码可以在onCreate/onStart等生命周期内
rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class).subscribe(new Action1<UserEvent>() {@Overridepublic void call(UserEvent userEvent) {long id = userEvent.getId();String name = userEvent.getName();...}},new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {// TODO: 处理异常}        });

最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

@Override
protected void onDestroy() {super.onDestroy();if(!rxSubscription.isUnsubscribed()) {rxSubscription.unsubscribe();}
}


如果你的项目已经开始使用RxJava,建议可以把EventBus或Otto替换成RxBus,减小项目体积。


支持Sticky事件

在Android开发中,Sticky事件只指事件消费者在事件发布之后才注册的也能接收到该事件的特殊类型。Android中就有这样的实例,也就是Sticky Broadcast,即粘性广播。正常情况下如果发送者发送了某个广播,而接收者在这个广播发送后才注册自己的Receiver,这时接收者便无法接收到刚才的广播,为此Android引入了StickyBroadcast,在广播发送结束后会保存刚刚发送的广播(Intent),这样当接收者注册完Receiver后就可以接收到刚才已经发布的广播。这就使得我们可以预先处理一些事件,让有消费者时再把这些事件投递给消费者。

Subject

我们在实现简单的RxBus时使用了PublishSubject,其实RxJava提供给开发者4种Subject:
PublishSubject,BehaviorSubject ,BehaviorSubject,AsyncSubject。

  1. PublishSubject只会给在订阅者订阅的时间点之后的数据发送给观察者。

  2. BehaviorSubject在订阅者订阅时,会发送其最近发送的数据(如果此时还没有收到任何数据,它会发送一个默认值)。

  3. ReplaySubject在订阅者订阅时,会发送所有的数据给订阅者,无论它们是何时订阅的。

  4. AsyncSubject只在原Observable事件序列完成后,发送最后一个数据,后续如果还有订阅者继续订阅该Subject, 则可以直接接收到最后一个值。


Subject

从上图来看,似乎BehaviorSubjectReplaySubject具备Sticky的特性。

BehaviorSubject方案

BehaviorSubject似乎完全符合Sticky的定义,但是你发现了它只能保存最近的那个事件。

有这样的场景:如果订阅者A订阅了Event1,订阅者B订阅了Event2,而此时BehaviorSubject事件队列里是[..., Event2, Event1],当订阅者订阅时,因为保存的是最近的事件:即Event1,所以订阅者B是接收不到Event2的。

解决办法就是:
每个Event类型都各自创建一个对应的BehaviorSubject,这样的话资源开销比较大,并且该Sticky事件总线和普通的RxBus事件总线不能共享,即:普通事件和Sticky事件是独立的,因为普通事件是基于PublishSubject, 暂时放弃该方案!

ReplaySubject方案

ReplaySubject可以保存发送过的所有数据事件。

因为保存了所有的数据事件,所以不管什么类型的Event,我们只要过滤该类型,并让其发送最近的那个Event即可满足Sticky事件了。但是获取最近的对应事件是个难点,因为最符合需求的操作符takeLast()仅在订阅事件结束时(即:onCompleted())才会发送最后的那个数据事件,而我们的RxBus正常情况下应该是尽量避免其订阅事件结束的。(我没能找到合适的操作符,如果你知道,请告知我)

所以BehaviorSubject也是比较难实现Sticky特性的。

并且,不管是BehaviorSubject还是ReplaySubject,它们还有一个棘手的问题:它们实现的EventBus和普通的RxBus(基于PublishSubject)之间的数据是相互独立的!

总结:BehaviorSubjectBehaviorSubject都不能天然适合Sticky事件......

使用Map实现Sticky

该方法思路是在原来PublishSubject实现的RxBus基础上,使用ConcurrentHashMap<事件类型,事件>保存每个事件的最近事件,不仅能实现Sticky特性,最重要的是可以和普通RxBus的事件数据共享,不独立

因为我们的RxBus是基于PublishSubject的,而RxJava又有4种Subject,而且其中的BehaviorSubjectReplaySubject看起来又符合Sticky特性,所以我们可能会钻这个牛角尖,理所当然的认为实现Sticky需要通过其他类型的Subject.... (好吧,我钻进去了...)

这个方案的思路我是根据EventBus的实现想到的,下面是大致流程:

  1. Map的初始化:

    private final Map<Class<?>, Object> mStickyEventMap;public RxBus() {mBus = new SerializedSubject<>(PublishSubject.create());mStickyEventMap = new ConcurrentHashMap<>();}

    ConcurrentHashMap是一个线程安全的HashMap, 采用stripping lock(分离锁),效率比HashTable高很多。

  2. 在我们postSticky(Event)时,存入Map中:

    public void postSticky(Object event) {synchronized (mStickyEventMap) {mStickyEventMap.put(event.getClass(), event);} post(event); 
    }
  3. 订阅时toObservableSticky(Class<T> eventType),先从Map中寻找是否包含该类型的事件,如果没有,则说明没有Sticky事件要发送,直接订阅Subject(此时作为被观察者Observable);如果有,则说明有Sticky事件需要发送,订阅merge(Subject 和 Sticky事件)

    public <T> Observable<T> toObservableSticky(final Class<T> eventType) {synchronized (mStickyEventMap) {Observable<T> observable = mBus.ofType(eventType);final Object event = mStickyEventMap.get(eventType);if (event != null) {return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() {@Overridepublic void call(Subscriber<? super T> subscriber) {subscriber.onNext(eventType.cast(event));}}));} else {return observable;}}}

    merge操作符:可以将多个Observables合并,就好像它们是单个的Observable一样。

这样,Sticky的核心功能就完成了,使用上和普通RxBus一样,通过postSticky()发送事件,toObservableSticky()订阅事件。

除此之外,我还提供了getStickyEvent(Class<T> eventType),removeStickyEvent(Class<T> eventType),removeAllStickyEvents()方法,供查找、移除对应事件类型的事件、移除全部Sticky事件。

在使用Sticky特性时,在不需要某Sticky事件时, 通过removeStickyEvent(Class<T> eventType)移除它,最保险的做法是:在主Activity的onDestroyremoveAllStickyEvents()
因为我们的RxBus是个单例静态对象,再正常退出app时,该对象依然会存在于JVM,除非进程被杀死,这样的话导致StickyMap里的数据依然存在,为了避免该问题,需要在app退出时,清理StickyMap。

// 主Activity(一般是栈底Activity)
@Override
protected void onDestroy() {super.onDestroy();// 移除所有Sticky事件RxBus.getDefault().removeAllStickyEvents();
}



 


异常处理

在使用RxBus过程中,你会发现你订阅了某个事件后,在后续接收到该事件时,如果处理的过程中发生了异常,你会发现后续的事件再也接收不到了,除非你重新订阅!

原因在于RxJava的事件序列机制,一个订阅事件是以onCompleted()或者onError()作为结束的,即:一旦订阅者的onCompleted()onError()被调用,订阅者和被订阅者的订阅关系就解除了。

这里说下RxJava的异常传递机制onError()在Observable序列传递过程中出现任何异常时被调用,然后终止Observable事件序列的传递,以此通知所有的订阅者发生了一个不可恢复的错误,即:异常总会传递到订阅者。

这本是RxJava的一个优点,反而在事件总线的场景下,成了让人头疼的问题!

所以我们的RxBus的订阅者在处理订阅事件时,一旦发生了异常,而又没Catch,那么最终都会调用到onError(),而一旦走到onError(),就意味着这个订阅者和该Subject解除了订阅关系,因此再也收不到后续发出的事件了

我目前想到2种方案

解决方案1:自动重新订阅

即在onError(e)或onCompleted()发生时,立即重新订阅,保证订阅事件在解决时可以立即恢复。

    private void subscribeEvent(){RxBus.getDefault().toObservable(Event.class)// 使用操作符过程中,无需try,catch,直接使用.subscribe(new Subscriber<Event>() {@Overridepublic void onCompleted() {subscribeEvent();}@Overridepublic void onError(Throwable e) {e.printStackTrace();subscribeEvent();}@Overridepublic void onNext(Event event) {// 直接处理接收的事件}});}

注意:这个方案仅可以用于普通RxBus,绝不能用于Sticky事件! 
原因在于Sticky的粘性特性,引起error的事件如果重新订阅的话,该事件很可能继续导致error,从而引起死循环!

解决方案2:捕捉

不管是普通RxBus还是使用Sticky的RxBus,通用的解决方案就是捕捉异常:在任何操作符内的ActionX,FuncX方法以及onNext(T t)内使用try,catch处理。

 RxBus.getDefault().toObservableSticky(EventSticky.class)        // 建议在Sticky时,在操作符内主动try,catch        .map(new Func1<EventSticky, EventSticky>() {@Overridepublic EventSticky call(EventSticky eventSticky) {try {// 变换操作} catch (Exception e) {e.printStackTrace();}return eventSticky;}}).subscribe(new Action1<EventSticky>() {@Overridepublic void call(EventSticky eventSticky) {try {// 处理接收的事件} catch (Exception e) {e.printStackTrace();}}
});

这篇关于用RxJava实现Rxbus替换EventBus事件总线的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1020428

相关文章

springboot security快速使用示例详解

《springbootsecurity快速使用示例详解》:本文主要介绍springbootsecurity快速使用示例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝... 目录创www.chinasem.cn建spring boot项目生成脚手架配置依赖接口示例代码项目结构启用s

java之Objects.nonNull用法代码解读

《java之Objects.nonNull用法代码解读》:本文主要介绍java之Objects.nonNull用法代码,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录Java之Objects.nonwww.chinasem.cnNull用法代码Objects.nonN

Python如何使用__slots__实现节省内存和性能优化

《Python如何使用__slots__实现节省内存和性能优化》你有想过,一个小小的__slots__能让你的Python类内存消耗直接减半吗,没错,今天咱们要聊的就是这个让人眼前一亮的技巧,感兴趣的... 目录背景:内存吃得满满的类__slots__:你的内存管理小助手举个大概的例子:看看效果如何?1.

springboot security之前后端分离配置方式

《springbootsecurity之前后端分离配置方式》:本文主要介绍springbootsecurity之前后端分离配置方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的... 目录前言自定义配置认证失败自定义处理登录相关接口匿名访问前置文章总结前言spring boot secu

Python+PyQt5实现多屏幕协同播放功能

《Python+PyQt5实现多屏幕协同播放功能》在现代会议展示、数字广告、展览展示等场景中,多屏幕协同播放已成为刚需,下面我们就来看看如何利用Python和PyQt5开发一套功能强大的跨屏播控系统吧... 目录一、项目概述:突破传统播放限制二、核心技术解析2.1 多屏管理机制2.2 播放引擎设计2.3 专

一文详解SpringBoot响应压缩功能的配置与优化

《一文详解SpringBoot响应压缩功能的配置与优化》SpringBoot的响应压缩功能基于智能协商机制,需同时满足很多条件,本文主要为大家详细介绍了SpringBoot响应压缩功能的配置与优化,需... 目录一、核心工作机制1.1 自动协商触发条件1.2 压缩处理流程二、配置方案详解2.1 基础YAML

Python实现无痛修改第三方库源码的方法详解

《Python实现无痛修改第三方库源码的方法详解》很多时候,我们下载的第三方库是不会有需求不满足的情况,但也有极少的情况,第三方库没有兼顾到需求,本文将介绍几个修改源码的操作,大家可以根据需求进行选择... 目录需求不符合模拟示例 1. 修改源文件2. 继承修改3. 猴子补丁4. 追踪局部变量需求不符合很

java中使用POI生成Excel并导出过程

《java中使用POI生成Excel并导出过程》:本文主要介绍java中使用POI生成Excel并导出过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录需求说明及实现方式需求完成通用代码版本1版本2结果展示type参数为atype参数为b总结注:本文章中代码均为

springboot简单集成Security配置的教程

《springboot简单集成Security配置的教程》:本文主要介绍springboot简单集成Security配置的教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录集成Security安全框架引入依赖编写配置类WebSecurityConfig(自定义资源权限规则

Java的IO模型、Netty原理解析

《Java的IO模型、Netty原理解析》Java的I/O是以流的方式进行数据输入输出的,Java的类库涉及很多领域的IO内容:标准的输入输出,文件的操作、网络上的数据传输流、字符串流、对象流等,这篇... 目录1.什么是IO2.同步与异步、阻塞与非阻塞3.三种IO模型BIO(blocking I/O)NI