c# – Parallel.ForEach stalls when integrated with BlockingCollection

I adapted a producer/consumer implementation based on the code in this question:
class ParallelConsumer<T> : IDisposable
{
    private readonly int _maxParallel;
    private readonly Action<T> _action;
    private readonly TaskFactory _factory = new TaskFactory();
    private CancellationTokenSource _tokenSource;
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
    private Task _task;

    public ParallelConsumer(int maxParallel, Action<T> action)
    {
        _maxParallel = maxParallel;
        _action = action;
    }

    public void Start()
    {
        try
        {
            _tokenSource = new CancellationTokenSource();
            _task = _factory.StartNew(
                () =>
                {
                    Parallel.ForEach(
                        _entries.GetConsumingEnumerable(),
                        new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
                        (item, loopState) =>
                        {
                            Log("Taking " + item);
                            if (!_tokenSource.IsCancellationRequested)
                            {
                                _action(item);
                                Log("Finished " + item);
                            }
                            else
                            {
                                Log("Not Taking " + item);
                                _entries.CompleteAdding();
                                loopState.Stop();
                            }
                        });
                }, _tokenSource.Token);
        }
        catch (OperationCanceledException oce)
        {
            System.Diagnostics.Debug.WriteLine(oce);
        }
    }

    private void Log(string message)
    {
        Console.WriteLine(message);
    }

    public void Stop()
    {
        Dispose();
    }

    public void Enqueue(T entry)
    {
        Log("Enqueuing " + entry);
        _entries.Add(entry);
    }

    public void Dispose()
    {
        if (_task == null)
        {
            return;
        }

        _tokenSource.Cancel();
        while (!_task.IsCanceled)
        {
        }

        _task.Dispose();
        _tokenSource.Dispose();
        _task = null;
    }
}

Here is the test code:

class Program
{
    static void Main(string[] args)
    {
        TestRepeatedEnqueue(100, 1);
    }

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
    {
        bool[] flags = new bool[itemCount];
        var consumer = new ParallelConsumer<int>(parallelCount, (i) =>
                                              {
                                                  flags[i] = true;
                                              }
            );
        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        Thread.Sleep(1000);
        Debug.Assert(flags.All(b => b == true));
    }
}

The test always fails – it always gets stuck on item 93 of the 100 items in the test. Any idea which part of my code is causing this problem, and how to fix it?

Solution

As you have discovered, you cannot use Parallel.ForEach() directly with BlockingCollection.GetConsumingEnumerable().

For an explanation, see this blog post:

http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

The blog post also provides the source code for a method called GetConsumingPartitioner() that you can use to solve the problem.
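For reference, that partitioner looks roughly like the sketch below (reproduced from memory; the blog post is the authoritative version). The key point is that each worker's enumerator pulls straight from the collection, so no item waits in a chunk buffer:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

static class BlockingCollectionExtensions
{
    // Wraps GetConsumingEnumerable() in a Partitioner so that Parallel.ForEach
    // hands out items one at a time instead of buffering them in chunks.
    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        return new BlockingCollectionPartitioner<T>(collection);
    }

    private class BlockingCollectionPartitioner<T> : Partitioner<T>
    {
        private readonly BlockingCollection<T> _collection;

        internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
        {
            if (collection == null) throw new ArgumentNullException("collection");
            _collection = collection;
        }

        public override bool SupportsDynamicPartitions
        {
            get { return true; }
        }

        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1) throw new ArgumentOutOfRangeException("partitionCount");
            var dynamicPartitioner = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount)
                .Select(_ => dynamicPartitioner.GetEnumerator())
                .ToList();
        }

        public override IEnumerable<T> GetDynamicPartitions()
        {
            // Each enumerator consumes directly from the blocking collection.
            return _collection.GetConsumingEnumerable();
        }
    }
}
```

With this in place, the call in Start() becomes `Parallel.ForEach(_entries.GetConsumingPartitioner(), options, body)` instead of passing GetConsumingEnumerable().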

An extract from the blog post:

BlockingCollection’s GetConsumingEnumerable implementation is using BlockingCollection’s internal synchronization which already supports multiple consumers concurrently, but ForEach doesn’t know that, and its enumerable-partitioning logic also needs to take a lock while accessing the enumerable.

As such, there’s more synchronization here than is actually necessary, resulting in a potentially non-negligible performance hit.

[Also] the partitioning algorithm employed by default by both Parallel.ForEach and PLINQ use chunking in order to minimize synchronization costs: rather than taking the lock once per element, it’ll take the lock, grab a group of elements (a chunk), and then release the lock.

While this design can help with overall throughput, for scenarios that are focused more on low latency, that chunking can be prohibitive.
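As an aside: if you are on .NET 4.5 or later, the framework offers a built-in way to get the same one-item-at-a-time behavior without writing a custom partitioner, via the Partitioner.Create overload that accepts EnumerablePartitionerOptions.NoBuffering. A minimal, self-contained sketch (the item count and degree of parallelism are arbitrary values for illustration):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class NoBufferingDemo
{
    public static bool ProcessAll(int itemCount, int degree)
    {
        var entries = new BlockingCollection<int>();
        for (int i = 0; i < itemCount; i++) entries.Add(i);
        entries.CompleteAdding(); // signal that no more items will arrive

        var flags = new bool[itemCount];

        // NoBuffering makes the partitioner take one item per lock acquisition,
        // so each item is processed as soon as a worker is free.
        var partitioner = Partitioner.Create(
            entries.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(
            partitioner,
            new ParallelOptions { MaxDegreeOfParallelism = degree },
            item => flags[item] = true);

        return flags.All(b => b);
    }

    static void Main()
    {
        Console.WriteLine(ProcessAll(100, 1)); // prints: True
    }
}
```

This avoids both the chunk buffering and the custom-partitioner code, at the cost of requiring the newer framework version.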
