这里,我想根据reactor模式,其原则是per thread per loop,也即一个线程一个循环
该模式组成:
reactor:由reactor线程执行无限循环的监听活动,每当有connect到来的时候由reactor负责在线程池中找到一个空闲线程,同时将accept的返回值socket作为参数传递给空闲线程,同时reactor线程应该是单独的一个线程,也即该类应该是支持单例模式的。对于线程池的访问则是,通过组合的方式,获得线程池指针进行访问的。
线程池,正如我上一篇博客中讲述的线程池的实现那样线程池中共有三个线程队列,一个是空闲进程,m_idle_list一个是忙碌线程表示正在工作的线程m_busy_list,最后一个则是m_stop_list,该队列的线程不参与工作的接收和分配工作。
#include<pthread.h> class ServantPool; class Reactor { private: int m_sk; pthread_t m_id; static void* reactor_listen(void* m);//这里面包含了其中循环监听和请求分发 ServantPool* m_ser_pool;//from ServantPool chose the idle thread to process Reactor(); static Reactor* hInstance; public: void SetPoolPtr(ServantPool* ptr); static Reactor* getInstance(); void start();//开启监听线程 void close_thread(); };
reactor.cpp文件内容如下:
#include<stdio.h> #include<stdlib.h> #include<string.h> #include<errno.h> #include<sys/types.h> #include<sys/socket.h> #include<netinet/in.h> #include<arpa/inet.h> #include"Servant.h" #include"reactor.h" #define ACCEPTING 1 #define BUSY 2 #define STOP 2 #define PORT 2090 #define MAX_CONNECTIONS 100 Reactor* Reactor::hInstance=NULL; Reactor* Reactor::getInstance() { if(hInstance==NULL) { hInstance=new Reactor(); } return hInstance; } void* Reactor::reactor_listen(void* m) { //this static function begin listen Reactor* rec=(Reactor*)m; int serSocket=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(serSocket<0) { printf("sorry socket error: %s\n",strerror(errno)); return NULL; } struct sockaddr_in serAdd; memset(&serAdd,sizeof(sockaddr_in)); serAdd.sin_family=AF_INET; serAdd.sin_addr.s_addr=htonl(INADDR_ANY); serAdd.sin_port=htons(PORT); if(bind(serSocket,(struct sockaddr*)&serAdd,sizeof(serAdd))<0) { printf("sorry bind error: %s\n",strerror(errno)); } if(listen(serSocket,MAX_CONNECTIONS)<0) { printf("sorry listen error: %s\n",strerror(errno)); } else { printf("listen successfully: %s\n",strerror(errno)); } while(1) { Servant* cur; DZ: while(cur=rec->m_ser_pool->Accepting()) { struct sockaddr_in tmpAddr; socklen_t size=sizeof(tmpAddr); int skt=accept(serSocket,(struct sockaddr*)&tmpAddr,&size); if(skt<0) { printf("accept error %s\n",strerror(errno)); return NULL; } //we begin to chose the idle,执行到此,说明已经接收到了一个请求,将请求套接字传递给线程池中空闲线程的第一个线程 cur->setSocket(skt); //tell the thread,you could do something!!,通知该线程有事件发生,可以激活,进行工作了 cur->notify(); } //执行到此,说明当前已经没有了空闲线程,然后等待空闲线程的出现 while((cur=rec->m_ser_pool->Accepting())==NULL) { if(errno!=0) return NULL; } goto DZ; } return NULL; } Reactor::Reactor() { ; } void Reactor::SetPoolPtr(ServantPool* ptr) { m_ser_pool=ptr; } void Reactor::start() { printf("reactor successfully\n"); //开启监听线程,也即只有一个监听线程。 int result=pthread_create(&m_id,NULL,reactor_listen,this); if(result==-1) { printf("the Reactor thread created error \n",strerror(errno)); return ; } else { printf("the Reactor pthread_create success \n"); } } void Reactor::close_thread() { m_ser_pool->StopAll(); }
Servant.h文件,其中包含了工作线程,和线程池的定义
#include<pthread.h> #include<vector> using namespace std; class Reactor; class ServantPool; class Servant { private: pthread_mutex_t mutex_x; pthread_cond_t cond_t; int m_socket; Reactor* m_reactor; ServantPool* m_pool; int thread_id; public: int state; pthread_t m_tid; Servant(ServantPool* pol); void start(); static void* ThreadProc(void* data); void notify(); void stop(); void wakeup(); void setSocket(int m); void exit(); void join(); void Complete(); void DoJob(); }; //ServantPool supposed to be singlton class ServantPool { private: //running thread but not really work vector<Servant*> m_idle_list; //running thread and really work vector<Servant*> m_busy_list; //stopped thread vector<Servant*> m_stop_list; ServantPool(){}; static ServantPool * hInstance; public: void StopAll(); void create(int num); void startAll(); void AddtoIdle(bool add,Servant* ser); void Addtobusy(bool add,Servant* ser); void Addtostop(bool add,Servant* ser); void stop(Servant* id); void wakeup(Servant* id); void waitforAll(); ~ServantPool(); static ServantPool * getInstance(); Servant* Accepting(); };Servant.cpp中工作线程的各项定义,以及个线程池的定义
#include<stdio.h> #include<stdlib.h> #include<string.h> #include<errno.h> #include<netinet/in.h> #include<arpa/inet.h> #include<unistd.h> #include"Servant.h" #define WORKING 1 #define STOP 2 #define EXIT 3 #define IDLE 4 static int num=0; static pthread_mutex_t vec_mutex; void Servant::start() { state=IDLE; int result=pthread_create(&m_tid,ThreadProc,this); if(0!=result) { printf("thread create result :%s\n",strerror(errno)); } } Servant::Servant(ServantPool* ma) { thread_id=num++; m_pool=ma; pthread_mutex_init(&mutex_x,NULL); pthread_cond_init(&cond_t,NULL); } void Servant::setSocket(int m) { m_socket=m; } void Servant::DoJob() { char buf[100]={0}; recv(m_socket,buf,100,0); printf("we haved the %d recv:%s\n",num++,buf); close(m_socket); } void* Servant::ThreadProc(void* data) { Servant* ser=(Servant*)data; int result=0; while(ser->state!=EXIT) { while(ser->state==IDLE) { result=pthread_mutex_lock(&(ser->mutex_x)); if(0==result) { printf("waiting for the mutex \n"); } result=pthread_cond_wait(&(ser->cond_t),&(ser->mutex_x)); if(ser->state!=IDLE) goto End; printf("the conditions has been notified\n"); //we take this thread to busy list ser->m_pool->AddtoIdle(false,ser); ser->m_pool->Addtobusy(true,ser); // really work DoSomething: ser->state=WORKING; printf("Do Something...\n"); ser->DoJob(); ser->Complete();//this function change state End: pthread_mutex_unlock(&(ser->mutex_x)); } } return NULL; } void Servant::stop() { if(state==IDLE) { m_pool->AddtoIdle(false,this); m_pool->Addtostop(true,this); state=STOP; printf("thread stop!\n"); } else if(state==WORKING) { printf("current state is WORKING stop Failed!\n"); } else if(state==STOP) { printf("thread already stopped!\n"); } else { printf("sorry unknown state!\n"); state=STOP; } } void Servant::wakeup() { if(state==STOP) { m_pool->Addtostop(false,this); m_pool->AddtoIdle(true,this); state=IDLE; printf("thread wakeup!\n"); } else if(state==WORKING) { printf("current state is WORKING stop Failed!\n"); } else if(state==IDLE) { printf("current state is idle never need wakeup!\n"); } else { printf("sorry unknown state..\n"); state=IDLE; } } void Servant::Complete()//完成操作 { //完成任务,该线程变为idle m_pool->Addtobusy(false,this); m_pool->AddtoIdle(true,this); state=IDLE; } void Servant::join() { pthread_join(m_tid,NULL); } void Servant::notify() { if(state==IDLE) { printf("we have notified thread running\n"); pthread_cond_signal(&cond_t); } else { printf("sorry,the signal is not correct\n"); } } void Servant::exit() { state=EXIT; pthread_cond_signal(&cond_t); } void ServantPool::StopAll() { vector<Servant*>::iterator itr=m_idle_list.begin(); for(;itr!=m_idle_list.end();) { (*itr)->stop(); } itr=m_busy_list.begin(); for(;itr!=m_busy_list.end();) { (*itr)->stop(); } } void ServantPool::create(int num) { int i=0; for(;i<num;i++) { Servant* tmp=new Servant(this); m_idle_list.push_back(tmp); } } void ServantPool::AddtoIdle(bool add,Servant* ser) { if(add) { // add ser to idle list pthread_mutex_lock(&vec_mutex); m_idle_list.push_back(ser); pthread_mutex_unlock(&vec_mutex); } else { // del ser from idle list pthread_mutex_lock(&vec_mutex); vector<Servant*>::iterator itr=m_idle_list.begin(); for(;itr!=m_idle_list.end();itr++) { if(*itr==ser) { m_idle_list.erase(itr); break; } } pthread_mutex_unlock(&vec_mutex); } } void ServantPool::Addtobusy(bool add,Servant* ser) { if(add) { // add ser to idle list pthread_mutex_lock(&vec_mutex); m_busy_list.push_back(ser); pthread_mutex_unlock(&vec_mutex); } else { // del ser from idle list pthread_mutex_lock(&vec_mutex); vector<Servant*>::iterator itr=m_busy_list.begin(); for(;itr!=m_busy_list.end();itr++) { if(*itr==ser) { m_busy_list.erase(itr); break; } } pthread_mutex_unlock(&vec_mutex); } } void ServantPool::Addtostop(bool add,Servant* ser) { if(add) { // add ser to idle list pthread_mutex_lock(&vec_mutex); m_stop_list.push_back(ser); pthread_mutex_unlock(&vec_mutex); } else { // del ser from idle list pthread_mutex_lock(&vec_mutex); vector<Servant*>::iterator itr=m_stop_list.begin(); for(;itr!=m_stop_list.end();itr++) { if(*itr==ser) { m_stop_list.erase(itr); break; } } pthread_mutex_unlock(&vec_mutex); } } void ServantPool::startAll() { int i=0; for(;i<m_idle_list.size();i++) { Servant* tmp=m_idle_list[i]; printf("start the thread %d\n",i); tmp->start();//create the thread } } void ServantPool::stop(Servant* id) { vector<Servant*>::iterator itr=m_idle_list.begin(); for(;itr!=m_idle_list.end();itr++) { if((*itr)==id) { (*itr)->stop(); return; } } } void ServantPool::waitforAll() { int i=0; int nums=m_busy_list.size(); for(;i<nums;i++) { Servant* tmp=m_busy_list[i]; tmp->join(); } nums=m_idle_list.size(); i=0; for(;i<nums;i++) { Servant* tmp=m_idle_list[i]; tmp->join(); } } void ServantPool::wakeup(Servant* id) { vector<Servant*>::iterator itr=m_busy_list.begin(); for(;itr!=m_busy_list.end();itr++) { if((*itr)==id) { (*itr)->wakeup(); return; } } } ServantPool * ServantPool::hInstance=NULL; ServantPool * ServantPool::getInstance() { if(NULL==hInstance) { hInstance=new ServantPool(); pthread_mutex_init(&vec_mutex,NULL); } return hInstance; } ServantPool::~ServantPool() { vector<Servant*>::iterator itr=m_idle_list.begin(); for(;itr!=m_idle_list.end();) { (*itr)->exit(); delete *itr; itr=m_idle_list.erase(itr); } itr=m_busy_list.begin(); for(;itr!=m_busy_list.end();) { (*itr)->exit(); delete *itr; itr=m_busy_list.erase(itr); } itr=m_stop_list.begin(); for(;itr!=m_stop_list.end();) { (*itr)->exit(); delete *itr; itr=m_stop_list.erase(itr); } } Servant* ServantPool::Accepting() { if(m_idle_list.size()>0) { return m_idle_list[0]; } return NULL; }接下来就是makefile文件内容了:
test:Servant.o reactor.o main.o g++ -o test Servant.o reactor.o main.o -lpthread Servant.o:Servant.cpp Servant.h g++ -g -c Servant.cpp -lpthread reactor.o:reactor.cpp reactor.h g++ -g -c reactor.cpp -lpthread main.o:main.c Servant.h g++ -g -c main.c -lpthread clean: rm *.o test
这里面需要注意的就是一点,对于m_idle_list m_busy_list m_stop_list三个队列的操作,都应该是原子操作,该操作的原子性保证是,通过静态全局互斥变量来保证的。
这样就实现了reactor模式,也即反应器模式,代码下载地址: