Rxjava2的介绍与使用(一:详解)

2024-08-26 12:20
文章标签 java 使用 详解 介绍 rx

本文主要是介绍Rxjava2的介绍与使用(一:详解),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、介绍

RXJava2是基于观察者模式和链式编程思想的异步编程库,它可以用来优雅地处理异步操作,比如网络请求、数据库查询、文件I/O等操作,减少了回调嵌套,提高了代码的可读性和可维护性。

二、特点

  1. 异步处理:RXJava2的异步处理可以避免主线程被阻塞,提高应用的响应速度。
  2. 链式编程:RXJava2使用链式编程的方式,使得代码更加简洁、易读、易维护。
  3. 统一处理异步任务:RXJava2提供了统一的异步处理方式,可以简化异步任务的管理和维护。
  4. 支持多种操作符:RXJava2内置了大量操作符,可以方便地实现复杂的异步操作。
  5. 跨线程处理:RXJava2可以方便地切换线程,使得异步操作更加灵活。

三、RXJava2创建符介绍

RXJava2是一个基于观察者模式的异步编程库,它提供了一些操作符来帮助开发者简化异步编程。以下是一些常用的RXJava2创建符:

  1. just( ) — 将一个或多个对象转换成发射这个或这些对象的一个Observable
  2. from( ) — 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
  3. repeat( ) — 创建一个重复发射指定数据或数据序列的Observable
  4. repeatWhen( ) — 创建一个重复发射指定数据或数据序列的Observable,它依赖于另一个Observable发射的数据
  5. create( ) — 使用一个函数从头创建一个Observable
  6. defer( ) — 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable
  7. range( ) — 创建一个发射指定范围的整数序列的Observable
  8. interval( ) — 创建一个按照给定的时间间隔发射整数序列的Observable
  9. timer( ) — 创建一个在给定的延时之后发射单个数据的Observable
  10. empty( ) — 创建一个什么都不做直接通知完成的Observable
  11. error( ) — 创建一个什么都不做直接通知错误的Observable
  12. never( ) — 创建一个不发射任何数据的Observable

通过上面的介绍,我们大概知道Rxjava到底是干什么了。

就是观察者到订阅者,观察者可以指定线程的模式,订阅者也可以指定线程的模式。观察者将观察执行操作时,会将状态同步给订阅者。订阅者正常有四个回调,会根据不同的操作符,做对应的操作。

如果你是kotlin的玩家,可以参考flow的数据流用法,这个和Rxjava类似:Kotlin 流flow、ShareFlow、StateFlow、Channel的解释与使用-CSDN博客

四、高频API使用

接下来,我会将在项目中使用比较频繁和频率比较高的一些api进行介绍

