c# – “合并”流的流以生成每个流的最新值的流

前端之家收集整理的这篇文章主要介绍了c# – “合并”流的流以生成每个流的最新值的流前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我有一个IObservable< IObservable< T>>其中每个内部IObservable< T>是一个值流,后跟最终的OnCompleted事件.

我想将其转换为IObservable< IEnumerable< T>>,这是一个由未完成的任何内部流的最新值组成的流.它应该产生一个新的IEnumerable< T>每当从一个内部流(或内部流到期)产生新值时

最容易用大理石图显示(我希望它足够全面):

input ---.----.---.----------------
         |    |   '-f-----g-|      
         |    'd------e---------|
         'a--b----c-----|          

result ---a--b-b--c-c-c-e-e-e---[]-
               d  d d e f g        
                    f f

([]是一个空的IEnumerable< T>而 – |代表OnCompleted)

您可以看到它略微类似于CombineLatest操作.
我一直在玩Join和GroupJoin无济于事,但我觉得这几乎肯定是正确的方向.

我想在这个运算符中使用尽可能少的状态.

更新

我已经更新了这个问题,不仅包括单值序列 – 结果IObservable< IEnumerable< T>>应该只包含每个序列的最新值 – 如果序列没有产生值,则不应包括它.

解决方法

这是昨天基于您的解决方案的版本,针对新要求进行了调整.基本思路是将引用放入易腐变集合中,然后在内部序列生成新值时更新引用的值.

我还修改了正确跟踪内部订阅并取消订阅外部observable是否取消订阅.

如果任何流产生错误,也进行修改以将其全部拆除.

最后,我修正了一些可能违反Rx指南的竞争条件.如果你的内部observable是从不同的线程同时触发,你可以同时调用obs.OnNext,这是一个很大的禁忌.所以我使用相同的锁来控制每个内部observable以防止这种情况(参见同步调用).请注意,因此,您可能会使用常规双链表而不是PerishableCollection,因为现在使用该集合的所有代码都在一个锁中,因此您不需要PerishableCollection的线程保证.

// Acts as a reference to the current value stored in the list
private class BoxedValue<T>
{
    public T Value;
    public BoxedValue(T initialValue) { Value = initialValue; }
}

public static IObservable<IEnumerable<T>> MergeLatest<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Create<IEnumerable<T>>(obs =>
    {
        var collection = new PerishableCollection<BoxedValue<T>>();
        var outerSubscription = new SingleAssignmentDisposable();
        var subscriptions = new CompositeDisposable(outerSubscription);
        var innerLock = new object();

        outerSubscription.Disposable = source.Subscribe(duration =>
        {
            BoxedValue<T> value = null;
            var lifetime = new DisposableLifetime(); // essentially a CancellationToken
            var subscription = new SingleAssignmentDisposable();

            subscriptions.Add(subscription);
            subscription.Disposable = duration.Synchronize(innerLock)
                .Subscribe(
                    x =>
                    {
                        if (value == null)
                        {
                            value = new BoxedValue<T>(x);
                            collection.Add(value,lifetime.Lifetime);
                        }
                        else
                        {
                            value.Value = x;
                        }
                        obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
                    },obs.OnError,// handle an error in the stream.
                    () => // on complete
                    {
                        if (value != null)
                        {
                            lifetime.Dispose(); // removes the item
                            obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
                            subscriptions.Remove(subscription); // remove this subscription
                        }
                    }
            );
        });

        return subscriptions;
    });
}
原文链接:https://www.f2er.com/csharp/99530.html

猜你在找的C#相关文章