ACE_Select_Reactor的Notify功能实现浅析

前端之家收集整理的这篇文章主要介绍了ACE_Select_Reactor的Notify功能实现浅析前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

ACE_Select_ReactorNotify功能实现浅析

作者 : ydogg 如需转载 , 请注明 .
1. 前言
本文关注平台为Win32,但在其它的类Unix 平台,就实现 框架而言并没有太多变 ,惟一不同的是用于底机制的ACE_Pipe 实现
但是,为了解决某些特别的原因,如大量的通知存储、规避悬空处理器指针等, ACE 也提供了一种有别于 Pipe 解决方案,其采用消息排队的方式工作。当采取这种方式时,需定义 ACE_HAS_REACTOR_NOTIFICATION_QUEUE 宏并重生成 ACE

2. Notify能力、风险和风险

Notify 机制最重要的能力是:
1.
让反应器拥有了处理无限处理器的能力
2.
其次是提供了必要时解除反应器事件检查阻塞的能力。

Reactornotify()用户直接提供给Reactor通知反应器的指针,而这些处理器无需注册到反应器上,从而提供了无限的扩展能力。但是,在使用ACE_Pipe的实现中,如果使用不当,可能会造成严重的后果。

潜在的风险:
1.
处理器被销毁后,排队等候的对应通知才被分派
2.
ACE_Pipe的数据缓冲是有限的,大量通知到来可能会造成阻塞甚至死锁

因为通知的信息(包括处理器指针和掩码)以流数据的方式被写入ACE_Pipe,不能进行遍历查找,所以当对应处理器被销毁后,如果其在CE_Pipe的数据区中还有存储有通知,则ACE_Reactor将会在分派中使用悬空指针,造成未知的后果。另外,在ACE_Pipe的数据缓冲已满情况下,在处理器的回调中依然发送通知,就会因阻塞发生死锁。

在采用ACE_PipeNotify实现中,以上两个问题的规避只能依靠用户自己的应用机制。

不过,在采用队列的Notify实现中,ACE 给提供了问题的解决方法
1.
采用队列方式可无限扩展Notify 数量
2.
采用队列方式为通知查询提供的方便,因此ACE_Reactor提供puerge_pending_notifications()方法来移除指定的通知,当处理器销毁时,可以使用该方法移除当前队列中所有和自己相关的通知,避免了使用悬空指针。

在采用ACE_Pipe方式中,puerge_pending_notifications()被实现为空方法


3. ACE_Select_Reactor类图
可以看出,ACE_Select_Reactor_T 是具体的实现负责 者。

4. 参与Notify机制的类
参与Notify 机制的有:
ACE_Select_Reactor_Impl ACE_Select_Reactor_Notify ACE_Pipe 。从ACE_Select_Reactor 开始,采用自上而向下的行分析。
5. 实现原理
Reactor 实现 方法有所不同,比如Select_Reactor 是通管道来实现notify 功能,而WFMO则 过ACE_Event类 的的事件加通知队列来实现
Reactor 在初始化,会 一个pipe 。然后将该pipe 的句柄注册Reactor 这样 Pipe Reactor 的事件分派句柄集中,像其他普通句柄一,被Reactor 监测 和分派。当用户notify()时 notify() 会把参数信息写入 管道,通过事件监测,从而Reactor 就能得知有新notify 到来,从管道 相关数据后,就可以 的分派 。从而实现 通知和事件分派的序列化。
详细 的来ACE为 所有的Reactor 都定专为 自己服Notify类 ,如
ACE_Select_Reactor_Notify ACE_WFO_Reactor_Notify
类Unix 平台上,支持管道并且select 可用于监测 管道事件。但是在Win32 平台并不支持管道功能,而且Winsock select 只能支持对socket 句柄的事件监测 。因此,ACE Win32 平台用一个本地 来模拟Pipe 功能,其中接受 socket 句柄作为Pipe读 句柄,而 socket 句柄作为Pipe 的写句柄。
可以看出,notify 功能实现机制类似于ACE_Reactor 用法且也有一个似于ACE_Event_Handler 存在,只不是被包含在Reactor 中。在逻辑 上,就是一个本地 ,用于和 内的其他部分的通信 (略有差别的地方在于事件分派的方式)
ACE_Select_Reactor,ACE_Pipe 充当了ACE_Sock_Stream 的角色,而ACE_Select_Reactor_Notify则 充当了ACE_Event_Handler 的角色。ACE_Select_Reactor_Notify 派生自ACE_Reactor_Notify,ACE_Reactor_Notify 正是派生自ACE_Event_Handler.

