[muduo网络库]——muduo库TcpConnection类,万字总结(剖析muduo网络库核心部分、设计思想)

本文主要是介绍[muduo网络库]——muduo库TcpConnection类,万字总结(剖析muduo网络库核心部分、设计思想),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

接着之前我们[muduo网络库]——muduo库Buffer类(剖析muduo网络库核心部分、设计思想),我们接下来继续看muduo库中的TcpConnection类。

TcpConnection类

TcpConnection类是muduo最核心的类,这个类主要封装了一个已建立的TCP连接,以及控制该TCP连接的方法(连接建立和关闭和销毁),以及该连接发生的各种事件(读/写/错误/连接)对应的处理函数,以及这个TCP连接的服务端和客户端的套接字地址信息等。

重要成员变量

EventLoop *loop_; //绝对不是baseloop,因为TcpConnetion都是在subloop中管理的
const std::string name_;
std::atomic_int state_;
bool reading_;std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;
const InetAddress localAddr_;
const InetAddress peerAddr_;ConnectionCallback connectionCallback_; //有新连接时的回调
MessageCallback messageCallback_; //有读写消息时的回调
WriteCompleteCallback writeCompleteCallback_; //消息发送完成以后的回调
CloseCallback closeCallback_;
HighWaterMarkCallback highWaterMarkCallback_;size_t highWaterMark_;Buffer inputBuffer_; //接受数据的缓冲区
Buffer outputBuffer_; //发送数据的缓冲区
  • loop_ 该Tcp连接的Channel注册到了哪一个sub EventLoop上。这个loop_就是那一个sub EventLoop。
  • name_客户端的名字
  • state_ 客户端的状态,对应的有一个枚举类型,分别对应着已经断开连接,正在连接,已经连接,正在断开连接。
enum StateE {kDisconnected, kConnecting, kConnected, kDisconnecting};
  • reading_ 连接是否正在监听读事件
  • socket_ 连接套接字, 用于对连接进行底层操作
  • channel_ 通道, 用于绑定要监听的事件
  • localAddr_本地IP地址
  • peerAddr_对端IP地址
  • connectionCallback_messageCallback_writeCompleteCallback_highWaterMarkCallback_closeCallback_对应的连接建立/关闭后的处理函数,收到消息后的处理函数,消息发送完后的处理函数,高水位回调,连接关闭后的处理函数。
  • highWaterMark_ 因为发送数据,应用写得快,内核发送数据慢,需要把待发送的数据写入缓冲区,且设置了水位回调,防止发送太快
  • inputBuffer_outputBuffer_输入输出缓冲区,在输出缓冲区是用于暂存那些暂时发送不出去的待发送数据。因为Tcp发送缓冲区是有大小限制的,假如达到了高水位线,就没办法把发送的数据通过send()直接拷贝到Tcp发送缓冲区,而是暂存在这个outputBuffer_中,等TCP发送缓冲区有空间了,触发可写事件了,再把outputBuffer_中的数据拷贝到Tcp发送缓冲区中。

重要成员函数

  • 构造函数给channel设置相应的回调函数
 //当通道有读事件时候在Channel::handleEvent()内调用:TcpConnection::handleRead()
channel_->setReadCallback(std::bind(&TcpConnection::handleRead,this,std::placeholders::_1));//当通道有写事件的时候在Channel::handleEven()内调用:TcpConnection::handleWrite()
channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite,this));//当通道有关闭事件的时候在Channel::handleEvent()内调用:TcpConnection::handleClose()
channel_->setCloseCallback(std::bind(&TcpConnection::handleClose,this));//当通道有错误事件的时候在Channel::handleEvent()内调用:TcpConnection::handleError()
channel_->setErrorCallback(std::bind(&TcpConnection::handleError,this));LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n",name_.c_str(),sockfd);
//开启Tcp/Ip层的心跳包检测
socket_->setKeepAlive(true);
  • 一系列的获取loop_name_,地址,状态的函数。
