我已经实现了一个简单函数的普通和并行版本,它从32bppArgb位图计算直方图.正常版本在1920×1080图像上大约需要0.03秒,而并行版本需要0.07秒.
线程开销真的很重吗?除了Parallel之外还有其他一些构造吗?这可以加快这个过程吗?因为我正在使用30fps视频,所以我需要加快速度.
这是简化的代码:
public sealed class Histogram { public int MaxA = 0; public int MaxR = 0; public int MaxG = 0; public int MaxB = 0; public int MaxT = 0; public int [] A = null; public int [] R = null; public int [] G = null; public int [] B = null; public Histogram () { this.A = new int [256]; this.R = new int [256]; this.G = new int [256]; this.B = new int [256]; this.Initialize(); } public void Initialize () { this.MaxA = 0; this.MaxR = 0; this.MaxG = 0; this.MaxB = 0; this.MaxT = 0; for (int i = 0; i < this.A.Length; i++) this.A [i] = 0; for (int i = 0; i < this.R.Length; i++) this.R [i] = 0; for (int i = 0; i < this.G.Length; i++) this.G [i] = 0; for (int i = 0; i < this.B.Length; i++) this.B [i] = 0; } public void ComputeHistogram (System.Drawing.Bitmap bitmap,bool parallel = false) { System.Drawing.Imaging.BitmapData data = null; data = bitmap.LockBits ( new System.Drawing.Rectangle(0,bitmap.Width,bitmap.Height),System.Drawing.Imaging.ImageLockMode.ReadOnly,System.Drawing.Imaging.PixelFormat.Format32bppArgb ); try { ComputeHistogram(data,parallel); } catch { bitmap.UnlockBits(data); throw; } bitmap.UnlockBits(data); } public void ComputeHistogram (System.Drawing.Imaging.BitmapData data,bool parallel = false) { int stride = System.Math.Abs(data.Stride); this.Initialize(); if (parallel) { unsafe { System.Threading.Tasks.Parallel.For ( 0,data.Height,new System.Threading.Tasks.ParallelOptions() { MaxDegreeOfParallelism = System.Environment.ProcessorCount },y => { byte* pointer = ((byte*) data.Scan0) + (stride * y); for (int x = 0; x < stride; x += 4) { this.B [pointer [x + 0]]++; this.G [pointer [x + 1]]++; this.R [pointer [x + 2]]++; this.A [pointer [x + 3]]++; } } ); } } else { unsafe { for (int y = 0; y < data.Height; y++) { byte* pointer = ((byte*) data.Scan0) + (stride * y); for (int x = 0; x < stride; x += 4) { this.B [pointer [x + 0]]++; this.G [pointer [x + 1]]++; this.R [pointer [x + 2]]++; this.A [pointer [x + 3]]++; } } } } for (int i = 0; i < this.A.Length; i++) if (this.MaxA < this.A [i]) this.MaxA = this.A [i]; for (int i = 0; i < this.R.Length; i++) if (this.MaxR < this.R [i]) this.MaxR = this.R [i]; for (int i = 0; i < this.G.Length; i++) if (this.MaxG < this.G [i]) this.MaxG = this.G [i]; for (int i = 0; i < this.B.Length; i++) if (this.MaxB < this.B [i]) this.MaxB = this.B [i]; if (this.MaxT < this.MaxA) this.MaxT = this.MaxA; if (this.MaxT < this.MaxR) this.MaxT = this.MaxR; if (this.MaxT < this.MaxG) this.MaxT = this.MaxG; if (this.MaxT < this.MaxB) this.MaxT = this.MaxB; } }
解决方法
好吧,首先,你的并行循环中有一个巨大的错误:
您将有多个线程访问,递增和更新共享阵列 – 由于固有的竞争条件,仅在同一图像上多次运行示例代码会导致截然不同的结果.
但这不是你问的.
至于为什么你看到使用并行实现的性能下降,简单的答案是你可能没有在每个并行任务的主体中做足够的工作来抵消创建新任务的“旋转成本”,安排它,等等
可能更重要的是,我相信你正在从内存中跳出L1 / L2缓存的地狱 – 每个任务线程都会尝试将它认为需要的内容加载到缓存中,但是就像你一样在整个地方建立索引,您不再创建一致的访问模式,因此每次尝试访问位图缓冲区或内部数组时,您可能会遇到缓存未命中.
还有一种同样高效的方法来获取位图的只读数据而不使用不安全的代码……实际上,让我们先做到这一点:
因此,通过调用LockBits,您可以获得指向非托管内存的指针.让我们复制一下吧:
System.Drawing.Imaging.BitmapData data = null; data = bitmap.LockBits ( new System.Drawing.Rectangle(0,System.Drawing.Imaging.PixelFormat.Format32bppArgb ); // For later usage var imageStride = data.Stride; var imageHeight = data.Height; // allocate space to hold the data byte[] buffer = new byte[data.Stride * data.Height]; // Source will be the bitmap scan data IntPtr pointer = data.Scan0; // the CLR marshalling system knows how to move blocks of bytes around,FAST. Marshal.Copy(pointer,buffer,buffer.Length); // and now we can unlock this since we don't need it anymore bitmap.UnlockBits(data); ComputeHistogram(buffer,imageStride,imageHeight,parallel);
现在,至于竞争条件 – 您可以通过使用Interlocked调用来提高计数来以合理的性能克服这一点(注意!!!多线程编程很难,而且完全有可能我的解决方案不完美!)
public void ComputeHistogram (byte[] data,int stride,int height,bool parallel = false) { this.Initialize(); if (parallel) { System.Threading.Tasks.Parallel.For ( 0,height,new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount },y => { int startIndex = (stride * y); int endIndex = stride * (y+1); for (int x = startIndex; x < endIndex; x += 4) { // Interlocked actions are more-or-less atomic // (caveats abound,but this should work for us) Interlocked.Increment(ref this.B[data[x]]); Interlocked.Increment(ref this.G[data[x+1]]); Interlocked.Increment(ref this.R[data[x+2]]); Interlocked.Increment(ref this.A[data[x+3]]); } } ); } else { // the original way is ok for non-parallel,since only one // thread is mucking around with the data } // Sorry,couldn't help myself,this just looked "cleaner" to me this.MaxA = this.A.Max(); this.MaxR = this.R.Max(); this.MaxG = this.G.Max(); this.MaxB = this.B.Max(); this.MaxT = new[] { this.MaxA,this.MaxB,this.MaxG,this.MaxR }.Max(); }
那么,这对运行时行为有何影响?
不是很多,但至少并行分叉现在计算正确的结果.