Task1:从二进制文件读取byte []批次,并为每个字节数组集合启动一个新任务. (操作批量用于内存管理目的).
任务2-n:这些是工作任务,每个操作都是通过字节数组的传入集合(来自Tasks1)进行操作,并对字节数组进行序列化,按照某些条件进行排序,然后存储一个结果对象的集合(每个字节数组并行字典反序列化成这样的对象)
任务(n 1)我选择了一个并发字典,因为这个任务的任务是将并行字典中存储的那些集合按照与从Task1起源的顺序相同的顺序进行合并.我通过传递一个collectionID(对于Task1中的每个新集合都是int类型,并且递增)一直从Task1到此任务来实现.此任务基本检查下一个预期的collectionID是否已经存储在并发字典中,如果是,则将其添加到Final Queue并检查并发字典中的下一个集合.
现在,从我所看到的和我看过的视频来看,TPL Dataflow似乎是这种制作人/消费者模型的完美选择.我似乎无法设计出一个设计,因此我从来没有使用过TPL Dataflow.在吞吐量和延迟方面,这个库甚至可以完成任务?我目前在最终的集合中处理了250万字节的数组,从而处理每秒的对象. TPL数据流可以帮助简化吗?我对以下问题的答案特别感兴趣:TPL数据流可以在任务1中生成工作任务时保留收集批次的顺序,并在工作任务完成工作后重新合并它能优化事情吗?对于整个结构的剖析,我觉得由于旋转而浪费了很多时间,并且涉及太多并发收藏.
任何想法,想法?
解决方法
原来的答案:
据我所知,.Net中只有一个并行构造支持按照他们进入的顺序返回处理的项目:PLINQ with AsOrdered()
.但是在我看来,PLINQ不适合你想要的.
另一方面,TPL Dataflow很适合,但是它并没有一个可以同时支持并行和返回项目的块(TransformBlock支持它们,而不是在同一时间).幸运的是,数据流块的设计考虑到可组合性,因此我们可以构建我们自己的块.
但首先,我们必须弄清楚如何排序结果.使用一个并行字典,就像你所建议的一样,以及一些同步机制,一定会奏效.但我认为有一个更简单的解决方案:使用任务队列.在输出任务中,您将出现一个Task,等待它完成(异步),当它发生时,会发送其结果.当队列为空时,我们仍然需要一些同步,但如果我们选择使用巧妙的队列,我们可以免费获得同步.
所以,一般的想法是这样的:我们正在写的是一个IPropagatorBlock,有一些输入和一些输出.创建自定义IPropagatorBlock的最简单方法是创建一个处理输入的块,另一个生成结果的块将其视为使用DataflowBlock.Encapsulate()
处理的块.
输入块将必须以正确的顺序处理传入的项目,因此在那里没有并行化.它将创建一个新的任务(实际上是一个TaskCompletionSource
,以便我们稍后可以设置任务的结果),将其添加到队列中,然后发送项目进行处理,以及一些方法来设置正确任务的结果.因为我们不需要链接这个块到任何东西,我们可以使用一个ActionBlock.
输出块将不得不从队列中取出任务,异步等待它们,然后发送它们.但是由于所有块都嵌入在其中,并且采用代理的块具有异步等待内置,这将非常简单:新的TransformBlock< Task& TOutput>,TOutput>(t => t).该块将同时用作队列和输出块.因此,我们不必处理任何同步.
最后一块拼图实际上是并行处理物品.为此,我们可以使用另一个ActionBlock,这次用MaxDegreeOfParallelism设置.它将采取输入,处理它,并将正确的任务的结果设置在队列中.
放在一起,可能看起来像这样:
public static IPropagatorBlock<TInput,TOutput> CreateConcurrentOrderedTransformBlock<TInput,TOutput>( Func<TInput,TOutput> transform) { var queue = new TransformBlock<Task<TOutput>,TOutput>(t => t); var processor = new ActionBlock<Tuple<TInput,Action<TOutput>>>( tuple => tuple.Item2(transform(tuple.Item1)),new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }); var enqueuer = new ActionBlock<TInput>( async item => { var tcs = new TaskCompletionSource<TOutput>(); await processor.SendAsync( new Tuple<TInput,Action<TOutput>>(item,tcs.SetResult)); await queue.SendAsync(tcs.Task); }); enqueuer.Completion.ContinueWith( _ => { queue.Complete(); processor.Complete(); }); return DataflowBlock.Encapsulate(enqueuer,queue); }
经过这么多的谈话,我认为这是相当少量的代码.
看来你关心性能很多,所以你可能需要微调这个代码.例如,将处理器块的MaxDegreeOfParallelism设置为Environment.ProcessorCount
可能是有意义的,以避免超额订阅.此外,如果延迟比您的吞吐量更重要,将相同块的MaxMessagesPerTask设置为1(或另一个小数)可能是有意义的,以便在处理项目完成时立即将其发送到输出.
此外,如果要限制收到的项目,您可以设置enqueer的BoundedCapacity.