1.create

    private fun create() {//1、创建被观察者Observableval observable = Observable.create(object : ObservableOnSubscribe<String> {override fun subscribe(e: ObservableEmitter<String>?) {e?.onNext("RxJava:e.onNext== 第一次");e?.onNext("RxJava:e.onNext== 第二次");e?.onNext("RxJava:e.onNext== 第三次");
//                e?.onComplete();}})//回调,观察val observer =object : Observer<String> {override fun onSubscribe(d: Disposable?) {bind.textInfo.append("onSubscribe\n")}override fun onNext(t: String?) {bind.textInfo.append(t+"\n")}override fun onError(e: Throwable?) {}override fun onComplete() {bind.textInfo.append("\nonComplete")}}//Scheduler调度者,被观察和消费者,一般消费者如果是更新UI,采用android的主线程observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(observer)/*** subscribeOn():      用于指定Observable被观察者subscribe()时所发生的线程,即指定发生事件的线程observeOn():         指定Observer观察者接收&响应事件的线程,即订阅者接收事件的线程* *//**** RxJava中内置了很多线程项供我们选择:Schedulers.io():        代表IO操作的线程,通常用于网络、读写文件等IO密集型的操作。行为模式和new Thread()差不多,只是IO的内部是一个无上限的线程池,可重用空闲的线程,更高效(不要把计算工作放在IO内,可以避免创建不必要的线程);AndroidSchedulers.mainThread():Android的主线程;用于更新UISchedulers.newThread():             总是启用新线程,并在新线程中执行操作;多用于耗时操作Schedulers.computation():           代表CPU计算密集型的操作,即不会被IO等操作限制性能的操作。**** */}

2.empty

    private fun empty(){Observable.empty<String>().subscribe(object :Observer<String>{override fun onSubscribe(d: Disposable?) {}override fun onNext(t: String?) {}override fun onError(e: Throwable?) {}override fun onComplete() {}})/*** 只执行onSubscribe 和 onComplete* ,同理error():执行订阅和error方法* 同理:never()只执行订阅,其他都不执行* */}

3.just

    private fun just(){/*** 这个是执行just里面的内容进行分发,一般数据格式都是一样的,或者object自己定义,都是按顺序分发* */Observable.just(1,2,3,4,5).subscribe(object :Observer<Int>{override fun onSubscribe(d: Disposable?) {}override fun onNext(t: Int?) {bind.textInfo.append("${t}\n")}override fun onError(e: Throwable?) {}override fun onComplete() {}})}

4.fromArray

    private fun fromArray(){/*** 分发一个数组,其实和just差不多,都是可变数组批量分发* */Observable.fromArray("A","b","C",1,2).subscribe(object :Observer<Any>{override fun onSubscribe(d: Disposable?) {}override fun onNext(t: Any?) {bind.textInfo.append("${t}\n")}override fun onError(e: Throwable?) {}override fun onComplete() {}})}

5.defer

  private fun defer(){//订阅才会调用,不订阅默认不调用,所以i的值是20var i=10val obser=  Observable.defer(object : Callable<ObservableSource<Int>>{override fun call(): ObservableSource<Int> {return Observable.just(i)}});i=20obser.subscribe(object :Observer<Int>{override fun onSubscribe(d: Disposable?) {}override fun onNext(t: Int?) {}override fun onError(e: Throwable?) {}override fun onComplete() {}})}

6.timer

延迟多久执行,单次事件,不是循环事件,onSubscribe是事件的开始,onNext是事件的到期执行

    private fun timer(){//延迟多久执行,单次事件,不是循环事件,onSubscribe是事件的开始,onNext是事件的到期执行,也可以叫着结束var formate=SimpleDateFormat("YYYY-MM-DD HH:mm:ss")Observable.timer(3,TimeUnit.SECONDS).observeOn(Schedulers.io()).subscribeOn(Schedulers.io()).subscribe(object :Observer<Long>{override fun onSubscribe(d: Disposable?) {bind.textInfo.append("onSubscribe=")bind.textInfo.append(formate.format(Date(System.currentTimeMillis())))bind.textInfo.append("\n")}override fun onNext(t: Long?) {bind.textInfo.append("onNext=")bind.textInfo.append(formate.format(Date(System.currentTimeMillis())))bind.textInfo.append("\n")}override fun onError(e: Throwable?) {}override fun onComplete() {}})}

7.interval

    var obser:Observable<*>?=nullprivate fun interval(){/**** initialDelay: 表示延迟开始的时间,类型为Long;period:         距离下一次发送事件的时间间隔,类型为Long;unit:              时间单位,有TimeUnit.SECONDS等多种类型;scheduler:    表示调度器,用于指定线程。* */obser= Observable.interval(1L,TimeUnit.SECONDS,Schedulers.io()).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())(obser as Observable<Long>?)?.subscribe(object :Observer<Long>{override fun onSubscribe(d: Disposable?) {}override fun onNext(t: Long?) {//next数值,一般从0开始,每回调一次,递增一个数值bind.textInfo.append("${t}\n")}override fun onError(e: Throwable?) {}override fun onComplete() {}})}

8.intervalRange

private fun intervalRange(){/*** start:             标识起始值;count:            表示事件执行的次数,不能为负数;initialDelay:     表示延迟开始的时间,类型为Long;如果需要立刻执行,设为0period:           距离下一次发送事件的时间间隔,类型为Long;unit:                时间单位,有TimeUnit.SECONDS等多种类型;scheduler:     表示调度器,用于指定线程。* */Observable.intervalRange(1,20,0,1,TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe(object :Observer<Long>{override fun onSubscribe(d: Disposable?) {val value=SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Date().apply { time=System.currentTimeMillis() })bind.textInfo.append("onSubscribe=${value}\n")}override fun onNext(t: Long?) {val value=SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Date().apply { time=System.currentTimeMillis() })bind.textInfo.append("${value}\n")}override fun onError(e: Throwable?) {}override fun onComplete() {}})}

9.range

fun range(){//从start开始,一直累加多少个次,这是一直执行上来,不受时间控制,数据遍历,和Observable.rangeLong()用法一样Observable.range(10,10).observeOn(AndroidSchedulers.mainThread()).subscribe(object :Observer<Int>{override fun onSubscribe(d: Disposable?) {}override fun onNext(t: Int?) {bind.textInfo.append("${t}\n")}override fun onError(e: Throwable?) {}override fun onComplete() {}})}

10.flatMap操作符

是对observe的结果进行拦截处理,是单个拦截,最后再进行发送出去

        //操作符flatMap,通过flatmap可以对结果进行过滤和其他的转换,然后再通过Observable.just(0, 1, 2, 3).observeOn(Schedulers.io()).subscribeOn(AndroidSchedulers.mainThread()).flatMap(object : io.reactivex.functions.Function<Int, ObservableSource<Int>> {override fun apply(t: Int?): ObservableSource<Int> {runOnUiThread {viewbind?.textInfo?.apply { append("flow=${t}\n") }}val item = t?.plus(4)return Observable.just(t)}}).subscribe(object : Consumer<Int> {override fun accept(t: Int?) {runOnUiThread {viewbind?.textInfo?.apply { append("${t}\n") }}}})

11.map操作符

直接通过function进行拦截,然后再转发出去。一般在处理数据过滤进程会用到

     Observable.just(0, 1, 2, 3).observeOn(Schedulers.io()).subscribeOn(AndroidSchedulers.mainThread()).map(object : io.reactivex.functions.Function<Int, Int> {override fun apply(t: Int?): Int {runOnUiThread {viewbind?.textInfo?.apply { append("flow=${t}\n") }}val item = t?.plus(4)return item ?: 0}}).subscribe(object : Consumer<Int> {override fun accept(t: Int?) {}})

12.concatMap

参考flatmap

       Observable.just(0, 1, 2, 3).observeOn(Schedulers.io()).subscribeOn(AndroidSchedulers.mainThread()).concatMap(object : io.reactivex.functions.Function<Int, ObservableSource<Int>> {override fun apply(t: Int?): ObservableSource<Int> {runOnUiThread {viewbind?.textInfo?.apply { append("flow=${t}\n") }}val item = t?.plus(4)return Observable.just(t)}}).subscribe(object : Consumer<Int> {override fun accept(t: Int?) {runOnUiThread {viewbind?.textInfo?.apply { append("${t}\n") }}}})

12.zip

zip是将多个observe合并在BiFunction得回调用,最后合并成一个,类似将多个结果合并成一个

        Observable.zip(Observable.just(1), Observable.just(2), object :BiFunction<Int, Int, Int> {override fun apply(t1: Int?, t2: Int?): Int {return (t1 ?: 0) + (t2 ?: 0)}}).observeOn(Schedulers.io()).subscribeOn(AndroidSchedulers.mainThread()).subscribe(object : Consumer<Int> {override fun accept(t: Int?) {runOnUiThread {viewbind?.textInfo?.apply { append("${t}\n") }}}})

13.merge

将多个观察合并在一起提交,最后按顺序发送出去,这个和zip还是有区别的,zip是将多个观察得结果合并发送一个订阅,merge是由多少个观察,就有收到多少个订阅。

        //将多个Observable合并在一起提交,结果按Observable返回Observable.merge(Observable.just(1), Observable.just(2), Observable.just(3)).observeOn(Schedulers.io()).subscribeOn(AndroidSchedulers.mainThread()).subscribe(object : Consumer<Int> {override fun accept(t: Int?) {runOnUiThread {viewbind?.textInfo?.apply { append("${t}\n") }}}})

14.buffer操作符

缓冲,是针对多个观察结果进行处理,如果设置了缓冲数,那么超过的数据会分匹配进行多次收到订阅,结果是一个list数组。

//buffer:每次缓存多少了,一个数组最多返回这么多Observable.fromArray(1,2,3,4,5,6,7,8,9,10, Observable.just(3)).observeOn(Schedulers.io()).subscribeOn(AndroidSchedulers.mainThread()).buffer(3).subscribe{it->runOnUiThread {viewbind?.textInfo?.apply { append("${  it.size}\n") }}}

15.filter操作符

条件过滤,如果满足条件将会接受,否则不会受到订阅通知

        Observable.fromArray(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).observeOn(Schedulers.io()).filter { it ->if (it >3) true else false}.subscribeOn(AndroidSchedulers.mainThread()).subscribe { it ->runOnUiThread {viewbind?.textInfo?.apply { append("${it}\n") }}}

解绑

有订阅肯定就有接触,所以每个订阅都会返回Observable的对象,我们可以通过这个状态来查看是否已订阅上了,或者解绑进行解绑。

任何一个订阅成功都会进行一个回调:

 override fun onSubscribe(d: Disposable?)

拿到Disposable对象,我们就可以判断是否订阅以及取消了

   override fun onSubscribe(d: Disposable?) {d.isDisposed //判断是否订阅成功d.dispose()//取消订阅}

四、总结

通过以上的订阅,我们知道了如何去创建和订阅以及观察,以及常用的合并以及操作符等,有了这些简单的,我们就可以很好的在项目中进行很好的运用。

这篇关于Rxjava2的介绍与使用(一:详解)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1108552

相关文章

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

Java中String字符串使用避坑指南

《Java中String字符串使用避坑指南》Java中的String字符串是我们日常编程中用得最多的类之一,看似简单的String使用,却隐藏着不少“坑”,如果不注意,可能会导致性能问题、意外的错误容... 目录8个避坑点如下:1. 字符串的不可变性:每次修改都创建新对象2. 使用 == 比较字符串,陷阱满

Java判断多个时间段是否重合的方法小结

《Java判断多个时间段是否重合的方法小结》这篇文章主要为大家详细介绍了Java中判断多个时间段是否重合的方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录判断多个时间段是否有间隔判断时间段集合是否与某时间段重合判断多个时间段是否有间隔实体类内容public class D

Python使用国内镜像加速pip安装的方法讲解

《Python使用国内镜像加速pip安装的方法讲解》在Python开发中,pip是一个非常重要的工具,用于安装和管理Python的第三方库,然而,在国内使用pip安装依赖时,往往会因为网络问题而导致速... 目录一、pip 工具简介1. 什么是 pip?2. 什么是 -i 参数?二、国内镜像源的选择三、如何

使用C++实现链表元素的反转

《使用C++实现链表元素的反转》反转链表是链表操作中一个经典的问题,也是面试中常见的考题,本文将从思路到实现一步步地讲解如何实现链表的反转,帮助初学者理解这一操作,我们将使用C++代码演示具体实现,同... 目录问题定义思路分析代码实现带头节点的链表代码讲解其他实现方式时间和空间复杂度分析总结问题定义给定

IDEA编译报错“java: 常量字符串过长”的原因及解决方法

《IDEA编译报错“java:常量字符串过长”的原因及解决方法》今天在开发过程中,由于尝试将一个文件的Base64字符串设置为常量,结果导致IDEA编译的时候出现了如下报错java:常量字符串过长,... 目录一、问题描述二、问题原因2.1 理论角度2.2 源码角度三、解决方案解决方案①:StringBui

Linux使用nload监控网络流量的方法

《Linux使用nload监控网络流量的方法》Linux中的nload命令是一个用于实时监控网络流量的工具,它提供了传入和传出流量的可视化表示,帮助用户一目了然地了解网络活动,本文给大家介绍了Linu... 目录简介安装示例用法基础用法指定网络接口限制显示特定流量类型指定刷新率设置流量速率的显示单位监控多个

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

Java中ArrayList和LinkedList有什么区别举例详解

《Java中ArrayList和LinkedList有什么区别举例详解》:本文主要介绍Java中ArrayList和LinkedList区别的相关资料,包括数据结构特性、核心操作性能、内存与GC影... 目录一、底层数据结构二、核心操作性能对比三、内存与 GC 影响四、扩容机制五、线程安全与并发方案六、工程