#include "ace/OS_main.h"
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_unistd.h"
#include "ace/Reactor.h"
#include "ace/Process.h"
#include "ace/SOCK_Dgram.h"
#include "ace/INET_Addr.h"
#include "ace/Log_Msg.h"
#include "ace/Thread_Manager.h"
#include "ace/Task_T.h"
// Port number used in main() to build the endpoint's local ACE_INET_Addr.
#define SERVER_PORT 10101
// Thread count passed to activate() when Task_Manager::svc() starts its
// Task_Worker pool.
static const size_t TASK_THREAD_POOL_SIZE = 10;
// Worker task: every thread activated on this task pulls message blocks off
// the task's own queue and passes them to process_task().
class Task_Worker: public ACE_Task<ACE_MT_SYNCH>
{
public:
  // Thread entry point.
  //
  // Blocks in getq() until a message block arrives, processes it, and repeats.
  // Returns 0 as soon as getq() fails. getq() is called without a timeout, so
  // a failure means the queue can no longer deliver messages (for example
  // after it was deactivated); retrying would only busy-spin and would keep a
  // caller blocked in wait() on this task forever.
  virtual int svc(void)
  {
    for (;;)
    {
      ACE_Message_Block *mb = NULL;
      if (this->getq(mb) == -1)
      {
        break;
      }
      process_task(mb);
    }
    return 0;
  }

private:
  // Handles one dequeued message block. Currently it only releases the block.
  void process_task(ACE_Message_Block *mb)
  {
    mb->release();
  }
};
// Manager task: owns a pool of Task_Worker threads and forwards every message
// block that arrives on its own queue to that pool.
class Task_Manager: public ACE_Task<ACE_MT_SYNCH>
{
public:
  // Thread entry point.
  //
  // Starts TASK_THREAD_POOL_SIZE worker threads, then relays message blocks
  // from this task's queue to the worker queue until getq() fails. On exit the
  // worker queue is deactivated and the worker threads are joined.
  //
  // Returns 0 after a normal shutdown, -1 if the worker pool could not be
  // started.
  virtual int svc(void)
  {
    Task_Worker task_tp;
    if (task_tp.activate(THR_NEW_LWP | THR_JOINABLE,TASK_THREAD_POOL_SIZE) == -1)
    {
      ACE_ERROR_RETURN((LM_ERROR,"%p\n","Task_Worker::activate"),-1);
    }

    for (;;)
    {
      ACE_Message_Block *mb = NULL;
      if (this->getq(mb) < 0)
      {
        // Our own queue can no longer deliver messages: stop relaying instead
        // of forwarding a null block and looping again.
        break;
      }
      if (task_tp.putq(mb) == -1)
      {
        // The worker queue rejected the block; we still own it, so free it.
        mb->release();
      }
    }

    // Wake the workers blocked in getq() and wait for them to finish before
    // task_tp goes out of scope.
    task_tp.msg_queue()->deactivate();
    task_tp.wait();
    return 0;
  }
};
// Reactor event handler that owns a datagram socket together with the
// Task_Manager that received datagrams are queued to.
class Dgram_Endpoint : public ACE_Event_Handler
{
public:
  // Task that receives the message blocks queued by handle_input().
  Task_Manager task_mgr;

  // Constructs the socket from local_addr and activates task_mgr.
  Dgram_Endpoint (const ACE_INET_Addr &local_addr);

  // ACE_Event_Handler hooks.
  virtual ACE_HANDLE get_handle () const;
  virtual int handle_input (ACE_HANDLE handle);
  virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg = 0);
  virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
  virtual int handle_signal (int signum, siginfo_t *info, ucontext_t *context);

  // Sends len bytes starting at buf to addr through the datagram socket.
  int send (const char *buf, size_t len, const ACE_INET_Addr &addr);

