这几天一直在看muduo的Eventloop(事件循环)这一块的源代码,感觉里面有好多东西例如:智能指针的使用,将eventfd,timerfd等linux新性能运用进去,C++一些容器的合理使用,还有如何能在多线程情况下减少锁的使用等都是我们应该学习的东西。
关于muduo实现的reactor模式,有三个关键的类
.事件分发器类Channel
.封装了I/O复用的Poller
.定时器接口类TimerQueue
接下来就给大家先一一介绍这几个类,然后才给大家介绍reactor的基本构成
1.事件分发类Channel
事件分发器Channel的数据成员如下
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
static const int kNoneEvent;
int kReadEvent;
int kWriteEvent;
EventLoop *loop_;
int fd_;
int events_;
int revents_;
int index_;
bool logHup_;
boost::weak_ptr<
void> tie_;
bool tied_;
bool eventHandling_;
bool addedToLoop_;
ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
ReadEventCallback errorCallback_;
其中EventCallback和ReadEventCallback的声明如下
1
2
typedef
boost::function<void()>
EventCallback;
typedef
:function<void(
Timestamp)>
ReadEventCallback;
//处理事件
void handleEvent(Timestamp receiveTime);
//设置可读事件回调
void setReadCallback(const ReadEventCallback &cb)
{
readCallback_ = cb;
}
//设置可写事件回调
void setWriteCallback(const EventCallback &cb)
{
writeCallback_ = cb;
}
//设置关闭事件回调
void setCloseCallback(const EventCallback &cb)
{
closeCallback_ = cb;
}
//设置错误事件回调
void setErrorCallback(const EventCallback &cb)
{
errorCallback_ = cb;
}
void tie(const boost::shared_ptr<void>&);
//返回注册的事件
int events()const
{
return events_;
}
//设置注册事件
void set_revents(int revt)
{
revents_ = revt;
}
//判断是否注册的事件
bool isNoneEvent()const
{
return events_ == kNoneEvent;
}
//设置可读事件
void enableReading()
{
events_ |= kReadEvent;
update();
}
//销毁读事件
void disableReading()
{
events_ &= ~kReadEvent;
update();
}
//注册写事件
void enableWriting()
{
events_ |= kWriteEvent;
update();
}
//销毁写事件
void disableWriting()
{
events_ &= ~kWriteEvent;
update();
}
//销毁所有事件
void disableAll()
{
events_ = kNoneEvent;
update();
}
//是否注册可写事件
isWriting()const
{
return events_ & kWriteEvent;
}
1
这里写
代码片
Channel的主要功能为管理各种注册给poller的套接字描述符及其上发生的事件,以及事件发生了所调的回调函数
Channel的主要作用如下
1.首先我们给定Channel所属的loop以及其要处理的fd
2.接着我们开始注册fd上需要监听的事件,如果是常用事件(读写等)的话,我们可以直接调用接口enable***来注册对应fd上的事件,与之对应的是disable*用来销毁特定的事件
3.在然后我们通过set***Callback来事件发生时的回调
2.I/O复用类Poller
Poller类是个基类,它的定义如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Poller : boost::noncopyable
{
public:
typedef std::
vector<Channel *> ChannelList;
Poller(EventLoop *loop);
virtual ~Poller();
virtual Timestamp poll(
int timeoutMs,ChannelList *activeChannels) =
0;
virtual void updateChannel(Channel *channel) =
//移除Channel
void removeChannel(Channel *channel) =
//这个channel是否在map中存在
bool hasChannel(Channel *channel)
const;
static Poller *newDefaultPoller(EventLoop *loop);
void assertInLoopThread()
const
{
ownerLoop_->assertInLoopThread();
}
protected:
map<
int,Channel*> ChannelMap;
ChannelMap Channels_;
private:
EventLoop *owerLoop_;
};
需要注意的是我们的事件分发器channel集用关联容器map来保存,map的关键字为channel所管理的fd,这样我们在更新已有的channel时时间复杂多会将为O(1)
接下来我们谈谈epoll对Poller的实现
EPollPoller类的定义为
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class EPollPoller :
public Poller
{
public:
EPollPoller(EventLoop *loop);
//内部调用epoll_wait函数
void updateChannel(Channel *channel);
void removeChannel(Channel *Channel);
private:
int kInitEventListSize =
16;
char *operatoionToString(
int op);
void fillActiveChannels(
int numEvents,ChannelList *activeChannels)
const;
void update(
int operation,Channel *channel);
vector<
struct epoll_event> EventList;
int epollfd_;
EventList events_;
};
Poller类的主要功能如下
.调用poll函数监听注册了事件的文件描述符
.当poll返回时将发生事件的事件集装入channel中
.可以控制channel中事件的增删改
3.定时器TimerQueue
EventLoop直接调用的是定时器队列类TimerQueue类,该类的数据定义如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
std::pair<Timestamp,Timer *> Entry;
typedef sta::
set<Entry> TimerList;
std::pair<Timer *,int64_t> ActiveTimer;
set<ActiveTimeri> ActiveTimerSet;
EventLoop *loop_;
int timerfd_;
Channel timerfdChannel_;
TimerList timers_;
ActiveTimerSet activeTimers_;
bool callingExpiredTimers_;
ActiveTimerSet cancelingTimers_;
由上图我们发现我们用来存储定时器集的容器使用了set,set针对定时器有它天然的优势,首先set的特性是所存储的元素为默认升序的,这样当我们某个事件点取到期的定时器,就直接取该时间点之前的所有定时器就好,其次我们往set中添加定时器的效率也相对较好为0(logn)。但是用set有个问题,我们如何存储俩个定时时间一样的定时器呢?muduo的解决方案就是使用一个pair类型,pair为pair
1
2
3
4
5
6
7
增加一个定时器
TimerId addTimer(
const TimerCallback &cb,Timestamp when,
double interval);
删除一个定时器
void cancel(TimerId timerId);
上述接口函数,都不是线程安全的,但是muduo中并没有用加锁来解决问题,而是通过EventLoop中runInloop函数来使,上述函数在主I/O线程中执行,这样做的好处是,我们可以在上述函数实现中不使用锁,程序在运行过程中就会减少由于锁而引起的上下文切换,由于主I/O线程也不一定是一直忙,所以这种做法可能会在效率上有所提升
4.EventLoop类的实现
EventLoop类实现了reactor的基本模式
它的数据定义如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
vector<Channel *> ChannelList;
bool looping_;
bool quit_;
bool callingPendingFunctors_;
int64_t iteration_;
const pid_t threadId_;
Timestamp pollReturnTime_;
boost::scoped_ptr<Poller> poller_;
boost::scoped_ptr<TimerQueue> TimerQueue_;
int wakeupFd_;
boost::scoped_ptr<Channel> wakeupChannel_;
boost::any context_;
ChannelList activeChannel_;
Channel *currentActiveChannel_;
MutexLock mutex_;
vector<Functor> pendingFunctors_;
EventLoop通过boost库下的智能指针scoped_ptr来管理Poller_,TimerQueue_,wakeupChannel_对象,这样不容易发生内存显露,其中变量pendingFunctors_为需要在I/O线程中执行的任务集,例如上面所讲的定时器的增删接口的执行,就会先放在此集合里,然后有主I/O线程来执行,那么主线程在调用loop函数之后会阻塞在poller函数中,此时我们应该如何唤醒I/O线程呢?muduo中采用了linux的新特性eventfd来唤醒I/O线程,他的具体用法在我的前几篇博客中有介绍
Eventloop的主要功能如下
.首先我们因该调用updateChannel来添加一些事件
.接着我们就可以调用loop函数来执行事件循环了,在执行事件循环的过程中,我们会阻塞在poller_poll调用处,当有事件发生时,Poller类就会把活跃的事件放在activeChannel集合中
.之后我们调用Channel中的handleEvent来处理事件发生时对应的回调函数,处理完事件函数后还会处理必须有I/O线程来完成的doPendingFuncors函数
当然我们可以在中间的过程中注册一些普通事件或通过run*类函数来注册定时事件,我们也可以调用updateChannel和removeChannel来增删该Channel
EventLoop的源码如下
EventLoop.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H
#include <vector>
#include <boost/any.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <muduo/base/Mutex.h>
#include <muduo/base/CurrentThread.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/Callbacks.h>
#include <muduo/net/TimerId.h>
namespace muduo
{
namespace net
{
class Channel;
class Poller;
Class TimerQueue;
class EventLoop : boost::noncopyable
{
typedef boost::function<
void()> Functor;
EventLoop();
~EventLoop();
void loop();
void quit();
Timestamp pollReturnTime()
const
{
return pollReturnTime_;
}
int64_t iteration()
return iteration_;
}
void runInLoop(
const Functor &cb);
void queueInLoop(
//某个时间点执行定时回调
TimerId runAt(
const Timestamp &time,136);
Box-sizing: border-
Box;">const TimerCallback &cb);
TimerId runAfter(
double delay,136);
Box-sizing: border-
Box;">const TimerCallback & cb);
TimerId runEvery(
double interval,0);
Box-sizing: border-
Box;">//
删除某个定时器
void cancel(TimerId timerId);
void wakeup();
void updateChannel(Channel *channel);
void removeChannel(Channel *channel);
bool hasChannel(Channel *channel);
void assertInLoopThread()
{
if(!isInLoopThread())
{
abortNotInLoopThread();
}
}
bool isInLoopThread()
return threadId_ == CurrentThread::tid();
}
bool eventHandling()
const {
return eventHandling_;}
void setContext(
const boost::any &contex)
{
return context_;
}
const boost::any &getContext()
return context_;
}
boost::any *getMutableContext()
{
return &context_;
}
static EventLoop *getEventLoopOfCurrentThread();
private:
void abortNotInLoopThread();
void handleRead();
void doPendingFunctors();
void printActiveChannels()
const;
//需要在主I/O线程执行的任务
};
}
}
#endif MUDUO_NET_ENENTLOOP_H
EventLoop.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/Poller.h>
#include <muduo/net/SocketsOps.h>
#include <muduo/net/TimerQueue.h>
#include <boost/bind.hpp>
#include <signal.h>
#include <sys/eventfd.h>
using namespace muduo;
namespace muduo::net;
namespace
{
__thread EventLoop *t_loopInThisThread =
0;
int kPollTimeMs =
10000;
int createEventfd()
{
int evtfd = ::eventfd(
0,EFD_NONBLOCK | EFD_CFD_CLOEXEC);
if(evtfd <
0)
{
LOG_SYSERR <<
"Failed in eventfd";
abort();
}
return evtfd;
}
}
EventLoop *EventLoop::getEventLoopOfCurrentThread()
{
return t_loopInThisThread;
}
EventLoop::EventLoop()
:looping_(
false),quit_(
0),threadId_(CurrentThread::tid()),poller_(Poller::newDefaultPoller(
this)),timerQueue_(
new TimerQueue(
new Channel(
this,wakeupFd_)),currentActiveChannel_(NULL)
{
LOG_DEBUG <<
"EventLoop created " <<
this <<
"in thread" << threadId_;
if(t_loopInThisThread)
{
LOG_FATAL <<
"Another EventLoop " << t_loopInThisThread <<
"exists in this thread" << threadId_;
}
else
{
t_loopInThisThread =
this;
}
wakeupChannel_->setReadCallback(boost::bind(&EventLoop::handleRead,136);
Box-sizing: border-
Box;">this));
wakeupChannel_->enableReading();
}
EventLoop::~EventLoop()
{
LOG_DEBUG <<
"EventLoop" <<
this<<
"of thread" << threadId_
<<
"destructs in thread" << CurrentThread::tid();
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupFd_);
t_loopInThisThread = NULL;
}
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ =
true;
quit_ =
false;
LOG_TRACE <<
"EventLoop" <<
"start looping";
while(!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs,&activeChannels_);
if(Logger::LogLevel() <= Logger::TRACE)
{
printActiveChannels();
}
eventHandling_ =
true;
for(ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ =
false;
doPendingFunctors();
}
LOG_TRACE <<
"stop looping";
looping_ =
false;
}
void EventLoop::quit()
{
quit_ =
true;
if(!isInLoopThread())
{
wakeup();
}
}
void EventLoop::runInLoop(
const Functor &cb)
{
if(isInLoopThread())
{
cb();
}
else
{
queueInLoop(cb);
}
}
void EventLoop::queueInLoop(
const Functor &cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(cb);
}
if(!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
TimerId EventLoop::runAt(
const TimerCallback &cb)
{
return timerQueue_->addTimer(cb,time,
0.0);
}
TimerId EventLoop::runAfter(
const TimerCallback &cb)
{
Timestamp time(addTime(Timestamp::now(),delay));
return runAt(time,cb);
}
TimerId EventLoop::runEvery(
const TimerCallback &cb)
{
Timestamp time(addTime(Timestamp::now()),interval);
void EventLoop::cancel(TimerId timerId)
{
return timerQueue_->cancel(timerId);
}
void EventLoop::updateChannel(Channel *channel)
{
assert(channel->ownerLoop() ==
this);
assertInLoopThread();
poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel *channel)
{
assert(channel->ownerLoop() ==
this);
assertInLoopThread();
if(eventHandling_)
{
assert(currentActiveChannel_ == channel ||
std::find(activeChannels_.begin(),activeChannels_.end(),channel) == activeChannels_.end());
}
poller_->removeChannel(channel);
}
bool EventLoop::hasChannel(Channel *channel)
{
assert(channel->ownerLoop() ==
return poller_->hasChannel(channel);
}
void EventLoop::abortNotInLoopThread()
{
LOG_FATAL <<
"EventLoop::abortNotInLoopThread - EventLoop" <<
this
<<
"was created in threadId_ = " << threadId_
<<
",current thread id = " << CurrentThread::tid();
}
void EventLoop::wakeup()
{
uint64_t one =
1;
ssize_t n = sockets::write(wakeupFd_,&one,136);
Box-sizing: border-
Box;">sizeof one);
if(n !=
sizeof one)
{
LOG_ERROR <<
"EventLoop::wakeup() writes" << n <<
"bytes instead of 8";
}
}
void EventLoop::handleRead()
{
uint64_t one =
1;
ssize_t n = sockets::read(wakeupFd_,0);
Box-sizing: border-
Box;">"EventLoop::handleRead() reads" << n <<
void EventLoop::doPendingFunctors()
{
vector<Functor> Functors;
callingPendingFunctors_ =
true;
{
MutexLockGuard lock(mutex_);
Functors.swap(pendingFunctors_);
}
for(ssize_t i =
0;i < Functors.size();++i)
{
Functors[i]();
}
callingPendingFunctors_ =
false;
}