ACE Reactor框架使用实例-大量代码

前端之家收集整理的这篇文章主要介绍了“ACE Reactor框架使用实例-大量代码”。前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

服务端:
功能:保存所有客户端信息和在线状态,统一分配端口.对掉线的客户端信息通知到进程控制模块
ServerService.h

#ifndef _SERVERSERVICE_H_
#define _SERVERSERVICE_H_
#include <map>
#include <string>
#include <sstream>
#include <fstream>
#include "ace/TP_Reactor.h"
#include "ace/SOCK_Dgram.h"
#include "ace/Task.h"
namespace YB
{
// UDP port the server socket is bound to (see CServerService::Open).
const static ACE_UINT32 iLocalHostPort = 8001;
// One protocol byte.
typedef unsigned char BYTE;
// Held while entries are inserted into or erased from CServerService::mpInfo.
// NOTE(review): declared `static` in a header, so every translation unit that
// includes this file gets its own mutex object.
static ACE_Thread_Mutex slock;
// Registration record kept by the server for one client.
typedef struct STAPPID_
{
    std::string sIp;          // client IP address
    int iPort;                // source port the client uses to talk to the server
    int ClientAcceptPort;     // listening port the server allocated for the client
    BYTE byAppid;             // module identifier
    BYTE byGroup;             // group number
    int iTime;                // liveness tick: set to 1 on heartbeat, cleared by the timer
    bool bOnline;             // online state derived from iTime by the timer
}STAppid;
/*
 Server-side UDP endpoint: receives client datagrams and sends replies.
*/
class CMain;
class CServerService: public ACE_Event_Handler
{
public:
    // Construction / destruction
    CServerService();
    ~CServerService();
    virtual ACE_HANDLE get_handle(void) const;                                  // handle the reactor waits on
    virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);               // a datagram has arrived
    virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);    // stop listening and self-delete

    bool Open(CMain* cm);
    void SendToAll(char* buf,int nSize);               // send one frame to every registered client
    int DeleteAppid(BYTE byappid,std::string sip);     // remove one client's registration
    bool ReadServerIP();                               // read the bind address from the config file
private:
    void UpdateClientAllSatte(STAppid stAppid,ACE_INET_Addr taddr);  // push existing clients to a new client
    void UpdateState(YB::BYTE byappid);                              // mark a client as alive
    void MsgData(YB::BYTE byappid);                                  // send a message frame to the sender
    void ChackProtocol(const char* buf,const int nSize);             // parse and dispatch a frame
    void ApplyConn(const char *buf,const int nSize);                 // answer a client's connect request
    void AllotPort(unsigned short &uiPort);                          // pick a client port that is not in use
    BYTE checkFrame(unsigned char* uc_buf,unsigned short uc_length); // frame validation
    void fixFrame(unsigned char* uc_buf,unsigned char& uc_Length,    // frame assembly
                  unsigned char flag,unsigned char dest_addr,unsigned char src_addr);
    void CheckAppid(BYTE byappid,std::string sIp);                   // drop a stale registration of the same client
public:
    std::map<BYTE,STAppid> mpInfo;   // registered clients, keyed by module id
private:
    ACE_INET_Addr addr;
    ACE_SOCK_Dgram udp;
    std::string sServerIP;
    unsigned short usiPort;          // last port value handed out to a client
    STAppid stAppid;                 // scratch record describing the sender of the current datagram
    int iPortCount;                  // number of times usiPort wrapped back to its start value
    CMain* cmn;
};

// 定时器类,监视在线客户端,
class CTaskTimer : public ACE_Task_Base
{
public:
// 构造、析构函数
CTaskTimer(){timeid = 0;};
virtual ~CTaskTimer(){};
virtual inthandle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask); // 关闭网络监听
public:
bool Open(CServerService *udp);
inthandle_timeout(const ACE_Time_Value &current_time,const void *act = 0);// 定时器,清理掉线客户端

private:
CServerService *sudp;
long timeid;
};
// 主调类,负责启动定时器和网络监听
class CMain : public ACE_Task_Base
{
public:
// 构造、析构函数
CMain(){};
~CMain(){};
public:
bool Open();
intClose();
private:
CServerService *serudp;
CTaskTimer*taskTimer;
};
}
#endif// end of _SERVERSERVICE_H_

