3 ACE_Reactor 同步框架 网络聊天室
ACE_Reactor 框架
网络聊天室
项目文件:
chunli@Linux:~/ace/AceChatRoom$ tree
.
├── ChatMain.cpp
├── ChatRoom.cpp
├── ChatRoom.h
├── ParticipantAcceptor.cpp
├── ParticipantAcceptor.h
├── Participant.cpp
├── Participant.h
├── SignalHandler.cpp
└── SignalHandler.h

0 directories, 9 files
chunli@Linux:~/ace/AceChatRoom$
主程序:
chunli@Linux:~/ace/AceChatRoom$catChatMain.cpp #include<ace/Reactor.h> #include"ParticipantAcceptor.h" #include"SignalHandler.h" intmain(){ SignalHandlersh; ParticipantAcceptoracceptor; ACE_INET_Addraddr(8868); if(acceptor.open(addr)==-1) return1; returnACE_Reactor::instance()->run_reactor_event_loop(); } chunli@Linux:~/ace/AceChatRoom$
ChatRoom类文件
chunli@Linux:~/ace/AceChatRoom$catChatRoom.h #ifndefCHATROOM_H_ #defineCHATROOM_H_ #include<list> #include<ace/Singleton.h> #include<ace/Null_Mutex.h> classParticipant; classChatRoom{ public: voidjoin(Participant*user); voidleave(Participant*user); voidforwardMsg(constchar*msg); private: std::list<Participant*>users; }; //不加锁的方式 typedefACE_Singleton<ChatRoom,ACE_Null_Mutex>Room; #endif/*CHATROOM_H_*/ chunli@Linux:~/ace/AceChatRoom$ chunli@Linux:~/ace/AceChatRoom$catChatRoom.cpp #include<cstring> #include<iostream> #include"ChatRoom.h" #include"Participant.h" voidChatRoom::join(Participant*user) { users.push_back(user); } voidChatRoom::leave(Participant*user) { std::list<Participant*>::iteratorit=users.begin(); for(;it!=users.end();++it) { if(*it==user) { users.erase(it); break; } } } voidChatRoom::forwardMsg(constchar*msg) { std::list<Participant*>::const_iteratorit=users.begin(); for(;it!=users.end();++it) { ACE_SOCK_Stream&sock=(*it)->socket(); if(sock.send(msg,std::strlen(msg))==-1) (*it)->handle_close(ACE_INVALID_HANDLE,0); } } chunli@Linux:~/ace/AceChatRoom$
ParticipantAcceptor类文件
chunli@Linux:~/ace/AceChatRoom$catParticipantAcceptor.h #ifndefPARTICIPANTACCEPTOR_H_ #definePARTICIPANTACCEPTOR_H_ #include<ace/Reactor.h> #include<ace/Event_Handler.h> #include<ace/SOCK_Acceptor.h> classParticipantAcceptor:ACE_Event_Handler{ public: ParticipantAcceptor(ACE_Reactor*reactor=ACE_Reactor::instance()); virtual~ParticipantAcceptor(); intopen(constACE_INET_Addr&addr); virtualACE_HANDLEget_handle()const; virtualinthandle_input(ACE_HANDLEh=ACE_INVALID_HANDLE); virtualinthandle_close(ACE_HANDLEh,ACE_Reactor_MaskcloseMask); private: ACE_SOCK_Acceptoracceptor; }; #endif/*PARTICIPANTACCEPTOR_H_*/ chunli@Linux:~/ace/AceChatRoom$ chunli@Linux:~/ace/AceChatRoom$ chunli@Linux:~/ace/AceChatRoom$catParticipantAcceptor.cpp #include<ace/Log_Msg.h> #include"ParticipantAcceptor.h" #include"ChatRoom.h" #include"Participant.h" ParticipantAcceptor::ParticipantAcceptor(ACE_Reactor*reactor){ this->reactor(reactor); } ParticipantAcceptor::~ParticipantAcceptor(){ handle_close(ACE_INVALID_HANDLE,0); } intParticipantAcceptor::open(constACE_INET_Addr&addr){ if(acceptor.open(addr,0)==-1) ACE_ERROR_RETURN((LM_ERROR,"%p\n","acceptor.open"),-1); returnreactor()->register_handler(this,ACE_Event_Handler::ACCEPT_MASK); } ACE_HANDLEParticipantAcceptor::get_handle()const{ returnacceptor.get_handle(); } intParticipantAcceptor::handle_input(ACE_HANDLEh){ Participant*user=newParticipant(reactor()); if(acceptor.accept(user->socket())==-1) ACE_ERROR_RETURN((LM_ERROR,"acceptor.accept"),-1); if(user->open()==-1){ ACE_ERROR_RETURN((LM_ERROR,-1); user->handle_close(ACE_INVALID_HANDLE,0); }else{ Room::instance()->join(user); } return0; } intParticipantAcceptor::handle_close(ACE_HANDLEh,ACE_Reactor_MaskcloseMask){ if(acceptor.get_handle()!=ACE_INVALID_HANDLE){ ACE_Reactor_Maskm=ACE_Event_Handler::ACCEPT_MASK |ACE_Event_Handler::DONT_CALL; reactor()->remove_handler(this,m); acceptor.close(); } return0; } chunli@Linux:~/ace/AceChatRoom$
Participant类文件
chunli@Linux:~/ace/AceChatRoom$catParticipant.h #ifndefPARTICIPANT_H_ #definePARTICIPANT_H_ #include<ace/Reactor.h> #include<ace/Event_Handler.h> #include<ace/SOCK_Acceptor.h> classParticipant:ACE_Event_Handler{ public: staticACE_Time_ValuemaxMsgInterval; Participant(ACE_Reactor*reactor=ACE_Reactor::instance()); intopen(); virtualACE_HANDLEget_handle()const; virtualinthandle_input(ACE_HANDLEh=ACE_INVALID_HANDLE); virtualinthandle_timeout(constACE_Time_Value&t,constvoid*=0); virtualinthandle_close(ACE_HANDLEh,ACE_Reactor_MaskcloseMask); ACE_SOCK_Stream&socket(); private: ACE_Time_ValuelastMsgTime; ACE_SOCK_Streamsock; }; #endif/*PARTICIPANT_H_*/ chunli@Linux:~/ace/AceChatRoom$catParticipant.cpp #include<ace/Log_Msg.h> #include<ace/Timer_Queue.h> #include"Participant.h" #include"ChatRoom.h" //ACE_Time_ValueParticipant::maxMsgInterval=ACE_Time_Value(5); ACE_Time_ValueParticipant::maxMsgInterval=ACE_Time_Value(20);//20秒没有在聊天室说话的人,就被close Participant::Participant(ACE_Reactor*reactor){ this->reactor(reactor); } intParticipant::open(){ lastMsgTime=reactor()->timer_queue()->gettimeofday(); intresult= reactor()->register_handler(this,ACE_Event_Handler::READ_MASK); if(result!=0) returnresult; result=reactor()->schedule_timer(this,ACE_Time_Value::zero,maxMsgInterval); returnresult; } ACE_HANDLEParticipant::get_handle()const{ returnsock.get_handle(); } intParticipant::handle_input(ACE_HANDLEh){ charbuf[512]=""; ssize_trecvBytes=sock.recv(buf,sizeof(buf)); if(recvBytes<=0) ACE_ERROR_RETURN((LM_ERROR,"sock.recv"),-1); lastMsgTime=reactor()->timer_queue()->gettimeofday(); Room::instance()->forwardMsg(buf); return0; } intParticipant::handle_timeout(constACE_Time_Value&t,constvoid*){ if(t-lastMsgTime>maxMsgInterval) reactor()->remove_handler(this,ACE_Event_Handler::READ_MASK); return0; } intParticipant::handle_close(ACE_HANDLEh,ACE_Reactor_MaskcloseMask){ if(sock.get_handle()!=ACE_INVALID_HANDLE){ ACE_Reactor_Maskm=ACE_Event_Handler::ALL_EVENTS_MASK |ACE_Event_Handler::DONT_CALL; 
reactor()->cancel_timer(this); reactor()->remove_handler(this,m); sock.close(); Room::instance()->leave(this); deletethis; } return0; } ACE_SOCK_Stream&Participant::socket(){ returnsock; } chunli@Linux:~/ace/AceChatRoom$
SignalHandler类文件
chunli@Linux:~/ace/AceChatRoom$catSignalHandler.h #ifndefSIGNALHANDLER_H_ #defineSIGNALHANDLER_H_ #include<ace/Signal.h> #include<ace/Reactor.h> #include<ace/Event_Handler.h> classSignalHandler:ACE_Event_Handler{ public: SignalHandler(ACE_Reactor*reactor=ACE_Reactor::instance()); virtualinthandle_signal(intsignum,siginfo_t*,ucontext_t*); }; #endif/*SIGNALHANDLER_H_*/ chunli@Linux:~/ace/AceChatRoom$catSignalHandler.cpp #include<ace/Log_Msg.h> #include"SignalHandler.h" #include"ChatRoom.h" SignalHandler::SignalHandler(ACE_Reactor*reactor){ this->reactor(reactor); ACE_Sig_Setsignals; signals.fill_set(); this->reactor()->register_handler(signals,this); } intSignalHandler::handle_signal(intsignum,ucontext_t*){ switch(signum){ caseSIGINT: ACE_DEBUG((LM_DEBUG,"signalSIGINT,butnotbeterminated!\n")); break; caseSIGUSR1: ACE_DEBUG((LM_DEBUG,"signalSIGUSR1,broadcastgreeting...\n")); Room::instance()->forwardMsg("helloeveryone!\n"); break; caseSIGUSR2: ACE_DEBUG((LM_DEBUG,"signalSIGUSR2,shutdownchatroom...\n")); this->reactor()->end_reactor_event_loop(); break; } return0; } chunli@Linux:~/ace/AceChatRoom$
编译运行:
chunli@Linux:~/ace/AceChatRoom$ g++ *.cpp -lACE -Wall && ./a.out

