[muduo网络库]——muduo库TcpConnection类,万字总结(剖析muduo网络库核心部分、设计思想)

本文主要是介绍[muduo网络库]——muduo库TcpConnection类,万字总结(剖析muduo网络库核心部分、设计思想),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

接着之前我们[muduo网络库]——muduo库Buffer类(剖析muduo网络库核心部分、设计思想),我们接下来继续看muduo库中的TcpConnection类。

TcpConnection类

TcpConnection类是muduo最核心的类,这个类主要封装了一个已建立的TCP连接,以及控制该TCP连接的方法(连接建立和关闭和销毁),以及该连接发生的各种事件(读/写/错误/连接)对应的处理函数,以及这个TCP连接的服务端和客户端的套接字地址信息等。

重要成员变量

EventLoop *loop_; //绝对不是baseloop,因为TcpConnetion都是在subloop中管理的
const std::string name_;
std::atomic_int state_;
bool reading_;std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;
const InetAddress localAddr_;
const InetAddress peerAddr_;ConnectionCallback connectionCallback_; //有新连接时的回调
MessageCallback messageCallback_; //有读写消息时的回调
WriteCompleteCallback writeCompleteCallback_; //消息发送完成以后的回调
CloseCallback closeCallback_;
HighWaterMarkCallback highWaterMarkCallback_;size_t highWaterMark_;Buffer inputBuffer_; //接受数据的缓冲区
Buffer outputBuffer_; //发送数据的缓冲区
  • loop_ 该Tcp连接的Channel注册到了哪一个sub EventLoop上。这个loop_就是那一个sub EventLoop。
  • name_客户端的名字
  • state_ 客户端的状态,对应的有一个枚举类型,分别对应着已经断开连接,正在连接,已经连接,正在断开连接。
enum StateE {kDisconnected, kConnecting, kConnected, kDisconnecting};
  • reading_ 连接是否正在监听读事件
  • socket_ 连接套接字, 用于对连接进行底层操作
  • channel_ 通道, 用于绑定要监听的事件
  • localAddr_本地IP地址
  • peerAddr_对端IP地址
  • connectionCallback_messageCallback_writeCompleteCallback_highWaterMarkCallback_closeCallback_对应的连接建立/关闭后的处理函数,收到消息后的处理函数,消息发送完后的处理函数,高水位回调,连接关闭后的处理函数。
  • highWaterMark_ 因为发送数据,应用写得快,内核发送数据慢,需要把待发送的数据写入缓冲区,且设置了水位回调,防止发送太快
  • inputBuffer_outputBuffer_输入输出缓冲区,在输出缓冲区是用于暂存那些暂时发送不出去的待发送数据。因为Tcp发送缓冲区是有大小限制的,假如达到了高水位线,就没办法把发送的数据通过send()直接拷贝到Tcp发送缓冲区,而是暂存在这个outputBuffer_中,等TCP发送缓冲区有空间了,触发可写事件了,再把outputBuffer_中的数据拷贝到Tcp发送缓冲区中。

重要成员函数

  • 构造函数给channel设置相应的回调函数
 //当通道有读事件时候在Channel::handleEvent()内调用:TcpConnection::handleRead()
channel_->setReadCallback(std::bind(&TcpConnection::handleRead,this,std::placeholders::_1));//当通道有写事件的时候在Channel::handleEven()内调用:TcpConnection::handleWrite()
channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite,this));//当通道有关闭事件的时候在Channel::handleEvent()内调用:TcpConnection::handleClose()
channel_->setCloseCallback(std::bind(&TcpConnection::handleClose,this));//当通道有错误事件的时候在Channel::handleEvent()内调用:TcpConnection::handleError()
channel_->setErrorCallback(std::bind(&TcpConnection::handleError,this));LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n",name_.c_str(),sockfd);
//开启Tcp/Ip层的心跳包检测
socket_->setKeepAlive(true);
  • 一系列的获取loop_name_,地址,状态的函数。
