ACE:Reactor框架处理事件和多个I/O流

前端之家收集整理的这篇文章主要介绍了ACE:Reactor框架处理事件和多个I/O流前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

ACE Reactor框架:

只要做三件事:

1.从ACE_Event_Handler派生一个或多个类,并给各个虚回调方法增加应用特有的事件处理行为

2.向ACE_Reactor类登记应用的事件处理对象,把每个事件处理对象与它感兴趣的事件关联起来

3.运行ACE_Reactor事件循环

一个接受连接的例子:

1.#include <iostream> 2.#include "ace/auto_ptr.h" 3.#include "ace/log_msg.h" 4.#include "ace/inet_addr.h" 5.#include "ace/sock_acceptor.h" 6.#include "ace/reactor.h" 7.8.#include "ace/Message_Block.h" 9.#include "ace/Message_Queue.h" 10.#include "ace/SOCK_Stream.h" 11.12.#include "ace/Null_Mutex.h" 13.#include "ace/Null_Condition.h" 14.15.16.using namespace std;@H_403_13@17.//服务客户 18.class ClientService:public ACE_Event_Handler@H_403_13@19.{@H_403_13@20.public:@H_403_13@21. ACE_SOCK_Stream &peer(void)@H_403_13@22. {@H_403_13@23. return this->sock_;@H_403_13@24. }@H_403_13@25. int open(void);@H_403_13@26. virtual ACE_HANDLE get_handle(void) const27. {@H_403_13@28. return this->sock_.get_handle();@H_403_13@29. }@H_403_13@30. virtual int handle_input(ACE_HANDLE fd=ACE_INVALID_HANDLE);@H_403_13@31. virtual int handle_output(ACE_HANDLE fd=ACE_INVALID_HANDLE);@H_403_13@32. virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);@H_403_13@33.34.protected:@H_403_13@35. ACE_SOCK_Stream sock_;@H_403_13@36. ACE_Message_Queue<ACE_NULL_SYNCH> output_queue_;@H_403_13@37.};@H_403_13@38.int ClientService::open(void)@H_403_13@39.{@H_403_13@40. ACE_TCHAR peer_name[512];@H_403_13@41. ACE_INET_Addr peer_addr;@H_403_13@42. if(this->sock_.get_remote_addr(peer_addr)==0&&peer_addr.addr_to_string(peer_name,512)==0)@H_403_13@43. cout<<" connection from "<<peer_name<<endl;@H_403_13@44. return this->reactor()->register_handler(this,ACE_Event_Handler::READ_MASK);@H_403_13@45.}@H_403_13@46.int ClientService::handle_input(ACE_HANDLE)@H_403_13@47.{@H_403_13@48. const size_t INPUT_SIZE=4096;@H_403_13@49. char buffer[INPUT_SIZE];@H_403_13@50. ssize_t recv_cnt,send_cnt;@H_403_13@51. if((recv_cnt=this->sock_.recv(buffer,sizeof(buffer)))<=0)@H_403_13@52. {@H_403_13@53. ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) connection closed/n")));@H_403_13@54. return -1;@H_403_13@55. }@H_403_13@56. send_cnt=this->sock_.send(buffer,ACE_static_cast(size_t,recv_cnt));@H_403_13@57. if(send_cnt==recv_cnt)@H_403_13@58. return 0;@H_403_13@59. if(send_cnt==-1&&ACE_OS::last_error()!=EWOULDBLOCK)@H_403_13@60. ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%P|%t) %p/n"),ACE_TEXT("send")),0);@H_403_13@61. if(send_cnt==-1)@H_403_13@62. send_cnt=0;@H_403_13@63. ACE_Message_Block *mb;@H_403_13@64. size_t remaining=ACE_static_cast(size_t,(recv_cnt-send_cnt));@H_403_13@65. ACE_NEW_RETURN(mb,ACE_Message_Block(&buffer[send_cnt],remaining),-1);@H_403_13@66. int output_off=this->output_queue_.is_empty();@H_403_13@67. ACE_Time_Value nowait(ACE_OS::gettimeofday());@H_403_13@68. if(this->output_queue_.enqueue_tail(mb,&nowait)==-1)@H_403_13@69. { @H_403_13@70. ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t)%P;discarding data/n"),ACE_TEXT("enqueue Failed ")));@H_403_13@71. mb->release();@H_403_13@72. return 0;@H_403_13@73. }@H_403_13@74. if(output_off)@H_403_13@75. return this->reactor()->register_handler(this,ACE_Event_Handler::WRITE_MASK);@H_403_13@76. return 0;@H_403_13@77.78.79.}@H_403_13@80.int ClientService::handle_output(ACE_HANDLE)@H_403_13@81.{@H_403_13@82. ACE_Message_Block *mb;@H_403_13@83. ACE_Time_Value nowait(ACE_OS::gettimeofday());@H_403_13@84. while(0==this->output_queue_.dequeue_head(mb,&nowait))@H_403_13@85. {@H_403_13@86. ssize_t send_cnt=this->sock_.send(mb->rd_ptr(),mb->length());@H_403_13@87. if(send_cnt==-1)@H_403_13@88. ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t)%p/n"),ACE_TEXT("send")));@H_403_13@89. else90. mb->rd_ptr(ACE_static_cast(size_t,send_cnt));@H_403_13@91. if(mb->length()>0)@H_403_13@92. {@H_403_13@93. this->output_queue_.enqueue_head(mb);@H_403_13@94. break;@H_403_13@95. }@H_403_13@96. mb->release();@H_403_13@97. }@H_403_13@98. return (this->output_queue_.is_empty())?-1:0;@H_403_13@99.}@H_403_13@100.int ClientService::handle_close(ACE_HANDLE,ACE_Reactor_Mask mask)@H_403_13@101.{@H_403_13@102. if(mask==ACE_Event_Handler::WRITE_MASK)@H_403_13@103. return 0;@H_403_13@104. mask=ACE_Event_Handler::ALL_EVENTS_MASK|ACE_Event_Handler::DONT_CALL;@H_403_13@105. this->reactor()->remove_handler(this,mask);@H_403_13@106. this->sock_.close();@H_403_13@107. this->output_queue_.flush();@H_403_13@108. delete this;@H_403_13@109. return 0;@H_403_13@110.}@H_403_13@111.//接受客户 112.class ClientAccept:public ACE_Event_Handler@H_403_13@113.{@H_403_13@114.public:@H_403_13@115. virtual ~ClientAccept()@H_403_13@116. {@H_403_13@117. this->handle_close(ACE_INVALID_HANDLE,0);@H_403_13@118. }@H_403_13@119. int open(const ACE_INET_Addr &listen_addr);@H_403_13@120. virtual ACE_HANDLE get_handle(void) const121. {@H_403_13@122. return this->acceptor_.get_handle();@H_403_13@123. }@H_403_13@124. virtual int handle_input(ACE_HANDLE fd=ACE_INVALID_HANDLE);@H_403_13@125. virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);@H_403_13@126.protected:@H_403_13@127. ACE_SOCK_Acceptor acceptor_;@H_403_13@128.};@H_403_13@129.int ClientAccept::open(const ACE_INET_Addr &listen_addr)@H_403_13@130.{@H_403_13@131. if(this->acceptor_.open(listen_addr,1)==-1)@H_403_13@132. {@H_403_13@133. ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%p/n"),ACE_TEXT("acceptor.open")),-1);@H_403_13@134. }@H_403_13@135. return this->reactor()->register_handler(this,ACE_Event_Handler::ACCEPT_MASK);@H_403_13@136.}@H_403_13@137.int ClientAccept::handle_input(ACE_HANDLE)@H_403_13@138.{@H_403_13@139. ClientService *client;@H_403_13@140. ACE_NEW_RETURN(client,ClientService,-1);@H_403_13@141. auto_ptr<ClientService>p(client);@H_403_13@142. if(this->acceptor_.accept(client->peer())==-1)@H_403_13@143. ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%P|%T)%p/N"),ACE_TEXT("Failed to accept ")ACE_TEXT("client connection")),-1);@H_403_13@144. p.release();@H_403_13@145. client->reactor(this->reactor());@H_403_13@146. if(client->open()==-1)@H_403_13@147. client->handle_close(ACE_INVALID_HANDLE,0);@H_403_13@148. return 0;@H_403_13@149.150.}@H_403_13@151.int ClientAccept::handle_close(ACE_HANDLE,ACE_Reactor_Mask)@H_403_13@152.{@H_403_13@153. if(this->acceptor_.get_handle()!=ACE_INVALID_HANDLE)@H_403_13@154. {@H_403_13@155. ACE_Reactor_Mask m=ACE_Event_Handler::ACCEPT_MASK|ACE_Event_Handler::DONT_CALL;@H_403_13@156. this->reactor()->remove_handler(this,m);@H_403_13@157.158. }@H_403_13@159. return 0;@H_403_13@160.}@H_403_13@161.int main(int argc,char *argv[])@H_403_13@162.{@H_403_13@163. ACE_INET_Addr port_to_listen(50000,ACE_LOCALHOST);@H_403_13@164. ClientAccept acceptor;@H_403_13@165. acceptor.reactor(ACE_Reactor::instance());@H_403_13@166. if(acceptor.open(port_to_listen)==-1)@H_403_13@167. return 1;@H_403_13@168. ACE_Reactor::instance()->run_reactor_event_loop();@H_403_13@169. return 0;@H_403_13@170.}@H_403_13@每个类要处理任何类型的Reactor事件的类,必须从ACE_Event_Handler派生,虽然可以用一个类控制接受和所有客户的连接,但还是创建“连接接受”和“连接服务”不同的类比较好!

