源码可以到http://www.aoc.nrao.edu/php/tjuerges/ALMA/ACE-5.5.2/html/ace/上去找.
ACE_Select_Reactor_T主要是使用select来进行多路复用和分离。为了监测多个fd,ACE中新增了ACE_Handle_Set,就像类说明的说的,C++ wrapper facade for the socket @c fd_set abstraction.
用来作为fd_set的抽象。这个类是ACE_Dev_Poll_Reactor和ACE_WFMO_Reactor中不会使用到得。
其中ACE_Select_Reactor_T的继承图如下:
而ACE_Select_Reactor_T中最核心的当然是事件的多路复用函数handle_events_i,其源码如下:
01414 template <class ACE_SELECT_REACTOR_TOKEN> int
01415 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events_i
01416 (ACE_Time_Value *max_wait_time)
01417 {
01418 int result = -1;
01419
01420 ACE_SEH_TRY
01421 {
01422 // We use the data member dispatch_set_ as the current dispatch
01423 // set.
01424
01425 // We need to start from a clean dispatch_set
01426 this->dispatch_set_.rd_mask_.reset ();
01427 this->dispatch_set_.wr_mask_.reset ();
01428 this->dispatch_set_.ex_mask_.reset ();
01429
01430 int number_of_active_handles =
01431 this->wait_for_multiple_events (this->dispatch_set_,01432 max_wait_time);
01433
01434 result =
01435 this->dispatch (number_of_active_handles,01436 this->dispatch_set_);
01437 }
01438 ACE_SEH_EXCEPT (this->release_token ())
01439 {
01440 // As it stands now,we catch and then rethrow all Win32
01441 // structured exceptions so that we can make sure to release the
01442 // <token_> lock correctly.
01443 }
01444
01445 return result;
01446 }
从这段代码上看,ACE_Select_Reactor_T是可以监测到多个fd返回出发的多个events的,这点与ACE_Dev_Poll_Reactor的监测多个fd但是一次只能最多返回一个event不同,那么这些事件的处理又是怎么样的呢?
上述的dispatch 函数同步层层调用,最后会调到dispatch_io_set,这个函数也是处理多个触发事件的函数,其实现为:
01179 template <class ACE_SELECT_REACTOR_TOKEN> int
01180 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_io_set
01181 (int number_of_active_handles,01182 int &number_of_handlers_dispatched,01183 int mask,01184 ACE_Handle_Set &dispatch_mask,01185 ACE_Handle_Set &ready_mask,01186 ACE_EH_PTMF callback)
01187 {
01188 ACE_TRACE ("ACE_Select_Reactor_T::dispatch_io_set");
01189 ACE_HANDLE handle;
01190
01191 ACE_Handle_Set_Iterator handle_iter (dispatch_mask);
01192
01193 while ((handle = handle_iter ()) != ACE_INVALID_HANDLE &&
01194 number_of_handlers_dispatched < number_of_active_handles)
01195 {
01196 ++number_of_handlers_dispatched;
01197
01198 this->notify_handle (handle,01199 mask,01200 ready_mask,01201 this->handler_rep_.find (handle),01202 callback);
01203
01204 // clear the bit from that dispatch mask,
01205 // so when we need to restart the iteration (rebuilding the iterator...)
01206 // we will not dispatch the already dispatched handlers
01207 this->clear_dispatch_mask (handle,mask);
01208
01209 if (this->state_changed_)
01210 {
01211
01212 handle_iter.reset_state ();
01213 this->state_changed_ = false;
01214 }
01215 }
01216
01217 return 0;
01218 }
从实现中可以看出,在while循环中依次调用notify_handle来处理传进来的所有的handles,那么notify_handle又是如何实现的?
00799 template <class ACE_SELECT_REACTOR_TOKEN> void
00800 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::notify_handle
00801 (ACE_HANDLE handle,00802 ACE_Reactor_Mask mask,00803 ACE_Handle_Set &ready_mask,00804 ACE_Event_Handler *event_handler,00805 ACE_EH_PTMF ptmf)
00806 {
00807 ACE_TRACE ("ACE_Select_Reactor_T::notify_handle");
00808 // Check for removed handlers.
00809 if (event_handler == 0)
00810 return;
00811
00812 int reference_counting_required =
00813 event_handler->reference_counting_policy ().value () ==
00814 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00815
00816 // Call add_reference() if needed.
00817 if (reference_counting_required)
00818 {
00819 event_handler->add_reference ();
00820 }
00821
00822 int status = (event_handler->*ptmf) (handle);
00823
00824 if (status < 0)
00825 this->remove_handler_i (handle,mask);
00826 else if (status > 0)
00827 ready_mask.set_bit (handle);
00828
00829 // Call remove_reference() if needed.
00830 if (reference_counting_required)
00831 {
00832 event_handler->remove_reference ();
00833 }
00834 }
最终可以看到每个handle对应的事件处理也就是:
(event_handler->*ptmf) (handle)。
那么这样说的话,每次在调用handle_events,select监测所有fd的事件,若有触发,则全部返回。将这些触发的io事件依次的调用handler对应的回调函数进行处理。
一个线程处理这么多个fd的事件,而且还是要排队处理,显然不合理。那么如何去分摊这些事件的处理呢?答案当然是多线程,这里又引入了一个ACE_SELECT_REACTOR_TOKEN类来进行线程同步。
具体怎么做?由于现在监测事件和处理事件都在同一个大的函数中,谁能执行函数,谁就有权利进行监测和处理事件。另一方面,在一个线程已经监测到事件并进行处理的时候,其他线程必须要防止没有触发事件的fd上此时有事件。ACE中具体实现方案就是,owner函数来替换旧的线程,谁能够成为ACE_Select_Reactor的owner,谁就可以去执行监测并处理事件,而为了保证新旧owner之间处理时的安全以及序列化,就增加了ACE_SELECT_REACTOR_TOKEN。
下面来看owner函数的实现:
00118 template <class ACE_SELECT_REACTOR_TOKEN> int
00119 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::owner (ACE_thread_t tid,00120 ACE_thread_t *o_id)
00121 {
00122 ACE_TRACE ("ACE_Select_Reactor_T::owner");
00123 ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN,ace_mon,this->token_,-1));
00124
00125 if (o_id)
00126 *o_id = this->owner_;
00127
00128 this->owner_ = tid;
00129
00130 return 0;
00131 }
这段代码说明了必须要先获取到Token,当前调用的执行的线程也即新的线程才能成为owner。
这个条件在handle_events中也是有限定的,如果不是当前线程作为owner去执行,handle_events是无法执行的,会立刻退出。其代码如下:
01385 template <class ACE_SELECT_REACTOR_TOKEN> int
01386 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events
01387 (ACE_Time_Value *max_wait_time)
01388 {
01389 ACE_TRACE ("ACE_Select_Reactor_T::handle_events");
01390
01391 #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
01392
01393 // Stash the current time -- the destructor of this object will
01394 // automatically compute how much time elapsed since this method was
01395 // called.
01396 ACE_Countdown_Time countdown (max_wait_time);
01397
01398 ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN,-1);
01399
01400 if (ACE_OS::thr_equal (ACE_Thread::self (),01401 this->owner_) == 0 || this->deactivated_)
01402 return -1;
01403
01404 // Update the countdown to reflect time waiting for the mutex.
01405 countdown.update ();
01406 #else
01407 if (this->deactivated_)
01408 return -1;
01409 #endif /* ACE_MT_SAFE */
01410
01411 return this->handle_events_i (max_wait_time);
01412 }
这样大家应该比较清楚了。ACE_Select_Reactor_T的多路复用和分离就是这样去实现的。