EventLoop* getLoop() const { return loop_;}
const std::string& name() const { return name_;}
const InetAddress& localAddress() const { return localAddr_;}
const InetAddress& peerAddress() const { return peerAddr_; }
bool connected() const { return state_ == kConnected;}
  • 发送数据
void TcpConnection::send(const std::string &buf) //直接引用buffer
{if(state_ == kConnected){if(loop_->isInLoopThread()){//string.c_str是Borland封装的String类中的一个函数,它返回当前字符串的首字符地址。sendInLoop(buf.c_str(),buf.size());}else{loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));}}
}
void TcpConnection::sendInLoop(const void* data, size_t len)
{ssize_t nwrote = 0;size_t remaining = len; //未发送的数据bool faultError = false; //记录是否产生错误//之前调用过connection的shutdown 不能在发送了if(state_ == kDisconnected){LOG_ERROR("disconnected,give up writing!");return ;}//channel 第一次开始写数据,且缓冲区没有待发送数据if(!channel_->isWriting() && outputBuffer_.readableBytes() == 0){nwrote = ::write(channel_->fd(),data,len);if(nwrote >= 0){remaining = len - nwrote;if(remaining == 0 && writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this()));}}else{nwrote = 0;if(errno != EWOULDBLOCK) //用于非阻塞模式,不需要重新读或者写{LOG_ERROR("TcpConnection::sendInLoop");if(errno == EPIPE || errno == ECONNRESET) //SIGPIPE RESET{faultError = true;}}}}if(!faultError && remaining > 0) {//目前发送缓冲区剩余的待发送数据的长度size_t oldlen = outputBuffer_.readableBytes();if(oldlen + remaining >= highWaterMark_ && oldlen < highWaterMark_&& highWaterMark_){loop_->queueInLoop(std::bind(highWaterMarkCallback_,shared_from_this(),oldlen + remaining));}outputBuffer_.append((char*)data + nwrote,remaining);if(!channel_->isWriting()){channel_->enableWriting(); //注册channel写事件,否则poller不会向channel通知epollout}}
}

1) 发送数据要发送的数据长度是len,如果在loop_在当前的线程里面,就调用sendInLoopsendInLoop内部实际上是调用了系统的write,如果一次性发送完了,就设置writeCompleteCallback_,表明不要再给channel设置epollout事件了
2)如果没有写完,先计算一下oldlen目前发送缓冲区剩余的待发送数据的长度。满足:

if(oldlen + remaining >= highWaterMark_ && oldlen < highWaterMark_&& highWaterMark_)

就会触发高水位回调
3)不满足以上的话,直接写入outputBuffer_
4) 剩余的数据保存到缓冲区当中,要给给channel注册epollout事件(切记,一定要注册channel的写事件,否则poller不会向channel通知epollout),这样poller发现tcp发送缓冲区有空间,会通知相应的socket-channel调用相应的writeCallback()回调方法,也就是调用TcpConnection::handleWrite,把发送缓冲区中数据全部发送出去。

  • 重中之重TcpConnection::handleRead负责处理Tcp连接的可读事件,它会将客户端发送来的数据拷贝到用户缓冲区中inputBuffer_,然后再调用connectionCallback_保存的连接建立后的处理函数。
void TcpConnection::handleRead(TimeStamp receiveTime)
{int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(),&savedErrno);if(n > 0){      messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);}else if(n==0) //客户端断开{handleClose();} else{errno = savedErrno;LOG_ERROR("TcpConnection::hanleRead");handleError();}}
  1. 关于readFd在Buffer类中我们已经剖析过了Buffer类,接着已建立连接的用户,有可读事件发生了,调用用户传入的回调操作onMessageshared_from_this()获取了当前TcpConnection对象的智能指针.
  2. n=0,说明客户端断开了,调用连接关闭后的处理函数。
  3. n<0 出错了,调用错误处理回调
  • handleWrite( )负责处理Tcp连接的可写事件
