我在我的Angular 2项目中使用了
RxJs version 5.
我想创建一些可观察值,但是我不想立即调用可观测值.
我想创建一些可观察值,但是我不想立即调用可观测值.
在version 4,您可以使用(例如)Controlled命令或Pausable Buffers来控制调用.
但该功能在版本5中不可用(yet).
如何在RxJs 5中获得这种功能?
我的最终目标是排列创建的可观察值并逐个调用它们.只有当前一个被成功处理时,才调用下一个.
当一个失败时,队列被清空.
编辑
通过@Niklas Fasching的评论,我可以用Publish操作创建一个工作解决方案.
// Queue to queue operations const queue = []; // Just a function to create Observers function createObserver(id): Observer { return { next: function (x) { console.log('Next: ' + id + x); },error: function (err) { console.log('Error: ' + err); },complete: function () { console.log('Completed'); } }; }; // Creates an async operation and add it to the queue function createOperation(name: string): Observable { console.log('add ' + name); // Create an async operation var observable = Rx.Observable.create(observer => { // Some async operation setTimeout(() => observer.next(' Done'),500); }); // Hold the operation var published = observable.publish(); // Add Global subscribe published.subscribe(createObserver('Global')); // Add it to the queue queue.push(published); // Return the published so the caller could add a subscribe return published; }; // Create 4 operations on hold createOperation('SourceA').subscribe(createObserver('SourceA')); createOperation('SourceB').subscribe(createObserver('SourceB')); createOperation('SourceC').subscribe(createObserver('SourceC')); createOperation('SourceD').subscribe(createObserver('SourceD')); // Dequeue and run the first queue.shift().connect();
您可以通过
publishing观察器来分离观察者的开始.已发表的观察结果只能在调用
connect之后开始.
var published = Observable.of(42).publish(); // subscription does not start the observable sequence published.subscribe(value => console.log('received: ',value)); // connect starts the sequence; subscribers will now receive values published.connect();