>两个可观测量,一个是“高”优先级,另一个是“低”
将两个观察员合并成一个,然后可以订阅,意图是这个结果Observable将永远在任何低优先级之前发出高优先级项.
我明白,这可以使用两个ConcurrentQueue集合和这样的东西进行更简单的实现;
return this.highPriorityItems.TryDequeue(out item) || this.lowPriorityItems.TryDequeue(out item);
但是这种方法有一些问题,就像没有“可预订”一样,Observable可以是相同的方式(所以一旦队列耗尽,处理就会结束而没有很多额外的功能可以把它推到任务中).
此外,我有兴趣在队列上应用一些额外的过滤器,如节流和“清除直到更改”,所以Rx看起来像这样自然.
解决方法
Rx是关于事件流,而不是队列.当然,在Rx中使用了很多队列,但是它们并不是第一类概念,而是Rx的概念的实现细节的更多部分.
我们需要队列的一个很好的例子是处理一个缓慢的观察者.事件在Rx中顺序调度,如果事件到达速度比观察者更快地处理它们,则必须对该观察者进行排队.如果有许多观察者,则必须维护多个逻辑队列,因为观察者可能会以不同的步伐进展 – 而Rx选择不使其保持锁定.
“背压”是观察者向观察者提供反馈的概念,以便允许机制处理更快的可观察的压力 – 例如混合或节流. Rx没有一种引入背压的一种方法 – 只有内置的方法是监视观察者的观察者才是通过OnNext的同步特性.任何其他机制都需要脱机.你的问题直接与背压有关,因为在慢观察者的情况下才是有关的.
我提到这一切是为了提供证据表明,Rx不是提供您正在寻找的那种优先调度的绝佳选择 – 实际上,一流的排队机制似乎更适合.
要解决手头的问题,您需要在自定义运算符中管理优先级排队.重述问题:您所说的是,如果事件在观察者处理OnNext事件期间到达,以便发生事件的建立,而不是Rx使用的典型FIFO队列,则要基于优先级调度.
值得注意的是,根据Rx不会将多个观察者保持在锁定步骤的精神,并发观察者可能会以不同的顺序看到事件,这可能或可能不是您的问题.您可以使用像Publish这样的机制来获得订单一致性 – 但是您可能不希望这样做,因为事件传递的时间在这种情况下会变得非常不可预测,效率低下.
我相信有更好的方法可以做到这一点,但是这里是一个基于优先级队列的交付的例子 – 你可以使用更好的队列实现来扩展这个功能,以支持多个流和优先级(甚至每个事件的优先级)例如基于b-tree的优先级队列),但是我选择保持这个很简单.即使这样,请注意代码必须解决的重大问题,围绕错误处理,完成等 – 我已经做出选择,说明这些信号是否有很多其他有效的选择.
总而言之,这个实现肯定让我不了解这一点.这是很复杂的,这里可能有bug.正如我所说,可能有更好的代码(特别是给我的最小的努力,我已经投入使用),但在概念上,我不舒服的想法,无论实现:
public static class ObservableExtensions { public static IObservable<TSource> MergeWithLowPriorityStream<TSource>( this IObservable<TSource> source,IObservable<TSource> lowPriority,IScheduler scheduler = null) { scheduler = scheduler ?? Scheduler.Default; return Observable.Create<TSource>(o => { // BufferBlock from TPL dataflow is used as it is // handily awaitable. package: Microsoft.Tpl.Dataflow var loQueue = new BufferBlock<TSource>(); var hiQueue = new BufferBlock<TSource>(); var errorQueue = new BufferBlock<Exception>(); var done = new TaskCompletionSource<int>(); int doneCount = 0; Action incDone = () => { var dc = Interlocked.Increment(ref doneCount); if(dc == 2) done.SetResult(0); }; source.Subscribe( x => hiQueue.Post(x),e => errorQueue.Post(e),incDone); lowPriority.Subscribe( x => loQueue.Post(x),incDone); return scheduler.ScheduleAsync(async(ctrl,ct) => { while(!ct.IsCancellationRequested) { TSource nextItem; if(hiQueue.TryReceive(out nextItem) || loQueue.TryReceive(out nextItem)) o.OnNext(nextItem); else if(done.Task.IsCompleted) { o.OnCompleted(); return; } Exception error; if(errorQueue.TryReceive(out error)) { o.OnError(error); return; } var hiAvailableAsync = hiQueue.OutputAvailableAsync(ct); var loAvailableAsync = loQueue.OutputAvailableAsync(ct); var errAvailableAsync = errorQueue.OutputAvailableAsync(ct); await Task.WhenAny( hiAvailableAsync,loAvailableAsync,errAvailableAsync,done.Task); } }); }); } }
和示例用法:
void static Main() { var xs = Observable.Range(0,3); var ys = Observable.Range(10,3); var source = ys.MergeWithLowPriorityStream(xs); source.Subscribe(Console.WriteLine,() => Console.WriteLine("Done")); }
这将首先打印出y的元素,表明它们的优先级.