使用numpy / scipy最大限度地减少Python multiprocessing.Pool的开销

前端之家收集整理的这篇文章主要介绍了使用numpy / scipy最大限度地减少Python multiprocessing.Pool的开销前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我花了几个小时来尝试并行化我的数字运算代码,但是当我这样做时它只会变慢.不幸的是,当我尝试将其减少到下面的示例时,问题就消失了,我真的不想在这里发布整个程序.所以问题是:在这类程序中我应该避免哪些陷阱?

(注意:Unutbu的答案在底部后跟进.)

以下是情况:

>它是关于一个模块,它定义了一个包含大量内部数据的类BigData.在该示例中,存在一个插值函数列表ff;在实际程序中,还有更多,例如ffA [k],ffB [k],ffC [k].
>计算将被归类为“令人尴尬的并行”:可以一次在较小的数据块上完成工作.在示例中,这是do_chunk().
>在我的实际程序中,示例中显示方法将导致最差的性能:每个块约1秒(在单个线程中完成实际计算时间的0.1秒左右).因此,对于n = 50,do_single()将在5秒内运行,do_multi()将在55秒内运行.
>我还尝试通过将xi和yi数组切割成连续的块并迭代每个块中的所有k值来分解工作.这工作得更好一点.现在,无论是使用1,2,3或4个线程,总执行时间都没有差别.但当然,我希望看到实际的加速!
>这可能是相关的:Multiprocessing.Pool makes Numpy matrix multiplication slower.但是,在程序的其他地方,我使用多处理池进行更加孤立的计算:一个看起来像def do_chunk(array1,array2,array3)的函数(未绑定到类)并对该数组进行仅限numpy计算.在那里,有显着的速度提升.
> cpu使用率随预期的并行进程数量而变化(三个线程的cpu使用率为300%).

#!/usr/bin/python2.7

import numpy as np,time,sys
from multiprocessing import Pool
from scipy.interpolate import RectBivariateSpline

_tm=0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm==0: _tm = tm; return
    print("%s: %.2f seconds" % (msg,tm-_tm))
    _tm = tm