ServerService.cpp
#include "./ServerService.h"
using namespace YB;
bool CMain::Open()
{
ACE_NEW_RETURN(serudp,CServerService,false);
ACE_NEW_RETURN(taskTimer,CTaskTimer,false);
serudp->reactor(ACE_Reactor::instance());
if (!serudp->Open(this)) return false;
taskTimer->reactor(ACE_Reactor::instance());
if (!taskTimer->Open(serudp)) return false;
ACE_OS::sleep(ACE_Time_Value(0,10000)); // 等待10毫秒
ACE_Reactor::instance()->run_reactor_event_loop(); // 启动线程
return true;
}
// Shuts down the timer and the UDP service and ends the reactor event loop
// if it is still running. Always returns 0.
//
// Both handlers `delete this` inside handle_close(), so the member pointers
// are cleared afterwards; a second Close() then becomes a no-op instead of
// calling through freed objects.
int CMain::Close()
{
    if (taskTimer != 0)
    {
        taskTimer->handle_close(ACE_INVALID_HANDLE,0);
        taskTimer = 0;
    }
    if (serudp != 0)
    {
        serudp->handle_close(ACE_INVALID_HANDLE,0);
        serudp = 0;
    }
    if (ACE_Reactor::instance()->reactor_event_loop_done() != 1)
    {
        ACE_Reactor::instance()->end_reactor_event_loop();
    }
    return 0;
}
///////////////////////////////////////////
// Starts client port allocation at 20000 with a wrap-around count of zero.
CServerService::CServerService()
    : usiPort(20000),
      iPortCount(0)
{
}
// Nothing to release here; the socket is closed in handle_close().
CServerService::~CServerService() {}
// Reads the server bind address from "ServerIp.txt" into sServerIP.
//
// Only the first whitespace-delimited token is taken. Copying the whole file
// would keep a trailing newline (or any other stray whitespace) in the host
// string that is later handed to ACE_INET_Addr::set().
//
// Returns false when the file cannot be opened or holds no token.
bool CServerService::ReadServerIP()
{
    std::ifstream fle("ServerIp.txt",std::ios::in);
    if (!fle) return false;

    std::string ip;
    if (!(fle >> ip)) return false;

    sServerIP = ip;
    return true;
}
// Binds the UDP socket to <configured IP>:iLocalHostPort and registers this
// handler with the reactor for read events.
//
// cm: driver object whose Close() is invoked when an exit frame arrives.
//
// Returns false when the address cannot be read or resolved, the socket
// cannot be opened, or the reactor registration fails. These results were
// previously ignored, so a failed bind still reported success.
bool CServerService::Open(CMain* cm)
{
    if (!ReadServerIP()) return false;

    // Store the driver first so it is in place before any event can be delivered.
    cmn = cm;

    if (this->addr.set(iLocalHostPort,sServerIP.c_str()) == -1) return false;
    if (this->udp.open(addr) == -1) return false;

    if (this->reactor()->register_handler(this,ACE_Event_Handler::READ_MASK) == -1)
    {
        this->udp.close();
        return false;
    }
    return true;
}
// Exposes the UDP socket's handle to the reactor.
ACE_HANDLE CServerService::get_handle() const
{
    const ACE_HANDLE handle = udp.get_handle();
    return handle;
}
intCServerService::handle_input(ACE_HANDLE)
{
ACE_INET_Addr taddr;
char buf[255] = {0};
int isize = 0;
isize = this->udp.recv(buf,255,taddr);
stAppid.iPort = taddr.get_port_number();
stAppid.sIp = taddr.get_host_addr();

if (isize > 0 && isize < 255) ChackProtocol(buf,isize);
return 0;
}
intCServerService::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask)
{
if (this->udp.get_handle() != ACE_INVALID_HANDLE)
{
ACE_Reactor_Mask m = ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL;
this->reactor()->remove_handler(this,m);
this->udp.close();
}
delete this;
return 0;
}
// Frame validation placeholder: the real check is not part of this listing.
// Always returns 1.
// NOTE(review): ChackProtocol() returns early on a non-zero result, so with
// this stub no frame is dispatched; confirm the intended return convention.
unsigned char CServerService::checkFrame(unsigned char* uc_buf,unsigned short uc_length)
{
    (void)uc_buf;
    (void)uc_length;
    const unsigned char result = 1;
    return result;
}
// Frame assembly placeholder: the real implementation is not part of this
// listing. Does nothing and leaves all arguments untouched.
void CServerService::fixFrame(unsigned char* uc_buf,
                              unsigned char& uc_Length,
                              unsigned char flag,
                              unsigned char dest_addr,
                              unsigned char src_addr)
{
    (void)uc_buf;
    (void)uc_Length;
    (void)flag;
    (void)dest_addr;
    (void)src_addr;
}
void CServerService::ChackProtocol(const char* buf,const int nSize)
{
YB::BYTE *p = (YB::BYTE*)buf;

if (checkFrame(p,nSize)) return;
switch (*(p + 11))
{
case 0x00:// 心跳
{

UpdateState(*(p + 6));
if (*(p + 2) == 0x02) MsgData(*(p + 6));
break;

}
case 0x01:// 我要处理的类型
{
switch (*(p + 15))
{
case 0x00:// 正常退出,离线状态
{
DeleteAppid(*(p + 23),stAppid.sIp);
break;
}
case 0x02: // 申请连接
{
ApplyConn(buf,nSize);
break;
}
default:
break;
}
break;
}
case 0x02: // 退出
{
if (*(p + 15) == 0x04 && *(p + 6) == 0x01) cmn->Close();
break;
}
default:
break;
}
}
// Handles a client's connect request.
//
// Steps: allocate a listening port for the client, drop any stale
// registration of the same (module id, IP), reply to the client, broadcast
// to the already registered clients, push the existing client list to the
// newcomer, and finally record the newcomer in mpInfo.
//
// buf:   received frame; byte 6 is the module id, byte 16 the group number.
// nSize: frame length (not used here).
//
// Fixes relative to the listing: udp.send() gets its length argument back,
// and the reply goes to the address the request came from (stAppid.iPort)
// rather than to the freshly allocated listening port. The statement that
// retargets taddr sat inside the disabled framing block, so the reply was
// sent to a port the client's UDP socket is not using.
//
// NOTE(review): the frame-building code is disabled in this listing, so the
// payloads sent below are all zero bytes. The disabled code is kept in
// comments, reconstructed where the listing was garbled.
// NOTE(review): mpInfo is a std::map keyed by module id, so insert() does
// nothing when the same id is already registered from a different IP.
void CServerService::ApplyConn(const char *buf,const int nSize)
{
    (void)nSize;

    YB::BYTE isize = 0x0C;
    char puf[255] = {0};
    const char* p = buf;

    AllotPort(usiPort);
    stAppid.ClientAcceptPort = usiPort;
    stAppid.byAppid = (YB::BYTE)*(p + 6);
    stAppid.byGroup = (YB::BYTE)*(p + 16);
    CheckAppid(stAppid.byAppid,stAppid.sIp);

    /* Reply frame (disabled):
    ACE_INET_Addr listenAddr(usiPort,stAppid.sIp.c_str());
    ACE_UINT32 ip = listenAddr.get_ip_address();
    u_short iprt = listenAddr.get_port_number();
    strcpy(puf,"\x01\x01\x01\x0C\x03");
    puf[5] = stAppid.byGroup;
    memcpy(puf + 6,&ip,sizeof(ACE_UINT32));
    memcpy(puf + 6 + sizeof(ACE_UINT32),&iprt,sizeof(u_short));
    fixFrame((unsigned char*)puf,isize,0x01,stAppid.byAppid,0x04);
    */

    // Answer on the address the request was received from.
    ACE_INET_Addr taddr;
    taddr.set(stAppid.iPort,stAppid.sIp.c_str());
    this->udp.send(puf,isize,taddr);

    /* Broadcast frame announcing the new client (disabled):
    isize = 0x0D;
    memset(puf,0x00,255);
    strcpy(puf,"\x01\x01\x01\x0D\x01");
    puf[5] = stAppid.byGroup;
    memcpy(puf + 6,&ip,sizeof(ACE_UINT32));
    memcpy(puf + 6 + sizeof(ACE_UINT32),&iprt,sizeof(u_short));
    memcpy(puf + 6 + sizeof(ACE_UINT32) + sizeof(u_short),&stAppid.byAppid,sizeof(YB::BYTE));
    fixFrame((unsigned char*)puf,isize,0x01,stAppid.byAppid,0x04);
    */
    SendToAll(puf,isize);

    // Tell the new client about the clients that are already registered.
    UpdateClientAllSatte(stAppid,taddr);

    // Record the new client.
    stAppid.iTime = 1;
    stAppid.bOnline = true;
    slock.acquire();
    mpInfo.insert(std::make_pair(stAppid.byAppid,stAppid));
    slock.release();
}
// Removes an existing registration with the same module id and IP, then
// waits one second, so that a reconnecting client starts from a clean entry.
//
// The lookup and the removal are separated: DeleteAppid() erases from
// mpInfo, and erasing inside the scan left the loop advancing an
// invalidated iterator.
void CServerService::CheckAppid(YB::BYTE byappid,std::string sIp)
{
    bool found = false;
    std::map<YB::BYTE,STAppid>::iterator mpIter;
    for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); ++mpIter)
    {
        if (byappid == mpIter->second.byAppid && sIp == mpIter->second.sIp)
        {
            found = true;
            break;
        }
    }

    if (found)
    {
        DeleteAppid(byappid,sIp);
        ACE_OS::sleep(1);
    }
}
// Advances uiPort to the next client listening port.
//
// Until the counter has wrapped once (iPortCount < 1) the port is simply
// incremented. Once it has passed 65500 it is reset to 20000, and from then
// on the current value is kept unless a registered client already uses it,
// in which case it is bumped until it is free.
//
// The scan is restarted from the very first entry after every bump. The
// previous loop reset the iterator to begin() and then let the for-increment
// skip that first entry, so a port held by the first client could be handed
// out twice.
void CServerService::AllotPort(unsigned short &uiPort)
{
    if (uiPort > 65500)
    {
        uiPort = 20000;
        iPortCount++;
    }

    if (iPortCount < 1)
    {
        uiPort++;   // still in the first pass: take the next port
        return;
    }

    std::map<YB::BYTE,STAppid>::iterator mpIter = mpInfo.begin();
    while (mpIter != mpInfo.end())
    {
        if (uiPort == mpIter->second.ClientAcceptPort)
        {
            uiPort++;
            mpIter = mpInfo.begin();   // re-check every entry against the new value
        }
        else
        {
            ++mpIter;
        }
    }
}
// Removes the registration for (byappid, sip) and, if one was removed,
// broadcasts a 0x0D-byte frame to the remaining clients. Always returns 0.
//
// mpInfo is a std::map, so at most one entry can carry the given module id;
// a single find() therefore replaces the multimap-style scan.
//
// NOTE(review): the frame-building code is disabled in this listing, so the
// broadcast payload is all zero bytes. The disabled code is kept below,
// reconstructed where the listing was garbled.
int CServerService::DeleteAppid(YB::BYTE byappid,std::string sip)
{
    YB::BYTE isize = 0x0D;
    bool b_isfind = false;
    char puf[255] = {0};

    slock.acquire();
    std::map<YB::BYTE,STAppid>::iterator mpIter = mpInfo.find(byappid);
    if (mpIter != mpInfo.end() && mpIter->second.sIp == sip)
    {
        /* Offline-notification frame (disabled):
        ACE_INET_Addr taddr(mpIter->second.ClientAcceptPort,sip.c_str());
        ACE_UINT32 ip = taddr.get_ip_address();
        u_short iprt = taddr.get_port_number();
        memset(puf,0x00,255);
        isize = 0x0D;
        strcpy(puf,"\x01\x01\x01\x0D");
        puf[5] = mpIter->second.byGroup;
        memcpy(puf + 6,&ip,sizeof(ACE_UINT32));
        memcpy(puf + 6 + sizeof(ACE_UINT32),&iprt,sizeof(u_short));
        memcpy(puf + 6 + sizeof(ACE_UINT32) + sizeof(u_short),&mpIter->second.byAppid,sizeof(YB::BYTE));
        fixFrame((unsigned char*)puf,isize,0x01,mpIter->second.byAppid,0x04);
        */
        mpInfo.erase(mpIter);
        b_isfind = true;
    }
    slock.release();

    // Broadcast to the remaining clients (outside the lock).
    if (b_isfind) SendToAll(puf,isize);
    return 0;
}
void CServerService::SendToAll(char* buf,int nSize)
{
std::map<YB::BYTE,STAppid>::iterator mpIter;
ACE_INET_Addr taddr;
for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++)
{
taddr.set(mpIter->second.iPort,mpIter->second.sIp.c_str());
int bychar = (unsigned char)buf[10] - mpIter->second.byAppid;
buf[10] = mpIter->second.byAppid;
buf[nSize - 2] -= bychar;
this->udp.send(buf,nSize,taddr);
}
}
// Marks a client as alive: if module id byappid is registered from the IP of
// the datagram currently being processed, its liveness tick is set to 1.
// mpInfo is a std::map, so at most one entry can match the id.
void CServerService::UpdateState(YB::BYTE byappid)
{
    const std::map<YB::BYTE,STAppid>::iterator hit = mpInfo.find(byappid);
    if (hit == mpInfo.end())
    {
        return;
    }
    if (hit->second.sIp == stAppid.sIp)
    {
        hit->second.iTime = 1;
    }
}
// Sends a 4-byte message frame back to the sender of the current datagram,
// provided module id byappid is registered from that sender's IP.
//
// Fixes relative to the listing:
//  - the header literal used "/x" where "\x" escapes were meant, so strcpy()
//    copied 16 text characters instead of 4 header bytes;
//  - udp.send() gets its length argument back;
//  - fixFrame() gets its fifth argument back.
// NOTE(review): the listing dropped one fixFrame() argument; 0x01 is used for
// `flag` because the other reply frames in this file use it. Confirm.
void CServerService::MsgData(YB::BYTE byappid)
{
    std::map<YB::BYTE,STAppid>::iterator mpIter = mpInfo.find(byappid);
    if (mpIter == mpInfo.end()) return;
    if (mpIter->second.sIp != stAppid.sIp) return;

    ACE_INET_Addr taddr(stAppid.iPort,stAppid.sIp.c_str());
    char puf[255] = {0};
    unsigned char iszie = 0x04;
    strcpy(puf,"\xf0\x01\x01\x04");
    fixFrame((unsigned char*)puf,iszie,0x01,byappid,0x04);
    this->udp.send(puf,iszie,taddr);
}
// Sends one 0x0D-byte frame per registered client to taddr, so that a newly
// connected client learns about the existing ones.
//
// stAppid: record of the new client (not used here).
// taddr:   destination of every frame.
//
// Waits 100 ms before the first frame and 10 ms after each one.
//
// Fix relative to the listing: udp.send() gets its length argument back.
// NOTE(review): the frame-building code is disabled in this listing, so each
// payload is all zero bytes. The disabled code is kept below, reconstructed
// where the listing was garbled.
void CServerService::UpdateClientAllSatte(STAppid stAppid,ACE_INET_Addr taddr)
{
    (void)stAppid;

    unsigned char isize = 0x0D;
    char puf[255] = {0};

    ACE_Time_Value t(0,100000);
    ACE_OS::sleep(t);

    std::map<YB::BYTE,STAppid>::iterator mpIter;
    for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); ++mpIter)
    {
        /* Per-client frame (disabled):
        ACE_INET_Addr taddr1;
        taddr1.set(mpIter->second.ClientAcceptPort,mpIter->second.sIp.c_str());
        ACE_UINT32 ip = taddr1.get_ip_address();
        u_short iprt = taddr1.get_port_number();
        memset(puf,0x00,255);
        strcpy(puf,"\x01\x01\x01\x0D\x01");
        puf[5] = mpIter->second.byGroup;
        memcpy(puf + 6,&ip,sizeof(ACE_UINT32));
        memcpy(puf + 6 + sizeof(ACE_UINT32),&iprt,sizeof(u_short));
        memcpy(puf + 6 + sizeof(ACE_UINT32) + sizeof(u_short),&mpIter->second.byAppid,sizeof(YB::BYTE));
        fixFrame((unsigned char*)puf,isize,0x01,stAppid.byAppid,0x04);
        */
        this->udp.send(puf,isize,taddr);
        t.set(0,10000);
        ACE_OS::sleep(t);
    }
}
//////////////////////////////////////////////
///*
boolCTaskTimer::Open(CServerService *udp)
{
sudp = udp;
ACE_Time_Value idlay(1);
ACE_Time_Value ival(60);
timeid = this->reactor()->schedule_timer(this,idlay,ival);
return true;
}
// Cancels this handler's timers (when timeid is non-zero) and destroys the
// object. Both parameters are ignored. Always returns 0.
// NOTE(review): a timer whose id happens to be 0 is treated as "not
// scheduled" and is not cancelled here; confirm whether the reactor can
// hand out id 0.
int CTaskTimer::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask)
{
    const bool scheduled = (timeid != 0);
    if (scheduled)
    {
        this->reactor()->cancel_timer(this);
    }
    delete this;   // the object must not be used after this call
    return 0;
}
int CTaskTimer::handle_timeout(const ACE_Time_Value &current_time,const void *act)
{
std::map<YB::BYTE,STAppid>::iterator mpIter;
for (mpIter = sudp->mpInfo.begin(); mpIter != sudp->mpInfo.end(); mpIter++)
{
mpIter->second.bOnline = mpIter->second.iTime ? true : false;
mpIter->second.iTime = 0;
}
// 删除掉线客户端
for (mpIter = sudp->mpInfo.begin(); mpIter != sudp->mpInfo.end(); mpIter++)
{
if (!mpIter->second.bOnline)
{
sudp->DeleteAppid(mpIter->second.byAppid,mpIter->second.sIp);
mpIter = sudp->mpInfo.begin();
if (mpIter == sudp->mpInfo.end()) break;
}
}
return 0;
}

