用RxJava实现Rxbus替换EventBus事件总线

2024-06-01 07:58

本文主要是介绍用RxJava实现Rxbus替换EventBus事件总线,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

     首先,Rxjava不必多说,可以说和Retrofit是年度最火框架,在GitHub上都已经超过两万star,Eventbus也不必多说,目前大多数开发者大多数项目一定会用到EventBus或者Otto作为事件总线通信库,对于RxJava使用者来说,RxJava也可以轻松实现事件总线,因为它们都依据于观察者模式。本文介绍Rxbus如何完美替换Eventbus,减少APP体积.

不多说,上代码


/*** 作者:JackyZ fenglizizhu* 说明:事件的订阅*/
public class RxBus {private static volatile RxBus mDefaultInstance;private final Subject<Object,Object> mBus;private final Map<Class<?>,Object> mStickEventMap;public RxBus() {//将Subject(PublishSubject)转换为SerializedSubjectmBus = new SerializedSubject<>(PublishSubject.create());mStickEventMap=new ConcurrentHashMap<>();}/*单例*/public static RxBus getDefault(){if(mDefaultInstance==null){synchronized (RxBus.class){if(mDefaultInstance==null){mDefaultInstance=new RxBus();}}}return mDefaultInstance;}/*发送事件*/public void post(Object event){if(mBus == null) {mDefaultInstance=new RxBus();}mBus.onNext(event);}/*订阅事件*/public <T> Observable<T> toObservable(Class<T> eventType){return  mBus.ofType(eventType);//ofType可以根据事件类型发送指定数据}/*发送一个新的Sticky事件*/public void postSticky(Object event){synchronized (mStickEventMap){//线程锁mStickEventMap.put(event.getClass(),event);//将事件类型保存到Map集合}post(event);}/*订阅Sticky事件*/public <T> Observable<T> tObservableStick(final Class<T> eventType){synchronized (mStickEventMap){final Observable<T> observable=mBus.ofType(eventType);//获取发送事件的Observablefinal Object event=mStickEventMap.get(eventType);//根据事件类型作为key查找value对应的valueif(null!=event){return  observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>(){//通过mergeWith合并两个Observable@Overridepublic void call(Subscriber<? super T> subscriber) {//订阅 eventType.cast(event)直接将eventType转换为 T发送subscriber.onNext(eventType.cast(event));}}));}else {return observable;//如果没有sticky 就返回observable}}}/*根据eventType获取事件*/public <T> T getStickEvent(Class<T> eventType){synchronized (mStickEventMap){return  eventType.cast(mStickEventMap.get(eventType));}}/*移除指定类型的eventType的Sticky事件*/public <T> T removeStickyEvent(Class<T> eventType){synchronized (mStickEventMap){return eventType.cast(mStickEventMap.remove(eventType));}}/*移除所有的Sticky事件*/public void removeAllStickyEvents(){synchronized (mStickEventMap){mStickEventMap.clear();}}

开始分析

注:
1、Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,需要将 Subject转换为一个 SerializedSubject ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。

2、PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

3、ofType操作符只发射指定类型的数据,其内部就是filter+cast

public final <R> Observable<R> ofType(final Class<R> klass) {return filter(new Func1<T, Boolean>() {@Overridepublic final Boolean call(T t) {return klass.isInstance(t);}}).cast(klass);
}

filter操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。
cast操作符可以将一个Observable转换成指定类型的Observable。

分析:


RxBus工作流程图


1、首先创建一个可同时充当Observer和Observable的Subject;

2、在需要接收事件的地方,订阅该Subject(此时Subject是作为Observable),在这之后,一旦Subject接收到事件,立即发射给该订阅者;

3、在我们需要发送事件的地方,将事件post至Subject,此时Subject作为Observer接收到事件(onNext),然后会发射给所有订阅该Subject的订阅者。

对于RxBus的使用,就和普通的RxJava订阅事件很相似了。
先看发送事件的代码:

RxBus.getDefault().post(new UserEvent (1, "yoyo"));

userEvent是要发送的事件,如果你用过EventBus, 很容易理解,UserEvent的代码:

public class UserEvent {long id;String name;public UserEvent(long id,String name) {this.id= id;this.name= name;}public long getId() {return id;}public String getName() {return name;}
}

再看接收事件的代码:

// rxSubscription是一个Subscription的全局变量,这段代码可以在onCreate/onStart等生命周期内
rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class).subscribe(new Action1<UserEvent>() {@Overridepublic void call(UserEvent userEvent) {long id = userEvent.getId();String name = userEvent.getName();...}},new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {// TODO: 处理异常}        });

最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

@Override
protected void onDestroy() {super.onDestroy();if(!rxSubscription.isUnsubscribed()) {rxSubscription.unsubscribe();}
}


如果你的项目已经开始使用RxJava,建议可以把EventBus或Otto替换成RxBus,减小项目体积。


支持Sticky事件

在Android开发中,Sticky事件只指事件消费者在事件发布之后才注册的也能接收到该事件的特殊类型。Android中就有这样的实例,也就是Sticky Broadcast,即粘性广播。正常情况下如果发送者发送了某个广播,而接收者在这个广播发送后才注册自己的Receiver,这时接收者便无法接收到刚才的广播,为此Android引入了StickyBroadcast,在广播发送结束后会保存刚刚发送的广播(Intent),这样当接收者注册完Receiver后就可以接收到刚才已经发布的广播。这就使得我们可以预先处理一些事件,让有消费者时再把这些事件投递给消费者。

Subject

我们在实现简单的RxBus时使用了PublishSubject,其实RxJava提供给开发者4种Subject:
PublishSubject,BehaviorSubject ,BehaviorSubject,AsyncSubject。

  1. PublishSubject只会给在订阅者订阅的时间点之后的数据发送给观察者。

  2. BehaviorSubject在订阅者订阅时,会发送其最近发送的数据(如果此时还没有收到任何数据,它会发送一个默认值)。

  3. ReplaySubject在订阅者订阅时,会发送所有的数据给订阅者,无论它们是何时订阅的。

  4. AsyncSubject只在原Observable事件序列完成后,发送最后一个数据,后续如果还有订阅者继续订阅该Subject, 则可以直接接收到最后一个值。


Subject

从上图来看,似乎BehaviorSubjectReplaySubject具备Sticky的特性。

BehaviorSubject方案

BehaviorSubject似乎完全符合Sticky的定义,但是你发现了它只能保存最近的那个事件。

有这样的场景:如果订阅者A订阅了Event1,订阅者B订阅了Event2,而此时BehaviorSubject事件队列里是[..., Event2, Event1],当订阅者订阅时,因为保存的是最近的事件:即Event1,所以订阅者B是接收不到Event2的。

解决办法就是:
每个Event类型都各自创建一个对应的BehaviorSubject,这样的话资源开销比较大,并且该Sticky事件总线和普通的RxBus事件总线不能共享,即:普通事件和Sticky事件是独立的,因为普通事件是基于PublishSubject, 暂时放弃该方案!

ReplaySubject方案

ReplaySubject可以保存发送过的所有数据事件。

因为保存了所有的数据事件,所以不管什么类型的Event,我们只要过滤该类型,并让其发送最近的那个Event即可满足Sticky事件了。但是获取最近的对应事件是个难点,因为最符合需求的操作符takeLast()仅在订阅事件结束时(即:onCompleted())才会发送最后的那个数据事件,而我们的RxBus正常情况下应该是尽量避免其订阅事件结束的。(我没能找到合适的操作符,如果你知道,请告知我)

所以BehaviorSubject也是比较难实现Sticky特性的。

并且,不管是BehaviorSubject还是ReplaySubject,它们还有一个棘手的问题:它们实现的EventBus和普通的RxBus(基于PublishSubject)之间的数据是相互独立的!

总结:BehaviorSubjectBehaviorSubject都不能天然适合Sticky事件......

使用Map实现Sticky

该方法思路是在原来PublishSubject实现的RxBus基础上,使用ConcurrentHashMap<事件类型,事件>保存每个事件的最近事件,不仅能实现Sticky特性,最重要的是可以和普通RxBus的事件数据共享,不独立

因为我们的RxBus是基于PublishSubject的,而RxJava又有4种Subject,而且其中的BehaviorSubjectReplaySubject看起来又符合Sticky特性,所以我们可能会钻这个牛角尖,理所当然的认为实现Sticky需要通过其他类型的Subject.... (好吧,我钻进去了...)

这个方案的思路我是根据EventBus的实现想到的,下面是大致流程:

  1. Map的初始化:

    private final Map<Class<?>, Object> mStickyEventMap;public RxBus() {mBus = new SerializedSubject<>(PublishSubject.create());mStickyEventMap = new ConcurrentHashMap<>();}

    ConcurrentHashMap是一个线程安全的HashMap, 采用stripping lock(分离锁),效率比HashTable高很多。

  2. 在我们postSticky(Event)时,存入Map中:

    public void postSticky(Object event) {synchronized (mStickyEventMap) {mStickyEventMap.put(event.getClass(), event);} post(event); 
    }
  3. 订阅时toObservableSticky(Class<T> eventType),先从Map中寻找是否包含该类型的事件,如果没有,则说明没有Sticky事件要发送,直接订阅Subject(此时作为被观察者Observable);如果有,则说明有Sticky事件需要发送,订阅merge(Subject 和 Sticky事件)

    public <T> Observable<T> toObservableSticky(final Class<T> eventType) {synchronized (mStickyEventMap) {Observable<T> observable = mBus.ofType(eventType);final Object event = mStickyEventMap.get(eventType);if (event != null) {return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() {@Overridepublic void call(Subscriber<? super T> subscriber) {subscriber.onNext(eventType.cast(event));}}));} else {return observable;}}}

