在accept_thread_func()函数里面我没有使用循环,这样会有问题吗?
2. 使用条件变量pthread_cond_wait()函数的时候一定要先获得与该条件变量相关的mutex,即像下面这样的结构:
copy
mutex_lock(...);
while(conditionis ::pthread_cond_wait(...);
//这里可以有其他代码...
mutex_unlock(...);
//这里可以有其他代码...
因为pthread_cond_wait()如果阻塞的话,它解锁相关mutex和阻塞当前线程这两个动作加在一起是原子的。
3. 作为服务器端程序最好对侦听socket调用setsocketopt()设置SO_REUSEADDR和SO_REUSEPORT两个标志,因为服务程序有时候会需要重启(比如调试的时候就会不断重启),如果不设置这两个标志的话,绑定端口时就会调用失败。因为一个端口使用后,即使不再使用,因为四次挥手该端口处于TIME_WAIT状态,有大约2min的MSL(Maximum Segment Lifetime,最大存活期)。这2min内,该端口是不能被重复使用的。你的服务器程序上次使用了这个端口号,接着重启,因为这个缘故,你再次绑定这个端口就会失败(bind函数调用失败)。要不你就每次重启时需要等待2min后再试(这在频繁重启程序调试是难以接收的),或者设置这种SO_REUSEADDR和SO_REUSEPORT立即回收端口使用。
其实,SO_REUSEADDR在windows上和Unix平台上还有些细微的区别,我在libevent源码中看到这样的描述:
copy
intevutil_make_listen_socket_reuseable(evutil_socket_tsock)
#ifndefWIN32
intone=1;
/*REUSEADDRonUnixmeans,"don'thangontothisaddressafterthe
*listenerisclosed."OnWindows,though,itmeans"don'tkeepother
*processesfrombindingtothisaddresswhilewe'reusingit.*/
returnsetsockopt(sock,153); background-color:inherit; font-weight:bold">void*)&one,
(ev_socklen_t)sizeof(one));
#else
#endif
}
注意注释部分,
在Unix平台上设置这个选项意味着,任意进程可以复用该地址;而在windows,不要阻止其他进程复用该地址。也就是在在Unix平台上,如果不设置这个选项,任意进程在一定时间内,不能bind该地址;在windows平台上,在一定时间内,其他进程不能bind该地址,而本进程却可以再次bind该地址。
4. epoll_wait对新连接socket使用的是边缘触发模式EPOLLET(edge trigger),而不是默认的水平触发模式(level trigger)。因为如果采取水平触发模式的话,主线程检测到某个客户端socket数据可读时,通知工作线程去收取该socket上的数据,这个时候主线程继续循环,只要在工作线程没有将该socket上数据全部收完,或者在工作线程收取数据的过程中,客户端有新数据到来,主线程会继续发通知(通过pthread_cond_signal())函数,再次通知工作线程收取数据。这样会可能导致多个工作线程同时调用recv函数收取该客户端socket上的数据,这样产生的结果将会导致数据错乱。
相反,采取边缘触发模式,只有等某个工作线程将那个客户端socket上数据全部收取完毕,主线程的epoll_wait才可能会再次触发来通知工作线程继续收取那个客户端socket新来的数据。
5. 代码中有这样一行:
如果不加上这一行,正常运行服务器程序,程序中要打印到控制台的信息都会打印出来,但是如果用gdb调试状态下,程序的所有输出就不显示了。我不知道这是不是gdb的一个bug,所以这里加上std::endl来输出一个换行符并flush标准输出,让输出显示出来。(std::endl不仅是输出一个换行符而且是同时刷新输出,相当于fflush()函数)。
程序我部署起来了,你可以使用linux的nc命令或自己写程序连接服务器来查看程序效果,当然也可以使用telnet命令,方法:
linux:
nc120.55.94.78 12345
或
telnet120.55.94.78 12345
然后就可以给服务器自由发送数据了,服务器会给你发送的信息加上时间戳返回给你。效果如图:
另外我将这个代码改写了成纯C++11版本,使用CMake编译,为了支持编译必须加上这-std=c++11:
CMakeLists.txt代码如下:
myreactor.h文件内容:
copy
*@desc:myreactor头文件,myreactor.h
*@date:2016.12.03
#ifndef__MYREACTOR_H__
#define__MYREACTOR_H__
#include<memory>
#include<thread>
#include<mutex>
#include<condition_variable>
classCMyReactor
public:
CMyReactor();
~CMyReactor();
boolinit(shortnport);
booluninit();
boolclose_client(intclientfd);
staticvoid*main_loop(void*p);
private:
//nocopyable
CMyReactor(constCMyReactor&rhs);
CMyReactor&operator=(constCMyReactor&rhs);
shortport);
voidaccept_thread_proc(CMyReactor*pReatcor);
voidworker_thread_proc(CMyReactor*pReatcor);
//C11语法可以在这里初始化
intm_listenfd=0;
intm_epollfd=0;
boolm_bStop= std::shared_ptr<std::thread>m_acceptthread;
std::shared_ptr<std::thread>m_workerthreads[WORKER_THREAD_NUM];
std::condition_variablem_acceptcond;
std::mutexm_acceptmutex;
std::condition_variablem_workercond;
std::mutexm_workermutex;
std::list<int>m_listClients;
};
#endif//!__MYREACTOR_H__
myreactor.cpp文件内容:
copy
*@desc:myreactor实现文件,myreactor.cpp
#include"myreactor.h"
#include<fcntl.h>
#include<sys/epoll.h>
#include<unistd.h>
#definemin(a,b)((a<=b)?(a):(b))
CMyReactor::CMyReactor()
//m_listenfd=0;
//m_epollfd=0;
//m_bStop=false;
CMyReactor::~CMyReactor()
boolCMyReactor::init(shortnport)
if(!create_server_listener(ip,nport))
std::cout<<"Unabletobind:"<<ip<<":"<<nport<<"."<<std::endl;
std::cout<<"mainthreadid="<<std::this_thread::get_id()<<std::endl;
//启动接收新连接的线程
m_acceptthread.reset(newstd::thread(CMyReactor::accept_thread_proc,153); background-color:inherit; font-weight:bold">this));
//启动工作线程
for(auto&t:m_workerthreads)
t.reset(thread(CMyReactor::worker_thread_proc,153); background-color:inherit; font-weight:bold">this));
boolCMyReactor::uninit()
m_bStop= m_acceptcond.notify_one();
m_workercond.notify_all();
m_acceptthread->join();
t->join();
::epoll_ctl(m_epollfd,m_listenfd,108); list-style-type:decimal-leading-zero; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::shutdown(m_listenfd,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::close(m_listenfd);
::close(m_epollfd);
boolCMyReactor::close_client(intclientfd)
if(::epoll_ctl(m_epollfd,NULL)==-1)
std::cout<<"closeclientsocketfailedascallepoll_ctlfailed"<<std::endl;
//returnfalse;
::close(clientfd);
void*CMyReactor::main_loop(void*p)
CMyReactor*pReatcor=static_cast<CMyReactor*>(p);
while(!pReatcor->m_bStop)
intn=::epoll_wait(pReatcor->m_epollfd,153); background-color:inherit; font-weight:bold">if(ev[i].data.fd==pReatcor->m_listenfd)
pReatcor->m_acceptcond.notify_one();
std::unique_lock<std::mutex>guard(pReatcor->m_workermutex);
pReatcor->m_listClients.push_back(ev[i].data.fd);
pReatcor->m_workercond.notify_one();
}
//endfor-loop
}
std::cout<<"mainloopexit..."<<std::endl;
voidCMyReactor::accept_thread_proc(CMyReactor*pReatcor)
std::cout<<"acceptthread,threadid="<<std::this_thread::get_id()<<std::endl;
intnewfd;
structsockaddr_inclientaddr;
socklen_taddrlen;
std::unique_lock<std::mutex>guard(pReatcor->m_acceptmutex);
pReatcor->m_acceptcond.wait(guard);
if(pReatcor->m_bStop)
//std::cout<<"runloopinaccept_thread_proc"<<std::endl;
newfd=::accept(pReatcor->m_listenfd,(if(newfd==-1)
std::cout<<"newclientconnected:"<<::inet_ntoa(clientaddr.sin_addr)<<":"<<::ntohs(clientaddr.sin_port)<<std::endl;
//将新socket设置为non-blocking
intnewflag=oldflag|O_NONBLOCK;
std::cout<<"fcntlerror,newflag="<<newflag<<std::endl;
structepoll_evente;
memset(&e,153); background-color:inherit; font-weight:bold">sizeof(e));
e.events=EPOLLIN|EPOLLRDHUP|EPOLLET;
e.data.fd=newfd;
if(::epoll_ctl(pReatcor->m_epollfd,&e)==-1)
std::cout<<"epoll_ctlerror,fd="<<newfd<<std::endl;
std::cout<<"acceptthreadexit..."<<std::endl;
voidCMyReactor::worker_thread_proc(CMyReactor*pReatcor)
std::cout<<"newworkerthread,153); background-color:inherit; font-weight:bold">while(pReatcor->m_listClients.empty())
if(pReatcor->m_bStop)
std::cout<<"workerthreadexit..."<<std::endl;
return;
pReatcor->m_workercond.wait(guard);
clientfd=pReatcor->m_listClients.front();
pReatcor->m_listClients.pop_front();
pReatcor->close_client(clientfd);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
boolCMyReactor::create_server_listener(shortport)
m_listenfd=::socket(AF_INET,0);
if(m_listenfd==-1)
inton=1;
::setsockopt(m_listenfd,108); list-style-type:decimal-leading-zero; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::setsockopt(m_listenfd,153); background-color:inherit; font-weight:bold">structsockaddr_inservaddr;
memset(&servaddr,153); background-color:inherit; font-weight:bold">sizeof(servaddr));
servaddr.sin_family=AF_INET;
servaddr.sin_addr.s_addr=inet_addr(ip);
servaddr.sin_port=htons(port);
if(::bind(m_listenfd,153); background-color:inherit; font-weight:bold">sizeof(servaddr))==-1)
if(::listen(m_listenfd,50)==-1)
m_epollfd=::epoll_create(1);
if(m_epollfd==-1)
e.events=EPOLLIN|EPOLLRDHUP;
e.data.fd=m_listenfd;
}
main.cpp文件内容:
copy
*@desc:用reactor模式练习服务器程序
#include<stdlib.h>//forexit()
#include<sys/stat.h>
CMyReactorg_reator;
g_reator.uninit();
char*argv[])
//设置信号处理
signal(SIGCHLD,SIG_DFL);
signal(SIGPIPE,108); list-style-type:decimal-leading-zero; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> signal(SIGINT,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> signal(SIGKILL,108); list-style-type:decimal-leading-zero; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> signal(SIGTERM,153); background-color:inherit; font-weight:bold">if(!g_reator.init("0.0.0.0",12345))
return-1;
g_reator.main_loop(&g_reator);
return0;
}
完整实例代码下载地址:
普通版本:https://pan.baidu.com/s/1o82Mkno
C++11版本:https://pan.baidu.com/s/1dEJdrih