ACE之Reactor模式使用实例

前端之家收集整理的这篇文章主要介绍了ACE之Reactor模式使用实例,前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
// ACE_Reactor_Client.cpp : 定义控制台应用程序的入口点。
//

#include "stdafx.h"


#include "ace/Reactor.h"  
#include "ace/SOCK_Connector.h"  
#include "ace/OS.h"
#include "ace/Log_Msg.h"
#include <string>  
#include <iostream>  
using namespace std;  

class MyClient:public ACE_Event_Handler   
{  
public:  
	bool do_connect(string ip,int port,int local_port)  
	{  
		ACE_SOCK_Connector connector;  
		ACE_INET_Addr local_addr(local_port,"0.0.0.0");

		ACE_INET_Addr addr(port,ip.c_str());  
		ACE_Time_Value timeout(5,0);  
		if(connector.connect(peer_sock,addr,&timeout,local_addr) != 0)  
		{  
			cout<<"connect fail."<<endl;  
			return false;  
		}  
		int ret = ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK);  
		if (ret != 0)
		{
			cout<<"local_port:"<<local_port<<" register_handler fail."<<endl;
			return false;
		}
		sprintf(buf,"%d",local_port); 
		peer_sock.send(buf,strlen(buf)+1);  
		return true;  
	}  

	ACE_HANDLE get_handle(void) const  
	{  
		return peer_sock.get_handle();  
	}  

	int handle_input (ACE_HANDLE fd)  
	{  
		int rev=0;  
		ACE_Time_Value timeout(5,0);  
		if((rev=peer_sock.recv(buf,sizeof(buf),&timeout))>0)  
		{  
			buf[rev]='\0';  
			cout<<"recv: "<<buf<<endl;  
		} 
		ACE_INET_Addr raddr;
		peer_sock.get_local_addr(raddr);
		//ACE_DEBUG ((LM_DEBUG,ACE_TEXT ( " (%P|%t) close:%s %d\n " ),raddr.get_host_addr(),raddr.get_port_number()));
		sprintf(buf,raddr.get_port_number()); 
		peer_sock.send(buf,strlen(buf)+1); 
		
		return 0;  
	}  

private:  
	ACE_SOCK_Stream peer_sock;
	char buf[100];  
};

#include <ace/OS.h>
#include <ace/Task.h>

/// Task whose svc() drives the event loop of the singleton ACE_Reactor.
/// NOTE(review): open()/close() take no arguments, so they hide rather than
/// override the ACE_Task_Base members of the same name - confirm this is
/// intended before changing their signatures.
class TTcpNetThread : public ACE_Task_Base
{
public:
	/// Start running: activates the task.
	int open();

	/// Stop running: ends the reactor event loop and waits for the task.
	int close();
protected:
	/// Thread function.
	virtual int svc();
};

/// Activate the task and hand back whatever activate() reports.
int TTcpNetThread::open()
{
	const int status = this->activate();
	return status;
}

int TTcpNetThread::close()
{
	ACE_Reactor::instance()->end_reactor_event_loop(); // 终止ACE_Proactor循环

	this->wait(); // 等待清理现场

	return 0;
}

/// Thread function: runs the singleton reactor's event loop until it is
/// ended (see close()) or the reactor reports an error.
///
/// The previous hand-written loop ignored the result of handle_events(), so
/// a persistent error turned it into a busy loop. run_reactor_event_loop()
/// is the same call the server-side copy of this class uses.
///
/// @return always 0
int TTcpNetThread::svc()
{
	ACE_Reactor::instance()->run_reactor_event_loop();

	ACE_DEBUG((LM_DEBUG,ACE_TEXT("Network fin\n")));

	return 0;
}

/**********************************************************************************************
在Socket编程中,常见的事件就是"读就绪","写就绪",通过对这两个事件的捕获分发,可以实现Socket中的异步操作。

Socket编程中的事件处理器

在前面我们已经介绍过,在ACE反应器框架中,任何事件处理器都必须派生自ACE_Event_Handler类,并通过重载其相应的回调事件处理函数来实现相应的回调处理。在Socket编程中,我们通常需要重载的函数有:

1.handle_input()
当I/O句柄(比如UNIX中的文件描述符)上的输入可用时,反应器自动回调该方法。

2.handle_output()
当I/O设备的输出队列有可用空间时,反应器自动回调该方法。

3.handle_close()
当事件处理器中的事件从Reactor中移除的时候调用。 

此外,为了使Reactor能通过I/O句柄找到对应的事件处理器,还必须重载其get_handle()方法以使得Reactor建立起I/O句柄和事件处理器的关联。 
***********************************************************************************************/
#pragma comment(lib,"ACEd.lib")

