我订阅了一个可观察数组的问题.
在我的例子中,我有一个ID数组,我需要从我的状态获取所有ID,将所有结果合并到一个observable中并订阅该observable.
我需要我的订阅者最终会获得一系列已解析的observable.
在我的例子中,我有一个ID数组,我需要从我的状态获取所有ID,将所有结果合并到一个observable中并订阅该observable.
我需要我的订阅者最终会获得一系列已解析的observable.
此外,我需要保持此订阅打开,所以如果我的一个内部可观察的更改我的订阅者将通过它通知.
这是我的代码:
getTransactionsByIDs(transactionsIDs){ return Observable.of(transactionIDs .map(transactionID => this.getTransactionByID(transactionID))); } this.transactionsService.getTransactionsByIDs(transactionsIDs) .subscribe(transactions=>{ .... })
如何解决每个商店和所有这些商店?
我还尝试在transactionsID上使用Observable.from()将每个ID转换为observable,然后解析它.它工作正常,但我的订户分别收到每个ID的通知.如果有办法批处理所有Observable.from()结果(并保持订阅打开),请告诉我.
这就是我的Observable.from()的样子:
getTransactionsByIDs(transactionsIDs){ return transactionIDs .mergeMap(transactionID => this.getTransactionByID(transactionID)); } this.transactionsService.getTransactionsByIDs(Observable.from(transactionsIDs)) .subscribe(transactions=>{ .... })
谢谢.
解决方法
我认为你想要的是最好的.它不会发出任何值,直到所有内部可观察量发出至少一个值.之后,每次从一个内部可观察者发出新的发射时,它将从所有发出最新的.
以下是一些阅读材料:learnrxjs.io/operators/combination/combinelatest.html
这是一个例子:
function getTransactionByID(transactionId) { let count = 0; return Rx.Observable.of(transactionId) .delay(Math.random() * 4000) .map(x => `${x}: ${count++} `) .repeat(); } function getTransactionsByIDs(transactionsIDs){ return Rx.Observable.combineLatest(transactionsIDs.map(transactionID => getTransactionByID(transactionID))); } const transactionsIDs = [1,2,3]; getTransactionsByIDs(transactionsIDs) .take(10) .subscribe(x => { console.log(x); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.min.js"></script>
take(10)就是让这个例子永远持续下去.