rxjs和响应式编程

背景

在一个项目中看别人用到了rxjs,尤其是里面的操作符号of.
同时又在Unity3D项目里面接触的UniRx,
于是来看下响应式编程,并借由rsjx来观察一些js实现中的细节.

渊源和概念

响应式

常常与指令式做对比.
通常在指令式变成中,计算是瞬间操作.
而在响应式变成中,计算与计算之间有着相互关系,一个值变化,一些值会随之变化.

与观察者模式的关系

观察者模式算是面对对象语言中,实现响应式编程的一种方法吧.
被观察者发出事件,如果有观察者列表的话,列表里的每个观察者都会受到消息并进行处理.

和"流"的关系

就像js中的promise一样,一个promise经过一些处理后,变成了另一个promise.
observable也可以通过一些操作,比如pipe,filter之后还是observable.
我想这就是多数人认为observable名字不好应该叫Stream的原因.

优点

  1. 实现异步逻辑时能更通俗易懂(使用回调函数的方式不易读)
  2. 通俗易懂就能实现业务逻辑和代码逻辑的分离(比如不再需要全局计数器),方便维护和扩展

rxjs中的概念

observable

被观察者(通常可以是一个stream的起点,常关注如何创建这个被观察者)

1
2
3
4
5
6
7
8
9
10
11
// 创建一个observable
var observable = Rx.Observable.create(function subscribe(observer) {
try {
observer.next(1); // 不断用next在向下游传值,可看作在执行observable
observer.next(2);
observer.next(3);
observer.complete();
} catch (err) {
observer.error(err); // 可以执行的动作有 1. next 2.complete 3.error
}
});

传一系列的值时,要求有一个类似列表的东西.而如果不想写太复杂,可以用from操作符

1
var observable = Rx.Observable.from([10, 20, 30]);

subscribe

试想如果一个事件默认只能有一个观察者,同时大多数情况下一匿名的方式创建观察者,
那么这个行为不妨称为对一个事件的订阅.
如果取消了,那么就是unsubscribe.(但在取消的这一行执行前,subscribe仍然有效)

1
2
3
4
var observable = Rx.Observable.from([1,2]);
observable.subscribe(x => console.log(x)); // 鼓励函数式编程的背景下,这样好吗?这不好
var subscription = observable.subscribe(console.log); // 耗子尾汁
subscription.unsubscribe();

observer

也可以非常正式地创建一个观察者

1
2
3
4
5
6
var observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
observable.subscribe(observer);

subject

特殊的被观察者(可多播)
通常一个事件发生变化时,一段时间内只能有一个观察者观察,等complete信号出现后,再由第二个观察者从一开始的值开始响应.
如果想一个事件能有多个观察者来处理,那就需要让默认单播的事件变成多播的.

subject可以作为源头单独使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var subject = new Rx.Subject();

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

也可以作为下游,将一个单播的eobservable转换为多播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 单播多次订阅
const source1 = from([1,2]);
source1.subscribe(console.log);
source1.subscribe(console.log);
// 1,2,1,2 // 一个订阅完后另一个开始响应

// 单播转多播
const source2 = from([3,4]);
const subject = new Subject();
subject.subscribe(console.log);
subject.subscribe(console.log);
source2.subscribe(subject); // subject作为source2的observer(下游)
// 3,3,4,4 // 一个变化,多个observer响应

// 使用multicast操作符
const source3 = from([5,6]);
// 将source3单播转成multi多播的过程显得更顺畅
const multi = source3.pipe(multicast(() => new Subject()));
multi.subscribe(console.log);
multi.subscribe(console.log);
multi.connect(); // 必须使用connect
// 5,5,6,6

operator

操作符是observable类型上的方法,比如 .map(...), .filter(...), .merge(...) 等.
通常用来对observable进行组合,解包,过滤等等.
操作符本质上需要是一个纯函数.

按照功能,可以分成以下几类:

  • 创建
  • 组合
  • 条件
  • 错误处理
  • 多播
  • 过滤
  • 转换
  • 工具

按照方法定义的位置,可以分成

  • 实例操作符
  • 静态操作符

通常创建类的操作符,比如create,interval,组合类的,比如merge,concat等是静态的.

scheduler

调度器,调整observable或observer的执行时机
比如of默认的scheduler是queue,会同步地立即执行
而如果换成async,则会异步地执行,具体来说,会等同一个文件中的所有同步语句执行完后再执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import { of, pipe } from "rxjs";
import { async } from "rxjs/internal/scheduler/async";
import { subscribeOn } from "rxjs/operators";

of(1,2).subscribe(console.log);;
console.log(3);
// 1,2,3

