use Parallel::ForkManager; my $number_running = 0; my $pm = new Parallel::ForkManager(30); $pm->run_on_start( sub { ++$number_running; } ); $pm->run_on_finish( sub { --$number_running; } ); for (my $i=0; $i<=100; $i++) { if ($number_running == 5) { while ($number_running > 0) {} } # waits forever $pm->start and next; print $i; $pm->finish; }
上面的代码使用Parallel::ForkManager来使用并行进程在for循环中执行代码.它正在计算正在运行的子进程数,并相应地设置$number_running变量.一旦运行了5个子进程,我希望它等到0个子进程正在运行,然后再继续.
for循环中的第一行旨在实现此目的,但它会在该行上永远等待.这就像子进程对变量所做的更改不适用于该行代码.我究竟做错了什么?注意:我知道wait_all_children,但我不想使用它.
解决方法
回调run_on_finish通常是在所有进程分叉后使用wait_all_children触发的.它的工作也完成了
当最大进程数运行且一个进程退出时.这是通过调用wait_one_child(调用on_finish,见下文)开始的.
或者,这可以通过调用reap_finished_children方法随意完成
This is a non-blocking call to reap children and execute callbacks independent of calls to
start
orwait_all_children
. Use this in scenarios wherestart
is called infrequently but you would like the callbacks executed quickly.
这解决了如何在个别孩子退出时进行沟通的主要问题(在评论中澄清),而不是wait_all_children.
下面是一个如何使用它的示例,以便在子进程退出时回调运行.
use warnings; use strict; use feature 'say'; use Parallel::ForkManager; $| = 1; my $total_to_process = 3; my $number_running = 0; my @ds; my $pm = new Parallel::ForkManager(30); $pm->run_on_start( sub { ++$number_running; say "Started $_[0],total: $number_running"; }); $pm->run_on_finish( sub { --$number_running; my ($pid,$code,$iden,$sig,$dump,$rdata) = @_; push @ds,"gone-$pid"; say "Cleared $pid,",($rdata->[0] // ''),($code ? " exit $code" : ''); }); foreach my $i (1 .. $total_to_process) { $pm->start and next; run_job($i); $pm->finish(10*$i,[ "kid #$i" ]); } say "Running: ",map { "$_ " } $pm->running_procs; # pid's of children # Reap right as each process exits,retrieve and print info my $curr = $pm->running_procs; while ($pm->running_procs) { $pm->reap_finished_children; # may be fewer now if ($pm->running_procs < $curr) { $curr = $pm->running_procs; say "Remains: $number_running. Data: @ds"; } sleep 1; # or use Time::HiRes::sleep 0.1; } sub run_job { my ($num) = @_; my $sleep_time = ($num == 1) ? 1 : ($num == 2 ? 10 : 20); sleep $sleep_time; say "\tKid #$num slept for $sleep_time,exiting"; }
使用此方法相当于在fork之后的循环中调用waitpid -1,POSIX :: WNOHANG.这比使用最多(30)个进程少得多,可以更轻松地查看输出,并证明回调在子进程退出时运行正常.更改这些数字以查看其完整操作.
大量代码用于诊断.我们以10 * $i退出以跟踪输出中的孩子.为了相同的目的,匿名数组[…]中返回的数据是一个描述性字符串.一旦reap_finished_children完成,$number_running就会在回调中减少.这就是为什么我们需要$curr变量(再次用于诊断).
这打印
start: Started 4656,running: 1 start: Started 4657,running: 2 start: Started 4658,running: 3 Running: 4656 4658 4657 Kid #1 slept for 1,exiting Cleared 4656,kid #1 exit 10 Remains: 2. Data: gone-4656 Kid #2 slept for 10,exiting Cleared 4657,kid #2 exit 20 Remains: 1. Data: gone-4656 gone-4657 Kid #3 slept for 20,exiting Cleared 4658,kid #3 exit 30 Remains: 0. Data: gone-4656 gone-4657 gone-4658
直接的问题是如何在开始新批次之前等待整批完成.这可以在wait_for_available_procs($n)之前直接完成
Wait until
$n
available process slots are available. If$n
is not given,defaults to 1.
如果$MAX用于$n,则只有整批完成后才能使用多个插槽. $n的用途也可以在运行时决定.
模块操作的一些细节
当一个孩子退出SIGCHLD信号被发送给父母时,它必须抓住它以便知道孩子已经离开(并且首先避免僵尸).这是通过在代码或SIGCHLD处理程序中使用wait或waitpid来完成的(但仅限于一个地方).见fork,Signals in perlipc,waitpid和wait.
我们从P::FM’s source看到这是在wait_one_child中完成的(通过_waitpid sub)
06002
在wait_all_children中使用
06003
上面使用的reap_finished_children方法是此方法的同义词.
获取信号的方法wait_one_child被启动用于当最大进程数被填充并且一个进程退出时收获子进程.这是模块何时知道何时可以启动另一个进程并遵守其最大值. (它也被一些等待进程的其他例程使用.
).这就是当run_on_finish被$s-> on_finish($kid,…)触发时
06004
回调位于coderef $代码中,从对象的on_finish键中检索,该键本身在子run_on_finish中设置.一旦该子运行,这就是回调的设置方式.
用户可以使用的方法是wait_all_children和reap_finished_children.
由于在发布的代码中没有使用这个,所以$number_running没有得到更新,因此是无限循环.回想一下,父进程中的变量$number_running不能由子进程直接更改.