6. 相关变量
ACE_Select_Reactor_Impl类 中,定为ACE_Reactor_Notify* 的成员变
/ CallbackobjectthatunblockstheACE_Select_Reactorifit's
/// sleeping.
ACE_Reactor_Notify * notify_handler_;

7. 实现分析

7.1 初始化
ACE_Select_Reactor_T open 函数如下:
virtual int open(size_tmax_number_of_handles = DEFAULT_SIZE,
restart 0 ,
ACE_Sig_Handler
* disable_notify_pipe ACE_DISABLE_NOTIFY_PIPE_DEFAULT,
ACE_Reactor_Notify
);

其中ACE_DISABLE_NOTIFY_PIPE_DEFAULT值为0,表示默要使用Notify功能
传递给ACE_Select_Reactor_Notify的构造函数open()实现中:

if (result != - 1 && this -> notify_handler_ == 0 )
{
ACE_NEW_RETURN(
this->notify_handler_,
ACE_Select_Reactor_Notify,
-1);

if(notify_handler_==0)
result
=;
else
delete_notify_handler_true;
}


handler_rep_.open(size) )
result
= ;
else ( open( disable_notify_pipe) )
{
ACE_ERROR((LM_ERROR,
ACE_TEXT(
"%p ),0);">notificationpipeopenFailed)));
result
;
}

如果没有使用外部的notifyACE_Select_Reactor_T将会notify_handler指向一个new出来的ACE_Select_Reactor_Notify型的象,并用它的open()方法行初始化。

ACE_Select_Reactor_Notifyopen()实现如下(了非关):

(disable_notify_pipe )
{
select_reactor_
dynamic_cast
< ACE_Select_Reactor_Impl *> (r);

(select_reactor_ )
{
errno
EINVAL;
return ;
}

notification_pipe_.open() )
;

