[muduo网络库]——muduo库TcpConnection类,万字总结(剖析muduo网络库核心部分、设计思想)

本文主要是介绍[muduo网络库]——muduo库TcpConnection类,万字总结(剖析muduo网络库核心部分、设计思想),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

接着之前我们[muduo网络库]——muduo库Buffer类(剖析muduo网络库核心部分、设计思想),我们接下来继续看muduo库中的TcpConnection类。

TcpConnection类

TcpConnection类是muduo最核心的类,这个类主要封装了一个已建立的TCP连接,以及控制该TCP连接的方法(连接建立和关闭和销毁),以及该连接发生的各种事件(读/写/错误/连接)对应的处理函数,以及这个TCP连接的服务端和客户端的套接字地址信息等。

重要成员变量

EventLoop *loop_; //绝对不是baseloop,因为TcpConnetion都是在subloop中管理的
const std::string name_;
std::atomic_int state_;
bool reading_;std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;
const InetAddress localAddr_;
const InetAddress peerAddr_;ConnectionCallback connectionCallback_; //有新连接时的回调
MessageCallback messageCallback_; //有读写消息时的回调
WriteCompleteCallback writeCompleteCallback_; //消息发送完成以后的回调
CloseCallback closeCallback_;
HighWaterMarkCallback highWaterMarkCallback_;size_t highWaterMark_;Buffer inputBuffer_; //接受数据的缓冲区
Buffer outputBuffer_; //发送数据的缓冲区
  • loop_ 该Tcp连接的Channel注册到了哪一个sub EventLoop上。这个loop_就是那一个sub EventLoop。
  • name_客户端的名字
  • state_ 客户端的状态,对应的有一个枚举类型,分别对应着已经断开连接,正在连接,已经连接,正在断开连接。
enum StateE {kDisconnected, kConnecting, kConnected, kDisconnecting};
  • reading_ 连接是否正在监听读事件
  • socket_ 连接套接字, 用于对连接进行底层操作
  • channel_ 通道, 用于绑定要监听的事件
  • localAddr_本地IP地址
  • peerAddr_对端IP地址
  • connectionCallback_messageCallback_writeCompleteCallback_highWaterMarkCallback_closeCallback_对应的连接建立/关闭后的处理函数,收到消息后的处理函数,消息发送完后的处理函数,高水位回调,连接关闭后的处理函数。
  • highWaterMark_ 因为发送数据,应用写得快,内核发送数据慢,需要把待发送的数据写入缓冲区,且设置了水位回调,防止发送太快
  • inputBuffer_outputBuffer_输入输出缓冲区,在输出缓冲区是用于暂存那些暂时发送不出去的待发送数据。因为Tcp发送缓冲区是有大小限制的,假如达到了高水位线,就没办法把发送的数据通过send()直接拷贝到Tcp发送缓冲区,而是暂存在这个outputBuffer_中,等TCP发送缓冲区有空间了,触发可写事件了,再把outputBuffer_中的数据拷贝到Tcp发送缓冲区中。

重要成员函数

  • 构造函数给channel设置相应的回调函数
 //当通道有读事件时候在Channel::handleEvent()内调用:TcpConnection::handleRead()
channel_->setReadCallback(std::bind(&TcpConnection::handleRead,this,std::placeholders::_1));//当通道有写事件的时候在Channel::handleEven()内调用:TcpConnection::handleWrite()
channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite,this));//当通道有关闭事件的时候在Channel::handleEvent()内调用:TcpConnection::handleClose()
channel_->setCloseCallback(std::bind(&TcpConnection::handleClose,this));//当通道有错误事件的时候在Channel::handleEvent()内调用:TcpConnection::handleError()
channel_->setErrorCallback(std::bind(&TcpConnection::handleError,this));LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n",name_.c_str(),sockfd);
//开启Tcp/Ip层的心跳包检测
socket_->setKeepAlive(true);
  • 一系列的获取loop_name_,地址,状态的函数。
