我希望能够做的是按时间戳的顺序将每个事件放在总线上,即放置A1,然后放入B1,B2,A3等.此外,由于某些流具有多个具有相同时间戳的(顺序)元素,因此我希望将这些元素分组,以便每个新事件都是一个数组.所以我们将[A3]放在总线上,然后是[A15,A25],依此类推.
我试图通过创建两个ConcurrentQueue结构来实现这一点,将每个事件放在队列的后面,然后查看队列的每个前端,首先选择前面的事件,然后遍历队列,以便具有此时间戳的所有事件都是当下.
但是,我遇到了两个问题:
>如果我让这些队列无限制,我会快速耗尽内存,因为读取操作比接收事件的处理程序快得多. (我有几千兆字节的数据).
>我有时会遇到一个情况,我会在A25到来之前处理A15事件.我不知何故需要防范这一点.
我认为Rx可以在这方面提供帮助,但我没有看到明显的组合使这成为可能.因此,非常感谢任何建议.
解决方法
由于显而易见的原因IObservables不能’OrderBy'(你必须首先观察整个流以保证正确的输出顺序),所以我的答案做出了假设(你说过)你的2个源事件流是有序的.
这最终是一个有趣的问题.标准的Rx运算符缺少一个可以轻松解决这个问题的GroupByUntilChanged,只要在观察到下一组的第一个元素时它在前一个组上调用OnComplete是可观察的.然而,看看DistinctUntilChanged的实现,它不遵循这种模式,只在源observable完成时调用OnComplete(即使它知道在第一个非不同元素之后将没有更多的元素……很奇怪???).无论如何,出于这些原因,我决定采用GroupByUntilChanged方法(不破坏Rx约定)而是转而使用ToEnumerableUntilChanged.
免责声明:这是我的第一个Rx扩展,所以我希望得到有关我的选择的反馈.此外,我的一个主要问题是持有distinctElements列表的匿名观察.
首先,您的应用程序代码非常简单:
public class Event { public DateTime Timestamp { get; set; } } private IObservable<Event> eventStream1; private IObservable<Event> eventStream2; public IObservable<IEnumerable<Event>> CombineAndGroup() { return eventStream1.CombineLatest(eventStream2,(e1,e2) => e1.Timestamp < e2.Timestamp ? e1 : e2) .ToEnumerableUntilChanged(e => e.Timestamp); }
现在为ToEnumerableUntilChanged实现(代码墙警告):
public static IObservable<IEnumerable<TSource>> ToEnumerableUntilChanged<TSource,TKey>(this IObservable<TSource> source,Func<TSource,TKey> keySelector) { // TODO: Follow Rx conventions and create a superset overload that takes the IComparer as a parameter var comparer = EqualityComparer<TKey>.Default; return Observable.Create<IEnumerable<TSource>>(observer => { var currentKey = default(TKey); var hasCurrentKey = false; var distinctElements = new List<TSource>(); return source.Subscribe((value => { TKey elementKey; try { elementKey = keySelector(value); } catch (Exception ex) { observer.OnError(ex); return; } if (!hasCurrentKey) { hasCurrentKey = true; currentKey = elementKey; distinctElements.Add(value); return; } bool keysMatch; try { keysMatch = comparer.Equals(currentKey,elementKey); } catch (Exception ex) { observer.OnError(ex); return; } if (keysMatch) { distinctElements.Add(value); return; } observer.OnNext( distinctElements); distinctElements.Clear(); distinctElements.Add(value); currentKey = elementKey; }),observer.OnError,() => { if (distinctElements.Count > 0) observer.OnNext(distinctElements); observer.OnCompleted(); }); }); }