前端之家收集整理的这篇文章主要介绍了
epoll实现Reactor模式,
前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
转自:http://blog.csdn.net/analogous_love/article/details/53319815
最近一直在看游双的《高性能Linux服务器编程》一书,下载链接:http://download.csdn.net/detail/analogous_love/9673008
书上是这么介绍Reactor模式的:
按照这个思路,我写个简单的练习:
-
-
- *@author:zhangyl
- *@date:2016.11.23
- */
-
- #include<iostream>
- #include<string.h>
- #include<sys/types.h>
- #include<sys/socket.h>
- #include<netinet/in.h>
- #include<arpa/inet.h>//forhtonl()andhtons()
- #include<unistd.h>
- #include<fcntl.h>
- #include<sys/epoll.h>
- #include<signal.h>//forsignal()
- #include<pthread.h>
- #include<semaphore.h>
- #include<list>
- #include<errno.h>
- #include<time.h>
- #include<sstream>
- #include<iomanip>//forstd::setw()/setfill()
- #include<stdlib.h>
-
- #defineWORKER_THREAD_NUM5
- #definemin(a,b)((a<=b)?(a):(b))
- intg_epollfd=0;
- boolg_bStop=false;
- intg_listenfd=0;
- pthread_tg_acceptthreadid=0;
- pthread_tg_threadid[WORKER_THREAD_NUM]={0};
- pthread_cond_tg_acceptcond;
- pthread_mutex_tg_acceptmutex;
- pthread_cond_tg_cond;
- pthread_mutex_tg_mutex;
- pthread_mutex_tg_clientmutex;
- std::list<int>g_listClients;
- voidprog_exit(intsigno)
- {
- ::signal(SIGINT,SIG_IGN);
- ::signal(SIGKILL,SIG_IGN);
- ::signal(SIGTERM,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> std::cout<<"programrecvsignal"<<signo<<"toexit."<<std::endl;
- g_bStop=true;
- ::epoll_ctl(g_epollfd,EPOLL_CTL_DEL,g_listenfd,NULL);
- //TODO:是否需要先调用shutdown()一下?
- ::shutdown(g_listenfd,SHUT_RDWR);
- ::close(g_listenfd);
- ::close(g_epollfd);
- ::pthread_cond_destroy(&g_acceptcond);
- ::pthread_mutex_destroy(&g_acceptmutex);
- ::pthread_cond_destroy(&g_cond);
- ::pthread_mutex_destroy(&g_mutex);
- ::pthread_mutex_destroy(&g_clientmutex);
- }
- boolcreate_server_listener(constchar*ip,shortport)
- g_listenfd=::socket(AF_INET,SOCK_STREAM|SOCK_NONBLOCK,0);
- if(g_listenfd==-1)
- returninton=1;
- ::setsockopt(g_listenfd,SOL_SOCKET,SO_REUSEADDR,(char*)&on,sizeof(on));
- ::setsockopt(g_listenfd,SO_REUSEPORT,153); background-color:inherit; font-weight:bold">sizeof(on));
- structsockaddr_inservaddr;
- memset(&servaddr,sizeof(servaddr));
- servaddr.sin_family=AF_INET;
- servaddr.sin_addr.s_addr=inet_addr(ip);
- servaddr.sin_port=htons(port);
- if(::bind(g_listenfd,(sockaddr*)&servaddr,153); background-color:inherit; font-weight:bold">sizeof(servaddr))==-1)
- if(::listen(g_listenfd,50)==-1)
- false;
- g_epollfd=::epoll_create(1);
- if(g_epollfd==-1)
- structepoll_evente;
- memset(&e,153); background-color:inherit; font-weight:bold">sizeof(e));
- e.events=EPOLLIN|EPOLLRDHUP;
- e.data.fd=g_listenfd;
- if(::epoll_ctl(g_epollfd,EPOLL_CTL_ADD,&e)==-1)
- }
- voidrelease_client(intclientfd)
- {
- std::cout<<"releaseclientsocketFailedascallepoll_ctlFailed"<<std::endl;
- ::close(clientfd);
- void*accept_thread_func(void*arg)
- while(!g_bStop)
- ::pthread_mutex_lock(&g_acceptmutex);
- ::pthread_cond_wait(&g_acceptcond,&g_acceptmutex);
- //::pthread_mutex_lock(&g_acceptmutex);
- //std::cout<<"runloopinaccept_thread_func"<<std::endl;
- structsockaddr_inclientaddr;
- socklen_taddrlen;
- intnewfd=::accept(g_listenfd,(structsockaddr*)&clientaddr,&addrlen);
- ::pthread_mutex_unlock(&g_acceptmutex);
- if(newfd==-1)
- continue;
- std::cout<<"newclientconnected:"<<::inet_ntoa(clientaddr.sin_addr)<<":"<<::ntohs(clientaddr.sin_port)<<std::endl;
- //将新socket设置为non-blocking
- intoldflag=::fcntl(newfd,F_GETFL,0);
- intnewflag=oldflag|O_NONBLOCK;
- if(::fcntl(newfd,F_SETFL,newflag)==-1)
- std::cout<<"fcntlerror,oldflag="<<oldflag<<",newflag="<<newflag<<std::endl;
- e.events=EPOLLIN|EPOLLRDHUP|EPOLLET;
- e.data.fd=newfd;
- std::cout<<"epoll_ctlerror,fd="<<newfd<<std::endl;
- returnNULL;
- void*worker_thread_func(intclientfd;
- ::pthread_mutex_lock(&g_clientmutex);
- while(g_listClients.empty())
- ::pthread_cond_wait(&g_cond,&g_clientmutex);
- clientfd=g_listClients.front();
- g_listClients.pop_front();
- pthread_mutex_unlock(&g_clientmutex);
- //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来
- std::cout<<std::endl;
- std::stringstrclientmsg;
- charbuff[256];
- boolbError=while(true)
- memset(buff,153); background-color:inherit; font-weight:bold">sizeof(buff));
- intnRecv=::recv(clientfd,buff,256,0);
- if(nRecv==-1)
- if(errno==EWOULDBLOCK)
- break;
- else
- std::cout<<"recverror,clientdisconnected,fd="<<clientfd<<std::endl;
- release_client(clientfd);
- bError=true;
- //对端关闭了socket,这端也关闭。
- elseif(nRecv==0)
- std::cout<<"peerclosed,108); list-style-type:decimal-leading-zero; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> strclientmsg+=buff;
- //出错了,就不要再继续往下执行了
- if(bError)
- std::cout<<"clientmsg:"<<strclientmsg;
- //将消息加上时间标签后发回
- time_tnow=time(NULL);
- structtm*nowstr=localtime(&now);
- std::ostringstreamostimestr;
- ostimestr<<"["<<nowstr->tm_year+1900<<"-"
- <<std::setw(2)<<std::setfill('0')<<nowstr->tm_mon+1<<"-"
- <<std::setw(2)<<std::setfill('0')<<nowstr->tm_mday<<""
- <<std::setw(2)<<std::setfill('0')<<nowstr->tm_hour<<":"
- <<std::setw(2)<<std::setfill('0')<<nowstr->tm_min<<":"
- <<std::setw(2)<<std::setfill('0')<<nowstr->tm_sec<<"]serverreply:";
- strclientmsg.insert(0,ostimestr.str());
- intnSent=::send(clientfd,strclientmsg.c_str(),strclientmsg.length(),153); background-color:inherit; font-weight:bold">if(nSent==-1)
- if(errno==EWOULDBLOCK)
- ::sleep(10);
- continue;
- std::cout<<"senderror,153); background-color:inherit; font-weight:bold">break;
- std::cout<<"send:"<<strclientmsg;
- strclientmsg.erase(0,nSent);
- if(strclientmsg.empty())
- returnNULL;
- voiddaemon_run()
- intpid;
- signal(SIGCHLD,0); background-color:inherit">//1)在父进程中,fork返回新创建子进程的进程ID;
- //2)在子进程中,fork返回0;
- //3)如果出现错误,fork返回一个负值;
- pid=fork();
- if(pid<0)
- std::cout<<"forkerror"<<std::endl;
- exit(-1);
- //父进程退出,子进程独立运行
- if(pid>0){
- exit(0);
- //之前parent和child运行在同一个session里,parent是会话(session)的领头进程,
- //parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。
- //执行setsid()之后,child将重新获得一个新的会话(session)id。
- //这时parent退出之后,将不会影响到child了。
- setsid();
- intfd;
- fd=open("/dev/null",O_RDWR,153); background-color:inherit; font-weight:bold">if(fd!=-1)
- dup2(fd,STDIN_FILENO);
- dup2(fd,STDOUT_FILENO);
- if(fd>2)
- close(fd);
- intmain(intargc,87); background-color:inherit; font-weight:bold">char*argv[])
- shortport=0;
- intch;
- boolbdaemon=while((ch=getopt(argc,argv,"p:d"))!=-1)
- switch(ch)
- case'd':
- bdaemon=case'p':
- port=atol(optarg);
- if(bdaemon)
- daemon_run();
- if(port==0)
- port=12345;
- if(!create_server_listener("0.0.0.0",port))
- std::cout<<"Unabletocreatelistenserver:ip=0.0.0.0,port="<<port<<"."<<std::endl;
- return-1;
- //设置信号处理
- signal(SIGPIPE,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> signal(SIGINT,prog_exit);
- signal(SIGKILL,prog_exit);
- signal(SIGTERM,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::pthread_cond_init(&g_acceptcond,108); list-style-type:decimal-leading-zero; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::pthread_mutex_init(&g_acceptmutex,NULL);
- ::pthread_cond_init(&g_cond,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::pthread_mutex_init(&g_mutex,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::pthread_mutex_init(&g_clientmutex,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::pthread_create(&g_acceptthreadid,NULL,accept_thread_func,0); background-color:inherit">//启动工作线程
- for(inti=0;i<WORKER_THREAD_NUM;++i)
- ::pthread_create(&g_threadid[i],worker_thread_func,153); background-color:inherit; font-weight:bold">structepoll_eventev[1024];
- intn=::epoll_wait(g_epollfd,ev,1024,10);
- if(n==0)
- if(n<0)
- std::cout<<"epoll_waiterror"<<std::endl;
- intm=min(n,1024);
- inti=0;i<m;++i)
- //通知接收连接线程接收新连接
- if(ev[i].data.fd==g_listenfd)
- pthread_cond_signal(&g_acceptcond);
- //通知普通工作线程接收数据
- else
- pthread_mutex_lock(&g_clientmutex);
- g_listClients.push_back(ev[i].data.fd);
- pthread_mutex_unlock(&g_clientmutex);
- pthread_cond_signal(&g_cond);
- //std::cout<<"signal"<<std::endl;
- return0;
- }
程序的功能一个简单的echo服务:客户端连接上服务器之后,给服务器发送信息,服务器加上时间戳等信息后返回给客户端。使用到的知识点有:
1. 条件变量
2.epoll的边缘触发模式
程序的大致框架是:
1. 主线程只负责监听侦听socket上是否有新连接,如果有新连接到来,交给一个叫accept的工作线程去接收新连接,并将新连接socket绑定到主线程使用epollfd上去。
2. 主线程如果侦听到客户端的socket上有可读事件,则通知另外五个工作线程去接收处理客户端发来的数据,并将数据加上时间戳后发回给客户端。
3. 可以通过传递-p port来设置程序的监听端口号;可以通过传递-d来使程序以daemon模式运行在后台。这也是标准linux daemon模式的书写方法。
程序难点和需要注意的地方是:
1. 条件变量为了防止虚假唤醒,一定要在一个循环里面调用pthread_cond_wait()函数,我在worker_thread_func()中使用了:
如果不加上这一行,正常运行服务器程序,程序中要打印到控制台的信息都会打印出来,但是如果用gdb调试状态下,程序的所有输出就不显示了。我不知道这是不是gdb的一个bug,所以这里加上std::endl来输出一个换行符并flush标准输出,让输出显示出来。(std::endl不仅是输出一个换行符而且是同时刷新输出,相当于fflush()函数)。
程序我部署起来了,你可以使用linux的nc命令或自己写程序连接服务器来查看程序效果,当然也可以使用telnet命令,方法:
linux:
nc120.55.94.78 12345
或
telnet120.55.94.78 12345
然后就可以给服务器自由发送数据了,服务器会给你发送的信息加上时间戳返回给你。效果如图:
另外我将这个代码改写了成纯C++11版本,使用CMake编译,为了支持编译必须加上这-std=c++11:
CMakeLists.txt代码如下:
myreactor.h文件内容:
copy
*@desc:myreactor头文件,myreactor.h
*@date:2016.12.03
#ifndef__MYREACTOR_H__
#define__MYREACTOR_H__
#include<memory>
#include<thread>
#include<mutex>
#include<condition_variable>
classCMyReactor
public:
CMyReactor();
~CMyReactor();
boolinit(shortnport);
booluninit();
boolclose_client(intclientfd);
staticvoid*main_loop(void*p);
private:
//nocopyable
CMyReactor(constCMyReactor&rhs);
CMyReactor&operator=(constCMyReactor&rhs);
shortport);
voidaccept_thread_proc(CMyReactor*pReatcor);
voidworker_thread_proc(CMyReactor*pReatcor);
//C11语法可以在这里初始化
intm_listenfd=0;
intm_epollfd=0;
boolm_bStop= std::shared_ptr<std::thread>m_acceptthread;
std::shared_ptr<std::thread>m_workerthreads[WORKER_THREAD_NUM];
std::condition_variablem_acceptcond;
std::mutexm_acceptmutex;
std::condition_variablem_workercond;
std::mutexm_workermutex;
std::list<int>m_listClients;
};
#endif//!__MYREACTOR_H__
myreactor.cpp文件内容:
copy
*@desc:myreactor实现文件,myreactor.cpp
#include"myreactor.h"
#include<fcntl.h>
#include<sys/epoll.h>
#include<unistd.h>
#definemin(a,b)((a<=b)?(a):(b))
CMyReactor::CMyReactor()
//m_listenfd=0;
//m_epollfd=0;
//m_bStop=false;
CMyReactor::~CMyReactor()
boolCMyReactor::init(shortnport)
if(!create_server_listener(ip,nport))
std::cout<<"Unabletobind:"<<ip<<":"<<nport<<"."<<std::endl;
std::cout<<"mainthreadid="<<std::this_thread::get_id()<<std::endl;
//启动接收新连接的线程
m_acceptthread.reset(newstd::thread(CMyReactor::accept_thread_proc,153); background-color:inherit; font-weight:bold">this));
//启动工作线程
for(auto&t:m_workerthreads)
t.reset(thread(CMyReactor::worker_thread_proc,153); background-color:inherit; font-weight:bold">this));
boolCMyReactor::uninit()
m_bStop= m_acceptcond.notify_one();
m_workercond.notify_all();
m_acceptthread->join();
t->join();
::epoll_ctl(m_epollfd,m_listenfd,108); list-style-type:decimal-leading-zero; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::shutdown(m_listenfd,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::close(m_listenfd);
::close(m_epollfd);
boolCMyReactor::close_client(intclientfd)
if(::epoll_ctl(m_epollfd,NULL)==-1)
std::cout<<"closeclientsocketfailedascallepoll_ctlfailed"<<std::endl;
//returnfalse;
::close(clientfd);
void*CMyReactor::main_loop(void*p)
CMyReactor*pReatcor=static_cast<CMyReactor*>(p);
while(!pReatcor->m_bStop)
intn=::epoll_wait(pReatcor->m_epollfd,153); background-color:inherit; font-weight:bold">if(ev[i].data.fd==pReatcor->m_listenfd)
pReatcor->m_acceptcond.notify_one();
std::unique_lock<std::mutex>guard(pReatcor->m_workermutex);
pReatcor->m_listClients.push_back(ev[i].data.fd);
pReatcor->m_workercond.notify_one();
}
//endfor-loop
}
std::cout<<"mainloopexit..."<<std::endl;
voidCMyReactor::accept_thread_proc(CMyReactor*pReatcor)
std::cout<<"acceptthread,threadid="<<std::this_thread::get_id()<<std::endl;
intnewfd;
structsockaddr_inclientaddr;
socklen_taddrlen;
std::unique_lock<std::mutex>guard(pReatcor->m_acceptmutex);
pReatcor->m_acceptcond.wait(guard);
if(pReatcor->m_bStop)
//std::cout<<"runloopinaccept_thread_proc"<<std::endl;
newfd=::accept(pReatcor->m_listenfd,(if(newfd==-1)
std::cout<<"newclientconnected:"<<::inet_ntoa(clientaddr.sin_addr)<<":"<<::ntohs(clientaddr.sin_port)<<std::endl;
//将新socket设置为non-blocking
intnewflag=oldflag|O_NONBLOCK;
std::cout<<"fcntlerror,newflag="<<newflag<<std::endl;
structepoll_evente;
memset(&e,153); background-color:inherit; font-weight:bold">sizeof(e));
e.events=EPOLLIN|EPOLLRDHUP|EPOLLET;
e.data.fd=newfd;
if(::epoll_ctl(pReatcor->m_epollfd,&e)==-1)
std::cout<<"epoll_ctlerror,fd="<<newfd<<std::endl;
std::cout<<"acceptthreadexit..."<<std::endl;
voidCMyReactor::worker_thread_proc(CMyReactor*pReatcor)
std::cout<<"newworkerthread,153); background-color:inherit; font-weight:bold">while(pReatcor->m_listClients.empty())
if(pReatcor->m_bStop)
std::cout<<"workerthreadexit..."<<std::endl;
return;
pReatcor->m_workercond.wait(guard);
clientfd=pReatcor->m_listClients.front();
pReatcor->m_listClients.pop_front();
pReatcor->close_client(clientfd);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
boolCMyReactor::create_server_listener(shortport)
m_listenfd=::socket(AF_INET,0);
if(m_listenfd==-1)
inton=1;
::setsockopt(m_listenfd,108); list-style-type:decimal-leading-zero; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::setsockopt(m_listenfd,153); background-color:inherit; font-weight:bold">structsockaddr_inservaddr;
memset(&servaddr,153); background-color:inherit; font-weight:bold">sizeof(servaddr));
servaddr.sin_family=AF_INET;
servaddr.sin_addr.s_addr=inet_addr(ip);
servaddr.sin_port=htons(port);
if(::bind(m_listenfd,153); background-color:inherit; font-weight:bold">sizeof(servaddr))==-1)
if(::listen(m_listenfd,50)==-1)
m_epollfd=::epoll_create(1);
if(m_epollfd==-1)
e.events=EPOLLIN|EPOLLRDHUP;
e.data.fd=m_listenfd;
}
main.cpp文件内容:
copy
*@desc:用reactor模式练习服务器程序
#include<stdlib.h>//forexit()
#include<sys/stat.h>
CMyReactorg_reator;
g_reator.uninit();
char*argv[])
//设置信号处理
signal(SIGCHLD,SIG_DFL);
signal(SIGPIPE,108); list-style-type:decimal-leading-zero; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> signal(SIGINT,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> signal(SIGKILL,108); list-style-type:decimal-leading-zero; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> signal(SIGTERM,153); background-color:inherit; font-weight:bold">if(!g_reator.init("0.0.0.0",12345))
return-1;
g_reator.main_loop(&g_reator);
return0;
}
完整实例代码下载地址:
普通版本:https://pan.baidu.com/s/1o82Mkno
C++11版本:https://pan.baidu.com/s/1dEJdrih