EventLoop* getLoop() const { return loop_;}
const std::string& name() const { return name_;}
const InetAddress& localAddress() const { return localAddr_;}
const InetAddress& peerAddress() const { return peerAddr_; }
bool connected() const { return state_ == kConnected;}
  • 发送数据
void TcpConnection::send(const std::string &buf) //直接引用buffer
{if(state_ == kConnected){if(loop_->isInLoopThread()){//string.c_str是Borland封装的String类中的一个函数,它返回当前字符串的首字符地址。sendInLoop(buf.c_str(),buf.size());}else{loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));}}
}
void TcpConnection::sendInLoop(const void* data, size_t len)
{ssize_t nwrote = 0;size_t remaining = len; //未发送的数据bool faultError = false; //记录是否产生错误//之前调用过connection的shutdown 不能在发送了if(state_ == kDisconnected){LOG_ERROR("disconnected,give up writing!");return ;}//channel 第一次开始写数据,且缓冲区没有待发送数据if(!channel_->isWriting() && outputBuffer_.readableBytes() == 0){nwrote = ::write(channel_->fd(),data,len);if(nwrote >= 0){remaining = len - nwrote;if(remaining == 0 && writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this()));}}else{nwrote = 0;if(errno != EWOULDBLOCK) //用于非阻塞模式,不需要重新读或者写{LOG_ERROR("TcpConnection::sendInLoop");if(errno == EPIPE || errno == ECONNRESET) //SIGPIPE RESET{faultError = true;}}}}if(!faultError && remaining > 0) {//目前发送缓冲区剩余的待发送数据的长度size_t oldlen = outputBuffer_.readableBytes();if(oldlen + remaining >= highWaterMark_ && oldlen < highWaterMark_&& highWaterMark_){loop_->queueInLoop(std::bind(highWaterMarkCallback_,shared_from_this(),oldlen + remaining));}outputBuffer_.append((char*)data + nwrote,remaining);if(!channel_->isWriting()){channel_->enableWriting(); //注册channel写事件,否则poller不会向channel通知epollout}}
}

1) 发送数据要发送的数据长度是len,如果在loop_在当前的线程里面,就调用sendInLoopsendInLoop内部实际上是调用了系统的write,如果一次性发送完了,就设置writeCompleteCallback_,表明不要再给channel设置epollout事件了
2)如果没有写完,先计算一下oldlen目前发送缓冲区剩余的待发送数据的长度。满足:

if(oldlen + remaining >= highWaterMark_ && oldlen < highWaterMark_&& highWaterMark_)

就会触发高水位回调
3)不满足以上的话,直接写入outputBuffer_
4) 剩余的数据保存到缓冲区当中,要给给channel注册epollout事件(切记,一定要注册channel的写事件,否则poller不会向channel通知epollout),这样poller发现tcp发送缓冲区有空间,会通知相应的socket-channel调用相应的writeCallback()回调方法,也就是调用TcpConnection::handleWrite,把发送缓冲区中数据全部发送出去。

  • 重中之重TcpConnection::handleRead负责处理Tcp连接的可读事件,它会将客户端发送来的数据拷贝到用户缓冲区中inputBuffer_,然后再调用connectionCallback_保存的连接建立后的处理函数。
void TcpConnection::handleRead(TimeStamp receiveTime)
{int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(),&savedErrno);if(n > 0){      messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);}else if(n==0) //客户端断开{handleClose();} else{errno = savedErrno;LOG_ERROR("TcpConnection::hanleRead");handleError();}}
  1. 关于readFd在Buffer类中我们已经剖析过了Buffer类,接着已建立连接的用户,有可读事件发生了,调用用户传入的回调操作onMessageshared_from_this()获取了当前TcpConnection对象的智能指针.
  2. n=0,说明客户端断开了,调用连接关闭后的处理函数。
  3. n<0 出错了,调用错误处理回调
  • handleWrite( )负责处理Tcp连接的可写事件