执行程序

main.cpp
#include "./ServerService.h"
// Program entry: runs the server until its event loop ends. If start-up
// fails, whatever was created is shut down again. The exit status is 0 in
// both cases.
int ACE_TMAIN(int argc,char* argv[])
{
    YB::CMain cmain;
    const bool started = cmain.Open();
    if (!started)
    {
        cmain.Close();
    }
    return 0;
}

客户端DLL
功能:根据模块号与其他客户进行连接.数据转发
DLL.cpp

#include "./NetWork.h"
#include "./os_fun.h"
// Exported facade of the client library: every call is forwarded to the
// CNetWork instance created by InitNetwork().
class _mydllexport CBaseInNetwork
{
public:
    CBaseInNetwork();
    virtual ~CBaseInNetwork();
    virtual bool InitNetwork(unsigned char byAppid,int igroup,int itype);
    virtual void CloseNetwork();
    virtual int SendData(const unsigned char* buf,const int nSize,unsigned char byAppid );
    virtual int GetDataLength();
    virtual int GetData(unsigned char* buf);
private:

    CNetWork *pnet;   // created in InitNetwork(), destroyed in CloseNetwork()
};
// Starts without a network object; pnet is set explicitly so that it never
// holds an indeterminate value before InitNetwork() runs.
CBaseInNetwork::CBaseInNetwork()
    : pnet(NULL)
{
}
// Releases nothing; the network object is torn down in CloseNetwork().
CBaseInNetwork::~CBaseInNetwork() {}
bool CBaseInNetwork::InitNetwork( unsignedchar byAppid,int itype)
{
ACE::init();
bool bRetVal = false;
pnet =new CNetWork();
bRetVal = pnet->Open( byAppid,igroup,itype );
return bRetVal;
}
// Forwards nSize bytes from buf to CNetWork::SendData() for module byAppid
// and returns its result.
//
// Returns 0 when no network object exists: CloseNetwork() sets pnet to NULL,
// and a later call used to dereference it.
//
// Fix relative to the listing: the `nSize` parameter from the class
// declaration is restored in the definition and in the forwarded call.
int CBaseInNetwork::SendData(const unsigned char* buf,const int nSize,unsigned char byAppid )
{
    if (pnet == NULL) return 0;
    return pnet->SendData( buf,nSize,byAppid );
}
// Returns CNetWork::GetDataLength(), or 0 when no network object exists
// (pnet is NULL after CloseNetwork()).
int CBaseInNetwork::GetDataLength()
{
    if (pnet == NULL) return 0;
    return pnet->GetDataLength();
}
// Copies the next pending data into buf through CNetWork::GetData() and
// returns its result, or 0 when no network object exists (pnet is NULL after
// CloseNetwork()).
int CBaseInNetwork::GetData(unsigned char* buf)
{
    if (pnet == NULL) return 0;
    return pnet->GetData( buf );
}
// Closes and destroys the network object and finalises ACE.
//
// Does nothing when there is no network object, so a second call no longer
// dereferences the NULL that the first call stored in pnet.
void CBaseInNetwork::CloseNetwork( )
{
    if (pnet == NULL) return;

    pnet->Close();
    delete pnet;
    pnet = NULL;
    ACE::fini();
}

