c# – 合并两个可观察数据,一个优先级高

前端之家收集整理的这篇文章主要介绍了c# – 合并两个可观察数据,一个优先级高前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
可以使用ReactiveExtensions来实现以下功能

>两个可观测量,一个是“高”优先级,另一个是“低”
将两个观察员合并成一个,然后可以订阅,意图是这个结果Observable将永远在任何低优先级之前发出高优先级项.

我明白,这可以使用两个ConcurrentQueue集合和这样的东西进行更简单的实现;

return this.highPriorityItems.TryDequeue(out item) 
    || this.lowPriorityItems.TryDequeue(out item);

但是这种方法有一些问题,就像没有“可预订”一样,Observable可以是相同的方式(所以一旦队列耗尽,处理就会结束而没有很多额外的功能可以把它推到任务中).

此外,我有兴趣在队列上应用一些额外的过滤器,如节流和“清除直到更改”,所以Rx看起来像这样自然.

解决方法

你所描述的当然是一个优先队列.

Rx是关于事件流,而不是队列.当然,在Rx中使用了很多队列,但是它们并不是第一类概念,而是Rx的概念的实现细节的更多部分.

我们需要队列的一个很好的例子是处理一个缓慢的观察者.事件在Rx中顺序调度,如果事件到达速度比观察者更快地处理它们,则必须对该观察者进行排队.如果有许多观察者,则必须维护多个逻辑队列,因为观察者可能会以不同的步伐进展 – 而Rx选择不使其保持锁定.

“背压”是观察者向观察者提供反馈的概念,以便允许机制处理更快的可观察的压力 – 例如混合或节流. Rx没有一种引入背压的一种方法 – 只有内置的方法是监视观察者的观察者才是通过OnNext的同步特性.任何其他机制都需要脱机.你的问题直接与背压有关,因为在慢观察者的情况下才是有关的.

我提到这一切是为了提供证据表明,Rx不是提供您正在寻找的那种优先调度的绝佳选择 – 实际上,一流的排队机制似乎更适合.

解决手头的问题,您需要在自定义运算符中管理优先级排队.重述问题:您所说的是,如果事件在观察者处理OnNext事件期间到达,以便发生事件的建立,而不是Rx使用的典型FIFO队列,则要基于优先级调度.

值得注意的是,根据Rx不会将多个观察者保持在锁定步骤的精神,并发观察者可能会以不同的顺序看到事件,这可能或可能不是您的问题.您可以使用像Publish这样的机制来获得订单一致性 – 但是您可能不希望这样做,因为事件传递的时间在这种情况下会变得非常不可预测,效率低下.

我相信有更好的方法可以做到这一点,但是这里是一个基于优先级队列的交付的例子 – 你可以使用更好的队列实现来扩展这个功能,以支持多个流和优先级(甚至每个事件的优先级)例如基于b-tree的优先级队列),但是我选择保持这个很简单.即使这样,请注意代码必须解决的重大问题,围绕错误处理,完成等 – 我已经做出选择,说明这些信号是否有很多其他有效的选择.

总而言之,这个实现肯定让我不了解这一点.这是很复杂的,这里可能有bug.正如我所说,可能有更好的代码(特别是给我的最小的努力,我已经投入使用),但在概念上,我不舒服的想法,无论实现:

public static class ObservableExtensions
{
    public static IObservable<TSource> MergeWithLowPriorityStream<TSource>(
        this IObservable<TSource> source,IObservable<TSource> lowPriority,IScheduler scheduler = null)
    {    
        scheduler = scheduler ?? Scheduler.Default;
        return Observable.Create<TSource>(o => {    
            // BufferBlock from TPL dataflow is used as it is
            // handily awaitable. package: Microsoft.Tpl.Dataflow        
            var loQueue = new BufferBlock<TSource>();
            var hiQueue = new BufferBlock<TSource>();
            var errorQueue = new BufferBlock<Exception>();
            var done = new TaskCompletionSource<int>();
            int doneCount = 0;
            Action incDone = () => {
                var dc = Interlocked.Increment(ref doneCount);
                if(dc == 2)
                    done.SetResult(0);
            };
            source.Subscribe(
                x => hiQueue.Post(x),e => errorQueue.Post(e),incDone);
            lowPriority.Subscribe(
                x => loQueue.Post(x),incDone);
            return scheduler.ScheduleAsync(async(ctrl,ct) => {
                while(!ct.IsCancellationRequested)
                {
                    TSource nextItem;
                    if(hiQueue.TryReceive(out nextItem)
                      || loQueue.TryReceive(out nextItem))
                        o.OnNext(nextItem);

                    else if(done.Task.IsCompleted)
                    {
                        o.OnCompleted();
                        return;
                    }

                    Exception error;                        
                    if(errorQueue.TryReceive(out error))
                    {
                        o.OnError(error);
                        return;
                    }

                    var hiAvailableAsync = hiQueue.OutputAvailableAsync(ct);    
                    var loAvailableAsync = loQueue.OutputAvailableAsync(ct);                    
                    var errAvailableAsync =
                        errorQueue.OutputAvailableAsync(ct);
                    await Task.WhenAny(
                        hiAvailableAsync,loAvailableAsync,errAvailableAsync,done.Task);
                }
            });
        });
    }
}

和示例用法

void static Main()
{
    var xs = Observable.Range(0,3);
    var ys = Observable.Range(10,3);

    var source = ys.MergeWithLowPriorityStream(xs);

    source.Subscribe(Console.WriteLine,() => Console.WriteLine("Done"));
}

这将首先打印出y的元素,表明它们的优先级.

猜你在找的C#相关文章