javascript – 如何交叉流(带背压)

前端之家收集整理的这篇文章主要介绍了javascript – 如何交叉流(带背压)前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
假设我有两个可能的无限流:
s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...

我想合并流,然后将合并的流与慢速异步操作(例如,来自Promise和flatMapConcat的培根)进行映射.

我可以把它们合并在一起

me = a12b3.c45d6.7e...

然后映射

s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
me = a12b3.c45d6.7e...
mm = a..1..2..b..3..c..4..5..

从长远来看,你看到更厉害的s2流可以获得优势.这是不理想的行为.

合并行为不行,因为我想有某种背压有更多的交错,“公平”,“循环”合并.几个想要的行为的例子:

s1 = a.....b..............c...
s2 = ..1.2.3..................
mm = a...1...b...2...3....c...

s1 = a.........b..........c...
s2 = ..1.2.3..................
mm = a...1...2...b...3....c...

认为这一点的一种方法是s1和s2将任务发送给只能处理一个任务的工作人员.使用merge和flatMapConcat,我会得到一个贪心的任务管理器,但是我想要更公平的一个.

我想找到一个简单而优雅的解决方案.对于任意数量的流可以很容易地概括为好:

// roundRobinPromiseMap(streams: [Stream a],f: a -> Promise b): Stream b
var mm = roundRobinPromiseMap([s1,s2],slowAsyncFunc);

使用RxJS或其他Rx库的解决方案也很好.

澄清

不是zipAsArray

我不想要

function roundRobinPromiseMap(streams,f) {
  return Bacon.zipAsArray.apply(null,streams)
    .flatMap(Bacon.fromArray)
    .flatMapConcat(function (x) {
      return Bacon.fromPromise(f(x));
    });
}

比较大理石图示例:

s1  = a.....b..............c.......
s2  = ..1.2.3......................
mm  = a...1...b...2...3....c....... // wanted
zip = a...1...b...2........c...3... // zipAsArray based

是的,我会遇到缓冲问题

…但是我会直截了当的不公平一个:

function greedyPromiseMap(streams,f) {
  Bacon.mergeAll(streams).flatMapConcat(function (x) {
    return Bacon.fromPromise(f(x));
  });
}

大理石图

s1    = a.........b..........c...
s2    = ..1.2.3..................
mm    = a...1...2...b...3....c...
merge = a...1...2...3...b....c...

解决方法

这是一个可以帮助的一大堆代码.

它将输入流转换为单个“值”事件流,然后将它们与“发送”事件(和“结束”事件合并)进行记帐.然后,使用状态机,它会从“值”事件中建立队列,并在“发送”事件上调度值.

本来我写了一个roundRobinThrottle,但我已经把它移到一个要点.

这是一个非常相似的roundRobinPromiseMap.要点中的代码已经过测试,但这不是.

# roundRobinPromiseMap :: (a -> Promise b) -> [EventStream] -> EventStream
roundRobinPromiseMap = (promiser,streams) ->
    # A bus to trigger new sends based on promise fulfillment
    promiseFulfilled = new Bacon.Bus()

    # Merge the input streams into a single,keyed stream
    theStream = Bacon.mergeAll(streams.map((s,idx) ->
        s.map((val) -> {
            type: 'value'
            index: idx
            value: val
        })
    ))
    # Merge in 'end' events
    .merge(Bacon.mergeAll(streams.map((s) ->
        s.mapEnd(-> {
            type: 'end'
        })
    )))
    # Merge in 'send' events that fire when the promise is fulfilled.
    .merge(promiseFulfilled.map({ type: 'send' }))
    # Feed into a state machine that keeps queues and only creates
    # output events on 'send' input events.
    .withStateMachine(
        {
            queues: streams.map(-> [])
            toPush: 0
            ended: 0
        }
        handleState

    )
    # Feed this output to the promiser
    theStream.onValue((value) ->
        Bacon.fromPromise(promiser(value)).onValue(->
            promiseFulfilled.push()
    ))

handleState = (state,baconEvent) ->
    outEvents = []

    if baconEvent.hasValue()
        # Handle a round robin event of 'value','send',or 'end'
        outEvents = handleRoundRobinEvent(state,baconEvent.value())
    else
        outEvents = [baconEvent]

    [state,outEvents]

handleRoundRobinEvent = (state,rrEvent) ->
    outEvents = []

    # 'value' : push onto queue
    if rrEvent.type == 'value'
        state.queues[rrEvent.index].push(rrEvent.value)
    # 'send' : send the next value by round-robin selection
    else if rrEvent.type == 'send'
        # Here's a sentinel for empty queues
        noValue = {}
        nextValue = noValue
        triedQueues = 0

        while nextValue == noValue && triedQueues < state.queues.length
            if state.queues[state.toPush].length > 0
                nextValue = state.queues[state.toPush].shift()
            state.toPush = (state.toPush + 1) % state.queues.length
            triedQueues++
        if nextValue != noValue
            outEvents.push(new Bacon.Next(nextValue))
    # 'end': Keep track of ended streams
    else if rrEvent.type == 'end'
        state.ended++

    # End the round-robin stream if all inputs have ended
    if roundRobinEnded(state)
        outEvents.push(new Bacon.End())

    outEvents

roundRobinEnded = (state) ->
    emptyQueues = allEmpty(state.queues)
    emptyQueues && state.ended == state.queues.length

allEmpty = (arrays) ->
    for a in arrays
        return false if a.length > 0
    return true

猜你在找的JavaScript相关文章