os_fun.h
#ifndef _OS_FUN_H_
#define _OS_FUN_H_
#ifdef WIN32
#include "./win32_fun.hpp"
#endif
#ifdef linux
#include "./linux_fun.hpp"
#endif
#endif // end of _OS_FUN_H_

win32_fun.hpp
#ifndef __WIN32_FUN_H
#define __WIN32_FUN_H
#include <windows.h>
#define _mydllexport extern "C" _declspec(dllexport)
// DLL entry point. No work is done for any attach/detach notification; the
// function always returns TRUE.
// NOTE(review): this definition lives in a header, so including the header
// from more than one translation unit defines DllMain more than once.
BOOL WINAPI DllMain(HANDLE hModule,
                    DWORD ul_reason_for_call,
                    LPVOID lpReserved
                    )
{
    switch (ul_reason_for_call)
    {
    case DLL_PROCESS_ATTACH:
    case DLL_THREAD_ATTACH:
    case DLL_THREAD_DETACH:
    case DLL_PROCESS_DETACH:
        break;
    }
    return TRUE;
}
#endif // end of __WIN32_FUN_H

linux_fun.hpp
#ifndef __LINUX_FUN_H
#define __LINUX_FUN_H
#define _mydllexport
#endif // end of __LINUX_FUN_H

