>有一个生产者(硬件设备)每秒生成1000个数据包.我需要将此数据记录到文件中.
>设备只有一个ReadAsync()方法一次报告一个数据包.
>我需要缓冲数据包并按照它们生成的顺序将它们写入文件,每秒只执行一次.
>如果写入过程没有在下一批数据包准备好写入时及时完成,则写操作应该失败.
到目前为止,我写了类似下面的内容.它有效,但我不确定这是否是解决问题的最佳方法.有任何意见或建议吗?在消费者需要汇总从生产者处收到的数据时,采用这种生产者/消费者问题的最佳做法是什么?
static async Task TestLogger(Device device,int seconds) { const int bufLength = 1000; bool firstIteration = true; Task writerTask = null; using (var writer = new StreamWriter("test.log"))) { do { var buffer = new byte[bufLength][]; for (int i = 0; i < bufLength; i++) { buffer[i] = await device.ReadAsync(); } if (!firstIteration) { if (!writerTask.IsCompleted) throw new Exception("Write Time Out!"); } writerTask = Task.Run(() => { foreach (var b in buffer) writer.WriteLine(ToHexString(b)); }); firstIteration = false; } while (--seconds > 0); } }
解决方法
AsyncProducerConsumerQueue<T>
特色.
AsyncProducerConsumerQueue<byte[]> _queue; Stream _stream; // producer async Task ReceiveAsync(CancellationToken token) { while (true) { var list = new List<byte>(); while (true) { token.ThrowIfCancellationRequested(token); var packet = await _device.ReadAsync(token); list.Add(packet); if (list.Count == 1000) break; } // push next batch await _queue.EnqueueAsync(list.ToArray(),token); } } // consumer async Task LogAsync(CancellationToken token) { Task prevIoUsFlush = Task.FromResult(0); CancellationTokenSource cts = null; while (true) { token.ThrowIfCancellationRequested(token); // get next batch var nextBatch = await _queue.DequeueAsync(token); if (!prevIoUsFlush.IsCompleted) { cts.Cancel(); // cancel the prevIoUs flush if not ready throw new Exception("Failed to flush on time."); } await prevIoUsFlush; // it's completed,observe for any errors // start flushing cts = CancellationTokenSource.CreateLinkedTokenSource(token); prevIoUsFlush = _stream.WriteAsync(nextBatch,nextBatch.Count,cts.Token); } }
如果您不想让记录器失败,而是希望取消刷新并继续下一批,则可以对此代码进行最小的更改.
回应@ l3arnon评论:
- A packet is not a byte,it’s byte[]. 2. You haven’t used the OP’s ToHexString. 3. AsyncProducerConsumerQueue is much less robust and
tested than .Net’s TPL Dataflow. 4. You await prevIoUsFlush for errors
just after you throw an exception which makes that line redundant.
etc. In short: I think the possible added value doesn’t justify this
very complicated solution.
>“数据包不是字节,它是字节[]” – 数据包是一个字节,这从OP的代码中很明显:buffer [i] = await device.ReadAsync().然后,一批数据包是byte [].
>“你还没有使用OP的ToHexString.” – 目标是展示如何使用Stream.WriteAsync本身接受取消令牌,而不是WriteLineAsync,它不允许取消.将ToHexString与Stream.WriteAsync一起使用并仍然利用取消支持是微不足道的:
var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) + Environment.NewLine); _stream.WriteAsync(hexBytes,hexBytes.Length,token);
>“AsyncProducerConsumerQueue的稳健性和测试性都不如.Net的TPL数据流” – 我不认为这是一个确定的事实.但是,如果OP关心它,他可以使用常规的BlockingCollection,它不会阻塞生产者线程.在等待下一批时阻止使用者线程是可以的,因为写入是并行完成的.与此相反,您的TPL Dataflow版本带有一个冗余cpu和锁密集操作:使用logAction.Post(数据包)将数据从生产者管道移动到写入器pipleline,逐字节.我的代码不这样做.>“在抛出导致该行冗余的异常之后,您等待prevIoUsFlush出错.” – 这条线不是多余的.也许,你错过了这一点:当prevIoUsFlush.IsFaulted或prevIoUsFlush.IsCancelled也为true时,prevIoUsFlush.IsCompleted可能为true.因此,等待prevIoUsFlush与之相关,以观察已完成任务上的任何错误(例如,写入失败),否则将丢失.