高性能服务器开发基础系列 (二)Reactor模式

前端之家收集整理的这篇文章主要介绍了高性能服务器开发基础系列 (二)Reactor模式前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

系列目录

第01篇 主线程与工作线程的分工

第02篇 Reactor模式

第03篇 一个服务器程序的架构介绍

第04篇 如何将socket设置为非阻塞模式

第05篇 如何编写高性能日志

第06篇 关于网络编程的一些实用技巧和细节

第07篇 开源一款即时通讯软件的源码

第08篇 高性能服务器架构设计总结1

第09篇 高性能服务器架构设计总结2

第10篇 高性能服务器架构设计总结3

第11篇 高性能服务器架构设计总结4

最近一直在看游双的《性能linux服务器编程》一书,下载链接http://download.csdn.net/deta...

书上是这么介绍Reactor模式的:

按照这个思路,我写个简单的练习:

  1. /**
  2. *@desc: 用reactor模式练习服务器程序,main.cpp
  3. *@author: zhangyl
  4. *@date: 2016.11.23
  5. */
  6.  
  7. #include <iostream>
  8. #include <string.h>
  9. #include <sys/types.h>
  10. #include <sys/socket.h>
  11. #include <netinet/in.h>
  12. #include <arpa/inet.h> //for htonl() and htons()
  13. #include <unistd.h>
  14. #include <fcntl.h>
  15. #include <sys/epoll.h>
  16. #include <signal.h> //for signal()
  17. #include <pthread.h>
  18. #include <semaphore.h>
  19. #include <list>
  20. #include <errno.h>
  21. #include <time.h>
  22. #include <sstream>
  23. #include <iomanip> //for std::setw()/setfill()
  24. #include <stdlib.h>
  25.  
  26.  
  27. #define WORKER_THREAD_NUM 5
  28.  
  29. #define min(a,b) ((a <= b) ? (a) : (b))
  30.  
  31. int g_epollfd = 0;
  32. bool g_bStop = false;
  33. int g_listenfd = 0;
  34. pthread_t g_acceptthreadid = 0;
  35. pthread_t g_threadid[WORKER_THREAD_NUM] = { 0 };
  36. pthread_cond_t g_acceptcond;
  37. pthread_mutex_t g_acceptmutex;
  38.  
  39. pthread_cond_t g_cond /*= PTHREAD_COND_INITIALIZER*/;
  40. pthread_mutex_t g_mutex /*= PTHREAD_MUTEX_INITIALIZER*/;
  41.  
  42. pthread_mutex_t g_clientmutex;
  43.  
  44. std::list<int> g_listClients;
  45.  
  46. void prog_exit(int signo)
  47. {
  48. ::signal(SIGINT,SIG_IGN);
  49. //::signal(SIGKILL,SIG_IGN);//该信号不能被阻塞、处理或者忽略
  50. ::signal(SIGTERM,SIG_IGN);
  51.  
  52. std::cout << "program recv signal " << signo << " to exit." << std::endl;
  53.  
  54. g_bStop = true;
  55.  
  56. ::epoll_ctl(g_epollfd,EPOLL_CTL_DEL,g_listenfd,NULL);
  57.  
  58. //TODO: 是否需要先调用shutdown()一下?
  59. ::shutdown(g_listenfd,SHUT_RDWR);
  60. ::close(g_listenfd);
  61. ::close(g_epollfd);
  62.  
  63. ::pthread_cond_destroy(&g_acceptcond);
  64. ::pthread_mutex_destroy(&g_acceptmutex);
  65. ::pthread_cond_destroy(&g_cond);
  66. ::pthread_mutex_destroy(&g_mutex);
  67.  
  68. ::pthread_mutex_destroy(&g_clientmutex);
  69. }
  70.  
  71. bool create_server_listener(const char* ip,short port)
  72. {
  73. g_listenfd = ::socket(AF_INET,SOCK_STREAM | SOCK_NONBLOCK,0);
  74. if (g_listenfd == -1)
  75. return false;
  76.  
  77. int on = 1;
  78. ::setsockopt(g_listenfd,SOL_SOCKET,SO_REUSEADDR,(char *)&on,sizeof(on));
  79. ::setsockopt(g_listenfd,SO_REUSEPORT,sizeof(on));
  80.  
  81. struct sockaddr_in servaddr;
  82. memset(&servaddr,sizeof(servaddr));
  83. servaddr.sin_family = AF_INET;
  84. servaddr.sin_addr.s_addr = inet_addr(ip);
  85. servaddr.sin_port = htons(port);
  86. if (::bind(g_listenfd,(sockaddr *)&servaddr,sizeof(servaddr)) == -1)
  87. return false;
  88.  
  89. if (::listen(g_listenfd,50) == -1)
  90. return false;
  91.  
  92. g_epollfd = ::epoll_create(1);
  93. if (g_epollfd == -1)
  94. return false;
  95.  
  96. struct epoll_event e;
  97. memset(&e,sizeof(e));
  98. e.events = EPOLLIN | EPOLLRDHUP;
  99. e.data.fd = g_listenfd;
  100. if (::epoll_ctl(g_epollfd,EPOLL_CTL_ADD,&e) == -1)
  101. return false;
  102.  
  103. return true;
  104. }
  105.  
  106. void release_client(int clientfd)
  107. {
  108. if (::epoll_ctl(g_epollfd,clientfd,NULL) == -1)
  109. std::cout << "release client socket Failed as call epoll_ctl Failed" << std::endl;
  110.  
  111. ::close(clientfd);
  112. }
  113.  
  114. void* accept_thread_func(void* arg)
  115. {
  116. while (!g_bStop)
  117. {
  118. ::pthread_mutex_lock(&g_acceptmutex);
  119. ::pthread_cond_wait(&g_acceptcond,&g_acceptmutex);
  120. //::pthread_mutex_lock(&g_acceptmutex);
  121.  
  122. //std::cout << "run loop in accept_thread_func" << std::endl;
  123.  
  124. struct sockaddr_in clientaddr;
  125. socklen_t addrlen;
  126. int newfd = ::accept(g_listenfd,(struct sockaddr *)&clientaddr,&addrlen);
  127. ::pthread_mutex_unlock(&g_acceptmutex);
  128. if (newfd == -1)
  129. continue;
  130.  
  131. std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl;
  132.  
  133. //将新socket设置为non-blocking
  134. int oldflag = ::fcntl(newfd,F_GETFL,0);
  135. int newflag = oldflag | O_NONBLOCK;
  136. if (::fcntl(newfd,F_SETFL,newflag) == -1)
  137. {
  138. std::cout << "fcntl error,oldflag =" << oldflag << ",newflag = " << newflag << std::endl;
  139. continue;
  140. }
  141.  
  142. struct epoll_event e;
  143. memset(&e,sizeof(e));
  144. e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
  145. e.data.fd = newfd;
  146. if (::epoll_ctl(g_epollfd,newfd,&e) == -1)
  147. {
  148. std::cout << "epoll_ctl error,fd =" << newfd << std::endl;
  149. }
  150. }
  151.  
  152. return NULL;
  153. }
  154.  
  155.  
  156. void* worker_thread_func(void* arg)
  157. {
  158. while (!g_bStop)
  159. {
  160. int clientfd;
  161. ::pthread_mutex_lock(&g_clientmutex);
  162. while (g_listClients.empty())
  163. ::pthread_cond_wait(&g_cond,&g_clientmutex);
  164. clientfd = g_listClients.front();
  165. g_listClients.pop_front();
  166. pthread_mutex_unlock(&g_clientmutex);
  167.  
  168. //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来
  169. std::cout << std::endl;
  170.  
  171. std::string strclientmsg;
  172. char buff[256];
  173. bool bError = false;
  174. while (true)
  175. {
  176. memset(buff,sizeof(buff));
  177. int nRecv = ::recv(clientfd,buff,256,0);
  178. if (nRecv == -1)
  179. {
  180. if (errno == EWOULDBLOCK)
  181. break;
  182. else
  183. {
  184. std::cout << "recv error,client disconnected,fd = " << clientfd << std::endl;
  185. release_client(clientfd);
  186. bError = true;
  187. break;
  188. }
  189. }
  190. //对端关闭了socket,这端也关闭
  191. else if (nRecv == 0)
  192. {
  193. std::cout << "peer closed,fd = " << clientfd << std::endl;
  194. release_client(clientfd);
  195. bError = true;
  196. break;
  197. }
  198.  
  199. strclientmsg += buff;
  200. }
  201.  
  202. //出错了,就不要再继续往下执行了
  203. if (bError)
  204. continue;
  205. std::cout << "client msg: " << strclientmsg;
  206.  
  207. //将消息加上时间标签后发回
  208. time_t now = time(NULL);
  209. struct tm* nowstr = localtime(&now);
  210. std::ostringstream ostimestr;
  211. ostimestr << "[" << nowstr->tm_year + 1900 << "-"
  212. << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-"
  213. << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "
  214. << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":"
  215. << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":"
  216. << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";
  217.  
  218. strclientmsg.insert(0,ostimestr.str());
  219. while (true)
  220. {
  221. int nSent = ::send(clientfd,strclientmsg.c_str(),strclientmsg.length(),0);
  222. if (nSent == -1)
  223. {
  224. if (errno == EWOULDBLOCK)
  225. {
  226. ::sleep(10);
  227. continue;
  228. }
  229. else
  230. {
  231. std::cout << "send error,fd = " << clientfd << std::endl;
  232. release_client(clientfd);
  233. break;
  234. }
  235. }
  236.  
  237. std::cout << "send: " << strclientmsg;
  238. strclientmsg.erase(0,nSent);
  239.  
  240. if (strclientmsg.empty())
  241. break;
  242. }
  243. }
  244.  
  245. return NULL;
  246. }
  247.  
  248. void daemon_run()
  249. {
  250. int pid;
  251. signal(SIGCHLD,SIG_IGN);
  252. //1)在父进程中,fork返回新创建子进程的进程ID;
  253. //2)在子进程中,fork返回0;
  254. //3)如果出现错误,fork返回一个负值;
  255. pid = fork();
  256. if (pid < 0)
  257. {
  258. std:: cout << "fork error" << std::endl;
  259. exit(-1);
  260. }
  261. //父进程退出,子进程独立运行
  262. else if (pid > 0) {
  263. exit(0);
  264. }
  265. //之前parent和child运行在同一个session里,parent是会话(session)的领头进程,//parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。
  266. //执行setsid()之后,child将重新获得一个新的会话(session)id。
  267. //这时parent退出之后,将不会影响到child了。
  268. setsid();
  269. int fd;
  270. fd = open("/dev/null",O_RDWR,0);
  271. if (fd != -1)
  272. {
  273. dup2(fd,STDIN_FILENO);
  274. dup2(fd,STDOUT_FILENO);
  275. dup2(fd,STDERR_FILENO);
  276. }
  277. if (fd > 2)
  278. close(fd);
  279. }
  280.  
  281.  
  282. int main(int argc,char* argv[])
  283. {
  284. short port = 0;
  285. int ch;
  286. bool bdaemon = false;
  287. while ((ch = getopt(argc,argv,"p:d")) != -1)
  288. {
  289. switch (ch)
  290. {
  291. case 'd':
  292. bdaemon = true;
  293. break;
  294. case 'p':
  295. port = atol(optarg);
  296. break;
  297. }
  298. }
  299.  
  300. if (bdaemon)
  301. daemon_run();
  302.  
  303.  
  304. if (port == 0)
  305. port = 12345;
  306. if (!create_server_listener("0.0.0.0",port))
  307. {
  308. std::cout << "Unable to create listen server: ip=0.0.0.0,port=" << port << "." << std::endl;
  309. return -1;
  310. }
  311.  
  312. //设置信号处理
  313. signal(SIGCHLD,SIG_DFL);
  314. signal(SIGPIPE,SIG_IGN);
  315. signal(SIGINT,prog_exit);
  316. //signal(SIGKILL,prog_exit);<span style="font-family:Arial,Helvetica,sans-serif;">//该信号不能被阻塞、处理或者忽略</span>
  317.  
  318. signal(SIGTERM,prog_exit);
  319.  
  320. ::pthread_cond_init(&g_acceptcond,NULL);
  321. ::pthread_mutex_init(&g_acceptmutex,NULL);
  322.  
  323. ::pthread_cond_init(&g_cond,NULL);
  324. ::pthread_mutex_init(&g_mutex,NULL);
  325.  
  326. ::pthread_mutex_init(&g_clientmutex,NULL);
  327. ::pthread_create(&g_acceptthreadid,NULL,accept_thread_func,NULL);
  328. //启动工作线程
  329. for (int i = 0; i < WORKER_THREAD_NUM; ++i)
  330. {
  331. ::pthread_create(&g_threadid[i],worker_thread_func,NULL);
  332. }
  333.  
  334. while (!g_bStop)
  335. {
  336. struct epoll_event ev[1024];
  337. int n = ::epoll_wait(g_epollfd,ev,1024,10);
  338. if (n == 0)
  339. continue;
  340. else if (n < 0)
  341. {
  342. std::cout << "epoll_wait error" << std::endl;
  343. continue;
  344. }
  345.  
  346. int m = min(n,1024);
  347. for (int i = 0; i < m; ++i)
  348. {
  349. //通知接收连接线程接收新连接
  350. if (ev[i].data.fd == g_listenfd)
  351. pthread_cond_signal(&g_acceptcond);
  352. //通知普通工作线程接收数据
  353. else
  354. {
  355. pthread_mutex_lock(&g_clientmutex);
  356. g_listClients.push_back(ev[i].data.fd);
  357. pthread_mutex_unlock(&g_clientmutex);
  358. pthread_cond_signal(&g_cond);
  359. //std::cout << "signal" << std::endl;
  360. }
  361. }
  362.  
  363. }
  364. return 0;
  365. }