#define CLIENT_THREAD_NUM 4

int main(int argc,char *argv[])   
{  
	for (int i=0;i<2000;i++)
	{
		MyClient *client = new MyClient;  
		if (!client->do_connect("127.0.0.1",4567,10000+i))
			break; 
	}

	system("pause");
	TTcpNetThread netThread[CLIENT_THREAD_NUM];

	for(int i = 0; i < CLIENT_THREAD_NUM; i++)
	{
		netThread[i].open();
	}
	while (getchar())
	{
		ACE_OS::sleep(1);
	}

	/*while(true)  
	{  
		ACE_Reactor::instance()->handle_events();   
	}*/ 

	return 0;   
}  

// ACE_Reactor_Server.cpp : 定义控制台应用程序的入口点。
//

#include "stdafx.h"

#include <ace/Reactor.h>  
#include <ace/SOCK_Connector.h>   
#include <ace/SOCK_Acceptor.h>   
#include <ace/Auto_Ptr.h> 
#include "ace/OS.h"
#include "ace/Log_Msg.h"
#include <list>
#pragma comment(lib,"ACEd.lib")

/// Per-connection handler on the server side: prints whatever the client
/// sends. The object deletes itself in handle_close(), so instances must be
/// created with new.
class ClientService : public ACE_Event_Handler  
{  
public:  
	/// Socket stream of this connection; the acceptor accepts into it.
	ACE_SOCK_Stream &peer (void) { return this->sock_; }  

	/// Register this handler for READ events with the reactor set on it.
	/// @return result of register_handler()
	int regist_this(void)  
	{  
		return this->reactor ()->register_handler(this,ACE_Event_Handler::READ_MASK);  
	}  

	/// Handle the reactor uses to associate I/O events with this object.
	virtual ACE_HANDLE get_handle (void) const { return this->sock_.get_handle (); }  

	/// Called by the reactor when data is readable.
	/// @return 0 to stay registered, -1 on error or when the peer closed
	virtual int handle_input (ACE_HANDLE fd )  
	{  
		// Leave room for the terminator: receiving sizeof(buf) bytes and
		// then writing buf[rev] would write one byte past the array.
		const ssize_t rev = peer().recv(buf,sizeof(buf)-1);  
		if(rev<=0)  
			return -1;  
		buf[rev] = '\0';  
		printf("recv:%s",buf);  

		return 0;  
	}  

	/// Release the resources of this connection and destroy the object.
	/// A close notification for WRITE_MASK alone is ignored.
	virtual int handle_close (ACE_HANDLE,ACE_Reactor_Mask mask)  
	{  
		if (mask == ACE_Event_Handler::WRITE_MASK)  
			return 0;  
		// DONT_CALL keeps remove_handler() from calling back into
		// handle_close() while we are already inside it.
		mask = ACE_Event_Handler::ALL_EVENTS_MASK |  
			ACE_Event_Handler::DONT_CALL;  
		this->reactor ()->remove_handler (this,mask);  
		this->sock_.close ();  
		delete this;    // on socket error the client object is destroyed here
		return 0;  
	}  

protected:  
	char buf[100];  
	ACE_SOCK_Stream sock_;  
};  


class ClientAcceptor : public ACE_Event_Handler  
{  
public:  
	virtual ~ClientAcceptor (){this->handle_close (ACE_INVALID_HANDLE,0);}  

	int start_listen (const ACE_INET_Addr &listen_addr)  
	{  
		if (this->acceptor_.open (listen_addr,1) == -1)  
		{  
			printf("open port fail\n");  
			return -1;  
		}  
		//注册接受连接回调事件  
		return this->reactor ()->register_handler(this,ACE_Event_Handler::ACCEPT_MASK);  
	}  

	virtual ACE_HANDLE get_handle (void) const  
	{ return this->acceptor_.get_handle (); }  

	virtual int handle_input (ACE_HANDLE fd )  
	{  
		ClientService *client = new ClientService();  
		auto_ptr<ClientService> p (client);  

		if (this->acceptor_.accept (client->peer ()) == -1)  
		{  
			printf("accept client fail\n");  
			return -1;  
		}  
		p.release ();  
		client->reactor (this->reactor ());  
		if (client->regist_this () == -1)  
			client->handle_close (ACE_INVALID_HANDLE,0);  
		return 0;  
	}  

