class ParallelConsumer<T> : IDisposable
{
    private readonly int _maxParallel;
    private readonly Action<T> _action;
    private readonly TaskFactory _factory = new TaskFactory();
    private CancellationTokenSource _tokenSource;
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
    private Task _task;

    public ParallelConsumer(int maxParallel, Action<T> action)
    {
        _maxParallel = maxParallel;
        _action = action;
    }

    public void Start()
    {
        try
        {
            _tokenSource = new CancellationTokenSource();
            _task = _factory.StartNew(() =>
            {
                Parallel.ForEach(
                    _entries.GetConsumingEnumerable(),
                    new ParallelOptions
                    {
                        MaxDegreeOfParallelism = _maxParallel,
                        CancellationToken = _tokenSource.Token
                    },
                    (item, loopState) =>
                    {
                        Log("Taking" + item);
                        if (!_tokenSource.IsCancellationRequested)
                        {
                            _action(item);
                            Log("Finished" + item);
                        }
                        else
                        {
                            Log("Not Taking" + item);
                            _entries.CompleteAdding();
                            loopState.Stop();
                        }
                    });
            }, _tokenSource.Token);
        }
        catch (OperationCanceledException oce)
        {
            System.Diagnostics.Debug.WriteLine(oce);
        }
    }

    private void Log(string message)
    {
        Console.WriteLine(message);
    }

    public void Stop()
    {
        Dispose();
    }

    public void Enqueue(T entry)
    {
        Log("Enqueuing" + entry);
        _entries.Add(entry);
    }

    public void Dispose()
    {
        if (_task == null)
        {
            return;
        }

        _tokenSource.Cancel();
        while (!_task.IsCanceled)
        {
        }

        _task.Dispose();
        _tokenSource.Dispose();
        _task = null;
    }
}
Here is the test code:
class Program
{
    static void Main(string[] args)
    {
        TestRepeatedEnqueue(100, 1);
    }

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
    {
        bool[] flags = new bool[itemCount];
        var consumer = new ParallelConsumer<int>(parallelCount, (i) => { flags[i] = true; });
        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        Thread.Sleep(1000);
        Debug.Assert(flags.All(b => b == true));
    }
}
Solution
http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx
The blog also provides the source code for a method called GetConsumingPartitioner(), which you can use to solve the problem.
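The idea, roughly as the blog describes it, is to wrap the BlockingCollection in a Partitioner<T> whose dynamic partitions simply delegate to GetConsumingEnumerable(), so Parallel.ForEach pulls items one at a time instead of buffering them into chunks. The following is a minimal sketch along those lines, not the blog's code verbatim; the extension-class name is arbitrary:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class BlockingCollectionExtensions
{
    // Expose the collection to Parallel.ForEach as a Partitioner<T> instead of an IEnumerable<T>.
    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        return new BlockingCollectionPartitioner<T>(collection);
    }

    private class BlockingCollectionPartitioner<T> : Partitioner<T>
    {
        private readonly BlockingCollection<T> _collection;

        internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
        {
            if (collection == null) throw new ArgumentNullException("collection");
            _collection = collection;
        }

        // Dynamic partitioning lets ForEach take elements on demand, one at a time.
        public override bool SupportsDynamicPartitions
        {
            get { return true; }
        }

        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1) throw new ArgumentOutOfRangeException("partitionCount");
            var dynamicPartitioner = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount)
                             .Select(_ => dynamicPartitioner.GetEnumerator())
                             .ToList();
        }

        public override IEnumerable<T> GetDynamicPartitions()
        {
            // Every partition enumerates the same consuming enumerable;
            // BlockingCollection's own synchronization already supports concurrent consumers.
            return _collection.GetConsumingEnumerable();
        }
    }
}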
From the blog:
BlockingCollection’s GetConsumingEnumerable implementation is using BlockingCollection’s internal synchronization which already supports multiple consumers concurrently, but ForEach doesn’t know that, and its enumerable-partitioning logic also needs to take a lock while accessing the enumerable.
As such, there’s more synchronization here than is actually necessary, resulting in a potentially non-negligible performance hit.
[Also] the partitioning algorithm employed by default by both Parallel.ForEach and PLINQ uses chunking in order to minimize synchronization costs: rather than taking the lock once per element, it’ll take the lock, grab a group of elements (a chunk), and then release the lock.
While this design can help with overall throughput, for scenarios that are focused more on low latency, that chunking can be prohibitive.
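Applied to the ParallelConsumer above, the fix would then be a one-line change inside Start(), assuming an extension method shaped like the sketch earlier is in scope: hand Parallel.ForEach the consuming partitioner instead of the consuming enumerable, so the loop takes items directly from the collection rather than waiting to fill a chunk. Illustrative fragment only:

// Inside ParallelConsumer<T>.Start():
Parallel.ForEach(
    _entries.GetConsumingPartitioner(),   // was: _entries.GetConsumingEnumerable()
    new ParallelOptions
    {
        MaxDegreeOfParallelism = _maxParallel,
        CancellationToken = _tokenSource.Token
    },
    (item, loopState) =>
    {
        // loop body unchanged from the original code above
        _action(item);
    });

With this change each enqueued item is handed to the loop body as soon as it becomes available, so the test's Debug.Assert sees all flags set within the one-second window instead of items sitting in the partitioner's chunk buffer.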