程序的功能一个简单的echo服务:客户端连接上服务器之后,给服务器发送信息,服务器加上时间戳等信息后返回给客户端。

使用到的知识点有:

1.条件变量

2.epoll的边缘触发模式

程序的大致框架是:

主线程只负责监听侦听socket上是否有新连接,如果有新连接到来,交给一个叫accept的工作线程去接收新连接,并将新连接socket绑定到主线程使用epollfd上去。

主线程如果侦听到客户端的socket上有可读事件,则通知另外五个工作线程去接收处理客户端发来的数据,并将数据加上时间戳后发回给客户端。

可以通过传递-p port来设置程序的监听端口号;可以通过传递-d来使程序以daemon模式运行在后台。这也是标准linux daemon模式的书写方法

程序难点和需要注意的地方是:

条件变量为了防止虚假唤醒,一定要在一个循环里面调用pthread_cond_wait()函数,我在worker_thread_func()中使用了:

  1. while (g_listClients.empty())
  2. ::pthread_cond_wait(&g_cond,&g_clientmutex);

accept_thread_func()函数里面我没有使用循环,这样会有问题吗?

使用条件变量pthread_cond_wait()函数的时候一定要先获得与该条件变量相关的mutex,即像下面这样的结构:

  1. mutex_lock(...);
  2.  
  3. while (condition is true)
  4. ::pthread_cond_wait(...);
  5.  
  6. //这里可以有其他代码...
  7. mutex_unlock(...);
  8.  
  9. //这里可以有其他代码...