void TcpConnection::handleWrite()
{if(channel_->isWriting()){int savedErrno = 0;ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);if(n > 0){outputBuffer_.retrieve(n); //处理了n个if(outputBuffer_.readableBytes() == 0) //发送完成{channel_->disableWriting(); //不可写了if(writeCompleteCallback_){//唤醒loop对应的thread线程,执行回调loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this()));}if(state_ == kDisconnecting){shutdownInLoop();// 在当前loop中删除TcpConnection}}}else{LOG_ERROR("TcpConnection::handleWrite");}}else{LOG_ERROR("TcpConnection fd=%d is down, no more writing \n",channel_->fd());}
}
  1. 如果可写,通过fd发送数据,直到发送完成
  2. 设置不可写,如果writeCompleteCallback_,唤醒loop对应的thread线程,执行回调
  3. 当前TCP正在断开连接,调用shutdownInLoop,在当前loop中删除TcpConnection
  • 处理Tcp连接关闭的事件handleClose()
void TcpConnection::handleClose()
{LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n",channel_->fd(),(int)state_);setState(kDisconnected);channel_->disableAll();TcpConnectionPtr connPtr(shared_from_this());connectionCallback_(connPtr); //执行连接关闭的回调closeCallback_(connPtr); //关闭连接的回调 TcpServer => TcpServer::removeConnection
}

处理逻辑就是将这个TcpConnection对象中的channel_从事件监听器中移除。然后调用connectionCallback_closeCallback_保存的回调函数。closeCallback_TcpServer::newConnection()为新连接新建TcpConnection时,已设为TcpServer::removeConnection(),而removeConnection()最终会调用TcpConnection::connectDestroyed()来销毁连接资源。

void TcpConnection::connectDestroyed()
{if(state_ == kConnected){setState(kDisconnected);channel_->disableAll(); //把channel所有感兴趣的事件,从poller中del掉connectionCallback_(shared_from_this());}channel_->remove();//把channel从poller中删除掉
}

只有处于已连接状态(kConnected)的tcp连接, 才需要先更新状态, 关闭通道事件监听。

  • 错误处理回调
void TcpConnection::handleError()
{int optval;socklen_t optlen = sizeof optval;int err = 0;if(::getsockopt(channel_->fd(),SOL_SOCKET,SO_ERROR,&optval,&optlen) < 0){err = errno;        }else{err = optval;}LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d \n",name_.c_str(),err);
}

getsockopt()函数用于获取任意类型、任意状态套接口的选项当前值,并把结果存入optval,最后输出错误日志。

  • 关闭连接
 //关闭连接
void TcpConnection::shutdown()
{if(state_ == kConnected){setState(kDisconnecting);loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop,this));}
}void TcpConnection::shutdownInLoop()
{if(!channel_->isWriting()) //说明当前outputBuffer中的数据已经全部发送完成{socket_->shutdowmWrite(); // 关闭写端}
}

注意: 为什么是关闭了写端呢?在TcpConnection::shutdownInLoop()中,我们会发现它调用了Socket的shutdowmWrite,这里并没有使用close,陈硕大佬原话是这样的:Muduo TcpConnection 没有提供 close,而只提供 shutdown ,这么做是为了收发数据的完整性。因为TCP 是一个全双工协议,同一个文件描述符既可读又可写, shutdownWrite() 关闭了“写”方向的连接,保留了“读”方向,这称为 TCP half-close。如果直接 close(socket_fd),那么 socket_fd 就不能读或写了。用 shutdown 而不用 close 的效果是,如果对方已经发送了数据,这些数据还“在路上”,那么 muduo 不会漏收这些数据。换句话说,muduo 在 TCP 这一层面解决了“当你打算关闭网络连接的时候,如何得知对方有没有发了一些数据而你还没有收到?”这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据,就可以直接断开连接。等于说 muduo 把“主动关闭连接”这件事情分成两步来做,如果要主动关闭连接,它会先关本地“写”端,等对方关闭之后,再关本地“读”端。
Muduo把“主动关闭连接”这件事分成两步来做,如果要主动关闭连接,它先关闭本地的“写”端,等对方关闭之后,再关闭本地“读”端。
另外如果当前outputbuffer里面还有数据尚未发出的话,Muduo也不会立刻调用shutwownWrite,而是等到数据发送完毕再shutdown,可以避免对方漏收数据。

