EventLoop是整个Reactor的核心。其类图如下:
one loop per thread意味着每个线程只能有一个EventLoop对象,用变量
__thread EventLoop* t_loopInThisThread = 0;
表示,在创建EventLoop对象时将t_loopInThisThread赋值,以后再创建时就可以检查这个变量,如果已经赋值就说明当前线程已经创建过EventLoop对象了。线程调用静态函数EventLoop::getEventLoopOfCurrentThread就可以获得当前线程的EventLoop对象的指针了。
EventLoop有许多变量,几个bool变量,looping_:是否正在执行loop循环;quit_:是否已经调用quit()函数退出loop循环;eventHandling是否正在处理event事件;callingPendingFunctors是否正在调用pendingFunctors_的函数对象。
其他变量,poller_是用来调用pool或epool的,activeChannels_记录这激活事件的集合,currentActiveChannel_是当前正在处理的channel事件,pendingFunctors_是当前线程要执行任务的集合。可以在loop()函数中看到这一点:
void EventLoop::loop()//EventLoop在这里循环
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
LOG_TRACE << "EventLoop " << this << " start looping";
while (!quit_)
{
activeChannels_.clear();//清空激活事件集合
pollReturnTime_ = poller_->poll(kPollTimeMs,&activeChannels_);//pool_wait或epoll_wait。阻塞在这里
++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
eventHandling_ = true;
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_);//事件处理I/O
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}
线程在poller_->poll等待监听事件的到来,当poller_->poll返回后,监听到的事件放到了activeChannel中,随后一一处理激活事件。
最后面调用doPendingFunctors()是执行pendingFunctors_中的任务。
void EventLoop::doPendingFunctors()//执行任务队列中的任务
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);//尽量让临界区小
functors.swap(pendingFunctors_);
}
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
callingPendingFunctors_ = false;
}
EventLoop的owner线程除了等待poll、执行poll返回的激活事件,还可以处理一些其他任务,例如调用某一个回调函数,处理其他EventLoop对象的,调用void EventLoop::runInLoop(const Functor& cb)即可让EventLoop的owner线程执行cb函数。
void EventLoop::runInLoop(const Functor& cb)//可以夸线程调用
{
if (isInLoopThread())//当前线程是ower线程则立即执行,否则放到ower线程的任务队列,异步执行
{
cb();
}
else
{
queueInLoop(cb);
}
}
void EventLoop::queueInLoop(const Functor& cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(cb);
}
//不是EventLoop的owner线程,或者是当前线程,但是正在执行任务队列中的任务
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();//唤醒owner线程
}
}
如果时EventLoop的owner线程,会调用runInLoop会立即执行回调函数cb,否则会把回调函数放到任务队列(其实时vector),即调用queueInLoop函数。如果不是当前线程调用,或者正在执行pendingFunctors_中的任务,都要唤醒EventLoop的owner线程,让其执行pendingFunctors_中的任务。如果正在执行pendingFunctors_中的任务,添加新任务后不会执行新的任务,因为functors.swap(pendingFunctors_)后,执行的时functors中的任务。
这里的唤醒wakeup()用了eventfd。这是2.6内核新增的一个技术。
#include <sys/eventfd.h>
int eventfd(unsigned int initval,int flags);
创建一个eventfd的fd后,就可以对它进行read、write等操作。eventfd相当于一个计数器,read以后计数器清零,write递增计数器;fd可以进行如下操作:select(poll、epoll)、close操作。
EventLoop中的wakeupFd_就是eventfd,wakeupChannel_和wakeupFd_相关联,EventLoop关注了wakeupChannel_的读事件,当要唤醒(即poller_->poll)时,写wakeupFd_即可。
void EventLoop::wakeup()//wakeupFd写,唤醒读wakeupFd的线程
{
uint64_t one = 1;
ssize_t n = sockets::write(wakeupFd_,&one,sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}
EventLoop中还有定时器,可以在某一时刻(runAt),未来多久(runAfter),每隔多久(runEvery)执行某一函数。定时用到了TimerQueue,稍后分析它。
TimerId EventLoop::runAt(const Timestamp& time,TimerCallback&& cb)
{
return timerQueue_->addTimer(std::move(cb),time,0.0);
}
TimerId EventLoop::runAfter(double delay,TimerCallback&& cb)
{
Timestamp time(addTime(Timestamp::now(),delay));
return runAt(time,std::move(cb));
}
TimerId EventLoop::runEvery(double interval,interval));
return timerQueue_->addTimer(std::move(cb),interval);
}