void TcpConnection::handleWrite()
{if(channel_->isWriting()){int savedErrno = 0;ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);if(n > 0){outputBuffer_.retrieve(n); //处理了n个if(outputBuffer_.readableBytes() == 0) //发送完成{channel_->disableWriting(); //不可写了if(writeCompleteCallback_){//唤醒loop对应的thread线程,执行回调loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this()));}if(state_ == kDisconnecting){shutdownInLoop();// 在当前loop中删除TcpConnection}}}else{LOG_ERROR("TcpConnection::handleWrite");}}else{LOG_ERROR("TcpConnection fd=%d is down, no more writing \n",channel_->fd());}
}
  1. 如果可写,通过fd发送数据,直到发送完成
  2. 设置不可写,如果writeCompleteCallback_,唤醒loop对应的thread线程,执行回调
  3. 当前TCP正在断开连接,调用shutdownInLoop,在当前loop中删除TcpConnection
  • 处理Tcp连接关闭的事件handleClose()
void TcpConnection::handleClose()
{LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n",channel_->fd(),(int)state_);setState(kDisconnected);channel_->disableAll();TcpConnectionPtr connPtr(shared_from_this());connectionCallback_(connPtr); //执行连接关闭的回调closeCallback_(connPtr); //关闭连接的回调 TcpServer => TcpServer::removeConnection
}

处理逻辑就是将这个TcpConnection对象中的channel_从事件监听器中移除。然后调用connectionCallback_closeCallback_保存的回调函数。closeCallback_TcpServer::newConnection()为新连接新建TcpConnection时,已设为TcpServer::removeConnection(),而removeConnection()最终会调用TcpConnection::connectDestroyed()来销毁连接资源。

void TcpConnection::connectDestroyed()
{if(state_ == kConnected){setState(kDisconnected);channel_->disableAll(); //把channel所有感兴趣的事件,从poller中del掉connectionCallback_(shared_from_this());}channel_->remove();//把channel从poller中删除掉
}

只有处于已连接状态(kConnected)的tcp连接, 才需要先更新状态, 关闭通道事件监听。

  • 错误处理回调
void TcpConnection::handleError()
{int optval;socklen_t optlen = sizeof optval;int err = 0;if(::getsockopt(channel_->fd(),SOL_SOCKET,SO_ERROR,&optval,&optlen) < 0){err = errno;        }else{err = optval;}LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d \n",name_.c_str(),err);
}

getsockopt()函数用于获取任意类型、任意状态套接口的选项当前值,并把结果存入optval,最后输出错误日志。

  • 关闭连接
 //关闭连接
void TcpConnection::shutdown()
{if(state_ == kConnected){setState(kDisconnecting);loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop,this));}
}void TcpConnection::shutdownInLoop()
{if(!channel_->isWriting()) //说明当前outputBuffer中的数据已经全部发送完成{socket_->shutdowmWrite(); // 关闭写端}
}

注意: 为什么是关闭了写端呢?在TcpConnection::shutdownInLoop()中,我们会发现它调用了Socket的shutdowmWrite,这里并没有使用close,陈硕大佬原话是这样的:Muduo TcpConnection 没有提供 close,而只提供 shutdown ,这么做是为了收发数据的完整性。因为TCP 是一个全双工协议,同一个文件描述符既可读又可写, shutdownWrite() 关闭了“写”方向的连接,保留了“读”方向,这称为 TCP half-close。如果直接 close(socket_fd),那么 socket_fd 就不能读或写了。用 shutdown 而不用 close 的效果是,如果对方已经发送了数据,这些数据还“在路上”,那么 muduo 不会漏收这些数据。换句话说,muduo 在 TCP 这一层面解决了“当你打算关闭网络连接的时候,如何得知对方有没有发了一些数据而你还没有收到?”这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据,就可以直接断开连接。等于说 muduo 把“主动关闭连接”这件事情分成两步来做,如果要主动关闭连接,它会先关本地“写”端,等对方关闭之后,再关本地“读”端。
Muduo把“主动关闭连接”这件事分成两步来做,如果要主动关闭连接,它先关闭本地的“写”端,等对方关闭之后,再关闭本地“读”端。
另外如果当前outputbuffer里面还有数据尚未发出的话,Muduo也不会立刻调用shutwownWrite,而是等到数据发送完毕再shutdown,可以避免对方漏收数据。

