ACE_Reactor(二)ACE_Dev_Poll_Reactor

前端之家收集整理的这篇文章主要介绍了ACE_Reactor(二)ACE_Dev_Poll_Reactor前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

ACE_Reactor一些重要的细节
看下具体ACE_Dev_Poll_Reactor的实现,如何将一个处理集和handle关联起来,代码如下:

int ACE_Dev_Poll_Reactor ::register_handler_i(handle,event_handler,mask)
//step 1
if(this->handler_rep_.find(handle)==0)
{
    //Handler not present in the repository.Bind it.
    if(this->handler_rep_.bind(handle,mask)!=0)
        return -1;
}

//step2
//All but the notify handler get registered with oneshot to facilitate auto suspend before the upcall.See dispatch_io_event for more information
if(event_handler!= this->notify_handler_)
    epev.events |= EPOLLONESHOT;
if(::epoll_ctl(this->poll_fd_,op,handle,&epev) == -1)
    ......

step1
首先就是要将handle和处理这个handle的event_handler绑定,若已有处理的event_handler,则只单纯将可能新增的mask添加进监测的epoll函数参数中。
这里的handler_rep_是一个Handler_Repository类的指针,其内部是一个map表可以对handle和event_handler进行有效关联对应和存储。
step2
从上面可以看出,如果是epoll的话,且注册了notify handler, 则epoll_wait会自动将EPOLLONESHOT标识添加上,这样在事件被监测到得分发处理时就不用再手动的去suspend。

Notify机制的好处是:
1. 让反应器拥有了处理无限处理器的能力
2. 其次是提供了必要时解除反应器事件检查阻塞的能力。

Reactor的notify()让用户直接提供给Reactor待通知反应器的指针,而这些处理器无需注册到反应器上,从而提供了无限的扩展能力。
不过在ACE中显然Notify机制有不同的实现方式。一种是采用ACE_Pipe的实现,这个属于默认的方式。另一种则是内存队列保存Notify消息。你可以通过定义ACE_HAS_REACTOR_NOTIFICATION_QUEUE的宏编译ACE,这样ACE将不使用ACE_Pipe作为Notify消息的管道,而使用一个自己的内存队列保存Notify消息。
但是需要注意的是,在使用ACE_Pipe的实现中,如果使用不当,可能会造成严重的后果。
如果你用不到Notify机制,最好在ACE_Reactor初始化的时候彻底关闭Notify机制。很多Reactor的初始化函数都提供了关闭notify pipe的方式。比如ACE_Select_Reactor_T的open函数的disable_notify_pipe参数。当其为1的时候表示关闭notify 管道。

潜在的风险:
1. 处理器被销毁后,排队等候的对应通知才被分派
2. ACE_Pipe的数据缓冲是有限的,大量通知到来可能会造成阻塞甚至死锁

因为通知的信息(包括处理器指针和掩码)以流数据的方式被写入ACE_Pipe,不能进行遍历查找,所以当对应处理器被销毁后,如果其在ACE_Pipe的数据区中还有存储有通知,则ACE_Reactor将会在分派中使用悬空指针,造成未知的后果。另外,在ACE_Pipe的数据缓冲已满情况下,在处理器的回调中依然发送通知,就会因阻塞发生死锁。

通过

int ACE_Dev_Poll_Reactor ::notify(eh,mask,timeout)
{
    //pass over both the event_handler* and * the mask o allow the caller to dictate which Event_Handler method the receiver invokes.Note that this call can timeout.
    n = this->notify_handle_->notify(eh,mask,timeout);
    return n=-1?-1:0;
}

在源码的owner接口中,看到了这样一句注释
//There is no need to set the owner of the event loop.Multiple threads may invoke the event loop simulataneously.
说明有可能多个线程同时调用ACE_Dev_Poll_Reactor的event_loop函数,网上很多示例代码写的都比较简单,例如:

