RxJava操作符 —— 过滤

前言

本系列旨在整理总结 RxJava 2 的操作符,以供大家参考。本文内容 —— 过滤操作符。

其他文章可查阅:

过滤

过滤操作符用来过滤和选择 Obserbale 发射的序列。示例代码

Debounce

在特定时间内如果没有发射数据,就将当前可观察的最后一条数据发射出去。
Debounce

Distinct

去除 Observable 发射出的重复项。

在 rxJava 2 中有两个实现:

  • distinct
    默认实现。
    distinct

    而且有一些重载,比如通过一个函数来自己指定相应的 key,如果消息序列中有 key 相同的项,则保留第一项。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    Observable.just(1, 2, 3, 4, 5, 5, 6, 1)
    .distinct { t ->
    when {
    t < 3 -> return@distinct "a"
    t in 3..4 -> return@distinct "b"
    else -> return@distinct "c"
    }
    }
    .subscribe { value -> println(value) }
  • distinctUntilChanged
    只去除掉连续的重复项。
    distinctUntilChanged

ElementAt

仅让第 n 个消息通过发射,其他的消息均被丢弃掉。
ElementAt

Filter

过滤掉不符合条件的消息,只发送符合条件的消息。
Filter

1
2
3
Observable.just(1, 2, 3, 4, 5)
.filter { t -> t < 3 }
.subscribe { value -> println(value) }

还有一个操作符是 ofType,检查消息的类型,不符合类型的消息将会被丢掉
ofType

1
2
3
4
5
6
7
//不过注意 比如这里发送的 消息并不是 kotlin 的 Int 型 而是 java 的 Integer 类型
Observable.just(1, 5, "", 4, "aa", 5, 6, 1)
.ofType(Class.forName("java.lang.Integer"))
.subscribe { value ->
println(value.javaClass)
println(value)
}

First

只发送可观察到的第一个消息,或者是符合条件的第一个消息,需要注意的是 first 返回的是一个 Single 对象。

在 rxJava 2 中有以下几个实现

  • blockingFirst
    只发射第一个消息,如果为空且没有默认值就抛出一个异常 NoSuchElementException,需要注意的是它的返回值,不再是一个 Observable 或 Single,而是一个泛型。
  • blockingSingle
    如果 observable 发射一个消息以后完成了发射(即消息序列中只有一个消息),就返回这一个消息;如果后面还有消息发射,就抛出异常 NoSuchElementException
    blockingSingle
  • firstElement
    只发射第一个消息,其实和 ElementAt(0) 是一样的。
    firstElement
  • first
    只发射第一个消息,如果为空就发送默认值。
    first
  • single
    原始 observable 只能发射一个或者不发射消息,否则就会报错,如果是空数据,就将 single 中的默认值发射出去。
    single
  • singleElement
    原始 observable 只能发射一个消息,如果为空或者多个消息就抛出异常。
    singleElement

IgnoreElements

IgnoreElements 操作符确保 observable 不发射任何数据,只发射 observable 的终止通知。即它过滤掉 onNext,只会使 onError 或者 onComplete 被触发。
IgnoreElements

Last

只发射 observable 的最后一项数据或者符合条件的最后一项数据。

Last 在 RxJava 2 中有以下几个实现:

  • lastElement
    发射 observable 的最后一项,如果当前 observable 为空则直接执行 onComplete。
    lastElement
  • last
    只发射 observable 的最后一项,如果为空则将 last(defaultItem) 中 defaultItem 发射出去。
    last
  • blockingLast
    返回最后一项数据,在这之前会一直阻塞住线程,(这个操作符使用线程调度好像不起作用,还是会阻塞原有线程),如果不指定默认值,且原始 observable 为空则抛出一个 NoSuchElementException 异常,如果指定了默认值就返回默认值。
    blockingLast
    1
    2
    3
    4
    5
    6
    7
    8
    thread {
    println(System.currentTimeMillis())
    Observable.intervalRange(1, 5, 0, 2, TimeUnit.SECONDS)
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread()) //即使这里指定了在新线程,他还是阻塞了原始的线程
    .blockingLast()
    println(System.currentTimeMillis())
    }

Sample

周期性的将原始 observable 中当前周期内发射的最后一条消息发射出去。

在 rxJava 2 中有以下实现:

  • sample
    默认实现,周期性的将原始 observable 中当前周期内发射的最后一条消息发射出去,而且只发射一次。可以看到下图中第三个周期并没有发射数据。
    sample
  • throttleFirst
    周期性的将原始 observable 中的当前周期内的第一条消息发射出去。
    throttleFirst
  • throttleLast
    周期性的将原始 observable 中当前周期内发射的最后一条消息发射出去,其实这个在内部实现就是 sample。
    throttleLast

Skip

跳过 observable 开始的几项。rxJava 的强大之处在于 skip 不但可以在空间维度上跳过一些项,也可以指定在时间维度上跳过一些项。

Skip

SkipLast

跳过 observable 的最后几项,和 skip 一样,不但可以指定空间维度,还可以指定时间维度。
SkipLast

Take

只发送 observable 的前几项,和 skip 一样,不但可以指定空间维度,还可以指定时间维度。
Take

TakeLast

只发送 observable 的前几项,和 skip 一样,不但可以指定空间维度,还可以指定时间维度。
TakeLast

结语

以上就是 rxJava 2 中所有的过滤操作符,下面会接着来说结合操作符。