关闭连接事件很重要,涉及到TcpConnection和Channel的生命周期以及是否能合理销毁,用了智能指针来管理和控制生命周期。下面我们就来分析一下断开流程中**TcpConnection的引用计数问题**:

1.首先连接到来创建TcpConnection,并存入容器。引用计数+1 总数:1
2.客户端断开连接,在Channel的handleEvent函数中会将Channel中的TcpConnection弱指针提升,引用计数+1 总数:2
3.触发HandleRead ,可读字节0,进而触发HandleClose,HandleClose函数中栈上的TcpConnectionPtr guardThis会继续将引用计数+1 总数:3
4.触发HandleClose的回调函数 在TcpServer::removeConnection结束后(回归主线程队列),释放HandleClose的栈指针,以及Channel里提升的指针引用计数-2 总数:1
5.主线程执行回调removeConnectionInLoop,在函数内部将tcpconnection从TcpServer中保存连接容器中erase掉。但在removeConnectionInLoop结尾用conn为参数构造了bind。引用计数不变 总数:1
6.回归次线程处理connectDestroyed事件,结束完释放参数传递的最后一个shard_ptr,释放TcpConnection。引用计数-1 总数:0

  • 建立连接
void TcpConnection::connectEstablished()
{setState(kConnected);channel_->tie(shared_from_this());channel_->enableReading(); //向poller注册channel的epollin事件//新连接建立 执行回调connectionCallback_(shared_from_this());
}
思考一下:什么时候调用WriteCompleCallback?什么时候调用HighWaterMarkCallback?
  • 如果发送缓存区被清空,就调用WriteCompleCallback。TcpConnection有两处可能触发此回调。
  1. TcpConnection::sendInLoop()。
  2. TcpConnection::handleWrite()。
  • 如果输出缓冲的长度超过用户指定大小,就会触发回调HighWaterMarkCallback(只在上升沿触发一次)。
    在非阻塞的发送数据情况下,假设Server发给Client数据流,为防止Server发过来的数据撑爆Client的输出缓冲区,一种做法是在Client的HighWaterMarkCallback中停止读取Server的数据,而在Client的WriteCompleteCallback中恢复读取Server的数据
需要注意的是: TcpConnection类是唯一默认用shared_ptr来管理的类,唯一继承自enable_shared_from_this的类。这是因为其生命周期模糊:可能在连接断开时,还有其他地方持有它的引用,贸然delete会造成空悬指针。只有确保其他地方没有持有该对象的引用的时候,才能安全地销毁对象。

代码地址:https://github.com/Cheeron955/mymuduo/tree/master

好了~ 有关于muduo库的TcpConnection类的细节就到此结束了,再次强调一点,当channel有相应的事件的时候,会调用TcpConnection对应的回调。到此为止,我们介绍了Channel,Poller,EPollPoller,EventLoop,Acceptor,Socket,Buffer,TcpConnection八大类,还有单独的CurrentThread,DefaultPoller,接下来我们会剖析一下剩下的,比如Logger,TimeStamp等比较简单的,然后最后在从连接建立,断开,发送消息等过程在进行一个总结剖析,希望大家多多支持,我们下一节见~~

这篇关于[muduo网络库]——muduo库TcpConnection类,万字总结(剖析muduo网络库核心部分、设计思想)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/985261

相关文章

如何通过海康威视设备网络SDK进行Java二次开发摄像头车牌识别详解