关闭连接事件很重要,涉及到TcpConnection和Channel的生命周期以及是否能合理销毁,用了智能指针来管理和控制生命周期。下面我们就来分析一下断开流程中**TcpConnection的引用计数问题**:

1.首先连接到来创建TcpConnection,并存入容器。引用计数+1 总数:1
2.客户端断开连接,在Channel的handleEvent函数中会将Channel中的TcpConnection弱指针提升,引用计数+1 总数:2
3.触发HandleRead ,可读字节0,进而触发HandleClose,HandleClose函数中栈上的TcpConnectionPtr guardThis会继续将引用计数+1 总数:3
4.触发HandleClose的回调函数 在TcpServer::removeConnection结束后(回归主线程队列),释放HandleClose的栈指针,以及Channel里提升的指针引用计数-2 总数:1
5.主线程执行回调removeConnectionInLoop,在函数内部将tcpconnection从TcpServer中保存连接容器中erase掉。但在removeConnectionInLoop结尾用conn为参数构造了bind。引用计数不变 总数:1
6.回归次线程处理connectDestroyed事件,结束完释放参数传递的最后一个shard_ptr,释放TcpConnection。引用计数-1 总数:0

  • 建立连接
void TcpConnection::connectEstablished()
{setState(kConnected);channel_->tie(shared_from_this());channel_->enableReading(); //向poller注册channel的epollin事件//新连接建立 执行回调connectionCallback_(shared_from_this());
}
思考一下:什么时候调用WriteCompleCallback?什么时候调用HighWaterMarkCallback?
  • 如果发送缓存区被清空,就调用WriteCompleCallback。TcpConnection有两处可能触发此回调。
  1. TcpConnection::sendInLoop()。
  2. TcpConnection::handleWrite()。
  • 如果输出缓冲的长度超过用户指定大小,就会触发回调HighWaterMarkCallback(只在上升沿触发一次)。
    在非阻塞的发送数据情况下,假设Server发给Client数据流,为防止Server发过来的数据撑爆Client的输出缓冲区,一种做法是在Client的HighWaterMarkCallback中停止读取Server的数据,而在Client的WriteCompleteCallback中恢复读取Server的数据
需要注意的是: TcpConnection类是唯一默认用shared_ptr来管理的类,唯一继承自enable_shared_from_this的类。这是因为其生命周期模糊:可能在连接断开时,还有其他地方持有它的引用,贸然delete会造成空悬指针。只有确保其他地方没有持有该对象的引用的时候,才能安全地销毁对象。

代码地址:https://github.com/Cheeron955/mymuduo/tree/master

好了~ 有关于muduo库的TcpConnection类的细节就到此结束了,再次强调一点,当channel有相应的事件的时候,会调用TcpConnection对应的回调。到此为止,我们介绍了Channel,Poller,EPollPoller,EventLoop,Acceptor,Socket,Buffer,TcpConnection八大类,还有单独的CurrentThread,DefaultPoller,接下来我们会剖析一下剩下的,比如Logger,TimeStamp等比较简单的,然后最后在从连接建立,断开,发送消息等过程在进行一个总结剖析,希望大家多多支持,我们下一节见~~

这篇关于[muduo网络库]——muduo库TcpConnection类,万字总结(剖析muduo网络库核心部分、设计思想)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/985261

