如何根据分组方法将永不结束的流拆分为多个结束流?
--a--a-a-a-a-b---b-b--b-c-c---c-c-d-d-d-e...>
进入这些可观察者
--a--a-a-a-a-| b---b-b--b-| c-c---c-c-| d-d-d-| e...>
正如你所看到的,a在开始时,在我收到b后,我将不再得到它,所以应该结束.这就是普通groupBy不好的原因.
解决方法
您可以使用窗口并共享源Observable. bufferCount(2,1)还有一个小技巧:
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e'; const source = Observable.from(str.split('-'),Rx.Scheduler.async).share(); source .bufferCount(2,1) // delay emission by one item .map(arr => arr[0]) .window(source .bufferCount(2,1) // keep the prevIoUs and current item .filter(([oldValue,newValue]) => oldValue !== newValue) ) .concatMap(obs => obs.toArray()) .subscribe(console.log);
这打印(因为toArray()):
[ 'a','a','a' ] [ 'b','b','b' ] [ 'c','c','c' ] [ 'd','d','d' ] [ 'e' ]
此解决方案的问题是订阅源的顺序.我们需要窗口通知程序在第一个bufferCount之前订阅.否则,首先进一步推送项目,然后检查它是否与前一个项目不同.filter(([oldValue,newValue])…).
这意味着需要在窗口之前将发射延迟一个(这是第一个.bufferCount(2,1).map(arr => arr [0]).
或者也许用publish()更容易控制订阅的顺序:
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e'; const source = Observable.from(str.split('-'),Rx.Scheduler.async).share(); const connectable = source.publish(); connectable .window(source .bufferCount(2,newValue]) => oldValue !== newValue) ) .concatMap(obs => obs.toArray()) .subscribe(console.log); connectable.connect();
输出是一样的.