《如何通过海康威视设备网络SDK进行Java二次开发摄像头车牌识别详解》:本文主要介绍如何通过海康威视设备网络SDK进行Java二次开发摄像头车牌识别的相关资料,描述了如何使用海康威视设备网络SD... 目录前言开发流程问题和解决方案dll库加载不到的问题老旧版本sdk不兼容的问题关键实现流程总结前言作为

Python中连接不同数据库的方法总结

《Python中连接不同数据库的方法总结》在数据驱动的现代应用开发中,Python凭借其丰富的库和强大的生态系统,成为连接各种数据库的理想编程语言,下面我们就来看看如何使用Python实现连接常用的几... 目录一、连接mysql数据库二、连接PostgreSQL数据库三、连接SQLite数据库四、连接Mo

Git提交代码详细流程及问题总结

《Git提交代码详细流程及问题总结》:本文主要介绍Git的三大分区,分别是工作区、暂存区和版本库,并详细描述了提交、推送、拉取代码和合并分支的流程,文中通过代码介绍的非常详解,需要的朋友可以参考下... 目录1.git 三大分区2.Git提交、推送、拉取代码、合并分支详细流程3.问题总结4.git push

Kubernetes常用命令大全近期总结

《Kubernetes常用命令大全近期总结》Kubernetes是用于大规模部署和管理这些容器的开源软件-在希腊语中,这个词还有“舵手”或“飞行员”的意思,使用Kubernetes(有时被称为“... 目录前言Kubernetes 的工作原理为什么要使用 Kubernetes?Kubernetes常用命令总

Python中的可视化设计与UI界面实现

《Python中的可视化设计与UI界面实现》本文介绍了如何使用Python创建用户界面(UI),包括使用Tkinter、PyQt、Kivy等库进行基本窗口、动态图表和动画效果的实现,通过示例代码,展示... 目录从像素到界面:python带你玩转UI设计示例:使用Tkinter创建一个简单的窗口绘图魔法:用

Python中实现进度条的多种方法总结

《Python中实现进度条的多种方法总结》在Python编程中,进度条是一个非常有用的功能,它能让用户直观地了解任务的进度,提升用户体验,本文将介绍几种在Python中实现进度条的常用方法,并通过代码... 目录一、简单的打印方式二、使用tqdm库三、使用alive-progress库四、使用progres

SSID究竟是什么? WiFi网络名称及工作方式解析

《SSID究竟是什么?WiFi网络名称及工作方式解析》SID可以看作是无线网络的名称,类似于有线网络中的网络名称或者路由器的名称,在无线网络中,设备通过SSID来识别和连接到特定的无线网络... 当提到 Wi-Fi 网络时,就避不开「SSID」这个术语。简单来说,SSID 就是 Wi-Fi 网络的名称。比如

Java实现任务管理器性能网络监控数据的方法详解

《Java实现任务管理器性能网络监控数据的方法详解》在现代操作系统中,任务管理器是一个非常重要的工具,用于监控和管理计算机的运行状态,包括CPU使用率、内存占用等,对于开发者和系统管理员来说,了解这些... 目录引言一、背景知识二、准备工作1. Maven依赖2. Gradle依赖三、代码实现四、代码详解五

Node.js 中 http 模块的深度剖析与实战应用小结

《Node.js中http模块的深度剖析与实战应用小结》本文详细介绍了Node.js中的http模块,从创建HTTP服务器、处理请求与响应,到获取请求参数,每个环节都通过代码示例进行解析,旨在帮... 目录Node.js 中 http 模块的深度剖析与实战应用一、引言二、创建 HTTP 服务器:基石搭建(一

Android数据库Room的实际使用过程总结

《Android数据库Room的实际使用过程总结》这篇文章主要给大家介绍了关于Android数据库Room的实际使用过程,详细介绍了如何创建实体类、数据访问对象(DAO)和数据库抽象类,需要的朋友可以... 目录前言一、Room的基本使用1.项目配置2.创建实体类(Entity)3.创建数据访问对象(DAO