BaseNetWork.h

#ifndef _BASENETWORK_H_
#define _BASENETWORK_H_
namespace YB
{
    // Abstract interface of a network endpoint.
    class CBaseNetWork
    {
    protected:
        CBaseNetWork(void) {}
    public:
        virtual ~CBaseNetWork(void) {}
        // byAppid is the caller's own module identifier.
        virtual bool Open(unsigned char byAppid,int igroup = 0,int itype = 0) = 0;
        // NOTE(review): the listing had lost the length parameter; it is
        // restored to match CBaseInNetwork::SendData(buf, nSize, byAppid).
        virtual int SendData(const unsigned char* buf,const int nSize,unsigned char byAppid = 0xff) = 0;
        virtual int GetData(unsigned char* buf) = 0;
        virtual int GetDataLength() = 0;
        virtual void Close() = 0;

    };
}
#endif // end of _BASENETWORK_H_

#ifndef _NETWORK_H_
#define _NETWORK_H_
#include <map>
#include <list>
#include <string>
#include <fstream>
#include <sstream>
#include "ace/TP_Reactor.h"
#include "ace/SOCK_Dgram.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Connector.h"
#include "ace/Task.h"
// UDP port of the server that clients register with.
const static ACE_UINT32 iServerPort = 8001;
// One protocol byte.
typedef unsigned char Byte;
// Held while CNetWork::lstData (received data) is modified.
static ACE_Thread_Mutex mlocka;
// Held while CNetWork::mpInfo (known clients) is modified.
static ACE_Thread_Mutex mlock_mp;
// Module identifier that addresses the server itself.
const static Byte ServerAppid = 0x04;
// NOTE(review): the mutexes are `static` in a header, so every translation
// unit that includes this file gets its own copies.
class CNetWork;
/*
与服务端建立连接,收发信息
*/
class CServerUdpcs : public ACE_Event_Handler
{
public:
// 构造、析构函数
CServerUdpcs(){};
~CServerUdpcs(){};
virtual ACE_HANDLE get_handle()const;
virtual inthandle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE); // 网络读事件
virtual inthandle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask); // 关闭网络事件
public:
intOpen_Event(); // 注册事件
bool Open(Byte byappid,Byte bygroup);
intSendData(const char* buf,const int nSize);
intGetData(char* buf,const int nSize);
void SetParentHander(CNetWork *p); // 设置主类指针,引用数据连表

public:
intiClientStateSign; // 客户状态变化标志;1有变化,0变化
private:
void ChackProtocol(const char* buf,const int nSize); // 解析报文
void UpdateMapInfo(const char* buf,const int nSize); // 更新客户端状态
void ReadDataToList(const char* buf,const int nSize); // 保存数据到连表
void SaveMapinfo(const char* buf); // 保存服务器发送的其他客户端信息
void SaveLocalhost(const char* buf); // 保存本机IP和服务器分配的端口
intGetSourcePort(const char* buf); // 得到端口
std::string GetSourceIp(const char* buf); // 得到IP地址
private:
ACE_INET_Addr praddr;
ACE_SOCK_Dgram udp; //UDP协议流
CNetWork*net;
};
/*
客户端建立监听服务
*/
class CClientAcceptor : public ACE_Event_Handler
{
public:
// 构造、析构函数
CClientAcceptor(){};
~CClientAcceptor(){};
virtual ACE_HANDLE get_handle()const;
virtual inthandle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE); // 接受客户端连接
virtual inthandle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask); // 关闭网络事件
public:
intopen(void* avg = 0);
void SetParentHander(CNetWork *p);
private:
ACE_SOCK_Acceptor acp;
CNetWork* net;
};
class CClientService;
// Record describing one other client that is online.
typedef struct STAPPIDCS
{
    std::string sIp;
    int iPort;
    Byte byAppid;
    Byte byGroup;          // group number
    CClientService *pcs;   // established connection to that client, or NULL
}STAppidcs;
// One block of data received from another party, queued for the caller.
typedef struct STLISTDATA
{
    std::string sAppid;
    int iLength;     // number of bytes in byData
    Byte *byData;    // buffer allocated with new[]; released when the entry is consumed
}STListData;
/*
点对点数据收发
*/
class CClientService : public ACE_Event_Handler
{
public:
// 构造、析构函数
CClientService(void){};
~CClientService(){};
// 继承基类
virtual ACE_HANDLE get_handle()const;
virtual inthandle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE); // 接受数据,保存到连表
virtual inthandle_close(ACE_HANDLE handle,ACE_Reactor_Mask mask); // 退出连接,删除资源
public:
intOpen();
void SetParentHander(CNetWork *p);
intconnect(ACE_INET_Addr addr); // 连接到其他客户端
intSendData(const char* buf,const int nSize); // 发送数据到其他客户端
ACE_SOCK_Stream &peer(){return sockeam;}

