ACE_Reactor一些重要的细节
看下具体ACE_Dev_Poll_Reactor的实现,如何将一个处理集和handle关联起来,代码如下:
int ACE_Dev_Poll_Reactor ::register_handler_i(handle,event_handler,mask)
//step 1
if(this->handler_rep_.find(handle)==0)
{
//Handler not present in the repository.Bind it.
if(this->handler_rep_.bind(handle,mask)!=0)
return -1;
}
//step2
//All but the notify handler get registered with oneshot to facilitate auto suspend before the upcall.See dispatch_io_event for more information
if(event_handler!= this->notify_handler_)
epev.events |= EPOLLONESHOT;
if(::epoll_ctl(this->poll_fd_,op,handle,&epev) == -1)
......
step1
首先就是要将handle和处理这个handle的event_handler绑定,若已有处理的event_handler,则只单纯将可能新增的mask添加进监测的epoll函数参数中。
这里的handler_rep_是一个Handler_Repository类的指针,其内部是一个map表可以对handle和event_handler进行有效关联对应和存储。
step2
从上面可以看出,如果是epoll的话,且注册了notify handler, 则epoll_wait会自动将EPOLLONESHOT标识添加上,这样在事件被监测到得分发处理时就不用再手动的去suspend。
Notify机制的好处是:
1. 让反应器拥有了处理无限处理器的能力
2. 其次是提供了必要时解除反应器事件检查阻塞的能力。
Reactor的notify()让用户直接提供给Reactor待通知反应器的指针,而这些处理器无需注册到反应器上,从而提供了无限的扩展能力。
不过在ACE中显然Notify机制有不同的实现方式。一种是采用ACE_Pipe的实现,这个属于默认的方式。另一种则是内存队列保存Notify消息。你可以通过定义ACE_HAS_REACTOR_NOTIFICATION_QUEUE的宏编译ACE,这样ACE将不使用ACE_Pipe作为Notify消息的管道,而使用一个自己的内存队列保存Notify消息。
但是需要注意的是,在使用ACE_Pipe的实现中,如果使用不当,可能会造成严重的后果。
如果你用不到Notify机制,最好在ACE_Reactor初始化的时候彻底关闭Notify机制。很多Reactor的初始化函数都提供了关闭notify pipe的方式。比如ACE_Select_Reactor_T的open函数的disable_notify_pipe参数。当其为1的时候表示关闭notify 管道。
潜在的风险:
1. 处理器被销毁后,排队等候的对应通知才被分派
2. ACE_Pipe的数据缓冲是有限的,大量通知到来可能会造成阻塞甚至死锁
因为通知的信息(包括处理器指针和掩码)以流数据的方式被写入ACE_Pipe,不能进行遍历查找,所以当对应处理器被销毁后,如果其在ACE_Pipe的数据区中还有存储有通知,则ACE_Reactor将会在分派中使用悬空指针,造成未知的后果。另外,在ACE_Pipe的数据缓冲已满情况下,在处理器的回调中依然发送通知,就会因阻塞发生死锁。
通过
int ACE_Dev_Poll_Reactor ::notify(eh,mask,timeout)
{
//pass over both the event_handler* and * the mask o allow the caller to dictate which Event_Handler method the receiver invokes.Note that this call can timeout.
n = this->notify_handle_->notify(eh,mask,timeout);
return n=-1?-1:0;
}
在源码的owner接口中,看到了这样一句注释
//There is no need to set the owner of the event loop.Multiple threads may invoke the event loop simulataneously.
说明有可能多个线程同时调用ACE_Dev_Poll_Reactor的event_loop函数,网上很多示例代码写的都比较简单,例如:
65 int main(void)
66 {
67 ACE::init();
68
69 ACE_Dev_Poll_Reactor dev_reactor(1024);
70 ACE_Reactor reactor(&dev_reactor);
71 ACE_Reactor::instance(&reactor);
72
73 TestAccptor accptor;
74 ACE_INET_Addr addr(10000);
75 if( accptor.open(addr) == -1 ) {
76 cout << "Open port Failed .. " << endl;
77 return -1;
78 }
79
80 cout << "Open port ok .. " << endl;
81
82
83 ACE_Reactor::instance()->run_reactor_event_loop();
84 ACE::fini();
85 return 0;
86 }
这里有一个问题,监测事件触发和同时执行的是不是就只有一个main函数的这一个主线程就完全搞定了。现在看是这样的,但是实际中应该是不同线程都可以去调用上述的run_reactor_event_loop()来完成事件的监测和触发才对。
上述的Notify的实现机制,在其open函数中可能可以看出点东西来:
int ACE_Dev_Poll_Reactor_Notify ::open(ACE_Reactor_Impl*r,ACE_Timer_Queue*,in disable_notify_pipe)
{
if(0==disable_notify_pipe)
{
this->dp_reactor=(ACE_Dev_Poll_Reactor*)r;
if(this->notification_pipe_.read_handle()==-1)
return -1;
#if defined (F_SETFD)
ACE_OS::fcntl(this->notification_pipe_.read_handle(),F_SETFD,1);
ACE_OS::fcntl(this->notification_pipe_.write_handle(),1);
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
if(this->notification_queue_.open()==-1)
return -1;
if(ACE_OS::set_flags(this->notification_pipe_.write_handle(),ACE_NONBLOCK) == -1)
return -1;
#endif
if(ACE_OS::set_flags(this->notification_pipe_.read_handle(),ACE_NONBLOCK) == -1)
return -1;
}
return 0;
}
若是不用pipe机制作为Notify通知机制实现,则这个open直接就return掉了。
再来看:
int ACE_Dev_Poll_Reactor ::dispatch_io_event(Token_Guard &guard)
{
{
CGuard(this->repo_loc_)//对于handler的respository访问,因此加锁保护
//step1 获取event_handler
Event_Tuple *info=this->handle_rep_.find(handle);
if(info==0) return 0;//说明已经没有对应的event_handler了
if(info->suspended) return 0;//说明其他线程可能已经更改了这个handle的mask。
short revents=(pollfd*)pfds->revents;//即已触发的事件
//step2 根据revents来指定
disp_mask=WIRTE/READ/EXCPET_MASK;
callback=handle_output/input/exception;
//step3 将当前的event_handler标识置为suspended为true,以防止被重复触发
if(eh!=this->notify_handler_)
info->suspended=true;
}
//step4 将notify的event_handler直接处理,分发,且注意不要suspend和resume notify的handler,因为如果要这样做可能引起无休止的暂停和恢复,又要去经常等待获取token。
if(eh==this->notify_handler_)//说明是notify handler,直接处理
{
ACE_Notification_Buffer b;
notify_handler_->dequeue_one(b);
guard.release_token();
return notify_handler_->dispatch_notify(b);//直接去分发处理notify消息
}
//step5 执行handler中对应的回调
{
ACE_Dev_Poll_Handler_Guard eh_guard(eh);
guard.release_token();
status=this->upcall(eh,callback,handle);
}
//step6 检查回调的返回值status,是否等于0去恢复由于执行回调而暂停的handle
//step7 检查handle对应的回调的返回值,若小于0则需要remove_handler_i
}
而上述upcall在实际的内联实现文件中:
int ACE_Dev_Poll_Reactor ::upcall(eventhandler,handle)
{
do
{
status=(event_handler->*callback)(handle);
}
while(status>0 && event_handler != this->notify_handler_);
return status;
}
上面虽然对于注册handler和handle关联以及最终事件触发后如何调用handler中的回调函数理清了。但是有一点,如何完成多路复用和事件分离,并没有很清楚。这里就要提到
int ACE_Dev_Poll_Reactor::work_pending_i ( ACE_Time_Value * max_wait_time ) [protected]
01071 {
01072 ACE_TRACE ("ACE_Dev_Poll_Reactor::work_pending_i");
01073
01074 if (this->deactivated_)
01075 return 0;
01076
01077 #if defined (ACE_HAS_EVENT_POLL)
01078 if (this->start_pevents_ != this->end_pevents_)
01079 #else
01080 if (this->start_pfds_ != this->end_pfds_)
01081 #endif /* ACE_HAS_EVENT_POLL */
01082 return 1; // We still have work_pending(). Do not poll for
01083 // additional events.
01084
01085 ACE_Time_Value timer_buf (0);
01086 ACE_Time_Value *this_timeout = 0;
01087
01088 this_timeout = this->timer_queue_->calculate_timeout (max_wait_time,01089 &timer_buf);
01090
01091 // Check if we have timers to fire.
01092 const int timers_pending =
01093 ((this_timeout != 0 && max_wait_time == 0)
01094 || (this_timeout != 0 && max_wait_time != 0
01095 && *this_timeout != *max_wait_time) ? 1 : 0);
01096
01097 const long timeout =
01098 (this_timeout == 0
01099 ? -1 /* Infinity */
01100 : static_cast<long> (this_timeout->msec ()));
01101
01102 #if defined (ACE_HAS_EVENT_POLL)
01103
01104 // Wait for events.
01105 const int nfds = ::epoll_wait (this->poll_fd_,01106 this->events_,01107 this->size_,01108 static_cast<int> (timeout));
01109
01110 if (nfds > 0)
01111 {
01112 this->start_pevents_ = this->events_;
01113 this->end_pevents_ = this->start_pevents_ + nfds;
01114 }
01115
01116 #else
01117
01118 struct dvpoll dvp;
01119
01120 dvp.dp_fds = this->dp_fds_;
01121 dvp.dp_nfds = this->size_;
01122 dvp.dp_timeout = timeout; // Milliseconds
01123
01124 // Poll for events
01125 const int nfds = ACE_OS::ioctl (this->poll_fd_,DP_POLL,&dvp);
01126
01127 // Retrieve the results from the pollfd array.
01128 this->start_pfds_ = dvp.dp_fds;
01129
01130 // If nfds == 0 then end_pfds_ == start_pfds_ meaning that there is
01131 // no work pending. If nfds > 0 then there is work pending.
01132 // Otherwise an error occurred.
01133 if (nfds > -1)
01134 this->end_pfds_ = this->start_pfds_ + nfds;
01135 #endif /* ACE_HAS_EVENT_POLL */
01136
01137 // If timers are pending,override any timeout from the poll.
01138 return (nfds == 0 && timers_pending != 0 ? 1 : nfds);
01139 }
上述代码是5.5.2版本而在最新的6.3.3版本中这里的epoll_wait 第三个参数即MaxEvents是直接用魔数1来指定的。也即每次最多返回1个触发的事件。
其他的线程当然也可以调用handle_events来监测和处理事件,但是在handle_events中对这些其他的线程也做了限制,即必须是token的owner才能执行,否则直接return。
而如何成为owner,则需要调用函数Token_Guard 的函数acquire_quietly,其代码如下:
01144 {
01145 ACE_TRACE ("ACE_Dev_Poll_Reactor::handle_events");
01146
01147 // Stash the current time
01148 //
01149 // The destructor of this object will automatically compute how much
01150 // time elapsed since this method was called.
01151 ACE_MT (ACE_Countdown_Time countdown (max_wait_time));
01152
01153 Token_Guard guard (this->token_);
01154 int result = guard.acquire_quietly (max_wait_time);
01155
01156 // If the guard is NOT the owner just return the retval
01157 if (!guard.is_owner ())
01158 return result;
01159
01160 if (this->deactivated_)
01161 return -1;
01162
01163 // Update the countdown to reflect time waiting for the mutex.
01164 ACE_MT (countdown.update ());
01165
01166 return this->handle_events_i (max_wait_time,guard);
01167 }
这样就变成了串行化的使用epoll获取poll去监测事件,然后监测到线程去处理,而没有成为token的owner的线程则持续等待。
参考: