一、 概要
目前用于事件多路分离的OS抽象既复杂又难以使用,因而也容易出错。反应器本质上提供一组更高级的编程抽象,简化了事件驱动的分布式应用的设计和实现。除此而外,反应器还将若干不同种类的事件的多路分离集成到易于使用的API中。特别地,反应器对基于定时器的事件、信号事件、基于I/O端口监控的事件和用户定义的通知进行统一地处理。
二、 使用
a) 如何使用:
首先应用开发者
1. 创建事件处理器,以处理他所感兴趣的某事件。
2. 在反应器上登记,通知说他有兴趣处理某事件,同时传递他想要用以处理此事件的事件处理器的指针给反应器。
然后反应器框架将自动地
1. 在内部维护一些表,将不同的事件类型与事件处理器对象关联起来。
2. 在用户已登记的某个事件发生时,反应器发出对处理器中相应方法的回调。
b) 基于定时器的事件简单示例:
#include <ace/Reactor.h>
#include <ace/Event_Handler.h>
#include <iostream>
using namespace std;
class timer_handler : public ACE_Event_Handler
{
public:
virtual int handle_timeout (const ACE_Time_Value ¤t_time,const void *act = 0)
{
cout << "handle_timeout" << endl;
return -1;
}
virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask)
{
cout << "handle_close" << endl;
delete this;
return 0;
}
private:
~timer_handler()
{
}
};
int ACE_TMAIN (int argc,ACE_TCHAR *argv[])
{
ACE_Time_Value time_(4);
ACE_Reactor::instance()->schedule_timer(new timer_handler,NULL,time_,time_);
while (true)
{
ACE_Reactor::instance()->handle_events();
}
return 0;
}
c) 基于信号事件的简单示例
#include <ace/Reactor.h>
#include <ace/Event_Handler.h>
#include <iostream>
using namespace std;
#include <signal.h>
class signal_handler : public ACE_Event_Handler
{
public:
virtual int handle_signal(int signum,siginfo_t * /* = 0 */,ucontext_t * /* = 0 */)
{
cout << "signum = " << signum << endl;
return 0;
}
};
int ACE_TMAIN (int argc,ACE_TCHAR *argv[])
{
signal_handler handler_;
ACE_Reactor::instance()->register_handler(SIGBREAK,&handler_);
ACE_Reactor::instance()->register_handler(SIGINT,&handler_);
while (true)
{
ACE_Reactor::instance()->handle_events();
}
return 0;
}
d) 基于I/O端口监控事件的简单示例
#include <ace/INET_Addr.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/SOCK_Stream.h>
class io_handler : public ACE_Event_Handler
{
public:
virtual int handle_input(ACE_HANDLE fd /* = ACE_INVALID_HANDLE */)
{
cout << "handle_input" << endl;
ACE_SOCK_Stream stream_;
ACE_INET_Addr addr_;
m_acceptor.accept(stream_,&addr_);
stream_.send("joise",6);
return 0;
}
io_handler()
{
ACE_INET_Addr addr_(8004);
m_acceptor.open(addr_);
}
virtual ACE_HANDLE get_handle() const
{
return m_acceptor.get_handle();
}
private:
ACE_SOCK_Acceptor m_acceptor;
};
int ACE_TMAIN (int argc,ACE_TCHAR *argv[])
{
io_handler handler_;
if (ACE_Reactor::instance()->register_handler(&handler_,ACE_Event_Handler::ACCEPT_MASK) != 0)
cout << "register Failed" << endl;
while (ACE_Reactor::instance()->event_loop_done() == 0)
{
ACE_Reactor::instance()->handle_events();
}
return 0;
}
class notify_handler : public ACE_Event_Handler
{
public:
virtual int handle_input(ACE_HANDLE fd /* = ACE_INVALID_HANDLE */)
{
cout << "handle_input" << endl;
return 0;
}
};
int ACE_TMAIN (int argc,ACE_TCHAR *argv[])
{
notify_handler handler_;
while (ACE_Reactor::instance()->event_loop_done() == 0)
{
ACE_Time_Value time_(4);
ACE_Reactor::instance()->handle_events(time_);
ACE_Reactor::instance()->notify(&handler_,ACE_Event_Handler::READ_MASK);
}
return 0;
}
f) ACE_Event_Handler中提供以备继承的事件回调方法:
在子类中重载,所处理事件的类型: |
|
handle_signal() |
|
handle_input() |
|
handle_exception() |
|
handle_timeout() |
|
handle_output() |
g) 在处理器上登记或删除某事件:register_handler、remove_handler、schedule_timer
掩码 |
回调方法 |
何时 |
和⋯⋯一起使用 |
ACE_Event_Handler::READ_MASK |
handle_input() |
在句柄上有数据可读时。 |
register_handler() |
ACE_Event_Handler::WRITE_MASK |
handle_output() |
在I/O设备输出缓冲区上有可用空间、并且新数据可以发送给它时。 |
register_handler() |
ACE_Event_Handler::TIMER_MASK |
handle_close() |
传给handle_close()以指示调用它的原因是超时。 |
接受器和连接器的handle_timeout方法。反应器不使用此掩码。 |
ACE_Event_Handler::ACCEPT_MASK |
handle_input() |
在OS内部的侦听队列上收到了客户的新连接请求时。 |
register_handler() |
ACE_Event_Handler::CONNECT_MASK |
handle_input() |
对于非阻塞连接,在连接已经建立时。 |
register_handler() |
ACE_Event_Handler::DONT_CALL |
None. |
remove_handler() |
h) 生命期:
i.静态管理事件处理器的生命期
在栈上声明一个事件处理器,静态管理其生命期,则
l 使用方法register_handler或schedule_timer将某事件登记到反应器上,其在反应器之上的生命期开始
l 使用方法remove_handler显式的将事件从反应器上移除,其在反应器之上的生命期结束,或
l 在事件处理器相应的handle_*方法被回调时返回-1,则框架会自动的为此事件处理器检测已登记的事件并且将其从反应器之上移除,在移除之前,会调用事件处理器的handle_close回调方法以清除资源,调用之后,其在反应器之上的生命期也结束。
ii.动态管理事件处理器的生命期
由于上述静态管理事件处理器的生命期可能会导致对已经不存在的事件处理器进行分派的问题,所以一般使用动态管理事件器的生命期:
class dynamic_handler : public ACE_Event_Handler
{
private:
~dynamic_handler(){}
ACE_Reactor_Mask m_mask;
public:
virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask)
{
ACE_CLR_BITS(m_mask,close_mask);
switch(close_mask)
{
case READ_MASK:
;// 执行read相关的清除逻辑
break;
...
}
if (m_mask == 0)
delete this;
}
};
三、 实现
a) 框架类图
b) ACE_Reactor类详解
初始化及清理:
i. ACE_Reactor、Open:创建并初始化Reactor实例
ii. ~ACE_Reactor、Close:清理反应器在初始化时分配的资源
事件处理器的管理:
iii. register_handler:登记事件处理器
iv. remove_handler:移除事件处理器
v. suspend_handler:暂停分派事件给事件处理器
vi. resume_handler:恢复分派事件给事件处理器
vii. mask_ops:获取、设计、增加或是清除与某事件处理器相关的事件类型及其掩码
viii. schedulee_wakeup:将指定的掩码增加到某事件处理器的条目中,该处理器在此之前必须已经通过register_handler作了登记
ix. cancel_wakeup:从某事件处理器的条目中清除指定的掩码,但并不移除此处理器
事件的分派管理:
x. handle_events:等待事件发生,并随即分派与之相关联的事件处理器。
xi. run_event_loop:反复调用handle_events方法,直到其失败或是event_loop_done返回1,或是发生超时。
xii. end_event_loop:指示反应器关闭事件循环
xiii. event_loop_done:在end_reactor_event_loop调用结束之后返回1
定时器管理:
xiv. schedule_timer:登记一个事件处理器,它将在用户规定的时间之后执行,相当于一个定时器
xv. cancel_timer:取消一个或多个先前登记的定时器
通知管理:
xvi. notify:通知指定的事件处理器有某事件发生
xvii. max_notify_iterations:设置反应器在其通知机制中分派的处理器的最大数目
其它方法:
xviii. instance:提供对反应器单子模式的访问
c) 在Reactor框架中,使用了Bridge模式来实现保持ACE_Reactor接口恒定而又可以扩展或增强其接口。下图是
i. ACE_Select_Reactor
ACE_Reactor接口的一种实现,使用select同步事件多路分离。
ii. ACE_TP_Reactor
ACE_Reactor接口的另一种实现,可以让一“池”线程并发的调用handle_events,从而解决某些应用对于单线程调用handle_events的性能不够问题。
iii. ACE_WFMO_Reactor
windows下的使用WaitForMultipleObjects函数来进行事件多路分离的一种实现。
iv. 。。。,还有一些其它的扩展的Reactor的实现。
四、 后记
Reactor是ACE框架中的基础框架,Acceptor-Connector、Proactor是依赖于Reactor的框架。