例如
void *thread_func(void *thread_args) { // extract socket_fd,epoll_fd,&event,&events_array from // thread_args // epoll_wait() using epoll_fd and events_array received from main // now all threads would be using same epoll_fd and events array } void main( void ) { // create and bind to socket // create events_fd // allocate memory for events array // subscribe to events EPOLLIN and EPOLLET // pack the socket_fd,&events,&events_array into // thread_args struct. // create multiple threads and pass thread_func and // same thread_args to all threads }
或者这样做更好:
void *thread_func(void *socket_fd) { // create events_fd // allocate memory for events array // subscribe to events EPOLLIN and EPOLLET // epoll_wait using own epoll_fd and events_array // now all threads would have a separate epoll_fd with // events populated on its own array } void main(void) { // create and bind to socket //create multiple threads and pass thread_func and socket_fd to // all threads }
在C中有一个很好的例子吗?我看到的示例在main()中运行事件循环,并在检测到事件时生成一个新线程来处理请求.我想要做的是在程序开始时创建特定数量的线程,并让每个线程运行事件循环和处理请求.
解决方法
Is it safe to share the same Epoll fd (not socket fd) among several
threads.
是的,它是安全的 – epoll(7)接口是线程安全的 – 但是在这样做时你应该小心,你应该至少使用EPOLLET(边缘触发模式,而不是默认的级别触发)来避免虚假在其他线程中唤醒.这是因为当新事件可用于处理时,级别触发模式将唤醒每个线程.由于只有一个线程将处理它,这将不必要地唤醒大多数线程.
If shared epfd is used will each thread have to pass its own events
array or a shared events array to epoll_wait()
是的,你需要在每个线程上有一个单独的事件数组,否则你将遇到竞争条件,并且可能会发生令人讨厌的事情.例如,您可能有一个线程仍在迭代epoll_wait(2)返回的事件,并在突然另一个线程使用相同的数组调用epoll_wait(2)时处理请求,然后事件被同时覆盖线程正在阅读它们.不好!你绝对需要为每个线程一个单独的数组.
假设你为每个线程确实有一个单独的数组,那么任何一种可能性 – 等待同一个epoll fd或者为每个线程都有一个单独的epoll fd – 将同样有效,但请注意语义是不同的.使用全局共享的epoll fd,每个线程都会等待来自任何客户端的请求,因为客户端都被添加到同一个epoll fd中.对于每个线程使用单独的epoll fd,然后每个线程基本上负责客户端的子集(该线程接受的那些客户端).
这可能与您的系统无关,也可能会产生巨大的差异.例如,可能会发生一个线程不幸导致一组高级和频繁请求的高级用户,使该线程过度工作,而其他具有较少攻击性客户端的线程几乎空闲.这不是不公平的吗?另一方面,也许你想只有一些线程处理特定类的用户,在这种情况下,在每个线程上有不同的epoll fds是有意义的.像往常一样,您需要考虑两种可能性,评估权衡,考虑您的具体问题,并做出决定.
以下是使用全局共享epoll fd的示例.我原本不打算做所有这些,但有一件事导致另一件事,而且,这很有趣,我认为它可能会帮助你开始.它是一个侦听端口3000的echo服务器,它有一个由20个线程组成的池,使用epoll来同时接受新客户端并提供请求.
#include <stdio.h> #include <stdlib.h> #include <inttypes.h> #include <errno.h> #include <string.h> #include <pthread.h> #include <assert.h> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <sys/epoll.h> #define SERVERPORT 3000 #define SERVERBACKLOG 10 #define THREADSNO 20 #define EVENTS_BUFF_SZ 256 static int serversock; static int epoll_fd; static pthread_t threads[THREADSNO]; int accept_new_client(void) { int clientsock; struct sockaddr_in addr; socklen_t addrlen = sizeof(addr); if ((clientsock = accept(serversock,(struct sockaddr *) &addr,&addrlen)) < 0) { return -1; } char ip_buff[INET_ADDRSTRLEN+1]; if (inet_ntop(AF_INET,&addr.sin_addr,ip_buff,sizeof(ip_buff)) == NULL) { close(clientsock); return -1; } printf("*** [%p] Client connected from %s:%" PRIu16 "\n",(void *) pthread_self(),ntohs(addr.sin_port)); struct epoll_event epevent; epevent.events = EPOLLIN | EPOLLET; epevent.data.fd = clientsock; if (epoll_ctl(epoll_fd,EPOLL_CTL_ADD,clientsock,&epevent) < 0) { perror("epoll_ctl(2) Failed attempting to add new client"); close(clientsock); return -1; } return 0; } int handle_request(int clientfd) { char readbuff[512]; struct sockaddr_in addr; socklen_t addrlen = sizeof(addr); ssize_t n; if ((n = recv(clientfd,readbuff,sizeof(readbuff)-1,0)) < 0) { return -1; } if (n == 0) { return 0; } readbuff[n] = '\0'; if (getpeername(clientfd,&addrlen) < 0) { return -1; } char ip_buff[INET_ADDRSTRLEN+1]; if (inet_ntop(AF_INET,sizeof(ip_buff)) == NULL) { return -1; } printf("*** [%p] [%s:%" PRIu16 "] -> server: %s",ntohs(addr.sin_port),readbuff); ssize_t sent; if ((sent = send(clientfd,n,0)) < 0) { return -1; } readbuff[sent] = '\0'; printf("*** [%p] server -> [%s:%" PRIu16 "]: %s",readbuff); return 0; } void *worker_thr(void *args) { struct epoll_event *events = malloc(sizeof(*events)*EVENTS_BUFF_SZ); if (events == NULL) { perror("malloc(3) Failed when attempting to allocate events buffer"); pthread_exit(NULL); } int events_cnt; while ((events_cnt = epoll_wait(epoll_fd,events,EVENTS_BUFF_SZ,-1)) > 0) { int i; for (i = 0; i < events_cnt; i++) { assert(events[i].events & EPOLLIN); if (events[i].data.fd == serversock) { if (accept_new_client() == -1) { fprintf(stderr,"Error accepting new client: %s\n",strerror(errno)); } } else { if (handle_request(events[i].data.fd) == -1) { fprintf(stderr,"Error handling request: %s\n",strerror(errno)); } } } } if (events_cnt == 0) { fprintf(stderr,"epoll_wait(2) returned 0,but timeout was not specified...?"); } else { perror("epoll_wait(2) error"); } free(events); return NULL; } int main(void) { if ((serversock = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP)) < 0) { perror("socket(2) Failed"); exit(EXIT_FAILURE); } struct sockaddr_in serveraddr; serveraddr.sin_family = AF_INET; serveraddr.sin_port = htons(SERVERPORT); serveraddr.sin_addr.s_addr = INADDR_ANY; if (bind(serversock,(const struct sockaddr *) &serveraddr,sizeof(serveraddr)) < 0) { perror("bind(2) Failed"); exit(EXIT_FAILURE); } if (listen(serversock,SERVERBACKLOG) < 0) { perror("listen(2) Failed"); exit(EXIT_FAILURE); } if ((epoll_fd = epoll_create(1)) < 0) { perror("epoll_create(2) Failed"); exit(EXIT_FAILURE); } struct epoll_event epevent; epevent.events = EPOLLIN | EPOLLET; epevent.data.fd = serversock; if (epoll_ctl(epoll_fd,serversock,&epevent) < 0) { perror("epoll_ctl(2) Failed on main server socket"); exit(EXIT_FAILURE); } int i; for (i = 0; i < THREADSNO; i++) { if (pthread_create(&threads[i],NULL,worker_thr,NULL) < 0) { perror("pthread_create(3) Failed"); exit(EXIT_FAILURE); } } /* main thread also contributes as worker thread */ worker_thr(NULL); return 0; }
几个笔记:
> main()应返回int,而不是void(如您在示例中所示)>始终处理错误返回码.忽略它们是很常见的,当事情发生时,很难知道发生了什么.>代码假定没有请求大于511字节(如handle_request()中的缓冲区大小所示).如果请求大于此值,则有可能某些数据在套接字中保留很长时间,因为epoll_wait(2)在该文件描述符上发生新事件之前不会报告它(因为我们正在使用EPOLLET ).在最坏的情况下,客户端可能永远不会发送任何新数据,并永远等待回复.>为每个请求打印线程标识符的代码假定pthread_t是不透明的指针类型.实际上,pthread_t是Linux中的指针类型,但在其他平台中它可能是整数类型,因此这不是可移植的.但是,这可能不是什么大问题,因为epoll是特定于Linux的,因此代码无论如何都不可移植.>它假定当一个线程仍然在服务来自该客户端的请求时,来自同一客户端的其他请求没有到达.如果同时到达新请求并且另一个线程开始提供它,我们就会遇到竞争条件,并且客户端不一定会按照发送它们的相同顺序接收回显消息(但是,write(2)是原子的,所以回复可能会失灵,他们不会穿插).