但是,我发现矢量化程序的性能比for循环版本差得多:
import numpy as np import datetime kpt_list = np.zeros((10000,20),dtype='float') rpt_list = np.zeros((1000,dtype='float') h_r = np.zeros((20,20,1000),dtype='complex') r_ndegen = np.zeros(1000,dtype='float') r_ndegen.fill(1) # setup completed # this is a the vectorized version r_ndegen_tile = np.tile(r_ndegen.reshape(1000,1),10000) start = datetime.datetime.now() phase = np.exp(1j * np.dot(rpt_list,kpt_list.T))/r_ndegen_tile kpt_data_1 = h_r.dot(phase) end = datetime.datetime.now() print((end-start).total_seconds()) # the result is 19.302483 # this is the for loop version kpt_data_2 = np.zeros((20,10000),dtype='complex') start = datetime.datetime.now() for i in range(10000): kpt = kpt_list[i,:] phase = np.exp(1j * np.dot(kpt,rpt_list.T))/r_ndegen kpt_data_2[:,:,i] = h_r.dot(phase) end = datetime.datetime.now() print((end-start).total_seconds()) # the result is 7.74583
这里发生了什么?
解决方法
def setup(n1=10000,n2=1000,n3=20,seed=None): gen = np.random.RandomState(seed) kpt_list = gen.randn(n1,n3).astype(np.float) rpt_list = gen.randn(n2,n3).astype(np.float) h_r = (gen.randn(n3,n3,n2) + 1j*gen.randn(n3,n2)).astype(np.complex) r_ndegen = gen.randn(1000).astype(np.float) return kpt_list,rpt_list,h_r,r_ndegen def original_vec(*args,**kwargs): kpt_list,r_ndegen = setup(*args,**kwargs) r_ndegen_tile = np.tile(r_ndegen.reshape(1000,10000) phase = np.exp(1j * np.dot(rpt_list,kpt_list.T)) / r_ndegen_tile kpt_data = h_r.dot(phase) return kpt_data def original_loop(*args,**kwargs) kpt_data = np.zeros((20,dtype='complex') for i in range(10000): kpt = kpt_list[i,:] phase = np.exp(1j * np.dot(kpt,rpt_list.T)) / r_ndegen kpt_data[:,i] = h_r.dot(phase) return kpt_data
我还强烈建议使用随机数据而不是全零或全一数组,除非这是你的实际数据(!).这样可以更容易地检查代码的正确性 – 例如,如果您的最后一步是乘以一个零的矩阵,那么无论先前是否存在错误,您的输出将始终为全零.你的代码.
接下来,我将通过line_profiler
运行这些功能,看看他们花费大部分时间在哪里.特别是对于original_vec:
In [1]: %lprun -f original_vec original_vec() Timer unit: 1e-06 s Total time: 23.7598 s File: <ipython-input-24-c57463f84aad> Function: original_vec at line 12 Line # Hits Time Per Hit % Time Line Contents ============================================================== 12 def original_vec(*args,**kwargs): 13 14 1 86498 86498.0 0.4 kpt_list,**kwargs) 15 16 1 69700 69700.0 0.3 r_ndegen_tile = np.tile(r_ndegen.reshape(1000,10000) 17 1 1331947 1331947.0 5.6 phase = np.exp(1j * np.dot(rpt_list,kpt_list.T)) / r_ndegen_tile 18 1 22271637 22271637.0 93.7 kpt_data = h_r.dot(phase) 19 20 1 4 4.0 0.0 return kpt_data
您可以看到它花费93%的时间来计算h_r和相位之间的点积.这里,h_r是(20,1000)阵列,相位是(1000,10000).我们计算了h_r的最后一个维度和阶段的第一维度的和积(你可以用ejum表示法写成ijk,kl-> ijl).
h_r的前两个维度在这里并不重要 – 我们可以在获取点积之前轻松地将h_r重塑为(20 * 20,1000)数组.事实证明,这种重塑操作本身可以带来巨大的性能提升:
In [2]: %timeit h_r.dot(phase) 1 loop,best of 3: 22.6 s per loop In [3]: %timeit h_r.reshape(-1,1000).dot(phase) 1 loop,best of 3: 1.04 s per loop
我不完全确定为什么会出现这种情况 – 我希望numpy的dot函数足够聪明,可以自动应用这个简单的优化.在我的笔记本电脑上,第二种情况似乎使用多个线程,而第一种情况则没有,这表明它可能不会调用多线程BLAS例程.
这是一个包含整形操作的矢量化版本:
def new_vec(*args,**kwargs) phase = np.exp(1j * np.dot(rpt_list,kpt_list.T)) / r_ndegen[:,None] kpt_data = h_r.reshape(-1,phase.shape[0]).dot(phase) return kpt_data.reshape(h_r.shape[:2] + (-1,))
-1索引告诉numpy根据其他维度和数组中元素的数量推断出这些维度的大小.我还使用broadcasting除以r_ndegen,这消除了对np.tile的需要.
通过使用相同的随机输入数据,我们可以检查新版本是否提供与原始版本相同的结果:
In [4]: ans1 = original_loop(seed=0) In [5]: ans2 = new_vec(seed=0) In [6]: np.allclose(ans1,ans2) Out[6]: True
一些性能基准测试:
In [7]: %timeit original_loop() 1 loop,best of 3: 13.5 s per loop In [8]: %timeit original_vec() 1 loop,best of 3: 24.1 s per loop In [5]: %timeit new_vec() 1 loop,best of 3: 2.49 s per loop
更新:
我很好奇为什么np.dot对于原始(20,1000)h_r数组来说要慢得多,所以我挖到了numpy源代码.在multiarraymodule.c
中实现的逻辑变得非常简单:
#if defined(HAVE_CBLAS) if (PyArray_NDIM(ap1) <= 2 && PyArray_NDIM(ap2) <= 2 && (NPY_DOUBLE == typenum || NPY_CDOUBLE == typenum || NPY_FLOAT == typenum || NPY_CFLOAT == typenum)) { return cblas_matrixproduct(typenum,ap1,ap2,out); } #endif
换句话说,numpy只检查输入数组中的任何一个是否具有> 2维,并立即回到矩阵矩阵乘法的非BLAS实现.看起来检查两个阵列的内部尺寸是否兼容似乎不太困难,如果是这样,则将它们视为2D并对它们执行* gemm矩阵 – 矩阵乘法.事实上there’s an open feature request for this dating back to 2012,如果任何numpy开发者正在阅读……
与此同时,在倍增张量时,要注意这是一个很好的性能技巧.
更新2:
我忘了np.tensordot
.因为它在2D数组上调用与np.dot相同的底层BLAS例程,它可以实现相同的性能提升,但没有所有那些丑陋的重塑操作:
In [6]: %timeit np.tensordot(h_r,phase,axes=1) 1 loop,best of 3: 1.05 s per loop