private:
  // The datagram socket this handler reads from and writes to.
  ACE_SOCK_Dgram endpoint_;
};
// Sends len bytes starting at buf to addr through the datagram socket.
//
// The parameter list matches the in-class declaration
// (const char *, size_t, const ACE_INET_Addr &); the previous definition left
// out len, which the body uses.
//
// Returns the result of ACE_SOCK_Dgram::send() converted to int.
int Dgram_Endpoint::send (const char *buf,size_t len,const ACE_INET_Addr &addr)
{
  return this->endpoint_.send (buf,len,addr);
}
// Constructs the datagram socket from local_addr and starts the task_mgr
// thread that handle_input() queues received datagrams to.
Dgram_Endpoint::Dgram_Endpoint (const ACE_INET_Addr &local_addr)
  : endpoint_(local_addr)
{
  // If activation fails nothing will ever drain task_mgr's queue, so report
  // it instead of failing silently.
  if (this->task_mgr.activate() == -1)
  {
    ACE_ERROR((LM_ERROR,"%p\n","Task_Manager::activate"));
  }
}
// Returns the handle of the underlying datagram socket.
ACE_HANDLE Dgram_Endpoint::get_handle () const
{
  const ACE_HANDLE socket_handle = endpoint_.get_handle();
  return socket_handle;
}
// Close hook: logs the event, closes the socket and destroys this handler.
//
// The object deletes itself, so it must have been allocated with new and must
// not be touched after this call returns.
// NOTE(review): task_mgr is activated in the constructor but is not stopped or
// joined here before the object is destroyed - confirm the intended shutdown
// ordering.
int Dgram_Endpoint::handle_close (ACE_HANDLE handle,ACE_Reactor_Mask)
{
  // The message previously ended in a literal "/n" instead of a newline.
  ACE_DEBUG((LM_DEBUG,"************handle_close***********\n"));
  ACE_UNUSED_ARG (handle);
  this->endpoint_.close();
  delete this;
  return 0;
}
// Input hook: reads one datagram from the socket and queues a copy of its
// payload to task_mgr.
//
// Returns 0 to stay registered, including after a failed recv() or a failed
// hand-off (both are logged). Returns -1 only when the message block cannot
// be allocated.
int Dgram_Endpoint::handle_input (ACE_HANDLE)
{
  char buf[BUFSIZ];
  ACE_INET_Addr from_addr;

  const ssize_t nbytes = this->endpoint_.recv (buf,sizeof(buf),from_addr);
  if (nbytes == -1)
  {
    // Never use a failed recv() result as a size.
    ACE_ERROR_RETURN ((LM_ERROR,"%p\n","handle_input recv"),0);
  }
  const size_t payload_len = static_cast<size_t> (nbytes);

  // Let the block own its storage and copy the payload into it: buf lives on
  // this function's stack and is gone by the time another thread dequeues the
  // block. copy() also advances the write pointer by payload_len.
  ACE_Message_Block *mb = NULL;
  ACE_NEW_RETURN (mb,ACE_Message_Block (payload_len),-1);
  if (mb->copy (buf,payload_len) == -1)
  {
    mb->release();
    ACE_ERROR_RETURN ((LM_ERROR,"%p\n","handle_input copy"),0);
  }

  if (this->task_mgr.putq (mb) == -1)
  {
    // The queue did not take the block, so it is still ours to free.
    mb->release();
    ACE_ERROR_RETURN ((LM_ERROR,"%p\n","handle_input putq"),0);
  }
  return 0;
}
// Timer hook: logs that a timeout fired for this endpoint and returns 0.
// Both arguments are ignored.
int Dgram_Endpoint::handle_timeout (const ACE_Time_Value &,const void *)
{
  // The message previously ended in a literal "/n" instead of a newline.
  ACE_DEBUG ((LM_DEBUG,"(%P|%t) timed out for endpoint\n"));
  return 0;
}
// Signal hook: adds no behaviour of its own and simply forwards all three
// arguments to the ACE_Event_Handler base implementation.
int Dgram_Endpoint::handle_signal (int signum,siginfo_t* siginfo,ucontext_t* context)
{
  const int base_result = ACE_Event_Handler::handle_signal (signum,siginfo,context);
  return base_result;
}
int main (int argc,char *argv[])
{
ACE_INET_Addr local_addr(SERVER_PORT);
Dgram_Endpoint *endpoint;
ACE_NEW_RETURN (endpoint,Dgram_Endpoint (local_addr),-1);
if (ACE_Reactor::instance ()->register_handler(endpoint,ACE_Event_Handler::READ_MASK) == -1) { ACE_ERROR_RETURN ((LM_ERROR,"ACE_Reactor::register_handler"),-1); }#if 0 if (-1 == ACE_Reactor::instance()->register_handler(SIGINT,endpoint)) { ACE_ERROR_RETURN((LM_ERROR,"fail to register SIGINT handler"),-1); } ACE_Time_Value time_out(5); ACE_Reactor::instance()->schedule_timer(&callback,(void *)"time out",ACE_Time_Value::zero,time_out);#endif ACE_Reactor::instance()->run_event_loop(); return 0;}