背景
从前年开始使用 RxJava,后来换了工作以后虽然项目中一样使用了 RxJava 但是工作的侧重点更多的转向了前端上面,现在重新回到源生上再看 RxJava 发现之前的使用都是非常基础的,RxJava 提供的如此之多的操作符却基本上只用了那么几个,由于了解的比较少自然而然的使用过程中也会限制到使用 RxJava 解决问题的能力,所以参考 RxJava 的官方文档把几乎所有的操作符都实现了一遍,现在再把这些整理一下以供自己后面随时翻阅,也可以为别人提供一下参考。
需要说明的是我使用的是 RxJava 2.0.1,所有的例子都是使用 kotlin 来是实现的,所以很多地方使用了 lambda 表达式还请注意。关于 kotlin 的 lambda 表达式请参考这里
关于 RxJava 的文章网上一搜一大把,有讲原理的有讲使用的不一而足这里推荐几个个人认为比较好的:
以上
创建操作符
下面的操作符用来创建 Observable,示例代码
Create
使用一个函数从头创建一个 Observable
|
|
create 操作符默认不在任何调度器上执行
Defer
只有当订阅者订阅时才创建 observable,并且为每个观察者创建新的 Observable,即使他们看起来是一样的
Empty/Never/Throw
Empty
创建一个不发射数据但正常终止的 ObservableNever
创建一个不发射数据也不终止的 ObservableThrow
创建一个不发射数据并以一个错误终止的 Observable
From
将其他种类的对象和数据类型转换成 Observable
在 RxJava 2 中有一下几种实现
fromArray
将一个数组的每一项发射出去fromIterable
将一个 Iterable 中的每一项发射出去fromPublisher
将任意的流转换成可观察的12Observable.fromPublisher<String> { s -> s.onNext("aaaa") }.subscribe { value -> println(value) } // 输出结果 aaaafromFuture
将 Future.get() 返回的数据发射出去fromCallable
将 Callable.call() 返回的数据发射出去
Interval
创建一个在给定间隔时间发送整数数列的 Observable,注意这个默认实现除非手动停止否则不会停止。
Just
创建一个发射给定项目的 Observable,最多有10个参数,而且在 2.0 以后版本中不再支持发送 null。
Range
这个操作符的默认实现是发送一个特定范围内递增的整数序列,范围为 n ~ n+m-1
还有另外一个实现
intervalRange
发送一个具有特定时间间隔并且特定范围的序列123// 发送一个从1 到 100 首项延迟1秒 后面项间隔为2秒的序列Observable.intervalRange(1, 100, 1, 2, TimeUnit.SECONDS).subscribe { value -> println(value) }
Repeat
创建一个重复发射原始序列项的 Observable,在 2.0 上有一下三种实现
repeat
无限次重复repeatUntil
根据参数中的函数如果返回 true 则停止重复发送repeatWhen
根据指定的 observable 发送和结束来判断原始的 observable 是否重复发送或结束
Timer
创建一个 Observable 在一定的延迟以后才发送消息
变换操作符
下面的这些操作符可以对 observable 发射的数据做各种变换,示例代码
Buffer
周期性的将原始 observable 发射的数据收集在一起然后发射出去。buffer 支持通过时间或者序列长度或者指定的 Observable 来建立不同大小缓冲区。
FlatMap
将一个对原始 observable 发射的每一项数据执行变换操作,并且返回一个 observable,然后 flatMap 将每一项返回的 observables 合并(内部执行了 merge)的结果作为自己的数据序列发射。注意和下面的 map 的区别。flatMap 有以下:
flatMap
基础实现,不能保证合并以后的顺序,原因是内部执行了 merge,它还可以接受一个参数 int 设置 flatMap 原始 observable 到 observables 的最大同时订阅数,以及一个 Boolean 来确保当出现 onError 时不会终止。concatMap
和 flatMap 的区别就是保证了原始的顺序flatMapIterable
参数中的函数返回一个 list,flatMap 将这些 list 中的每一项作为一个消息单独发射出去。既是一个典型的一变多concatMapIterable
和 flatMap 一样,不同之处在于保证了消息的顺序concatMapDelayError
和 concatMap 一样,不同之处在于将变换的后的 Observable 出现的 onError 事件延迟到所有序列的最后concatMapEager
一旦有订阅,操作符就会订阅所有的原始 observable,它能够确保原始消息中的第一项首先执行,注意这个操作符在使用背压的时候才有效concatMapEagerDelayError
和 concatMapEager 一样,并保证了将 onError 延迟到最后再执行switchMap
当转换后的 observable 没有执行完时有新转换的 observable 出现,就会切换到新的 observable 上。switchMapDelayError
同上
注意: 如果任何一个通过这个 flatMap 操作产生的单独的 Observable 调用 onError 异常终止了,这个 Observable 自身会立即调用 onError 并终止,带有 DelayError 参数的除外
GroupBy
将原始 Observable 的每一项按照特定条件分组,并放在子 Observable 中发射出去。接收者可以按照设定的 Key 来对不同的子 Observable 做订阅并处理
Map
对原始的 Observable 的每一项做变换
cast
对每一项做一次强制类型转换
Scan
函数中能够获取到序列中上一个被发射出去的项以及当前项,可以根据需要进行处理,并把结果发射出去
Window
将源 Observable 的序列按照长度或者时间或者另外一个指定的 Observable 等进行分组,和 Buffer 有点像,不同之处在于 Window 最后发射出去的是一个 observable 序列,Buffer 发射的是一个 List
结语
以上就是 Observable 的创建以及变换操作符,这里也只是做了大致的说明因为有很多操作符有很多重载,比如 buffer 和 window 等。