(ACE::set_flags( notification_pipe_.read_handle(),
ACE_NONBLOCK)

register_handler
(

{
;
}

注意,里的disable_notify_pipe就是ACE_Select_Reactor_Topen()方法传递的参数,通它,可以置不使用notify功能
可以看出,
ACE_Select_Reactor_Notifyopen()的主要功能是初始化自己的ACE_Pipe型成员对notification_pipe_,并把它注册Reactor中,监测read事件。

ACE_Pipeopen()实现如下(Win32平台并除了非关):

ACE_INET_Addrmy_addr;
ACE_SOCK_Acceptoracceptor;
ACE_SOCK_Connectorconnector;
ACE_SOCK_Streamreader;
ACE_SOCK_Streamwriter;
int result ;
#
defined(ACE_WIN32)
ACE_INET_Addrlocal_any(static_cast
u_short > #endif
(acceptor.open(local_any) || acceptor.get_local_addr(my_addr)
{
ACE_INET_Addrsv_addr(my_addr.get_port_number(),
ACE_LOCALHOST);

//Establishaconnectionwithinthesameprocess.(connector.connect(writer,sv_addr)(acceptor.accept(reader))
{
writer.close();
result
;
}

}


acceptor.close();
handles_[ ] reader.get_handle();
writer.get_handle();

可以看到, ACE_Pipe 了一个自身的 Tcp 接来模 Pipe 写句柄分 就是两端的 socket 句柄,很巧妙的 Pipe 移植。 ACE_Pipe read_handle () 函数返回的是 handles_ [ 0 ] ,也就是说,注册 Reactor 是读 socket, handles_ [1] 将会被 客户所使用。

7.2 方法

使用notify相关方法ACE_Select_Reactor_T充当中介,会将将它直接交由内部的notify_handler对象理。
ACE_Select_Reactor_Tnotify()代码

ssize_t const n notify(eh,mask,imeout);
? : ;

ACE_Select_Reactor_Nofitynotify()函数代码

;

ACE_Event_Handler_varsafe_handler(event_handler);

(event_handler)
event_handler
add_reference();

ACE_Notification_Bufferbuffer(event_handler,mask);
ssize_t
ACE::send( notification_pipe_.write_handle(),
(
char * ) & buffer,255);">sizeof (n // Nofailures. safe_handler.release();

将用户传递的“目的Handler“和”掩“参数,写入了管道,实则是通过socket句柄发送了数据。ACE_Notification_Buffer是一个简单的数据包裹类。

7.3 事件分派

Reactor()事件多路分离流程:
ACE_Reactor::run_reactor_event_loop() -> ACE_Select_Reactor_T::handle_events() ->
ACE_Select_Reactor_T::
handle_events_i()

ACE_Select_Reactor_T::()代码如下:

ACE_SEH_TRY
{
Weusethedatamemberdispatch_set_asthecurrentdispatch
set.
Weneedtostartfromacleandispatch_set dispatch_set_.rd_mask_.reset();
dispatch_set_.wr_mask_.reset();
dispatch_set_.ex_mask_.reset();

number_of_active_handles wait_for_multiple_events( dispatch_set_,
max_wait_time);

result
dispatch(number_of_active_handles,0);">dispatch_set_);
}
ACE_SEH_EXCEPT(
release_token())
{

}

Reactor在监测事件时,不区分Noftiy用句柄和其它句柄,一视同仁。所以无需关心wait_for_multiple_events的实现。
ACE_Select_Reactor_T:: dispatch 的关键代码如下:dispatch_timer_handlers(other_handlers_dispatched) StatehaschangedortimerqueuehasFailed,exitloop. break dispatch_notification_handlers
(dispatch_set,
active_handle_count,
other_handlers_dispatched)
StatehaschangedoraserIoUsfailurehasoccured,soexit
loop. dispatch_io_handlers
(dispatch_set,
io_handlers_dispatched)
Statehaschanged,soexitloop. ;


可以看出,Select_Reactor分派事件时是按照:定时器、通知IO事件的顺序进行的。
这里也许有人会疑惑,为什么不直接将notify的处理放在io的分派函数
dispatch_io_handlers()中处理呢? 这个想法合情合理,但是Select_Reactor中,规定notify事件的优先级要比普通IO事件的优先级高,以便可以处理一些优先级较高的突发事件。所以必须在IO事件分派前,主动的分派notify事件。并且notify事件分派后,需要将Pipe的句柄从dispatch_set去除,以免在事件分派中被重复分派。


dispatch_notification_handlers
代码dispatch_notifications(number_of_active_handles,
dispatch_set.rd_mask_);


{
number_of_handlers_dispatched
+= n;
number_of_active_handles
-= n;
}


ACE_Select_Reactor_T::dispatch_notifications函数代码ACE_HANDLE read_handle notification_pipe_.read_handle();

(read_handle ACE_INVALID_HANDLE
rd_mask.is_set(read_handle))
{
--number_of_active_handles;
rd_mask.clr_bit(read_handle);
returnhandle_input(read_handle);
}
;

到了这里,handle_input出现了,下面的处理和ACE_Event_Handler相近了。
ACE_Select_Reactor_Notify::handle_input()函数代码

number_dispatched ;
ACE_Notification_Bufferbuffer;

while ((result read_notify_pipe(handle,buffer)) dispatch_notify(buffer) ++ number_dispatched;

(number_dispatched max_notify_iterations_)
;
}

代码逻辑很清楚,从Pipe的缓冲区中依次读取所有接受到的Notify事件信息,并且交给dispatch_notify去处理,如果达到设定值max_notify_iterations_(ACE_Reactor:: max_notify_iterations函数功能),就停止分派。

需要注意:由于是依次从Pipe中读取信息,如果在用户指定的Notify处理Handler调用notify()(嵌套使用,给在此写入信息,就容易造成Reactor的死循环。
WFMO_Reactor的对应实现中,因为使用Event,所以对应的函数handle_signal().

ACE_Select_Reactor_Notify::dispatch_notify

(buffer.eh_ )
{
ACE_Event_Handler
event_handler buffer.eh_;

bool requires_reference_counting
event_handler
reference_counting_policy().value()
ACE_Event_Handler::Reference_Counting_Policy::ENABLED;

switch (buffer.mask_)
{
case ACE_Event_Handler::READ_MASK:
ACE_Event_Handler::ACCEPT_MASK:
result
handle_input(ACE_INVALID_HANDLE);
ACE_Event_Handler::WRITE_MASK:
result
handle_output(ACE_INVALID_HANDLE);
ACE_Event_Handler::EXCEPT_MASK:
result
handle_exception(ACE_INVALID_HANDLE);
ACE_Event_Handler::QOS_MASK:
result
handle_qos(ACE_INVALID_HANDLE);
ACE_Event_Handler::GROUP_QOS_MASK:
result
handle_group_qos(ACE_INVALID_HANDLE);
default :
Shouldwebailoutifwegetaninvalidmask? ACE_ERROR((LM_ERROR,
ACE_TEXT(
" invalidmask=%d )
event_handler
handle_close(ACE_INVALID_HANDLE,
ACE_Event_Handler::EXCEPT_MASK);
}

这里是分派的终点,用户传递的用于处理Notify事件的Handler根据mask的类型而被调用

8.序列图

猜你在找的React相关文章