65 int main(void)
     66 {
     67         ACE::init();
     68 
     69         ACE_Dev_Poll_Reactor dev_reactor(1024);
     70         ACE_Reactor reactor(&dev_reactor);
     71         ACE_Reactor::instance(&reactor);
     72 
     73         TestAccptor accptor;
     74         ACE_INET_Addr addr(10000);
     75         if( accptor.open(addr) == -1 ) {
     76                 cout << "Open port Failed .. " << endl;
     77                 return -1;
     78         } 
     79        
     80        cout << "Open port ok .. " << endl;
     81 
     82 
     83         ACE_Reactor::instance()->run_reactor_event_loop();
     84         ACE::fini();
     85         return 0;
     86 }

这里有一个问题,监测事件触发和同时执行的是不是就只有一个main函数的这一个主线程就完全搞定了。现在看是这样的,但是实际中应该是不同线程都可以去调用上述的run_reactor_event_loop()来完成事件的监测和触发才对。

上述的Notify的实现机制,在其open函数中可能可以看出点东西来:

int ACE_Dev_Poll_Reactor_Notify ::open(ACE_Reactor_Impl*r,ACE_Timer_Queue*in disable_notify_pipe)
{
    if(0==disable_notify_pipe)
    {
        this->dp_reactor=(ACE_Dev_Poll_Reactor*)r;
        if(this->notification_pipe_.read_handle()==-1)
            return -1;

        #if defined (F_SETFD)
        ACE_OS::fcntl(this->notification_pipe_.read_handle(),F_SETFD,1);
        ACE_OS::fcntl(this->notification_pipe_.write_handle(),1);       


        #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
        if(this->notification_queue_.open()==-1)
            return -1;
        if(ACE_OS::set_flags(this->notification_pipe_.write_handle(),ACE_NONBLOCK) == -1)
            return -1;  
        #endif
        if(ACE_OS::set_flags(this->notification_pipe_.read_handle(),ACE_NONBLOCK) == -1)
            return -1;          

    }
    return 0;
}

若是不用pipe机制作为Notify通知机制实现,则这个open直接就return掉了。

再来看:

int ACE_Dev_Poll_Reactor ::dispatch_io_event(Token_Guard &guard)
{
    {
        CGuard(this->repo_loc_)//对于handler的respository访问,因此加锁保护
        //step1 获取event_handler
        Event_Tuple *info=this->handle_rep_.find(handle);
        if(info==0) return 0;//说明已经没有对应的event_handler了
        if(info->suspended) return 0;//说明其他线程可能已经更改了这个handle的mask。
        short revents=(pollfd*)pfds->revents;//即已触发的事件
        //step2 根据revents来指定
         disp_mask=WIRTE/READ/EXCPET_MASK;
         callback=handle_output/input/exception;
        //step3 将当前的event_handler标识置为suspended为true,以防止被重复触发
        if(eh!=this->notify_handler_)
            info->suspended=true;
    }
    //step4 将notify的event_handler直接处理,分发,且注意不要suspend和resume notify的handler,因为如果要这样做可能引起无休止的暂停和恢复,又要去经常等待获取token。
    if(eh==this->notify_handler_)//说明是notify handler,直接处理
    {
        ACE_Notification_Buffer b;
        notify_handler_->dequeue_one(b);
        guard.release_token();
        return notify_handler_->dispatch_notify(b);//直接去分发处理notify消息
    }

    //step5 执行handler中对应的回调
    {
        ACE_Dev_Poll_Handler_Guard eh_guard(eh);
        guard.release_token();
        status=this->upcall(eh,callback,handle);

    }
    //step6 检查回调的返回值status,是否等于0去恢复由于执行回调而暂停的handle
    //step7 检查handle对应的回调的返回值,若小于0则需要remove_handler_i
}

而上述upcall在实际的内联实现文件中:

int ACE_Dev_Poll_Reactor ::upcall(eventhandler,handle)
{
    do 
    {
        status=(event_handler->*callback)(handle);
    }
    while(status>0 && event_handler != this->notify_handler_);
    return status;
}

上面虽然对于注册handler和handle关联以及最终事件触发后如何调用handler中的回调函数理清了。但是有一点,如何完成多路复用和事件分离,并没有很清楚。这里就要提到

int ACE_Dev_Poll_Reactor::work_pending_i    (   ACE_Time_Value *    max_wait_time    )  [protected]

