我希望能够做的是按时间戳的顺序将每个事件放在总线上,即放置A1,然后放入B1,B2,A3等.此外,由于某些流具有多个具有相同时间戳的(顺序)元素,因此我希望将这些元素分组,以便每个新事件都是一个数组.所以我们将[A3]放在总线上,然后是[A15,A25],依此类推.
我试图通过创建两个ConcurrentQueue结构来实现这一点,将每个事件放在队列的后面,然后查看队列的每个前端,首先选择前面的事件,然后遍历队列,以便具有此时间戳的所有事件都是当下.
但是,我遇到了两个问题:
>如果我让这些队列无限制,我会快速耗尽内存,因为读取操作比接收事件的处理程序快得多. (我有几千兆字节的数据).
>我有时会遇到一个情况,我会在A25到来之前处理A15事件.我不知何故需要防范这一点.
我认为Rx可以在这方面提供帮助,但我没有看到明显的组合使这成为可能.因此,非常感谢任何建议.
解决方法
由于显而易见的原因IObservables不能’OrderBy'(你必须首先观察整个流以保证正确的输出顺序),所以我的答案做出了假设(你说过)你的2个源事件流是有序的.
这最终是一个有趣的问题.标准的Rx运算符缺少一个可以轻松解决这个问题的GroupByUntilChanged,只要在观察到下一组的第一个元素时它在前一个组上调用OnComplete是可观察的.然而,看看DistinctUntilChanged的实现,它不遵循这种模式,只在源observable完成时调用OnComplete(即使它知道在第一个非不同元素之后将没有更多的元素……很奇怪???).无论如何,出于这些原因,我决定采用GroupByUntilChanged方法(不破坏Rx约定)而是转而使用ToEnumerableUntilChanged.
免责声明:这是我的第一个Rx扩展,所以我希望得到有关我的选择的反馈.此外,我的一个主要问题是持有distinctElements列表的匿名观察.
首先,您的应用程序代码非常简单:
- public class Event
- {
- public DateTime Timestamp { get; set; }
- }
- private IObservable<Event> eventStream1;
- private IObservable<Event> eventStream2;
- public IObservable<IEnumerable<Event>> CombineAndGroup()
- {
- return eventStream1.CombineLatest(eventStream2,(e1,e2) => e1.Timestamp < e2.Timestamp ? e1 : e2)
- .ToEnumerableUntilChanged(e => e.Timestamp);
- }
现在为ToEnumerableUntilChanged实现(代码墙警告):
- public static IObservable<IEnumerable<TSource>> ToEnumerableUntilChanged<TSource,TKey>(this IObservable<TSource> source,Func<TSource,TKey> keySelector)
- {
- // TODO: Follow Rx conventions and create a superset overload that takes the IComparer as a parameter
- var comparer = EqualityComparer<TKey>.Default;
- return Observable.Create<IEnumerable<TSource>>(observer =>
- {
- var currentKey = default(TKey);
- var hasCurrentKey = false;
- var distinctElements = new List<TSource>();
- return source.Subscribe((value =>
- {
- TKey elementKey;
- try
- {
- elementKey = keySelector(value);
- }
- catch (Exception ex)
- {
- observer.OnError(ex);
- return;
- }
- if (!hasCurrentKey)
- {
- hasCurrentKey = true;
- currentKey = elementKey;
- distinctElements.Add(value);
- return;
- }
- bool keysMatch;
- try
- {
- keysMatch = comparer.Equals(currentKey,elementKey);
- }
- catch (Exception ex)
- {
- observer.OnError(ex);
- return;
- }
- if (keysMatch)
- {
- distinctElements.Add(value);
- return;
- }
- observer.OnNext( distinctElements);
- distinctElements.Clear();
- distinctElements.Add(value);
- currentKey = elementKey;
- }),observer.OnError,() =>
- {
- if (distinctElements.Count > 0)
- observer.OnNext(distinctElements);
- observer.OnCompleted();
- });
- });
- }