RxJava操作符 —— 创建和变换

背景

从前年开始使用 RxJava,后来换了工作以后虽然项目中一样使用了 RxJava 但是工作的侧重点更多的转向了前端上面,现在重新回到源生上再看 RxJava 发现之前的使用都是非常基础的,RxJava 提供的如此之多的操作符却基本上只用了那么几个,由于了解的比较少自然而然的使用过程中也会限制到使用 RxJava 解决问题的能力,所以参考 RxJava 的官方文档把几乎所有的操作符都实现了一遍,现在再把这些整理一下以供自己后面随时翻阅,也可以为别人提供一下参考。

需要说明的是我使用的是 RxJava 2.0.1,所有的例子都是使用 kotlin 来是实现的,所以很多地方使用了 lambda 表达式还请注意。关于 kotlin 的 lambda 表达式请参考这里

关于 RxJava 的文章网上一搜一大把,有讲原理的有讲使用的不一而足这里推荐几个个人认为比较好的:

以上

创建操作符

下面的操作符用来创建 Observable,示例代码

Create

使用一个函数从头创建一个 Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Observable.create(object : ObservableOnSubscribe<String> {
// 常规写法
override fun subscribe(e: ObservableEmitter<String>?) {
e?.onNext("aaaa")
e?.onNext("bbb")
e?.onNext("ccc")
e?.onComplete()
}
}).subscribe(object : Observer<String> {
override fun onComplete() {
println("完成")
}
override fun onSubscribe(d: Disposable?) {
}
override fun onNext(value: String?) {
println(value)
}
override fun onError(e: Throwable?) {
}
})
}
// 下面是 lambda 写法
Observable.create<String> { e ->
e.onNext("a")
e.onNext("b")
e.onComplete()
}.subscribe { value -> println(value) }

create 操作符默认不在任何调度器上执行

Defer

只有当订阅者订阅时才创建 observable,并且为每个观察者创建新的 Observable,即使他们看起来是一样的

Empty/Never/Throw

  • Empty
    创建一个不发射数据但正常终止的 Observable
  • Never
    创建一个不发射数据也不终止的 Observable
  • Throw
    创建一个不发射数据并以一个错误终止的 Observable

From

将其他种类的对象和数据类型转换成 Observable

在 RxJava 2 中有一下几种实现

  • fromArray
    将一个数组的每一项发射出去
  • fromIterable
    将一个 Iterable 中的每一项发射出去
  • fromPublisher
    将任意的流转换成可观察的

    1
    2
    Observable.fromPublisher<String> { s -> s.onNext("aaaa") }
    .subscribe { value -> println(value) } // 输出结果 aaaa
  • fromFuture
    将 Future.get() 返回的数据发射出去

  • fromCallable
    将 Callable.call() 返回的数据发射出去

Interval

创建一个在给定间隔时间发送整数数列的 Observable,注意这个默认实现除非手动停止否则不会停止

Just

创建一个发射给定项目的 Observable,最多有10个参数,而且在 2.0 以后版本中不再支持发送 null。

Range

这个操作符的默认实现是发送一个特定范围内递增的整数序列,范围为 n ~ n+m-1

还有另外一个实现

  • intervalRange
    发送一个具有特定时间间隔并且特定范围的序列
    1
    2
    3
    // 发送一个从1 到 100 首项延迟1秒 后面项间隔为2秒的序列
    Observable.intervalRange(1, 100, 1, 2, TimeUnit.SECONDS)
    .subscribe { value -> println(value) }

Repeat

创建一个重复发射原始序列项的 Observable,在 2.0 上有一下三种实现

  • repeat
    无限次重复
  • repeatUntil
    根据参数中的函数如果返回 true 则停止重复发送
  • repeatWhen
    根据指定的 observable 发送和结束来判断原始的 observable 是否重复发送或结束

Timer

创建一个 Observable 在一定的延迟以后才发送消息

变换操作符

下面的这些操作符可以对 observable 发射的数据做各种变换,示例代码

Buffer

周期性的将原始 observable 发射的数据收集在一起然后发射出去。buffer 支持通过时间或者序列长度或者指定的 Observable 来建立不同大小缓冲区。

FlatMap

将一个对原始 observable 发射的每一项数据执行变换操作,并且返回一个 observable,然后 flatMap 将每一项返回的 observables 合并(内部执行了 merge)的结果作为自己的数据序列发射。注意和下面的 map 的区别。flatMap 有以下:

  • flatMap
    基础实现,不能保证合并以后的顺序,原因是内部执行了 merge,它还可以接受一个参数 int 设置 flatMap 原始 observable 到 observables 的最大同时订阅数,以及一个 Boolean 来确保当出现 onError 时不会终止。
  • concatMap
    和 flatMap 的区别就是保证了原始的顺序
  • flatMapIterable
    参数中的函数返回一个 list,flatMap 将这些 list 中的每一项作为一个消息单独发射出去。既是一个典型的一变多
  • concatMapIterable
    和 flatMap 一样,不同之处在于保证了消息的顺序
  • concatMapDelayError
    和 concatMap 一样,不同之处在于将变换的后的 Observable 出现的 onError 事件延迟到所有序列的最后
  • concatMapEager
    一旦有订阅,操作符就会订阅所有的原始 observable,它能够确保原始消息中的第一项首先执行,注意这个操作符在使用背压的时候才有效
  • concatMapEagerDelayError
    和 concatMapEager 一样,并保证了将 onError 延迟到最后再执行
  • switchMap
    当转换后的 observable 没有执行完时有新转换的 observable 出现,就会切换到新的 observable 上。
  • switchMapDelayError
    同上

注意: 如果任何一个通过这个 flatMap 操作产生的单独的 Observable 调用 onError 异常终止了,这个 Observable 自身会立即调用 onError 并终止,带有 DelayError 参数的除外

GroupBy

将原始 Observable 的每一项按照特定条件分组,并放在子 Observable 中发射出去。接收者可以按照设定的 Key 来对不同的子 Observable 做订阅并处理

Map

对原始的 Observable 的每一项做变换

  • cast
    对每一项做一次强制类型转换

Scan

函数中能够获取到序列中上一个被发射出去的项以及当前项,可以根据需要进行处理,并把结果发射出去

Window

将源 Observable 的序列按照长度或者时间或者另外一个指定的 Observable 等进行分组,和 Buffer 有点像,不同之处在于 Window 最后发射出去的是一个 observable 序列,Buffer 发射的是一个 List

结语

以上就是 Observable 的创建以及变换操作符,这里也只是做了大致的说明因为有很多操作符有很多重载,比如 buffer 和 window 等。