因为pthread_cond_wait()如果阻塞的话,它解锁相关mutex和阻塞当前线程这两个动作加在一起是原子的。

作为服务器端程序最好对侦听socket调用setsocketopt()设置SO_REUSEADDR和SO_REUSEPORT两个标志,因为服务程序有时候会需要重启(比如调试的时候就会不断重启),如果不设置这两个标志的话,绑定端口时就会调用失败。因为一个端口使用后,即使不再使用,因为四次挥手该端口处于TIME_WAIT状态,有大约2min的MSL(Maximum Segment Lifetime,最大存活期)。这2min内,该端口是不能被重复使用的。你的服务器程序上次使用了这个端口号,接着重启,因为这个缘故,你再次绑定这个端口就会失败(bind函数调用失败)。要不你就每次重启时需要等待2min后再试(这在频繁重启程序调试是难以接收的),或者设置这种SO_REUSEADDR和SO_REUSEPORT立即回收端口使用。

其实,SO_REUSEADDR在windows上和Unix平台上还有些细微的区别,我在libevent源码中看到这样的描述:

  1. int evutil_make_listen_socket_reuseable(evutil_socket_t sock)
  2. {
  3. #ifndef WIN32
  4. int one = 1;
  5. /* REUSEADDR on Unix means,"don't hang on to this address after the
  6. * listener is closed." On Windows,though,it means "don't keep other
  7. * processes from binding to this address while we're using it. */
  8. return setsockopt(sock,(void*) &one,(ev_socklen_t)sizeof(one));
  9. #else
  10. return 0;
  11. #endif
  12. }