EventLoop* getLoop() const { return loop_;}
const std::string& name() const { return name_;}
const InetAddress& localAddress() const { return localAddr_;}
const InetAddress& peerAddress() const { return peerAddr_; }
bool connected() const { return state_ == kConnected;}
  • 发送数据
void TcpConnection::send(const std::string &buf) //直接引用buffer
{if(state_ == kConnected){if(loop_->isInLoopThread()){//string.c_str是Borland封装的String类中的一个函数,它返回当前字符串的首字符地址。sendInLoop(buf.c_str(),buf.size());}else{loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));}}
}
void TcpConnection::sendInLoop(const void* data, size_t len)
{ssize_t nwrote = 0;size_t remaining = len; //未发送的数据bool faultError = false; //记录是否产生错误//之前调用过connection的shutdown 不能在发送了if(state_ == kDisconnected){LOG_ERROR("disconnected,give up writing!");return ;}//channel 第一次开始写数据,且缓冲区没有待发送数据if(!channel_->isWriting() && outputBuffer_.readableBytes() == 0){nwrote = ::write(channel_->fd(),data,len);if(nwrote >= 0){remaining = len - nwrote;if(remaining == 0 && writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this()));}}else{nwrote = 0;if(errno != EWOULDBLOCK) //用于非阻塞模式,不需要重新读或者写{LOG_ERROR("TcpConnection::sendInLoop");if(errno == EPIPE || errno == ECONNRESET) //SIGPIPE RESET{faultError = true;}}}}if(!faultError && remaining > 0) {//目前发送缓冲区剩余的待发送数据的长度size_t oldlen = outputBuffer_.readableBytes();if(oldlen + remaining >= highWaterMark_ && oldlen < highWaterMark_&& highWaterMark_){loop_->queueInLoop(std::bind(highWaterMarkCallback_,shared_from_this(),oldlen + remaining));}outputBuffer_.append((char*)data + nwrote,remaining);if(!channel_->isWriting()){channel_->enableWriting(); //注册channel写事件,否则poller不会向channel通知epollout}}
}

1) 发送数据要发送的数据长度是len,如果在loop_在当前的线程里面,就调用sendInLoopsendInLoop内部实际上是调用了系统的write,如果一次性发送完了,就设置writeCompleteCallback_,表明不要再给channel设置epollout事件了
2)如果没有写完,先计算一下oldlen目前发送缓冲区剩余的待发送数据的长度。满足:

if(oldlen + remaining >= highWaterMark_ && oldlen < highWaterMark_&& highWaterMark_)

就会触发高水位回调
3)不满足以上的话,直接写入outputBuffer_
4) 剩余的数据保存到缓冲区当中,要给给channel注册epollout事件(切记,一定要注册channel的写事件,否则poller不会向channel通知epollout),这样poller发现tcp发送缓冲区有空间,会通知相应的socket-channel调用相应的writeCallback()回调方法,也就是调用TcpConnection::handleWrite,把发送缓冲区中数据全部发送出去。

  • 重中之重TcpConnection::handleRead负责处理Tcp连接的可读事件,它会将客户端发送来的数据拷贝到用户缓冲区中inputBuffer_,然后再调用connectionCallback_保存的连接建立后的处理函数。
void TcpConnection::handleRead(TimeStamp receiveTime)
{int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(),&savedErrno);if(n > 0){      messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);}else if(n==0) //客户端断开{handleClose();} else{errno = savedErrno;LOG_ERROR("TcpConnection::hanleRead");handleError();}}
  1. 关于readFd在Buffer类中我们已经剖析过了Buffer类,接着已建立连接的用户,有可读事件发生了,调用用户传入的回调操作onMessageshared_from_this()获取了当前TcpConnection对象的智能指针.
  2. n=0,说明客户端断开了,调用连接关闭后的处理函数。
  3. n<0 出错了,调用错误处理回调
  • handleWrite( )负责处理Tcp连接的可写事件
