rxjs和响应式编程
背景
在一个项目中看别人用到了rxjs,尤其是里面的操作符号of.
同时又在Unity3D项目里面接触的UniRx,
于是来看下响应式编程,并借由rsjx来观察一些js实现中的细节.
渊源和概念
响应式
常常与指令式做对比.
通常在指令式变成中,计算是瞬间操作.
而在响应式变成中,计算与计算之间有着相互关系,一个值变化,一些值会随之变化.
与观察者模式的关系
观察者模式算是面对对象语言中,实现响应式编程的一种方法吧.
被观察者发出事件,如果有观察者列表的话,列表里的每个观察者都会受到消息并进行处理.
和"流"的关系
就像js中的promise一样,一个promise经过一些处理后,变成了另一个promise.
observable也可以通过一些操作,比如pipe,filter之后还是observable.
我想这就是多数人认为observable名字不好应该叫Stream的原因.
优点
- 实现异步逻辑时能更通俗易懂(使用回调函数的方式不易读)
- 通俗易懂就能实现业务逻辑和代码逻辑的分离(比如不再需要全局计数器),方便维护和扩展
rxjs中的概念
observable
被观察者(通常可以是一个stream的起点,常关注如何创建这个被观察者)
1 | // 创建一个observable |
传一系列的值时,要求有一个类似列表的东西.而如果不想写太复杂,可以用from操作符
1 | var observable = Rx.Observable.from([10, 20, 30]); |
subscribe
试想如果一个事件默认只能有一个观察者,同时大多数情况下一匿名的方式创建观察者,
那么这个行为不妨称为对一个事件的订阅.
如果取消了,那么就是unsubscribe.(但在取消的这一行执行前,subscribe仍然有效)
1 | var observable = Rx.Observable.from([1,2]); |
observer
也可以非常正式地创建一个观察者
1 | var observer = { |
subject
特殊的被观察者(可多播)
通常一个事件发生变化时,一段时间内只能有一个观察者观察,等complete信号出现后,再由第二个观察者从一开始的值开始响应.
如果想一个事件能有多个观察者来处理,那就需要让默认单播的事件变成多播的.
subject可以作为源头单独使用
1 | var subject = new Rx.Subject(); |
也可以作为下游,将一个单播的eobservable转换为多播
1 | // 单播多次订阅 |
operator
操作符是observable类型上的方法,比如 .map(...)
, .filter(...)
, .merge(...)
等.
通常用来对observable进行组合,解包,过滤等等.
操作符本质上需要是一个纯函数.
按照功能,可以分成以下几类:
- 创建
- 组合
- 条件
- 错误处理
- 多播
- 过滤
- 转换
- 工具
按照方法定义的位置,可以分成
- 实例操作符
- 静态操作符
通常创建类的操作符,比如create,interval,组合类的,比如merge,concat等是静态的.
scheduler
调度器,调整observable或observer的执行时机
比如of默认的scheduler是queue,会同步地立即执行
而如果换成async,则会异步地执行,具体来说,会等同一个文件中的所有同步语句执行完后再执行
1 | import { of, pipe } from "rxjs"; |
常用操作符
目的在于遇见了能知道是做什么的
创建
1 | import { Observable,from } from 'rxjs'; |
pipe
不知道为什么,许多操作符不能对observable对象使用,必须放在pipe里执行.
另外一个observable在subscribe之后就不再是observabel而是subscription.不能再用点连接subscribe.
但点pipe之后还是一个observable,只不过内部的值可以用来处理.
1 | const source = from([1, 2, 3, 4, 5]); |
另外由于来自函数式编程的概念, pipe(f1, f2, f3)
相当于 f3(f2(f1()))
组合
merge类似两条河流和汇合,汇合之后还保有各自的漂浮物.
1 | const p4 = interval(4000); |
条件
感觉every算是组合,毕竟要求输入都满足一定条件
1 | const source = of(1, 2, 3, 4, 5); |
多播
multicast
过滤
filter表达式过滤
1 | const source = of(1, 2, 3, 4, 5); |
first只响应第一个
1 | const source = of(1, 2, 3, 4, 5); |
skip略过头几个
1 | const source = of(1, 2, 3, 4, 5); |
take只要前几个
1 | const source = of(1, 2, 3, 4, 5); |
single有且最多有一个答案
1 | const source = from([1, 0, 3, 4, 5]); |
throttleTime 节流用,开始节流后的5秒内,源头不发出事件
1 | const source = interval(1000); |
转换
buffer 遇到一定条件就释放上游积攒的数组
1 | const $increment = document.querySelector("#increment"); |
bufferCount 积攒到确定数量后放到下游
1 | const myInterval = interval(1000); // 每秒发出信号 |
bufferTime() 缓存一定时间,然后将积攒的内容以数组形式传递到下游
mapTo 相当于 map(x => 'string')
,将值映射成常量
pluck 类似ORM中的作用,将 [{'name':'xx1'},{'name':'xx2'}]
通过 pluck('name')
的方式变成 ['xx1','xx2']
工具
delay 过一段时间再抛向下游
1 | const example = of(null, null); |
timeout 超时并发出一个错误
1 | const source = interval(1000); |
项目中使用的例子
路由的参数被看作一个observable.
filter被用来检测路由的参数中有无特定字段,
map被用来从路由参数中根据字段名取出值.
一个在页面上显示的元素被定义成observable,
在页面上使用async管道修饰,
在控制器中,将取到的值用of重新包装成了observable以供给前端显示,
个人感觉大可不必,前端应该有能力识别空对象.
其他地方没有使用.
版本的变迁
有时会奇怪为什么有些人直接使用map,
而又有些map需要在pipe中使用.
查看之下发现rxjs的版本发生了变化.
更改的有:
-
更改了import的一些路径
1
2
3
4
5
6
7import { merge } from 'rxjs/operators';
a$.pipe(merge(b$, c$));
// becomes
import { merge } from 'rxjs';
merge(a$, b$, c$); -
推荐使用管道操作而不是链式操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// an operator chain
source
.map(x => x + x)
.mergeMap(n => of(n + 1, n + 2)
.filter(x => x % 1 == 0)
.scan((acc, x) => acc + x, 0)
)
.catch(err => of('error found'))
.subscribe(printResult);
// must be updated to a pipe flow
source.pipe(
map(x => x + x),
mergeMap(n => of(n + 1, n + 2).pipe(
filter(x => x % 1 == 0),
scan((acc, x) => acc + x, 0),
)),
catchError(err => of('error found')),
).subscribe(printResult); -
推荐使用操作符创建而不是类来创建observable
1
2
3
4
5// removed
ArrayObservable.create(myArray)
// use instead
from(myArray) -
移除一些旧功能
使用pipable思想的原因:
- 使用pipe不会改变原先的observable(copy而不修改),适合组合与重用
- 直接在Obeservable原型上扩展定义operator可能会让系统混乱,
所以把operator和核心功能(subscribe,pipe)分离