我一直在编写一个“检查器”系统,对各种服务,系统,数据库,文件等执行各种“检查”.“检查”本质上是通用的,可以是任何东西.所有检查都以他们通过或失败的通用格式天气报告,无论可能是什么.
它以模块化OO方式编写,因此开发人员可以简单地遵循框架并独立于其中一个编写检查.每个对象都包含一个共享的报告对象,在它们运行检查后,它们只需$self-> {‘reporting’} – > report(params).定义了参数,并假设开发人员适当报告.报告对象然后索引这些报告.我的主加载器脚本包含以下条目:
my $reportingObject = new Checks::Reporting(params); my @checks; push @checks,new Checks::Check_One($reportingObject,params)); push @checks,params)); . . push @checks,new Checks::Check_N($reportingObject,params));
在完成检查并完成报告后,我一直在做:
foreach my $check (@checks) { $check->run_stuff(); } $reportingObject->finalize_report();
现在,由于这些检查完全独立(不用担心报告对象),因此它们可以并行运行.作为一项改进,我做了:
my @threads; foreach my $check (@checks) { push @threads,async { $check->run_stuff(); } } foreach my $thread (@threads) { $thread->join; } #All threads are complete,and thus all checks are done $reportingObject->finalize_report();
正如我之前所说,开发人员将彼此独立地编写Checks.有些检查很简单,有些则不是.简单检查可能没有异步代码,但其他人可能需要在内部异步运行,例如
sub do_check { my @threads; my @list = @{$self->{'list'}}; foreach my $item (@list) { push @threads,async { #do_work_on_$item #return 1 or 0 for success or fail }; foreach my $thread (@threads) { my $res = $thread->join; if($res == 1) { $self->{'reporting'}->report(params_here); } } } }
正如您所看到的,线程模型允许我以非常模糊的方式执行操作.每个“检查”无论它在自己的线程中独立运行.如果一个开发人员有异步操作,无论它是什么,他只是在自己的线程中独立完成.我想要一个类似于此的模型.
不幸的是,线程缓慢且效率低下.所有异步库都有特定的观察者,如IO等.我不想要任何特定的东西.我想要一个基于事件的模型,它允许我简单地启动异步任务,无论它们是什么,只需通知他们什么时候完成所以我可以继续前进.
希望这能解释它,你可以指出我正确的方向.
解决方法
这似乎非常适合老板工人模型:
>在项目开始时产生一些工人.确保他们都可以访问队列.
>根据需要排队多次检查.工作人员将检查队列出列,执行它们,并将结果排入输出队列.
>您的主线程查看输出线程的结果,并执行它想要的任何操作.
>在END区块加入工人
如果您希望将coderefs放入队列,您可能希望查看Thread::Queue::Any
.
这是一个完全可运行的例子:
use strict; use feature 'say'; use threads; use threads::shared; use Thread::Queue::Any; use constant NUM_THREADS => 5; local $Storable::Deparse = 1; local $Storable::Eval = 1; # needed to serialize code my $check_q = Thread::Queue::Any->new; my $result_q = Thread::Queue::Any->new; # start the workers { my $running :shared = NUM_THREADS; my @threads = map threads->new(\&worker,$check_q,$result_q,\$running),1..NUM_THREADS; END { $_->join for @threads } } # enqueue the checks $check_q->enqueue($_) for sub {1},sub{2},sub{"hi"},sub{ die }; $check_q->enqueue(undef) for 1..NUM_THREADS; # end the queue while(defined( my $result = $result_q->dequeue )) { report($$result); } sub report { say shift // "Failed"; } sub worker { my ($in,$out,$running_ref) = @_; while (defined( my $check = $in->dequeue )) { my $result = eval { $check->() }; $out->enqueue(\$result); } # last thread closes the door lock $$running_ref; --$$running_ref || $out->enqueue(undef); }
这打印
1 2 hi Failed
以略微随机的顺序.