c# – 如何组合两个按时间戳分组的流?

前端之家收集整理的这篇文章主要介绍了c# – 如何组合两个按时间戳分组的流?前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我有两个对象流,每个对象都有一个Timestamp值.两个流都是有序的,因此例如时间戳可以是一个流中的Ta = 1,3,6,7而另一个流中的Tb = 1,2,5,8.两个流中的对象属于同一类型.

我希望能够做的是按时间戳的顺序将每个事件放在总线上,即放置A1,然后放入B1,B2,A3等.此外,由于某些流具有多个具有相同时间戳的(顺序)元素,因此我希望将这些元素分组,以便每个新事件都是一个数组.所以我们将[A3]放在总线上,然后是[A15,A25],依此类推.

我试图通过创建两个ConcurrentQueue结构来实现这一点,将每个事件放在队列的后面,然后查看队列的每个前端,首先选择前面的事件,然后遍历队列,以便具有此时间戳的所有事件都是当下.

但是,我遇到了两个问题:

>如果我让这些队列无限制,我会快速耗尽内存,因为读取操作比接收事件的处理程序快得多. (我有几千兆字节的数据).
>我有时会遇到一个情况,我会在A25到来之前处理A15事件.我不知何故需要防范这一点.

我认为Rx可以在这方面提供帮助,但我没有看到明显的组合使这成为可能.因此,非常感谢任何建议.

解决方法

Rx确实非常适合IMO的这个问题.

由于显而易见的原因IObservables不能’OrderBy'(你必须首先观察整个流以保证正确的输出顺序),所以我的答案做出了假设(你说过)你的2个源事件流是有序的.

这最终是一个有趣的问题.标准的Rx运算符缺少一个可以轻松解决这个问题的GroupByUntilChanged,只要在观察到下一组的第一个元素时它在前一个组上调用OnComplete是可观察的.然而,看看DistinctUntilChanged的实现,它不遵循这种模式,只在源observable完成时调用OnComplete(即使它知道在第一个非不同元素之后将没有更多的元素……很奇怪???).无论如何,出于这些原因,我决定采用GroupByUntilChanged方法(不破坏Rx约定)而是转而使用ToEnumerableUntilChanged.

免责声明:这是我的第一个Rx扩展,所以我希望得到有关我的选择的反馈.此外,我的一个主要问题是持有distinctElements列表的匿名观察.

首先,您的应用程序代码非常简单:

  1. public class Event
  2. {
  3. public DateTime Timestamp { get; set; }
  4. }
  5.  
  6. private IObservable<Event> eventStream1;
  7. private IObservable<Event> eventStream2;
  8.  
  9. public IObservable<IEnumerable<Event>> CombineAndGroup()
  10. {
  11. return eventStream1.CombineLatest(eventStream2,(e1,e2) => e1.Timestamp < e2.Timestamp ? e1 : e2)
  12. .ToEnumerableUntilChanged(e => e.Timestamp);
  13. }

现在为ToEnumerableUntilChanged实现(代码墙警告):

  1. public static IObservable<IEnumerable<TSource>> ToEnumerableUntilChanged<TSource,TKey>(this IObservable<TSource> source,Func<TSource,TKey> keySelector)
  2. {
  3. // TODO: Follow Rx conventions and create a superset overload that takes the IComparer as a parameter
  4. var comparer = EqualityComparer<TKey>.Default;
  5.  
  6. return Observable.Create<IEnumerable<TSource>>(observer =>
  7. {
  8. var currentKey = default(TKey);
  9. var hasCurrentKey = false;
  10. var distinctElements = new List<TSource>();
  11.  
  12. return source.Subscribe((value =>
  13. {
  14. TKey elementKey;
  15. try
  16. {
  17. elementKey = keySelector(value);
  18. }
  19. catch (Exception ex)
  20. {
  21. observer.OnError(ex);
  22. return;
  23. }
  24.  
  25. if (!hasCurrentKey)
  26. {
  27. hasCurrentKey = true;
  28. currentKey = elementKey;
  29. distinctElements.Add(value);
  30. return;
  31. }
  32.  
  33. bool keysMatch;
  34. try
  35. {
  36. keysMatch = comparer.Equals(currentKey,elementKey);
  37. }
  38. catch (Exception ex)
  39. {
  40. observer.OnError(ex);
  41. return;
  42. }
  43.  
  44. if (keysMatch)
  45. {
  46. distinctElements.Add(value);
  47. return;
  48. }
  49.  
  50. observer.OnNext( distinctElements);
  51.  
  52. distinctElements.Clear();
  53. distinctElements.Add(value);
  54. currentKey = elementKey;
  55.  
  56. }),observer.OnError,() =>
  57. {
  58. if (distinctElements.Count > 0)
  59. observer.OnNext(distinctElements);
  60.  
  61. observer.OnCompleted();
  62. });
  63. });
  64. }

猜你在找的C#相关文章