	virtual int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask)  
	{  
		if (this->acceptor_.get_handle () != ACE_INVALID_HANDLE)  
		{  
			ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK |  
				ACE_Event_Handler::DONT_CALL;  
			this->reactor ()->remove_handler (this,m);  
			this->acceptor_.close ();  
		}  
		return 0;  
	}  

protected:  
	ACE_SOCK_Acceptor acceptor_;  
};  



/// Single-threaded server entry point built on ClientAcceptor: listens on
/// 127.0.0.1:4567 and dispatches reactor events forever.
/// @return -1 if the listener could not be started; otherwise never returns
int main1(int argc,char *argv[])   
{  
	ACE_INET_Addr addr(4567,"127.0.0.1");  
	ClientAcceptor server;  
	server.reactor(ACE_Reactor::instance());  
	// Without a registered listener the loop below would run forever with
	// nothing to dispatch, so stop right away.
	if (server.start_listen(addr) == -1)
	{
		printf("start listen fail\n");
		return -1;
	}

	while(true)  
	{  
		ACE_Reactor::instance()->handle_events();   
	}  

	return 0;   
}

////////////////////////////////////////////////
#define MAX_BUFF_SIZE     1024
#define LISTEN_PORT       4567
#define SERVER_IP         ACE_LOCALHOST

class ClientHandler : public ACE_Event_Handler
{
public:
	friend class ServerAcceptor;
public:
	ClientHandler(){}
	~ClientHandler()
	{
		sock_stream.close();
		ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
	}
	int send_some(const void *buff,int bytes)
	{
		return sock_stream.send(buff,bytes);
	}
	ACE_SOCK_Stream& GetStream(){return sock_stream;}    //给accept提供接口绑定数据通道
public:
	virtual int handle_input(ACE_HANDLE fd);    //I/O触发事件后调用
	virtual ACE_HANDLE get_handle(void) const {return sock_stream.get_handle();}    //不重载需要手动将handle传入ACE_Reactor
private:
	ACE_INET_Addr Cli_addr;
	ACE_SOCK_Stream sock_stream;
};

/// Reactor callback for readable data: logs the received text and echoes it
/// back, including the terminating NUL.
///
/// When the peer closes the connection or the read fails, the handler is
/// removed from the reactor and the socket is closed.
///
/// @return always 0
int ClientHandler::handle_input(ACE_HANDLE fd)
{
	char strBuffer[MAX_BUFF_SIZE];
	// Keep one byte free so the data can always be NUL-terminated before it
	// is treated as a C string below.
	const ssize_t byte = sock_stream.recv(strBuffer,MAX_BUFF_SIZE - 1);
	if (byte <= 0)
	{
		// Deregister first: once the stream is closed get_handle() no
		// longer returns the registered handle and the removal would fail.
		// A failed read is treated like a close so that a broken socket is
		// not dispatched over and over.
		ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
		sock_stream.close();
		if (0 == byte)
		{
			ACE_DEBUG((LM_INFO,ACE_TEXT("client closed!\n")));
		}
		else
		{
			ACE_DEBUG((LM_INFO,ACE_TEXT("receive data Failed\n")));
		}
		return 0;
	}

	strBuffer[byte] = '\0';
	ACE_DEBUG((LM_INFO,ACE_TEXT("receive:%s\n"),strBuffer));

	sock_stream.send(strBuffer,strlen(strBuffer)+1);
	return 0;
}

// ServerAcceptor
/// Listening-socket handler: accepts connections and keeps every
/// ClientHandler it creates in m_streamPool; the destructor deletes them.
class ServerAcceptor : public ACE_Event_Handler
{
public:
	/// Opens the listening socket on ip:port.
	ServerAcceptor(int port,char* ip);
	/// Deregisters the acceptor, closes it and deletes all pooled handlers.
	~ServerAcceptor();
	virtual int handle_input(ACE_HANDLE fd);  // callback invoked by the ACE framework
	/// Handle of the listening socket.
	virtual ACE_HANDLE get_handle(void) const {return Svr_aceept.get_handle();}
private:
	ACE_INET_Addr Svr_addr;                  // listen address
	ACE_SOCK_Acceptor Svr_aceept;            // listening socket
	std::list<ClientHandler*> m_streamPool;  // handlers created by handle_input()
};