1.这样可以更好的封装数据和行为,这个类接受来自客户的连接,而这是他所做的全部事情

2.代表客户的类将为客户连接提供服务

在针对一些I/O事件向反应器登记某个事件处理器时,反应器会把一个ACE_Event_Handler指针与一个句柄以及处理器感兴趣的I/O事件类型关联在一起!

当I/O事件触发时,会回调特定的句柄传给handle_input()方法的ACE_HANDLE参数

而在上面程序例子中,创建了一个clientservice实例,为每个连接使用单独的服务处理对象,所以每次接受新的连接都会得到一个新的CLIENTSERVICE实例

为了对要发送的数据进行排队,CLientService用一个ACE_Message_Queue,当要对稍后发送的数据进行排队时,分配一个ACE_Message_Block保存这些数据,并把它放入队列中,以备后用,如果我们无法把数据放入队列,我们就会放弃,抛弃那些数据。如果在我们尝试把余下的数据放入队列之前,输出队列是空的,我们就会再向反应器登记这个处理,这一次针对的是 WRITE事件

ACE_Message_Queue

通过在类声明是指定锁类型就可以很方便实现进程,线程安全的消息队列@H_403_13@ACE_Message_Queue<ACE_MT_SYNCH> message_queue_;如果程序是单线程的话,@H_403_13@可以ACE_Message_Queue<ACE_NULL_SYNCH> message_queue_。

