RxJava与微服务的实践(一)

“微服务”架构是近期软件应用领域非常热门的概念。微服务的相关知识笔者也还在学习中,简单介绍下,就不班门弄斧了。

通常在微服务架构中会有API网关的服务,用来放在微服务们的最前端,用来编排后端微服务的各类API,用来简化客户端实现和微服务应用程序之间的沟通方式。客户端就不再需要知道和关心具体微服务的地址了,只需要关心API网关的地址就可以了.当然 这只是网关的一个小小功能。

一般客户端完成一项业务操作需要多次调用多个服务的接口,由于各种不同环境下,网络传输会有消耗,所以需要我们在API网关层面对API进行聚合,并暴露聚合后的端点,因为一般微服务是部署在同一内网中,所以网络传输消耗会比较小。而如果仅仅只是在网关中同步调用服务接口,并没有提升多少响应时间,仅仅是简化了客户端的实现。

所以! 我们需要 RxJava

RxJava 简介

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

第一次看到这句话时,我是懵比的,什么鬼。。。 其实RxJava就是一个异步操作的Java库,当然也支持其他的JVM语言,RxJava扩展于观察者模式,但是异步调用库有java的concurrent包和Guava提供的异步编程框架,为什么要使用RxJava呢?

RxJava的优点

与其他异步框架相比,RxJava的一大优点就是简洁,逻辑清晰,在业务逻辑很复杂时依然能保持程序的可读性。

正题

RxJava的工作原理是基于事件的,看到事件我们可以想象一下消息中间件的发布订阅模式,被观察者发送事件,事件保存在事件存储介质(内部实现为原子类型的数组),订阅者订阅这个事件。我们看一个简单的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//创建被观察者,仅仅只发送一个 hello RxJava事件,观察者收到这个事件并打印
Observable.create(
e -> {
log.debug("start emit even");
e.onNext("hello RxJava");
e.onComplete();
}
).subscribe(even ->
log.debug("receive even : {}", even)
);
//程序输出
01:06:30.257 [main] DEBUG cn.dysania.rxjava.Demo - start emit even
01:06:30.264 [main] DEBUG cn.dysania.rxjava.Demo - receive even : hello RxJava

这里调用了Objervable.create方法创建了一个被观察者,然后调用subscribe方法与一个观察者建立关系.

大家如果不理解的话可以想象小时候大家都做过的一道数学题,一根入水管和出水管,还有一个水池。这里的被观察者呢就是入水管,可以源源不断的发送事件,具体代码体现为 e.onNext(), 而水池呢就是上面所说的事件存储介质(队列),观察者呢就是出水管,如果观察到队列中有事件,就消费事件。e.onComplete()和e.onError呢就相当于水管的开关,区别在于前一个是告诉出水管我是正常关闭的,后一个能是通知出水管我是发生异常而关闭的,而。注意,Observable.create 创建的是 Cold Observable,而 just, range, timer 和 from 这些创建的同样是 Cold ObservableCold Observable 如果没有任何观察者,被观察者是不会运行的,直到有Observe来订阅它。这是RxJava的冷启动,相对应的也有热启动。是通过操作符来实现的。

细心的读者可能发现上面的观察者与被观察者是工作在同一线程的。不是说是异步框架么?其实默认情况下Observable是工作在当前线程的,而Observer默认情况下是与Observable在同一线程的。哪怎么实现异步呢? RxJava有两个操作符可以实现线程的调度。先看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.create(
e -> {
log.debug("start emit even");
e.onNext("hello RxJava");
e.onComplete();
}
)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(even ->
log.debug("receive even : {}", even)
);
Thread.sleep(2000);
//输出结果:
17:45:15.801 [RxCachedThreadScheduler-1] DEBUG cn.dysania.rxjava.Demo1 - start emit even
17:45:15.806 [RxComputationThreadPool-1] DEBUG cn.dysania.rxjava.Demo1 - receive even : hello RxJava

这段代码只比上面的代码多了三行,而观察者与被观察者分别运行在了不同的线程中

1
2
3
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
Thread.sleep(2000);

简单的来说, subscribeOn() 指定的是被观察者发送事件的线程, observeOn() 指定的是之后代码运行的线程.
Thread.sleep(2000); 默认 Schedulers的线程是以守护线程模式运行的,而这个例子中之后main线程是非守护线程,如果main线程退出了,Schedulers的线程也会退出。

RxJava还可以阻塞当前线程订阅的操作符。blockingSubscribe

1
2
3
4
5
6
7
8
9
10
Observable.create(
e -> {
Thread.sleep(1000);// imitate expensive computation
e.onNext(1);
e.onComplete();
})
.subscribeOn(Schedulers.io())
.blockingSubscribe(o -> {
log.debug("recive : {}", o);
});

blockingSubscribe操作符会阻塞当前线程去订阅,直到被观察者调用onComplete或者onError,当你没有手动调用onError 并且发生异常时,RxJava会自动调用onError发射异常通知观察者。

未完待续—