void TcpConnection::handleWrite()
{if(channel_->isWriting()){int savedErrno = 0;ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);if(n > 0){outputBuffer_.retrieve(n); //处理了n个if(outputBuffer_.readableBytes() == 0) //发送完成{channel_->disableWriting(); //不可写了if(writeCompleteCallback_){//唤醒loop对应的thread线程,执行回调loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this()));}if(state_ == kDisconnecting){shutdownInLoop();// 在当前loop中删除TcpConnection}}}else{LOG_ERROR("TcpConnection::handleWrite");}}else{LOG_ERROR("TcpConnection fd=%d is down, no more writing \n",channel_->fd());}
}
  1. 如果可写,通过fd发送数据,直到发送完成
  2. 设置不可写,如果writeCompleteCallback_,唤醒loop对应的thread线程,执行回调
  3. 当前TCP正在断开连接,调用shutdownInLoop,在当前loop中删除TcpConnection
  • 处理Tcp连接关闭的事件handleClose()
void TcpConnection::handleClose()
{LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n",channel_->fd(),(int)state_);setState(kDisconnected);channel_->disableAll();TcpConnectionPtr connPtr(shared_from_this());connectionCallback_(connPtr); //执行连接关闭的回调closeCallback_(connPtr); //关闭连接的回调 TcpServer => TcpServer::removeConnection
}

处理逻辑就是将这个TcpConnection对象中的channel_从事件监听器中移除。然后调用connectionCallback_closeCallback_保存的回调函数。closeCallback_TcpServer::newConnection()为新连接新建TcpConnection时,已设为TcpServer::removeConnection(),而removeConnection()最终会调用TcpConnection::connectDestroyed()来销毁连接资源。

void TcpConnection::connectDestroyed()
{if(state_ == kConnected){setState(kDisconnected);channel_->disableAll(); //把channel所有感兴趣的事件,从poller中del掉connectionCallback_(shared_from_this());}channel_->remove();//把channel从poller中删除掉
}

只有处于已连接状态(kConnected)的tcp连接, 才需要先更新状态, 关闭通道事件监听。

  • 错误处理回调
void TcpConnection::handleError()
{int optval;socklen_t optlen = sizeof optval;int err = 0;if(::getsockopt(channel_->fd(),SOL_SOCKET,SO_ERROR,&optval,&optlen) < 0){err = errno;        }else{err = optval;}LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d \n",name_.c_str(),err);
}

getsockopt()函数用于获取任意类型、任意状态套接口的选项当前值,并把结果存入optval,最后输出错误日志。

  • 关闭连接
 //关闭连接
void TcpConnection::shutdown()
{if(state_ == kConnected){setState(kDisconnecting);loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop,this));}
}void TcpConnection::shutdownInLoop()
{if(!channel_->isWriting()) //说明当前outputBuffer中的数据已经全部发送完成{socket_->shutdowmWrite(); // 关闭写端}
}

注意: 为什么是关闭了写端呢?在TcpConnection::shutdownInLoop()中,我们会发现它调用了Socket的shutdowmWrite,这里并没有使用close,陈硕大佬原话是这样的:Muduo TcpConnection 没有提供 close,而只提供 shutdown ,这么做是为了收发数据的完整性。因为TCP 是一个全双工协议,同一个文件描述符既可读又可写, shutdownWrite() 关闭了“写”方向的连接,保留了“读”方向,这称为 TCP half-close。如果直接 close(socket_fd),那么 socket_fd 就不能读或写了。用 shutdown 而不用 close 的效果是,如果对方已经发送了数据,这些数据还“在路上”,那么 muduo 不会漏收这些数据。换句话说,muduo 在 TCP 这一层面解决了“当你打算关闭网络连接的时候,如何得知对方有没有发了一些数据而你还没有收到?”这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据,就可以直接断开连接。等于说 muduo 把“主动关闭连接”这件事情分成两步来做,如果要主动关闭连接,它会先关本地“写”端,等对方关闭之后,再关本地“读”端。
Muduo把“主动关闭连接”这件事分成两步来做,如果要主动关闭连接,它先关闭本地的“写”端,等对方关闭之后,再关闭本地“读”端。
另外如果当前outputbuffer里面还有数据尚未发出的话,Muduo也不会立刻调用shutwownWrite,而是等到数据发送完毕再shutdown,可以避免对方漏收数据。