注意注释部分,在Unix平台上设置这个选项意味着,任意进程可以复用该地址;而在windows,不要阻止其他进程复用该地址。也就是在在Unix平台上,如果不设置这个选项,任意进程在一定时间内,不能bind该地址;在windows平台上,在一定时间内,其他进程不能bind该地址,而本进程却可以再次bind该地址。

epoll_wait对新连接socket使用的是边缘触发模式EPOLLET(edge trigger),而不是默认的水平触发模式(level trigger)。因为如果采取水平触发模式的话,主线程检测到某个客户端socket数据可读时,通知工作线程去收取该socket上的数据,这个时候主线程继续循环,只要在工作线程没有将该socket上数据全部收完,或者在工作线程收取数据的过程中,客户端有新数据到来,主线程会继续发通知(通过pthread_cond_signal())函数,再次通知工作线程收取数据。这样会可能导致多个工作线程同时调用recv函数收取该客户端socket上的数据,这样产生的结果将会导致数据错乱。

相反,采取边缘触发模式,只有等某个工作线程将那个客户端socket上数据全部收取完毕,主线程的epoll_wait才可能会再次触发来通知工作线程继续收取那个客户端socket新来的数据。

代码中有这样一行:

  1. //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来
  2. std::cout << std::endl;

如果不加上这一行,正常运行服务器程序,程序中要打印到控制台的信息都会打印出来,但是如果用gdb调试状态下,程序的所有输出不显示了。我不知道这是不是gdb的一个bug,所以这里加上std::endl来输出一个换行符并flush标准输出,让输出显示出来。(std::endl不仅是输出一个换行符而且是同时刷新输出,相当于fflush()函数)。