class BigData:
    def __init__(self,n):
        z = np.random.uniform(size=n*n*n).reshape((n,n,n))
        self.ff = []
        for i in range(n):
            f = RectBivariateSpline(np.arange(n),np.arange(n),z[i],kx=1,ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self,k,xi,yi):
        s = np.sum(np.exp(self.ff[k].ev(xi,yi)))
        sys.stderr.write(".")
        return s

    def do_multi(self,numproc,yi):
        procs = []
        pool = Pool(numproc)
        stopwatch('Pool setup')
        for k in range(self.n):
            p = pool.apply_async( _do_chunk_wrapper,(self,yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        sum = 0.0
        for k in range(self.n):
            # Edit/bugfix: replaced p.get by procs[k].get
            sum += np.sum(procs[k].get(timeout=30)) # timeout allows ctrl-C interrupt
            if k == 0: stopwatch("\nFirst get() done")
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return sum

    def do_single(self,yi):
        sum = 0.0
        for k in range(self.n):
            sum += self.do_chunk(k,yi)
        stopwatch('\nAll in single process')
        return sum

def _do_chunk_wrapper(bd,yi): # must be outside class for apply_async to chunk
    return bd.do_chunk(k,yi)        

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi,yi = np.random.uniform(0,size=m*2).reshape((2,m))
    stopwatch('Initialized')
    bd.do_multi(2,yi)
    bd.do_multi(3,yi)
    bd.do_single(xi,yi)

输出

Initialized: 0.06 seconds
Pool setup: 0.01 seconds
Jobs queued (2 processes): 0.03 seconds
..
First get() done: 0.34 seconds
................................................Jobs done: 7.89 seconds
Pool setup: 0.05 seconds
Jobs queued (3 processes): 0.03 seconds
..
First get() done: 0.50 seconds
................................................Jobs done: 6.19 seconds
..................................................
All in single process: 11.41 seconds

计时采用Intel Core i3-3227 cpu,具有2个内核,4个线程,运行64位Linux.对于实际程序,多处理版本(池机制,即使只使用一个核心)比单进程版本慢10倍.

跟进

Unutbu的回答让我走上正轨.在实际的程序中,self被腌制成一个需要传递给工作进程的37到140 MB的对象.更糟糕的是,Python酸洗非常缓慢;酸洗本身花了几秒钟,这发生在传递给工人流程的每一块工作中.除了挑选和传递大数据对象之外,Linux中apply_async的开销非常小;对于一个小函数(添加几个整数参数),每个apply_async / get对只需0.2 ms.因此,以非常小的块分割工作本身并不是问题.所以,我将所有大数组参数作为索引传递给全局变量.为了cpu缓存优化,我保持小块大小.

全局变量存储在全局字典中;在设置工作池之后,将立即在父进程中删除这些条目.只有dict的密钥才会传送给工作人员.酸洗/ IPC唯一的大数据是工人创建的新数据.

#!/usr/bin/python2.7

import numpy as np,sys
from multiprocessing import Pool

_mproc_data = {}  # global storage for objects during multiprocessing.

class BigData:
    def __init__(self,size):
        self.blah = np.random.uniform(0,1,size=size)

    def do_chunk(self,yi):
        # do the work and return an array of the same shape as xi,yi
        zi = k*np.ones_like(xi)
        return zi

    def do_all_work(self,yi,num_proc):
        global _mproc_data
        mp_key = str(id(self))
        _mproc_data['bd'+mp_key] = self # BigData
        _mproc_data['xi'+mp_key] = xi
        _mproc_data['yi'+mp_key] = yi
        pool = Pool(processes=num_proc)
        # processes have now inherited the global variabele; clean up in the parent process
        for v in ['bd','xi','yi']:
            del _mproc_data[v+mp_key]

        # setup indices for the worker processes (placeholder)
        n_chunks = 45
        n = len(xi)
        chunk_len = n//n_chunks
        i1list = np.arange(0,chunk_len)
        i2list = i1list + chunk_len
        i2list[-1] = n
        klist = range(n_chunks) # placeholder

        procs = []
        for i in range(n_chunks):
            p = pool.apply_async( _do_chunk_wrapper,(mp_key,i1list[i],i2list[i],klist[i]) )
            sys.stderr.write(".")
            procs.append(p)
        sys.stderr.write("\n")

        # allocate space for combined results
        zi = np.zeros_like(xi)

        # get data from workers and finish  
        for i,p in enumerate(procs):
            zi[i1list[i]:i2list[i]] = p.get(timeout=30) # timeout allows ctrl-C handling

        pool.close()
        pool.join()

        return zi

def _do_chunk_wrapper(key,i1,i2,k):
    """All arguments are small objects."""
    global _mproc_data
    bd = _mproc_data['bd'+key]
    xi = _mproc_data['xi'+key][i1:i2]
    yi = _mproc_data['yi'+key][i1:i2]
    return bd.do_chunk(k,yi)


if __name__ == "__main__":
    xi,yi = np.linspace(1,100,100001),np.linspace(1,100001)
    bd = BigData(int(1e7))
    bd.do_all_work(xi,4)

以下是速度测试的结果(同样,2个内核,4个线程),改变了工作进程的数量和块中的内存量(xi,zi数组切片的总字节数).这些数字是“每秒百万结果值”,但这对比较并不重要. “1 process”的行是带有完整输入数据的do_chunk的直接调用,没有任何子进程.

#Proc   125K    250K    500K   1000K   unlimited
1                                      0.82 
2       4.28    1.96    1.3     1.31 
3       2.69    1.06    1.06    1.07 
4       2.17    1.27    1.23    1.28

数据大小对内存的影响非常大. cpu具有3 MB共享L3缓存,每个核心具有256 KB L2缓存.请注意,计算还需要访问BigData对象的几MB内部数据.因此,我们从中学到的是进行这种速度测试很有用.对于这个程序,2个进程最快,其次是4个,3个是最慢的.

解决方法

尝试减少进程间通信.
在多处理模块中,通过队列完成所有(单机)进程间通信.通过队列传递的对象
被腌制.因此,尝试通过队列发送更少和/或更小的对象.

>不要通过队列发送自我,BigData的实例.它相当大,随着自我数据量的增加而变大:

In [6]: import pickle
In [14]: len(pickle.dumps(BigData(50)))
Out[14]: 1052187

一切
调用time pool.apply_async(_do_chunk_wrapper,yi)),
self在主进程中被腌制并在工作进程中被取消.该
len的大小(pickle.dumps(BigData(N)))增加N增加.
>让数据从全局变量中读取.在Linux上,您可以利用Copy-on-Write.如Jan-Philip Gehrcke explains

After fork(),parent and child are in an equivalent state. It would be stupid to copy the entire memory of the parent to another place in the RAM. That’s [where] the copy-on-write principle [comes] in. As long as the child does not change its memory state,it actually accesses the parent’s memory. Only upon modification,the corresponding bits and pieces are copied into the memory space of the child.

因此,您可以避免通过Queue传递BigData实例
通过简单地将实例定义为全局,bd = BigData(n),(正如您已经在做的那样)并在工作进程中引用它的值(例如_do_chunk_wrapper).它基本上等于从对pool.apply_async的调用删除self:

p = pool.apply_async(_do_chunk_wrapper,(k_start,k_end,yi))

并将bd作为全局访问,并对do_chunk_wrapper的呼叫签名进行必要的附加更改.
>尝试将运行时间较长的函数func传递给pool.apply_async.
如果你有很多快速完成对pool.apply_async的调用,那么传递参数和通过队列返回值的开销将成为整个时间的重要部分.相反,如果你对pool.apply_async进行较少的调用,并在返回结果之前给每个func做更多工作,那么进程间通信将占总时间的一小部分.

下面,我修改了_do_chunk_wrapper以接受k_start和k_end参数,这样每次调用pool.apply_async都会在返回结果之前计算k的许多值的总和.

import math
import numpy as np
import time
import sys
import multiprocessing as mp
import scipy.interpolate as interpolate

_tm=0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm==0: _tm = tm; return
    print("%s: %.2f seconds" % (msg,n))
        self.ff = []
        for i in range(n):
            f = interpolate.RectBivariateSpline(
                np.arange(n),yi):
        n = self.n
        s = np.sum(np.exp(self.ff[k].ev(xi,yi)))
        sys.stderr.write(".")
        return s

    def do_chunk_of_chunks(self,k_start,yi):
        s = sum(np.sum(np.exp(self.ff[k].ev(xi,yi)))
                    for k in range(k_start,k_end))
        sys.stderr.write(".")
        return s

    def do_multi(self,yi):
        procs = []
        pool = mp.Pool(numproc)
        stopwatch('\nPool setup')
        ks = list(map(int,np.linspace(0,self.n,numproc+1)))
        for i in range(len(ks)-1):
            k_start,k_end = ks[i:i+2]
            p = pool.apply_async(_do_chunk_wrapper,yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        total = 0.0
        for k,p in enumerate(procs):
            total += np.sum(p.get(timeout=30)) # timeout allows ctrl-C interrupt
            if k == 0: stopwatch("\nFirst get() done")
        print(total)
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return total

    def do_single(self,yi):
        total = 0.0
        for k in range(self.n):
            total += self.do_chunk(k,yi)
        stopwatch('\nAll in single process')
        return total

def _do_chunk_wrapper(k_start,yi): 
    return bd.do_chunk_of_chunks(k_start,yi)

产量

Initialized: 0.15 seconds

Pool setup: 0.06 seconds
Jobs queued (2 processes): 0.00 seconds

First get() done: 6.56 seconds
83963796.0@R_403_448@
Jobs done: 0.55 seconds
..
Pool setup: 0.08 seconds
Jobs queued (3 processes): 0.00 seconds

First get() done: 5.19 seconds
83963796.0@R_403_448@
Jobs done: 1.57 seconds
...
All in single process: 12.13 seconds

与原始代码相比:

Initialized: 0.10 seconds
Pool setup: 0.03 seconds
Jobs queued (2 processes): 0.00 seconds

First get() done: 10.47 seconds
Jobs done: 0.00 seconds
..................................................
Pool setup: 0.12 seconds
Jobs queued (3 processes): 0.00 seconds

First get() done: 9.21 seconds
Jobs done: 0.00 seconds
..................................................
All in single process: 12.12 seconds

猜你在找的Python相关文章