关闭连接事件很重要,涉及到TcpConnection和Channel的生命周期以及是否能合理销毁,用了智能指针来管理和控制生命周期。下面我们就来分析一下断开流程中**TcpConnection的引用计数问题**:

1.首先连接到来创建TcpConnection,并存入容器。引用计数+1 总数:1
2.客户端断开连接,在Channel的handleEvent函数中会将Channel中的TcpConnection弱指针提升,引用计数+1 总数:2
3.触发HandleRead ,可读字节0,进而触发HandleClose,HandleClose函数中栈上的TcpConnectionPtr guardThis会继续将引用计数+1 总数:3
4.触发HandleClose的回调函数 在TcpServer::removeConnection结束后(回归主线程队列),释放HandleClose的栈指针,以及Channel里提升的指针引用计数-2 总数:1
5.主线程执行回调removeConnectionInLoop,在函数内部将tcpconnection从TcpServer中保存连接容器中erase掉。但在removeConnectionInLoop结尾用conn为参数构造了bind。引用计数不变 总数:1
6.回归次线程处理connectDestroyed事件,结束完释放参数传递的最后一个shard_ptr,释放TcpConnection。引用计数-1 总数:0

  • 建立连接
void TcpConnection::connectEstablished()
{setState(kConnected);channel_->tie(shared_from_this());channel_->enableReading(); //向poller注册channel的epollin事件//新连接建立 执行回调connectionCallback_(shared_from_this());
}
思考一下:什么时候调用WriteCompleCallback?什么时候调用HighWaterMarkCallback?
  • 如果发送缓存区被清空,就调用WriteCompleCallback。TcpConnection有两处可能触发此回调。
  1. TcpConnection::sendInLoop()。
  2. TcpConnection::handleWrite()。
  • 如果输出缓冲的长度超过用户指定大小,就会触发回调HighWaterMarkCallback(只在上升沿触发一次)。
    在非阻塞的发送数据情况下,假设Server发给Client数据流,为防止Server发过来的数据撑爆Client的输出缓冲区,一种做法是在Client的HighWaterMarkCallback中停止读取Server的数据,而在Client的WriteCompleteCallback中恢复读取Server的数据
需要注意的是: TcpConnection类是唯一默认用shared_ptr来管理的类,唯一继承自enable_shared_from_this的类。这是因为其生命周期模糊:可能在连接断开时,还有其他地方持有它的引用,贸然delete会造成空悬指针。只有确保其他地方没有持有该对象的引用的时候,才能安全地销毁对象。

代码地址:https://github.com/Cheeron955/mymuduo/tree/master

好了~ 有关于muduo库的TcpConnection类的细节就到此结束了,再次强调一点,当channel有相应的事件的时候,会调用TcpConnection对应的回调。到此为止,我们介绍了Channel,Poller,EPollPoller,EventLoop,Acceptor,Socket,Buffer,TcpConnection八大类,还有单独的CurrentThread,DefaultPoller,接下来我们会剖析一下剩下的,比如Logger,TimeStamp等比较简单的,然后最后在从连接建立,断开,发送消息等过程在进行一个总结剖析,希望大家多多支持,我们下一节见~~

这篇关于[muduo网络库]——muduo库TcpConnection类,万字总结(剖析muduo网络库核心部分、设计思想)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/985261

相关文章

Python中logging模块用法示例总结