程序我部署起来了,你可以使用linux的nc命令或自己写程序连接服务器来查看程序效果,当然也可以使用telnet命令,方法

linux:

nc 120.55.94.78 12345

telnet 120.55.94.78 12345

然后就可以给服务器自由发送数据了,服务器会给你发送的信息加上时间戳返回给你。效果如图:

另外我将这个代码改写了成纯C++11版本,使用CMake编译,为了支持编译必须加上这-std=c++11:

CMakeLists.txt代码如下:

  1. cmake_minimum_required(VERSION 2.8)
  2.  
  3. PROJECT(myreactorserver)
  4.  
  5. AUX_SOURCE_DIRECTORY(./ SRC_LIST)
  6. SET(EXECUTABLE_OUTPUT_PATH ./)
  7.  
  8. ADD_DEFINITIONS(-g -W -Wall -Wno-deprecated -DLINUX -D_REENTRANT -D_FILE_OFFSET_BITS=64 -DAC_HAS_INFO -DAC_HAS_WARNING -DAC_HAS_ERROR -DAC_HAS_CRITICAL -DTIXML_USE_STL -DHAVE_CXX_STDHEADERS ${CMAKE_CXX_FLAGS} -std=c++11)
  9.  
  10. INCLUDE_DIRECTORIES(
  11. ./
  12. )
  13. LINK_DIRECTORIES(
  14. ./
  15. )
  16.  
  17. set(
  18. main.cpp
  19. myreator.cpp
  20. )
  21.  
  22. ADD_EXECUTABLE(myreactorserver ${SRC_LIST})
  23.  
  24. TARGET_LINK_LIBRARIES(myreactorserver pthread)

