I'm stuck with Rx and a specific query.
Problem:
A continuous stream produces many single update operations. Each operation is either an insert or a delete. I want to buffer the stream and perform a few operations at a time, but it is really important to preserve the order. Additionally, the buffered operations should be executed as sequences every X seconds.
Example:
In:
insert-insert-insert-delete-delete-insert-delete-delete-delete-delete
Out:
insert(3)-delete(2)-insert(1)-delete(4)
I wrote a simple application to test this. It more or less works, but it does not respect the order of the incoming inserts and deletes:
namespace RxTests
{
    using System;
    using System.Collections.Generic;
    using System.Globalization;
    using System.Linq;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Text;
    using System.Threading;

    internal class Program
    {
        private static readonly Random Random = new Random();
        private static readonly CancellationTokenSource ProducerStopped = new CancellationTokenSource();
        private static readonly ISubject<UpdateOperation> operations = new Subject<UpdateOperation>();

        private static void Main(string[] args)
        {
            Console.WriteLine("Starting production");
            var producerScheduler = new EventLoopScheduler();
            var consumerScheduler = new EventLoopScheduler();

            var producer = Observable.Interval(TimeSpan.FromSeconds(2))
                .SubscribeOn(producerScheduler)
                .Subscribe(Produce, WriteProductionCompleted);

            var consumer = operations.ObserveOn(producerScheduler)
                .GroupBy(operation => operation.Delete)
                .SelectMany(observable => observable.Buffer(TimeSpan.FromSeconds(8), 50))
                .SubscribeOn(consumerScheduler)
                .Subscribe(WriteUpdateOperations);

            Console.WriteLine("Type any key to stop");
            Console.ReadKey();
            consumer.Dispose();
            producer.Dispose();
        }

        private static void Produce(long time)
        {
            var delete = Random.NextDouble() < 0.5;
            Console.WriteLine("Produce {0},{1} at {2}", time + 1, delete, time);
            var idString = (time + 1).ToString(CultureInfo.InvariantCulture);
            var id = time + 1;
            // Pass the delete flag explicitly; the constructor expects (id, delete, changes...).
            operations.OnNext(
                new UpdateOperation(id, delete, idString, time.ToString(CultureInfo.InvariantCulture)));
        }

        private static void WriteProductionCompleted()
        {
            Console.WriteLine("Production completed");
            ProducerStopped.Cancel();
        }

        private static void WriteUpdateOperation(UpdateOperation updateOperation)
        {
            Console.WriteLine("Consuming {0}", updateOperation);
        }

        private static void WriteUpdateOperations(IList<UpdateOperation> updateOperation)
        {
            foreach (var operation in updateOperation)
            {
                WriteUpdateOperation(operation);
            }
        }

        private class UpdateOperation
        {
            public UpdateOperation(long id, bool delete, params string[] changes)
            {
                this.Id = id;
                this.Delete = delete;
                this.Changes = new List<string>(changes ?? Enumerable.Empty<string>());
            }

            public bool Delete { get; set; }
            public long Id { get; private set; }
            public IList<string> Changes { get; private set; }

            public override string ToString()
            {
                var stringBuilder = new StringBuilder("{UpdateOperation ");
                stringBuilder.AppendFormat("Id: {0},Delete: {1},Changes: [", this.Id, this.Delete);
                if (this.Changes.Count > 0)
                {
                    stringBuilder.Append(this.Changes.First());
                    foreach (var change in this.Changes.Skip(1))
                    {
                        stringBuilder.AppendFormat(",{0}", change);
                    }
                }
                stringBuilder.Append("]}");
                return stringBuilder.ToString();
            }
        }
    }
}
Can anyone help me with the right query?
Thanks
Update 08.03.13 (JerKimball's suggestion)
The following lines are small changes/additions to JerKimball's code, used to print the results:
using (query.Subscribe(Print))
{
    Console.ReadLine();
    producer.Dispose();
}
With the following Print methods:
private static void Print(IObservable<IList<Operation>> operations)
{
    // Subscribes to the inner observable from inside the outer subscription.
    operations.Subscribe(Print);
}

private static void Print(IList<Operation> operations)
{
    var stringBuilder = new StringBuilder("[");
    if (operations.Count > 0)
    {
        stringBuilder.Append(operations.First());
        foreach (var item in operations.Skip(1))
        {
            stringBuilder.AppendFormat(",{0}", item);
        }
    }
    stringBuilder.Append("]");
    Console.WriteLine(stringBuilder);
}
And the ToString override for Operation:
public override string ToString()
{
    return string.Format("{0}:{1}", this.Type, this.Seq);
}
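The order is preserved, but:
- I am not sure about subscribing inside another subscription: is that correct? (This is a question I have had for a long time, and it has never been clear to me.)
- I never get more than two elements in each list, even when the stream produces more than two consecutive values of the same type.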
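Solution
Let's try a new approach (hence the new answer):
First, let's define an extension method that "collapses" a list of items into runs based on a key, while preserving order: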
public static class Ext
{
    public static IEnumerable<List<T>> ToRuns<T, TKey>(
        this IEnumerable<T> source, Func<T, TKey> keySelector)
    {
        using (var enumerator = source.GetEnumerator())
        {
            if (!enumerator.MoveNext())
                yield break;

            var currentSet = new List<T>();

            // inspect the first item
            var lastKey = keySelector(enumerator.Current);
            currentSet.Add(enumerator.Current);

            while (enumerator.MoveNext())
            {
                var newKey = keySelector(enumerator.Current);
                if (!Equals(newKey, lastKey))
                {
                    // A difference == new run; return what we've got thus far
                    yield return currentSet;
                    lastKey = newKey;
                    currentSet = new List<T>();
                }
                currentSet.Add(enumerator.Current);
            }

            // Return the last run.
            yield return currentSet;

            // and clean up
            currentSet = new List<T>();
            lastKey = default(TKey);
        }
    }
}
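Fairly simple: given an IEnumerable<T>, it returns an IEnumerable<List<T>> in which every sub-list is a run of consecutive items that share the same key.
Now, to feed it and use it: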
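To make the behaviour concrete, here is a minimal sketch that applies the same run-splitting idea to the sample sequence from the question. It is not the answer's exact code: the ToRuns below is a compact restatement of the same idea, and RunsDemo and Describe are hypothetical names introduced only for this illustration. Plain strings stand in for the operations.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RunsDemo
{
    // Compact restatement of the ToRuns idea (an illustration, not the answer's code):
    // split a sequence into runs of consecutive items that share a key, keeping order.
    public static IEnumerable<List<T>> ToRuns<T, TKey>(
        this IEnumerable<T> source, Func<T, TKey> keySelector)
    {
        List<T> current = null;
        var lastKey = default(TKey);
        foreach (var item in source)
        {
            var key = keySelector(item);
            if (current != null && !Equals(key, lastKey))
            {
                yield return current;   // key changed: emit the finished run
                current = null;
            }
            if (current == null)
            {
                current = new List<T>();
                lastKey = key;
            }
            current.Add(item);
        }
        if (current != null)
            yield return current;       // emit the trailing run, if any
    }

    // Hypothetical helper: renders runs in the "type(count)" notation used in the question.
    public static string Describe(IEnumerable<string> operations)
    {
        return string.Join("-", operations
            .ToRuns(op => op)
            .Select(run => string.Format("{0}({1})", run[0], run.Count)));
    }

    public static void Main()
    {
        var input = "insert-insert-insert-delete-delete-insert-delete-delete-delete-delete"
            .Split('-');
        Console.WriteLine(Describe(input)); // insert(3)-delete(2)-insert(1)-delete(4)
    }
}
```

Because the runs are cut only where the key changes, the relative order of inserts and deletes is never rearranged, which is the property the GroupBy-based query in the question loses.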
var rnd = new Random();
var fakeSource = new Subject<Operation>();
var producer = Observable
    .Interval(TimeSpan.FromMilliseconds(1000))
    .Subscribe(i =>
    {
        var op = new Operation();
        op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete";
        fakeSource.OnNext(op);
    });

var singleSource = fakeSource
    .Publish().RefCount();

var query = singleSource
    // change this value to alter your "look at" time window
    .Buffer(TimeSpan.FromSeconds(5))
    .Select(buff => buff.ToRuns(op => op.Type).Where(run => run.Count > 0));

using (query.Subscribe(batch =>
{
    foreach (var item in batch)
    {
        Console.WriteLine("{0}({1})", item.First().Type, item.Count);
    }
}))
{
    Console.ReadLine();
    producer.Dispose();
}
Give it a whirl. Here's what I see in a typical run:
insert(4) delete(2) insert(1) delete(1) insert(1) insert(1) delete(1) insert(1) delete(2) delete(2) insert(2) delete(1) insert(1) delete(2) insert(2)