《Linux多线程服务端编程：使用muduo C++网络库》的第8章从0开始讲述一个网络库的实现，比较适合初学者入门。
本书第8章实现了以下几个类：EventLoop类、Poller类、Channel类。
首先分析channel类的定义:
#ifndef MUDUO_NET_CHANNEL_H #define MUDUO_NET_CHANNEL_H #include <boost/function.hpp> #include <boost/noncopyable.hpp> namespace muduo { class EventLoop; /// /// A selectable I/O channel. /// /// This class doesn't own the file descriptor. /// The file descriptor could be a socket,/// an eventfd,a timerfd,or a signalfd class Channel : boost::noncopyable { public: typedef boost::function<void()> EventCallback; Channel(EventLoop* loop,int fd); void handleEvent(); void setReadCallback(const EventCallback& cb) { readCallback_ = cb; } void setWriteCallback(const EventCallback& cb) { writeCallback_ = cb; } void setErrorCallback(const EventCallback& cb) { errorCallback_ = cb; } int fd() const { return fd_; } int events() const { return events_; } void set_revents(int revt) { revents_ = revt; } bool isNoneEvent() const { return events_ == kNoneEvent; } void enableReading() { events_ |= kReadEvent; update(); } // void enableWriting() { events_ |= kWriteEvent; update(); } // void disableWriting() { events_ &= ~kWriteEvent; update(); } // void disableAll() { events_ = kNoneEvent; update(); } // for Poller int index() { return index_; } void set_index(int idx) { index_ = idx; } EventLoop* ownerLoop() { return loop_; } private: void update(); static const int kNoneEvent; static const int kReadEvent; static const int kWriteEvent; EventLoop* loop_; const int fd_; int events_; int revents_; int index_; // used by Poller. EventCallback readCallback_; EventCallback writeCallback_; EventCallback errorCallback_; }; } #endif // MUDUO_NET_CHANNEL_Hfd_是channel类的私有变量,一个channel对象是跟文件句柄fd直接挂钩的。channel类的构造函数是需要loop 和 fd两个参数,因此每一个关注的fd都会创建一个专门的channel对象来对其负责。
接下来看看poller类的定义:
class Poller : boost::noncopyable { public: typedef std::vector<Channel*> ChannelList; Poller(EventLoop* loop); ~Poller(); /// Polls the I/O events. /// Must be called in the loop thread. Timestamp poll(int timeoutMs,ChannelList* activeChannels); /// Changes the interested I/O events. /// Must be called in the loop thread. void updateChannel(Channel* channel); void assertInLoopThread() { ownerLoop_->assertInLoopThread(); } private: void fillActiveChannels(int numEvents,ChannelList* activeChannels) const; typedef std::vector<struct pollfd> PollFdList; typedef std::map<int,Channel*> ChannelMap; EventLoop* ownerLoop_; PollFdList pollfds_; ChannelMap channels_; }; } #endif // MUDUO_NET_POLLER_H
poller类的实现
#include "Poller.h"

#include "Channel.h"
#include "logging/Logging.h"

#include <assert.h>
#include <poll.h>
#include <stddef.h>

using namespace muduo;

Poller::Poller(EventLoop* loop)
  : ownerLoop_(loop)
{
}

Poller::~Poller()
{
}

// Waits up to timeoutMs milliseconds in poll(2) on every registered pollfd,
// appends the channels that have pending events to *activeChannels, and
// returns the time stamp taken right after poll(2) returned.
Timestamp Poller::poll(int timeoutMs, ChannelList* activeChannels)
{
  // XXX pollfds_ shouldn't change
  // An empty vector has no first element, so dereferencing begin() would be
  // undefined behaviour; hand poll(2) a null array with nfds == 0 instead.
  struct pollfd* fds = pollfds_.empty() ? NULL : &pollfds_[0];
  int numEvents = ::poll(fds, pollfds_.size(), timeoutMs);
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happended";
    fillActiveChannels(numEvents, activeChannels);
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << " nothing happended";
  }
  else
  {
    LOG_SYSERR << "Poller::poll()";
  }
  return now;
}

