带你入门学习Rxjava--上手教程

2024-06-01 07:58
文章标签 java 学习 入门 教程 上手 rx

本文主要是介绍带你入门学习Rxjava--上手教程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

相信各位看官对RxJava早有耳闻,那么关于什么是RxJava我就不再赘述了,不知道的可自行百度。网上的RxJava的入门门槛稍高,但入门不难,可以跟本文一起来学习

注: 本文针对rxjava 1.x.x ,用例为下,建议初学先从1.x看起

    compile 'io.reactivex:rxjava:1.1.6'
    compile 'io.reactivex:rxandroid:1.2.1'

官方的介绍

1.支持Java6+

2.android 2.3+

3.异步的

4.基于观察者设计模式(Observer、Observable)

5.Subscribe (订阅)


之前不了解设计模式--观察者模式的先来一个简单例子,懂得可以跳过

观察者模式的简单介绍:

观察者模式很好理解,类似于邮件订阅和RSS订阅,当我们浏览一些博客或wiki时,经常会看到RSS图标,就这的意思是,当你订阅了该文章,如果后续有更新,会及时通知你。

其实,简单来讲就一句话:当一个对象变化时,其它依赖该对象的对象都会收到通知,并且随着变化.对象之间是一种一对多的关系。先来看看关系图:

MySubject类就是我们的主对象,Observer1和Observer2是依赖于MySubject的对象,当MySubject变化时,Observer1和Observer2必然变化。AbstractSubject类中定义着需要监控的对象列表,可以对其进行修改:增加或删除被监控对象,且当MySubject变化时,负责通知在列表内存在的对象。我们看实现代码:

Observer两个实现类:

     
  1. public interface Observer {    
  2.    public void update();    
  3. }  


     
  1. public class Observer1 implements Observer {    
  2.    
  3.    @Override    
  4.    public void update() {    
  5.        System.out.println("observer1 has received!");    
  6.    }    
  7. }
     
  1. public class Observer2 implements Observer {    
  2.    
  3.    @Override    
  4.    public void update() {    
  5.        System.out.println("observer2 has received!");    
  6.    }    
  7.    
  8. }  

Subject接口及实现类

     
  1. public interface Subject {    
  2.        
  3.    /*增加观察者*/    
  4.    public void add(Observer observer);    
  5.        
  6.    /*删除观察者*/    
  7.    public void del(Observer observer);    
  8.        
  9.    /*通知所有的观察者*/    
  10.    public void notifyObservers();    
  11.        
  12.    /*自身的操作*/    
  13.    public void operation();    
  14. }    


     
  1. public abstract class AbstractSubject implements Subject {    
  2.    
  3.    private Vector<Observer> vector = new Vector<Observer>();    
  4.    @Override    
  5.    public void add(Observer observer) {    
  6.        vector.add(observer);    
  7.    }    
  8.    
  9.    @Override    
  10.    public void del(Observer observer) {    
  11.        vector.remove(observer);    
  12.    }    
  13.    
  14.    @Override    
  15.    public void notifyObservers() {    
  16.        Enumeration<Observer> enumo = vector.elements();    
  17.        while(enumo.hasMoreElements()){    
  18.            enumo.nextElement().update();    
  19.        }    
  20.    }    
  21. }    
     
  1. public class MySubject extends AbstractSubject {    
  2.    
  3.    @Override    
  4.    public void operation() {    
  5.        System.out.println("update self!");    
  6.        notifyObservers();    
  7.    }    
  8.    
  9. }    


     
  1. public class ObserverTest {    
  2.    
  3.    public static void main(String[] args) {    
  4.        Subject sub = new MySubject();    
  5.        sub.add(new Observer1());    
  6.        sub.add(new Observer2());    
  7.            
  8.        sub.operation();    
  9.    }    
  10.    
  11. }    

输出:
update self!
observer1 has received!
observer2 has received!

好啦 ,正式进入Rxjava

先创建个数据发射源,很好理解,就是发射数据用的:被观察者

   Observable<String> sender = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("Hi!");  //发送数据"Hi!"}});