ACE_Message_Block功能简介 @H_403_13@ACE_Message_Block在Ace中用来表示消息的存放空间,可用做网络通信中的消息缓冲区,使用非常频繁,下面将在如下方简单的介绍一下ACE_Message_Block相关功能

1.创建消息块 @H_403_13@2.释放消息块 @H_403_13@3.从消息块中读写数据 @H_403_13@4.数据的拷贝 @H_403_13@5.其它常用函数 @H_403_13@1。创建消息块

创建消息块的方式比较灵活,常用的有以下几种方式 :

1。直接给消息块分配内存空间创建。

ACE_Message_Block *mb = new ACE_Message_Block (30);

2。共享底层数据块创建。

char buffer[100];@H_403_13@ ACE_Message_Block *mb = new ACE_Message_Block (buffer,30);

这种方式共享底层的数据块,被创建的消息块并不拷贝该数据,也不假定自己拥有它的所有权。在消息块mb被销毁时,相关联的数据缓冲区data将不会被销毁。这是有意义的:消息块没有拷贝数据,因此内存也不是它分配的,这样它也不应该负责销毁它。

3。通过duplicate()函数从已有的消息块中创建副本。

ACE_Message_Block *mb = new ACE_Message_Block (30);@H_403_13@ ACE_Message_Block *mb2 = mb->duplicate();

