本文主要是介绍muduo_net库源码分析(二)(EventLoop类、Channel类、Poller类),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1.类图
白色的是外部类,表示对外可见,灰色的是内部类,表示对外不可见,实心的菱形表示类之间是组合关系,空心的菱形表示类之间是聚合关系( 处于聚合关系的两个类生命周期不同步,简单来说就是在类里面以指针形式定义其它类的对象;处于组合关系的两个类的生命周期同步)。
EventLoop类中就体现了与Poller类的组合关系,与Channel类的聚合关系,EventLoop类的几个主要的成员如下:
boost::scoped_ptr<Poller> poller_;//智能指针对象,与Poller对象关联(体现了Poller与EventLoop的组合关系)
typedef std::vector<Channel*> ChannelList;
ChannelList activeChannels_; // Poller返回活动的通道(一个EventLoop包含多个Channel对象指针,体现了Channel与EventLoop的聚合关系)
Channel* currentActiveChannel_; // 当前正在处理的活动通道
EventLoop类是对事件循环的抽象,一个EventLoop对象就是一个reactor,一个EventLoop对象中包含多个Channel对象指针(EventLoop类中std::vector<Channel*> activeChannels_;
);
Poller类是对IO复用的抽象,它有两个派生类PollPoller和EPollPoller,分别是对poll和epoll的封装(这里Poller是抽象类,是muduo库中唯一使用面向对象思想编程的地方,其他地方都是基于对象的思想);
Channel类是对IO事件注册与响应的封装,它把文件描述符和所关注的事件绑定在一起,即一个Channel对象(通道)就是一个事件,Channel::update负责注册或更新IO的可读、可写等事件【调用顺序为Channel::update–>EventLoop::updateChannel–>Poller::updateChannel(多态,以PollPoller::updateChannel为例),即注册或更新IO可读、可写等事件的核心操作是在Poller::updateChannel函数中实现的,Channel::update只是一个接口】,Channel::handleEvent负责对所发生的IO事件进行处理,当Channel对象销毁的时候不关闭文件描述符,文件描述符的生存期由Socket类来控制;
Acceptor类是对被动连接的抽象,它关注的是监听套接字的可读事件(可读事件发生时通过Channel::handleEvent回调Acceptor::handleRead);Connector类是对主动连接的抽象。
2.Channel
1.Channel是 selectable IO channel,负责注册与响应IO事件,它把文件描述符和所关注的事件绑定在一起,即一个Channel对象(通道)就是一个事件(非常关键!!!)。
2.Channel是 Acceptor、Connector、EventLoop、TimerQueue、TcpConnection的成员,生命期由后者控制(组合关系)。
3.Channel与EventLoop是聚合关系。
::poll
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);struct pollfd {int fd; /* 文件描述符 */short events; /* 监控的事件 */short revents; /* 监控事件中满足条件返回的事件 */};
3.时序图(很重要!)
EventLoop::loop中调用Poller::poll(多态,以PollPoller::poll为例)–>在PollPoller::poll中调用PollPoller::fillActiveChannels返回活动的通道activeChannels_(EventLoop类的成员std::vector<Channel*>activeChannels_;
)–>遍历返回的活动通道,调用Channel::handleEvent进行事件处理–>在Channel::handleEvent中判断是可读、可写等事件后再调用Channel类中相应事件的回调函数。
Channel类中有几个主要的public函数:
void enableReading() { events_ |= kReadEvent; update(); }//注册可读事件
void enableWriting() { events_ |= kWriteEvent; update(); }//注册可写事件
void disableWriting() { events_ &= ~kWriteEvent; update(); }
void disableAll() { events_ = kNoneEvent; update(); }//更新为“不关注事件”
void remove();//移除事件,调用这个函数之前必须先调用disableAll
Channel::enableReading–>Channel::update–>通过Channel的成员loop_(EventLoop* loop_; // 所属的EventLoop对象(一个Channel对象只能属于一个EventLoop对象)
)调用EventLoop::updateChannel–>Poller::updateChannel(多态,以PollPoller::updateChannel为例),也就是说,注册或更新IO的可读、可写等事件的核心操作最终都是在Poller::updateChannel函数中实现的。
至此,代码的整体架构应该大致清楚了!
4.源码
理解全在注释里。
EventLoop.h
#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H#include <vector>#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>#include <muduo/base/CurrentThread.h>
#include <muduo/base/Thread.h>
#include <muduo/base/Timestamp.h>namespace muduo
{
namespace net
{class Channel;
class Poller;
///
/// Reactor, at most one per thread.
///
/// This is an interface class, so don't expose too much details.
//EventLoop类其实就是Reactor模式的封装
class EventLoop : boost::noncopyable
{public:EventLoop();~EventLoop(); // force out-line dtor, for scoped_ptr members.////// Loops forever.////// Must be called in the same thread as creation of the object.///void loop();//该函数不能跨线程调用,只能在创建该对象的线程中调用!void quit();//该函数可以跨线程调用!////// Time when poll returns, usually means data arrivial.///Timestamp pollReturnTime() const { return pollReturnTime_; }// internal usagevoid updateChannel(Channel* channel); // 在Poller中添加或者更新通道void removeChannel(Channel* channel); // 从Poller中移除通道void assertInLoopThread(){if (!isInLoopThread()){abortNotInLoopThread();}}bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }static EventLoop* getEventLoopOfCurrentThread();private:void abortNotInLoopThread();void printActiveChannels() const; // DEBUGtypedef std::vector<Channel*> ChannelList;//多个线程对bool类型的变量赋值本身就是原子性操作(atomic),不需要额外处理bool looping_; //当前是否在执行loop函数bool quit_;bool eventHandling_;//当前是否处于事件处理状态const pid_t threadId_; // 当前对象所属线程IDTimestamp pollReturnTime_;boost::scoped_ptr<Poller> poller_;//智能指针对象,与Poller对象关联(体现了Poller与EventLoop的组合关系)ChannelList activeChannels_; // Poller返回活动的通道(一个EventLoop包含多个Channel对象指针,体现了Channel与EventLoop的聚合关系)Channel* currentActiveChannel_; // 当前正在处理的活动通道
};}
}
#endif // MUDUO_NET_EVENTLOOP_H
EventLoop.cc
#include <muduo/net/EventLoop.h>#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/Poller.h>//#include <poll.h>using namespace muduo;
using namespace muduo::net;namespace
{
// 当前线程EventLoop对象指针
// 线程局部存储,每个线程都有,相互独立
__thread EventLoop* t_loopInThisThread = 0;const int kPollTimeMs = 10000;
}EventLoop* EventLoop::getEventLoopOfCurrentThread()
{return t_loopInThisThread;
}EventLoop::EventLoop(): looping_(false),quit_(false),eventHandling_(false),threadId_(CurrentThread::tid()),poller_(Poller::newDefaultPoller(this)),//重点currentActiveChannel_(NULL)
{LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;// 如果当前线程已经创建了EventLoop对象,终止(LOG_FATAL)if (t_loopInThisThread){LOG_FATAL << "Another EventLoop " << t_loopInThisThread<< " exists in this thread " << threadId_;}else{t_loopInThisThread = this;}
}EventLoop::~EventLoop()
{t_loopInThisThread = NULL;
}// 事件循环,该函数不能跨线程调用
// 只能在创建该对象的线程中调用
void EventLoop::loop()
{assert(!looping_);// 断言当前处于创建该对象的线程中assertInLoopThread();looping_ = true;LOG_TRACE << "EventLoop " << this << " start looping";//::poll(NULL, 0, 5*1000);while (!quit_){activeChannels_.clear();pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);//返回活动的通道//++iteration_;if (Logger::logLevel() <= Logger::TRACE){printActiveChannels();//日志记录活动的通道}// TODO sort channel by priorityeventHandling_ = true;for (ChannelList::iterator it = activeChannels_.begin();it != activeChannels_.end(); ++it)//遍历活动的通道进行处理{currentActiveChannel_ = *it;currentActiveChannel_->handleEvent(pollReturnTime_);}currentActiveChannel_ = NULL;eventHandling_ = false;//doPendingFunctors();}LOG_TRACE << "EventLoop " << this << " stop looping";looping_ = false;
}//该函数可以跨线程调用
void EventLoop::quit()
{quit_ = true;//如果不是在IO线程(即reactor)中调用quit,那么需要先唤醒该IO线程,因为IO线程可能处于阻塞状态if (!isInLoopThread()){//wakeup();//后续会给出实现}
}void EventLoop::updateChannel(Channel* channel)
{assert(channel->ownerLoop() == this);assertInLoopThread();poller_->updateChannel(channel);
}void EventLoop::removeChannel(Channel* channel)
{assert(channel->ownerLoop() == this);assertInLoopThread();if (eventHandling_){assert(currentActiveChannel_ == channel ||std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());}poller_->removeChannel(channel);
}void EventLoop::abortNotInLoopThread()
{LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this<< " was created in threadId_ = " << threadId_<< ", current thread id = " << CurrentThread::tid();
}void EventLoop::printActiveChannels() const
{for (ChannelList::const_iterator it = activeChannels_.begin();it != activeChannels_.end(); ++it){const Channel* ch = *it;LOG_TRACE << "{" << ch->reventsToString() << "} ";}
}
Poller.h
#ifndef MUDUO_NET_POLLER_H
#define MUDUO_NET_POLLER_H#include <vector>
#include <boost/noncopyable.hpp>#include <muduo/base/Timestamp.h>
#include <muduo/net/EventLoop.h>namespace muduo
{
namespace net
{class Channel;///
/// Base class for IO Multiplexing
///
/// This class doesn't own the Channel objects.
class Poller : boost::noncopyable//Poller是一个抽象类
{public:typedef std::vector<Channel*> ChannelList;Poller(EventLoop* loop);virtual ~Poller();/// Polls the I/O events./// Must be called in the loop thread.virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;/// Changes the interested I/O events./// Must be called in the loop thread.virtual void updateChannel(Channel* channel) = 0;/// Remove the channel, when it destructs./// Must be called in the loop thread.virtual void removeChannel(Channel* channel) = 0;static Poller* newDefaultPoller(EventLoop* loop);void assertInLoopThread(){ownerLoop_->assertInLoopThread();}private:EventLoop* ownerLoop_; // 记录Poller所属的EventLoop
};}
}
#endif // MUDUO_NET_POLLER_H
Poller.cc
#include <muduo/net/Poller.h>using namespace muduo;
using namespace muduo::net;Poller::Poller(EventLoop* loop): ownerLoop_(loop)
{
}Poller::~Poller()
{
}
PollPoller.h
#ifndef MUDUO_NET_POLLER_POLLPOLLER_H
#define MUDUO_NET_POLLER_POLLPOLLER_H#include <muduo/net/Poller.h>#include <map>
#include <vector>struct pollfd;namespace muduo
{
namespace net
{///
/// IO Multiplexing with poll(2).
///
class PollPoller : public Poller//继承于抽象类Poller
{public:PollPoller(EventLoop* loop);virtual ~PollPoller();virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels);virtual void updateChannel(Channel* channel);virtual void removeChannel(Channel* channel);private:void fillActiveChannels(int numEvents,ChannelList* activeChannels) const;typedef std::vector<struct pollfd> PollFdList;typedef std::map<int, Channel*> ChannelMap; // key是文件描述符,value是Channel*PollFdList pollfds_;//::poll监控的事件数组pollfds_,::poll函数的第一个参数是struct pollfd数组的首地址ChannelMap channels_;
};}
}
#endif // MUDUO_NET_POLLER_POLLPOLLER_H
PollPoller.cc
#include <muduo/net/poller/PollPoller.h>#include <muduo/base/Logging.h>
#include <muduo/base/Types.h>
#include <muduo/net/Channel.h>#include <assert.h>
#include <poll.h>using namespace muduo;
using namespace muduo::net;PollPoller::PollPoller(EventLoop* loop): Poller(loop)
{
}PollPoller::~PollPoller()
{
}Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)//timeoutMs是::poll的超时时间
{// XXX pollfds_ shouldn't changeint numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);/*man page中的定义int poll(struct pollfd *fds, nfds_t nfds, int timeout);struct pollfd {int fd; // 文件描述符short events; // 监控的事件short revents; // 监控事件中满足条件返回的事件};*/Timestamp now(Timestamp::now());if (numEvents > 0){LOG_TRACE << numEvents << " events happended";fillActiveChannels(numEvents, activeChannels);//将::poll返回的numEvents个事件放入活动通道中}else if (numEvents == 0)//说明::poll超时了还没有事件发生{LOG_TRACE << " nothing happended";}else{LOG_SYSERR << "PollPoller::poll()";}return now;
}void PollPoller::fillActiveChannels(int numEvents,ChannelList* activeChannels) const
{for (PollFdList::const_iterator pfd = pollfds_.begin();pfd != pollfds_.end() && numEvents > 0; ++pfd)//遍历::poll监控的事件数组pollfds_,从中寻找::poll返回的numEvents个事件{if (pfd->revents > 0)//说明是返回的事件{--numEvents;ChannelMap::const_iterator ch = channels_.find(pfd->fd);//std::map<int, Channel*> channels_;// key是文件描述符,value是Channel*assert(ch != channels_.end());Channel* channel = ch->second;assert(channel->fd() == pfd->fd);channel->set_revents(pfd->revents);// pfd->revents = 0;activeChannels->push_back(channel);//把对应的channel对象指针放入活动通道}}
}void PollPoller::updateChannel(Channel* channel)//用于注册新通道(Channel对象)或者更新已有通道(Channel对象)
{Poller::assertInLoopThread();//确保在Poller所属EventLoop的线程中调用LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();if (channel->index() < 0){// index为-1说明是一个新的通道(新的Channel对象),需要进行注册// a new one, add to pollfds_assert(channels_.find(channel->fd()) == channels_.end());struct pollfd pfd;//新建一个struct pollfd类型的变量//对结构体变量的属性(文件描述符及要监控的事件)进行设置/*struct pollfd {int fd; //文件描述符short events; //监控的事件short revents; //监控事件中满足条件返回的事件};*/pfd.fd = channel->fd();pfd.events = static_cast<short>(channel->events());pfd.revents = 0;//将该变量添加到事件::poll监控的事件数组pollfds_中//std::vector<struct pollfd> pollfds_;pollfds_.push_back(pfd);int idx = static_cast<int>(pollfds_.size())-1;channel->set_index(idx);//一个Channel对象就是一个事件,此时就可以知道当前事件在::poll监控的事件数组pollfds_中的序号index_了channels_[pfd.fd] = channel;//将新组成的key-value加入channels_//std::map<int, Channel*>channels_;}else{//说明是一个已有的通道(已有的Channel对象),需要进行更新// update existing oneassert(channels_.find(channel->fd()) != channels_.end());assert(channels_[channel->fd()] == channel);int idx = channel->index();assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));struct pollfd& pfd = pollfds_[idx];//注意是引用assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);pfd.events = static_cast<short>(channel->events());//更新struct pollfd变量中的监控事件pfd.revents = 0;// 将一个通道暂时更改为不关注事件,但不从PollPoller中移除该通道(removeChannel中也会用到)if (channel->isNoneEvent()){// temporarily ignore this pollfd// 暂时忽略该文件描述符的事件// 这里pfd.fd可以直接设置为-1pfd.fd = -channel->fd()-1; // 这样子设置是为了removeChannel优化}}
}void PollPoller::removeChannel(Channel* channel)//用于移除已有的通道(Channel对象)
//调用顺序如下:
//1.把将要移除的Channel对象的events_成员设置成Channel::kNoneEvent//const int Channel::kNoneEvent = 0;
//2.对该Channel对象调用updateChannel
//3.对该Channel对象调用removeChannel(2和3顺序绝对不能反)
{Poller::assertInLoopThread();LOG_TRACE << "fd = " << channel->fd();assert(channels_.find(channel->fd()) != channels_.end());assert(channels_[channel->fd()] == channel);assert(channel->isNoneEvent());int idx = channel->index();assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));const struct pollfd& pfd = pollfds_[idx]; (void)pfd;assert(pfd.fd == -channel->fd()-1 && pfd.events == channel->events());//因为将要移除的通道已经调用过updateChannel了size_t n = channels_.erase(channel->fd());//根据key从channels_中移除key-valueassert(n == 1); (void)n;//把当前事件从::poll监控的事件数组pollfds_中移除if (implicit_cast<size_t>(idx) == pollfds_.size()-1){pollfds_.pop_back();}else{// 这里移除的算法复杂度是O(1),将待删除元素与最后一个元素交换再pop_backint channelAtEnd = pollfds_.back().fd;iter_swap(pollfds_.begin()+idx, pollfds_.end()-1);//交换迭代器if (channelAtEnd < 0)//取出的struct pollfd变量中的fd小于0说明最后一个元素的事件恰好是“暂时不关注事件”,需要进行还原{channelAtEnd = -channelAtEnd-1;}channels_[channelAtEnd]->set_index(idx);pollfds_.pop_back();}
}
Channel.h
// This is an internal header file, you should not include this.#ifndef MUDUO_NET_CHANNEL_H
#define MUDUO_NET_CHANNEL_H#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>#include <muduo/base/Timestamp.h>namespace muduo
{
namespace net
{class EventLoop;///
/// A selectable I/O channel.
///
/// This class doesn't own the file descriptor.
/// The file descriptor could be a socket,
/// an eventfd, a timerfd, or a signalfd
class Channel : boost::noncopyable
{public:typedef boost::function<void()> EventCallback;typedef boost::function<void(Timestamp)> ReadEventCallback;Channel(EventLoop* loop, int fd);~Channel();void handleEvent(Timestamp receiveTime);void setReadCallback(const ReadEventCallback& cb){ readCallback_ = cb; }void setWriteCallback(const EventCallback& cb){ writeCallback_ = cb; }void setCloseCallback(const EventCallback& cb){ closeCallback_ = cb; }void setErrorCallback(const EventCallback& cb){ errorCallback_ = cb; }/// Tie this channel to the owner object managed by shared_ptr,/// prevent the owner object being destroyed in handleEvent.void tie(const boost::shared_ptr<void>&);int fd() const { return fd_; }int events() const { return events_; }void set_revents(int revt) { revents_ = revt; } // used by pollers// int revents() const { return revents_; }bool isNoneEvent() const { return events_ == kNoneEvent; }void enableReading() { events_ |= kReadEvent; update(); }//注册可读事件// void disableReading() { events_ &= ~kReadEvent; update(); }void enableWriting() { events_ |= kWriteEvent; update(); }//注册可写事件void disableWriting() { events_ &= ~kWriteEvent; update(); }void disableAll() { events_ = kNoneEvent; update(); }//更新为“不关注事件”bool isWriting() const { return events_ & kWriteEvent; }// for Pollerint index() { return index_; }void set_index(int idx) { index_ = idx; }// for debugstring reventsToString() const;void doNotLogHup() { logHup_ = false; }EventLoop* ownerLoop() { return loop_; }void remove();//移除事件,调用这个函数之前必须先调用disableAllprivate:void update();void handleEventWithGuard(Timestamp receiveTime);static const int kNoneEvent;//0static const int kReadEvent;//POLLIN | POLLPRIstatic const int kWriteEvent;//POLLOUTEventLoop* loop_; // 所属的EventLoop对象(一个Channel对象只能属于一个EventLoop对象)const int fd_; // 文件描述符,但不负责关闭该文件描述符int events_; // 关注的事件int revents_; // poll/epoll实际返回的事件int index_; // used by Poller类。对于PollPoller类,index_表示Channel*在::poll的事件数组中的序号;对于EPollPoller类,index_表示事件的状态bool logHup_; // for POLLHUPboost::weak_ptr<void> tie_;bool tied_;bool eventHandling_; // 是否处于处理事件中ReadEventCallback readCallback_;EventCallback writeCallback_;EventCallback closeCallback_;EventCallback errorCallback_;
};}
}
#endif // MUDUO_NET_CHANNEL_H
Channel.cc
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>#include <sstream>#include <poll.h>using namespace muduo;
using namespace muduo::net;const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;Channel::Channel(EventLoop* loop, int fd__): loop_(loop),fd_(fd__),events_(0),revents_(0),index_(-1),//初始化列表中将index_设置为-1//一个Channel对象就代表一个事件//对于PollPoller类://因为此时还没有将当前事件放入PollPoller类的成员pollfds_中//std::vector<struct pollfd> pollfds_;//也没有将当前Channel对象指针和fd_放入PollPoller类的成员channels_中//std::map<int, Channel*> channels_; //所以在::poll监控的事件数组pollfds_中的序号index_此时并不存在,故置为-1//对于EPollPoller类:和PollPoller类不同,index_代表事件的状态logHup_(true),tied_(false),eventHandling_(false)
{
}Channel::~Channel()
{assert(!eventHandling_);
}void Channel::tie(const boost::shared_ptr<void>& obj)
{tie_ = obj;tied_ = true;
}void Channel::update()
{loop_->updateChannel(this);
}// 调用这个函数之前确保调用disableAll
void Channel::remove()
{assert(isNoneEvent());loop_->removeChannel(this);
}void Channel::handleEvent(Timestamp receiveTime)//事件处理
{boost::shared_ptr<void> guard;if (tied_){guard = tie_.lock();if (guard){handleEventWithGuard(receiveTime);}}else{handleEventWithGuard(receiveTime);}
}void Channel::handleEventWithGuard(Timestamp receiveTime)
{eventHandling_ = true;if ((revents_ & POLLHUP) && !(revents_ & POLLIN)){if (logHup_){LOG_WARN << "Channel::handle_event() POLLHUP";}if (closeCallback_) closeCallback_();}if (revents_ & POLLNVAL){LOG_WARN << "Channel::handle_event() POLLNVAL";}if (revents_ & (POLLERR | POLLNVAL)){if (errorCallback_) errorCallback_();}if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)){if (readCallback_) readCallback_(receiveTime);}if (revents_ & POLLOUT){if (writeCallback_) writeCallback_();}eventHandling_ = false;
}string Channel::reventsToString() const
{std::ostringstream oss;oss << fd_ << ": ";if (revents_ & POLLIN)oss << "IN ";if (revents_ & POLLPRI)oss << "PRI ";if (revents_ & POLLOUT)oss << "OUT ";if (revents_ & POLLHUP)oss << "HUP ";if (revents_ & POLLRDHUP)oss << "RDHUP ";if (revents_ & POLLERR)oss << "ERR ";if (revents_ & POLLNVAL)oss << "NVAL ";return oss.str().c_str();
}
这篇关于muduo_net库源码分析(二)(EventLoop类、Channel类、Poller类)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!