这个模式是"同步读 + 多线程处理"的一种模型。在 Windows 下默认的实现是 ACE_WFMO_Reactor,它内部使用 WaitForMultipleObjects 来等待 OVERLAPPED 当中的 event 句柄;在 Linux 下使用 ACE_Select_Reactor 实现,内部使用 select 函数来分派操作。
http://dl.vmall.com/c0bda5pwb4
Demo1
- #include "ace/Auto_Ptr.h"
- #include "ace/Log_Msg.h"
- #include "ace/INET_Addr.h"
- #include "ace/SOCK_Acceptor.h"
- #include "ace/Reactor.h"
- #include "ace/Message_Block.h"
- #include "ace/Message_Queue.h"
- #include "ace/SOCK_Stream.h"
- #include "ace/Synch_Traits.h"
- #include "ace/Synch.h"
- #include "ace/OS_NS_sys_time.h"
- #include "ace/os_include/os_netdb.h"
- // Event handler that owns the listening socket and reacts to incoming
- // connection requests delivered by the reactor.
- class ClientAcceptor : public ACE_Event_Handler
- {
- public:
-     virtual ~ClientAcceptor();
-     // Opens the listening socket on listenAddr and registers with the reactor.
-     int open(const ACE_INET_Addr &listenAddr);
-     // Reactor callback: the I/O handle this handler is registered under.
-     virtual ACE_HANDLE get_handle() const { return m_acceptor.get_handle(); }
-     // Reactor callback: a connection is ready to be accepted.
-     virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
-     // Reactor callback: this handler is being removed from the ACE_Reactor.
-     virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask closeMask);
- protected:
-     ACE_SOCK_Acceptor m_acceptor; // listening socket
- };
- // Per-connection event handler: owns the connected stream and a queue of
- // output blocks that are waiting to be written.
- class ClientService : public ACE_Event_Handler
- {
- public:
-     // Registers this handler with the reactor for read events.
-     int open();
-     // Exposes the stream so an acceptor can accept() a connection into it.
-     ACE_SOCK_Stream &peer() { return m_sock; }
-     // Reactor callback: the I/O handle this handler is registered under.
-     virtual ACE_HANDLE get_handle() const { return m_sock.get_handle(); }
-     // Reactor callback: input is available from the client.
-     virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
-     // Reactor callback: output is possible.
-     virtual int handle_output(ACE_HANDLE fd = ACE_INVALID_HANDLE);
-     // Reactor callback: this handler is being removed from the ACE_Reactor.
-     virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
- protected:
-     ACE_SOCK_Stream m_sock;                           // connected client socket
-     ACE_Message_Queue<ACE_NULL_SYNCH> m_output_queue; // pending output blocks
- };
- // Opens the listening socket on listenAddr and registers this handler with
- // its reactor for ACCEPT events. Logs and returns -1 when the socket cannot
- // be opened; otherwise returns whatever register_handler() returns.
- int ClientAcceptor::open(const ACE_INET_Addr &listenAddr)
- {
-     const int opened = this->m_acceptor.open(listenAddr, 1);
-     if (opened == -1)
-     {
-         ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%p\n"),ACE_TEXT("acceptor.open")),-1);
-     }
-     // Notes on ACE_Reactor::register_handler() (from the ACE documentation):
-     //  - A handler can be associated with multiple handles, but a handle
-     //    cannot be associated with multiple handlers.
-     //  - The handle is obtained from ACE_Event_Handler::get_handle().
-     //  - The reactor calls ACE_Event_Handler::add_reference() for a new
-     //    handler/handle pair. If the pair is already registered, the new
-     //    masks are simply added and add_reference() is not called.
-     //  - A suspended handler stays suspended; once resumed it has the
-     //    existing masks plus the ones added here. There are no partial
-     //    suspensions.
-     // reactor() is the ACE_Event_Handler accessor for the event demultiplexer.
-     ACE_Reactor *const owner = this->reactor();
-     return owner->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
- }
- // Reactor callback: a connection request is pending on the listening socket.
- // Allocates a dedicated ClientService for the connection, accepts into its
- // stream and lets it register itself with the same reactor.
- // Returns -1 if allocation or accept() fails, otherwise 0.
- int ClientAcceptor::handle_input(ACE_HANDLE)
- {
-     // Every connection gets its own service handler object.
-     ClientService *client;
-     ACE_NEW_RETURN (client,ClientService,-1);
-     // Guard the allocation until accept() has succeeded.
-     auto_ptr<ClientService> p(client);
-     // peer() returns the ACE_SOCK_Stream the connection is accepted into.
-     if (this->m_acceptor.accept(client->peer()) == -1)
-     {
-         // The two literals are concatenated into the single string consumed
-         // by %p; passing them as separate arguments dropped the second one.
-         ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("(%P|%t) %p\n"),
-                           ACE_TEXT("Failed to accept ")
-                           ACE_TEXT("client connection")),-1);
-     }
-     // From here on the handler manages its own lifetime (see handle_close()).
-     p.release();
-     client->reactor(this->reactor());
-     // open() registers the new ClientService instance with the reactor.
-     if (client->open() == -1)
-     {
-         client->handle_close(ACE_INVALID_HANDLE,0);
-     }
-     return 0;
- }
- // Reactor callback (also invoked by the destructor): unregisters the
- // acceptor and closes the listening socket if it is still open.
- // Neither parameter is consulted. Always returns 0.
- int ClientAcceptor::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask closeMask)
- {
-     if (this->m_acceptor.get_handle() == ACE_INVALID_HANDLE)
-     {
-         return 0; // nothing left to release
-     }
-     const ACE_Reactor_Mask removal =
-         ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL;
-     this->reactor()->remove_handler(this, removal);
-     this->m_acceptor.close();
-     return 0;
- }
- // Releases the reactor registration and the listening socket by reusing
- // the handle_close() logic.
- ClientAcceptor::~ClientAcceptor()
- {
-     handle_close(ACE_INVALID_HANDLE, 0);
- }
- //--------------------------------------------------
- // Logs the peer address when it can be resolved, then registers this
- // handler with the reactor for READ events and returns that result.
- int ClientService::open(void)
- {
-     ACE_INET_Addr remote;
-     ACE_TCHAR remote_text[128];
-     if (this->m_sock.get_remote_addr(remote) == 0)
-     {
-         if (remote.addr_to_string(remote_text, 128) == 0)
-         {
-             ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) Connection from %s\n"),remote_text));
-         }
-     }
-     ACE_Reactor *const owner = this->reactor();
-     return owner->register_handler(this, ACE_Event_Handler::READ_MASK);
- }
- // Reactor callback: data is readable on the client socket.
- // Echoes the received bytes back to the peer. Whatever could not be sent
- // right away is copied into a message block and queued, and the handler
- // registers for WRITE events so handle_output() can drain the queue.
- // Returns -1 when recv() yields <= 0 or when the message block cannot be
- // allocated; otherwise 0 or the result of register_handler().
- int ClientService::handle_input(ACE_HANDLE)
- {
-     const size_t INPUT_SIZE = 4096;
-     char buffer[INPUT_SIZE];
-     ssize_t recv_cnt,send_cnt;
-     // The assignment has to be parenthesised: `<=` binds tighter than `=`,
-     // so without the parentheses recv_cnt would receive the comparison
-     // result (0 or 1) instead of the number of bytes read.
-     if ((recv_cnt = this->m_sock.recv(buffer,sizeof(buffer))) <= 0)
-     {
-         ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) Conection closed\n")));
-         return -1;
-     }
-     send_cnt = this->m_sock.send(buffer,static_cast<size_t>(recv_cnt));
-     if (send_cnt == recv_cnt)
-     {
-         return 0;
-     }
-     // EWOULDBLOCK only means the data cannot be sent right now; any other
-     // error is logged and the input is dropped.
-     if (send_cnt == -1 && ACE_OS::last_error() != EWOULDBLOCK)
-     {
-         ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("send")),0);
-     }
-     if (send_cnt == -1)
-     {
-         send_cnt = 0;
-     }
-     ACE_Message_Block *mb = 0;
-     size_t remaining = static_cast<size_t>(recv_cnt - send_cnt);
-     // Allocate a block that owns its storage and copy the unsent bytes into
-     // it. Wrapping &buffer[send_cnt] directly would leave the queued block
-     // pointing at this function's stack buffer after return.
-     ACE_NEW_RETURN(mb,ACE_Message_Block(remaining),-1);
-     mb->copy(&buffer[send_cnt],remaining);
-     int output_off = this->m_output_queue.is_empty();
-     // A deadline of "now" keeps the enqueue from blocking.
-     ACE_Time_Value nowait(ACE_OS::gettimeofday());
-     if (this->m_output_queue.enqueue_tail(mb,&nowait) == -1)
-     {
-         ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t) %p; discarding data\n"),ACE_TEXT("enqueue Failed")));
-         mb->release();
-         return 0;
-     }
-     // Only ask for WRITE events when the queue was empty before this block
-     // was added; otherwise they were requested when it became non-empty.
-     if (output_off)
-     {
-         return this->reactor()->register_handler(this,ACE_Event_Handler::WRITE_MASK);
-     }
-     return 0;
- }
- // Reactor callback: the socket is writable.
- // Sends queued blocks until the queue is empty or a block can only be
- // written partially; a partially sent block goes back to the head of the
- // queue. Returns -1 once the queue is empty and 0 while data is pending.
- int ClientService::handle_output(ACE_HANDLE)
- {
-     ACE_Message_Block *mb = 0;
-     // A deadline of "now" keeps the dequeue from blocking.
-     ACE_Time_Value nowait(ACE_OS::gettimeofday());
-     // dequeue_head() returns -1 on failure and otherwise the number of
-     // blocks still queued, so every non-negative result is a success.
-     // Testing for == 0 would drop the dequeued block whenever more blocks
-     // were waiting behind it.
-     while (-1 != this->m_output_queue.dequeue_head(mb,&nowait))
-     {
-         ssize_t send_cnt = this->m_sock.send(mb->rd_ptr(),mb->length());
-         if (send_cnt == -1)
-         {
-             ACE_ERROR((LM_ERROR,ACE_TEXT("send")));
-         }
-         else
-         {
-             // Skip the bytes that were written.
-             mb->rd_ptr(static_cast<size_t>(send_cnt));
-         }
-         if(mb->length() > 0)
-         {
-             // Unsent data remains: retry this block on the next callback.
-             this->m_output_queue.enqueue_head(mb);
-             break;
-         }
-         mb->release();
-     }
-     return (this->m_output_queue.is_empty()) ? -1:0;
- }
- // Reactor callback: the handler is being removed.
- // A close for WRITE_MASK alone is ignored. For any other mask the handler
- // is unregistered for all events, the socket is closed, the output queue is
- // flushed and the object deletes itself. Always returns 0.
- int ClientService::handle_close(ACE_HANDLE,ACE_Reactor_Mask mask)
- {
-     if (mask != ACE_Event_Handler::WRITE_MASK)
-     {
-         const ACE_Reactor_Mask everything =
-             ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;
-         this->reactor()->remove_handler(this, everything);
-         this->m_sock.close();
-         this->m_output_queue.flush();
-         // No member may be touched after this line.
-         delete this;
-     }
-     return 0;
- }
- // Entry point: listens on port 5000 and runs the singleton reactor's event
- // loop. Returns 1 if the acceptor cannot be opened, otherwise 0.
- int ACE_TMAIN(int,ACE_TCHAR *[])
- {
-     ACE_INET_Addr port_to_listen("5000");
-     ClientAcceptor acceptor;
-     // ACE_Event_Handler holds an ACE_Reactor pointer so a handler can easily
-     // refer to the reactor it is used with; it is set through
-     // ACE_Event_Handler::reactor (ACE_Reactor *).
-     // The reactor instance is created the first time it is used and shut
-     // down automatically when the program ends (singleton + smart pointer?).
-     ACE_Reactor *const dispatcher = ACE_Reactor::instance();
-     acceptor.reactor(dispatcher);
-     const int opened = acceptor.open(port_to_listen);
-     if (opened == -1)
-     {
-         return 1;
-     }
-     dispatcher->run_reactor_event_loop();
-     return 0;
- }
Demo2
- // $Id: HAStatus.cpp 91626 2010-09-07 10:59:20Z johnnyw $
- #include "ace/OS_NS_sys_time.h"
- #include "ace/os_include/os_netdb.h"
- // Listing 1 code/ch07
- #include "ace/Auto_Ptr.h"
- #include "ace/Log_Msg.h"
- #include "ace/INET_Addr.h"
- #include "ace/SOCK_Acceptor.h"
- #include "ace/Reactor.h"
- // Event handler that owns the listening socket and reacts to incoming
- // connection requests delivered by the reactor.
- class ClientAcceptor : public ACE_Event_Handler
- {
- public:
-   virtual ~ClientAcceptor ();
-   //FUZZ: disable check_for_lack_ACE_OS
-   // Opens the listening socket on listen_addr and registers with the reactor.
-   int open (const ACE_INET_Addr &listen_addr);
-   //FUZZ: enable check_for_lack_ACE_OS
-   // Reactor callback: the I/O handle this handler is registered under.
-   virtual ACE_HANDLE get_handle () const
-   {
-     return acceptor_.get_handle ();
-   }
-   // Reactor callback: a connection is ready to be accepted.
-   virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
-   // Reactor callback: this handler is being removed from the ACE_Reactor.
-   virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
- protected:
-   ACE_SOCK_Acceptor acceptor_; // listening socket
- };
- // Listing 1
- // Listing 6 code/ch07
- #include "ace/Message_Block.h"
- #include "ace/Message_Queue.h"
- #include "ace/SOCK_Stream.h"
- #include "ace/Synch.h"
- // Per-connection event handler: owns the connected stream and a queue of
- // output blocks that are waiting to be written.
- class ClientService : public ACE_Event_Handler
- {
- public:
-   // Exposes the stream so an acceptor can accept() a connection into it.
-   ACE_SOCK_Stream &peer ()
-   {
-     return sock_;
-   }
-   //FUZZ: disable check_for_lack_ACE_OS
-   // Registers this handler with the reactor for read events.
-   int open ();
-   //FUZZ: enable check_for_lack_ACE_OS
-   // Reactor callback: the I/O handle this handler is registered under.
-   virtual ACE_HANDLE get_handle () const
-   {
-     return sock_.get_handle ();
-   }
-   // Reactor callback: input is available from the client.
-   virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
-   // Reactor callback: output is possible.
-   virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE);
-   // Reactor callback: this handler is being removed from the ACE_Reactor.
-   virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
- protected:
-   ACE_SOCK_Stream sock_;                           // connected client socket
-   ACE_Message_Queue<ACE_NULL_SYNCH> output_queue_; // pending output blocks
- };
- // Listing 6
- // Listing 5 code/ch07
- // Releases the reactor registration and the listening socket by reusing
- // the handle_close() logic.
- ClientAcceptor::~ClientAcceptor ()
- {
-   handle_close (ACE_INVALID_HANDLE, 0);
- }
- // Listing 5
- // Listing 2 code/ch07
- // Opens the listening socket on listen_addr and registers this handler with
- // its reactor for ACCEPT events. Logs and returns -1 when the socket cannot
- // be opened; otherwise returns whatever register_handler() returns.
- int
- ClientAcceptor::open (const ACE_INET_Addr &listen_addr)
- {
-   const int opened = this->acceptor_.open (listen_addr, 1);
-   if (opened == -1)
-     {
-       ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT ("%p\n"),ACE_TEXT ("acceptor.open")),-1);
-     }
-   ACE_Reactor *const owner = this->reactor ();
-   return owner->register_handler (this, ACE_Event_Handler::ACCEPT_MASK);
- }
- // Listing 2
- // Listing 3 code/ch07
- // Reactor callback: a connection request is pending on the listening socket.
- // Allocates a dedicated ClientService for the connection, accepts into its
- // stream and lets it register itself with the same reactor.
- // Returns -1 if allocation or accept() fails, otherwise 0.
- int
- ClientAcceptor::handle_input (ACE_HANDLE)
- {
-   ClientService *client;
-   // ACE_NEW_RETURN takes (pointer, constructor expression, failure value);
-   // the constructor expression was missing.
-   ACE_NEW_RETURN (client,ClientService,-1);
-   // Guard the allocation until accept() has succeeded.
-   auto_ptr<ClientService> p (client);
-   if (this->acceptor_.accept (client->peer ()) == -1)
-     ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT ("(%P|%t) %p\n"),ACE_TEXT ("Failed to accept ")
-                        ACE_TEXT ("client connection")),-1);
-   // From here on the handler manages its own lifetime (see handle_close()).
-   p.release ();
-   client->reactor (this->reactor ());
-   // open() registers the new handler; on failure let it clean itself up.
-   if (client->open () == -1)
-     client->handle_close (ACE_INVALID_HANDLE,0);
-   return 0;
- }
- // Listing 3
- // Listing 4 code/ch07
- // Reactor callback (also invoked by the destructor): unregisters the
- // acceptor and closes the listening socket if it is still open.
- // Neither parameter is consulted. Always returns 0.
- int
- ClientAcceptor::handle_close (ACE_HANDLE,ACE_Reactor_Mask)
- {
-   if (this->acceptor_.get_handle () == ACE_INVALID_HANDLE)
-     return 0; // nothing left to release
-   const ACE_Reactor_Mask removal =
-     ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL;
-   this->reactor ()->remove_handler (this, removal);
-   this->acceptor_.close ();
-   return 0;
- }
- // Listing 4
- // Listing 7 code/ch07
- // Logs the peer address when it can be resolved, then registers this
- // handler with the reactor for READ events and returns that result.
- int
- ClientService::open (void)
- {
-   ACE_INET_Addr remote;
-   ACE_TCHAR remote_text[MAXHOSTNAMELEN];
-   if (this->sock_.get_remote_addr (remote) == 0)
-     {
-       if (remote.addr_to_string (remote_text, MAXHOSTNAMELEN) == 0)
-         {
-           ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%P|%t) Connection from %s\n"),remote_text));
-         }
-     }
-   ACE_Reactor *const owner = this->reactor ();
-   return owner->register_handler (this, ACE_Event_Handler::READ_MASK);
- }
- // Listing 7
- // Listing 8 code/ch07
- // Reactor callback: data is readable on the client socket.
- // Echoes the received bytes back to the peer. Whatever could not be sent
- // right away is copied into a message block and queued, and the handler
- // registers for WRITE events when the queue was empty beforehand.
- // Returns -1 when recv() yields <= 0 or when the message block cannot be
- // allocated; otherwise 0 or the result of register_handler().
- int
- ClientService::handle_input (ACE_HANDLE)
- {
-   const size_t INPUT_SIZE = 4096;
-   char buffer[INPUT_SIZE];
-   const ssize_t received = this->sock_.recv (buffer, sizeof (buffer));
-   if (received <= 0)
-     {
-       ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%P|%t) Connection closed\n")));
-       return -1;
-     }
-   ssize_t sent = this->sock_.send (buffer, static_cast<size_t> (received));
-   if (sent == received)
-     return 0;
-   if (sent == -1)
-     {
-       // Anything other than EWOULDBLOCK is logged and the input is dropped.
-       if (ACE_OS::last_error () != EWOULDBLOCK)
-         {
-           ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT ("send")),0);
-         }
-       // Nothing went out; the whole input has to be queued.
-       sent = 0;
-     }
-   const size_t pending = static_cast<size_t> (received - sent);
-   ACE_Message_Block *block = 0;
-   ACE_NEW_RETURN (block, ACE_Message_Block (pending), -1);
-   block->copy (buffer + sent, pending);
-   const int queue_was_empty = this->output_queue_.is_empty ();
-   // A deadline of "now" keeps the enqueue from blocking.
-   ACE_Time_Value nowait (ACE_OS::gettimeofday ());
-   if (this->output_queue_.enqueue_tail (block, &nowait) == -1)
-     {
-       ACE_ERROR ((LM_ERROR,ACE_TEXT ("(%P|%t) %p; discarding data\n"),ACE_TEXT ("enqueue Failed")));
-       block->release ();
-       return 0;
-     }
-   if (!queue_was_empty)
-     return 0;
-   return this->reactor ()->register_handler
-     (this, ACE_Event_Handler::WRITE_MASK);
- }
- // Listing 8
- // Listing 9 code/ch07
- // Reactor callback: the socket is writable.
- // Sends queued blocks until the dequeue fails or a block still has unsent
- // bytes; such a block goes back to the head of the queue.
- // Returns -1 once the queue is empty and 0 while data is pending.
- int
- ClientService::handle_output (ACE_HANDLE)
- {
-   ACE_Message_Block *head = 0;
-   // A deadline of "now" keeps the dequeue from blocking.
-   ACE_Time_Value nowait (ACE_OS::gettimeofday ());
-   for (;;)
-     {
-       // A negative result means nothing could be dequeued.
-       if (this->output_queue_.dequeue_head (head, &nowait) < 0)
-         break;
-       const ssize_t sent =
-         this->sock_.send (head->rd_ptr (), head->length ());
-       if (sent != -1)
-         head->rd_ptr (static_cast<size_t> (sent)); // skip what was written
-       else
-         ACE_ERROR ((LM_ERROR,ACE_TEXT ("send")));
-       if (head->length () > 0)
-         {
-           // Unsent data remains: retry this block on the next callback.
-           this->output_queue_.enqueue_head (head);
-           break;
-         }
-       head->release ();
-     }
-   if (this->output_queue_.is_empty ())
-     return -1;
-   return 0;
- }
- // Listing 9
- // Listing 10 code/ch07
- // Reactor callback: the handler is being removed.
- // A close for WRITE_MASK alone is ignored. For any other mask the handler
- // is unregistered for all events, the socket is closed, the output queue is
- // flushed and the object deletes itself. Always returns 0.
- int
- ClientService::handle_close (ACE_HANDLE,ACE_Reactor_Mask mask)
- {
-   if (mask != ACE_Event_Handler::WRITE_MASK)
-     {
-       const ACE_Reactor_Mask everything =
-         ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;
-       this->reactor ()->remove_handler (this, everything);
-       this->sock_.close ();
-       this->output_queue_.flush ();
-       // No member may be touched after this line.
-       delete this;
-     }
-   return 0;
- }
- // Listing 10
- // Listing 12 code/ch07
- // Signal handler that ends the singleton reactor's event loop.
- class LoopStopper : public ACE_Event_Handler
- {
- public:
-   // Registers this object for signum (SIGINT unless specified otherwise).
-   LoopStopper (int signum = SIGINT);
-   // Reactor callback: the registered signal was delivered.
-   virtual int handle_signal (int signum,
-                              siginfo_t * = 0,
-                              ucontext_t * = 0);
- };
- // Registers this handler for signum with the singleton reactor.
- LoopStopper::LoopStopper (int signum)
- {
-   ACE_Reactor *const singleton = ACE_Reactor::instance ();
-   singleton->register_handler (signum, this);
- }
- // Asks the singleton reactor to leave its event loop. The arguments are
- // not inspected. Always returns 0.
- int
- LoopStopper::handle_signal (int,siginfo_t *,ucontext_t *)
- {
-   ACE_Reactor *const singleton = ACE_Reactor::instance ();
-   singleton->end_reactor_event_loop ();
-   return 0;
- }
- // Listing 12
- // Listing 13 code/ch07
- #include "ace/Signal.h"
- // Turns ACE logging on or off in response to two signals. The signal
- // callback only records the request and notifies the reactor; the logging
- // flags are changed later in handle_exception().
- class LogSwitcher : public ACE_Event_Handler
- {
- public:
-   // Registers this object for both on_sig and off_sig.
-   LogSwitcher (int on_sig,int off_sig);
-   // Called when object is signaled by OS. The parameter list has to match
-   // ACE_Event_Handler::handle_signal (int, siginfo_t *, ucontext_t *);
-   // without the siginfo_t * parameter this would be an unrelated overload
-   // that the reactor never invokes.
-   virtual int handle_signal (int signum,siginfo_t * = 0,ucontext_t * = 0);
-   // Called when an exceptional event occurs.
-   virtual int handle_exception (ACE_HANDLE fd = ACE_INVALID_HANDLE);
- private:
-   LogSwitcher () : on_sig_ (0),off_sig_ (0),on_off_ (0) {}
-   int on_sig_; // Signal to turn logging on
-   int off_sig_; // Signal to turn logging off
-   int on_off_; // 1 == turn on,0 == turn off
- };
- LogSwitcher::LogSwitcher (int on_sig,int off_sig)
- : on_sig_ (on_sig),off_sig_ (off_sig),on_off_ (0) // start from a defined state
- {
-   ACE_Sig_Set sigs;
-   sigs.sig_add (on_sig);
-   sigs.sig_add (off_sig);
-   ACE_Reactor::instance ()->register_handler (sigs,this);
- }
- // Listing 13
- // Listing 14 code/ch07
- // Records which of the two signals arrived and notifies the singleton
- // reactor with this handler. Other signal numbers are ignored.
- // Always returns 0.
- int
- LogSwitcher::handle_signal (int signum,siginfo_t *,ucontext_t *)
- {
-   if (signum == this->on_sig_ || signum == this->off_sig_)
-     {
-       this->on_off_ = signum == this->on_sig_;
-       ACE_Reactor::instance ()->notify (this);
-     }
-   return 0;
- }
- // Listing 14
- // Listing 15 code/ch07
- // Applies the most recently recorded on/off request: clears the SILENT
- // logging flag to turn logging on, sets it to turn logging off.
- // Always returns 0.
- int
- LogSwitcher::handle_exception (ACE_HANDLE)
- {
-   if (!this->on_off_)
-     {
-       ACE_LOG_MSG->set_flags (ACE_Log_Msg::SILENT);
-       return 0;
-     }
-   ACE_LOG_MSG->clr_flags (ACE_Log_Msg::SILENT);
-   return 0;
- }
- // Listing 15
- // Listing 11 code/ch07
- // Entry point: listens on the address given by "HAStatus" and runs the
- // singleton reactor's event loop. Returns 1 if the acceptor cannot be
- // opened, otherwise 0.
- int ACE_TMAIN (int,ACE_TCHAR *[])
- {
-   ACE_INET_Addr port_to_listen ("HAStatus");
-   ClientAcceptor acceptor;
-   ACE_Reactor *const dispatcher = ACE_Reactor::instance ();
-   acceptor.reactor (dispatcher);
-   const int opened = acceptor.open (port_to_listen);
-   if (opened == -1)
-     {
-       return 1;
-     }
-   dispatcher->run_reactor_event_loop ();
-   return 0;
- }
- // Listing 11
http://wenku.baidu.com/view/866966a1284ac850ad0242ab.html
http://daimojingdeyu.iteye.com/blog/828696
http://book.2cto.com/201208/1893.html
http://www.jb51.cc/article/p-nlrfvpff-yn.html
http://developer.51cto.com/art/201208/354591.htm
acceptor-connector
http://shanghai.kankanews.com/mobile/2013-05-20/1546541.shtml
http://auto.people.com.cn/n/2013/0317/c1005-20815213.html
http://scitech.people.com.cn/n/2013/0310/c1007-20736417.html