of(4,5,async).subscribe(console.log);
console.log(6);
// 6, 4, 5

of(7,8).pipe(subscribeOn(async)).subscribe(console.log);
console.log(9);
// 9, 7, 8

常用操作符

目的在于遇见了能知道是做什么的

创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import { Observable,from } from 'rxjs';

// 1. create 普通创建
const hello = Observable.create(function(observer) {
observer.next('Hello');
observer.next('World');
});

// 2. from 使用数组可以少写一些next
const arraySource = from([1, 2, 3, 4, 5]);
// 还可以转换promise
const promiseSource = from(new Promise(resolve => resolve('Hello World!')));

// 3. of 依次发出提供的任意数量的值
const source = of(1, 2, 3, 4, 5);
// 不仅仅能发出数字
const source = of({ name: 'Brian' }, [1, 2, 3], function hello() {
return 'Hello';
});

// 4. range 依次发出1-10
const source = range(1, 10);

// 5. fromEvent,不使用框架的时候常常可以用这种方式来获取点击事件,
// 至于如何做出响应,以后再说
var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

// 6. interval 每秒发出自增数字 0,1,2,3...
const source = interval(1000);

// 7. timer 1秒后发出值,然后每2秒发出值,发出的也是自增数字
const source = timer(1000, 2000);

pipe

不知道为什么,许多操作符不能对observable对象使用,必须放在pipe里执行.
另外一个observable在subscribe之后就不再是observabel而是subscription.不能再用点连接subscribe.
但点pipe之后还是一个observable,只不过内部的值可以用来处理.

1
2
3
4
5
const source = from([1, 2, 3, 4, 5]);
const example = source.pipe(map(val => val + 10));
// const error = source.map(val => val + 10); // 很奇怪map不能直接用在observable上...
const subscribe = example.subscribe(console.log);
// 输出: 11,12,13,14,15

另外由于来自函数式编程的概念, pipe(f1, f2, f3) 相当于 f3(f2(f1()))

组合

merge类似两条河流和汇合,汇合之后还保有各自的漂浮物.

1
2
3
4
5
6
7
8
9
const p4 = interval(4000);
const p1 = interval(1000);
const example = merge(
p1.pipe(mapTo('p1')),
p4.pipe(mapTo('p4'))
);
const subscribe = example.subscribe(val => console.log(val));
// p1,p1,p1,(p1,p4同时),p1,p1,p1,(p1,p4同时)
// 观察到同时输出时p4在p1前

条件

感觉every算是组合,毕竟要求输入都满足一定条件

1
2
3
4
5
6
7
const source = of(1, 2, 3, 4, 5);
const example = source.pipe(
// 每个值都是偶数吗?
every(val => val % 2 === 0)
);
const subscribe = example.subscribe(console.log);
// 得到false

多播

multicast

过滤

filter表达式过滤

1
2
3
4
const source = of(1, 2, 3, 4, 5);
const example = source.pipe(filter(val => val %2 === 0));
const subscribe = example.subscribe(console.log);
// 2, 4

first只响应第一个

1
2
3
4
const source = of(1, 2, 3, 4, 5);
const example = source.pipe(first());
const subscribe = example.subscribe(console.log);
// 1

skip略过头几个

1
2
3
4
const source = of(1, 2, 3, 4, 5);
const example = source.pipe(skip(2));
const subscribe = example.subscribe(console.log);
// 3,4,5

take只要前几个

1
2
3
4
const source = of(1, 2, 3, 4, 5);
const example = source.pipe(take(2));
const subscribe = example.subscribe(console.log);
// 1,2

single有且最多有一个答案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const source = from([1, 0, 3, 4, 5]);
const example = source.pipe(single(id => id === 2));
const subscribe = example.subscribe(console.log);
// undefined

const source = from([1, 2, 3, 4, 5]);
const example = source.pipe(single(id => id === 2));
const subscribe = example.subscribe(console.log);
// 2

const source = from([1, 2, 2, 4, 5]);
const example = source.pipe(single(id => id === 2));
const subscribe = example.subscribe(console.log);
// 报错

throttleTime 节流用,开始节流后的5秒内,源头不发出事件

1
2
3
4
5
6
7
8
9
10
11
12
const source = interval(1000);
const example = source.pipe(throttleTime(5000));
// 0 | (1,2,3,4,5),6 | (7,8,9,10,11),12 | (13...)
const subscribe = example.subscribe(val => console.log(val));
// 输出: 0...6...12

// 一个按钮,第一次点击立即生效,之后5秒内不再生效
// 而不是等待了5秒才生效一次
const $increment = document.querySelector("#increment");
const clickStream = fromEvent($increment, 'click');
clickStream.pipe(throttleTime(5000))
.subscribe(val => console.log('clicked'));

