javascript – RxJs将流分成多个流

前端之家收集整理的这篇文章主要介绍了javascript – RxJs将流分成多个流前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
如何根据分组方法将永不结束的流拆分为多个结束流?
--a--a-a-a-a-b---b-b--b-c-c---c-c-d-d-d-e...>

进入这些可观察者

--a--a-a-a-a-|
             b---b-b--b-|
                        c-c---c-c-|
                                  d-d-d-|
                                        e...>

正如你所看到的,a在开始时,在我收到b后,我将不再得到它,所以应该结束.这就是普通groupBy不好的原因.

解决方法

您可以使用窗口并共享源Observable. bufferCount(2,1)还有一个小技巧:
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'),Rx.Scheduler.async).share();

source
    .bufferCount(2,1) // delay emission by one item
    .map(arr => arr[0])
    .window(source
        .bufferCount(2,1) // keep the prevIoUs and current item
        .filter(([oldValue,newValue]) => oldValue !== newValue)
    )
    .concatMap(obs => obs.toArray())
    .subscribe(console.log);

这打印(因为toArray()):

[ 'a','a','a' ]
[ 'b','b','b' ]
[ 'c','c','c' ]
[ 'd','d','d' ]
[ 'e' ]

解决方案的问题是订阅源的顺序.我们需要窗口通知程序在第一个bufferCount之前订阅.否则,首先进一步推送项目,然后检查它是否与前一个项目不同.filter(([oldValue,newValue])…).

这意味着需要在窗口之前将发射延迟一个(这是第一个.bufferCount(2,1).map(arr => arr [0]).

或者也许用publish()更容易控制订阅的顺序:

const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'),Rx.Scheduler.async).share();

const connectable = source.publish();

connectable
    .window(source
        .bufferCount(2,newValue]) => oldValue !== newValue)
    )
    .concatMap(obs => obs.toArray())
    .subscribe(console.log);

connectable.connect();

输出是一样的.

猜你在找的JavaScript相关文章