这里,我想按照 reactor 模式来实现,其原则是 one loop per thread,也即一个线程对应一个事件循环。
#ifndef REACTOR_H
#define REACTOR_H

#include<pthread.h>

class ServantPool;

/*
 * Reactor owns the single listening thread of the server.
 *
 * Access goes through getInstance(); the constructor is private. The
 * listening thread is created by start() and runs reactor_listen(), which
 * hands each accepted connection to a worker taken from the ServantPool
 * registered through SetPoolPtr().
 */
class Reactor
{
private:
    // Socket descriptor slot.
    // NOTE(review): presumably meant for the listening socket - confirm usage.
    int m_sk;

    // Id of the thread created by start().
    pthread_t m_id;

    // Thread entry point: contains the listen loop and the request dispatch.
    // `m` is the Reactor instance passed to pthread_create().
    static void* reactor_listen(void* m);

    // Pool from which an idle worker thread is chosen to process a request.
    ServantPool* m_ser_pool;

    Reactor();

    // The one shared instance handed out by getInstance().
    static Reactor* hInstance;

public:
    // Registers the worker pool used for dispatching accepted connections.
    void SetPoolPtr(ServantPool* ptr);

    // Returns the shared Reactor instance.
    static Reactor* getInstance();

    // Starts the listening thread.
    void start();

    void close_thread();
};

