ACE_Reactor是ACE对反应堆(Reactor)概念的封装
- ACE_Reactor * ACE_Reactor::instance (void); 将返回缺省的反应堆。
- ACE_Reactor * ACE_Reactor::instance (ACE_Reactor *,int delete_reactor = 0); 设置新的缺省反应堆,并把旧的返回
下面的代码例子是重定义缺省的反应堆
// Create a reactor whose implementation is an ACE_TP_Reactor sized by
// ACE::max_handles(), then install it as the process-wide default reactor.
// NOTE(review): the trailing 1 passed to the ACE_Reactor constructor presumably
// asks the reactor to delete the implementation object it was given -- confirm
// against the ACE_Reactor constructor documentation.
// The 1 passed to instance() is the delete_reactor flag of
// ACE_Reactor::instance (ACE_Reactor *, int delete_reactor = 0).
// new(std::nothrow) yields a null pointer instead of throwing on allocation failure.
ACE_Reactor *mReactor0 =
    new(std::nothrow) ACE_Reactor(new(std::nothrow) ACE_TP_Reactor(ACE::max_handles()), 1);
ACE_Reactor::instance(mReactor0, 1);
以后每次调用ACE_Reactor::instance (void)都将返回mReactor0这个反应堆。
当然用户可以创建多个反应堆,可以把其中一个设为缺省反应堆,或者不设。
查看Reactor.C源代码,发现只有在定义了ACE_USE_TP_REACTOR_FOR_REACTOR_IMPL宏时,缺省的反应堆才是ACE_TP_Reactor类型的,否则是ACE_Select_Reactor类型的
// Excerpt quoted from Reactor.C: compile-time choice of the class used for the
// default reactor's implementation object (stored in `impl`).
// NOTE(review): ACE_NEW presumably allocates an object of the given class into
// its first argument -- confirm against the ACE macro documentation.
#if defined (ACE_USE_TP_REACTOR_FOR_REACTOR_IMPL)
// Macro defined: the implementation is an ACE_TP_Reactor.
ACE_NEW (impl,
ACE_TP_Reactor);
#else
// Macro not defined: the implementation is an ACE_Select_Reactor.
ACE_NEW (impl,
ACE_Select_Reactor);
#endif /* ACE_USE_TP_REACTOR_FOR_REACTOR_IMPL */
下面的代码是作为服务端运行的一个例子。
/#:> cat test.C #include "ace/Event_Handler.h" #include "ace/SOCK_Acceptor.h" #include "ace/SOCK_Stream.h" #include "ace/Reactor.h" #include "ace/OS.h" #include <iostream> using namespace std; /** * One instance used to listen on port,to accept client request. */ class Listener_Handler : public ACE_Event_Handler { public: virtual int open(const ACE_INET_Addr & addr); /// override functions from ACE_Event_Handler virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE); virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask); virtual ACE_HANDLE get_handle() const; private: ACE_SOCK_Acceptor mListener; }; /** * multiple instances whenever a client is coming. */ class Connector_Handler : public ACE_Event_Handler { public: Connector_Handler(); virtual int open(); /// overrides from ACE_Event_Handler virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE); // handle_output will be called whenever it's writable when ACE_TP_Reactor is used virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE); virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask); virtual ACE_HANDLE get_handle() const; ACE_SOCK_Stream & peer (); private: ACE_SOCK_Stream mStream; ACE_Thread_Mutex mMutex; bool hasSendData; char mSendBuffer[4096]; // ACE_Guard<ACE_Thread_Mutex> guard(PIN_PMTpsCounterThread::threadStateMutex); }; int main(int argc,char* argv[]) { ACE_INET_Addr addr("0.0.0.0:1234"); Listener_Handler * listener = new Listener_Handler(); int ret = listener->open(addr); if (ret == -1) { cout << "Call Listener_Handler::open() Failed" << endl; return -1; } cout << "Server is waiting on ACE_Reactor::instance()->run_event_loop() ..." 
<< endl; return ACE_Reactor::instance()->run_event_loop(); } /// /// Implementation of class Listener_Handler /// int Listener_Handler::open(const ACE_INET_Addr & addr) { int ret = mListener.open(addr); if (ret == -1) { cout << "Call ACE_SOCK_Acceptor::open() Failed" << endl; return -1; } ret = ACE_Reactor::instance()->register_handler(this,ACCEPT_MASK); return ret; } int Listener_Handler::handle_input(ACE_HANDLE handle) { ACE_INET_Addr remote; Connector_Handler * client_handler = new Connector_Handler(); int ret = mListener.accept(client_handler->peer(),&remote); if (ret == -1) { cout << "Call ACE_SOCK_Acceptor::accept() Failed" << endl; client_handler->handle_close(ACE_INVALID_HANDLE,NULL_MASK); return -1; } cout << "Received connection from " << remote.get_host_name() << ":" << remote.get_port_number() << endl; return client_handler->open(); } int Listener_Handler::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask) { mListener.close(); return -1; } ACE_HANDLE Listener_Handler::get_handle() const { return mListener.get_handle(); } /// /// Implementation of class Connector_Handler /// Connector_Handler::Connector_Handler() { ACE_OS::memset(mSendBuffer,sizeof(mSendBuffer)); hasSendData = false; } int Connector_Handler::open() { int ret = ACE_Reactor::instance()->register_handler(this,READ_MASK | WRITE_MASK); // parameter 'this' pointer must have a function 'get_handle()' return ret; } int Connector_Handler::handle_input(ACE_HANDLE handle) { char tmp[1024]; ACE_OS::memset(tmp,sizeof(tmp)); size_t size = mStream.recv(tmp,sizeof(tmp)); if (size == 0) // connection loss { //cout << "Call ACE_SOCK_Stream::recv() closed" << endl; return -1; } else if (size == -1) // block { // Failed return -1; } else { cout << "Connector_Handler::handle_input() get [" << size << "] message: " << tmp << endl; ACE_Guard<ACE_Thread_Mutex> guard(mMutex); ACE_OS::strcpy(mSendBuffer + ACE_OS::strlen(mSendBuffer),tmp); hasSendData = true; } return 0; } int 
Connector_Handler::handle_output(ACE_HANDLE handle) { if (hasSendData) { ACE_Guard<ACE_Thread_Mutex> guard(mMutex); if (ACE_OS::strlen(mSendBuffer) <= 0 ) { // no data need to be sent out. return 0; } size_t count = mStream.send(mSendBuffer,ACE_OS::strlen(mSendBuffer)); if (count == 0) // connection loss { cout << "Call ACE_SOCK_Stream::send() Failed" << endl; return -1; } else if (count == -1) // block { cout << "Call ACE_SOCK_Stream::send() Failed" << endl; return -1; } else { ACE_OS::memset(mSendBuffer,sizeof(mSendBuffer)); hasSendData = false; return 0; } } return 0; } int Connector_Handler::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask) { if (close_mask == WRITE_MASK) { return 0; } else { ACE_INET_Addr remote; mStream.get_remote_addr(remote); cout << "Disconnected: " << remote.get_host_name() << ":" << remote.get_port_number() << endl; mStream.close(); return -1; // return value ignored by reactor } } ACE_HANDLE Connector_Handler::get_handle() const { return mStream.get_handle(); } ACE_SOCK_Stream & Connector_Handler::peer() { return mStream; }
原文链接:https://www.f2er.com/react/308361.html