RxJS进阶——关于流的理解和应用

RxJS是微软公司推出的响应式编程的JavaScript库。
对于它的学习,最开始我的理解是把它当成是 能优雅地解决异步问题的lodash
随着学习的深入,发现它采用了订阅者模式,其中也带有纯函数的思想。
直到在使用了RxJS 6之后才了解其少有人意识到的另一面——流。

什么是流?这里我们不用专业术语来解释,用生活中大家熟悉的的例子来类比,比如“河流”。

河流有什么特点?
至少有两个特点:

有朝向。
水往低处流,河流虽然可能会蜿蜒盘旋,但是朝向是固定的,比如我国的长江和黄河就都是由西往东流。
在RxJS中数据的流向也是固定的,就是从发送者到订阅者。基本都如下面这种形式:

1
2
3
from(Promise.resolve(1)) // 流的源头
......
.subscribe(x => console.log(x)); // 流的终点

有分支。

大的河流一般有干流和支流,大大小小的支流汇入干流。

RxJS中的数据则可以通过操作符将数据流进行聚合或拆分。

1
2
3
4
5
6
7
8
9
10
11
// 流的聚合
mergeMap(from(Promise.resolve(1)), from(Promise.resolve(2)))
......
.subscribe(x => console.log(x))

// 流的拆分
const obs$ = from(Promise.resolve(1)
obs$.subscribe(x => console.log(x))
obs$.subscribe(x => {
// do sth
})/

RxJS 6 相对于 RxJS 5(这里指5.5以下的版本,因为pipe函数在RxJS 5.5中作为新特性已被引入。) 来说不仅修改了一部分操作符的名称,同时做了一个较大的改动,引入了管道(pipe)。这个改动到底有多大?

首先是写法上的变化。
RxJS 5的这种操作符的调用方式有没有一种似曾相识的感觉?
是的,它看上去很像JQuery那种意大利面条式的链式调用
而RxJS 6和Gulp的写法有些像,想想Gulp是什么?基于流的构建工具!

1
2
3
4
5
6
7
8
9
10
11
// RxJS 5 伪代码
myObservable
.map(data => data * 2)
.switchMap(...)
.throttle(...))
.subscribe(...);

// RxJS 6 伪代码
myObservable
.pipe(map(data => data * 2), switchMap(...), throttle(...))
.subscribe(...);

这种写法上的变化就带来了用法上的变化,以前的固定“河流”可以通过“管道”(pipe)来控制形成灵活的“水流”。

下面举个例子来更加形象地阐述加入管道之后流的灵活性。

现在有一个这样的业务场景:
点击按钮之后发送一个请求,让服务端开始执行任务,然后轮询发送请求查询任务执行状态,根据不同状态进行不同操作。有3种状态”controlling”——继续轮询, “stop”——停止轮询,”finish”——停止轮询,并进行后续操作。

不考虑判断条件,伪代码是下面这样子:

1
2
3
4
5
6
7
8
// 开始任务
start$().pipe(
switchMap(() => interval(1000)), // 开始轮询
switchMap(() => getStatus$()), // 查询状态
)
.subscribe(x => {
// 后续操作
})

这段代码有一个问题没有解决,根据状态进行相应操作。
先来看看这3种状态对应的操作。

  • controlling。继续轮询很好处理,不进行任何操作即可。
  • stop。停止轮询的操作符有3个:take,需要固定次数,这个次数没法预先确定。takeUntil,需要创建一个额外的subject来进行停止,应该可以实现,不过代码量比较大。takeWhile,只需简单的逻辑判断即可,比较合适。
  • finish。问题来了,如过我们在管道操作符中判断状态的并停止流的话,那么订阅者将无法收到消息,意味着后续操作无法执行。

解决方法就是把后续操作放到管道中。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
// 开始任务
start$().pipe(
switchMap(() => interval(1000)), // 开始轮询
switchMap(() => getStatus$()), // 查询状态
filter(x => x==='stop' || x==='finish') // 'controlling'状态下继续轮询,其它状态进行对应操作
takeWhile(x => x!=='stop') // 当为'stop'时结束轮询
tap(() => {
// 后续操作
})
takeWhile(() => false) // 操作完成结束轮询
)
.subscribe();

现在需求变化了,在另一段代码中,我们也要通过查询状态并根据状态进行,但是不再需要开始任务和轮询了。
那么上面的代码查询和操作部分可以利用pipe方法抽取出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const handle = pipe(
switchMap(() => getStatus$()), // 查询状态
filter(x => x==='stop' || x==='finish') // 'controlling'状态下继续轮询,其它状态进行对应操作
takeWhile(x => x!=='stop') // 当为'stop'时结束轮询
tap(() => {
// 后续操作
})
takeWhile(() => false) // 操作完成结束轮询
)
start$().pipe(
switchMap(() => interval(1000)), // 开始轮询
handle
).subscribe();
// 直接复用查询状态代码和后续操作部分代码
other.pipe(
handle
).subscribe()

总结一下。RxJS比较完整的理解应该是基于流的订阅者模式,而流的灵活性体现在可拆分和聚合,有了pipe管道的加入,流的可复用性增强,因此更容易对代码逻辑进行抽象。


作者信息:朱德龙,人和未来高级前端工程师。