    merge操作符:可以将多个Observables合并,就好像它们是单个的Observable一样。

这样,Sticky的核心功能就完成了,使用上和普通RxBus一样,通过postSticky()发送事件,toObservableSticky()订阅事件。

除此之外,我还提供了getStickyEvent(Class<T> eventType),removeStickyEvent(Class<T> eventType),removeAllStickyEvents()方法,供查找、移除对应事件类型的事件、移除全部Sticky事件。

在使用Sticky特性时,在不需要某Sticky事件时, 通过removeStickyEvent(Class<T> eventType)移除它,最保险的做法是:在主Activity的onDestroyremoveAllStickyEvents()
因为我们的RxBus是个单例静态对象,再正常退出app时,该对象依然会存在于JVM,除非进程被杀死,这样的话导致StickyMap里的数据依然存在,为了避免该问题,需要在app退出时,清理StickyMap。

// 主Activity(一般是栈底Activity)
@Override
protected void onDestroy() {super.onDestroy();// 移除所有Sticky事件RxBus.getDefault().removeAllStickyEvents();
}



 


异常处理

在使用RxBus过程中,你会发现你订阅了某个事件后,在后续接收到该事件时,如果处理的过程中发生了异常,你会发现后续的事件再也接收不到了,除非你重新订阅!

原因在于RxJava的事件序列机制,一个订阅事件是以onCompleted()或者onError()作为结束的,即:一旦订阅者的onCompleted()onError()被调用,订阅者和被订阅者的订阅关系就解除了。

这里说下RxJava的异常传递机制onError()在Observable序列传递过程中出现任何异常时被调用,然后终止Observable事件序列的传递,以此通知所有的订阅者发生了一个不可恢复的错误,即:异常总会传递到订阅者。

这本是RxJava的一个优点,反而在事件总线的场景下,成了让人头疼的问题!

所以我们的RxBus的订阅者在处理订阅事件时,一旦发生了异常,而又没Catch,那么最终都会调用到onError(),而一旦走到onError(),就意味着这个订阅者和该Subject解除了订阅关系,因此再也收不到后续发出的事件了

我目前想到2种方案

解决方案1:自动重新订阅

即在onError(e)或onCompleted()发生时,立即重新订阅,保证订阅事件在解决时可以立即恢复。