#endif /* REACTOR_H */
#include<stdio.h> #include<stdlib.h> #include<string.h> #include<errno.h> #include<sys/types.h> #include<sys/socket.h> #include<netinet/in.h> #include<arpa/inet.h> #include"Servant.h" #include"reactor.h" #define ACCEPTING 1 #define BUSY 2 #define STOP 2 #define PORT 2090 #define MAX_CONNECTIONS 100 Reactor* Reactor::hInstance=NULL; Reactor* Reactor::getInstance() { if(hInstance==NULL) { hInstance=new Reactor(); } return hInstance; } void* Reactor::reactor_listen(void* m) { //this static function begin listen Reactor* rec=(Reactor*)m; int serSocket=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(serSocket<0) { printf("sorry socket error: %s\n",strerror(errno)); return NULL; } struct sockaddr_in serAdd; memset(&serAdd,sizeof(sockaddr_in)); serAdd.sin_family=AF_INET; serAdd.sin_addr.s_addr=htonl(INADDR_ANY); serAdd.sin_port=htons(PORT); if(bind(serSocket,(struct sockaddr*)&serAdd,sizeof(serAdd))<0) { printf("sorry bind error: %s\n",strerror(errno)); } if(listen(serSocket,MAX_CONNECTIONS)<0) { printf("sorry listen error: %s\n",strerror(errno)); } else { printf("listen successfully: %s\n",strerror(errno)); } while(1) { Servant* cur; DZ: while(cur=rec->m_ser_pool->Accepting()) { struct sockaddr_in tmpAddr; socklen_t size=sizeof(tmpAddr); int skt=accept(serSocket,(struct sockaddr*)&tmpAddr,&size); if(skt<0) { printf("accept error %s\n",strerror(errno)); return NULL; } //we begin to chose the idle,执行到此,说明已经接收到了一个请求,将请求套接字传递给线程池中空闲线程的第一个线程 cur->setSocket(skt); //tell the thread,you could do something!!,通知该线程有事件发生,可以激活,进行工作了 cur->notify(); } //执行到此,说明当前已经没有了空闲线程,然后等待空闲线程的出现 while((cur=rec->m_ser_pool->Accepting())==NULL) { if(errno!=0) return NULL; } goto DZ; } return NULL; } Reactor::Reactor() { ; } void Reactor::SetPoolPtr(ServantPool* ptr) { m_ser_pool=ptr; } void Reactor::start() { printf("reactor successfully\n"); //开启监听线程,也即只有一个监听线程。 int result=pthread_create(&m_id,NULL,reactor_listen,this); if(result==-1) { printf("the Reactor thread created error \n",strerror(errno)); 
return ; } else { printf("the Reactor pthread_create success \n"); } } void Reactor::close_thread() { m_ser_pool->StopAll(); }
#include<pthread.h> #include<vector> using namespace std; class Reactor; class ServantPool; class Servant { private: pthread_mutex_t mutex_x; pthread_cond_t cond_t; int m_socket; Reactor* m_reactor; ServantPool* m_pool; int thread_id; public: int state; pthread_t m_tid; Servant(ServantPool* pol); void start(); static void* ThreadProc(void* data); void notify(); void stop(); void wakeup(); void setSocket(int m); void exit(); void join(); void Complete(); void DoJob(); }; //ServantPool supposed to be singlton class ServantPool { private: //running thread but not really work vector<Servant*> m_idle_list; //running thread and really work vector<Servant*> m_busy_list; //stopped thread vector<Servant*> m_stop_list; ServantPool(){}; static ServantPool * hInstance; public: void StopAll(); void create(int num); void startAll(); void AddtoIdle(bool add,Servant* ser); void Addtobusy(bool add,Servant* ser); void Addtostop(bool add,Servant* ser); void stop(Servant* id); void wakeup(Servant* id); void waitforAll(); ~ServantPool(); static ServantPool * getInstance(); Servant* Accepting(); };Servant.cpp中工作线程的各项定义,以及个线程池的定义
#include<stdio.h> #include<stdlib.h> #include<string.h> #include<errno.h> #include<netinet/in.h> #include<arpa/inet.h> #include<unistd.h> #include"Servant.h" #define WORKING 1 #define STOP 2 #define EXIT 3 #define IDLE 4 static int num=0; static pthread_mutex_t vec_mutex; void Servant::start() { state=IDLE; int result=pthread_create(&m_tid,ThreadProc,this); if(0!=result) { printf("thread create result :%s\n",strerror(errno)); } } Servant::Servant(ServantPool* ma) { thread_id=num++; m_pool=ma; pthread_mutex_init(&mutex_x,NULL); pthread_cond_init(&cond_t,NULL); } void Servant::setSocket(int m) { m_socket=m; } void Servant::DoJob() { char buf[100]={0}; recv(m_socket,buf,100,0); printf("we haved the %d recv:%s\n",num++,buf); close(m_socket); } void* Servant::ThreadProc(void* data) { Servant* ser=(Servant*)data; int result=0; while(ser->state!=EXIT) { while(ser->state==IDLE) { result=pthread_mutex_lock(&(ser->mutex_x)); if(0==result) { printf("waiting for the mutex \n"); } result=pthread_cond_wait(&(ser->cond_t),&(ser->mutex_x)); if(ser->state!=IDLE) goto End; printf("the conditions has been notified\n"); //we take this thread to busy list ser->m_pool->AddtoIdle(false,ser); ser->m_pool->Addtobusy(true,ser); // really work DoSomething: ser->state=WORKING; printf("Do Something...\n"); ser->DoJob(); ser->Complete();//this function change state End: pthread_mutex_unlock(&(ser->mutex_x)); } } return NULL; } void Servant::stop() { if(state==IDLE) { m_pool->AddtoIdle(false,this); m_pool->Addtostop(true,this); state=STOP; printf("thread stop!\n"); } else if(state==WORKING) { printf("current state is WORKING stop Failed!\n"); } else if(state==STOP) { printf("thread already stopped!\n"); } else { printf("sorry unknown state!\n"); state=STOP; } } void Servant::wakeup() { if(state==STOP) { m_pool->Addtostop(false,this); m_pool->AddtoIdle(true,this); state=IDLE; printf("thread wakeup!\n"); } else if(state==WORKING) { printf("current state is WORKING stop Failed!\n"); } else 
if(state==IDLE) { printf("current state is idle never need wakeup!\n"); } else { printf("sorry unknown state..\n"); state=IDLE; } } void Servant::Complete()//完成操作 { //完成任务,该线程变为idle m_pool->Addtobusy(false,this); m_pool->AddtoIdle(true,this); state=IDLE; } void Servant::join() { pthread_join(m_tid,NULL); } void Servant::notify() { if(state==IDLE) { printf("we have notified thread running\n"); pthread_cond_signal(&cond_t); } else { printf("sorry,the signal is not correct\n"); } } void Servant::exit() { state=EXIT; pthread_cond_signal(&cond_t); } void ServantPool::StopAll() { vector<Servant*>::iterator itr=m_idle_list.begin(); for(;itr!=m_idle_list.end();) { (*itr)->stop(); } itr=m_busy_list.begin(); for(;itr!=m_busy_list.end();) { (*itr)->stop(); } } void ServantPool::create(int num) { int i=0; for(;i<num;i++) { Servant* tmp=new Servant(this); m_idle_list.push_back(tmp); } } void ServantPool::AddtoIdle(bool add,Servant* ser) { if(add) { // add ser to idle list pthread_mutex_lock(&vec_mutex); m_idle_list.push_back(ser); pthread_mutex_unlock(&vec_mutex); } else { // del ser from idle list pthread_mutex_lock(&vec_mutex); vector<Servant*>::iterator itr=m_idle_list.begin(); for(;itr!=m_idle_list.end();itr++) { if(*itr==ser) { m_idle_list.erase(itr); break; } } pthread_mutex_unlock(&vec_mutex); } } void ServantPool::Addtobusy(bool add,Servant* ser) { if(add) { // add ser to idle list pthread_mutex_lock(&vec_mutex); m_busy_list.push_back(ser); pthread_mutex_unlock(&vec_mutex); } else { // del ser from idle list pthread_mutex_lock(&vec_mutex); vector<Servant*>::iterator itr=m_busy_list.begin(); for(;itr!=m_busy_list.end();itr++) { if(*itr==ser) { m_busy_list.erase(itr); break; } } pthread_mutex_unlock(&vec_mutex); } } void ServantPool::Addtostop(bool add,Servant* ser) { if(add) { // add ser to idle list pthread_mutex_lock(&vec_mutex); m_stop_list.push_back(ser); pthread_mutex_unlock(&vec_mutex); } else { // del ser from idle list pthread_mutex_lock(&vec_mutex); 
vector<Servant*>::iterator itr=m_stop_list.begin(); for(;itr!=m_stop_list.end();itr++) { if(*itr==ser) { m_stop_list.erase(itr); break; } } pthread_mutex_unlock(&vec_mutex); } } void ServantPool::startAll() { int i=0; for(;i<m_idle_list.size();i++) { Servant* tmp=m_idle_list[i]; printf("start the thread %d\n",i); tmp->start();//create the thread } } void ServantPool::stop(Servant* id) { vector<Servant*>::iterator itr=m_idle_list.begin(); for(;itr!=m_idle_list.end();itr++) { if((*itr)==id) { (*itr)->stop(); return; } } } void ServantPool::waitforAll() { int i=0; int nums=m_busy_list.size(); for(;i<nums;i++) { Servant* tmp=m_busy_list[i]; tmp->join(); } nums=m_idle_list.size(); i=0; for(;i<nums;i++) { Servant* tmp=m_idle_list[i]; tmp->join(); } } void ServantPool::wakeup(Servant* id) { vector<Servant*>::iterator itr=m_busy_list.begin(); for(;itr!=m_busy_list.end();itr++) { if((*itr)==id) { (*itr)->wakeup(); return; } } } ServantPool * ServantPool::hInstance=NULL; ServantPool * ServantPool::getInstance() { if(NULL==hInstance) { hInstance=new ServantPool(); pthread_mutex_init(&vec_mutex,NULL); } return hInstance; } ServantPool::~ServantPool() { vector<Servant*>::iterator itr=m_idle_list.begin(); for(;itr!=m_idle_list.end();) { (*itr)->exit(); delete *itr; itr=m_idle_list.erase(itr); } itr=m_busy_list.begin(); for(;itr!=m_busy_list.end();) { (*itr)->exit(); delete *itr; itr=m_busy_list.erase(itr); } itr=m_stop_list.begin(); for(;itr!=m_stop_list.end();) { (*itr)->exit(); delete *itr; itr=m_stop_list.erase(itr); } } Servant* ServantPool::Accepting() { if(m_idle_list.size()>0) { return m_idle_list[0]; } return NULL; }接下来就是makefile文件内容了:
# Builds the `test` server binary from Servant.cpp, reactor.cpp and main.c.
# Recipe lines must start with a TAB character.

# Link step: the only place where the pthread library is needed.
test: Servant.o reactor.o main.o
	g++ -o test Servant.o reactor.o main.o -lpthread

Servant.o: Servant.cpp Servant.h
	g++ -g -c Servant.cpp

# reactor.cpp includes both headers, so it must be rebuilt when either changes.
reactor.o: reactor.cpp reactor.h Servant.h
	g++ -g -c reactor.cpp

main.o: main.c Servant.h
	g++ -g -c main.c

# clean is a command, not a file; -f keeps it quiet when nothing was built yet.
.PHONY: clean
clean:
	rm -f *.o test
这里需要注意一点:对 m_idle_list、m_busy_list、m_stop_list 三个队列的操作都应该是原子操作,其原子性由一个静态全局互斥量(vec_mutex)来保证。