实现接口不同于类Unix平台上,select在windows上仅仅支持socket句柄的多路分离。而且在Unix平台上,select也不支持同步对象、线程或者SystemV消息队列的多路分离。
所以windows上增加了以WaitForMultipleObjets系统函数替代select的ACE_WFMO_Reactor类。
其新增特性有:
1.因为WaitForMultipleObjets支持多线程同时并发调用,所以不再需要实现owner()函数来进行序列化来按照Leader/Followers的方式轮流进行分发事件。
2.新增了和select相似的对应的Handler_Repository和Reactor_Notify的类,用于处理IO及定时器事件。
3.每一个对handle_events的调用都会等待一个句柄变成活动句柄。但是在获取event时,会遍历获取所有的活动句柄避免其他活动句柄被饿死。
4.ACE_WFMO_Reactor的子类,应用程序能处理所有挂载上的事件以及窗口消息。
ACE_WFMO_Reactor是ACE_Reactor在windows上的默认实现。
00848 ACE_INLINE int
00849 ACE_WFMO_Reactor::handle_events (ACE_Time_Value &how_long)
00850 {
00851 return this->event_handling (&how_long,FALSE);
00852 }
event_handling()函数中存在的dowhile循环中会调用wait_for_multiple_events,这个函数是只要有任意句柄触发就会返回,然后会去调用safe_dispatch函数去分发,这个函数中直接调用了dispatch。而dispatch中会调用dispatch_handles函数,这个函数在说明就直说了“
Dispatches any active handles from handles_[] to handles_[active_handles_] using to poll through our handle set looking for active handles.”。即这个函数中不仅会分发要取的这一个活动句柄,还会把所有的句柄集中的所有的活动句柄都获取到。
5.事件分发时会出现,指向相同处理器handler的多线程化分派。
可能会出现线程间的竞争状态。比如线程一上socket正在处理IO事件,另一个IO事件又被其他线程分发处理了。所以需要显式的对竞争状态进行保护。
01954 int
01955 ACE_WFMO_Reactor::dispatch_handler (DWORD slot,01956 DWORD max_handlep1)
01957 {
01958 // Check if there are window messages that need to be dispatched
01959 if (slot == max_handlep1)
01960 return this->dispatch_window_messages ();
01961
01962 // Dispatch the handler if it has not been scheduled for deletion.
01963 // Note that this is a very week test if there are multiple threads
01964 // dispatching this slot as no locks are held here. Generally,you
01965 // do not want to do something like deleting the this pointer in
01966 // handle_close() if you have registered multiple times and there is
01967 // more than one thread in WFMO_Reactor->handle_events().
01968 else if (!this->handler_rep_.scheduled_for_deletion (slot))
01969 {
01970 ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);
01971
01972 if (this->handler_rep_.current_info ()[slot].io_entry_)
01973 return this->complex_dispatch_handler (slot,01974 event_handle);
01975 else
01976 return this->simple_dispatch_handler (slot,01977 event_handle);
01978 }
01979 else
01980 // The handle was scheduled for deletion,so we will skip it.
01981 return 0;
01982 }
01983
上述代码是实际分派时,而看到的对于竞争状态的避免,则在如下代码中:
02020 int
02021 ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,02022 ACE_HANDLE event_handle)
02023 {
02024 // This dispatch is used for I/O entires.
02025
02026 ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info =
02027 this->handler_rep_.current_info ()[slot];
02028
02029 WSANETWORKEVENTS events;
02030 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02031 if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,02032 event_handle,02033 &events) == SOCKET_ERROR)
02034 problems = ACE_Event_Handler::ALL_EVENTS_MASK;
02035 else
02036 {
02037 // Prepare for upcalls. Clear the bits from <events> representing
02038 // events the handler is not interested in. If there are any left,
02039 // do the upcall(s). upcall will replace events.lNetworkEvents
02040 // with bits representing any functions that requested a repeat
02041 // callback before checking handles again. In this case,continue
02042 // to call back unless the handler is unregistered as a result of
02043 // one of the upcalls. The way this is written,the upcalls will
02044 // keep being done even if one or more upcalls reported problems.
02045 // In practice this may turn out not so good,but let's see. If any
02046 // problems,please notify Steve Huston <shuston@riverace.com>
02047 // before or after you change this code.
02048 events.lNetworkEvents &= current_info.network_events_;
02049 while (events.lNetworkEvents != 0)
02050 {
02051 ACE_Event_Handler *event_handler =
02052 current_info.event_handler_;
02053
02054 int reference_counting_required =
02055 event_handler->reference_counting_policy ().value () ==
02056 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02057
02058 // Call add_reference() if needed.
02059 if (reference_counting_required)
02060 {
02061 event_handler->add_reference ();
02062 }
02063
02064 // Upcall
02065 problems |= this->upcall (current_info.event_handler_,02066 current_info.io_handle_,02067 events);
02068
02069 // Call remove_reference() if needed.
02070 if (reference_counting_required)
02071 {
02072 event_handler->remove_reference ();
02073 }
02074
02075 if (this->handler_rep_.scheduled_for_deletion (slot))
02076 break;
02077 }
02078 }
02079
02080 if (problems != ACE_Event_Handler::NULL_MASK
02081 && !this->handler_rep_.scheduled_for_deletion (slot) )
02082 this->handler_rep_.unbind (event_handle,problems);
02083
02084 return 0;
02085 }
须知这里的 events.lNetworkEvents是直接从WSAEnumNetworkEvents中取出来的。用MSDN的原话”Pointe t a WSANETWORKEVENTS structure that i s filled with a recod of network events that occurred and an associated error codes.”在具体的upcall执行时比TP_Select或者Select_T多了很多关于mask的判断。
02087 ACE_Reactor_Mask
02088 ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,02089 ACE_HANDLE io_handle,02090 WSANETWORKEVENTS &events)
02091 {
02092 // This method figures out what exactly has happened to the socket
02093 // and then calls appropriate methods.
02094 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02095
02096 // Go through the events and do the indicated upcalls. If the handler
02097 // doesn't want to be called back,clear the bit for that event.
02098 // At the end,set the bits back to <events> to request a repeat call.
02099
02100 long actual_events = events.lNetworkEvents;
02101 int action;
02102
02103 if (ACE_BIT_ENABLED (actual_events,FD_WRITE))
02104 {
02105 action = event_handler->handle_output (io_handle);
02106 if (action <= 0)
02107 {
02108 ACE_CLR_BITS (actual_events,FD_WRITE);
02109 if (action == -1)
02110 ACE_SET_BITS (problems,ACE_Event_Handler::WRITE_MASK);
02111 }
02112 }
需要注意的地方是:
1.waitForMultipleObjects的WRITE_MASK的语义和select不同!如果接受不了,还是切回到ACE_Select_Reactor吧。select可以持续的检测一个socket的可写状态,只要可以就可以一直被检测到。而waitForMultipleObjects则只在第一次被连接时设定WRITE事件,当检测到写事件后就只能一直不停的写或者send直至失败或者返回EWOULDBLOCK.
2.ACE_WFMO_Reactor不再需要在处理事件前,将句柄挂起,然后处理完再恢复。因为当ACE_WFMO_Reactor线程处理事件需要从WSAEnumNetworkEvents()获取该IO事件的掩码时,os会自动清除这个额句柄的内部掩码,最终结果是即使多个线程同时多路分离socket句柄,也只能有一个线程获取IO事件掩码并将分配处理器。
而select没有这样的自动关OS序列化的能力,所以才有挂起和恢复操作。
注: WSAEventSelect是用来将其他事件对象等和socket进行绑定的函数,然后绑定的socket被用于检测,若socket检测到事件则说明绑定的对象上有事件了。 WSAEnumNetworkEvents用来获取激活的事件IO事件的掩码用于处理。