myreactor.h文件内容

  1. /**
  2. *@desc: myreactor头文件,myreactor.h
  3. *@author: zhangyl
  4. *@date: 2016.12.03
  5. */
  6. #ifndef __MYREACTOR_H__
  7. #define __MYREACTOR_H__
  8.  
  9. #include <list>
  10. #include <memory>
  11. #include <thread>
  12. #include <mutex>
  13. #include <condition_variable>
  14.  
  15. #define WORKER_THREAD_NUM 5
  16.  
  17. class CMyReactor
  18. {
  19. public:
  20. CMyReactor();
  21. ~CMyReactor();
  22.  
  23. bool init(const char* ip,short nport);
  24. bool uninit();
  25.  
  26. bool close_client(int clientfd);
  27.  
  28. static void* main_loop(void* p);
  29.  
  30. private:
  31. //no copyable
  32. CMyReactor(const CMyReactor& rhs);
  33. CMyReactor& operator = (const CMyReactor& rhs);
  34.  
  35. bool create_server_listener(const char* ip,short port);
  36. static void accept_thread_proc(CMyReactor* pReatcor);
  37. static void worker_thread_proc(CMyReactor* pReatcor);
  38.  
  39. private:
  40. //C11语法可以在这里初始化
  41. int m_listenfd = 0;
  42. int m_epollfd = 0;
  43. bool m_bStop = false;
  44. std::shared_ptr<std::thread> m_acceptthread;
  45. std::shared_ptr<std::thread> m_workerthreads[WORKER_THREAD_NUM];
  46. std::condition_variable m_acceptcond;
  47. std::mutex m_acceptmutex;
  48.  
  49. std::condition_variable m_workercond ;
  50. std::mutex m_workermutex;
  51.  
  52. std::list<int> m_listClients;
  53. };
  54.  
  55. #endif //!__MYREACTOR_H__

