Learning the muduo network library, EventLoop (part 3): Socket, Acceptor, TcpServer, TcpConnection (connection establishment and message reception)
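1. Socket operation wrappers
Endian.h
Wraps the byte-order conversion functions (free functions in the muduo::net::sockets namespace).
SocketsOps.h / SocketsOps.cc
Wraps the socket-related system calls (free functions in the muduo::net::sockets namespace).
Socket.h / Socket.cc (class Socket)
Wraps a socket file descriptor using RAII.
InetAddress.h / InetAddress.cc (class InetAddress)
Wraps the Internet address structure sockaddr_in.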
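To make the RAII and byte-order ideas above concrete, here is a minimal standalone sketch. ScopedFd, isOpen and the two conversion helpers are illustrative names written for this article with plain POSIX calls; they are assumptions, not muduo's actual Socket or Endian.h code.

```cpp
#include <arpa/inet.h>  // htons, ntohs
#include <cassert>
#include <cstdint>
#include <fcntl.h>      // fcntl, open
#include <unistd.h>     // close

// Hypothetical stand-in for muduo's Socket class:
// owns exactly one descriptor and closes it in the destructor.
class ScopedFd {
 public:
  explicit ScopedFd(int fd) : fd_(fd) { assert(fd_ >= 0); }
  ~ScopedFd() { ::close(fd_); }

  // Non-copyable, so the descriptor is never closed twice.
  ScopedFd(const ScopedFd&) = delete;
  ScopedFd& operator=(const ScopedFd&) = delete;

  int fd() const { return fd_; }

 private:
  const int fd_;
};

// Byte-order helpers in the spirit of Endian.h (illustrative only).
inline uint16_t hostToNetwork16(uint16_t host16) { return htons(host16); }
inline uint16_t networkToHost16(uint16_t net16) { return ntohs(net16); }

// Returns true if fd currently refers to an open descriptor.
inline bool isOpen(int fd) { return ::fcntl(fd, F_GETFD) != -1; }
```

Because the destructor does the close, every exit path out of a scope that holds a ScopedFd releases the descriptor, which is the property the Socket class provides for connfd and the listening socket.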
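2. Acceptor
Acceptor's data members include acceptSocket_ and acceptChannel_. acceptSocket_ is the listening socket (the server socket).
acceptChannel_ watches acceptSocket_ for readable events. When one occurs, Channel::handleEvent() calls back Acceptor::handleRead(),
which calls accept(2) to accept the new connection and then invokes the user callback. Note that the first argument passed to the callback is the connfd returned by accept.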
C++ Code
    void Acceptor::handleRead()
    {
        loop_->assertInLoopThread();
        InetAddress peerAddr(0);
        // FIXME: loop until no more
        int connfd = acceptSocket_.accept(&peerAddr);
        if (connfd >= 0)
        {
            // string hostport = peerAddr.toIpPort();
            // LOG_TRACE << "Accepts of " << hostport;
            if (newConnectionCallback_)
            {
                newConnectionCallback_(connfd, peerAddr);
            }
            else
            {
                sockets::close(connfd);
            }
        }
        else
        {
            // Read the section named "The special problem of
            // accept()ing when you can't" in libev's doc.
            // By Marc Lehmann, author of libev.
            if (errno == EMFILE)
            {
                ::close(idleFd_);
                idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
                ::close(idleFd_);
                idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
            }
        }
    }
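The EMFILE branch relies on the rule that the kernel always hands out the lowest unused descriptor number, so closing the reserved idleFd_ frees exactly one slot for accept(2) to use. The sketch below isolates that borrow-and-restore step. openReserve() and borrowReservedSlot() are hypothetical helpers written for illustration, and in the usage note a plain open() stands in for accept().

```cpp
#include <cassert>
#include <fcntl.h>   // open, O_RDONLY, O_CLOEXEC
#include <unistd.h>  // close

// Hypothetical helper: opens a placeholder descriptor,
// playing the role of Acceptor's idleFd_.
inline int openReserve() {
  return ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}

// Releases the reserved slot, lets useSlot() claim a descriptor
// (in Acceptor this would be ::accept), closes that descriptor
// right away, then takes the slot back.
// Returns the descriptor number that useSlot() obtained.
template <typename F>
int borrowReservedSlot(int& idleFd, F useSlot) {
  ::close(idleFd);
  int fd = useSlot();
  if (fd >= 0) {
    ::close(fd);  // the peer sees the connection closed immediately
  }
  idleFd = openReserve();
  return fd;
}
```

In a single-threaded program, calling borrowReservedSlot(idle, [] { return ::open("/dev/null", O_RDONLY); }) is expected to obtain the same number that idle held, since that is the slot just freed. Without this step a level-triggered poller would keep reporting the listening socket as readable while accept() keeps failing.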
In the constructor:
C++ Code
    acceptChannel_.setReadCallback(
        boost::bind(&Acceptor::handleRead, this));
Setting the user callback:
C++ Code
    // The callback receives connfd.
    typedef boost::function<void (int sockfd, const InetAddress &)> NewConnectionCallback;

    void setNewConnectionCallback(const NewConnectionCallback &cb)
    {
        newConnectionCallback_ = cb;
    }
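The callback contract implies an ownership rule: once a callback is installed it becomes responsible for connfd, and when none is installed the acceptor closes the descriptor itself. A minimal sketch of that hand-off follows. ConnDispatcher is a hypothetical class, it uses std::function instead of boost::function, and it drops the InetAddress argument to stay self-contained.

```cpp
#include <cassert>
#include <functional>
#include <unistd.h>  // close
#include <utility>   // std::move

// Hypothetical, simplified stand-in for the callback part of Acceptor.
class ConnDispatcher {
 public:
  using NewConnectionCallback = std::function<void(int sockfd)>;

  void setNewConnectionCallback(NewConnectionCallback cb) {
    cb_ = std::move(cb);
  }

  // Called with a freshly accepted descriptor.
  // Returns true if ownership was handed to the user callback,
  // false if nobody was listening and the descriptor was closed here.
  bool dispatch(int connfd) {
    assert(connfd >= 0);
    if (cb_) {
      cb_(connfd);
      return true;
    }
    ::close(connfd);
    return false;
  }

 private:
  NewConnectionCallback cb_;
};
```

A callback that forgets to close or store the descriptor leaks it, which is one reason TcpServer wraps connfd in a TcpConnection right away.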
Start listening:
C++ Code
    void Acceptor::listen()
    {
        loop_->assertInLoopThread();
        listenning_ = true;
        acceptSocket_.listen();
        acceptChannel_.enableReading();
    }
Test code (a client connects with telnet 127.0.0.1 8888):
C++ Code
    #include <muduo/net/Acceptor.h>
    #include <muduo/net/EventLoop.h>
    #include <muduo/net/InetAddress.h>
    #include <muduo/net/SocketsOps.h>

    #include <stdio.h>
    #include <unistd.h>  // getpid, write

    using namespace muduo;
    using namespace muduo::net;

    void newConnection(int sockfd, const InetAddress &peerAddr)
    {
        printf("newConnection(): accepted a new connection from %s\n",
               peerAddr.toIpPort().c_str());
        ::write(sockfd, "How are you?\n", 13);
        sockets::close(sockfd);
    }

    int main()
    {
        printf("main(): pid = %d\n", getpid());

        InetAddress listenAddr(8888);
        EventLoop loop;

        Acceptor acceptor(&loop, listenAddr);
        acceptor.setNewConnectionCallback(newConnection);
        acceptor.listen();

        loop.loop();
    }
Connecting to the server with telnet produces the following server output:
simba@ubuntu:~/Documents/build/debug/bin$ ./reactor_test07
20131108 07:22:30.560145Z 3960 TRACE IgnoreSigPipe Ignore SIGPIPE - EventLoop.cc:51
main(): pid = 3960
20131108 07:22:30.675116Z 3960 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20131108 07:22:30.675684Z 3960 TRACE EventLoop EventLoop created 0xBFED7324 in thread 3960 - EventLoop.cc:76
20131108 07:22:30.676073Z 3960 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20131108 07:22:30.676577Z 3960 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104
20131108 07:22:30.676988Z 3960 TRACE loop EventLoop 0xBFED7324 start looping - EventLoop.cc:108
20131108 07:22:40.687957Z 3960 TRACE poll nothing happended - EPollPoller.cc:74
20131108 07:22:41.606525Z 3960 TRACE poll 1 events happended - EPollPoller.cc:65
20131108 07:22:41.607053Z 3960 TRACE printActiveChannels {6: IN} - EventLoop.cc:271
newConnection(): accepted a new connection from 127.0.0.1:56409
20131108 07:22:51.617500Z 3960 TRACE poll nothing happended - EPollPoller.cc:74
The telnet side prints:
simba@ubuntu:~$ telnet 127.0.0.1 8888
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
How are you?
Connection closed by foreign host.
simba@ubuntu:~$
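The output shows that acceptSocket_.sockfd_ = 6. When the client connects, the listening socket becomes readable; handleRead() calls accept() to take the connection and then invokes the user callback newConnection().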
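The readable-event-to-callback path seen in the log can be reproduced in miniature. The sketch below is an assumption-laden toy: MiniPoller is a hypothetical class, it uses poll(2) rather than the epoll-based EPollPoller that appears in the log, and it folds Channel and Poller into one object.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <poll.h>
#include <unistd.h>
#include <utility>
#include <vector>

// Hypothetical miniature of the Poller + Channel pairing.
class MiniPoller {
 public:
  using ReadCallback = std::function<void(int fd)>;

  // Registers interest in readable events on fd.
  void watchReadable(int fd, ReadCallback cb) {
    callbacks_[fd] = std::move(cb);
  }

  // Polls once and runs the callback of every readable descriptor.
  // Returns the number of callbacks that were run.
  int pollOnce(int timeoutMs) {
    std::vector<pollfd> fds;
    for (const auto& kv : callbacks_) {
      pollfd p;
      p.fd = kv.first;
      p.events = POLLIN;
      p.revents = 0;
      fds.push_back(p);
    }
    int n = ::poll(fds.data(), fds.size(), timeoutMs);
    if (n <= 0) {
      return 0;  // timeout (the "nothing happended" log line) or error
    }
    int handled = 0;
    for (const pollfd& p : fds) {
      if (p.revents & POLLIN) {
        ReadCallback cb = callbacks_[p.fd];  // copy, so cb may re-register
        cb(p.fd);
        ++handled;
      }
    }
    return handled;
  }

 private:
  std::map<int, ReadCallback> callbacks_;
};
```

Registering the read end of a pipe and writing one byte to the other end makes the next pollOnce() call run the callback once, which mirrors how a pending connection makes fd 6 readable and triggers Acceptor::handleRead().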
3. TcpServer/TcpConnection
The Acceptor class mainly takes care of socket, bind and listen.
Application code normally does not use Acceptor directly; it is held as a member of TcpServer:
C++ Code
    boost::scoped_ptr<Acceptor> acceptor_;  // avoid revealing Acceptor
TcpServer also holds a list of TcpConnection objects:
C++ Code
    typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;
    typedef std::map<string, TcpConnectionPtr> ConnectionMap;
    ConnectionMap connections_;  // connection list
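A small standalone sketch of this bookkeeping may help. It keys shared pointers by connection name, using the name pattern that shows up in the test log of this article (TestServer:0.0.0.0:8888#1). Conn and ConnRegistry are hypothetical stand-ins, and std::shared_ptr replaces boost::shared_ptr.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for TcpConnection: only carries a name.
struct Conn {
  explicit Conn(const std::string& n) : name(n) {}
  std::string name;
};
typedef std::shared_ptr<Conn> ConnPtr;

// Hypothetical stand-in for the connection bookkeeping in TcpServer.
class ConnRegistry {
 public:
  ConnRegistry(const std::string& serverName, const std::string& hostport)
      : name_(serverName), hostport_(hostport), nextConnId_(1) {}

  // Creates a connection named "<server>:<hostport>#<id>" and stores it.
  ConnPtr add() {
    std::string connName =
        name_ + ":" + hostport_ + "#" + std::to_string(nextConnId_);
    ++nextConnId_;
    ConnPtr conn = std::make_shared<Conn>(connName);
    connections_[connName] = conn;  // the map now shares ownership
    return conn;
  }

  std::size_t size() const { return connections_.size(); }

 private:
  std::string name_;
  std::string hostport_;
  int nextConnId_;
  std::map<std::string, ConnPtr> connections_;
};
```

After add() returns, the caller's pointer reports use_count() == 2: one reference held by the caller and one by the map. Holding a reference in the map is what keeps a connection alive between events.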
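In addition, TcpServer has an IO thread pool object and an EventLoop* for the acceptor. setThreadNum() sets the number of threads in the IO thread pool (not counting the main reactor).
The EventLoopThread and EventLoopThreadPool classes are covered in a separate article.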
C++ Code
    boost::scoped_ptr<EventLoopThreadPool> threadPool_;
    EventLoop* loop_;  // the acceptor loop
C++ Code
    void TcpServer::setThreadNum(int numThreads)
    {
        assert(0 <= numThreads);
        threadPool_->setThreadNum(numThreads);
    }
Like Acceptor, TcpConnection has two important data members: a Socket (wrapping connfd) and a Channel.
C++ Code
    boost::scoped_ptr<Socket> socket_;
    boost::scoped_ptr<Channel> channel_;
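Walking through the sequence of calls:
The TcpServer constructor first initializes the acceptor_ member in its initializer list with acceptor_(new Acceptor(loop, listenAddr)); then, in the constructor body: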
C++ Code
    // Acceptor::handleRead calls back TcpServer::newConnection.
    // _1 is the socket file descriptor, _2 is the peer address (InetAddress).
    acceptor_->setNewConnectionCallback(
        boost::bind(&TcpServer::newConnection, this, _1, _2));
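Calling TcpServer::start() starts Acceptor::listen(). Once the queue of completed connections is non-empty, the acceptChannel_ of TcpServer::acceptor_ becomes readable and poll returns.
Channel::handleEvent() then processes the active channel and calls Acceptor::handleRead(), which calls accept(2) to accept the new connection and calls back TcpServer::newConnection(). That function first creates a TcpConnectionPtr object; in the TcpConnection constructor body: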
C++ Code
    // When the channel becomes readable, TcpConnection::handleRead is called back;
    // _1 is the time at which the event occurred.
    channel_->setReadCallback(
        boost::bind(&TcpConnection::handleRead, this, _1));
newConnection() then adds the connection to TcpServer::connections_ and sets the connection callback and the message callback, as follows:
C++ Code
    // receives connfd
    void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
    {
        ......
        TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd,
                                                localAddr, peerAddr));
        connections_[connName] = conn;  // conn is a TcpConnectionPtr
        conn->setConnectionCallback(connectionCallback_);
        conn->setMessageCallback(messageCallback_);

        conn->connectEstablished();
    }
Finally it calls TcpConnection::connectEstablished():
C++ Code
    void TcpConnection::connectEstablished()
    {
        channel_->enableReading();  // add this TcpConnection's channel to the Poller's watch set

        connectionCallback_(shared_from_this());
    }
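A new connection now exists. When the peer sends data to connfd, the kernel receive buffer becomes non-empty and TcpConnection::channel_ gets a readable event. poll returns, Channel::handleEvent() processes the active channel and calls TcpConnection::handleRead():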
C++ Code
    void TcpConnection::handleRead(Timestamp receiveTime)
    {
        char buf[65536];  // declaration missing from the original excerpt; size assumed
        ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
        messageCallback_(shared_from_this(), buf, n);
    }
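shared_from_this() returns a temporary smart pointer that shares ownership with the existing owners of the current object (it is created from the weak reference stored by enable_shared_from_this, not from a separate count on the raw pointer). The reference count goes up by 1 while the temporary exists and drops by 1 as soon as it is destroyed, so calling it any number of times has no lasting effect on the count.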
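This transient bump is easy to observe. The sketch below uses the std equivalents of the boost types in the article (an assumption about the toolchain, C++11 or later), and Session is a hypothetical class.

```cpp
#include <cassert>
#include <memory>

// Hypothetical class that can hand out shared_ptrs to itself.
struct Session : std::enable_shared_from_this<Session> {
  // Returns the owner count observed while a temporary created by
  // shared_from_this() is still alive, i.e. existing owners plus one.
  long countDuringCall() {
    return shared_from_this().use_count();
  }
};
```

With a single owner s created by std::make_shared<Session>(), s.use_count() is 1 before the call, countDuringCall() returns 2, and s.use_count() is back to 1 afterwards. Note that shared_from_this() is only valid on an object that is already managed by a shared_ptr.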
Test program (a client connects with telnet 127.0.0.1 8888):
C++ Code
    #include <muduo/net/TcpServer.h>
    #include <muduo/net/EventLoop.h>
    #include <muduo/net/InetAddress.h>

    #include <stdio.h>
    #include <unistd.h>  // getpid

    using namespace muduo;
    using namespace muduo::net;

    void onConnection(const TcpConnectionPtr &conn)
    {
        if (conn->connected())
        {
            printf("onConnection(): new connection [%s] from %s\n",
                   conn->name().c_str(),
                   conn->peerAddress().toIpPort().c_str());
        }
        else
        {
            printf("onConnection(): connection [%s] is down\n",
                   conn->name().c_str());
        }
    }

    void onMessage(const TcpConnectionPtr &conn,
                   const char *data,
                   ssize_t len)
    {
        printf("onMessage(): received %zd bytes from connection [%s]\n",
               len, conn->name().c_str());
    }

    int main()
    {
        printf("main(): pid = %d\n", getpid());

        InetAddress listenAddr(8888);
        EventLoop loop;

        TcpServer server(&loop, listenAddr, "TestServer");
        server.setConnectionCallback(onConnection);
        server.setMessageCallback(onMessage);
        server.start();

        loop.loop();
    }
simba@ubuntu:~/Documents/build/debug/bin$ ./reactor_test08
main(): pid = 7557
20131108 09:37:51.098888Z 7557 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20131108 09:37:51.099825Z 7557 TRACE EventLoop EventLoop created 0xBFAD3D08 in thread 7557 - EventLoop.cc:62
20131108 09:37:51.100692Z 7557 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20131108 09:37:51.101548Z 7557 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104
20131108 09:37:51.102063Z 7557 TRACE loop EventLoop 0xBFAD3D08 start looping - EventLoop.cc:94
20131108 09:38:01.116672Z 7557 TRACE poll nothing happended - EPollPoller.cc:74
20131108 09:38:10.616161Z 7557 TRACE poll 1 events happended - EPollPoller.cc:65
20131108 09:38:10.616774Z 7557 TRACE printActiveChannels { 6: IN} - EventLoop.cc:257
20131108 09:38:10.616894Z 7557 INFO TcpServer::newConnection [TestServer] - new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:56410 - TcpServer.cc:93
20131108 09:38:10.617007Z 7557 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#1] at 0x827D7F8 fd=8 - TcpConnection.cc:62
20131108 09:38:10.617103Z 7557 TRACE newConnection [1] usecount=1 - TcpServer.cc:111
20131108 09:38:10.617152Z 7557 TRACE newConnection [2] usecount=2 - TcpServer.cc:113
20131108 09:38:10.617166Z 7557 TRACE connectEstablished [3] usecount=6 - TcpConnection.cc:78
20131108 09:38:10.617174Z 7557 TRACE updateChannel fd = 8 events = 3 - EPollPoller.cc:104
onConnection(): new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:56410
20131108 09:38:10.617266Z 7557 TRACE connectEstablished [4] usecount=6 - TcpConnection.cc:83
20131108 09:38:10.617275Z 7557 TRACE newConnection [5] usecount=2 - TcpServer.cc:122
20131108 09:38:20.627567Z 7557 TRACE poll nothing happended - EPollPoller.cc:74
20131108 09:38:30.638037Z 7557 TRACE poll nothing happended - EPollPoller.cc:74
20131108 09:38:40.648523Z 7557 TRACE poll nothing happended - EPollPoller.cc:74
20131108 09:38:46.891543Z 7557 TRACE poll 1 events happended - EPollPoller.cc:65
20131108 09:38:46.891599Z 7557 TRACE printActiveChannels {8: IN } - EventLoop.cc:257
20131108 09:38:46.891611Z 7557 TRACE handleEvent [6] usecount=2 - Channel.cc:67
onMessage(): received 6 bytes from connection [TestServer:0.0.0.0:8888#1]
20131108 09:38:46.891744Z 7557 TRACE handleEvent [12] usecount=2 - Channel.cc:69
20131108 09:38:56.901306Z 7557 TRACE poll nothing happended - EPollPoller.cc:74
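As the log shows, fd = 6 is the listening socket and fd = 8 is the connected socket returned by accept. So where did fd = 7 go? It is held by the acceptor's idleFd_.
onConnection() is called back when the connection is established. We then type aaaa in telnet; the message arrives at the server, fd = 8 becomes readable and onMessage() is called back. With the trailing \r\n, 6 bytes are received.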
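The 6-byte count can be checked without muduo. The sketch below performs one read(2) and passes the bytes to a callback, roughly what TcpConnection::handleRead() does in this version. readOnce and MessageCallback are hypothetical names, and unlike the excerpt above this version invokes the callback only when at least one byte was read.

```cpp
#include <cassert>
#include <functional>
#include <sys/socket.h>  // socketpair
#include <sys/types.h>   // ssize_t
#include <unistd.h>      // read, write, close

typedef std::function<void(const char* data, ssize_t len)> MessageCallback;

// Hypothetical free-function version of the read path:
// a single read(2) followed by the message callback.
// Returns whatever read(2) returned.
inline ssize_t readOnce(int fd, const MessageCallback& cb) {
  char buf[65536];
  ssize_t n = ::read(fd, buf, sizeof buf);
  if (n > 0 && cb) {
    cb(buf, n);  // buf is only valid for the duration of the call
  }
  return n;
}
```

Writing the 6 bytes "aaaa\r\n" to one end of a socketpair(AF_UNIX, SOCK_STREAM, 0, sv) pair and calling readOnce() on the other end delivers len == 6 to the callback, matching the onMessage() line in the log. A return value of 0 means the peer closed the connection, a case this version of handleRead() does not handle yet.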
References:
UNIX Network Programming (UNP)
muduo manual.pdf
Linux Multithreaded Server Programming: Using the muduo C++ Network Library