3个客户端连接上来(每个客户端都会收到包括自己在内的所有人的消息,所以自己输入的那一行会出现两次):

chunli@Linux:~$ nc localhost 8868
1111111111111111111111
1111111111111111111111
222222222222222222222222
3333333333333333333333333333333
11111111111111111111111111111
11111111111111111111111111111
2222222222222222222222222222222
333333333333333333333333333333

chunli@Linux:~$ nc localhost 8868
222222222222222222222222
222222222222222222222222
3333333333333333333333333333333
11111111111111111111111111111
2222222222222222222222222222222
2222222222222222222222222222222
333333333333333333333333333333
chunli@Linux:~$

chunli@Linux:~$ nc localhost 8868
3333333333333333333333333333333
3333333333333333333333333333333
11111111111111111111111111111
2222222222222222222222222222222
333333333333333333333333333333
333333333333333333333333333333
chunli@Linux:~$

Ctrl+C 杀不死(SIGINT 被 SignalHandler 捕获,只打印日志,不退出):

chunli@Linux:~/ace/AceChatRoom$ g++ *.cpp -lACE -Wall && ./a.out
^CsignalSIGINT,butnotbeterminated!

断开SSH终端:看到变成了守护进程(SignalHandler 注册了全部信号,终端断开后进程仍在运行,TTY 一栏变为 ?):

chunli@Linux:~$ ps aux | grep a.out
chunli 2978 0.0 0.1 22704 4208 ? S 10:54 0:00 ./a.out

杀死服务程序(只能用无法被捕获的 SIGKILL,或者发送 SIGUSR2 让事件循环正常结束):

chunli@Linux:~$ ps aux | grep a.out
chunli 2952 0.0 0.0 22704 2308 pts/7 S+ 10:50 0:00 ./a.out
chunli@Linux:~$ kill -9 2952