我在multiprocessing.Pool.apply中遇到问题.
我的目标是要有5个过程,每个过程用100个元素填充一个数组(此测试为100个),然后将数组合并为长度为500的单个数组.问题是,由于任何原因,最终只能包含400个元素了解.
我尝试过更改池创建的进程数量,但是除了执行时间外,它什么都没有改变.
import torch.multiprocessing as mp
import itertools
pool = mp.Pool(processes=5)
split = int(500/5)
lst = pool.apply(RampedGraph,(split,[])) #each foo returns a list of 100 elements
lst = list(itertools.chain.from_iterable(lst)) #merging the lists into one
len(lst)
>>>400
len(lst)的预期输出应为500.
谁能启发我我做错了什么?
EDIT Foo方法说明:
def RampedGraph(popsize,graph_lst):
cyclic_size = int(math.ceil(popsize/2))
acyclic_size = popsize - full_size
append = graph_lst.append
for _ in range(cyclic_size):
t = c.Node().cyclic()
nn = c.number_of_nodes()
c = c.calculate(0,False)
append((t,nn,c))
for _ in range(acyclic_size):
t = c.Node().acyclic()
nn = c.number_of_nodes()
c = c.calculate(0,c))
return graph_lst
import torch.multiprocessing as mp
# import multiprocessing as mp
import itertools
def RampedGraph(popsize,graph_lst):
print(mp.current_process().name)
return list(range(100))
num_workers = 5
pool = mp.Pool(processes=num_workers)
split = int(500/num_workers)
lst = pool.starmap(RampedGraph,[(split,[])]*num_workers)
lst = list(itertools.chain.from_iterable(lst))
print(len(lst))
# 500
pool.starmap(RampedGraph,[])]*5)
将5个任务发送到任务池.
它使RampedGraph(split,[])同时被调用5次.
RampedGraph返回的5个结果被收集到一个列表lst中.
请注意,并发调用RampedGraph 5次并不能保证使用所有5个处理器.例如,如果RampedGraph要非常快地完成,则一个处理器可能会处理多个任务,而另一个处理器可能根本就不会使用.
但是,如果RampedGraph花费很短的时间,通常可以预期所有5个辅助进程都将被使用.
注意:我使用导入多处理作为mp而不是导入torch.multiprocessing作为mp运行以上代码.但是,由于torch.multiprocessing应该是多处理的直接替代,因此不会有什么不同.
使用多处理既有成本也有好处.
当然,好处是可以同时使用多个处理器.
成本包括启动其他流程所需的时间以及进程间通信的成本.多重处理使用队列将参数传输到工作进程运行的函数,并将返回的值传输回主进程.为了通过队列传输返回的值,对象将通过腌制被序列化为字节.如果通过队列发送的腌制对象很大,则在使用多处理时这会增加可观的开销成本.请注意,所有这些费用并非由相同代码的等效顺序版本引起.
特别是当工作进程运行的功能快速完成时,开销成本可能会占据程序的总运行时间,这使得使用多重处理的代码比相同代码的顺序版本慢.
因此,使用多处理时加快速度的关键是尝试最小化进程间的通信并确保工作进程完成大量工作,因此开销成本占总运行时间的比例很小.