带你入门学习Rxjava--上手教程

2024-06-01 07:58
文章标签 java 学习 入门 教程 上手 rx

本文主要是介绍带你入门学习Rxjava--上手教程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

相信各位看官对RxJava早有耳闻,那么关于什么是RxJava我就不再赘述了,不知道的可自行百度。网上的RxJava的入门门槛稍高,但入门不难,可以跟本文一起来学习

注: 本文针对rxjava 1.x.x ,用例为下,建议初学先从1.x看起

    compile 'io.reactivex:rxjava:1.1.6'
    compile 'io.reactivex:rxandroid:1.2.1'

官方的介绍

1.支持Java6+

2.android 2.3+

3.异步的

4.基于观察者设计模式(Observer、Observable)

5.Subscribe (订阅)


之前不了解设计模式--观察者模式的先来一个简单例子,懂得可以跳过

观察者模式的简单介绍:

观察者模式很好理解,类似于邮件订阅和RSS订阅,当我们浏览一些博客或wiki时,经常会看到RSS图标,就这的意思是,当你订阅了该文章,如果后续有更新,会及时通知你。

其实,简单来讲就一句话:当一个对象变化时,其它依赖该对象的对象都会收到通知,并且随着变化.对象之间是一种一对多的关系。先来看看关系图:

MySubject类就是我们的主对象,Observer1和Observer2是依赖于MySubject的对象,当MySubject变化时,Observer1和Observer2必然变化。AbstractSubject类中定义着需要监控的对象列表,可以对其进行修改:增加或删除被监控对象,且当MySubject变化时,负责通知在列表内存在的对象。我们看实现代码:

Observer两个实现类:

     
  1. public interface Observer {    
  2.    public void update();    
  3. }  


     
  1. public class Observer1 implements Observer {    
  2.    
  3.    @Override    
  4.    public void update() {    
  5.        System.out.println("observer1 has received!");    
  6.    }    
  7. }
     
  1. public class Observer2 implements Observer {    
  2.    
  3.    @Override    
  4.    public void update() {    
  5.        System.out.println("observer2 has received!");    
  6.    }    
  7.    
  8. }  

Subject接口及实现类

     
  1. public interface Subject {    
  2.        
  3.    /*增加观察者*/    
  4.    public void add(Observer observer);    
  5.        
  6.    /*删除观察者*/    
  7.    public void del(Observer observer);    
  8.        
  9.    /*通知所有的观察者*/    
  10.    public void notifyObservers();    
  11.        
  12.    /*自身的操作*/    
  13.    public void operation();    
  14. }    


     
  1. public abstract class AbstractSubject implements Subject {    
  2.    
  3.    private Vector<Observer> vector = new Vector<Observer>();    
  4.    @Override    
  5.    public void add(Observer observer) {    
  6.        vector.add(observer);    
  7.    }    
  8.    
  9.    @Override    
  10.    public void del(Observer observer) {    
  11.        vector.remove(observer);    
  12.    }    
  13.    
  14.    @Override    
  15.    public void notifyObservers() {    
  16.        Enumeration<Observer> enumo = vector.elements();    
  17.        while(enumo.hasMoreElements()){    
  18.            enumo.nextElement().update();    
  19.        }    
  20.    }    
  21. }    
     
  1. public class MySubject extends AbstractSubject {    
  2.    
  3.    @Override    
  4.    public void operation() {    
  5.        System.out.println("update self!");    
  6.        notifyObservers();    
  7.    }    
  8.    
  9. }    


     
  1. public class ObserverTest {    
  2.    
  3.    public static void main(String[] args) {    
  4.        Subject sub = new MySubject();    
  5.        sub.add(new Observer1());    
  6.        sub.add(new Observer2());    
  7.            
  8.        sub.operation();    
  9.    }    
  10.    
  11. }    

输出:
update self!
observer1 has received!
observer2 has received!

好啦 ,正式进入Rxjava

先创建个数据发射源,很好理解,就是发射数据用的:被观察者

   Observable<String> sender = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("Hi!");  //发送数据"Hi!"}});