    private void subscribeEvent(){RxBus.getDefault().toObservable(Event.class)// 使用操作符过程中,无需try,catch,直接使用.subscribe(new Subscriber<Event>() {@Overridepublic void onCompleted() {subscribeEvent();}@Overridepublic void onError(Throwable e) {e.printStackTrace();subscribeEvent();}@Overridepublic void onNext(Event event) {// 直接处理接收的事件}});}

注意:这个方案仅可以用于普通RxBus,绝不能用于Sticky事件! 
原因在于Sticky的粘性特性,引起error的事件如果重新订阅的话,该事件很可能继续导致error,从而引起死循环!

解决方案2:捕捉

不管是普通RxBus还是使用Sticky的RxBus,通用的解决方案就是捕捉异常:在任何操作符内的ActionX,FuncX方法以及onNext(T t)内使用try,catch处理。

 RxBus.getDefault().toObservableSticky(EventSticky.class)        // 建议在Sticky时,在操作符内主动try,catch        .map(new Func1<EventSticky, EventSticky>() {@Overridepublic EventSticky call(EventSticky eventSticky) {try {// 变换操作} catch (Exception e) {e.printStackTrace();}return eventSticky;}}).subscribe(new Action1<EventSticky>() {@Overridepublic void call(EventSticky eventSticky) {try {// 处理接收的事件} catch (Exception e) {e.printStackTrace();}}
});