/// Build the listen address from ip:port and open the listening socket.
/// A failure is only logged; the acceptor is then left closed.
ServerAcceptor::ServerAcceptor(int port,char* ip):Svr_addr(port,ip)
{
	if (-1 == Svr_aceept.open(Svr_addr,1))
	{
		ACE_DEBUG((LM_ERROR,ACE_TEXT("accept open Failed\n")));
		Svr_aceept.close();
	}
	else
	{
		// Report success only when open() really succeeded; this used to be
		// logged unconditionally, right after the failure message.
		ACE_DEBUG((LM_INFO,ACE_TEXT("accept open success\n")));
	}
}

/// Remove the acceptor from the singleton reactor, close the listening
/// socket and delete every handler stored in the pool.
ServerAcceptor::~ServerAcceptor()
{
	ACE_Reactor *const reactor = ACE_Reactor::instance();
	reactor->remove_handler(this,ACE_Event_Handler::ACCEPT_MASK);
	Svr_aceept.close();

	// Deleting a null pointer is a no-op, so no explicit check is needed.
	typedef std::list<ClientHandler*>::iterator PoolIter;
	const PoolIter last = m_streamPool.end();
	for (PoolIter cur = m_streamPool.begin(); cur != last; ++cur)
	{
		delete *cur;
	}
}
#include "ace/SOCK_SEQPACK_Association.h"
/// Reactor callback for a pending connection: accepts it into a new
/// ClientHandler, registers that handler for READ events, stores it in the
/// pool and logs the peer address.
///
/// @return 0 normally; -1 when accept() fails
int ServerAcceptor::handle_input(ACE_HANDLE fd )  
{
	// new reports failure by throwing, so no null check is needed.
	ClientHandler *stream = new ClientHandler();    // handler for the new connection

	if (Svr_aceept.accept(stream->GetStream()) == -1)  // bind the connection to the handler
	{  
		printf("accept client fail\n");  
		delete stream;  // never connected, so do not keep it in the pool
		return -1;  
	}

	// Register the connection with ACE_Reactor.
	if (ACE_Reactor::instance()->register_handler(stream,ACE_Event_Handler::READ_MASK) == -1)
	{
		printf("register client fail\n");
		delete stream;  // closes the accepted socket
		return 0;       // keep listening for further clients
	}

	// Pool only handlers that are really in service.
	m_streamPool.push_back(stream);

	ACE_INET_Addr raddr;
	stream->GetStream().get_remote_addr(raddr);
	// The format has two conversions and needs two arguments: host, then
	// port. %C prints a narrow char string, which is what get_host_addr()
	// returns.
	ACE_DEBUG ((LM_DEBUG,ACE_TEXT ( "client:%C %d\n" ),raddr.get_host_addr(),raddr.get_port_number()));

	return 0;
}  

#include <ace/OS.h>
#include <ace/Task.h>

/// Task whose svc() runs the event loop of the singleton ACE_Reactor.
/// NOTE(review): open()/close() take no arguments, so they hide rather than
/// override the ACE_Task_Base members of the same name - confirm this is
/// intended before changing their signatures.
class TTcpNetThread : public ACE_Task_Base
{
public:
	/// Start running: activates the task.
	int open();

	/// Stop running: ends the reactor event loop and waits for the task.
	int close();
protected:
	/// Thread function.
	virtual int svc();
};

/// Activate the task; the result of activate() is returned unchanged.
int TTcpNetThread::open()
{
	const int rc = this->activate();
	return rc;
}

int TTcpNetThread::close()
{
	ACE_Reactor::instance()->end_reactor_event_loop(); // 终止ACE_Proactor循环

	this->wait(); // 等待清理现场

	return 0;
}

int TTcpNetThread::svc()
{
	ACE_Reactor::instance()->run_reactor_event_loop();

	ACE_DEBUG((LM_DEBUG,ACE_TEXT("Network fin\n")));

	return 0;
}

#define CLIENT_THREAD_NUM 4

int main(int argc,char *argv[])
{
	ServerAcceptor server(LISTEN_PORT,(char *)SERVER_IP);

	//listen port注册到ACE_Reactor
	ACE_Reactor::instance()->register_handler(&server,ACE_Event_Handler::ACCEPT_MASK);

	TTcpNetThread netThread[CLIENT_THREAD_NUM];

	for(int i = 0; i < CLIENT_THREAD_NUM; i++)
	{
		netThread[i].open();
	}
	while (getchar())
	{
		ACE_OS::sleep(1);
	}

	//进入消息循环,有I/O事件回调handle_input
	//ACE_Reactor::instance()->run_reactor_event_loop();

	return 0;
}

猜你在找的React相关文章