最近研究了下ACE的Reactor模型的源码。相比之前自己写的ACE Select模型,复杂了不少。ACE的Reactor框架,用户通过继承ACE_Event_Handler事件处理类。关联ACE_Reactor反应器,将无阻塞的IO隐蔽在ACE_Reactor对象的底层实现,这样减少了开发的事件和风险,提高了效率。
照例,首先叙述顶层的例子。这里,我首先定义一个ACE_Event_Handler的派生类HandleAccept(故名思议也知道是干什么的),负责输入描述符的处理:
class HandleAccept : public ACE_Event_Handler { private: ACE_SOCK_Acceptor acceptor_; ACE_INET_Addr inet_address_; ACE_Reactor_Mask mask_; u_short mPort; HandleData *handle_data_; public: HandleAccept( ACE_Reactor *reactor ) : ACE_Event_Handler(reactor) {} int open(u_short nPort); virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE); virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE){} virtual int handle_close( ACE_HANDLE handle = ACE_INVALID_HANDLE,ACE_Reactor_Mask mask_ = 0); virtual ACE_HANDLE get_handle (void) const; };
再定义一个ACE_Event_Handler的派生类HandleData,负责数据的处理:
class HandleData : public ACE_Event_Handler { private: ACE_SOCK_Stream peer_; ACE_Message_Block *head_; ACE_Message_Block *data_; public: HandleData(ACE_Reactor *reactor) : ACE_Event_Handler(reactor) {} int open(); virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE); virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE){} virtual int handle_close(ACE_HANDLE handle = ACE_INVALID_HANDLE,ACE_Reactor_Mask mask_ = 0); virtual ACE_HANDLE get_handle(void) const; ACE_SOCK_Stream &peer() {return peer_;} int recv_data(ACE_SOCK_Stream strem); };
这里着重介绍几个重要的接口,首先是open,在open中需要先初始化一个acceptor的socket,并且注册相关事件的mask到ACE_Reactor的反应器中(其实这里主要是ACCEPT_MASK)。
int HandleAccept::open(u_short nPort) { inet_address_.set(nPort); if ( acceptor_.open(inet_address_,1) < 0 ) return -1; ACE_SET_BITS(mask_,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK | ACE_Event_Handler::ACCEPT_MASK); std::cout<<"HandleAccept::open()"<<std::endl; return reactor()->register_handler(this,mask_); }当有输入事件的时候,handle_input回调将会被调用,在这里我们先new一个HandleData,这也是一个ACE_Event_Handler的派生类,负责数据的处理。然后调用ACE_SOCK_Acceptor的accept方法,等待输入的描述符。
int HandleAccept::handle_input(ACE_HANDLE handle) { handle_data_ = new HandleData(reactor()); ACE_INET_Addr remote_addr_; if ( acceptor_.accept(handle_data_->peer(),&remote_addr_) < 0 ) { std::cout<<"*ERROR* Fail to Accept connection"<<std::endl; return -1; } std::cout<<"connect from "<<remote_addr_.get_host_addr()<<std::endl; handle_data_->open(); return 0; }
随后,在main的主程序中:
int main(int argc,char *argv[]) { HandleAccept yankee(ACE_Reactor::instance()); yankee.open(1234); ACE_Reactor::instance()->run_event_loop(); return 0; }
如果使用传统的网络模型来实现,开发者将不得不面对以下问题:
- 设置和清楚fd_sets
- 检测事件,并对信号中断做出响应
- 管理内部锁
- 将事件多路分离给相关联的事件处理器
- 分派对I/O,信号和定时器事件的处理函数
ACE_Reactor框架解决了这一切,开发者只需关心上层的内容即可。首先从HandleAccep::open进入,层层转进到ACE_WFMO_Reactor::register_handler_i(至于为什么走到ACE_WFMO_Reactor,稍后就会提到):
int ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle,ACE_HANDLE io_handle,ACE_Event_Handler *event_handler,ACE_Reactor_Mask new_masks=READ_MASK) { // ....................... int found = this->handler_rep_.modify_network_events_i (io_handle,new_masks,old_masks,new_network_events,event_handle,delete_event,ACE_Reactor::ADD_MASK); // ....................... int result = ::WSAEventSelect ((SOCKET) io_handle,new_network_events); if (found) return result; else if (result != SOCKET_ERROR && this->handler_rep_.bind_i (1,event_handler,io_handle,delete_event) != -1) // ....................... }
首先调用 this->handler_rep_.modify_network_events_i(),在这里old_masks将获取event_handler感兴趣的事件集所对应的MASK,并增加new_masks对应的事件集FD,modify_network_events_i返回是否在active handles,suspended handles或者records to be added中找到io_handle,如果找到event_handle将指向这个io_handle。WSAEventSelect 将io_handle和event_handle绑定在一起。如果modify_network_events_i没有找到io_handle在记录中,则随后调用this->handler_rep_.bind_i负责插入一个新的Event_Handler入口到to_be_added_info_[]中。随后,to_be_added_info_[]的数据会被加入到current_handles_[]中,这一步将在ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos()中完成。
从ACE_Reactor::run_event_loop()进入,逐步分析。run_event_loop()走到ACE_Reactor::run_reactor_event_loop。程序进入一个while循环:
int ACE_Reactor::run_reactor_event_loop (REACTOR_EVENT_HOOK eh)//typedef int (*REACTOR_EVENT_HOOK)(ACE_Reactor *); { // ............................. while (1) { int const result = this->implementation_->handle_events (); // ............................. }
this->implementation_是在ACE_Reactor::ACE_Reactor()中分配的,这是一种代理模式的用法,为的是确保windows和linux之间的移植性,具体的实现根据不同的平台而异,具体底层的各种操作,都是由这个this->implementation_做的。在windows下,this->implementation_由ACE_WFMO_Reactor在ACE_Reactor的构造中实现:
ACE_Reactor::ACE_Reactor (ACE_Reactor_Impl *impl,bool delete_implementation) { // ..................... ACE_NEW (impl,ACE_WFMO_Reactor); //......................... this->implementation (impl); // ......................... } }接着前面的this->implementation_->handle_events ()调用,由这里进入,调用到ACE_WFMO_Reactor::event_handling,这里会调用wait_for_multiple_events和safe_dispatch,前者负责监听输入,后者负责分发事件。首先分析wait_for_multiple_events:
DWORD ACE_WFMO_Reactor::wait_for_multiple_events (int timeout,int alertable) { // ................................ #else return ::WaitForMultipleObjectsEx (this->handler_rep_.max_handlep1 (),this->handler_rep_.handles (),FALSE,timeout,alertable); #endif /* ACE_HAS_PHARLAP */ //noblock }这里,调用WaitForMultipleObjectsEx非阻塞地去等待相关事件,第一个参数是等待对象的数组的size(对象句柄的最大数量是64。此参数不能是零),第二个参数制定一个等待处理的对象数组。this->handler_rep_是ACE_WFMO_Reactor_Handler_Repository对象,ACE_Reactor将所有要管理的handle都存储在这个对象中this->handler_rep_.handles()将返回ACE_WFMO_Reactor_Handler_Repository::current_handles_,这保存了当前记录的相关句柄。这里将会向上层返回一个有输入的对象数组下标。
再来分析数据分发的safe_dispatch,这个方法经过层层调用,执行到ACE_WFMO_Reactor::complex_dispatch_handler,这里通过之前获取的数组下标,获取相关handle:
int ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,ACE_HANDLE event_handle) { // This dispatch is used for I/O entires. ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info = this->handler_rep_.current_info ()[slot]; // .......................................然后调用upcall:
int ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,ACE_HANDLE event_handle) { // .............. problems |= this->upcall (current_info.event_handler_,current_info.io_handle_,events); // .............. }在upcall中,通过匹配事件描述符,决定执行的顶层HandleAccept::handle_input()方法,假设此时有个外部链接connect进来,则会造成一个FD_ACCEPT事件:
ACE_Reactor_Mask ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,WSANETWORKEVENTS &events) { // ................. if (ACE_BIT_ENABLED (actual_events,FD_ACCEPT)) { action = event_handler->handle_input (io_handle); //这里执行最顶层HandleAccept::handle_input方法 if (action <= 0) { ACE_CLR_BITS (actual_events,FD_ACCEPT); if (action == -1) ACE_SET_BITS (problems,ACE_Event_Handler::ACCEPT_MASK); } } // .................. }
如果HandleAccept::handle_input()方法返回错误值-1,那么程序会将ACCEPT_MASK加入problems,这个值将导致后面执行handle_close()
int ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t slot,ACE_Reactor_Mask to_be_removed_masks) { // ...................... else if (ACE_BIT_ENABLED (to_be_removed_masks,ACE_Event_Handler::DONT_CALL) == 0) { ACE_HANDLE handle = this->current_info_[slot].io_handle_; this->current_info_[slot].event_handler_->handle_close (handle,to_be_removed_masks); } return 0; }总之,ACE的Reactor框架充分利用了C++的封装性以及多态性,并通过代理,实现了跨平台底层的透明,对快速开发应用有重要的意义。