这篇关于用RxJava实现Rxbus替换EventBus事件总线的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1020428

相关文章

Java的栈与队列实现代码解析

《Java的栈与队列实现代码解析》栈是常见的线性数据结构,栈的特点是以先进后出的形式,后进先出,先进后出,分为栈底和栈顶,栈应用于内存的分配,表达式求值,存储临时的数据和方法的调用等,本文给大家介绍J... 目录栈的概念(Stack)栈的实现代码队列(Queue)模拟实现队列(双链表实现)循环队列(循环数组

C++如何通过Qt反射机制实现数据类序列化

《C++如何通过Qt反射机制实现数据类序列化》在C++工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作,所以本文就来聊聊C++如何通过Qt反射机制实现数据类序列化吧... 目录设计预期设计思路代码实现使用方法在 C++ 工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作。由于数据类

Python实现图片分割的多种方法总结

《Python实现图片分割的多种方法总结》图片分割是图像处理中的一个重要任务,它的目标是将图像划分为多个区域或者对象,本文为大家整理了一些常用的分割方法,大家可以根据需求自行选择... 目录1. 基于传统图像处理的分割方法(1) 使用固定阈值分割图片(2) 自适应阈值分割(3) 使用图像边缘检测分割(4)

Android实现在线预览office文档的示例详解

《Android实现在线预览office文档的示例详解》在移动端展示在线Office文档(如Word、Excel、PPT)是一项常见需求,这篇文章为大家重点介绍了两种方案的实现方法,希望对大家有一定的... 目录一、项目概述二、相关技术知识三、实现思路3.1 方案一:WebView + Office Onl

Java中Switch Case多个条件处理方法举例

《Java中SwitchCase多个条件处理方法举例》Java中switch语句用于根据变量值执行不同代码块,适用于多个条件的处理,:本文主要介绍Java中SwitchCase多个条件处理的相... 目录前言基本语法处理多个条件示例1:合并相同代码的多个case示例2:通过字符串合并多个case进阶用法使用

Java中的Lambda表达式及其应用小结

《Java中的Lambda表达式及其应用小结》Java中的Lambda表达式是一项极具创新性的特性,它使得Java代码更加简洁和高效,尤其是在集合操作和并行处理方面,:本文主要介绍Java中的La... 目录前言1. 什么是Lambda表达式?2. Lambda表达式的基本语法例子1:最简单的Lambda表

Java中Scanner的用法示例小结

《Java中Scanner的用法示例小结》有时候我们在编写代码的时候可能会使用输入和输出,那Java也有自己的输入和输出,今天我们来探究一下,对JavaScanner用法相关知识感兴趣的朋友一起看看吧... 目录前言一 输出二 输入Scanner的使用多组输入三 综合练习:猜数字游戏猜数字前言有时候我们在

C# foreach 循环中获取索引的实现方式

《C#foreach循环中获取索引的实现方式》:本文主要介绍C#foreach循环中获取索引的实现方式,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、手动维护索引变量二、LINQ Select + 元组解构三、扩展方法封装索引四、使用 for 循环替代

Spring Security+JWT如何实现前后端分离权限控制

《SpringSecurity+JWT如何实现前后端分离权限控制》本篇将手把手教你用SpringSecurity+JWT搭建一套完整的登录认证与权限控制体系,具有很好的参考价值,希望对大家... 目录Spring Security+JWT实现前后端分离权限控制实战一、为什么要用 JWT?二、JWT 基本结构

java解析jwt中的payload的用法

《java解析jwt中的payload的用法》:本文主要介绍java解析jwt中的payload的用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java解析jwt中的payload1. 使用 jjwt 库步骤 1:添加依赖步骤 2:解析 JWT2. 使用 N