但是,为了解决某些特别的原因,如大量的通知存储、规避悬空处理器指针等, ACE 也提供了一种有别于 Pipe 的解决方案,其采用消息排队的方式工作。当采取这种方式时,需定义 ACE_HAS_REACTOR_NOTIFICATION_QUEUE 宏并重生成 ACE 。
2.
Notify的能力、风险和风险规避
Notify
机制最重要的能力是:
1.
让反应器拥有了处理无限处理器的能力
2.
其次是提供了必要时解除反应器事件检查阻塞的能力。
Reactor的notify()让用户直接提供给Reactor待通知反应器的指针,而这些处理器无需注册到反应器上,从而提供了无限的扩展能力。但是,在使用ACE_Pipe的实现中,如果使用不当,可能会造成严重的后果。
潜在的风险:
1.
处理器被销毁后,排队等候的对应通知才被分派
2.
ACE_Pipe的数据缓冲是有限的,大量通知到来可能会造成阻塞甚至死锁
不过,在采用队列的Notify实现中,ACE
给提供了问题的解决方法。
1.
采用队列方式可无限扩展Notify
的数量
2.
采用队列方式为通知的查询提供的方便,因此ACE_Reactor提供puerge_pending_notifications()方法来移除指定的通知,当处理器销毁时,可以使用该方法移除当前队列中所有和自己相关的通知,避免了使用悬空指针。
在采用ACE_Pipe方式中,puerge_pending_notifications()被实现为空方法。
/// sleeping.
ACE_Reactor_Notify * notify_handler_;
7. 实现分析
restart 0 ,
ACE_Sig_Handler * disable_notify_pipe ACE_DISABLE_NOTIFY_PIPE_DEFAULT,
ACE_Reactor_Notify );
其中ACE_DISABLE_NOTIFY_PIPE_DEFAULT的值为0,表示默认要使用Notify功能。
这个值会传递给ACE_Select_Reactor_Notify的构造函数。在open()的实现中:
{
ACE_NEW_RETURN(this->notify_handler_,
ACE_Select_Reactor_Notify,
-1);
if(notify_handler_==0)
result=;
else
delete_notify_handler_true;
}
handler_rep_.open(size) )
result = ;
else ( open( disable_notify_pipe) )
{
ACE_ERROR((LM_ERROR,
ACE_TEXT("%p ),0);">notificationpipeopenFailed)));
result;
}
如果没有使用外部的notify时,ACE_Select_Reactor_T将会让notify_handler指向一个new出来的ACE_Select_Reactor_Notify类型的对象,并调用它的open()方法进行初始化。
ACE_Select_Reactor_Notify的open()实现如下(删除了非关键代码):
{
select_reactor_
dynamic_cast < ACE_Select_Reactor_Impl *> (r);
(select_reactor_ )
{
errno EINVAL;
return ;
}
notification_pipe_.open() )
;
(ACE::set_flags( notification_pipe_.read_handle(),
ACE_NONBLOCK)
register_handler
(
{
;
}
注意,这里的disable_notify_pipe就是ACE_Select_Reactor_T的open()方法中传递的参数,通过它,可以设置不使用notify功能。
可以看出,ACE_Select_Reactor_Notify的open()的主要功能是初始化自己的ACE_Pipe类型成员对象notification_pipe_,并把它注册到Reactor中,监测read事件。
ACE_Pipe的open()实现如下(Win32平台并删除了非关键代码):
ACE_SOCK_Acceptoracceptor;
ACE_SOCK_Connectorconnector;
ACE_SOCK_Streamreader;
ACE_SOCK_Streamwriter;
int result ;
# defined(ACE_WIN32)
ACE_INET_Addrlocal_any(static_cast u_short > #endif
(acceptor.open(local_any) || acceptor.get_local_addr(my_addr)
{
ACE_INET_Addrsv_addr(my_addr.get_port_number(),
ACE_LOCALHOST);
//Establishaconnectionwithinthesameprocess.(connector.connect(writer,sv_addr)(acceptor.accept(reader))
{
writer.close();
result;
}
}
acceptor.close();
handles_[ ] reader.get_handle();
writer.get_handle();