再创建个数据接收源,同理,接收数据用的:观察者

   Observer<String> receiver = new Observer<String>() {@Overridepublic void onCompleted() {//数据接收完成时调用}@Overridepublic void onError(Throwable e) {//发生错误调用}@Overridepublic void onNext(String s) {//正常接收数据调用System.out.print(s);  //将接收到来自sender的问候"Hi!"}};

好了,将发射源和接收源关联起来:

  sender.subscribe(receiver);

这样就形成RxJava一个简单的用法,sender发射"Hi!",将会被receiver的onNext的接收,通过这个例子,也许你会想到“异步”、“观察者模式”,没错,这些都是RxJava所做的事情,并且让他们变得更简单和简洁,而RxJava所有的一切都将围绕这两个点展开,一个是发射数据,一个是接收数据,如果你理解了这点或者你已经知道RxJava就是这么一回事,那么恭喜你,你已经一只脚跨进RxJava的大门了,如果不是也无所谓,请继续往下看...

网上关于RxJava的博文也有很多,但绝大部分文章都有一个共同点,就是侧重于讲RxJava中各种强大的操作符,而忽略了最基本的东西——概念,下面对发射源和接收源做个归类.


基本概念

Observable:发射源,英文释义“可观察的”,在观察者模式中称为“被观察者”或“可观察对象”;

Observer:接收源,英文释义“观察者”,没错!就是观察者模式中的“观察者”,可接收Observable、Subject发射的数据;

Subject:Subject是一个比较特殊的对象,既可充当发射源,也可充当接收源,为避免初学者被混淆,本章将不对Subject做过多的解释和使用,重点放在Observable和Observer上,先把最基本方法的使用学会,后面再学其他的都不是什么问题;

Subscriber:“订阅者”,也是接收源,那它跟Observer有什么区别呢?Subscriber实现了Observer接口,比Observer多了一个最重要的方法unsubscribe( ),用来取消订阅,当你不再想接收数据了,可以调用unsubscribe( )方法停止接收,Observer 在 subscribe() 过程中,最终也会被转换成 Subscriber 对象,一般情况下,建议使用Subscriber作为接收源;

Subscription :Observable调用subscribe( )方法返回的对象,同样有unsubscribe( )方法,可以用来取消订阅事件;

Action0:RxJava中的一个接口,它只有一个无参call()方法,且无返回值,同样还有Action1,Action2...Action9等,Action1封装了含有 1 个参的call()方法,即call(T t),Action2封装了含有 2 个参数的call方法,即call(T1 t1,T2 t2),以此类推;

Func0:与Action0非常相似,也有call()方法,但是它是有返回值的,同样也有Func0、Func1...Func9;


基本用法

  • Observable的创建
    1.使用create( ),最基本的创建方式:

    normalObservable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("create1"); //发射一个"create1"的Stringsubscriber.onNext("create2"); //发射一个"create2"的Stringsubscriber.onCompleted();//发射完成,这种方法需要手动调用onCompleted,才会回调Observer的onCompleted方法}});

    2.使用just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据:

    justObservable = Observable.just("just1","just2");//依次发送"just1""just2"

    3.使用from( ),遍历集合,发送每个item:

    List<String> list = new ArrayList<>();
    list.add("from1");
    list.add("from2");
    list.add("from3");
    fromObservable = Observable.from(list);  //遍历list 每次发送一个
    /** 注意,just()方法也可以传list,但是发送的是整个list对象,而from()发送的是list的一个item** /

    4.使用defer( ),有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable:

    deferObservable = Observable.defer(new Func0<Observable<String>>() {@Override//注意此处的call方法没有Subscriber参数public Observable<String> call() {return Observable.just("deferObservable");}});

    5.使用interval( ),创建一个按固定时间间隔发射整数序列的Observable,可用作定时器:

    intervalObservable = Observable.interval(1, TimeUnit.SECONDS);//每隔一秒发送一次

    6.使用range( ),创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常:

    rangeObservable = Observable.range(10, 5);//将发送整数10,11,12,13,14

    7.使用timer( ),创建一个Observable,它在一个给定的延迟后发射一个特殊的值,等同于Android中Handler的postDelay( )方法:

    timeObservable = Observable.timer(3, TimeUnit.SECONDS);  //3秒后发射一个值

    8.使用repeat( ),创建一个重复发射特定数据的Observable:

    repeatObservable = Observable.just("repeatObservable").repeat(3);//重复发射3次
  • Observer的创建

    mObserver = new Observer<String>() {@Overridepublic void onCompleted() {LogUtil.log("onCompleted");}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(String s) {LogUtil.log(s);}};

    ok,有了Observable和Obsever,我们就可以随便玩了,任取一个已创建的Observable和Observer关联上,即形成一个RxJava的例子,如:

    justObservable.subscribe(mObserver);

    mObserver的onNext方法将会依次收到来自justObservable的数据"just1""just2",另外,如果你不在意数据是否接收完或者是否出现错误,即不需要Observer的onCompleted()onError()方法,可使用Action1subscribe()支持将Action1作为参数传入,RxJava将会调用它的call方法来接收数据,代码如下:

    justObservable.subscribe(new Action1<String>() {@Overridepublic void call(String s) {LogUtil.log(s);}});

    以上就是RxJava最简单的用法。

    RxJava也以代码的简洁深受广大用户喜爱,简洁不能理解为代码量少,而是随着逻辑的复杂,需求的更改,代码可依然能保持极强的阅读性


RxJava的简单操作符

Scheduler

在讲常用操作符前,先看看Scheduler这个东西,名之为调度器,正因为有这个东西,让RxJava可以从主线程和子线程之间轻松切换,各个Scheduler的具体使用效果看以下表解释:

调度器类型 用途
Schedulers.computation( ) 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.immediate( ) 在当前线程立即开始执行任务
Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( ) 为每个任务创建一个新线程
Schedulers.trampoline( ) 当其它排队的任务完成后,在当前线程排队开始执行
AndroidSchedulers.mainThread() 此调度器为RxAndroid特有,顾名思义,运行在Android UI线程上

具体如何使用呢,比如从数据库读取数据更新到UI上,假设数据量很大,直接从主线程读取数据,会造成UI卡顿,以前我们常用AnsyTask或者Handler去处理避免出现这类问题,个人认为手写个AnsyTask还是挺麻烦的,但用RxJava就简单多了,例如:

Observable.create(new Observable.OnSubscribe<Data>() {@Overridepublic void call(Subscriber<? super Data> subscriber) {Data data = getData();//从数据库获取subscriber.onNext(data);subscriber.onCompleted();}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Data>() {@Overridepublic void call(Data data) {//更新ui}});

简单粗暴的解释一下,subscribeOn( )决定了发射数据在哪个调度器上执行,observeOn(AndroidSchedulers.mainThread())则指定数据接收发生在UI线程,简直不要太方便。


常用操作符

  • Map:最常用且最实用的操作符之一,将对象转换成另一个对象发射出去,应用范围非常广,如数据的转换,数据的预处理等。
    例一:数据类型转换,改变最终的接收的数据类型。假设传入本地图片路径,根据路径获取图片的Bitmap。

    Observable.just(filePath).map(new Func1<String, Bitmap>() {@Overridepublic Bitmap call(String path) {return getBitmapByPath(path);}}).subscribe(new Action1<Bitmap>() {@Overridepublic void call(Bitmap bitmap) {//获取到bitmap,显示
    }});

    例二:对数据进行预处理,最后得到理想型数据。实际开发过程中,从后台接口获取到的数据也许不符合我们想要的,这时候可以在获取过程中对得到的数据进行预处理(结合Retrofit)。

    Observable.just("12345678").map(new Func1<String, String>() {@Overridepublic String call(String s) {return s.substring(0,4);//只要前四位}})
    .subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.i("mytag",s);}});

    先说明一下,为了方便理解,所以写的例子都比较简单,不要以为明明可以简单用if-else解决的事,没必要用这种方式去写,当你真正将这些操作符使用到数据处理中去的时候,你就会发现有多方便。

  • FlatMap:和Map很像但又有所区别,Map只是转换发射的数据类型,而FlatMap可以将原始Observable转换成另一个Observable。还是举例说明吧。假设要打印全国所有学校的名称,可以直接用Map:
    为了更清晰一点,先贴一下School类:

    public class School {private String name;private List<Student> studentList;public List<Student> getStudentList() {return studentList;}public void setStudentList(List<Student> studentList) {this.studentList = studentList;}public String getName() {return name;}public void setName(String name) {this.name = name;}public static class Student{private String name;public String getName() {return name;}public void setName(String name) {this.name = name;}}
    }

    接着用Map打印学校名称:

    List<School> schoolList = new ArrayList<>();
    Observable.from(schoolList).map(new Func1<School, String>() {@Overridepublic String call(School school) {return school.getName();}}).subscribe(new Action1<String>() {@Overridepublic void call(String schoolName) {Log.i("mytag",schoolName);}});

    再进一步,打印学校所有学生的姓名,先考虑用Map实现,将所有School对象直接转成Student:

    Observable.from(schoolList).map(new Func1<School, School.Student>() {@Overridepublic School.Student call(School school) {return school.getStudentList();
      }}).subscribe(new Action1<School.Student>() {@Overridepublic void call(School.Student student) {Log.i("mytag",student.getName());}});

    看似可行,但事实上,这是一段错误的代码,细心的人就会发现错误的地方

    @Override
    public School.Student call(School school) {return school.getStudentList();  //错误,Student 是一个对象,返回的却是一个list
    }

    所以用Map是无法实现直接打印学校的所有学生名字的,因为Map是一对一的关系,无法将单一的School对象转变成多个Student。前面说到,FlatMap可以改变原始Observable变成另外一个Observable,如果我们能利用from()操作符把school.getStudentList()变成另外一个Observable问题不就迎刃而解了吗,这时候就该FlatMap上场了,来看看它是怎么实现的:

    Observable.from(schoolList).flatMap(new Func1<School, Observable<School.Student>>() {@Overridepublic Observable<School.Student> call(School school) {return Observable.from(school.getStudentList()); //关键,将学生列表以另外一个Observable发射出去}}).subscribe(new Action1<School.Student>() {@Overridepublic void call(School.Student student) {Log.i("mytag",student.getName());}});

    Map和FlatMap在我看来就像孪生兄弟一样,非常实用,实际开发中也我也经常使用,个人觉得要想上手RxJava,掌握这两个操作符必不可少。

  • Buffer:缓存,可以设置缓存大小,缓存满后,以list的方式将数据发送出去;例:

    Observable.just(1,2,3).buffer(2).subscribe(new Action1<List<Integer>>() {@Overridepublic void call(List<Integer> list) {Log.i("mytag","size:"+list.size());}});

    运行打印结果如下:

    11-02 20:49:58.370 23392-23392/? I/mytag: size:2
    11-02 20:49:58.370 23392-23392/? I/mytag: size:1

    在开发当中,个人经常将Buffer和Map一起使用,常发生在从后台取完数据,对一个List中的数据进行预处理后,再用Buffer缓存后一起发送,保证最后数据接收还是一个List,如下:

    List<School> schoolList = new ArrayList<>();
    Observable.from(schoolList).map(new Func1<School, School>() {@Overridepublic School call(School school) {school.setName("NB大学");  //将所有学校改名return school;}}).buffer(schoolList.size())  //缓存起来,最后一起发送
    .subscribe(new Action1<List<School>>() {@Overridepublic void call(List<School> schools) {   
    }});
  • Take:发射前n项数据,还是用上面的例子,假设不要改所有学校的名称了,就改前四个学校的名称:
    Observable.from(schoolList).take(4).map(new Func1<School, School>() {@Overridepublic School call(School school) {school.setName("NB大学");
          return school;
      }}).buffer(4).subscribe(new Action1<List<School>>() {@Overridepublic void call(List<School> schools) {}});
  • Distinct:去掉重复的项,比较好理解:
    Observable.just(1, 2, 1, 1, 2, 3).distinct().subscribe(new Action1<Integer>() {@Overridepublic void call(Integer item) {System.out.println("Next: " + item);}});
    输出
    Next: 1
    Next: 2
    Next: 3
  • Filter:过滤,通过谓词判断的项才会被发射,例如,发射小于4的数据:
    Observable.just(1, 2, 3, 4, 5).filter(new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer item) {return( item < 4 );}}).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer item) {System.out.println("Next: " + item);}});
    输出:
    Next: 1
    Next: 2
    Next: 3


关于Subject

关于Subject,官方文档的解释是这样的:Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。从官方解释中,我提取出三个要点:

  1. 它可以充当Observable;
  2. 它可以充当Observer;
  3. 它是Observable和Observer之间的桥梁;

接下来对这三个要点解释一下,但在解释之前,要先介绍一下Subject的种类, Subject是一个抽象类,不能通过new来实例化Subject,所以Subject有四个实现类,分别为AsyncSubjectBehaviorSubjectPublishSubjectReplaySubject,每个实现类都有特定的“技能”,下面结合代码来介绍一下它们各自的“技能”。注意,所有的实现类都由create()方法实例化,无需new,所有的实现类调用onCompleted()onError(),它的Observer将不再接收数据;


Subject的分类解析

  • AsyncSubject
    Observer会接收AsyncSubject的`onComplete()之前的最后一个数据,如果因异常而终止,AsyncSubject将不会释放任何数据,但是会向Observer传递一个异常通知。示例代码如下:

          AsyncSubject<String> asyncSubject = AsyncSubject.create();asyncSubject.onNext("asyncSubject1");asyncSubject.onNext("asyncSubject2");asyncSubject.onNext("asyncSubject3");  asyncSubject.onCompleted();asyncSubject.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {LogUtil.log("asyncSubject onCompleted");  //输出 asyncSubject onCompleted}@Overridepublic void onError(Throwable e) {LogUtil.log("asyncSubject onError");  //不输出(异常才会输出)}@Overridepublic void onNext(String s) {LogUtil.log("asyncSubject:"+s);  //输出asyncSubject:asyncSubject3}});

    以上代码,Observer只会接收asyncSubject的onCompleted()被调用前的最后一个数据,即“asyncSubject3”,如果不调用onCompleted(),Subscriber将不接收任何数据。

  • BehaviorSubject
    Observer会接收到BehaviorSubject被订阅之前的最后一个数据,再接收其他发射过来的数据,如果BehaviorSubject被订阅之前没有发送任何数据,则会发送一个默认数据。(注意跟AsyncSubject的区别,AsyncSubject要手动调用onCompleted(),且它的Observer会接收到onCompleted()前发送的最后一个数据,之后不会再接收数据,而BehaviorSubject不需手动调用onCompleted(),它的Observer接收的是BehaviorSubject被订阅前发送的最后一个数据,两个的分界点不一样,且之后还会继续接收数据。)示例代码如下:

      BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");behaviorSubject.onNext("behaviorSubject1");behaviorSubject.onNext("behaviorSubject2");behaviorSubject.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {LogUtil.log("behaviorSubject:complete");}@Overridepublic void onError(Throwable e) {LogUtil.log("behaviorSubject:error");}@Overridepublic void onNext(String s) {LogUtil.log("behaviorSubject:"+s);}});behaviorSubject.onNext("behaviorSubject3");behaviorSubject.onNext("behaviorSubject4");

    以上代码,Observer会接收到behaviorSubject2、behaviorSubject3、behaviorSubject4,如果在behaviorSubject.subscribe()之前不发送behaviorSubject1、behaviorSubject2,则Observer会先接收到default,再接收behaviorSubject3、behaviorSubject4。

  • PublishSubject
    PublishSubject比较容易理解,相对比其他Subject常用,它的Observer只会接收到PublishSubject被订阅之后发送的数据。示例代码如下:

      PublishSubject<String> publishSubject = PublishSubject.create();publishSubject.onNext("publishSubject1");publishSubject.onNext("publishSubject2");publishSubject.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(String s) {LogUtil.log("publishSubject observer1:"+s);}});publishSubject.onNext("publishSubject3");publishSubject.onNext("publishSubject4");

    以上代码,Observer只会接收到"behaviorSubject3"、"behaviorSubject4"。

  • ReplaySubject
    ReplaySubject会发射所有数据给观察者,无论它们是何时订阅的。也有其它版本的ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据。示例代码如下:
    ReplaySubject<String>replaySubject = ReplaySubject.create(); //创建默认初始缓存容量大小为16的ReplaySubject,当数据条目超过16会重新分配内存空间,使用这种方式,不论ReplaySubject何时被订阅,Observer都能接收到数据
    //replaySubject = ReplaySubject.create(100);//创建指定初始缓存容量大小为100的ReplaySubject
    //replaySubject = ReplaySubject.createWithSize(2);//只缓存订阅前最后发送的2条数据 
    //replaySubject=ReplaySubject.createWithTime(1,TimeUnit.SECONDS,Schedulers.computation());  //replaySubject被订阅前的前1秒内发送的数据才能被接收     
    replaySubject.onNext("replaySubject:pre1");
    replaySubject.onNext("replaySubject:pre2");
    replaySubject.onNext("replaySubject:pre3");
    replaySubject.subscribe(new Action1<String>() {@Overridepublic void call(String s) {LogUtil.log("replaySubject:" + s);}});
    replaySubject.onNext("replaySubject:after1");
    replaySubject.onNext("replaySubject:after2");
    如果你把 Subject 当作一个 Subscriber 使用,不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。要避免此类问题,官方提出了“串行化”,你可以将 Subject 转换为一个 SerializedSubject ,类似于这样:
    SerializedSubject<String, Integer> ser = new SerializedSubject(publishSubject);

    为什么说Subject既可充当Observable,又可充当Observer,是它们两个之间的桥梁呢。首先,从理论上讲,Subject继承了Observable,又实现了Observer接口,所以说它既是Observable又是Observer,完全合理。从实际应用上讲,Subject也能实现Observable和Observer相同的功能

    • 创建Observable并发射数据:

        Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("I'm Observable");subscriber.onCompleted();}});

      用Subject实现为:

      PublishSubject<String> publishSubject = PublishSubject.create();
      publishSubject.onNext("as Observable");
      publishSubject.onCompleted();
    • 创建Observer订阅Observable并接收数据:

      mObservable.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(String s) {//接收数据}
      });

      用Subject实现为:

         publishSubject.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(String s) {}});

      也许有人会问,不是说Subject也可以作为Observer,不能把Subject当作Observer传入subscribe()中吗?回答是:当然可以!就象这样:

      PublishSubject<String> publishSubject = PublishSubject.create();
      Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("as Observer"); subscriber.onCompleted();}
      }).subscribe(publishSubject);

      有没有发现问题?publishSubject没有重写onNext()方法啊,在哪接收的数据?这就是前面说的“桥梁”的问题了,尽管把Subject作为Observer传入subscribe(),但接收数据还是要通过Observer来接收,借用Subject来连接Observable和Observer,整体代码如下:

      PublishSubject<String> publishSubject = PublishSubject.create();Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("as Bridge");subscriber.onCompleted();}}).subscribe(publishSubject);publishSubject.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(String s) {LogUtil.log("subject:"+s); //接收到 as Bridge}});

    


    参考:

     深入浅出RxJava就这一篇就够了

    RxJava + Retrofit让Android网络请求简单效率

    RxJava 从入门到放弃再到不离不弃

    Java8之Lambda表达式(Android用法)


    这篇关于带你入门学习Rxjava--上手教程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



    http://www.chinasem.cn/article/1020430

    相关文章

    51单片机学习记录———定时器

    文章目录 前言一、定时器介绍二、STC89C52定时器资源三、定时器框图四、定时器模式五、定时器相关寄存器六、定时器练习 前言 一个学习嵌入式的小白~ 有问题评论区或私信指出~ 提示:以下是本篇文章正文内容,下面案例可供参考 一、定时器介绍 定时器介绍:51单片机的定时器属于单片机的内部资源,其电路的连接和运转均在单片机内部完成。 定时器作用: 1.用于计数系统,可

    问题:第一次世界大战的起止时间是 #其他#学习方法#微信

    问题:第一次世界大战的起止时间是 A.1913 ~1918 年 B.1913 ~1918 年 C.1914 ~1918 年 D.1914 ~1919 年 参考答案如图所示

    Java五子棋之坐标校正

    上篇针对了Java项目中的解构思维,在这篇内容中我们不妨从整体项目中拆解拿出一个非常重要的五子棋逻辑实现:坐标校正,我们如何使漫无目的鼠标点击变得有序化和可控化呢? 目录 一、从鼠标监听到获取坐标 1.MouseListener和MouseAdapter 2.mousePressed方法 二、坐标校正的具体实现方法 1.关于fillOval方法 2.坐标获取 3.坐标转换 4.坐

    Spring Cloud:构建分布式系统的利器

    引言 在当今的云计算和微服务架构时代,构建高效、可靠的分布式系统成为软件开发的重要任务。Spring Cloud 提供了一套完整的解决方案,帮助开发者快速构建分布式系统中的一些常见模式(例如配置管理、服务发现、断路器等)。本文将探讨 Spring Cloud 的定义、核心组件、应用场景以及未来的发展趋势。 什么是 Spring Cloud Spring Cloud 是一个基于 Spring

    [word] word设置上标快捷键 #学习方法#其他#媒体

    word设置上标快捷键 办公中,少不了使用word,这个是大家必备的软件,今天给大家分享word设置上标快捷键,希望在办公中能帮到您! 1、添加上标 在录入一些公式,或者是化学产品时,需要添加上标内容,按下快捷键Ctrl+shift++就能将需要的内容设置为上标符号。 word设置上标快捷键的方法就是以上内容了,需要的小伙伴都可以试一试呢!

    AssetBundle学习笔记

    AssetBundle是unity自定义的资源格式,通过调用引擎的资源打包接口对资源进行打包成.assetbundle格式的资源包。本文介绍了AssetBundle的生成,使用,加载,卸载以及Unity资源更新的一个基本步骤。 目录 1.定义: 2.AssetBundle的生成: 1)设置AssetBundle包的属性——通过编辑器界面 补充:分组策略 2)调用引擎接口API

    Javascript高级程序设计(第四版)--学习记录之变量、内存

    原始值与引用值 原始值:简单的数据即基础数据类型,按值访问。 引用值:由多个值构成的对象即复杂数据类型,按引用访问。 动态属性 对于引用值而言,可以随时添加、修改和删除其属性和方法。 let person = new Object();person.name = 'Jason';person.age = 42;console.log(person.name,person.age);//'J

    java8的新特性之一(Java Lambda表达式)

    1:Java8的新特性 Lambda 表达式: 允许以更简洁的方式表示匿名函数(或称为闭包)。可以将Lambda表达式作为参数传递给方法或赋值给函数式接口类型的变量。 Stream API: 提供了一种处理集合数据的流式处理方式,支持函数式编程风格。 允许以声明性方式处理数据集合(如List、Set等)。提供了一系列操作,如map、filter、reduce等,以支持复杂的查询和转

    大学湖北中医药大学法医学试题及答案,分享几个实用搜题和学习工具 #微信#学习方法#职场发展

    今天分享拥有拍照搜题、文字搜题、语音搜题、多重搜题等搜题模式,可以快速查找问题解析,加深对题目答案的理解。 1.快练题 这是一个网站 找题的网站海量题库,在线搜题,快速刷题~为您提供百万优质题库,直接搜索题库名称,支持多种刷题模式:顺序练习、语音听题、本地搜题、顺序阅读、模拟考试、组卷考试、赶快下载吧! 2.彩虹搜题 这是个老公众号了 支持手写输入,截图搜题,详细步骤,解题必备

    C++必修:模版的入门到实践

    ✨✨ 欢迎大家来到贝蒂大讲堂✨✨ 🎈🎈养成好习惯,先赞后看哦~🎈🎈 所属专栏:C++学习 贝蒂的主页:Betty’s blog 1. 泛型编程 首先让我们来思考一个问题,如何实现一个交换函数? void swap(int& x, int& y){int tmp = x;x = y;y = tmp;} 相信大家很快就能写出上面这段代码,但是如果要求这个交换函数支持字符型