在介绍 Observable 之前,我们要先了解两个设计模式:
Observer Pattern - (观察者模式)
Iterator Pattern - (迭代器模式)
这两个模式是 Observable 的基础,下面我们先来介绍一下 Observer Pattern。
Observer Pattern
观察者模式定义
观察者模式是软件设计模式的一种。在此种模式中,一个目标对象管理所有相依于它的观察者对象,并且在它本身的状态改变时主动发出通知。这通常透过呼叫各观察者所提供的方法来实现。此种模式通常被用来实时事件处理系统。 — 维基百科
观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。
我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:期刊出版方和订阅者,他们之间的关系如下:
在观察者模式中也有两个主要角色:Subject (主题) 和 Observer (观察者) 。它们分别对应例子中的期刊出版方和订阅者。接下来我们来看张图,从而加深对上面概念的理解。
观察者模式优缺点
观察者模式的优点:
观察者模式的缺点:
观察者模式的应用
在前端领域,观察者模式被广泛地使用。最常见的例子就是为 DOM 对象添加事件监听,具体示例如下:
<button id="btn">确认</button> function clickHandler(event) { console.log('用户已点击确认按钮!'); } document.getElementById("btn").addEventListener('click',clickHandler);
上面代码中,我们通过 addEventListener API 监听 button 对象上的点击事件,当用户点击按钮时,会自动执行我们的 clickHandler
函数。
观察者模式实战
Subject 类定义:
class Subject { constructor() { this.observerCollection = []; } registerObserver(observer) { this.observerCollection.push(observer); } unregisterObserver(observer) { let index = this.observerCollection.indexOf(observer); if(index >= 0) this.observerCollection.splice(index,1); } notifyObservers() { this.observerCollection.forEach((observer)=>observer.notify()); } }
Observer 类定义:
class Observer { constructor(name) { this.name = name; } notify() { console.log(`${this.name} has been notified.`); } }
使用示例:
let subject = new Subject(); // 创建主题对象 let observer1 = new Observer('semlinker'); // 创建观察者A - 'semlinker' let observer2 = new Observer('lolo'); // 创建观察者B - 'lolo' subject.registerObserver(observer1); // 注册观察者A subject.registerObserver(observer2); // 注册观察者B subject.notifyObservers(); // 通知观察者 subject.unregisterObserver(observer1); // 移除观察者A subject.notifyObservers(); // 验证是否成功移除
semlinker has been notified. # 输出一次 2(unknown) lolo has been notified. # 输出两次
需要注意的是,在观察者模式中,通常情况下调用注册观察者后,会返回一个函数,用于移除监听,有兴趣的读者,可以自己尝试一下。(备注:在 Angular 1.x 中调用 $scope.$on() 方法后,就会返回一个函数,用于移除监听)
Iterator Pattern
迭代器模式定义
迭代器(Iterator)模式,又叫做游标(Cursor)模式。它提供一种方法顺序访问一个聚合对象中的各个元素,而又不需要暴露该对象的内部表示。迭代器模式可以把迭代的过程从业务逻辑中分离出来,在使用迭代器模式之后,即使不关心对象的内部构造,也可以按顺序访问其中的每个元素。
迭代器模式的优缺点
迭代器模式的优点:
简化了遍历方式,对于对象集合的遍历,还是比较麻烦的,对于数组或者有序列表,我们尚可以通过游标取得,但用户需要在对集合了解的前提下,自行遍历对象,但是对于 hash 表来说,用户遍历起来就比较麻烦。而引入迭代器方法后,用户用起来就简单的多了。
封装性良好,用户只需要得到迭代器就可以遍历,而不用去关心遍历算法。
迭代器模式的缺点:
遍历过程是一个单向且不可逆的遍历
ECMAScript 迭代器
在 ECMAScript 中 Iterator 最早其实是要采用类似 Python 的 Iterator 规范,就是 Iterator 在没有元素之后,执行
next
会直接抛出错误;但后来经过一段时间讨论后,决定采更 functional 的做法,改成在取得最后一个元素之后执行next
永远都回传{ done: true,value: undefined }
一个迭代器对象 ,知道如何每次访问集合中的一项, 并记录它的当前在序列中所在的位置。在 JavaScript 中迭代器是一个对象,它提供了一个 next() 方法,返回序列中的下一项。这个方法返回包含 done
和 value
两个属性的对象。对象的取值如下:
在最后一个元素前:
{ done: false,value: elementValue }
在最后一个元素后:
{ done: true,value: undefined }
详细信息可以参考 - 可迭代协议和迭代器协议
ES 5 迭代器
接下来我们来创建一个 makeIterator 函数,该函数的参数类型是数组,当调用该函数后,返回一个包含 next() 方法的 Iterator 对象, 其中 next() 方法是用来获取容器对象中下一个元素。具体示例如下:
function makeIterator(array){ var nextIndex = 0; return { next: function(){ return nextIndex < array.length ? {value: array[nextIndex++],done: false} : {done: true}; } } }
一旦初始化,next() 方法可以用来依次访问可迭代对象中的元素:
var it = makeIterator(['yo','ya']); console.log(it.next().value); // 'yo' console.log(it.next().value); // 'ya' console.log(it.next().done); // true
ES 6 迭代器
在 ES 6 中我们可以通过 Symbol.iterator
来创建可迭代对象的内部迭代器,具体示例如下:
let arr = ['a','b','c']; let iter = arr[Symbol.iterator]();
> iter.next() { value: 'a',done: false } > iter.next() { value: 'b',done: false } > iter.next() { value: 'c',done: false } > iter.next() { value: undefined,done: true }
ES 6 中可迭代的对象:
Arrays
Strings
Maps
Sets
DOM data structures (work in progress)
Observable
RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observables 与 Observer。Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。
Observables 与 Observer 之间的订阅发布关系(观察者模式) 如下:
发布:Observable 通过回调 next 方法向 Observer 发布事件。
Proposal Observable
-
Proposal Observable Implementations
自定义 Observable
如果你想真正了解 Observable,最好的方式就是自己写一个。其实 Observable 就是一个函数,它接受一个 Observer
作为参数然后返回另一个函数。
它的基本特征:
它的作用:
作为生产者与观察者之间的桥梁,并返回一种方法来解除生产者与观察者之间的联系,其中观察者用于处理时间序列上数据流。接下来我们来看一下 Observable 的基础实现:
DataSource - 数据源
class DataSource { constructor() { let i = 0; this._id = setInterval(() => this.emit(i++),200); // 创建定时器 } emit(n) { const limit = 10; // 设置数据上限值 if (this.ondata) { this.ondata(n); } if (n === limit) { if (this.oncomplete) { this.oncomplete(); } this.destroy(); } } destroy() { // 清除定时器 clearInterval(this._id); } }
myObservable
function myObservable(observer) { let datasource = new DataSource(); // 创建数据源 datasource.ondata = (e) => observer.next(e); // 处理数据流 datasource.onerror = (err) => observer.error(err); // 处理异常 datasource.oncomplete = () => observer.complete(); // 处理数据流终止 return () => { // 返回一个函数用于,销毁数据源 datasource.destroy(); }; }
使用示例:
const unsub = myObservable({ next(x) { console.log(x); },error(err) { console.error(err); },complete() { console.log('done')} }); /** * 移除注释,可以测试取消订阅 */ // setTimeout(unsub,500);
具体运行结果,可以查看线上示例。
SafeObserver - 更好的 Observer
上面的示例中,我们使用一个包含了 next、error、complete 方法的普通 JavaScript 对象来定义观察者。一个普通的 JavaScript 对象只是一个开始,在 RxJS 5 里面,为开发者提供了一些保障机制,来保证一个更安全的观察者。以下是一些比较重要的原则:
next
、complete
和error
是可选的。按需处理即可,不必全部处理
为了完成上述目标,我们得把传入的匿名 Observer
对象封装在一个 SafeObserver
里以提供上述保障。SafeObserver 的具体实现如下:
class SafeObserver { constructor(destination) { this.destination = destination; } next(value) { // 尚未取消订阅,且包含next方法 if (!this.isUnsubscribed && this.destination.next) { try { this.destination.next(value); } catch (err) { // 出现异常时,取消订阅释放资源,再抛出异常 this.unsubscribe(); throw err; } } } error(err) { // 尚未取消订阅,且包含error方法 if (!this.isUnsubscribed && this.destination.error) { try { this.destination.error(err); } catch (e2) { // 出现异常时,取消订阅释放资源,再抛出异常 this.unsubscribe(); throw e2; } this.unsubscribe(); } } complete() { // 尚未取消订阅,且包含complete方法 if (!this.isUnsubscribed && this.destination.complete) { try { this.destination.complete(); } catch (err) { // 出现异常时,取消订阅释放资源,再抛出异常 this.unsubscribe(); throw err; } this.unsubscribe(); } } unsubscribe() { // 用于取消订阅 this.isUnsubscribed = true; if (this.unsub) { this.unsub(); } } }
myObservable - 使用 SafeObserver
function myObservable(observer) { const safeObserver = new SafeObserver(observer); // 创建SafeObserver对象 const datasource = new DataSource(); // 创建数据源 datasource.ondata = (e) => safeObserver.next(e); datasource.onerror = (err) => safeObserver.error(err); datasource.oncomplete = () => safeObserver.complete(); safeObserver.unsub = () => { // 为SafeObserver对象添加unsub方法 datasource.destroy(); }; // 绑定this上下文,并返回unsubscribe方法 return safeObserver.unsubscribe.bind(safeObserver); }
使用示例:
const unsub = myObservable({ next(x) { console.log(x); },complete() { console.log('done')} });
具体运行结果,可以查看线上示例。
Operators - 也是函数
Operator 是一个函数,它接收一个 Observable 对象,然后返回一个新的 Observable 对象。当我们订阅新返回的 Observable 对象时,它内部会自动订阅前一个 Observable 对象。接下来我们来实现常用的 map 操作符:
Observable 实现:
class Observable { constructor(_subscribe) { this._subscribe = _subscribe; } subscribe(observer) { const safeObserver = new SafeObserver(observer); safeObserver.unsub = this._subscribe(safeObserver); return safeObserver.unsubscribe.bind(safeObserver); } }
map 操作符实现:
function map(source,project) { return new Observable((observer) => { const mapObserver = { next: (x) => observer.next(project(x)),error: (err) => observer.error(err),complete: () => observer.complete() }; return source.subscribe(mapObserver); }); }
具体运行结果,可以查看线上示例。
改进 Observable - 支持 Operator 链式调用
如果把 Operator
都写成如上那种独立的函数,我们链式代码会逐渐变丑:
map(map(myObservable,(x) => x + 1),(x) => x + 2);
对于上面的代码,想象一下有 5、6 个嵌套着的 Operator
,再加上更多、更复杂的参数,基本上就没法儿看了。
你也可以试下 Texas Toland 提议的简单版管道实现,合并压缩一个数组的Operator
并生成一个最终的Observable
,不过这意味着要写更复杂的 Operator
,上代码:JSBin。其实写完后你会发现,代码也不怎么漂亮:
pipe(myObservable,map(x => x + 1),map(x => x + 2));
理想情况下,我们想将代码用更自然的方式链起来:
myObservable.map(x => x + 1).map(x => x + 2);
幸运的是,我们已经有了这样一个 Observable
类,我们可以基于 prototype 在不增加复杂度的情况下支持多 Operators
的链式结构,下面我们采用prototype方式再次实现一下 Observable
:
Observable.prototype.map = function (project) { return new Observable((observer) => { const mapObserver = { next: (x) => observer.next(project(x)),complete: () => observer.complete() }; return this.subscribe(mapObserver); }); };
现在我们终于有了一个还不错的实现。这样实现还有其他好处,例如:可以写子类继承 Observable
类,然后在子类中重写某些内容以优化程序。
接下来我们来总结一下该部分的内容:Observable 就是函数,它接受 Observer 作为参数,又返回一个函数。如果你也写了一个函数,接收一个 Observer 作为参数,又返回一个函数,那么,它是异步的、还是同步的 ?其实都不是,它就只是一个函数。任何函数的行为都依赖于它的具体实现,所以当你处理一个 Observable 时,就把它当成一个普通函数,里面没有什么黑魔法。当你要构建 Operator 链时,你需要做的其实就是生成一个函数将一堆 Observers 链接在一起,然后让真正的数据依次穿过它们。
Rx.Observable.create
var observable = Rx.Observable .create(function(observer) { observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext observer.next('Lolo'); }); // 订阅这个 Observable observable.subscribe(function(value) { console.log(value); });
以上代码运行后,控制台会依次输出 'Semlinker' 和 'Lolo' 两个字符串。
需要注意的是,很多人认为 RxJS 中的所有操作都是异步的,但其实这个观念是错的。RxJS 的核心特性是它的异步处理能力,但它也是可以用来处理同步的行为。具体示例如下:
var observable = Rx.Observable .create(function(observer) { observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext observer.next('Lolo'); }); console.log('start'); observable.subscribe(function(value) { console.log(value); }); console.log('end');
start Semlinker Lolo end
当然我们也可以用它处理异步行为:
var observable = Rx.Observable .create(function(observer) { observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext observer.next('Lolo'); setTimeout(() => { observer.next('RxJS Observable'); },300); }) console.log('start'); observable.subscribe(function(value) { console.log(value); }); console.log('end');
start Semlinker Lolo end RxJS Observable
从以上例子中,我们可以得出一个结论 - Observable 可以应用于同步和异步的场合。
Observable - Creation Operator
RxJS 中提供了很多操作符,用于创建 Observable 对象,常用的操作符如下:
create
of
from
fromEvent
fromPromise
empty
never
throw
interval
timer
上面的例子中,我们已经使用过了 create 操作符,接下来我们来看一下其它的操作符:
of
var source = Rx.Observable.of('Semlinker','Lolo'); source.subscribe({ next: function(value) { console.log(value); },complete: function() { console.log('complete!'); },error: function(error) { console.log(error); } });
Semlinker Lolo complete!
from
var arr = [1,2,3]; var source = Rx.Observable.from(arr); // 也支持字符串,如 "Angular 2 修仙之路" source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log(error); } });
1 2 3 complete!
fromEvent
Rx.Observable.fromEvent(document.querySelector('button'),'click');
fromPromise
var source = Rx.Observable .fromPromise(new Promise((resolve,reject) => { setTimeout(() => { resolve('Hello RxJS!'); },3000) })); source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log(error); } });
Hello RxJS! complete!
empty
var source = Rx.Observable.empty(); source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log(error); } });
complete!
empty 操作符返回一个空的 Observable 对象,如果我们订阅该对象,它会立即返回 complete 信息。
never
var source = Rx.Observable.never(); source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log(error); } });
never 操作符会返回一个无穷的 Observable,当我们订阅它后,什么事情都不会发生,它是一个一直存在却什么都不做的 Observable 对象。
throw
var source = Rx.Observable.throw('Oop!'); source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log('Throw Error: ' + error); } });
Throw Error: Oop!
throw 操作如,只做一件事就是抛出异常。
interval
var source = Rx.Observable.interval(1000); source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log('Throw Error: ' + error); } });
0 1 2 ...
interval 操作符支持一个数值类型的参数,用于表示定时的间隔。上面代码表示每隔 1s,会输出一个递增的值,初始值从 0 开始。
timer
var source = Rx.Observable.timer(1000,5000); source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log('Throw Error: ' + error); } });
0 # 1s后 1 # 5s后 2 # 5s后 ...
timer 操作符支持两个参数,第一个参数用于设定发送第一个值需等待的时间,第二个参数表示第一次发送后,发送其它值的间隔时间。此外,timer 操作符也可以只传递一个参数,具体如下:
var source = Rx.Observable.timer(1000); source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log('Throw Error: ' + error); } });
0 complete!
Subscription
有些时候对于一些 Observable 对象 (如通过 interval、timer 操作符创建的对象),当我们不需要的时候,要释放相关的资源,以避免资源浪费。针对这种情况,我们可以调用 Subscription
对象的 unsubscribe
方法来释放资源。具体示例如下:
var source = Rx.Observable.timer(1000,1000); // 取得subscription对象 var subscription = source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log('Throw Error: ' + error); } }); setTimeout(() => { subscription.unsubscribe(); },5000);
RxJS - Observer
Observer (观察者) 是一个包含三个方法的对象,每当 Observable 触发事件时,便会自动调用观察者的对应方法。
interface Observer<T> { closed?: boolean; // 标识是否已经取消对Observable对象的订阅 next: (value: T) => void; error: (err: any) => void; complete: () => void; }
Observer 中的三个方法的作用:
接下来我们来看个具体示例:
var observable = Rx.Observable .create(function(observer) { observer.next('Semlinker'); observer.next('Lolo'); observer.complete(); observer.next('not work'); }); // 创建一个观察者 var observer = { next: function(value) { console.log(value); },error: function(error) { console.log(error); },complete: function() { console.log('complete'); } } // 订阅已创建的observable对象 observable.subscribe(observer);
Semlinker Lolo complete
上面的例子中,我们可以看出,complete 方法执行后,next 就会失效,所以不会输出 not work
。
另外观察者可以不用同时包含 next、complete、error 三种方法,它可以只包含一个 next 方法,具体如下:
var observer = { next: function(value) { console.log(value); } };
有时候 Observable 可能是一个无限的序列,例如 click 事件,对于这种场景,complete 方法就永远不会被调用。
我们也可以在调用 Observable 对象的 subscribe
方法时,依次传入 next、error、complete 三个函数,来创建观察者:
observable.subscribe( value => { console.log(value); },error => { console.log('Error: ',error); },() => { console.log('complete'); } );
Pull vs Push
Pull 和 Push 是数据生产者和数据的消费者两种不同的交流方式。
什么是Pull?
在 "拉" 体系中,数据的消费者决定何时从数据生产者那里获取数据,而生产者自身并不会意识到什么时候数据将会被发送给消费者。
每一个 JavaScript 函数都是一个 "拉" 体系,函数是数据的生产者,调用函数的代码通过 ''拉出" 一个单一的返回值来消费该数据。
const add = (a,b) => a + b; let sum = add(3,4);
ES6介绍了 iterator迭代器 和 Generator生成器 — 另一种 "拉" 体系,调用 iterator.next()
的代码是消费者,可从中拉取多个值。
什么是Push?
在 "推" 体系中,数据的生产者决定何时发送数据给消费者,消费者不会在接收数据之前意识到它将要接收这个数据。
Promise(承诺) 是当今 JS 中最常见的 "推" 体系,一个Promise (数据的生产者)发送一个 resolved value (成功状态的值)来执行一个回调(数据消费者),但是不同于函数的地方的是:Promise 决定着何时数据才被推送至这个回调函数。
RxJS 引入了 Observables (可观察对象),一个全新的 "推" 体系。一个可观察对象是一个产生多值的生产者,当产生新数据的时候,会主动 "推送给" Observer (观察者)。
生产者 | 消费者 | |
---|---|---|
pull拉 | 被请求的时候产生数据 | 决定何时请求数据 |
push推 | 按自己的节奏生产数据 | 对接收的数据进行处理 |
接下来我们来看张图,从而加深对上面概念的理解:
Observable vs Promise
Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。
MagicQ | 单值 | 多值 |
---|---|---|
拉取(Pull) | 函数 | 遍历器 |
推送(Push) | Promise | Observable |
延迟计算 & 渐进式取值
延迟计算
所有的 Observable 对象一定会等到订阅后,才开始执行,如果没有订阅就不会执行。
var source = Rx.Observable.from([1,3,4,5]); var example = source.map(x => x + 1);
上面的示例中,因为 example 对象还未被订阅,所以不会进行运算。这跟数组不一样,具体如下:
var source = [1,5]; var example = source.map(x => x + 1);
以上代码运行后,example 中就包含已运算后的值。
渐进式取值
数组中的操作符如:filter、map 每次都会完整执行并返回一个新的数组,才会继续下一步运算。具体示例如下:
var source = [1,5]; var example = source .filter(x => x % 2 === 0) // [2,4] .map(x => x + 1) // [3,5]
关于数组中的 map、filter 的详细信息,可以参考 - RxJS Functional Programming
为了更好地理解数组操作符的运算过程,我们可以参考下图:
虽然 Observable 运算符每次都会返回一个新的 Observable 对象,但每个元素都是渐进式获取的,且每个元素都会经过操作符链的运算后才输出,而不会像数组那样,每个阶段都得完整运算。具体示例如下:
var source = Rx.Observable.from([1,5]); var example = source .filter(x => x % 2 === 0) .map(x => x + 1) example.subscribe(console.log);
以上代码的执行过程如下:
source 发出 1,执行 filter 过滤操作,返回 false,该值被过滤掉
source 发出 2,执行 filter 过滤操作,返回 true,该值被保留,接着执行 map 操作,值被处理成 3,最后通过 console.log 输出
source 发出 3,执行 filter 过滤操作,返回 false,该值被过滤掉
source 发出 4,执行 filter 过滤操作,返回 true,该值被保留,接着执行 map 操作,值被处理成 5,最后通过 console.log 输出
source 发出 5,执行 filter 过滤操作,返回 false,该值被过滤掉
为了更好地理解 Observable 操作符的运算过程,我们可以参考下图: