【Muduo】TcpConnection类

2024-05-25 05:28
文章标签 muduo tcpconnection

本文主要是介绍【Muduo】TcpConnection类,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Muduo网络库的TcpConnection类代表了TCP连接的一端,即服务器与远程对等端之间的连接。TcpConnection类知道自身和对端的InetAddress、封装了前面讲过的Socket类和Channel类,并且保有管理自己的subLoop指针,还有多种事件处理函数和回调,还有属于自己的接收Buffer/发送Buffer。为上层应用提供了简洁的接口来处理连接的生命周期和数据传输。

主要成员变量

  • socket:Socket类封装了底层的socket文件描述符,用于网络通信。
  • channel:与socket关联的Channel对象,用于在事件循环中注册读/写/错误等事件。
  • loopEventLoop指针,表示该连接所属的事件循环。
  • name:连接的名称或标识,通常用于日志记录。
  • state:连接的状态,如kConnecting、kConnected、kDisconnecting、kDisconnected等。
  • highWaterMark:高水位线,用于流量控制。
  • messageCallback:当接收到数据时调用的回调函数。
  • writeCompleteCallback:当数据发送完成时调用的回调函数。
  • connectionCallback:当连接状态发生变化(如建立、断开)时调用的回调函数。

主要功能

1. 连接管理
  • 建立连接:在mainLoop中的Acceptor接收到一个新连接请求的时候,回调TcpServer中的新连接处理函数,在其中选择一个subLoop并由此构造出一个TcpConnection对象,构造函数中设置好本连接的相关信息,mainLoop就会将这个新Connection放到subLoop中运行。在连接建立之后,会紧接着向EPollPoller注册感兴趣的事件,并设置连接状态。
  • 断开连接:当需要断开连接时,TcpConnection会关闭底层的socket文件描述符,并触发相关的回调函数。此外,它还会处理连接关闭时的清理工作,如取消事件注册、释放资源等。
2. 数据传输
  • 接收数据:当底层socket有数据可读时,事件循环会通知关联的Channel对象。TcpConnection会读取数据并调用注册的messageCallback回调函数来处理接收到的数据。
  • 发送数据:应用程序可以通过调用TcpConnection的发送接口(send和sendInLoop)来发送数据。这些数据会被缓存在内部缓冲区中,并通过底层socket逐步发送出去。当所有数据都发送完成时,TcpConnection会调用注册的writeCompleteCallback回调函数来通知应用程序。

    关于数据传输,需要在源码中仔细品味。

3. 状态通知
  • 连接状态变化:当连接状态发生变化时(如建立、断开),TcpConnection会触发注册的connectionCallback回调函数来通知应用程序。这样,应用程序可以根据连接状态的变化来执行相应的逻辑,如重新连接、关闭会话等。

设计思想

  • 封装底层细节TcpConnection类封装了底层的socket API和事件循环机制,为上层应用提供了简洁的接口来处理连接和数据传输。这样,应用程序可以专注于业务逻辑的实现,而无需关心底层的网络编程细节。
  • 事件驱动:通过使用事件循环和回调函数机制,TcpConnection类实现了基于事件驱动的编程模型。这种模型使得应用程序能够高效地处理多个并发连接和数据传输任务。
  • 状态管理TcpConnection类维护了连接的状态信息,并通过回调函数机制将状态变化通知给上层应用。这样,应用程序可以根据连接状态的变化来执行相应的逻辑操作。

源码

