RxJava与微服务的实践(二)

想了想,这个系列讲的内容与微服务关联不多,就当蹭了一波微服务的热度吧,哈哈。。。。。。

在上篇文章里我们介绍了RxJava的一些简单运用和线程调度,这篇文章我们讲一些RxJava常用的操作符

map

map 是RxJava中一个简单的操作符,它会接受一个函数作为参数。这个函数会被应用到每个事件上,并将其映射成一个新的事件(使用映射一词,是因为它和转换类似,但其中的细微差别在于它是“创建一个新事件”而不是去“修改”)

1
2
3
4
5
6
7
8
Observable.just(2)
.map(m -> m * m)
.subscribe(o ->
log.debug("receive : {}", o)
);
//输出结果
22:34:18.825 [main] DEBUG cn.dysania.rxjava.Demo9 - receive : 4

这段代码我们发射了2这个事件,通过map操作符,将这个事件映射成了2的平方,

flatmap

当你想对一个事件进行映射时,但是这个映射函数是返回一个观察者的时候,用map操作符是不能满足我们的需求的

1
2
3
4
5
6
7
8
9
10
Observable.just(1)
.map(
integer -> Observable.just(++integer)
)
.subscribe(
even -> log.debug(even.getClass().toString())
);
//输出结果
16:20:20.935 [main] DEBUG cn.dysania.rxjava.Demo1 - class io.reactivex.internal.operators.observable.ObservableJust

可以看到 这里我们并没有接收到我们希望的Integer事件,而是一个ObservableJust事件,这种时候就可以用flatmap

1
2
3
4
5
6
7
8
9
10
Observable.just(1)
.flatMap(
integer -> Observable.just(++integer)
)
.blockingSubscribe(
even -> log.debug(even.getClass().toString())
);
//输出结果
16:32:07.282 [main] DEBUG cn.dysania.rxjava.Demo1 - 2

flatmap方法让你把一个观察者中的每个值都换成另一个观察者,然后把所有的观察者连接起来成为一个流。

filter

filter是一个过滤操作符,该方法接收一个函数,用来过滤不符合条件的事件

1
2
3
4
5
6
7
8
Observable
.fromArray(1, 2, 3, 4, 5, 6)
.filter(num -> num % 2 == 0)
.subscribe(integer -> log.debug("receive : {}", integer));
21:03:11.958 [main] DEBUG cn.dysania.rxjava.Demo1 - receive : 2
21:03:11.965 [main] DEBUG cn.dysania.rxjava.Demo1 - receive : 4
21:03:11.965 [main] DEBUG cn.dysania.rxjava.Demo1 - receive : 6

从上面的例子可以看出filter操作符过滤掉了不符合 num % 2 == 0 条件的事件

看了上面三个RxJava的操作符是不是觉得和Java8的Stream有点像呢, 其实两者都是基于事件驱动设计的API,具有相同的共同性,但是Java 8的Stream只能推送,不能接收监听者的信号,并且没有像RxJava所做的那样真正的对I/O进行优化,如果你对Java8的Stream很熟悉的话,相信理解RxJava也会很快。由于RxJava的操作符比较多,就不一一细讲了。