private:
void AddAppid(STAppidcs STAppidcs); // 保存客户端连接
void DeleteAppid(CClientService *cs); // 删除客户端连接
void ReadDataToList(const char* buf,const int nSize); // 保存数据到连表
void ChackProtocol(const char* buf,const int nSize); // 解析报文
private:
ACE_SOCK_Connector con;
ACE_SOCK_Streamsockeam;
STAppidcs stAppidcs; // 保存最近一个客户端连接信息
CNetWork *net;
};
class CTaskTimer;
/*
用户服务类,数据保存
*/
class CNetWork:public ACE_Task_Base
{
public:
// 构造、析构函数
CNetWork(void);
virtual ~CNetWork(void);
public:
bool Open(Byte byappid,int itype = 0); // 服务器地址;strAppid自己
intSendData(const Byte* buf,Byte byappid = 0xff);
intGetDataLength(); //返回数据长度
intGetData(Byte* buf); //返回数据
void Close();
bool ReadServerIP(); //取系统IP
intsvc(void); // 线程回调函数

public:
std::map<Byte,STAppidcs> mpInfo; // 客户端信息连表
std::list<STListData>lstData; // 数据连表
intiprtype; // 是否主控模块标识
Byte byAppid; // 本机APPID
intiport; // 服务端分配PORT,
std::string sip; // 本机IP
Byte byGroup; // 组号
STAppidcsstAppidcs; // 保存最近一个服务端返回其他客户信息CServerUdpcs.SaveMapinfo使用
CServerUdpcs *pServerUdp;
CClientAcceptor *pCAcptor;
CClientService*pCService;
CTaskTimer*ptimer;
std::stringsServerIP;
std::stringsLocalIP;
bool b_run;
private:
int GroupSend(const char* buf,const int nSize); // 组群发
int SingleSend(const char* buf,Byte byappid); // 单发

};
// 定时器类
class CTaskTimer : public ACE_Task_Base
{
public:
// 构造、析构函数
CTaskTimer(){timeid = 0;};
virtual ~CTaskTimer(){};
virtual inthandle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask); // 关闭网络监听
public:
bool Open(CNetWork *p);
inthandle_timeout(const ACE_Time_Value &current_time,发送心跳报文

private:
CNetWork*pnet;
long timeid;
};

#endif // end of _NETWORK_H_

NetWork.cpp