myreactor.cpp文件内容

  1. /**
  2. *@desc: myreactor实现文件,myreactor.cpp
  3. *@author: zhangyl
  4. *@date: 2016.12.03
  5. */
  6. #include "myreactor.h"
  7. #include <iostream>
  8. #include <string.h>
  9. #include <sys/types.h>
  10. #include <sys/socket.h>
  11. #include <netinet/in.h>
  12. #include <arpa/inet.h> //for htonl() and htons()
  13. #include <fcntl.h>
  14. #include <sys/epoll.h>
  15. #include <list>
  16. #include <errno.h>
  17. #include <time.h>
  18. #include <sstream>
  19. #include <iomanip> //for std::setw()/setfill()
  20. #include <unistd.h>
  21.  
  22. #define min(a,b) ((a <= b) ? (a) : (b))
  23.  
  24. CMyReactor::CMyReactor()
  25. {
  26. //m_listenfd = 0;
  27. //m_epollfd = 0;
  28. //m_bStop = false;
  29. }
  30.  
  31. CMyReactor::~CMyReactor()
  32. {
  33.  
  34. }
  35.  
  36. bool CMyReactor::init(const char* ip,short nport)
  37. {
  38. if (!create_server_listener(ip,nport))
  39. {
  40. std::cout << "Unable to bind: " << ip << ":" << nport << "." << std::endl;
  41. return false;
  42. }
  43.  
  44.  
  45. std::cout << "main thread id = " << std::this_thread::get_id() << std::endl;
  46.  
  47. //启动接收新连接的线程
  48. m_acceptthread.reset(new std::thread(CMyReactor::accept_thread_proc,this));
  49. //启动工作线程
  50. for (auto& t : m_workerthreads)
  51. {
  52. t.reset(new std::thread(CMyReactor::worker_thread_proc,this));
  53. }
  54.  
  55.  
  56. return true;
  57. }
  58.  
  59. bool CMyReactor::uninit()
  60. {
  61. m_bStop = true;
  62. m_acceptcond.notify_one();
  63. m_workercond.notify_all();
  64.  
  65. m_acceptthread->join();
  66. for (auto& t : m_workerthreads)
  67. {
  68. t->join();
  69. }
  70.  
  71. ::epoll_ctl(m_epollfd,m_listenfd,NULL);
  72.  
  73. //TODO: 是否需要先调用shutdown()一下?
  74. ::shutdown(m_listenfd,SHUT_RDWR);
  75. ::close(m_listenfd);
  76. ::close(m_epollfd);
  77.  
  78. return true;
  79. }
  80.  
  81. bool CMyReactor::close_client(int clientfd)
  82. {
  83. if (::epoll_ctl(m_epollfd,NULL) == -1)
  84. {
  85. std::cout << "close client socket Failed as call epoll_ctl Failed" << std::endl;
  86. //return false;
  87. }
  88.  
  89. ::close(clientfd);
  90.  
  91. return true;
  92. }
  93.  
  94.  
  95. void* CMyReactor::main_loop(void* p)
  96. {
  97. std::cout << "main thread id = " << std::this_thread::get_id() << std::endl;
  98. CMyReactor* pReatcor = static_cast<CMyReactor*>(p);
  99. while (!pReatcor->m_bStop)
  100. {
  101. struct epoll_event ev[1024];
  102. int n = ::epoll_wait(pReatcor->m_epollfd,1024);
  103. for (int i = 0; i < m; ++i)
  104. {
  105. //通知接收连接线程接收新连接
  106. if (ev[i].data.fd == pReatcor->m_listenfd)
  107. pReatcor->m_acceptcond.notify_one();
  108. //通知普通工作线程接收数据
  109. else
  110. {
  111. {
  112. std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);
  113. pReatcor->m_listClients.push_back(ev[i].data.fd);
  114. }
  115. pReatcor->m_workercond.notify_one();
  116. //std::cout << "signal" << std::endl;
  117. }// end if
  118.  
  119. }// end for-loop
  120. }// end while
  121.  
  122. std::cout << "main loop exit ..." << std::endl;
  123.  
  124. return NULL;
  125. }
  126.  
  127. void CMyReactor::accept_thread_proc(CMyReactor* pReatcor)
  128. {
  129. std::cout << "accept thread,thread id = " << std::this_thread::get_id() << std::endl;
  130.  
  131. while (true)
  132. {
  133. int newfd;
  134. struct sockaddr_in clientaddr;
  135. socklen_t addrlen;
  136. {
  137. std::unique_lock<std::mutex> guard(pReatcor->m_acceptmutex);
  138. pReatcor->m_acceptcond.wait(guard);
  139. if (pReatcor->m_bStop)
  140. break;
  141.  
  142. //std::cout << "run loop in accept_thread_proc" << std::endl;
  143. newfd = ::accept(pReatcor->m_listenfd,&addrlen);
  144. }
  145. if (newfd == -1)
  146. continue;
  147.  
  148. std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl;
  149.  
  150. //将新socket设置为non-blocking
  151. int oldflag = ::fcntl(newfd,sizeof(e));
  152. e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
  153. e.data.fd = newfd;
  154. if (::epoll_ctl(pReatcor->m_epollfd,fd =" << newfd << std::endl;
  155. }
  156. }
  157.  
  158. std::cout << "accept thread exit ..." << std::endl;
  159. }
  160.  
  161. void CMyReactor::worker_thread_proc(CMyReactor* pReatcor)
  162. {
  163. std::cout << "new worker thread,thread id = " << std::this_thread::get_id() << std::endl;
  164.  
  165. while (true)
  166. {
  167. int clientfd;
  168. {
  169. std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);
  170. while (pReatcor->m_listClients.empty())
  171. {
  172. if (pReatcor->m_bStop)
  173. {
  174. std::cout << "worker thread exit ..." << std::endl;
  175. return;
  176. }
  177. pReatcor->m_workercond.wait(guard);
  178. }
  179. clientfd = pReatcor->m_listClients.front();
  180. pReatcor->m_listClients.pop_front();
  181. }
  182.  
  183. //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来
  184. std::cout << std::endl;
  185.  
  186. std::string strclientmsg;
  187. char buff[256];
  188. bool bError = false;
  189. while (true)
  190. {
  191. memset(buff,fd = " << clientfd << std::endl;
  192. pReatcor->close_client(clientfd);
  193. bError = true;
  194. break;
  195. }
  196.  
  197. }
  198. //对端关闭了socket,这端也关闭
  199. else if (nRecv == 0)
  200. {
  201. std::cout << "peer closed,fd = " << clientfd << std::endl;
  202. pReatcor->close_client(clientfd);
  203. bError = true;
  204. break;
  205. }
  206.  
  207. strclientmsg += buff;
  208. }
  209.  
  210. //出错了,就不要再继续往下执行了
  211. if (bError)
  212. continue;
  213.  
  214. std::cout << "client msg: " << strclientmsg;
  215.  
  216. //将消息加上时间标签后发回
  217. time_t now = time(NULL);
  218. struct tm* nowstr = localtime(&now);
  219. std::ostringstream ostimestr;
  220. ostimestr << "[" << nowstr->tm_year + 1900 << "-"
  221. << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-"
  222. << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "
  223. << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":"
  224. << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":"
  225. << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";
  226.  
  227. strclientmsg.insert(0,ostimestr.str());
  228.  
  229. while (true)
  230. {
  231. int nSent = ::send(clientfd,0);
  232. if (nSent == -1)
  233. {
  234. if (errno == EWOULDBLOCK)
  235. {
  236. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  237. continue;
  238. }
  239. else
  240. {
  241. std::cout << "send error,fd = " << clientfd << std::endl;
  242. pReatcor->close_client(clientfd);
  243. break;
  244. }
  245.  
  246. }
  247.  
  248. std::cout << "send: " << strclientmsg;
  249. strclientmsg.erase(0,nSent);
  250.  
  251. if (strclientmsg.empty())
  252. break;
  253. }
  254. }
  255. }
  256.  
  257. bool CMyReactor::create_server_listener(const char* ip,short port)
  258. {
  259. m_listenfd = ::socket(AF_INET,0);
  260. if (m_listenfd == -1)
  261. return false;
  262.  
  263. int on = 1;
  264. ::setsockopt(m_listenfd,sizeof(on));
  265. ::setsockopt(m_listenfd,sizeof(servaddr));
  266. servaddr.sin_family = AF_INET;
  267. servaddr.sin_addr.s_addr = inet_addr(ip);
  268. servaddr.sin_port = htons(port);
  269. if (::bind(m_listenfd,sizeof(servaddr)) == -1)
  270. return false;
  271.  
  272. if (::listen(m_listenfd,50) == -1)
  273. return false;
  274.  
  275. m_epollfd = ::epoll_create(1);
  276. if (m_epollfd == -1)
  277. return false;
  278.  
  279. struct epoll_event e;
  280. memset(&e,sizeof(e));
  281. e.events = EPOLLIN | EPOLLRDHUP;
  282. e.data.fd = m_listenfd;
  283. if (::epoll_ctl(m_epollfd,&e) == -1)
  284. return false;
  285.  
  286. return true;
  287. }