这种方式下,mb2和mb共享同一数据空间,使用的是ACE_Message_Block的引用计数机制。它返回指向要被复制的消息块的指针,并在内部增加内部引用计数。

4。通过clone()函数从已有的消息块中复制。

ACE_Message_Block *mb = new ACE_Message_Block (30);@H_403_13@ ACE_Message_Block *mb2 = mb->clone();

clone()方法实际地创建整个消息块的新副本,包括它的数据块和附加部分;也就是说,这是一次"深拷贝"。

2。释放消息块

一旦使用完消息块,程序员可以调用它的release()方法来释放它。

1.如果消息数据内存是由该消息块分配的,调用release()方法就也会释放此内存。 @H_403_13@2.如果消息块是引用计数的,release()就会减少计数,直到到达0为止;之后消息块和与它相关联的数据块才从内存中被移除。 @H_403_13@3.如果消息块是通过共享已分配的底层数据块创建的,底层数据块不会被释放。 @H_403_13@无论消息块是哪种方式创建的,只要在使用完后及时调用release()函数,就能确保相应的内存能正确的释放。

3。从消息块中读写数据

ACE_Message_Block提供了两个指针函数以供程序员进行读写操作,rd_ptr()指向可读的数据块地址,wr_ptr()指向可写的数据块地址,默认情况下都执行数据块的首地址。下面的例子简单了演示它的使用方法

#include "ace/Message_Queue.h"@H_403_13@#include "ace/OS.h"

int main(int argc,char *argv[]) @H_403_13@{@H_403_13@ ACE_Message_Block *mb = new ACE_Message_Block (30);@H_403_13@ ACE_OS::sprintf(mb->wr_ptr(),"%s","hello");@H_403_13@ ACE_OS::printf("%s/n",mb->rd_ptr ());@H_403_13@ mb->release();@H_403_13@ return 0; @H_403_13@}

注意:这两个指针所指向的位置并不会自动移动,在上面的例子中,函数执行完毕后,执行的位置仍然是最开始的0,而不是最新的可写位置5,程序员需要通过wr_ptr(5)函数手动移动写指针的位置。

4。数据的拷贝

一般的数据的拷贝可以通过函数来实现数据的拷贝,copy()还会保证wr_ptr()的更新,使其指向缓冲区的新末尾处。

下面的例子演示了copy()函数用法

mb->copy("hello");@H_403_13@ mb->copy("123",4);

注意:由于c++是以'/0'作为字符串结束标志的,对于上面的例子,底层数据块中保存的是"hello/0123/0",而用ACE_OS::printf("%s/n",mb->rd_ptr ());打印出来的结果是"hello",使用copy函数进行字符串连接的时候需要注意。

5。其它常用函数

1.length() 返回当前的数据长度 @H_403_13@2.next() 获取和设置下一个ACE_Message_Block的链接。(用来建立消息队列非常有用) @H_403_13@3.space() 获取剩余可用空间大小 @H_403_13@4.size() 获取和设置数据存储空间大小。 @H_403_13@注:ACE_NEW_RETURN的意思用new动态生成一个参数2类型的空间,并将空间的首地址副给第一个参数。如果有错误产生则将第一个参数的值设为空,并返回值RET_VAL。

本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/jacklam200/archive/2008/12/05/3455481.aspx

猜你在找的React相关文章