转换

buffer 遇到一定条件就释放上游积攒的数组

1
2
3
4
5
6
7
8
9
const $increment = document.querySelector("#increment");

const myInterval = interval(1000); // 每秒发出信号
const bufferBy = fromEvent($increment, 'click'); // 一个点击事件
// 缓存上游的信号直到点击事件发生
// 而后将积攒出的数组全放放到下游
const myBufferedInterval = myInterval.pipe(buffer(bufferBy));
const subscribe = myBufferedInterval.subscribe(console.log);
// [0,1],[2,3,4,5,6],[7,8,9]等,受到点击时刻的影响

bufferCount 积攒到确定数量后放到下游

1
2
3
4
const myInterval = interval(1000); // 每秒发出信号
const myBufferedInterval = myInterval.pipe(bufferCount(2));
const subscribe = myBufferedInterval.subscribe(console.log);
// [0,1],[2,3],[4,5]...

bufferTime() 缓存一定时间,然后将积攒的内容以数组形式传递到下游
mapTo 相当于 map(x => 'string'),将值映射成常量
pluck 类似ORM中的作用,将 [{'name':'xx1'},{'name':'xx2'}] 通过 pluck('name') 的方式变成 ['xx1','xx2']

工具

delay 过一段时间再抛向下游

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const example = of(null, null);

const message = merge(
example.pipe(mapTo('Hello')),
example.pipe(
mapTo('World'),
delay(1000) // delay在外层?,但总之先delay后显示world的
),
example.pipe(
mapTo('Goodbye'),
delay(2000)
),
example.pipe(
mapTo('World!'),
delay(3000)
)
);

const subscribe = message.subscribe(console.log);
// 输出: 'Hello','Hello' 0s
// 'World','World' 1s
// 'Goodbye','Goodbye' 2s
// 'World!','World!' 3s
// 每一个null会出发一连串编排好的有延迟的动作

timeout 超时并发出一个错误

1
2
3
4
5
6
7
8
9
10
11
12
13
const source = interval(1000);
source.pipe(
timeout(1000),
catchError(error => of('error occured'))
).subscribe(console.log);
// 0,1,2...

const source = interval(1000);
source.pipe(
timeout(500),
catchError(error => of('error occured')) // timeout后会抛出错误,在错误处理时,会产生一个新的stream放到下游
).subscribe(console.log);
// 'error occured' 仅一次

项目中使用的例子

路由的参数被看作一个observable.
filter被用来检测路由的参数中有无特定字段,
map被用来从路由参数中根据字段名取出值.

一个在页面上显示的元素被定义成observable,
在页面上使用async管道修饰,
在控制器中,将取到的值用of重新包装成了observable以供给前端显示,
个人感觉大可不必,前端应该有能力识别空对象.

其他地方没有使用.

版本的变迁

有时会奇怪为什么有些人直接使用map,
而又有些map需要在pipe中使用.
查看之下发现rxjs的版本发生了变化.

更改的有:

  • 更改了import的一些路径

    1
    2
    3
    4
    5
    6
    7
    import { merge } from 'rxjs/operators';
    a$.pipe(merge(b$, c$));

    // becomes

    import { merge } from 'rxjs';
    merge(a$, b$, c$);
  • 推荐使用管道操作而不是链式操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // an operator chain
    source
    .map(x => x + x)
    .mergeMap(n => of(n + 1, n + 2)
    .filter(x => x % 1 == 0)
    .scan((acc, x) => acc + x, 0)
    )
    .catch(err => of('error found'))
    .subscribe(printResult);

    // must be updated to a pipe flow

    source.pipe(
    map(x => x + x),
    mergeMap(n => of(n + 1, n + 2).pipe(
    filter(x => x % 1 == 0),
    scan((acc, x) => acc + x, 0),
    )),
    catchError(err => of('error found')),
    ).subscribe(printResult);
  • 推荐使用操作符创建而不是类来创建observable

    1
    2
    3
    4
    5
    // removed
    ArrayObservable.create(myArray)

    // use instead
    from(myArray)
  • 移除一些旧功能

使用pipable思想的原因:

  • 使用pipe不会改变原先的observable(copy而不修改),适合组合与重用
  • 直接在Obeservable原型上扩展定义operator可能会让系统混乱,
    所以把operator和核心功能(subscribe,pipe)分离

参考

  1. 优秀的使用场景和优点介绍
  2. 简单入门知识
  3. 接地气且有具体用例的官方文档
  4. 从源码出发,脚踏实地
  5. 推荐一个在线的rxjs沙盒
  6. 调度器的说明
  7. 升级后的一些变化