main.cpp文件内容

  1. /**
  2. *@desc: 用reactor模式练习服务器程序
  3. *@author: zhangyl
  4. *@date: 2016.12.03
  5. */
  6.  
  7. #include <iostream>
  8. #include <signal.h> //for signal()
  9. #include<unistd.h>
  10. #include <stdlib.h> //for exit()
  11. #include <sys/types.h>
  12. #include <sys/stat.h>
  13. #include <fcntl.h>
  14. #include "myreactor.h"
  15.  
  16. CMyReactor g_reator;
  17.  
  18. void prog_exit(int signo)
  19. {
  20. std::cout << "program recv signal " << signo << " to exit." << std::endl;
  21.  
  22. g_reator.uninit();
  23. }
  24.  
  25. void daemon_run()
  26. {
  27. int pid;
  28. signal(SIGCHLD,STDERR_FILENO);
  29. }
  30. if (fd > 2)
  31. close(fd);
  32. }
  33.  
  34.  
  35. int main(int argc,char* argv[])
  36. {
  37. //设置信号处理
  38. signal(SIGCHLD,prog_exit);
  39. signal(SIGKILL,prog_exit);
  40. signal(SIGTERM,prog_exit);
  41. short port = 0;
  42. int ch;
  43. bool bdaemon = false;
  44. while ((ch = getopt(argc,"p:d")) != -1)
  45. {
  46. switch (ch)
  47. {
  48. case 'd':
  49. bdaemon = true;
  50. break;
  51. case 'p':
  52. port = atol(optarg);
  53. break;
  54. }
  55. }
  56.  
  57. if (bdaemon)
  58. daemon_run();
  59.  
  60.  
  61. if (port == 0)
  62. port = 12345;
  63.  
  64. if (!g_reator.init("0.0.0.0",12345))
  65. return -1;
  66. g_reator.main_loop(&g_reator);
  67.  
  68. return 0;
  69. }

完整实例代码下载地址:

普通版本:https://pan.baidu.com/s/1o82Mkno

C++11版本:https://pan.baidu.com/s/1dEJdrih

您可以接着阅读下一篇:《一个服务器程序的架构介绍》。

欢迎关注公众号『easyserverdev』。如果有任何技术或者职业方面的问题需要我提供帮助,可通过这个公众号与我取得联系,此公众号不仅分享性能服务器开发经验和故事,同时也免费为广大技术朋友提供技术答疑和职业解惑,您有任何问题都可以在微信公众号直接留言,我会尽快回复您。

猜你在找的React相关文章