Observable
一、什么是Observable
Observable 称它为可观察对象,它并不是Angular的东西,而是ES7的一种用来管理异步数据的标准,当然ES7离我们太远了,但借助ts让我们忽略这种标准与现实的差距。
Observable 可观察对象是开辟一个连续的通信通道给观察者 Observer,彼此之前形成一种关系,而这种关系需要由 Subscription 来确立,而在整个通道中允许对数据进行转换我们称为操作符 Operator。
将上面的描述转化成代码的话,就像这样:
import { Observable } from 'rxjs/Rx';
let sub = Observable
.interval(1000)
.map(second => second + '秒')
.subscribe(res => {
console.log(res);
});
利用 Observable.interval 每隔1秒产生一个数据,然后交给 map 操作号将内容进行转换,最后交过观察者打印结果。
注:Observable必须通过 subscribe 来订阅(即Observable与Observer确立关系)以后才会启动,并且它会返回一个可用于取消订阅的方法。
sub.unsubscribe();
而这一整个过程,称为:流。
二、创建 Observable
只需要实例一下 Observable 就行了。
new Observable<number>((observer: Observer<number>) => {
// 做任何你想做的事
setTimeout(() => {
observer.next(1000);
}, 1000);
})
内容非常简单,你可以在方法体内做任何你想做的事,当有数据需要发送给观察者时,只需要调用观察者的 next() 方法。
注:这里 new Observable<number> 是一个泛型,无形中有一个极大的好处是:限定数据类型,这也导致整个数据流受到严格的约束。
我们更进一步,比如Angular框架中的 Http 网络请求类,大概如下:
new Observable<Response>((observer: Observer<Response>) => {
let _xhr = new XMLHttpRequest();
_xhr.addEventListener('load', () => {
if (true) {
observer.next(response);
observer.complete();
} else {
observer.error(response);
}
});
_xhr.send();
})
不管输入终端来自哪里(网络请求、表单录入、定时器等),都可以将它们转化成数据流处理。
Observable.fromEvent()
从事件(文本框事件、滚动条事件等)中创建数据流。比如创建一个文本框 input 事件。
const node = document.querySelector('input[type=text]');
Observable.fromEvent(node, 'input')
.map((event: any) => event.target.value)
.filter(value => value.length >= 2)
.subscribe(value => {
console.log(value);
});
当用户向文本框录入数据时,会向观察者发送一次数据,且这些数据需要经过转换 map 和 过滤 filter 后,才会交给观察者。
三、操作符
从 Observable 产生的数据要交给观察者前,允许对这些数据进行转换,像上面示例中的 map、filter 都属于这一类。
每一次经过操作符都会重新产生新的Observable,不管有多少个操作符,最终都只有一个 Observable 会被观察者订阅。
总结
RxJS最难我想就是各种operator的应用了,这需要一些经验的积累。RxJS很火很大原因我认还是提供了丰富的API,以下是摘抄:
创建数据流
- 单值:
of、empty、never - 多值:
from - 定时:
interval、timer - 事件:
fromEvent - Promise:
fromPromise - 自定义:
create
转换
- 改变数据形态:
map,mapTo,pluck - 过滤一些值:
filter,skip,first,last,take - 时间轴上的操作:
delay,timeout,throttle,debounce,audit,bufferTime - 累加:
reduce,scan - 异常处理:
throw,catch,retry,finally - 条件执行:
takeUntil,delayWhen,retryWhen,subscribeOn,ObserveOn - 转接:
switch
组合
concat保持原来的序列顺序连接两个数据流merge合并序列race预设条件为其中一个数据流完成forkJoin预设条件为所有数据流都完成zip取各来源数据流最后一个值合并为对象combineLatest取各来源数据流最后一个值合并为数组