相关文章

Java实现任务管理器性能网络监控数据的方法详解

《Java实现任务管理器性能网络监控数据的方法详解》在现代操作系统中,任务管理器是一个非常重要的工具,用于监控和管理计算机的运行状态,包括CPU使用率、内存占用等,对于开发者和系统管理员来说,了解这些... 目录引言一、背景知识二、准备工作1. Maven依赖2. Gradle依赖三、代码实现四、代码详解五

Node.js 中 http 模块的深度剖析与实战应用小结

《Node.js中http模块的深度剖析与实战应用小结》本文详细介绍了Node.js中的http模块,从创建HTTP服务器、处理请求与响应,到获取请求参数,每个环节都通过代码示例进行解析,旨在帮... 目录Node.js 中 http 模块的深度剖析与实战应用一、引言二、创建 HTTP 服务器:基石搭建(一

Android数据库Room的实际使用过程总结

《Android数据库Room的实际使用过程总结》这篇文章主要给大家介绍了关于Android数据库Room的实际使用过程,详细介绍了如何创建实体类、数据访问对象(DAO)和数据库抽象类,需要的朋友可以... 目录前言一、Room的基本使用1.项目配置2.创建实体类(Entity)3.创建数据访问对象(DAO

Java向kettle8.0传递参数的方式总结

《Java向kettle8.0传递参数的方式总结》介绍了如何在Kettle中传递参数到转换和作业中,包括设置全局properties、使用TransMeta和JobMeta的parameterValu... 目录1.传递参数到转换中2.传递参数到作业中总结1.传递参数到转换中1.1. 通过设置Trans的

C# Task Cancellation使用总结

《C#TaskCancellation使用总结》本文主要介绍了在使用CancellationTokenSource取消任务时的行为,以及如何使用Task的ContinueWith方法来处理任务的延... 目录C# Task Cancellation总结1、调用cancellationTokenSource.

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

不懂推荐算法也能设计推荐系统

本文以商业化应用推荐为例,告诉我们不懂推荐算法的产品,也能从产品侧出发, 设计出一款不错的推荐系统。 相信很多新手产品,看到算法二字,多是懵圈的。 什么排序算法、最短路径等都是相对传统的算法(注:传统是指科班出身的产品都会接触过)。但对于推荐算法,多数产品对着网上搜到的资源,都会无从下手。特别当某些推荐算法 和 “AI”扯上关系后,更是加大了理解的难度。 但,不了解推荐算法,就无法做推荐系

学习hash总结

2014/1/29/   最近刚开始学hash,名字很陌生,但是hash的思想却很熟悉,以前早就做过此类的题,但是不知道这就是hash思想而已,说白了hash就是一个映射,往往灵活利用数组的下标来实现算法,hash的作用:1、判重;2、统计次数;

hdu1496(用hash思想统计数目)

作为一个刚学hash的孩子,感觉这道题目很不错,灵活的运用的数组的下标。 解题步骤:如果用常规方法解,那么时间复杂度为O(n^4),肯定会超时,然后参考了网上的解题方法,将等式分成两个部分,a*x1^2+b*x2^2和c*x3^2+d*x4^2, 各自作为数组的下标,如果两部分相加为0,则满足等式; 代码如下: #include<iostream>#include<algorithm

Andrej Karpathy最新采访:认知核心模型10亿参数就够了,AI会打破教育不公的僵局

夕小瑶科技说 原创  作者 | 海野 AI圈子的红人,AI大神Andrej Karpathy,曾是OpenAI联合创始人之一,特斯拉AI总监。上一次的动态是官宣创办一家名为 Eureka Labs 的人工智能+教育公司 ,宣布将长期致力于AI原生教育。 近日,Andrej Karpathy接受了No Priors(投资博客)的采访,与硅谷知名投资人 Sara Guo 和 Elad G