// ACE_Reactor_Client.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include "ace/Reactor.h" #include "ace/SOCK_Connector.h" #include "ace/OS.h" #include "ace/Log_Msg.h" #include <string> #include <iostream> using namespace std; class MyClient:public ACE_Event_Handler { public: bool do_connect(string ip,int port,int local_port) { ACE_SOCK_Connector connector; ACE_INET_Addr local_addr(local_port,"0.0.0.0"); ACE_INET_Addr addr(port,ip.c_str()); ACE_Time_Value timeout(5,0); if(connector.connect(peer_sock,addr,&timeout,local_addr) != 0) { cout<<"connect fail."<<endl; return false; } int ret = ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK); if (ret != 0) { cout<<"local_port:"<<local_port<<" register_handler fail."<<endl; return false; } sprintf(buf,"%d",local_port); peer_sock.send(buf,strlen(buf)+1); return true; } ACE_HANDLE get_handle(void) const { return peer_sock.get_handle(); } int handle_input (ACE_HANDLE fd) { int rev=0; ACE_Time_Value timeout(5,0); if((rev=peer_sock.recv(buf,sizeof(buf),&timeout))>0) { buf[rev]='\0'; cout<<"recv: "<<buf<<endl; } ACE_INET_Addr raddr; peer_sock.get_local_addr(raddr); //ACE_DEBUG ((LM_DEBUG,ACE_TEXT ( " (%P|%t) close:%s %d\n " ),raddr.get_host_addr(),raddr.get_port_number())); sprintf(buf,raddr.get_port_number()); peer_sock.send(buf,strlen(buf)+1); return 0; } private: ACE_SOCK_Stream peer_sock; char buf[100]; }; #include <ace/OS.h> #include <ace/Task.h> class TTcpNetThread : public ACE_Task_Base { public: /// 运行 int open(); /// 停止运行 int close(); protected: /// 线程函数 virtual int svc(); }; int TTcpNetThread::open() { return this->activate(); } int TTcpNetThread::close() { ACE_Reactor::instance()->end_reactor_event_loop(); // 终止ACE_Proactor循环 this->wait(); // 等待清理现场 return 0; } int TTcpNetThread::svc() { // Proactor的事件循环开始 while(!ACE_Reactor::instance()->event_loop_done()) { ACE_Reactor::instance()->handle_events(); } ACE_DEBUG((LM_DEBUG,ACE_TEXT("Network fin\n"))); return 0; } /********************************************************************************************** 在Socket编程中,常见的事件就是"读就绪","写就绪",通过对这两个事件的捕获分发,可以实现Socket中的异步操作。 Socket编程中的事件处理器 在前面我们已经介绍过,在ACE反应器框架中,任何都必须派生自ACE_Event_Handler类,并通过重载其相应会调事件处理函数来实现相应的回调处理的。在Socket编程中,我们通常需要重载的函数有 1.handle_input() 当I/O句柄(比如UNIX中的文件描述符)上的输入可用时,反应器自动回调该方法。 2.handle_output() 当I/O设备的输出队列有可用空间时,反应器自动回调该方法。 3.handle_close() 当事件处理器中的事件从Reactor中移除的时候调用。 此外,为了使Reactor能通过I/O句柄找到对应的事件处理器,还必须重载其get_handle()方法以使得Reactor建立起I/O句柄和事件处理器的关联。 ***********************************************************************************************/ #pragma comment(lib,"ACEd.lib") #define CLIENT_THREAD_NUM 4 int main(int argc,char *argv[]) { for (int i=0;i<2000;i++) { MyClient *client = new MyClient; if (!client->do_connect("127.0.0.1",4567,10000+i)) break; } system("pause"); TTcpNetThread netThread[CLIENT_THREAD_NUM]; for(int i = 0; i < CLIENT_THREAD_NUM; i++) { netThread[i].open(); } while (getchar()) { ACE_OS::sleep(1); } /*while(true) { ACE_Reactor::instance()->handle_events(); }*/ return 0; }
// ACE_Reactor_Server.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include <ace/Reactor.h> #include <ace/SOCK_Connector.h> #include <ace/SOCK_Acceptor.h> #include <ace/Auto_Ptr.h> #include "ace/OS.h" #include "ace/Log_Msg.h" #include <list> #pragma comment(lib,"ACEd.lib") class ClientService : public ACE_Event_Handler { public: ACE_SOCK_Stream &peer (void) { return this->sock_; } int regist_this(void) { //注册读就绪回调函数 return this->reactor ()->register_handler(this,ACE_Event_Handler::READ_MASK); } virtual ACE_HANDLE get_handle (void) const { return this->sock_.get_handle (); } virtual int handle_input (ACE_HANDLE fd ) { int rev = peer().recv(buf,sizeof(buf)); if(rev<=0) return -1; buf[rev] = '\0'; printf("recv:%s",buf); return 0; } // 释放相应资源 virtual int handle_close (ACE_HANDLE,ACE_Reactor_Mask mask) { if (mask == ACE_Event_Handler::WRITE_MASK) return 0; mask = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL; this->reactor ()->remove_handler (this,mask); this->sock_.close (); delete this; //socket出错时,将自动删除该客户端,释放相应资源 return 0; } protected: char buf[100]; ACE_SOCK_Stream sock_; }; class ClientAcceptor : public ACE_Event_Handler { public: virtual ~ClientAcceptor (){this->handle_close (ACE_INVALID_HANDLE,0);} int start_listen (const ACE_INET_Addr &listen_addr) { if (this->acceptor_.open (listen_addr,1) == -1) { printf("open port fail\n"); return -1; } //注册接受连接回调事件 return this->reactor ()->register_handler(this,ACE_Event_Handler::ACCEPT_MASK); } virtual ACE_HANDLE get_handle (void) const { return this->acceptor_.get_handle (); } virtual int handle_input (ACE_HANDLE fd ) { ClientService *client = new ClientService(); auto_ptr<ClientService> p (client); if (this->acceptor_.accept (client->peer ()) == -1) { printf("accept client fail\n"); return -1; } p.release (); client->reactor (this->reactor ()); if (client->regist_this () == -1) client->handle_close (ACE_INVALID_HANDLE,0); return 0; } virtual int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask) { if (this->acceptor_.get_handle () != ACE_INVALID_HANDLE) { ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL; this->reactor ()->remove_handler (this,m); this->acceptor_.close (); } return 0; } protected: ACE_SOCK_Acceptor acceptor_; }; int main1(int argc,char *argv[]) { ACE_INET_Addr addr(4567,"127.0.0.1"); ClientAcceptor server; server.reactor(ACE_Reactor::instance()); server.start_listen(addr); while(true) { ACE_Reactor::instance()->handle_events(); } return 0; } //////////////////////////////////////////////// #define MAX_BUFF_SIZE 1024 #define LISTEN_PORT 4567 #define SERVER_IP ACE_LOCALHOST class ClientHandler : public ACE_Event_Handler { public: friend class ServerAcceptor; public: ClientHandler(){} ~ClientHandler() { sock_stream.close(); ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL); } int send_some(const void *buff,int bytes) { return sock_stream.send(buff,bytes); } ACE_SOCK_Stream& GetStream(){return sock_stream;} //给accept提供接口绑定数据通道 public: virtual int handle_input(ACE_HANDLE fd); //I/O触发事件后调用 virtual ACE_HANDLE get_handle(void) const {return sock_stream.get_handle();} //不重载需要手动将handle传入ACE_Reactor private: ACE_INET_Addr Cli_addr; ACE_SOCK_Stream sock_stream; }; int ClientHandler::handle_input(ACE_HANDLE fd) { char strBuffer[MAX_BUFF_SIZE]; int byte = sock_stream.recv(strBuffer,MAX_BUFF_SIZE); //可读数据 if (-1 == byte) { ACE_DEBUG((LM_INFO,ACE_TEXT("receive data Failed\n"))); } else if(0 == byte) { sock_stream.close(); ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL); ACE_DEBUG((LM_INFO,ACE_TEXT("client closed!\n"))); } else { ACE_DEBUG((LM_INFO,ACE_TEXT("receive:%s\n"),strBuffer)); sock_stream.send(strBuffer,strlen(strBuffer)+1); } return 0; } // ServerAcceptor class ServerAcceptor : public ACE_Event_Handler { public: ServerAcceptor(int port,char* ip); ~ServerAcceptor(); virtual int handle_input(ACE_HANDLE fd); // ACE框架回调 virtual ACE_HANDLE get_handle(void) const {return Svr_aceept.get_handle();} private: ACE_INET_Addr Svr_addr; ACE_SOCK_Acceptor Svr_aceept; std::list<ClientHandler*> m_streamPool; //stream pool }; ServerAcceptor::ServerAcceptor(int port,char* ip):Svr_addr(port,ip) { if (-1 == Svr_aceept.open(Svr_addr,1)) { ACE_DEBUG((LM_ERROR,ACE_TEXT("accept open Failed\n"))); Svr_aceept.close(); } ACE_DEBUG((LM_ERROR,ACE_TEXT("accept open success\n"))); } ServerAcceptor::~ServerAcceptor() { ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::ACCEPT_MASK); Svr_aceept.close(); std::list<ClientHandler*>::iterator it; for (it = m_streamPool.begin();it != m_streamPool.end();++it) { if (NULL != (*it)) { delete (*it); } } } #include "ace/SOCK_SEQPACK_Association.h" int ServerAcceptor::handle_input(ACE_HANDLE fd ) { ClientHandler *stream = new ClientHandler(); //产生新通道 if (NULL != stream) { m_streamPool.push_back(stream); } if (Svr_aceept.accept(stream->GetStream()) == -1) //绑定通道 { printf("accept client fail\n"); return -1; } ACE_Reactor::instance()->register_handler(stream,ACE_Event_Handler::READ_MASK); //通道注册到ACE_Reactor ACE_INET_Addr raddr; stream->GetStream().get_remote_addr(raddr); ACE_DEBUG ((LM_DEBUG,ACE_TEXT ( "client:%s %d\n" ),raddr.get_port_number())); /*ACE_INET_Addr addr; ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(fd); size_t addr_size=sizeof ACE_INET_Addr; ass.get_remote_addrs(&addr,addr_size); ACE_OS::printf("fd:%d ip:%d port:%d\n",(int)fd,addr.get_ip_address(),addr.get_port_number());*/ //ACE_DEBUG((LM_ERROR,ACE_TEXT("User connect success!\n"))); return 0; } #include <ace/OS.h> #include <ace/Task.h> class TTcpNetThread : public ACE_Task_Base { public: /// 运行 int open(); /// 停止运行 int close(); protected: /// 线程函数 virtual int svc(); }; int TTcpNetThread::open() { return this->activate(); } int TTcpNetThread::close() { ACE_Reactor::instance()->end_reactor_event_loop(); // 终止ACE_Proactor循环 this->wait(); // 等待清理现场 return 0; } int TTcpNetThread::svc() { ACE_Reactor::instance()->run_reactor_event_loop(); ACE_DEBUG((LM_DEBUG,ACE_TEXT("Network fin\n"))); return 0; } #define CLIENT_THREAD_NUM 4 int main(int argc,char *argv[]) { ServerAcceptor server(LISTEN_PORT,(char *)SERVER_IP); //listen port注册到ACE_Reactor ACE_Reactor::instance()->register_handler(&server,ACE_Event_Handler::ACCEPT_MASK); TTcpNetThread netThread[CLIENT_THREAD_NUM]; for(int i = 0; i < CLIENT_THREAD_NUM; i++) { netThread[i].open(); } while (getchar()) { ACE_OS::sleep(1); } //进入消息循环,有I/O事件回调handle_input //ACE_Reactor::instance()->run_reactor_event_loop(); return 0; }