TcpConnection.h
#pragma once#include "noncopyable.h"
#include "Buffer.h"
#include "Timestamp.h"
#include "InetAddress.h"
#include "Callbacks.h"#include <memory>
#include <string>
#include <atomic>class Channel;
class EventLoop;
class Socket;// 已连接socket的通信链路
/*** TcpServer => Acceptor => 有一个新用户连接,通过accept函数拿到connfd* => TcpConnetion 设置回调 => Channel => Poller => Channel的回调操作* 专门描述一个已建立连接的相应信息 
*/
class TcpConnection: noncopyable,public std::enable_shared_from_this<TcpConnection> // 运行本类的对象产生智能指针
{
public:TcpConnection(EventLoop* loop,const std::string& name,int sockfd,const InetAddress& localAddr,const InetAddress& peerAddr);~TcpConnection();EventLoop* getLoop() const { return loop_; }const std::string& name() const { return name_; }const InetAddress& localAddress() const { return localAddr_; }const InetAddress& peerAddress() const { return peerAddr_; }bool connected() const { return state_ == kConnected; }bool disconnected() const { return state_ == kDisconnected; }std::string getTcpInfoString() const;// 用户调用,给客户端发送数据void send(const std::string& message);// 关闭连接void shutdown();void setConnectionCallback(const ConnectionCallback& cb){ connectionCallback_ = cb; }void setMessageCallback(const MessageCallback& cb){ messageCallback_ = cb; }void setWriteCompleteCallback(const WriteCompleteCallback& cb){ writeCompleteCallback_ = cb; }void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark){ highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }void setCloseCallback(const CloseCallback& cb){ closeCallback_ = cb; }// called when TcpServer accepts a new connection// 连接建立void connectEstablished();   // should be called only once// called when TcpServer has removed me from its map// 连接销毁void connectDestroyed();  // should be called only onceprivate://            初始化       建立连接    调用shutdown      关闭完底层soketenum StateE { kConnecting, kConnected, kDisconnecting,  kDisconnected};void handleRead(Timestamp receiveTime);void handleWrite();void handleClose();void handleError();void sendInLoop(const void* message, size_t len);void shutdownInLoop();const char* stateToString() const;// void startReadInLoop();// void stopReadInLoop();void setState(StateE s) { state_ = s; }EventLoop* loop_; // TcpConnection都是在subLoop里管理的const std::string name_;std::atomic_int state_; bool reading_;// 这里和mainLoop中的Acceptor很像,而TcpConnection在subLoop中std::unique_ptr<Socket> socket_;std::unique_ptr<Channel> channel_;const InetAddress localAddr_;const InetAddress peerAddr_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_;HighWaterMarkCallback highWaterMarkCallback_;CloseCallback closeCallback_; // 来自TcpServer的回调,用于在Server中删除本connsize_t highWaterMark_; // 防止本端发送过快对面接收不及Buffer inputBuffer_; // 接收数据的缓冲区Buffer outputBuffer_; // 发送数据的缓冲区
};
TcpConnection.cc
#include "TcpConnection.h"
#include "LogStream.h"
#include "Socket.h"
#include "Channel.h"
#include "EventLoop.h"#include <functional>
#include <sys/types.h> 
#include <sys/socket.h>static EventLoop *checkLoopNotNull(EventLoop *loop)
{if (loop == nullptr){LOG_FATAL << "subLoop is null!";}return loop;
}TcpConnection::TcpConnection(EventLoop *loop, const std::string &nameArg, int sockfd,const InetAddress &localAddr, const InetAddress &peerAddr): loop_(checkLoopNotNull(loop)),name_(nameArg),state_(kConnecting),reading_(true),socket_(new Socket(sockfd)),channel_(new Channel(loop, sockfd)),localAddr_(localAddr),peerAddr_(peerAddr),highWaterMark_(64 * 1024 * 1024) // 64M
{// 给channel设置相应的回调函数// poller给channel通知感兴趣的事件发生了,channel会回调相应的操作函数channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this<< " fd=" << sockfd;socket_->setKeepAlive(true);
}TcpConnection::~TcpConnection()
{LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this<< " fd=" << channel_->fd()<< " state=" << stateToString();
}void TcpConnection::send(const std::string& message)
{LOG_DEBUG << "state_:" << stateToString() << "  loop_=" << loop_;if (state_ == kConnected){if (loop_->isInLoopThread()){sendInLoop(message.c_str(), message.size());}else{loop_->runInLoop(std::bind(&TcpConnection::sendInLoop,this,message.c_str(),message.size()));}}
}/*** 发送数据,应用写得快,而内核发送数据慢,需要将待发送数据写入缓冲区,而且设置水位回调
*/
void TcpConnection::sendInLoop(const void *data, size_t len)
{LOG_DEBUG << "TcpConnection::sendInLoop()  send len:" << len;ssize_t nwrote = 0;size_t remaining = len; // 剩余字节数bool faultError = false;if(state_ == kDisconnected){LOG_ERROR << "disconnected, give up writing";return;}// 刚开始对写事件还不感兴趣// 表示channe_第一次开始写数据,而且缓冲区没有待发送的数据,尝试直接发送if(!channel_->isWriting() && outputBuffer_.writableBytes() == 0){nwrote = ::write(channel_->fd(), data, len); // 返回具体发送个数if(nwrote >= 0){remaining = len - nwrote;if(remaining == 0 && writeCompleteCallback_){ // 全部发送完了,执行用户的写完回调loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}}else // nwrote < 0{nwrote = 0;if (errno != EWOULDBLOCK) // EWOULDBLOCK 表示由于非阻塞没有数据的一个正常返回{LOG_ERROR << "TcpConnection::sendInLoop";if (errno == EPIPE || errno == ECONNRESET) // 接收到对端的重置{faultError = true;}}}}// 说明当前这次write没有把数据全部发送出去,剩余的数据需要保存到缓冲区中// 然后给channel注册epollout事件,poller发现tcp的发送缓冲区有空间,会通知相应的sock-channel,// 调用writeCallback_回调方法,也就是调用TcpConnction::handleWrite方法,把发送缓冲区中的数据全部发送完成if (!faultError && remaining > 0) {//  目前发送缓冲区剩余待发送数据长度size_t oldLen = outputBuffer_.readableBytes();if (oldLen + remaining >= highWaterMark_&& oldLen < highWaterMark_&& highWaterMarkCallback_){loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));}outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);if (!channel_->isWriting()){// 一定要注册channel的写事件,不然poller不会给channl通知epolloutchannel_->enableWriting();}}
}// 用户调用的
void TcpConnection::shutdown()
{if (state_ == kConnected){setState(kDisconnecting);loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));}
}void TcpConnection::shutdownInLoop()
{if (!channel_->isWriting()) // 说明outputBuffer中的数据已经全部发送完成{socket_->shutdownWrite(); // 关闭写端,触发epllHUP,调用closeCB,最终执行handleClose}
}const char *TcpConnection::stateToString() const
{switch (state_){case kDisconnected:return "kDisconnected";case kConnecting:return "kConnecting";case kConnected:return "kConnected";case kDisconnecting:return "kDisconnecting";default:return "unknown state";}
}void TcpConnection::connectEstablished()
{setState(kConnected);channel_->tie(shared_from_this()); // Channel绑定本connection,防止TcpConnection对象析构之后Channel还要执行其回调操作channel_->enableReading(); // 向poller注册EPOLLIN事件// 连接建立,执行回调connectionCallback_(shared_from_this());
}void TcpConnection::connectDestroyed()
{if (state_ == kConnected){setState(kDisconnected);channel_->disableAll(); // 从Epoll中移除所有感兴趣事件connectionCallback_(shared_from_this());}channel_->remove();
}void TcpConnection::handleRead(Timestamp receiveTime)
{int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);if (n > 0){LOG_DEBUG << "TcpConnection::handleRead()  read bytes from buf:" << n;// 已建立连接的用户,有可读事件发生了,调用用户传入的回调onMessage// 从本对象产生一个智能指针messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);}else if (n == 0){handleClose();}else{errno = savedErrno;LOG_ERROR << "TcpConnection::handleRead";handleError();}
}// 有EpollOut事件的时候才执行
void TcpConnection::handleWrite()
{if (channel_->isWriting()){int saveError = 0;ssize_t n = outputBuffer_.writeFd(channel_->fd(), &saveError);if (n > 0){// n个数据写入成功,恢复n个字节outputBuffer_.retrieve(n);// ==0:已经发送完成了;!=0:还没发送完成if (outputBuffer_.readableBytes() == 0){channel_->disableWriting(); // ??if (writeCompleteCallback_){// 唤醒subLoop对应的线程,在其中执行回调函数// 其实调用 TcpConnection::handleWrite() 函数的时候,已经在这个subLoop中了loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}// 可能client在接收到数据后就会shutdownif (state_ == kDisconnecting){shutdownInLoop();}}}else{LOG_ERROR << "TcpConnection::handleWrite";}}// 上一轮监听中,已经设置了对写事件的不感兴趣else{LOG_INFO << "Connection fd = " << channel_->fd()<< " is down, no more writing";}
}void TcpConnection::handleClose()
{LOG_INFO << "fd = " << channel_->fd() << " state = " << stateToString();// we don't close fd, leave it to dtor, so we can find leaks easily.setState(kDisconnected);channel_->disableAll(); // 在poller上删除Channel所有的感兴趣事件TcpConnectionPtr guardThis(shared_from_this());if(connectionCallback_){connectionCallback_(guardThis); // 执行用户注册的连接回调,用户会在其中判断链接状态}if(closeCallback_){closeCallback_(guardThis); // 关闭连接的回调,调用到TcpServer::removeConnection} 
}void TcpConnection::handleError()
{int optval;socklen_t optlen = sizeof optval;if(::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0){LOG_ERROR << "TcpConnection::handleError [" << name_<< "] - errno = " << errno;}
}

这篇关于【Muduo】TcpConnection类的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1000601

相关文章

muduo 日志打印改造

改造前是这样的:  改造后是这样的:    时间戳改成本地的了,线程id 改成了[线程id],函数名称放到最后。看一下这个打印日志宏: #define LOG_TRACE if (muduo::Logger::logLevel() <= muduo::Logger::TRACE) \   muduo::Logger(__FILE__, __LINE__, muduo::Logger::

仿Muduo库实现高并发服务器——Acceptor模块

Acceptor模块是为了创建套接字,并且接收新到来的客户端套接字,将对应套接字的Channel对象添加到Poller对象中,进行事件监控。 Acceptor模块成员函数: 连接回调函数:  在构造函数这里,就对主线程上面的网络套接字进行回调函数的设置。    会在TcpServer模块中,启动该套接字读事件监控,并将对应的Channel对象添加到Poller对象中。

muduo的研究目录

目录(1)大并发服务器架构介绍(P3)poll:poll函数原型(P4)poll:poll使用的基本流程 ,EMFILE处理 ,cmake(P5)epoll:epoll ,epoll LT ,epoll ET(P6)epoll:epoll的两种触发模式,select/poll/epoll对比(7)muduo介绍,参考:链接(8)面向对象编程(9)基于对象的编程风格(10)muduo_base库源码

(13)muduo_base库源码分析:Thread类实现

文章目录 1.线程标识符2.Thread类图3.Thread类相关代码学习4.相关测试5.多线程多进程的死锁案例 1.线程标识符 Linux中,每个进程有一个pid,类型pid_t,由getpid()取得。Linux下的POSIX线程也有一个id,类型 pthread_t,由pthread_self()取得,该id由线程库维护,其id空间是各个进程独立的(即不同进程中的线程可能

(12)muduo_base库源码分析:Exception类实现

文章目录 1.Exception类实现2.测试 1.Exception类实现 类图 12\jmuduo\muduo\base\Exception.cc // Use of this source code is governed by a BSD-style license// that can be found in the License file.//// Aut

(P20)muduo_base库源码分析:日志作用,日志级别,Logger使用时序图

文章目录 1.日志作用2.日志级别3.Logger使用时序图 1.日志作用 开发过程中: 调试错误 更好的理解程序 运行过程中: 诊断系统故障并处理 记录系统运行状态 错误分为:编译,运行(可以把errno对应的错误文本信息记录到日志中),逻辑错误(将整个程序的运行状态输出到日志中,通过分析日志可以理清楚程序逻辑,从而找出逻辑错误) 2.日志级别 TRACE 指出

(P19)muduo_base库源码分析:ThreadLocalSingleton封装

文章目录 1.ThreadLocalSingleton封装 1.ThreadLocalSingleton封装 类图如下: 该方法比P18的方法更好 线程本地单例类封装,每个线程都有一个T类型的单例对象 eg:src\19\jmuduo\muduo\base\ThreadLocalSingleton.h eg测试:src\19\jmuduo\muduo\base\te

(P18)muduo_base库源码分析:线程特定数据,ThreadLocal类的封装

文章目录 1.线程特定数据2.ThreadLocal类的封装 1.线程特定数据 在单线程程序中,我们经常要用到"全局变量"以实现多个函数间共享数据。 在多线程环境下,由于数据空间是共享的,因此全局变量也为所有线程所共有。 但有时应用程序设计中有必要提供线程私有的全局变量,仅在某个线程中有效,但却可以跨多个函数访问。 POSIX线程库通过维护一定的数据结构来解决这个问题,

(P17)muduo_base库源码分析:线程安全Singleton类实现

文章目录 1.线程安全Singleton类实现 1.线程安全Singleton类实现 线程安全Singleton类实现 pthread_once atexit typedef char T_must_be_complete_type[sizeof(T) == 0 ? -1 : 1]; 类图 +号表示公有的,-号表示私有的。使用模板方式实现 eg:src\17\jmudu

(P16)muduo_base库源码分析:ThreadPool实现

文章目录 1.ThreadPool实现 1.ThreadPool实现 也是生产者消费者问题。 ThreadPool创建了若干线程,维护了一个线程队列; 这些线程是用来执行任务的,所以还维护了一个任务队列; 外部的生产者线程往ThreadPool中的任务队列添加任务,一旦任务队列有任务,则唤醒线程队列的线程来执行这些任务,这些线程相当于消费者线程; eg:src\16\jm