本文主要是介绍重写muduo网络库:终章TcpServer,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
终章TcpServer
回顾muduo网络库的简单使用:
#include <mymuduo/TcpServer.h>
#include <mymuduo/Logger.h>
#include <functional>
#include <string>class EchoServer
{
public:EchoServer(EventLoop* loop, const InetAddress &addr,const std::string &name):loop_(loop),server_(loop,addr,name){//注册回调函数server_.setMessageCallback(std::bind(&EchoServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));server_.setConnectionCallback(std::bind(&EchoServer::onConnection, this, std::placeholders::_1));//设置线程数量server_.setThreadNum(3);}void start(){server_.start();}
private:void onConnection(const TcpConnectionPtr& conn){if(conn->connected()){LOG_INFO("Connect UP : %s", conn->peerAddress().toIpPort().c_str());}else{LOG_INFO("Connect DOWN : %s", conn->peerAddress().toIpPort().c_str());}}void onMessage(const TcpConnectionPtr& conn,Buffer* buf,Timestamp time){std::string msg = buf->retrieveAllAsString();conn->send(msg);conn->shutdown();}EventLoop* loop_;TcpServer server_;
};int main()
{EventLoop loop;InetAddress addr(8000);EchoServer server(&loop, addr, "EchoServer-01");server.start();loop.loop();return 0;
}
用户先定义一个EventLoop事件循环和Socket地址,用他们构造一个TcpServer对象,然后就是设置回调函数和线程数量,接着就是启动线程池,开启循环监听。
而在构造TcpServer对象的时候做了什么呢?
首先就是接收了一个mainLoop,用接收的mainLoop和Socket地址创建了一个Acceptor,然后就是给Acceptor绑定了一个回调,这个回调就是在accept的处理流程,得到一个connfd时,就会触发这个回调。
这个回调会从线程池中获取一个ioLoop,对这个新连接的connfd打包成TcpConnection;设置用户设置的回调,回调的传递流程时从用户=》TcpServer=》TcpConnection=》Channel=》Poller;然后就runInLoop唤醒ioLoop所在的线程起来处理。
重写TcpServer.h:
#pragma once#include "noncopyable.h"
#include "EventLoop.h"
#include "Acceptor.h"
#include "InetAddress.h"
#include "Callbacks.h"
#include "EventLoopThreadPool.h"
#include "Buffer.h"
#include "TcpConnection.h"#include <string>
#include <functional>
#include <atomic>
#include <unordered_map>class TcpServer : noncopyable
{
public:using ThreadInitCallback = std::function<void(EventLoop*)>;enum Option{kNoReusePort,kReusePort,};TcpServer(EventLoop* loop, const InetAddress& listenAddr, const std::string& nameArg, Option option = kNoReusePort);~TcpServer();void setThreadInitCallback(const ThreadInitCallback &cb) { threadInitCallback_ = cb; }void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }//设置subloop线程数量void setThreadNum(int numThreads);//开启服务监听void start();private:void newConnection(int sockfd, const InetAddress &peerAddr);void removeConnection(const TcpConnectionPtr& conn);void removeConnectionInLoop(const TcpConnectionPtr& conn);using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;EventLoop *loop_; //用户定义的loopconst std::string ipPort_;const std::string name_;std::unique_ptr<Acceptor> acceptor_; //运行在mainloop,任务就是监听新连接事件std::shared_ptr<EventLoopThreadPool> threadPool_; //one loop per threadConnectionCallback connectionCallback_; //新连接回调MessageCallback messageCallback_; //有读写消息时的回调WriteCompleteCallback writeCompleteCallback_; //消息发送完成以后的回调ThreadInitCallback threadInitCallback_; //loop线程初始化的回调std::atomic_int started_;int nextConnId_;ConnectionMap connections_; //保存所有的连接
};
重写TcpServer.cc:
#include "TcpServer.h"
#include "Logger.h"
#include "TcpConnection.h"#include <strings.h>EventLoop* CheckLoopNotNull(EventLoop *loop)
{if(loop == nullptr){LOG_FATAL("%s:%s:%d mainloop is null \n", __FILE__, __FUNCTION__, __LINE__);}return loop;
}TcpServer::TcpServer(EventLoop* loop, const InetAddress& listenAddr, const std::string& nameArg, Option option):loop_(CheckLoopNotNull(loop)),ipPort_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop, listenAddr, option = kReusePort)),threadPool_(new EventLoopThreadPool(loop, name_)),connectionCallback_(),messageCallback_(),nextConnId_(1),started_(0)
{//当有新用户连接时,会执行TcpServer::newConnection回调acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,std::placeholders::_1, std::placeholders::_2));
}//设置subloop线程数量
void TcpServer::setThreadNum(int numThreads)
{threadPool_->setThreadNum(numThreads);
}//开启服务监听
void TcpServer::start()
{if(started_++ == 0) //防止一个TcpServer对象被start多次{threadPool_->start(threadInitCallback_);loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));}
}//有一个新的客户端的连接,acceptor会执行这个回调操作
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{//轮询算法,选择一个subLoop,来管理channelEventLoop* ioLoop = threadPool_->getNextLoop();char buf[64] = {0};snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;std::string connName = name_ + buf;LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n",name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());//通过sockfd获取其绑定的本机的IP地址和端口信息sockaddr_in local;::bzero(&local, sizeof local);socklen_t addrLen = sizeof local;if(::getsockname(sockfd, (sockaddr*)&local, &addrLen) < 0){LOG_ERROR("sockets::getLocalAddr");}InetAddress localAddr(local);//根据连接成功的sockfd,创建TcpConnection连接对象TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));connections_[connName] = conn;//下面回调都是用户设置给Tcpserver =》TcpConnection = 》 channel =》 poller =》notify channel调用回调conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);//设置了如何关闭连接的回调conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1));ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{LOG_INFO("TcpServer::removeConnectionInLoop [%s] - connection %s\n",name_.c_str(), conn->name().c_str());connections_.erase(conn->name());EventLoop* ioLoop = conn->getLoop(); ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}TcpServer::~TcpServer()
{for(auto &item : connections_){TcpConnectionPtr conn(item.second); //这个局部的shared_ptr智能指针对象,出右括号,可以自动释放newitem.second.reset();//销毁连接conn->getLoop()->runInLoop(std::bind(&TcpConnection::connectDestroyed, conn));}
}
这篇关于重写muduo网络库:终章TcpServer的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!