// Walks pollfds_ and, for each entry with revents > 0, looks up its Channel,
// stores the reported events in it and appends it to *activeChannels.
// numEvents is the count returned by poll(2); the scan stops as soon as that
// many entries have been found.
void Poller::fillActiveChannels(int numEvents,
                                ChannelList* activeChannels) const
{
  for (PollFdList::const_iterator pfd = pollfds_.begin();
       pfd != pollfds_.end() && numEvents > 0; ++pfd)
  {
    if (pfd->revents > 0)
    {
      --numEvents;
      ChannelMap::const_iterator ch = channels_.find(pfd->fd);
      assert(ch != channels_.end());
      Channel* channel = ch->second;
      assert(channel->fd() == pfd->fd);
      channel->set_revents(pfd->revents);
      // pfd->revents = 0;
      activeChannels->push_back(channel);
    }
  }
}

// Registers a new channel (index() < 0) or refreshes the pollfd of one that
// is already registered so that it matches channel->events().
void Poller::updateChannel(Channel* channel)
{
  assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
  if (channel->index() < 0)
  {
    // a new one, add to pollfds_
    assert(channels_.find(channel->fd()) == channels_.end());
    struct pollfd pfd;
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    pollfds_.push_back(pfd);
    // Remember the slot so later updates can find the pollfd directly.
    int idx = static_cast<int>(pollfds_.size())-1;
    channel->set_index(idx);
    channels_[pfd.fd] = channel;
  }
  else
  {
    // update existing one
    assert(channels_.find(channel->fd()) != channels_.end());
    assert(channels_[channel->fd()] == channel);
    int idx = channel->index();
    assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
    struct pollfd& pfd = pollfds_[idx];
    // fd is -1 when this slot was disabled by an earlier update (see below).
    assert(pfd.fd == channel->fd() || pfd.fd == -1);
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    if (channel->isNoneEvent())
    {
      // ignore this pollfd
      pfd.fd = -1;
    }
  }
}
Poller对象有pollfds_、channels_这样的私有成员。在Poller::updateChannel中，会把Channel对象中的fd抽取出来，组成pollfd结构体，然后添加到pollfds_数组中，并且会对pollfds_、channels_等私有成员进行相应的修改，所以Poller::updateChannel是将Channel和poll联系起来的关键函数。
而在Poller::poll中则会调用poll系统调用来完成所关心事件的IO多路复用，并且将当前有活动事件的Channel添加到activeChannels中。
最后在eventloop::loop()中会调用相应事件的处理方法:
void EventLoop::loop() { assert(!looping_); assertInLoopThread(); looping_ = true; quit_ = false; while (!quit_) { activeChannels_.clear(); poller_->poll(kPollTimeMs,&activeChannels_); for (ChannelList::iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { (*it)->handleEvent(); } } LOG_TRACE << "EventLoop " << this << " stop looping"; looping_ = false; }
通过以上几个简单的类,即可实现最简单的reactor模式。
再来看简单的测试程序:
#include "Channel.h"
#include "EventLoop.h"

#include <stdio.h>
#include <strings.h>       // bzero
#include <sys/timerfd.h>
#include <unistd.h>        // close

muduo::EventLoop* g_loop;

// Read callback of the timer channel: reports the expiry and asks the loop
// to stop.
void timeout()
{
  printf("Timeout!\n");
  g_loop->quit();
}

// Arms a one-shot 5 second timerfd, watches it through a Channel and runs the
// loop until timeout() quits it.
int main()
{
  muduo::EventLoop loop;
  g_loop = &loop;

  int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
  if (timerfd < 0)
  {
    // Without a valid fd nothing would ever become readable.
    perror("timerfd_create");
    return 1;
  }

  muduo::Channel channel(&loop, timerfd);
  channel.setReadCallback(timeout);
  channel.enableReading();

  struct itimerspec howlong;
  bzero(&howlong, sizeof howlong);
  howlong.it_value.tv_sec = 5;
  // timerfd_settime takes (fd, flags, new_value, old_value); flags == 0 means
  // the expiry time is relative to now.
  if (::timerfd_settime(timerfd, 0, &howlong, NULL) < 0)
  {
    // An unarmed timer never fires, so the loop would never be told to quit.
    perror("timerfd_settime");
    ::close(timerfd);
    return 1;
  }

  loop.loop();

  ::close(timerfd);
}
上述程序中是将timerfd交由eventloop管理,并且编写其handler函数timeout()。eventloop会完成事件的检测和分发。