01071 {
01072   ACE_TRACE ("ACE_Dev_Poll_Reactor::work_pending_i");
01073 
01074   if (this->deactivated_)
01075     return 0;
01076 
01077 #if defined (ACE_HAS_EVENT_POLL)
01078   if (this->start_pevents_ != this->end_pevents_)
01079 #else
01080   if (this->start_pfds_ != this->end_pfds_)
01081 #endif /* ACE_HAS_EVENT_POLL */
01082     return 1;  // We still have work_pending(). Do not poll for
01083                // additional events.
01084 
01085   ACE_Time_Value timer_buf (0);
01086   ACE_Time_Value *this_timeout = 0;
01087 
01088   this_timeout = this->timer_queue_->calculate_timeout (max_wait_time,01089                                                         &timer_buf);
01090 
01091   // Check if we have timers to fire.
01092   const int timers_pending =
01093     ((this_timeout != 0 && max_wait_time == 0)
01094      || (this_timeout != 0 && max_wait_time != 0
01095          && *this_timeout != *max_wait_time) ? 1 : 0);
01096 
01097   const long timeout =
01098     (this_timeout == 0
01099      ? -1 /* Infinity */
01100      : static_cast<long> (this_timeout->msec ()));
01101 
01102 #if defined (ACE_HAS_EVENT_POLL)
01103 
01104    // Wait for events.
01105    const int nfds = ::epoll_wait (this->poll_fd_,01106                                   this->events_,01107                                   this->size_,01108                                   static_cast<int> (timeout));
01109 
01110   if (nfds > 0)
01111     {
01112       this->start_pevents_ = this->events_;
01113       this->end_pevents_ = this->start_pevents_ + nfds;
01114     }
01115 
01116 #else
01117 
01118   struct dvpoll dvp;
01119 
01120   dvp.dp_fds = this->dp_fds_;
01121   dvp.dp_nfds = this->size_;
01122   dvp.dp_timeout = timeout;  // Milliseconds
01123 
01124   // Poll for events
01125   const int nfds = ACE_OS::ioctl (this->poll_fd_,DP_POLL,&dvp);
01126 
01127   // Retrieve the results from the pollfd array.
01128   this->start_pfds_ = dvp.dp_fds;
01129 
01130   // If nfds == 0 then end_pfds_ == start_pfds_ meaning that there is
01131   // no work pending. If nfds > 0 then there is work pending.
01132   // Otherwise an error occurred.
01133   if (nfds > -1)
01134     this->end_pfds_ = this->start_pfds_ + nfds;
01135 #endif /* ACE_HAS_EVENT_POLL */
01136 
01137   // If timers are pending,override any timeout from the poll.
01138   return (nfds == 0 && timers_pending != 0 ? 1 : nfds);
01139 }

上述代码是5.5.2版本而在最新的6.3.3版本中这里的epoll_wait 第三个参数即MaxEvents是直接用魔数1来指定的。也即每次最多返回1个触发的事件。
其他的线程当然也可以调用handle_events来监测和处理事件,但是在handle_events中对这些其他的线程也做了限制,即必须是token的owner才能执行,否则直接return。
而如何成为owner,则需要调用函数Token_Guard 的函数acquire_quietly,其代码如下:

01144 {
01145   ACE_TRACE ("ACE_Dev_Poll_Reactor::handle_events");
01146 
01147   // Stash the current time
01148   //
01149   // The destructor of this object will automatically compute how much
01150   // time elapsed since this method was called.
01151   ACE_MT (ACE_Countdown_Time countdown (max_wait_time));
01152 
01153   Token_Guard guard (this->token_);
01154   int result = guard.acquire_quietly (max_wait_time);
01155 
01156   // If the guard is NOT the owner just return the retval
01157   if (!guard.is_owner ())
01158     return result;
01159 
01160   if (this->deactivated_)
01161     return -1;
01162 
01163   // Update the countdown to reflect time waiting for the mutex.
01164   ACE_MT (countdown.update ());
01165 
01166   return this->handle_events_i (max_wait_time,guard);
01167 }

这样就变成了串行化的使用epoll获取poll去监测事件,然后监测到线程去处理,而没有成为token的owner的线程则持续等待。
参考:

http://www.jb51.cc/article/p-dqycvzeu-hz.html

猜你在找的React相关文章