《Python中logging模块用法示例总结》在Python中logging模块是一个强大的日志记录工具,它允许用户将程序运行期间产生的日志信息输出到控制台或者写入到文件中,:本文主要介绍Pyt... 目录前言一. 基本使用1. 五种日志等级2.  设置报告等级3. 自定义格式4. C语言风格的格式化方法

Debian 13升级后网络转发等功能异常怎么办? 并非错误而是管理机制变更

《Debian13升级后网络转发等功能异常怎么办?并非错误而是管理机制变更》很多朋友反馈,更新到Debian13后网络转发等功能异常,这并非BUG而是Debian13Trixie调整... 日前 Debian 13 Trixie 发布后已经有众多网友升级到新版本,只不过升级后发现某些功能存在异常,例如网络转

Spring 依赖注入与循环依赖总结

《Spring依赖注入与循环依赖总结》这篇文章给大家介绍Spring依赖注入与循环依赖总结篇,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. Spring 三级缓存解决循环依赖1. 创建UserService原始对象2. 将原始对象包装成工

深度剖析SpringBoot日志性能提升的原因与解决

《深度剖析SpringBoot日志性能提升的原因与解决》日志记录本该是辅助工具,却为何成了性能瓶颈,SpringBoot如何用代码彻底破解日志导致的高延迟问题,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言第一章:日志性能陷阱的底层原理1.1 日志级别的“双刃剑”效应1.2 同步日志的“吞吐量杀手”

MySQL中查询和展示LONGBLOB类型数据的技巧总结

《MySQL中查询和展示LONGBLOB类型数据的技巧总结》在MySQL中LONGBLOB是一种二进制大对象(BLOB)数据类型,用于存储大量的二进制数据,:本文主要介绍MySQL中查询和展示LO... 目录前言1. 查询 LONGBLOB 数据的大小2. 查询并展示 LONGBLOB 数据2.1 转换为十

Python进阶之列表推导式的10个核心技巧

《Python进阶之列表推导式的10个核心技巧》在Python编程中,列表推导式(ListComprehension)是提升代码效率的瑞士军刀,本文将通过真实场景案例,揭示列表推导式的进阶用法,希望对... 目录一、基础语法重构:理解推导式的底层逻辑二、嵌套循环:破解多维数据处理难题三、条件表达式:实现分支

深度解析Python yfinance的核心功能和高级用法

《深度解析Pythonyfinance的核心功能和高级用法》yfinance是一个功能强大且易于使用的Python库,用于从YahooFinance获取金融数据,本教程将深入探讨yfinance的核... 目录yfinance 深度解析教程 (python)1. 简介与安装1.1 什么是 yfinance?

Python开发简易网络服务器的示例详解(新手入门)

《Python开发简易网络服务器的示例详解(新手入门)》网络服务器是互联网基础设施的核心组件,它本质上是一个持续运行的程序,负责监听特定端口,本文将使用Python开发一个简单的网络服务器,感兴趣的小... 目录网络服务器基础概念python内置服务器模块1. HTTP服务器模块2. Socket服务器模块

Go语言网络故障诊断与调试技巧

《Go语言网络故障诊断与调试技巧》在分布式系统和微服务架构的浪潮中,网络编程成为系统性能和可靠性的核心支柱,从高并发的API服务到实时通信应用,网络的稳定性直接影响用户体验,本文面向熟悉Go基本语法和... 目录1. 引言2. Go 语言网络编程的优势与特色2.1 简洁高效的标准库2.2 强大的并发模型2.

在Java中实现线程之间的数据共享的几种方式总结

《在Java中实现线程之间的数据共享的几种方式总结》在Java中实现线程间数据共享是并发编程的核心需求,但需要谨慎处理同步问题以避免竞态条件,本文通过代码示例给大家介绍了几种主要实现方式及其最佳实践,... 目录1. 共享变量与同步机制2. 轻量级通信机制3. 线程安全容器4. 线程局部变量(ThreadL