再创建个数据接收源,同理,接收数据用的:观察者

   Observer<String> receiver = new Observer<String>() {@Overridepublic void onCompleted() {//数据接收完成时调用}@Overridepublic void onError(Throwable e) {//发生错误调用}@Overridepublic void onNext(String s) {//正常接收数据调用System.out.print(s);  //将接收到来自sender的问候"Hi!"}};

好了,将发射源和接收源关联起来:

  sender.subscribe(receiver);

这样就形成RxJava一个简单的用法,sender发射"Hi!",将会被receiver的onNext的接收,通过这个例子,也许你会想到“异步”、“观察者模式”,没错,这些都是RxJava所做的事情,并且让他们变得更简单和简洁,而RxJava所有的一切都将围绕这两个点展开,一个是发射数据,一个是接收数据,如果你理解了这点或者你已经知道RxJava就是这么一回事,那么恭喜你,你已经一只脚跨进RxJava的大门了,如果不是也无所谓,请继续往下看...

网上关于RxJava的博文也有很多,但绝大部分文章都有一个共同点,就是侧重于讲RxJava中各种强大的操作符,而忽略了最基本的东西——概念,下面对发射源和接收源做个归类.


基本概念

Observable:发射源,英文释义“可观察的”,在观察者模式中称为“被观察者”或“可观察对象”;

Observer:接收源,英文释义“观察者”,没错!就是观察者模式中的“观察者”,可接收Observable、Subject发射的数据;

Subject:Subject是一个比较特殊的对象,既可充当发射源,也可充当接收源,为避免初学者被混淆,本章将不对Subject做过多的解释和使用,重点放在Observable和Observer上,先把最基本方法的使用学会,后面再学其他的都不是什么问题;

Subscriber:“订阅者”,也是接收源,那它跟Observer有什么区别呢?Subscriber实现了Observer接口,比Observer多了一个最重要的方法unsubscribe( ),用来取消订阅,当你不再想接收数据了,可以调用unsubscribe( )方法停止接收,Observer 在 subscribe() 过程中,最终也会被转换成 Subscriber 对象,一般情况下,建议使用Subscriber作为接收源;

Subscription :Observable调用subscribe( )方法返回的对象,同样有unsubscribe( )方法,可以用来取消订阅事件;

Action0:RxJava中的一个接口,它只有一个无参call()方法,且无返回值,同样还有Action1,Action2...Action9等,Action1封装了含有 1 个参的call()方法,即call(T t),Action2封装了含有 2 个参数的call方法,即call(T1 t1,T2 t2),以此类推;

Func0:与Action0非常相似,也有call()方法,但是它是有返回值的,同样也有Func0、Func1...Func9;


基本用法

  • Observable的创建
    1.使用create( ),最基本的创建方式:

    normalObservable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("create1"); //发射一个"create1"的Stringsubscriber.onNext("create2"); //发射一个"create2"的Stringsubscriber.onCompleted();//发射完成,这种方法需要手动调用onCompleted,才会回调Observer的onCompleted方法}});

    2.使用just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据:

    justObservable = Observable.just("just1","just2");//依次发送"just1""just2"

    3.使用from( ),遍历集合,发送每个item:

    List<String> list = new ArrayList<>();
    list.add("from1");
    list.add("from2");
    list.add("from3");
    fromObservable = Observable.from(list);  //遍历list 每次发送一个
    /** 注意,just()方法也可以传list,但是发送的是整个list对象,而from()发送的是list的一个item** /

    4.使用defer( ),有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable:

    deferObservable = Observable.defer(new Func0<Observable<String>>() {@Override//注意此处的call方法没有Subscriber参数public Observable<String> call() {return Observable.just("deferObservable");}});

    5.使用interval( ),创建一个按固定时间间隔发射整数序列的Observable,可用作定时器:

    intervalObservable = Observable.interval(1, TimeUnit.SECONDS);//每隔一秒发送一次

    6.使用range( ),创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常:

    rangeObservable = Observable.range(10, 5);//将发送整数10,11,12,13,14

    7.使用timer( ),创建一个Observable,它在一个给定的延迟后发射一个特殊的值,等同于Android中Handler的postDelay( )方法:

    timeObservable = Observable.timer(3, TimeUnit.SECONDS);  //3秒后发射一个值

    8.使用repeat( ),创建一个重复发射特定数据的Observable:

    repeatObservable = Observable.just("repeatObservable").repeat(3);//重复发射3次
  • Observer的创建

    mObserver = new Observer<String>() {@Overridepublic void onCompleted() {LogUtil.log("onCompleted");}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(String s) {LogUtil.log(s);}};

    ok,有了Observable和Obsever,我们就可以随便玩了,任取一个已创建的Observable和Observer关联上,即形成一个RxJava的例子,如:

    justObservable.subscribe(mObserver);

    mObserver的onNext方法将会依次收到来自justObservable的数据"just1""just2",另外,如果你不在意数据是否接收完或者是否出现错误,即不需要Observer的onCompleted()onError()方法,可使用Action1subscribe()支持将Action1作为参数传入,RxJava将会调用它的call方法来接收数据,代码如下:

    justObservable.subscribe(new Action1<String>() {@Overridepublic void call(String s) {LogUtil.log(s);}});

    以上就是RxJava最简单的用法。

    RxJava也以代码的简洁深受广大用户喜爱,简洁不能理解为代码量少,而是随着逻辑的复杂,需求的更改,代码可依然能保持极强的阅读性


RxJava的简单操作符

Scheduler

在讲常用操作符前,先看看Scheduler这个东西,名之为调度器,正因为有这个东西,让RxJava可以从主线程和子线程之间轻松切换,各个Scheduler的具体使用效果看以下表解释:

调度器类型 用途
Schedulers.computation( ) 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.immediate( ) 在当前线程立即开始执行任务
Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( ) 为每个任务创建一个新线程
Schedulers.trampoline( ) 当其它排队的任务完成后,在当前线程排队开始执行
AndroidSchedulers.mainThread() 此调度器为RxAndroid特有,顾名思义,运行在Android UI线程上

具体如何使用呢,比如从数据库读取数据更新到UI上,假设数据量很大,直接从主线程读取数据,会造成UI卡顿,以前我们常用AnsyTask或者Handler去处理避免出现这类问题,个人认为手写个AnsyTask还是挺麻烦的,但用RxJava就简单多了,例如:

Observable.create(new Observable.OnSubscribe<Data>() {@Overridepublic void call(Subscriber<? super Data> subscriber) {Data data = getData();//从数据库获取subscriber.onNext(data);subscriber.onCompleted();}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Data>() {@Overridepublic void call(Data data) {//更新ui}});

简单粗暴的解释一下,subscribeOn( )决定了发射数据在哪个调度器上执行,observeOn(AndroidSchedulers.mainThread())则指定数据接收发生在UI线程,简直不要太方便。


常用操作符

  • Map:最常用且最实用的操作符之一,将对象转换成另一个对象发射出去,应用范围非常广,如数据的转换,数据的预处理等。
    例一:数据类型转换,改变最终的接收的数据类型。假设传入本地图片路径,根据路径获取图片的Bitmap。

    Observable.just(filePath).map(new Func1<String, Bitmap>() {@Overridepublic Bitmap call(String path) {return getBitmapByPath(path);}}).subscribe(new Action1<Bitmap>() {@Overridepublic void call(Bitmap bitmap) {//获取到bitmap,显示
    }});

    例二:对数据进行预处理,最后得到理想型数据。实际开发过程中,从后台接口获取到的数据也许不符合我们想要的,这时候可以在获取过程中对得到的数据进行预处理(结合Retrofit)。

    Observable.just("12345678").map(new Func1<String, String>() {@Overridepublic String call(String s) {return s.substring(0,4);//只要前四位}})
    .subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.i("mytag",s);}});

    先说明一下,为了方便理解,所以写的例子都比较简单,不要以为明明可以简单用if-else解决的事,没必要用这种方式去写,当你真正将这些操作符使用到数据处理中去的时候,你就会发现有多方便。

  • FlatMap:和Map很像但又有所区别,Map只是转换发射的数据类型,而FlatMap可以将原始Observable转换成另一个Observable。还是举例说明吧。假设要打印全国所有学校的名称,可以直接用Map:
    为了更清晰一点,先贴一下School类:

    public class School {private String name;private List<Student> studentList;public List<Student> getStudentList() {return studentList;}public void setStudentList(List<Student> studentList) {this.studentList = studentList;}public String getName() {return name;}public void setName(String name) {this.name = name;}public static class Student{private String name;public String getName() {return name;}public void setName(String name) {this.name = name;}}
    }

    接着用Map打印学校名称:

    List<School> schoolList = new ArrayList<>();
    Observable.from(schoolList).map(new Func1<School, String>() {@Overridepublic String call(School school) {return school.getName();}}).subscribe(new Action1<String>() {@Overridepublic void call(String schoolName) {Log.i("mytag",schoolName);}});

    再进一步,打印学校所有学生的姓名,先考虑用Map实现,将所有School对象直接转成Student:

    Observable.from(schoolList).map(new Func1<School, School.Student>() {@Overridepublic School.Student call(School school) {return school.getStudentList();
      }}).subscribe(new Action1<School.Student>() {@Overridepublic void call(School.Student student) {Log.i("mytag",student.getName());}});

    看似可行,但事实上,这是一段错误的代码,细心的人就会发现错误的地方

    @Override
    public School.Student call(School school) {return school.getStudentList();  //错误,Student 是一个对象,返回的却是一个list
    }

    所以用Map是无法实现直接打印学校的所有学生名字的,因为Map是一对一的关系,无法将单一的School对象转变成多个Student。前面说到,FlatMap可以改变原始Observable变成另外一个Observable,如果我们能利用from()操作符把school.getStudentList()变成另外一个Observable问题不就迎刃而解了吗,这时候就该FlatMap上场了,来看看它是怎么实现的:

    Observable.from(schoolList).flatMap(new Func1<School, Observable<School.Student>>() {@Overridepublic Observable<School.Student> call(School school) {return Observable.from(school.getStudentList()); //关键,将学生列表以另外一个Observable发射出去}}).subscribe(new Action1<School.Student>() {@Overridepublic void call(School.Student student) {Log.i("mytag",student.getName());}});

    Map和FlatMap在我看来就像孪生兄弟一样,非常实用,实际开发中也我也经常使用,个人觉得要想上手RxJava,掌握这两个操作符必不可少。

  • Buffer:缓存,可以设置缓存大小,缓存满后,以list的方式将数据发送出去;例:

    Observable.just(1,2,3).buffer(2).subscribe(new Action1<List<Integer>>() {@Overridepublic void call(List<Integer> list) {Log.i("mytag","size:"+list.size());}});

    运行打印结果如下:

    11-02 20:49:58.370 23392-23392/? I/mytag: size:2
    11-02 20:49:58.370 23392-23392/? I/mytag: size:1

    在开发当中,个人经常将Buffer和Map一起使用,常发生在从后台取完数据,对一个List中的数据进行预处理后,再用Buffer缓存后一起发送,保证最后数据接收还是一个List,如下:

    List<School> schoolList = new ArrayList<>();
    Observable.from(schoolList).map(new Func1<School, School>() {@Overridepublic School call(School school) {school.setName("NB大学");  //将所有学校改名return school;}}).buffer(schoolList.size())  //缓存起来,最后一起发送
    .subscribe(new Action1<List<School>>() {@Overridepublic void call(List<School> schools) {   
    }});
  • Take:发射前n项数据,还是用上面的例子,假设不要改所有学校的名称了,就改前四个学校的名称:
    Observable.from(schoolList).take(4).map(new Func1<School, School>() {@Overridepublic School call(School school) {school.setName("NB大学");
          return school;
      }}).buffer(4).subscribe(new Action1<List<School>>() {@Overridepublic void call(List<School> schools) {}});
  • Distinct:去掉重复的项,比较好理解:
    Observable.just(1, 2, 1, 1, 2, 3).distinct().subscribe(new Action1<Integer>() {@Overridepublic void call(Integer item) {System.out.println("Next: " + item);}});
    输出
    Next: 1
    Next: 2
    Next: 3
  • Filter:过滤,通过谓词判断的项才会被发射,例如,发射小于4的数据:
    Observable.just(1, 2, 3, 4, 5).filter(new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer item) {return( item < 4 );}}).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer item) {System.out.println("Next: " + item);}});
    输出:
    Next: 1
    Next: 2
    Next: 3


关于Subject

关于Subject,官方文档的解释是这样的:Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。从官方解释中,我提取出三个要点:

  1. 它可以充当Observable;
  2. 它可以充当Observer;
  3. 它是Observable和Observer之间的桥梁;

接下来对这三个要点解释一下,但在解释之前,要先介绍一下Subject的种类, Subject是一个抽象类,不能通过new来实例化Subject,所以Subject有四个实现类,分别为AsyncSubjectBehaviorSubjectPublishSubjectReplaySubject,每个实现类都有特定的“技能”,下面结合代码来介绍一下它们各自的“技能”。注意,所有的实现类都由create()方法实例化,无需new,所有的实现类调用onCompleted()onError(),它的Observer将不再接收数据;


Subject的分类解析

  • AsyncSubject
    Observer会接收AsyncSubject的`onComplete()之前的最后一个数据,如果因异常而终止,AsyncSubject将不会释放任何数据,但是会向Observer传递一个异常通知。示例代码如下:

          AsyncSubject<String> asyncSubject = AsyncSubject.create();asyncSubject.onNext("asyncSubject1");asyncSubject.onNext("asyncSubject2");asyncSubject.onNext("asyncSubject3");  asyncSubject.onCompleted();asyncSubject.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {LogUtil.log("asyncSubject onCompleted");  //输出 asyncSubject onCompleted}@Overridepublic void onError(Throwable e) {LogUtil.log("asyncSubject onError");  //不输出(异常才会输出)}@Overridepublic void onNext(String s) {LogUtil.log("asyncSubject:"+s);  //输出asyncSubject:asyncSubject3}});

    以上代码,Observer只会接收asyncSubject的onCompleted()被调用前的最后一个数据,即“asyncSubject3”,如果不调用onCompleted(),Subscriber将不接收任何数据。

  • BehaviorSubject
    Observer会接收到BehaviorSubject被订阅之前的最后一个数据,再接收其他发射过来的数据,如果BehaviorSubject被订阅之前没有发送任何数据,则会发送一个默认数据。(注意跟AsyncSubject的区别,AsyncSubject要手动调用onCompleted(),且它的Observer会接收到onCompleted()前发送的最后一个数据,之后不会再接收数据,而BehaviorSubject不需手动调用onCompleted(),它的Observer接收的是BehaviorSubject被订阅前发送的最后一个数据,两个的分界点不一样,且之后还会继续接收数据。)示例代码如下:

      BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");behaviorSubject.onNext("behaviorSubject1");behaviorSubject.onNext("behaviorSubject2");behaviorSubject.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {LogUtil.log("behaviorSubject:complete");}@Overridepublic void onError(Throwable e) {LogUtil.log("behaviorSubject:error");}@Overridepublic void onNext(String s) {LogUtil.log("behaviorSubject:"+s);}});behaviorSubject.onNext("behaviorSubject3");behaviorSubject.onNext("behaviorSubject4");

    以上代码,Observer会接收到behaviorSubject2、behaviorSubject3、behaviorSubject4,如果在behaviorSubject.subscribe()之前不发送behaviorSubject1、behaviorSubject2,则Observer会先接收到default,再接收behaviorSubject3、behaviorSubject4。

  • PublishSubject
    PublishSubject比较容易理解,相对比其他Subject常用,它的Observer只会接收到PublishSubject被订阅之后发送的数据。示例代码如下:

      PublishSubject<String> publishSubject = PublishSubject.create();publishSubject.onNext("publishSubject1");publishSubject.onNext("publishSubject2");publishSubject.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(String s) {LogUtil.log("publishSubject observer1:"+s);}});publishSubject.onNext("publishSubject3");publishSubject.onNext("publishSubject4");

    以上代码,Observer只会接收到"behaviorSubject3"、"behaviorSubject4"。

  • ReplaySubject
    ReplaySubject会发射所有数据给观察者,无论它们是何时订阅的。也有其它版本的ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据。示例代码如下:
    ReplaySubject<String>replaySubject = ReplaySubject.create(); //创建默认初始缓存容量大小为16的ReplaySubject,当数据条目超过16会重新分配内存空间,使用这种方式,不论ReplaySubject何时被订阅,Observer都能接收到数据
    //replaySubject = ReplaySubject.create(100);//创建指定初始缓存容量大小为100的ReplaySubject
    //replaySubject = ReplaySubject.createWithSize(2);//只缓存订阅前最后发送的2条数据 
    //replaySubject=ReplaySubject.createWithTime(1,TimeUnit.SECONDS,Schedulers.computation());  //replaySubject被订阅前的前1秒内发送的数据才能被接收     
    replaySubject.onNext("replaySubject:pre1");
    replaySubject.onNext("replaySubject:pre2");
    replaySubject.onNext("replaySubject:pre3");
    replaySubject.subscribe(new Action1<String>() {@Overridepublic void call(String s) {LogUtil.log("replaySubject:" + s);}});
    replaySubject.onNext("replaySubject:after1");
    replaySubject.onNext("replaySubject:after2");
    如果你把 Subject 当作一个 Subscriber 使用,不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。要避免此类问题,官方提出了“串行化”,你可以将 Subject 转换为一个 SerializedSubject ,类似于这样:
    SerializedSubject<String, Integer> ser = new SerializedSubject(publishSubject);

    为什么说Subject既可充当Observable,又可充当Observer,是它们两个之间的桥梁呢。首先,从理论上讲,Subject继承了Observable,又实现了Observer接口,所以说它既是Observable又是Observer,完全合理。从实际应用上讲,Subject也能实现Observable和Observer相同的功能

    • 创建Observable并发射数据:

        Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("I'm Observable");subscriber.onCompleted();}});

      用Subject实现为:

      PublishSubject<String> publishSubject = PublishSubject.create();
      publishSubject.onNext("as Observable");
      publishSubject.onCompleted();
    • 创建Observer订阅Observable并接收数据:

      mObservable.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(String s) {//接收数据}
      });

      用Subject实现为:

         publishSubject.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(String s) {}});

      也许有人会问,不是说Subject也可以作为Observer,不能把Subject当作Observer传入subscribe()中吗?回答是:当然可以!就象这样:

      PublishSubject<String> publishSubject = PublishSubject.create();
      Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("as Observer"); subscriber.onCompleted();}
      }).subscribe(publishSubject);

      有没有发现问题?publishSubject没有重写onNext()方法啊,在哪接收的数据?这就是前面说的“桥梁”的问题了,尽管把Subject作为Observer传入subscribe(),但接收数据还是要通过Observer来接收,借用Subject来连接Observable和Observer,整体代码如下:

      PublishSubject<String> publishSubject = PublishSubject.create();Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("as Bridge");subscriber.onCompleted();}}).subscribe(publishSubject);publishSubject.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(String s) {LogUtil.log("subject:"+s); //接收到 as Bridge}});

    


    参考:

     深入浅出RxJava就这一篇就够了

    RxJava + Retrofit让Android网络请求简单效率

    RxJava 从入门到放弃再到不离不弃

    Java8之Lambda表达式(Android用法)


    这篇关于带你入门学习Rxjava--上手教程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



    http://www.chinasem.cn/article/1020430

    相关文章

    HarmonyOS学习(七)——UI(五)常用布局总结

    自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

    Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

    Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

    JVM 的类初始化机制

    前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

    Spring Security 基于表达式的权限控制

    前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

    浅析Spring Security认证过程

    类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

    Spring Security--Architecture Overview

    1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

    Spring Security基于数据库验证流程详解

    Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

    Spring Security 从入门到进阶系列教程

    Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

    Java架构师知识体认识

    源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

    【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

    【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06