#include "./NetWork.h"unsigned char checkFrame(unsigned char* uc_buf,unsigned short uc_length) {return 0;}void fixFrame(unsigned char* uc_buf,unsigned char flag,unsigned char src_addr){;}//////////////////////////////////////////////*CNetWork::CNetWork(){}CNetWork::~CNetWork(){}bool CNetWork::Open(Byte byappid,int itype){b_run = false;iprtype = itype;byAppid = byappid;byGroup = (Byte)igroup;ACE_NEW_RETURN(pServerUdp,CServerUdpcs,false);ACE_NEW_RETURN(pCAcptor,CClientAcceptor,false);ACE_NEW_RETURN(ptimer,false);if (!ReadServerIP()) return false;// 向服务端申请连接,登陆到服务端pServerUdp->SetParentHander(this);if (!pServerUdp->Open(byAppid,byGroup)) return false;// 自监听pCAcptor->SetParentHander(this);// 开启线程activate();b_run = true;return true;}int CNetWork::svc(void){ACE_Reactor rt;this->reactor(&rt);this->pServerUdp->reactor(&rt);this->pCAcptor->reactor(&rt); this->ptimer->reactor(&rt);this->pServerUdp->Open_Event();this->pCAcptor->open();this->ptimer->Open(this);rt.run_reactor_event_loop();ACE_OS::sleep(ACE_Time_Value(1));return 0;}int CNetWork::SendData(const Byte* buf,Byte byappid){int isize = 0;if (byappid == 0xff) // 组发{isize = GroupSend((char*)buf,nSize);}else if (byappid == ServerAppid) // 心跳{isize = pServerUdp->SendData((char*)buf,nSize);}else // 单发点到点{isize = SingleSend((char*)buf,byappid);}return isize;}int CNetWork::GetDataLength(){if (iprtype && pServerUdp->iClientStateSign) return (mpInfo.size() + 17);if (lstData.empty()) return 0;std::list<STListData>::iterator lstIter;lstIter = lstData.begin();return lstIter->iLength;}int CNetWork::GetData(Byte* buf){ if (iprtype && pServerUdp->iClientStateSign){std::map<Byte,STAppidcs>::iterator mpIter;unsigned char isize = 0;pServerUdp->iClientStateSign = 0;strcpy((char*)buf,"/x03/x01/x01");buf[3] = mpInfo.size() + 4;isize = 4;for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++){ if (mpIter->second.sIp == sip) buf[isize++] = mpIter->second.byAppid;}fixFrame(buf,byAppid,ServerAppid);return isize;}if(lstData.empty()) 
return 0;std::list<STListData>::iterator lstIter;int ilen = 0;// 返回用户数据mlocka.acquire();lstIter = lstData.begin();ilen = lstIter->iLength;memcpy(buf,lstIter->byData,ilen);delete []lstIter->byData;lstData.erase(lstIter);mlocka.release();return ilen;}void CNetWork::Close(){std::map<Byte,STAppidcs>::iterator mpIter;Byte isize = 0x0D; charbuf[255] = {0};ACE_INET_Addr taddr(iport,sip.c_str());ACE_UINT32ip = taddr.get_ip_address();u_short iprt = taddr.get_port_number();/*strcpy(buf,"/x01/x01/x01/x0D");buf[5] = byGroup; memcpy(buf + 6,sizeof(ACE_UINT32));memcpy(buf + 6 + sizeof(ACE_UINT32),sizeof(u_short));memcpy(buf + 6 + sizeof(ACE_UINT32) + sizeof(u_short),&byAppid,sizeof(Byte));fixFrame((unsigned char*)buf,ServerAppid,byAppid);*/isize = pServerUdp->SendData(buf,isize); // 关闭监听事件ptimer->handle_close(ACE_INVALID_HANDLE,0);pCAcptor->handle_close(ACE_INVALID_HANDLE,0);pServerUdp->handle_close(ACE_INVALID_HANDLE,0);// 关闭网络事件for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++){if (mpIter->second.pcs != NULL){ mpIter->second.pcs->handle_close(ACE_INVALID_HANDLE,0); mpIter->second.pcs = NULL;}}// 清除连表mlock_mp.acquire();mpInfo.clear();mlock_mp.release();if (b_run && this->reactor()->reactor_event_loop_done() != 1) this->reactor()->end_reactor_event_loop(); }int CNetWork::GroupSend(const char* buf,const int nSize){std::map<Byte,STAppidcs>::iterator mpIter;int isize = 0;for (mpIter = mpInfo.begin(); mpIter!= mpInfo.end(); mpIter++){if (mpIter->second.byGroup == byGroup) { isize = SingleSend(buf,mpIter->second.byAppid);}} return isize;}int CNetWork::SingleSend(const char* buf,Byte byappid) {std::map<Byte,STAppidcs>::iterator mpIter;Byte isize = 0;for (mpIter = mpInfo.find(byappid); mpIter != mpInfo.end(); mpIter++){if (mpIter->first != byappid) break; if (mpIter->second.pcs != NULL) // 已有连接{ CClientService* p = mpIter->second.pcs; isize = p->SendData(buf,nSize);}else // 无连接,新建连接到对端{ CClientService* cs; ACE_INET_Addr 
addr(mpIter->second.iPort,mpIter->second.sIp.c_str()); ACE_NEW_RETURN(cs,CClientService,0); cs->SetParentHander(this); cs->reactor(this->reactor()); if (cs->connect(addr) != -1) { charpuf[255] = {0}; cs->Open(); mpIter->second.pcs = cs; // 将连接保存到连表 strcpy(puf,"/x01/x01/x01/x06/x02"); puf[5] = byGroup; isize = 0x06; fixFrame((unsigned char*)puf,byAppid); isize = mpIter->second.pcs->SendData(puf,isize); // 等待 ACE_Time_Value t(1); ACE_OS::sleep(t); isize = mpIter->second.pcs->SendData(buf,nSize); } else { delete cs; }} }return isize;}bool CNetWork::ReadServerIP(){char sip1[20] = {0};char sip2[20] = {0};FILE* fp = NULL;if ((fp = fopen("./ServerIp_c.txt","r")) == NULL) return false;fscanf(fp,"%s %s",sip1,sip2);fclose(fp);sServerIP = sip1;sLocalIP = sip2;return true;}/////////////////////////////////////////////////////////////////bool CServerUdpcs::Open(Byte byappid,Byte bygroup){ACE_INET_Addr taddr;char puf[255] = {0};Byteisize = 0x06;ACE_Time_Value t(3);ACE_INET_Addr taddr_local(net->sLocalIP.c_str());udp.open(taddr_local);taddr.set(iServerPort,net->sServerIP.c_str());/*strcpy(puf,"/x01/x01/x01/x06/x02");puf[5] = bygroup;fixFrame((unsigned char*)puf,byappid);*/this->udp.send(puf,taddr);// 必须阻塞等待服务端返回分配的端口,否则不能建立监听memset(puf,255);ACE_OS::sleep(ACE_Time_Value(0,10000));isize = this->udp.recv(puf,this->praddr,&t);if (isize > 0 && isize < 255) {ChackProtocol(puf,isize);if (net->iport < 20000) //分配的端口都是>20000的,{ udp.close(); return false;}return true;}udp.close();return false;}int CServerUdpcs::Open_Event(){return (this->reactor()->register_handler(this,ACE_Event_Handler::READ_MASK));}ACE_HANDLE CServerUdpcs::get_handle()const{return this->udp.get_handle();}int CServerUdpcs::handle_input(ACE_HANDLE fd){char buf[255] = {0};int isize = this->udp.recv(buf,this->praddr);if (isize > 0 && isize < 255) ChackProtocol(buf,isize);return 0;}int CServerUdpcs::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask) {if (this->udp.get_handle() != ACE_INVALID_HANDLE){ACE_Reactor_Mask 
m = ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL;this->reactor()->remove_handler(this,m);this->udp.close();}delete this;return 0;}void CServerUdpcs::ChackProtocol(const char* buf,const int nSize){Byte *p = (Byte*)buf;if (checkFrame(p,nSize)) return;switch (*(p + 11)){case 0xF0: // 消息应答{ if (*(p + 6) == ServerAppid) ReadDataToList(buf,nSize); break;}case 0x01: // my{ switch (*(p + 15)) { case 0x03:// 返回本机IP和端口号 SaveLocalhost(buf); break; case 0x01:// 保存在线客户端信息(新增客户端) SaveMapinfo(buf); break; case 0x00:// 更新在线客户端信息(客户端掉线) UpdateMapInfo(buf,nSize); break; default: break; } break;}default: break;} }void CServerUdpcs::SetParentHander(CNetWork *p){net = p;}int CServerUdpcs::SendData(const char* buf,const int nSize){return this->udp.send(buf,this->praddr);}int CServerUdpcs::GetData(char* buf,const int nSize){return this->udp.recv(buf,this->praddr);}void CServerUdpcs::SaveLocalhost(const char* buf){net->sip = GetSourceIp(buf);net->iport = GetSourcePort(buf); }void CServerUdpcs::UpdateMapInfo(const char* buf,STAppidcs>::iterator mpIter;Byte byappid = (Byte)buf[23];std::string sip = GetSourceIp(buf); mlock_mp.acquire();for (mpIter = net->mpInfo.find(byappid); mpIter != net->mpInfo.end(); mpIter++) {if (mpIter->first != byappid) break;if (mpIter->second.sIp != sip) continue;if (mpIter->second.pcs != NULL){ mpIter->second.pcs->handle_close(ACE_INVALID_HANDLE,0); mpIter->second.pcs = NULL;}net->mpInfo.erase(mpIter);break;}mlock_mp.release();iClientStateSign = 1;}void CServerUdpcs::ReadDataToList(const char* buf,const int nSize){STListData stlData;stlData.iLength = nSize;stlData.byData = new Byte[nSize];memcpy(stlData.byData,buf,nSize);//将数据保存到连表mlocka.acquire();net->lstData.push_front(stlData);mlocka.release();}void CServerUdpcs::SaveMapinfo(const char* buf){std::map<Byte,STAppidcs>::iterator mpIter;Byte *p = (Byte*)buf;STAppidcs stAppidcs; boolb_Insert = false;/*stAppidcs.byAppid = *(p + 23); stAppidcs.byGroup = *(p + 16);stAppidcs.sIp = 
GetSourceIp(buf);stAppidcs.iPort = GetSourcePort(buf);stAppidcs.pcs = NULL;*/mlock_mp.acquire();for ((mpIter = net->mpInfo.find(stAppidcs.byAppid)); mpIter != net->mpInfo.end(); mpIter++){if (mpIter->first != stAppidcs.byAppid) break;if (mpIter->second.sIp != stAppidcs.sIp) continue;b_Insert = true;break;}if (!b_Insert) net->mpInfo.insert(std::make_pair(stAppidcs.byAppid,stAppidcs));mlock_mp.release();iClientStateSign = 1;}std::string CServerUdpcs::GetSourceIp(const char* buf){ACE_INET_Addr taddr;int iIp = 0;memcpy(&iIp,buf + 17,4);taddr.set(1000,iIp);std::string sip = taddr.get_host_addr();return sip;}int CServerUdpcs::GetSourcePort(const char* buf){int iport = 0;memcpy(&iport,buf + 21,2);return iport;}//////////////////////////////////////////////////////int CClientAcceptor::open(void * avg){ACE_INET_Addr addr(net->iport,net->sip.c_str());this->acp.open(addr,5);return this->reactor()->register_handler(this,ACE_Event_Handler::ACCEPT_MASK);}ACE_HANDLE CClientAcceptor::get_handle()const{return this->acp.get_handle();}int CClientAcceptor::handle_input(ACE_HANDLE fd){CClientService* cs = NULL;cs = new CClientService();cs->SetParentHander(net);if (this->acp.accept(cs->peer()) == -1) {delete cs;return 0;}cs->reactor(this->reactor());if (cs->Open() == -1) cs->handle_close(ACE_INVALID_HANDLE,0);return 0;}int CClientAcceptor::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask){if (this->acp.get_handle() != ACE_INVALID_HANDLE){ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL;this->reactor()->remove_handler(this,m);this->acp.close();}delete this;return 0;}void CClientAcceptor::SetParentHander(CNetWork *p){net = p;}///////////////////////////////////////////////////int CClientService::Open(){ACE_INET_Addr peeraddr;this->sockeam.get_remote_addr(peeraddr);stAppidcs.iPort = peeraddr.get_port_number();stAppidcs.sIp =peeraddr.get_host_addr();stAppidcs.pcs = this;return 
this->reactor()->register_handler(this,ACE_Event_Handler::READ_MASK);}ACE_HANDLE CClientService::get_handle()const{return this->sockeam.get_handle();}intCClientService::handle_input(ACE_HANDLE fd){char buf[1024] = {0};int isize = 0;isize = this->sockeam.recv(buf,1024);if (isize <= 0) return -1;if (isize > 0 && isize < 1024) ChackProtocol(buf,isize);return 0;}intCClientService::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask mask) {// 如果对方断开连接,会触发此事件,则会被调用两次(已屏蔽系统自动调用函数)if (mask == ACE_Event_Handler::WRITE_MASK) return 0;DeleteAppid(this); // 删除外部指针 mask = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;this->reactor()->remove_handler(this,mask);this->sockeam.close(); delete this;return 0;}void CClientService::SetParentHander(CNetWork *p){net = p;}int CClientService::SendData(const char* buf,const int nSize){intisize = 0; isize = this->sockeam.send_n(buf,nSize);if (isize <= 0) this->handle_close(0,0);return isize;}int CClientService::connect(ACE_INET_Addr addr){ACE_Time_Value itimeout(1);return this->con.connect(sockeam,addr,&itimeout);}void CClientService::ChackProtocol(const char* buf,const int nSize){Byte *p = (Byte*)buf;if ((*(p + 11) == 0x01) && (*(p + 15) == 0x02)){if (!checkFrame(p,nSize)){ stAppidcs.byAppid = *(p + 6);// 请求连接的客户端APPID stAppidcs.byGroup = *(p + 16); AddAppid(stAppidcs); // 增加客户连接 return;}} ReadDataToList(buf,nSize); // 保存用户数据}void CClientService::AddAppid(STAppidcs stAppidcs){std::map<Byte,STAppidcs>::iterator mpIter;for (mpIter = net->mpInfo.find(stAppidcs.byAppid); mpIter != net->mpInfo.end(); mpIter++) {if (mpIter->first != stAppidcs.byAppid) break;if (mpIter->second.sIp != stAppidcs.sIp) continue;mpIter->second.pcs = this; break;} }void CClientService::DeleteAppid(CClientService* cs){std::map<Byte,STAppidcs>::iterator mpIter;for (mpIter = net->mpInfo.begin(); mpIter != net->mpInfo.end(); mpIter++) {if (mpIter->second.pcs == cs) { mpIter->second.pcs = NULL; break;}}}void CClientService::ReadDataToList(const char* 
buf,nSize);// 将数据保存到连表if (net->lstData.size() > 500){std::list<STListData>::iterator lstIter;mlocka.acquire();lstIter = net->lstData.begin();delete []lstIter->byData;net->lstData.erase(lstIter);mlocka.release();}mlocka.acquire();net->lstData.push_back(stlData);mlocka.release();}/////////////////////////////////////////////////*boolCTaskTimer::Open(CNetWork *p){pnet = p;ACE_Time_Value idlay(1);ACE_Time_Value ival(40);timeid = this->reactor()->schedule_timer(this,ival);return true;}int CTaskTimer::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask){if (timeid) this->reactor()->cancel_timer(this); delete this;return 0;}int CTaskTimer::handle_timeout(const ACE_Time_Value& current_time,const void* act) {unsigned char buf[255] = {0};unsigned char isize = 4;/*memcpy(buf,"/x00/x01/x01/x04",4);fixFrame(buf,(pnet->iprtype ? 0x02 : 0x01),pnet->byAppid);*/pnet->SendData(buf,ServerAppid);return 0;}

原文链接:https://www.f2er.com/react/308557.html

猜你在找的React相关文章