我有一个IObservable< IObservable< T>>其中每个内部IObservable< T>是一个值流,后跟最终的OnCompleted事件.
我想将其转换为IObservable< IEnumerable< T>>,这是一个由未完成的任何内部流的最新值组成的流.它应该产生一个新的IEnumerable< T>每当从一个内部流(或内部流到期)产生新值时
最容易用大理石图显示(我希望它足够全面):
input ---.----.---.---------------- | | '-f-----g-| | 'd------e---------| 'a--b----c-----| result ---a--b-b--c-c-c-e-e-e---[]- d d d e f g f f
([]是一个空的IEnumerable< T>而 – |代表OnCompleted)
您可以看到它略微类似于CombineLatest操作.
我一直在玩Join和GroupJoin无济于事,但我觉得这几乎肯定是正确的方向.
我想在这个运算符中使用尽可能少的状态.
更新
我已经更新了这个问题,不仅包括单值序列 – 结果IObservable< IEnumerable< T>>应该只包含每个序列的最新值 – 如果序列没有产生值,则不应包括它.
解决方法
这是昨天基于您的解决方案的版本,针对新要求进行了调整.基本思路是将引用放入易腐变集合中,然后在内部序列生成新值时更新引用的值.
我还修改了正确跟踪内部订阅并取消订阅外部observable是否取消订阅.
最后,我修正了一些可能违反Rx指南的竞争条件.如果你的内部observable是从不同的线程同时触发,你可以同时调用obs.OnNext,这是一个很大的禁忌.所以我使用相同的锁来控制每个内部observable以防止这种情况(参见同步调用).请注意,因此,您可能会使用常规双链表而不是PerishableCollection,因为现在使用该集合的所有代码都在一个锁中,因此您不需要PerishableCollection的线程保证.
// Acts as a reference to the current value stored in the list private class BoxedValue<T> { public T Value; public BoxedValue(T initialValue) { Value = initialValue; } } public static IObservable<IEnumerable<T>> MergeLatest<T>(this IObservable<IObservable<T>> source) { return Observable.Create<IEnumerable<T>>(obs => { var collection = new PerishableCollection<BoxedValue<T>>(); var outerSubscription = new SingleAssignmentDisposable(); var subscriptions = new CompositeDisposable(outerSubscription); var innerLock = new object(); outerSubscription.Disposable = source.Subscribe(duration => { BoxedValue<T> value = null; var lifetime = new DisposableLifetime(); // essentially a CancellationToken var subscription = new SingleAssignmentDisposable(); subscriptions.Add(subscription); subscription.Disposable = duration.Synchronize(innerLock) .Subscribe( x => { if (value == null) { value = new BoxedValue<T>(x); collection.Add(value,lifetime.Lifetime); } else { value.Value = x; } obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value)); },obs.OnError,// handle an error in the stream. () => // on complete { if (value != null) { lifetime.Dispose(); // removes the item obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value)); subscriptions.Remove(subscription); // remove this subscription } } ); }); return subscriptions; }); }