【项目】仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器(TcpServer板块)

本文主要是介绍【项目】仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器(TcpServer板块),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【项目】仿muduo库One Thread One Loop式主从Reactor模型实现⾼并发服务器(TcpServer板块)

  • 一、思路图
  • 二、模式关系图
  • 三、定时器的设计
    • 1、Linux本身给我们的定时器
    • 2、我们自己实现的定时器
      • (1)代码部分
      • (2)思想部分
      • (3)使用实验
  • 四、正则表达式
    • 1、描述
    • 2、前置工作(非常重要)
    • 3、代码与解析
      • (1)请求字符
      • (2)[^?]*
      • (3)\\\\?(.*)
      • (4)HTTP/1\\\\.[01]
      • (5)(?:\n|\r\n)?
      • (6)总体代码如下
  • 五、通用类型Any
    • 1、简单介绍
    • 2、思想
    • 3、代码实现
    • 4、效果展示
      • (1)int、string这种简单类型的
      • (2)其他类
    • 5、C++17中的Any
      • (1)查阅文档介绍
      • (2)例子展示
  • 六、Buffer缓冲区
    • 1、设计
      • (1)vector\<char>的设计
      • (2)要素
      • (3)操作
        • i、写入操作
        • ii、读取数据
    • 2、函数接口
    • 3、总体代码
    • 4、测试实例
    • (1)测试string类的是否有用
    • (2)检查扩容
    • (3)测试按行读取
  • 七、暂停一下:日志宏的玩法
    • 1、日志打印代码
    • 2、效果展示
  • 八、Socket
    • 1、接口介绍及速览
    • 2、代码总览
    • 3、效果展示
  • 九、Channel
    • 1、简单介绍思路和用法
    • 2、接口介绍及速览
    • 3、代码总览
    • 4、过编译就行
  • 十、Poller
    • 1、简单介绍
    • 2、使用接口简述
    • 3、原码
  • 十一、通过Poller进行Channel的修改
    • 1、修改内容
    • 2、Poller和Channel联调
    • 3、回调关系图
  • 十二、EventLoop
    • 1、eventfd的妙用
      • (1)代码展示
      • (2)运行结果
    • 2、EventLoop设计思路
    • 3、EventLoop代码
    • 4、Channel的修改
    • 5、运行代码及结果
  • 十三、TimerWheel
    • 1、TimerWheel思路
    • 2、代码
    • 3、修改EventLoop
    • 4、运行代码及结果
  • 十四、Eventloop的模块流程图
  • 十五、Connection
    • 1、Connection设计思路
      • (1)目的
      • (2)管理
      • (3)功能
      • (4)场景
    • 2、预备工作:加上Any类
    • 3、Connection代码
    • 4、纠错修改(必看)
    • 5、运行代码及结果
  • 十六、Acceptor
    • 1、Acceptor设计思路
      • (1)创建一个监听套接字
      • (2)启动读事件监控
      • (3)事件触发后,获取新连接
      • (4)调用新连接获取成功后的回调函数
    • 2、代码
    • 3、运行代码及结果
  • 十七、LoopThread
    • 1、设计思路
    • 2、代码
    • 3、运行结果
  • 十八、LoopThreadPool
    • 1、设计思路
    • 2、代码
    • 3、运行结果
  • 十九、TcpServer
    • 1、设计思路
    • 2、代码
    • 3、运行代码及结果
      • (1)运行结果1
      • (2)运行结果2
      • (3)运行结果3
  • 二十、NetWork
  • 二十一、EchoServer
    • 1、直接上代码(TcpServer二次封装)
    • 2、测试结果
    • 3、简单的EchoServer压力测试
      • (1)找测试文件
      • (2)unzip
      • (3)make
      • (4)./webbench
      • (5)./webbench -c 500 -t 60 http://127.0.0.1:8888/hello
      • (6)跑完以后的结果:
  • 二十二、EchoServer关系图


一、思路图

在这里插入图片描述

二、模式关系图

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

三、定时器的设计

1、Linux本身给我们的定时器

#include <sys/timerfd.h> // 头文件
int timerfd_create(int clockid, int flags); // 创造定时器clockid: CLOCK_REALTIME-系统实时时间,如果修改了系统时间就会出问题; CLOCK_MONOTONIC-从开机到现在的时间是⼀种相对时间;flags: 0-默认阻塞属性
int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old); // 设置定时时间fd: timerfd_create返回的⽂件描述符flags: 0-相对时间, 1-绝对时间;默认设置为0即可.new: ⽤于设置定时器的新超时时间old: ⽤于接收原来的超时时间struct timespec {time_t tv_sec; /* Seconds */long tv_nsec; /* Nanoseconds */};struct itimerspec {struct timespec it_interval; /* 第⼀次之后的超时间隔时间 */struct timespec it_value; /* 第⼀次超时时间 */};

我们利用linux给的定时器来进行代码的书写(简单使用一下):

#include <iostream>
#include <cstdio>
#include <string>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/timerfd.h>
#include <sys/select.h>int main()
{// 创建一个定时器int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);struct itimerspec iem; // 使用该结构体/* struct timespec {time_t tv_sec; // Secondslong tv_nsec; // Nanoseconds};struct itimerspec {struct timespec it_interval; // 第 次之后的超时间隔时间 struct timespec it_value; // 第 次超时时间 };*/iem.it_value.tv_sec = 2; // 第一次的超时时间(秒)iem.it_value.tv_nsec = 0; // 第一次的超时时间(毫秒)iem.it_interval.tv_sec = 2; // 这次过后的每隔多长时间超时(秒)iem.it_interval.tv_nsec = 0; // 这次过后的每隔多长时间超时(毫秒)// 启动定时器timerfd_settime(timerfd, 0, &iem, NULL); // 启动定时器time_t start = time(NULL);// 每隔三秒进行一次定时器的读取操作while(1){uint64_t temp;int fd = read(timerfd, &temp, sizeof(temp));if (fd < 0){perror("read failed!");return -1;}std::cout << temp << " " << time(NULL) - start << std::endl;}return 0;
}

在这里插入图片描述

2、我们自己实现的定时器

(1)代码部分

先上代码:

#include <iostream>
#include <unordered_map>
#include <vector>
#include <cstdint>
#include <unistd.h>
#include <memory>
#include <functional>using FuncTask = std::function<void()>; // 回调函数,用来回调函数任务的
using ReleaseTask = std::function<void()>; // 回调函数,用来回调释放任务的
class TimerTask
{private:uint64_t _id; // 定时器任务对象IDuint32_t _timeout; // 定时器任务超时的时间bool _cancel;  // 用来取消定时任务, false表示没有被取消,true表示被取消了FuncTask _taskcb; // 定时器任务要执行的定时任务ReleaseTask _releasecb; // 定时器任务在时间轮中保存的定时器信息public:TimerTask(uint64_t id, uint32_t delay, const FuncTask& cb) // 构造函数: _id(id), _timeout(delay), _cancel(false) // 刚开始定为false, _taskcb(cb){}~TimerTask() // 析构函数{// 在析构函数中进行两个回调函数的创建if (_cancel == false) _taskcb();_releasecb();}void SetRelease(const ReleaseTask& cb) // 设置释放的回调函数{_releasecb = cb;}uint32_t DelayTimer() // 将延迟时间开放出去{return _timeout;}void Cancel(){_cancel = true;}
};
// 时间轮
class WheelTask
{private:using SharedPtrTask = std::shared_ptr<TimerTask>; // 用一个shared_ptr,因为有引用计数,当引用计数为0的时候,就是释放资源的时候using WeakPtrTask = std::weak_ptr<TimerTask>; // weak_ptr用来配合shared_ptr使用int _tick; // 当前的秒钟,指到哪就将哪释放掉int _capacity; // 表盘的最大数量(最大延迟时间)std::vector<std::vector<SharedPtrTask>> _wheel; // 时间轮(二维数组,存放的数据更规整)std::unordered_map<uint64_t, WeakPtrTask> _timers; // 映射关系private:void RemovePtr(uint64_t id) // 移动指针{auto it = _timers.find(id); // 先找到id在_timers中if (it != _timers.end()){_timers.erase(it); // 删除掉it所指向的位置,也就是id的位置}}public:WheelTask(): _tick(0), _capacity(60), _wheel(_capacity){}void TimerAdd(uint64_t id, uint32_t delay, const FuncTask& cb) // 添加到定时任务{SharedPtrTask pt(new TimerTask(id, delay, cb)); // 先建立一个shared_ptrpt->SetRelease(std::bind(&WheelTask::RemovePtr, this, id)); // 有一个this指针int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt); // 将这个pt先插入到哈希表中_timers[id] = WeakPtrTask(pt); // 对象中不能用shared_ptr,因为shared_ptr不会减减引用计数,所以用weak_ptr}void TimerRefresh(uint64_t id) // 刷新/延迟定时时间{auto it = _timers.find(id);if (it == _timers.end()){perror("_timers find error!");return;}SharedPtrTask pt = it->second.lock();//lock获取weak_ptr管理的对象对应的shared_ptrint delay = pt->DelayTimer();int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);}void CancelTimer(uint64_t id) // 取消定时任务{auto it = _timers.find(id);if (it == _timers.end()){perror("_timers not find!");return;}SharedPtrTask pt = it->second.lock();if (pt) pt->Cancel();}void RunTimerTask() // 时钟滴答滴答往后走{_tick = (_tick + 1) % _capacity;_wheel[_tick].clear(); // 清空一下}
};// for test
class Task
{public:Task(){std::cout << "构造函数!~" << std::endl;}~Task(){std::cout << "析构函数!~" << std::endl;}   
};void Delete(Task* t)
{delete t;
}int main()
{WheelTask tw;Task* t = new Task();tw.TimerAdd(888, 5, std::bind(Delete, t));for(int i = 5; i > 0; i--){sleep(1);tw.TimerRefresh(888);//刷新定时任务tw.RunTimerTask();//向后移动秒针std::cout << "刷新了一下操作,需要" << i << "秒以后才能进行\n";}tw.CancelTimer(888); // 用来测试取消的函数while(1) {std::cout << "-------------------\n";tw.RunTimerTask();//向后移动秒针sleep(1);}return 0;}

(2)思想部分

我们linux本身的定时器是有很大缺陷的,因为我们假如说一个任务超时了以后,要将所有的所有的连接进行遍历一遍,这就效率大大的降低,因为假如说是有成千上万个连接的话,从头到尾变量太不现实了。
所以我们有了另一种方案,这种方案的思路来自于时钟的钟表,我们看时钟的钟表是不是滴答滴答的往后走的?所以我们就可以用的是这种思路来进行设计的,我们定义一个数组,以及一个指针,指针起始位置指向初始位置,指针每秒钟往后走一步,走到哪里象征着哪里的任务开始执行,所以假如我们设置一个3秒的任务,我们只需要用_tick指针往后+3指向数组的相对应的位置即可,然后执行当前的任务。而假如说是我们有成批成批的任务的话,我们设计思路也非常简单,我们在当前位置往下拉一排数组就可以了,也就是我们熟悉的哈希桶底下挂链表,所以也就是在当前的时刻同时执行很多的任务。如下图所示:
在这里插入图片描述

而还有一种条件,假如说我们的超时时间到了1小时的时间的话,这个数组要开3600个,还是太多了,那么我们就有另外一种思路,我们用多个时间轮来进行操作,我们设置一个小时时间轮,一个分钟时间轮,一个秒钟时间轮,当小时时间轮到相对应的位置的时候,分钟时间轮开始运动,到了相对应的位置秒钟时间轮开始运动,一直到对应的位置即可,我们如下图所示:
在这里插入图片描述

但其实,我们平常用的定时任务最多也不过30秒,所以我们只需要设计60秒的就可以了,不用设计多层的时间轮,只需要设计单层时间轮即可。

我们可以去思考一个问题,我们现在实现的想法是当时间片到了,我们主动去执行定时任务,释放连接,那我们可不可以时间片到了后自动取执行定时任务,释放连接。那么我们可以想到析构函数!利用析构函数将一个定时任务作为一个类中的析构函数内进行操作,那么这个定时任务在对象被释放的时候执行析构函数进行释放。
同时还有一个问题:假如说我们当前的任务建立连接成功了,我们给这个连接设置了一个30s的定时销毁的任务,但是在第10秒的时候,突然建立了通信了,我们此时该怎么办?我们还记不记得有一个智能指针叫shared_ptr?我们利用这个智能指针的引用计数来进行解决,简单思路就是第10秒后在第40秒的位置加一个销毁任务,那么引用计数也就为2,到第30秒先-1,到第40秒再-1到0,销毁,图的思路逻辑如下:
在这里插入图片描述

(3)使用实验

在这里插入图片描述

在这里插入图片描述

四、正则表达式

1、描述

正则表达式描述了⼀种字符串匹配的模式,可以⽤来检查⼀个串是否含有某种⼦串、将匹配的⼦串替换或者从某个串中取出符合某个条件的⼦串等。使用正则表达式,使得HTTP协议更加的简单了,因为其只需要用这一个来进行子串的匹配和提取了。

2、前置工作(非常重要)

sudo yum install -y devtoolset-9-gcc devtoolset-9-gcc-c++

在这里插入图片描述

scl enable devtoolset-9 bash

在这里插入图片描述

vim ~/.bash_profile

在这里插入图片描述

在这里插入图片描述

3、代码与解析

我们的HTTP代码是如下所示的:

"GET /jiangrenhai/login$usr=jrh&password=123456 HTTP/1.1\r\n"

(1)请求字符

请求的方法有右边五种:GET|HEAD|POST|PUT|DELETE

(2)[^?]*

表示匹配非问号的字符 后面的*表示匹配并提取0个或多个。

(3)\\?(.*)

\\?表示匹配原始的? (.*)表示匹配问号后的任意字符0次或多次 空格表示到空格为止.

(4)HTTP/1\\.[01]

\\表示匹配最原始的\ [01]表示匹配0或1。

(5)(?:\n|\r\n)?

(?: …) 表示匹配某个格式字符串,但是不提取, 最后的?表示的是匹配前边的表达式0次或1次。

(6)总体代码如下

#include <iostream>
#include <regex>
#include <string>int main()
{// std::string str = "GET /jiangrenhai/login?usr=jrh&password=123456 HTTP/1.1\r\n";std::string str = "GET /jiangrenhai/login$usr=jrh&password=123456 HTTP/1.1\r\n";std::smatch matches;// 请求的方法有右边五种:GET|HEAD|POST|PUT|DELETE  std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?");// [^?]*      表示匹配非问号的字符 后面的*表示匹配并提取0个或多个// \\?(.*)    \\?表示匹配原始的? (.*)表示匹配问号后的任意字符0次或多次 空格表示到空格为止// HTTP/1\\.[01]   \\表示匹配最原始的\  [01]表示匹配0或1// (?:\n|\r\n)?   (?: ...) 表示匹配某个格式字符串,但是不提取, 最后的?表示的是匹配前边的表达式0次或1次bool ret = std::regex_match(str, matches, e);if (ret == false){return -1;}for (auto &e : matches){std::cout << "[" << e << "]" << std::endl;}return 0;
}

在这里插入图片描述

五、通用类型Any

1、简单介绍

每⼀个Connection对连接进⾏管理,最终都不可避免需要涉及到应⽤层协议的处理,因此在Connection中需要设置协议处理的上下⽂来控制处理节奏。但是应⽤层协议千千万,为了降低耦合度,这个协议接收解析上下⽂就不能有明显的协议倾向,它可以是任意协议的上下⽂信息,因此就需要⼀个通⽤的类型来保存各种不同的数据结构。

简而言之,也就是我们构建一个通用的类型Any,用来实例化近乎所有容器类型。

2、思想

我们假如说要实现一个通用的类型用来保存Connection的上下文,那么我们不能用模版来进行描写,因为假如我们用的是模版的话,我们需要传Any<int>或者Any<float>等,也就是说在使⽤的时候就要确定其类型。这是⾏不通的,因为保存在Content中的协议上下⽂,我们在定义any对象的时候是不知道他们的协议类型的,因此⽆法传递类型作为模板参数。所以我们考虑在Any类中内部设计⼀个模板容器holder类,可以保存各种类型数据,所以,定义⼀个基类placehoder,让placeholder继承于holder,而Any类保存⽗类指针即可,当需要保存数据时,则new⼀个带有模板参数的子类placeholder对象出来保存数据,然后让Any类中的父类指针,指向这个子类对象就搞定了。

3、代码实现

#include <iostream>
#include <cassert>
#include <typeinfo>
#include <string>// 先搭个框架--通用容器的函数
class Any
{private:class Holder{public:virtual ~Holder(){} // 析构函数// 纯虚函数的定义在于只让它的子类对象刻画出对象,// 不让父类对象刻画出对象,就像动物包含着老虎狮子等的,老虎狮子是对象,而动物不是对象virtual const std::type_info& type() = 0; virtual Holder* clone() = 0;};template<class T>class PlaceHolder : public Holder{public:// 构造函数PlaceHolder(const T& val) :_val(val) {}// 类型virtual const std::type_info& type(){return typeid(T); // 直接返回我们模版实例化的参数的类型即可}// 针对当前的函数对象,克隆出一个新的子类对象virtual Holder* clone(){return new PlaceHolder(_val);}public:T _val; // 构造一个模版类型的数据};Holder *_content; // 父类的指针可以指向任何类型的数据(这个是Any类型的私有函数)public:// 空的构造函数Any():_content(NULL){}// 模版T类型的构造函数template<class T>Any(const T& val):_content(new PlaceHolder<T>(val)){}// 通过其他的通用容器来构造我们自己的容器Any(const Any& other):_content(other._content ? other._content->clone() : NULL){}// 析构函数~Any(){delete _content; // 释放掉这个_content}// 得到子类对象保存的数据的指针template<class T>T* get(){// 当前对象的类型不能和传进来的实例化模版的类型是一样的assert(_content->type() == typeid(T));// 先强转,再指向_val,再取地址return &((PlaceHolder<T>*)_content)->_val;}Any& Swap(Any& other){std::swap(_content, other._content); // 调用算法函数直接交换this和other对象的指针return *this; // 返回实例化的this解引用}// operator=的任意模板类型的运算符重载函数template<class T>Any& operator=(const T& val){//为val构造一个临时的通用容器,然后与当前容器自身进行指针交换,临时对象释放的时候,原先保存的数据也就被释放Any(val).Swap(*this);return *this;}// operator=的对象的运算符重载函数Any& operator=(const Any& other){Any(other).Swap(*this);return *this;}
};class Task
{public:Task(){std::cout << "构造" << std::endl;}Task(const Task& t){std::cout << "拷贝" << std::endl;}~Task(){std::cout << "析构" << std::endl;}
};int main()
{Any a;{Task t;a = t;}// a = 10;// int* pa = a.get<int>();// std::cout << *pa << std::endl;// a = std::string("nihaoya");// std::string* ps = a.get<std::string>();// std::cout << *ps << std::endl;return 0;
}

4、效果展示

(1)int、string这种简单类型的

在这里插入图片描述

(2)其他类

在这里插入图片描述

5、C++17中的Any

(1)查阅文档介绍

在这里插入图片描述
我们主要用的是any和any_cast<>。

(2)例子展示

在这里插入图片描述
在这里插入图片描述

六、Buffer缓冲区

1、设计

(1)vector<char>的设计

我们使用的缓冲区中得有一块内存空间,我们这里采用的是vector<char>,为什么不用string呢?原因在于我们结尾又不需要判断\0,根本不需要用string,我们为什么不用vector<std::string>,我们是一个字符一个字符存的,又不是一段字符串一段字符串存的,所以用vector<char>更合适。

(2)要素

首先要有一个默认的空间大小
当前的读取数据的位置
当前的写入数据的位置

我们用代码角度解释为:

class Buffer
{private:std::vector<char>;uint64_t ReadData; // 相对读数据的偏移量uint64_t WriteData; // 相对写数据的偏移量public:// ...
};

(3)操作

i、写入操作

当前的写入位置指向哪里就从哪里开始写入,但我们内存空间总是有大小的吧?所以我们分下面情况进行讨论:

假如说是后续的剩余空间不够了
我们此时考虑一下当前内存的整体空间的大小是否足够,倘若是前面还是有空间的,那么我们将这段数据移到从最前面开始,读指针指向开头位置,写指针指向当前内存往后增加的结尾位置。倘若是前面已经没有空间了或者是前面的空间已经不够再加入我们当前需要插入的空间的时候,我们直接扩容。而当数据一旦写成功了,我们的当前写位置就要往后偏移。

在这里插入图片描述

ii、读取数据

当前的读取数据指向哪里,我们就从哪里开始读取,但其前提是要有数据可读。
可读数据大小=写数据位置-读数据位置。

2、函数接口

我们总共有11种主要的函数接口的定义,其余有其他拓展的函数接口,我们主要的十一种接口如下:
我们用下面这个形象的图来进行写接口。
在这里插入图片描述

        // 1、获取当前写入的起始地址char* WritePos(); // 写就是数组开始位置+write的相对开头的位置(是一个指针)// 2、获取当前读入的起始地址char* ReadPos(); // 读就是数组开始位置+read的相对开头的位置(同样也是一个指针)// 3、获取缓冲区末尾的空闲的空间大小--写数据往后的空闲空间的大小uint64_t TailSpace(); // 总空间-写的位置// 4、获取缓冲区开始的空闲的空间大小--读数据往前的空闲空间的大小uint64_t HeadSpace(); // 直接就是读的位置,相对于整个缓冲区的开头// 5、获取可读数据大小uint64_t ReadAbleSize(); // 写位置-读位置// 6、将读偏移往后void ReadOffset(uint64_t len); // 读往后加len(考虑长度不大于可读空间)// 7、将写偏移往后void WriteOffset(uint64_t len); // 写往后加len(考虑长度不大于读位置往后的空间)// 8、确保可写空间足够--整体空闲空间够了就用整体空闲空间,不够就扩容void EnsureWriteSpace(uint64_t len); // 空间不够的话先看前面的空间和后面剩余空间相加是否够用,够用就往前挪然后后面加入,不够就扩容(扩到刚好的容量)// 9、写入数据void WriteData(const void* data, uint64_t len); // 从写的位置往后写len的长度// 10、读取数据void ReadData(void* buff, uint64_t len); // 从读位置往后读len个位置// 11、清空缓冲区void clear(); // 俩相对位置等于0即可

后面会加很多的string类的,string类的只需要传入字符串即可,直接用上面的接口就OK了。

3、总体代码

#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <cassert>
#include <cstring>
#define BufferDefault 1024class Buffer
{private:std::vector<char> _buffer;uint64_t _read_idx; // 相对读数据的偏移量uint64_t _write_idx; // 相对写数据的偏移量public:// 构造函数--俩偏移位置都从0开始,设置_buffer的空间大小为1024Buffer():_buffer(BufferDefault), _read_idx(0), _write_idx(0){}// 定义一个Begin()函数用来调用整个数组_buffer的起始位置的char* Begin() { return &*_buffer.begin(); }// 1、获取当前写入的起始地址char* WritePos(){return Begin() + _write_idx; // 开始位置加上_write_idx到写的位置}// 2、获取当前读入的起始地址char* ReadPos(){return Begin() + _read_idx; // 开始位置加上_read_idx到读的位置}// 3、获取缓冲区末尾的空闲的空间大小--写数据往后的空闲空间的大小uint64_t TailSpace(){return _buffer.size() - _write_idx; // 总体的大小减去写的相对偏移量}// 4、获取缓冲区开始的空闲的空间大小--读数据往前的空闲空间的大小uint64_t HeadSpace(){return _read_idx; // 直接就是读的相对偏移量}// 5、获取可读数据大小uint64_t ReadAbleSize(){return _write_idx - _read_idx; // 写的偏移量-读的偏移量}// 6、将读偏移往后void ReadOffset(uint64_t len){assert(len <= ReadAbleSize()); // 不让往后读到结尾的位置,要读的长度小于等于可读数据大小_read_idx += len; // 往后加等即可}// 7、将写偏移往后void WriteOffset(uint64_t len){assert(len <= TailSpace()); // 往后写不能到末尾,不能多于后面剩的空间大小_write_idx += len; // 往后加等即可}// 8、确保可写空间足够--整体空闲空间够了就用整体空闲空间,不够就扩容void EnsureWriteSpace(uint64_t len){// 1、足够的话 直接返回if (len <= TailSpace()) return;// 2、末尾空间不够且加上前面剩余空间够的情况下,先将整体数据移动到最前面,然后再在后面进行加入if (len <= TailSpace() + HeadSpace()){// 先保存一下可读数据的大小uint64_t Size = ReadAbleSize();// 再将数据头移动到最开始,这里用std::copy函数// template<class InputIterator, class OutputIterator>// OutputIterator copy (InputIterator first, InputIterator last, OutputIterator result)// {//     while (first!=last) {//         *result = *first;//         ++result; ++first;//     }//     return result;// }std::copy(ReadPos(), ReadPos() + Size, Begin());_read_idx = 0; // 更新可读位置到0_write_idx = Size; // 更新可写位置为Size}else // 这种情况是需要扩容了{_buffer.resize(len + _write_idx); // 直接扩容到相对应的位置}}// 9、写入数据void WriteData(const void* data, uint64_t len){// 1、确保有足够的空间EnsureWriteSpace(len);// 2、写入数据const char* data_ = (const char*)data;std::copy(data_, data_ + len, WritePos());}// 写入push的void WriteAndPush(const void* data, uint64_t len){WriteData(data, len); // 先写进去WriteOffset(len); // 再偏移}// 写入string类型数据void WriteString(const std::string& data){return WriteData(data.c_str(), data.size()); // 返回上面封装的函数即可}// 写入string再pushvoid WriteStringAndPush(const std::string& data){WriteString(data); // 先写进去WriteOffset(data.size()); // 再偏移}// 写入Buffer类的void WriteBuffer(Buffer& data){return WriteData(data.ReadPos(), data.ReadAbleSize()); // 先写入}// 写入Buffer的string类型void WriteBufferAndPush(Buffer& data){WriteBuffer(data); // 先写入WriteOffset(data.ReadAbleSize()); // 再偏移}// 10、读取数据void ReadData(void* buff, uint64_t len){assert(len <= ReadAbleSize()); // 确保不超能读的范围std::copy(ReadPos(), ReadPos() + len, (char*)buff);}// 读取string类std::string ReadAsString(uint64_t len){// 首先要求len不能比可读数据多assert(len <= ReadAbleSize());std::string str;str.resize(len); // 将str这个string类字符串的空间定义好ReadData(&str[0], len);return str;}// 读取并往外拿void ReadAndPop(void* buff, uint64_t len){ReadData(buff, len);ReadOffset(len);}// 读string类并往外拿std::string ReadAsStringAndPop(uint64_t len) {// 首先要求len不能比可读数据多assert(len <= ReadAbleSize());std::string str = ReadAsString(len);ReadOffset(len);return str;}// 找换行符char* FindCRl(){// void *memchr(const void *s, int c, size_t n);char* res = (char*)memchr(ReadPos(), '\n', ReadAbleSize()); // memchr(起始位置,到的字符,大小)return res;}std::string GetLine(){char* pos = FindCRl(); // 先找到这一行的位置if (pos == nullptr) { return ""; }return ReadAsString(pos - ReadPos() + 1); // +1是为了将\n也加入  }std::string GetLineAndPop(){std::string str = GetLine(); // 先找到ReadOffset(str.size());return str;}// 11、清空缓冲区void clear(){_read_idx = _write_idx = 0;}
};

4、测试实例

(1)测试string类的是否有用

在这里插入图片描述

(2)检查扩容

在这里插入图片描述

(3)测试按行读取

在这里插入图片描述

七、暂停一下:日志宏的玩法

1、日志打印代码

我们通过日志宏的做法可以更快速的定位到错误的地方,将日志宏分等级以后,我们想打印什么等级的日志我们只需要进行打印即可,我们尝试写一下下面的日志宏代码:

#define INFOR 0
#define DEBUG 1
#define ERROR 2
#define LOGLEVEL INFOR/*struct tm *localtime(const time_t *timep);--localtime的用法*/// 先定义一个t为当前时间// 用上面那个样例的结构体/*size_t strftime(char *s, size_t max, const char *format,const struct tm *tm);*/// %H对应小时,%M对应分钟,%S对应秒
#define LOG(level, format/*类型*/, ...)do{\if (level < LOGLEVEL) break;\time_t t = time(NULL);\struct tm *ltm = localtime(&t);\char tmp[32] = {0};\strftime(tmp, 31, "%H:%M:%S", ltm);\fprintf(stdout, "[%s %s:%d]" format "\n", tmp,  __FILE__, __LINE__, ##__VA_ARGS__);\
}while(0)#define INFLOG(format, ...) LOG(INFOR, format, ##__VA_ARGS__);
#define DEBLOG(format, ...) LOG(DEBUG, format, ##__VA_ARGS__);
#define ERRLOG(format, ...) LOG(ERROR, format, ##__VA_ARGS__);

2、效果展示

在这里插入图片描述

八、Socket

1、接口介绍及速览

        // 创建套接字bool CreateSocket(); // 直接用socket就OK// 绑定地址信息bool Bind(const std::string &ip, uint16_t port); // bind// 开始监听bool Listen(int blocknum = MAX_BLOCK_NUM); // listen// 向服务器发起连接bool Connection(const std::string& ip, uint16_t port); // connect// 获取新连接int Accept(); // accept// 接收数据ssize_t Recv(void* buff, size_t len, int flag = 0); // recv// 非阻塞接收数据ssize_t NonBlockRecv(void* buff, size_t len); // 调用Recv即可,我们最后一个参数设置为非阻塞即可// 发送数据ssize_t Send(const void* buff, size_t len, int flag = 0); // send// 非阻塞发送数据ssize_t NonBlockSend(void* buff, size_t len); // 调用Send,最后一个参数设置为非阻塞即可// 关闭套接字void Close(); // close// 创建一个监听链接bool Server(uint16_t port, const std::string& ip = "0.0.0.0", bool block_flag = false); // 1、创建套接字 2、绑定地址 3、监听套接字 4、设置非阻塞 5、地址复用// 创建一个客户端连接bool Client(uint16_t port, const std::string& ip); // 1、创建套接字 2、建立连接// 设计套接字选项--开启地址端口重用void Reuse(); // 用setsockopt// 设置套接字阻塞属性--设置为非阻塞void NonBlock(); // fcntl

2、代码总览

这里为了方便我们进行测试,我们直接将整个项目的所有头文件都放在下面了:

#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <cassert>
#include <cstring>
#include <ctime>
#include <functional>
#include <unordered_map>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <typeinfo>
#include <fcntl.h>
#include <signal.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/timerfd.h>
#define MAX_BLOCK_NUM 1024
class Socket
{private:int _sockfd;public:Socket():_sockfd(-1){}Socket(int fd):_sockfd(fd){}~Socket(){ Close(); }int Fd(){ return _sockfd; }// 创建套接字bool CreateSocket(){/*int socket(int domain, int type, int protocol);*/// AF_INET             IPv4 Internet protocols          ip(7)// SOCK_STREAM     Provides sequenced, reliable, two-way, connection-based byte streams.  An out-of-band data  trans‐//           mission mechanism may be supported._sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); // 创建套接字if (_sockfd < 0){ERRLOG("Create socket error!");return false;}return true;}// 绑定地址信息bool Bind(const std::string &ip, uint16_t port){// int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);struct sockaddr_in addr; // 先创建这个结构体addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);int ret = bind(_sockfd, (struct sockaddr*)&addr, len);if (ret < 0){ERRLOG("BIND FAILED");return false;}return true;}// 开始监听bool Listen(int blocknum = MAX_BLOCK_NUM){// int listen(int sockfd, int backlog);int ret = listen(_sockfd, blocknum);if (ret < 0){ERRLOG("LISTEN ERROR!");return false;}return true;}// 向服务器发起连接bool Connection(const std::string& ip, uint16_t port){// int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);struct sockaddr_in addr; // 先创建这个结构体addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);int ret = connect(_sockfd, (struct sockaddr*)&addr, len);if (ret < 0){ERRLOG("CONNECT FAILED");return false;}return true;}// 获取新连接int Accept(){// int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);int newfd = accept(_sockfd, NULL, NULL);if (newfd < 0){ERRLOG("ACCEPT ERROR!");return -1;}return newfd;}// 接收数据ssize_t Recv(void* buff, size_t len, int flag = 0){// ssize_t recv(int sockfd, void *buf, size_t len, int flags);ssize_t ret = recv(_sockfd, buff, len, flag);if (ret < 0){//EAGAIN 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误//EINTR  表示当前socket的阻塞等待,被信号打断了,if (errno == EAGAIN || errno == EINTR) {INFLOG("JUST NON&&BREAK");return 0;//表示这次接收没有接收到数据}ERRLOG("RECV ERROR!");return -1; // 这里-1表示确实是出错了}return ret;}// 非阻塞接收数据ssize_t NonBlockRecv(void* buff, size_t len){return Recv(buff, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞。}// 发送数据ssize_t Send(const void* buff, size_t len, int flag = 0){// ssize_t send(int sockfd, const void *buf, size_t len, int flags);ssize_t ret = send(_sockfd, buff, len, flag);if (ret < 0){if (errno == EAGAIN || errno == EINTR) {INFLOG("JUST NON&&BREAK");return 0;}ERRLOG("SEND ERROR!");return -1;}return ret;}// 非阻塞发送数据ssize_t NonBlockSend(void* buff, size_t len){return Send(buff, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞。}// 关闭套接字void Close(){// int close(int fd);if (_sockfd != -1){close(_sockfd);_sockfd = -1;}}// 创建一个监听链接bool Server(uint16_t port, const std::string& ip = "0.0.0.0", bool block_flag = false){// 1、创建套接字 2、绑定地址 3、监听套接字 4、设置非阻塞 5、地址复用if (CreateSocket() == false) return false;if (block_flag) NonBlock(); // 设置非阻塞if (Bind(ip, port) == false) return false;if (Listen() == false) return false;Reuse();return true;}// 创建一个客户端连接bool Client(uint16_t port, const std::string& ip){// 1、创建套接字 2、建立连接if (CreateSocket() == false) return false;if (Connection(ip, port) == false) return false;return true;}// 设计套接字选项--开启地址端口重用void Reuse(){// int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);int val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&val, sizeof(int)); // 地址进行复用一下val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&val, sizeof(int)); // 端口也进行复用一下}// 设置套接字阻塞属性--设置为非阻塞void NonBlock(){// int fcntl(int fd, int cmd, ... /* arg */ );int flag = fcntl(_sockfd, F_GETFL, 0);fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);}
};

3、效果展示

代码展示:

// tcp_svr.cc
#include "../source/server.hpp"// for test
int main()
{Socket l_socket;l_socket.Server(8888);while (1){int newfd = l_socket.Accept();if (newfd < 0){continue;}Socket cl_socket(newfd); // 重新创建一个char buff[1024] = {0};int ret = cl_socket.Recv(buff, 1023);if (ret < 0){cl_socket.Close();continue;}cl_socket.Send(buff, ret);cl_socket.Close();}l_socket.Close();return 0;
}
// tcp_cli.cc
#include "../source/server.hpp"int main()
{Socket clent_socket;clent_socket.Client(8888, "127.0.0.1");std::string str = "xiangshuijiaole";clent_socket.Send(str.c_str(), str.size());char buff[1024] = {0};clent_socket.Recv(buff, 1023);DEBLOG("%s", buff);return 0;
}

在这里插入图片描述

九、Channel

1、简单介绍思路和用法

这个类设计的目的是对描述符的监控事件管理。
其功能有:描述符是否可读,描述符是否可写,对描述符监控可读,对描述符监控可写,解除可读事件监控,解除可写事件监控,解除所有事件监控。而当事件触发后的处理的管理,需要的处理的事件有:可读,可写,挂断,错误,任意,同时我们需要添加事件处理的回调函数。
在这里插入图片描述

2、接口介绍及速览

		// 获取想要监控的事件uint32_t Events();// 设置实际就绪的事件void SetREvents(uint32_t events);// 设置可读回调函数void SetReadCallback(const EventCallback& cb);// 设置可写回调函数void SetWriteCallback(const EventCallback& cb);// 设置错误回调函数void SetErrorCallback(const EventCallback& cb);// 设置连接关闭回调函数void SetCloseCallback(const EventCallback& cb);// 设置所有回调函数void SetAlleventCallback(const EventCallback& cb);// 描述符是否监控了可读bool ReadAble();// 描述符是否监控了可写bool WriteAble();// 对描述符监控可读void EnableRead();// 对描述符监控可写void EnableWrite();// 解除可读事件监控void DisableWrite();// 解除可写事件监控void DisableRead();// 解除所有事件监控void DisableEvent();// 移除监控void Move() { /*这个要到evebtloop往后才能用到*/ }// 连接事件处理void Handle();

3、代码总览

class Channel
{// 以下是epoll_ctl的使用 man 2 epoll_ctl/*EPOLLINThe associated file is available for read(2) operations.EPOLLOUTThe associated file is available for write(2) operations.EPOLLRDHUP (since Linux 2.6.17)Stream socket peer closed connection, or shut down writing half of connection.  (This  flag  is  especiallyuseful for writing simple code to detect peer shutdown when using Edge Triggered monitoring.)EPOLLPRIThere is urgent data available for read(2) operations.EPOLLERRError condition happened on the associated file descriptor.  epoll_wait(2) will always wait for this event;it is not necessary to set it in events.EPOLLHUPHang up happened on the associated file descriptor.  epoll_wait(2) will always wait for this event;  it  isnot necessary to set it in events.EPOLLETSets  the  Edge  Triggered  behavior for the associated file descriptor.  The default behavior for epoll isLevel Triggered.  See epoll(7) for more detailed information about Edge and Level Triggered event distribu‐tion architectures.EPOLLONESHOT (since Linux 2.6.2)Sets  the  one-shot  behavior for the associated file descriptor.  This means that after an event is pulledout with epoll_wait(2) the associated file descriptor is internally disabled and no other  events  will  bereported  by  the  epoll  interface.   The  user must call epoll_ctl() with EPOLL_CTL_MOD to rearm the filedescriptor with a new event mask.*/private:int _fd; // 文件描述符uint32_t _events; // 当前所监控的事件uint32_t _revents; // 当前所连接触发的事件using EventCallback = std::function<void()>; // 包装器包装个回调函数EventCallback _readcallback; // 可读事件回调EventCallback _writecallback; // 可写事件回调EventCallback _errorcallback; // 错误事件回调EventCallback _closecallback; // 连接失效事件回调EventCallback _eventcallback; // 任意事件回调public:// 构造函数Channel(int fd): _fd(fd), _events(0), _revents(0) {}// 返回文件描述符int Fd(){return _fd;}// 获取想要监控的事件uint32_t Events() { return _events; }// 设置实际就绪的事件void SetREvents(uint32_t events) { _revents = events; }// 设置可读回调函数void SetReadCallback(const EventCallback& cb) { _readcallback = cb; }// 设置可写回调函数void SetWriteCallback(const EventCallback& cb) { _writecallback = cb; }// 设置错误回调函数void SetErrorCallback(const EventCallback& cb) { _errorcallback = cb; }// 设置连接关闭回调函数void SetCloseCallback(const EventCallback& cb) { _closecallback = cb; }// 设置所有回调函数void SetAlleventCallback(const EventCallback& cb) { _eventcallback = cb; }// 描述符是否监控了可读bool ReadAble() { return (_events & EPOLLIN); }// 描述符是否监控了可写bool WriteAble() { return (_events & EPOLLOUT); }// 对描述符监控可读void EnableRead() { _events |= EPOLLIN; }// 对描述符监控可写void EnableWrite() { _events |= EPOLLOUT; }// 解除可读事件监控void DisableWrite() { _events &= ~EPOLLIN; }// 解除可写事件监控void DisableRead() { _events &= ~EPOLLOUT; }// 解除所有事件监控void DisableEvent() { _events = 0;/*全部清空即可*/ }// 移除监控void Move() { /*这个要到evebtloop往后才能用到*/ }// 连接事件处理void Handle(){if ((_revents & EPOLLIN) || (_revents & EPOLLHUP) || (_revents & EPOLLPRI)){if (_readcallback) _readcallback(); // 上面都成立的情况下如果读的回调函数存在则调用读的回调函数if (_eventcallback) _eventcallback(); // 总要有一个任意事件的关闭函数吧!不管怎么样都会有一个回调函数的}if (_revents & EPOLLOUT){if (_writecallback) _writecallback(); // 写入if (_eventcallback) _eventcallback(); // 事件处理完毕后,刷新活跃度}else if (_revents & EPOLLERR){if (_eventcallback) _eventcallback();if (_errorcallback) _errorcallback(); // 一旦出了错误,就调用错误的回调函数,但前面是加入了任意事件的回调函数的}else if (_revents & EPOLLHUP){if (_eventcallback) _eventcallback();if (_closecallback) _closecallback(); // 关闭函数前调用一下任意事件的回调函数}}
};

4、过编译就行

在这里插入图片描述

十、Poller

1、简单介绍

在这里插入图片描述
在这里插入图片描述

2、使用接口简述

private:// 向epoll_ctl提供的添加或更新描述符所监控事件void Updata(Channel* channel, int op);// 判断一个新的Channel是否链接了监控bool HasChannel(Channel* channel);
public:// 添加或更新描述符所监控的事件void UpdateEvent(Channel* channel);// 移除描述符监控void RemoveFd(Channel* channel);// 开始监控,获取就绪的Channelvoid Poll(std::vector<Channel*>* active)

3、原码

#define MAX_EPOLL_EVENTS 1024
class Epoll
{private:// epoll操作尺柄int _epfd;// struct结构监控时保存所有的活跃事件struct epoll_event _evs[MAX_EPOLL_EVENTS];// 使用hash表智理描述符与描述符对应的事件管理Channel对象std::unordered_map<int, Channel*> _channels;private:// 向epoll_ctl提供的添加或更新描述符所监控事件void Update(Channel* channel, int op){// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int epcl = epoll_ctl(_epfd, op, fd, &ev);if (epcl < 0){ERRLOG("EPOLL_CTL ERROR!");return; // 直接退出程序}}// 判断一个新的Channel是否链接了监控bool HasChannel(Channel* channel){auto it = _channels.find(channel->Fd()); // 用迭代器去获取文件描述符if (it == _channels.end()){ERRLOG("CHANNEL FIND FAILED!");return false;}return true;}public:// 构造函数Epoll(){// int epoll_create(int size);int epct = epoll_create(MAX_EPOLL_EVENTS);if (epct < 0){ERRLOG("EPCT EPOLL_CREAT ERROR!");abort(); // 退出程序了}}// 添加或更新描述符所监控的事件void UpdateEvent(Channel* channel){bool ret = HasChannel(channel); if (ret == false) // 没有的话就添加{return Update(channel, EPOLL_CTL_ADD); // 添加}return Update(channel, EPOLL_CTL_MOD); // 修改}// 移除描述符监控void RemoveFd(Channel* channel){auto it = _channels.find(channel->Fd()); // 用迭代器去获取文件描述符if (it != _channels.end()){_channels.erase(it);}Update(channel, EPOLL_CTL_DEL);}// 开始监控,获取就绪的Channelvoid Poll(std::vector<Channel*>* active){// int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);int epwt = epoll_wait(_epfd, _evs, MAX_EPOLL_EVENTS, -1);if (epwt < 0){if (errno == EINTR){return;}ERRLOG("EPWT EPOLL_WAIT ERROR:%s\n", strerror(errno));abort(); // 退出程序}for (int i = 0; i < epwt; i++){auto it = _channels.find(_evs[i].data.fd); // 继续找文件描述符if (it == _channels.end()) { return; }// 设置实际就绪事件it->second->SetREvents(_evs[i].events);active->push_back(it->second);}}
};

十一、通过Poller进行Channel的修改

1、修改内容

在这里插入图片描述

将声明和定义放在一起编译报错:
在这里插入图片描述

将声明和定义分离,定义分离到Epoll类后面的话编译成功:
在这里插入图片描述

2、Poller和Channel联调

// tcp_svr.cc
#include "../source/server.hpp"void HandleClose(Channel* channel) 
{std::cout << "close channel fd is sucessful!" << channel->Fd() << std::endl;channel->Move();delete channel;
}
void HandleRead(Channel* channel)
{int fd = channel->Fd();char buff[1024] = {0};// ssize_t recv(int sockfd, void *buf, size_t len, int flags);int ret = recv(fd, buff, 1023, 0);if (ret <= 0){return HandleClose(channel); // 关闭释放}std::cout << buff << std::endl;channel->EnableWrite(); // 启动可写事件
}
void HandleWrite(Channel* channel) 
{int fd = channel->Fd();const char *data = "I love you!";int ret = send(fd, data, strlen(data), 0);if (ret < 0){return HandleClose(channel); // 关闭释放}channel->DisableWrite(); // 关闭释放
}
void HandleError(Channel* channel) 
{return HandleClose(channel);
}
void HandleAllEvent(Channel* channel) 
{std::cout << "have a new channel connection" << std::endl;
}
void Acceptor(Epoll* poller, Channel* ls_channel)
{int fd = ls_channel->Fd();int newfd = accept(fd, NULL, NULL);if (newfd < 0) { return; }Channel* channel = new Channel(poller, newfd);channel->SetReadCallback(std::bind(HandleRead, channel)); // 可读事件的回调函数channel->SetWriteCallback(std::bind(HandleWrite, channel)); // 可写事件回调函数channel->SetCloseCallback(std::bind(HandleClose, channel));  // 关闭事件回调函数channel->SetErrorCallback(std::bind(HandleError, channel));  // 错误事件回调函数channel->SetAlleventCallback(std::bind(HandleAllEvent, channel));  // 任意事件回调函数channel->EnableRead();
}// for test
int main()
{Epoll poller;Socket l_socket;l_socket.Server(8888);Channel channel(&poller, l_socket.Fd()); // 监听套接字的channel.SetReadCallback(std::bind(Acceptor, &poller, &channel)); // 回调中,获取新连接,为新连接创建Channel并添加监控channel.EnableRead(); // 启动可读事件监控while (1){std::vector<Channel*> actives;poller.Poll(&actives);for (auto& a : actives){a->Handle();}}l_socket.Close();return 0;
}
// tcp_cli.cc
#include "../source/server.hpp"int main()
{Socket clent_socket;clent_socket.Client(8888, "127.0.0.1");while(1){std::string str = "xiangshuijiaole";clent_socket.Send(str.c_str(), str.size());char buff[1024] = {0};clent_socket.Recv(buff, 1023);DEBLOG("%s", buff);sleep(1);}return 0;
}

在这里插入图片描述
修改代码(前面Channel和Poller代码有很多bug,我们直接在下面展示我们的修改后的代码):

class Epoll;
class Channel
{// 以下是epoll_ctl的使用 man 2 epoll_ctl/*EPOLLINThe associated file is available for read(2) operations.EPOLLOUTThe associated file is available for write(2) operations.EPOLLRDHUP (since Linux 2.6.17)Stream socket peer closed connection, or shut down writing half of connection.  (This  flag  is  especiallyuseful for writing simple code to detect peer shutdown when using Edge Triggered monitoring.)EPOLLPRIThere is urgent data available for read(2) operations.EPOLLERRError condition happened on the associated file descriptor.  epoll_wait(2) will always wait for this event;it is not necessary to set it in events.EPOLLHUPHang up happened on the associated file descriptor.  epoll_wait(2) will always wait for this event;  it  isnot necessary to set it in events.EPOLLETSets  the  Edge  Triggered  behavior for the associated file descriptor.  The default behavior for epoll isLevel Triggered.  See epoll(7) for more detailed information about Edge and Level Triggered event distribu‐tion architectures.EPOLLONESHOT (since Linux 2.6.2)Sets  the  one-shot  behavior for the associated file descriptor.  This means that after an event is pulledout with epoll_wait(2) the associated file descriptor is internally disabled and no other  events  will  bereported  by  the  epoll  interface.   The  user must call epoll_ctl() with EPOLL_CTL_MOD to rearm the filedescriptor with a new event mask.*/private:int _fd; // 文件描述符uint32_t _events; // 当前所监控的事件Epoll * _poller;uint32_t _revents; // 当前所连接触发的事件using EventCallback = std::function<void()>; // 包装器包装个回调函数EventCallback _readcallback; // 可读事件回调EventCallback _writecallback; // 可写事件回调EventCallback _errorcallback; // 错误事件回调EventCallback _closecallback; // 连接失效事件回调EventCallback _eventcallback; // 任意事件回调public:// 构造函数Channel(Epoll* poller, int fd): _fd(fd), _events(0), _revents(0), _poller(poller) {}// 返回文件描述符int Fd(){return _fd;}// 获取想要监控的事件uint32_t Events() { return _events; }// 设置实际就绪的事件void SetREvents(uint32_t events) { _revents = events; }// 设置可读回调函数void SetReadCallback(const EventCallback& cb) { _readcallback = cb; }// 设置可写回调函数void SetWriteCallback(const EventCallback& cb) { _writecallback = cb; }// 设置错误回调函数void SetErrorCallback(const EventCallback& cb) { _errorcallback = cb; }// 设置连接关闭回调函数void SetCloseCallback(const EventCallback& cb) { _closecallback = cb; }// 设置所有回调函数void SetAlleventCallback(const EventCallback& cb) { _eventcallback = cb; }// 描述符是否监控了可读bool ReadAble() { return (_events & EPOLLIN); }// 描述符是否监控了可写bool WriteAble() { return (_events & EPOLLOUT); }// 对描述符监控可读void EnableRead() { _events |= EPOLLIN; Update(); }// 对描述符监控可写void EnableWrite() { _events |= EPOLLOUT; Update(); }// 解除可读事件监控void DisableRead() { _events &= ~EPOLLIN; Update(); }// 解除可写事件监控void DisableWrite() { _events &= ~EPOLLOUT; Update(); }// 解除所有事件监控void DisableEvent() { _events = 0; Update();}// 移除监控void Move(); // 声明// 更新监控void Update(); // 声明// 连接事件处理void Handle(){if ((_revents & EPOLLIN) || (_revents & EPOLLHUP) || (_revents & EPOLLPRI)){if (_eventcallback) _eventcallback(); // 总要有一个任意事件的关闭函数吧!不管怎么样都会有一个回调函数的if (_readcallback) _readcallback(); // 上面都成立的情况下如果读的回调函数存在则调用读的回调函数            }if (_revents & EPOLLOUT){if (_writecallback) _writecallback(); // 写入if (_eventcallback) _eventcallback(); // 事件处理完毕后,刷新活跃度}else if (_revents & EPOLLERR){if (_eventcallback) _eventcallback();if (_errorcallback) _errorcallback(); // 一旦出了错误,就调用错误的回调函数,但前面是加入了任意事件的回调函数的}else if (_revents & EPOLLHUP){if (_eventcallback) _eventcallback();if (_closecallback) _closecallback(); // 关闭函数前调用一下任意事件的回调函数}}
};#define MAX_EPOLL_EVENTS 1024
class Epoll
{private:// epoll操作尺柄int _epfd;// struct结构监控时保存所有的活跃事件struct epoll_event _evs[MAX_EPOLL_EVENTS];// 使用hash表智理描述符与描述符对应的事件管理Channel对象std::unordered_map<int, Channel*> _channels;private:// 向epoll_ctl提供的添加或更新描述符所监控事件void Update(Channel* channel, int op){// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int epcl = epoll_ctl(_epfd, op, fd, &ev);if (epcl < 0){ERRLOG("EPOLL_CTL ERROR!");}return;}// 判断一个新的Channel是否链接了监控bool HasChannel(Channel* channel){auto it = _channels.find(channel->Fd()); // 用迭代器去获取文件描述符if (it == _channels.end()){ERRLOG("CHANNEL FIND FAILED!");return false;}return true;}public:// 构造函数Epoll(){// int epoll_create(int size);_epfd = epoll_create(MAX_EPOLL_EVENTS);if (_epfd < 0){ERRLOG("EPCT EPOLL_CREAT ERROR!");abort(); // 退出程序了}}// 添加或更新描述符所监控的事件void UpdateEvent(Channel* channel){bool ret = HasChannel(channel); if (ret == false) // 没有的话就添加{_channels.insert(std::make_pair(channel->Fd(), channel));return Update(channel, EPOLL_CTL_ADD); // 添加}return Update(channel, EPOLL_CTL_MOD); // 修改}// 移除描述符监控void RemoveFd(Channel* channel){auto it = _channels.find(channel->Fd()); // 用迭代器去获取文件描述符if (it != _channels.end()){_channels.erase(it);}Update(channel, EPOLL_CTL_DEL);}// 开始监控,获取就绪的Channelvoid Poll(std::vector<Channel*> *active){// int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);int epwt = epoll_wait(_epfd, _evs, MAX_EPOLL_EVENTS, -1);if (epwt < 0){if (errno == EINTR){return;}ERRLOG("EPWT EPOLL_WAIT ERROR:%s\n", strerror(errno));abort(); // 退出程序}for (int i = 0; i < epwt; i++){auto it = _channels.find(_evs[i].data.fd); // 继续找文件描述符if (it == _channels.end()) { return; }// 设置实际就绪事件it->second->SetREvents(_evs[i].events);active->push_back(it->second);}return;}
};// 移除监控
void Channel::Move() { return _poller->RemoveFd(this); }
// 更新监控
void Channel::Update() { return _poller->UpdateEvent(this); }

3、回调关系图

在这里插入图片描述

十二、EventLoop

1、eventfd的妙用

在这里插入图片描述
在这里插入图片描述

(1)代码展示

#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/eventfd.h>int main()
{int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0) { perror("eventfd created error!"); return; } uint64_t val = 1;write(efd, &val, sizeof(val));write(efd, &val, sizeof(val));write(efd, &val, sizeof(val));uint64_t res = 0;read(efd, &res, sizeof(res));printf("%ld\n", res);return 0;
}

(2)运行结果

在这里插入图片描述

2、EventLoop设计思路

我们首先要了解,这个模块是与线程是一一对应关联的,这是因为线程安全的问题,因为当一个线程中跑的资源只有这一个,不让其他资源抢占这个线程的话,实现了线程安全的问题。

而当我们监控了一个连接,这个连接一旦就绪了,就要进行事件处理,但是如果这个描述符在多个线程中都出发了事件,都进行处理的话必然会出现线程安全的问题。因此我们需要将一个连接的事件监控以及连接事件处理以及其他操作都放在同一个线程中进行,这样就能实现线程安全了。

那么我们如何能够保证一个连接的所有操作都在EventLoop对应的线程中呢?其解决方案就是给EventLoop模块中添加一个任务队列,并且对连接的所有操作,都进行一次封装,将对连接的操作并不直接执行,而是当做任务添加到任务队列中。

EventLoop的处理流程:
1、在线程中对描述符进行事件监控
2、有描述符就绪则描述符进行事件处理(保证处理回调函数中的操作都在线程中)
3、所有的就绪事件处理完了,这时候再去将任务队列中的任务一一执行。

我们用图看一下:
在这里插入图片描述

介绍一下epoll和task:
1、事件监控:使用Poller模块,有事件就绪则进行事件处理
2、执行任务队列中的任务:一个线程安全的任务队列
注意点:有可能因为等待描述符IO事件就绪导致的执行流流程阻塞,这时候任务队列中的任务将得不到执行,因此得有一个事件通知的东西能够唤醒事件监控的阻塞。
当事件就绪,需要进行处理的时候,处理过程中,如果对连接要进行某些操作,这些操作必须在EventLoop对应的线程中执行,保证对连接的各项操作都是线程安全的。
1、如果执行的操作本就在线程中,不需要将操作压入队列,可以直接执行
2、如果执行的操作不在线程中,才需要加入任务池,等到事件处理完了再执行任务。

3、EventLoop代码

class EventLoop
{private:using Functor = std::function<void()>;std::thread::id _threadid; // 线程ID,用于判断是否是同一个线程int _eventfd; // eventfd唤醒IO操作中有可能引起的阻塞std::unique_ptr<Channel> _event_channel; // 监控Epoll _poller; // 进行所有描述符事件的监控std::vector<Functor> _taskpool; // 任务池std::mutex _mutex; // 对任务池中的任务进行加锁操作public:// 启动任务池所有任务  void RunAllTask(){std::vector<Functor> functor;// 限定作用域,用来实现加锁的操作{std::unique_lock<std::mutex> _lock(_mutex);_taskpool.swap(functor); // 交换}for (auto& e : functor){e(); // 回调执行任务}return;}static int GetEventFd(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0) {ERRLOG("EVENT CREATE ERROR!!");abort();}return efd;}void ReadEventFd(){uint64_t res = 0;int ret = read(_eventfd, &res, sizeof(res));if (ret < 0){//EINTR -- 被信号打断;   EAGAIN -- 表示无数据可读if (errno == EINTR || errno == EAGAIN){ERRLOG("MEI SHI!!!");return;}ERRLOG("RET READ ERROR!!!");abort();}return;}// 用来唤醒void WakeUpEventFd(){uint64_t val = 1;int ret = write(_eventfd, &val, sizeof(val));if (ret < 0){if (errno == EINTR) {return;}ERRLOG("RET WRITE ERROR!!!");abort();}return;}public:// 构造函数 EventLoop(): _threadid(std::this_thread::get_id()), _eventfd(GetEventFd()), _event_channel(new Channel(this, _eventfd)){_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd, this)); // 调用设置的回调读函数,读取Eventfd事件的次数_event_channel->EnableRead(); // 启动EventFd读事件监控}// 判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列void RunInLoop(const Functor& cb){if (IsInLoop()){return cb();}return QueueInTask(cb);}void QueueInTask(const Functor& cb) // 将操作压入到任务队列中{// 限定作用域,对其进行加锁{std::unique_lock<std::mutex> _lock(_mutex);_taskpool.push_back(cb);}//唤醒有可能因为没有事件就绪,而导致的epoll阻塞;//其实就是给eventfd写入一个数据,eventfd就会触发可读事件WakeUpEventFd();}// 判断当前线程是否是EventLoop中的线程bool IsInLoop(){return (_threadid == std::this_thread::get_id());}// 添加/修改描述符事件监控void UpdateEvent(Channel *channel){return _poller.UpdateEvent(channel);}// 移除描述符事件监控void RemoveEvent(Channel *channel){return _poller.RemoveFd(channel);}// 三步走:事件监控->就绪事件处理->执行任务void Start(){// 1、事件监控std::vector<Channel*> actives;_poller.Poll(&actives);// 2、就绪事件处理for (auto &a : actives){a->Handle(); // 处理}// 3、执行任务RunAllTask();}
};

4、Channel的修改

在这里插入图片描述
在这里插入图片描述

5、运行代码及结果

tcp_server.cc:
在这里插入图片描述

tcp_client.cc代码不变。

测试结果:
在这里插入图片描述

十三、TimerWheel

1、TimerWheel思路

timerfd:实现内核每隔一段时间,给进程一次超时事件,我们用RefreshEvent接口实现。
timerwheel:实现每次执行Runtimetask,都可以执行一波到期的定时任务
要实现一个完整的秒级定时器,就要把这两个功能整合到一起
timerfd设置为每秒钟触发一次定时事件,当事件被触发,则运行一次timerwheel和runtimetask,执行一下所有的过期的定时任务。

2、代码

using FuncTask = std::function<void()>; // 回调函数,用来回调函数任务的
using ReleaseTask = std::function<void()>; // 回调函数,用来回调释放任务的
class TimerTask
{private:uint64_t _id; // 定时器任务对象IDuint32_t _timeout; // 定时器任务超时的时间bool _cancel;  // 用来取消定时任务, false表示没有被取消,true表示被取消了FuncTask _taskcb; // 定时器任务要执行的定时任务ReleaseTask _releasecb; // 定时器任务在时间轮中保存的定时器信息public:TimerTask(uint64_t id, uint32_t delay, const FuncTask& cb) // 构造函数: _id(id), _timeout(delay), _cancel(false) // 刚开始定为false, _taskcb(cb){}~TimerTask() // 析构函数{// 在析构函数中进行两个回调函数的创建if (_cancel == false) _taskcb();_releasecb();}void SetRelease(const ReleaseTask& cb) // 设置释放的回调函数{_releasecb = cb;}uint32_t DelayTimer() // 将延迟时间开放出去{return _timeout;}void Cancel(){_cancel = true;}
};
// 时间轮
class WheelTask
{private:using SharedPtrTask = std::shared_ptr<TimerTask>; // 用一个shared_ptr,因为有引用计数,当引用计数为0的时候,就是释放资源的时候using WeakPtrTask = std::weak_ptr<TimerTask>; // weak_ptr用来配合shared_ptr使用int _tick; // 当前的秒钟,指到哪就将哪释放掉int _capacity; // 表盘的最大数量(最大延迟时间)std::vector<std::vector<SharedPtrTask>> _wheel; // 时间轮(二维数组,存放的数据更规整)std::unordered_map<uint64_t, WeakPtrTask> _timers; // 映射关系EventLoop* _loop; // 定时器描述符int _timerfd; // 定时器描述符--可读事件回调,就是读取计数器,执行定时任务std::unique_ptr<Channel> _timerchannel;private:void RemovePtr(uint64_t id) // 移动指针{auto it = _timers.find(id); // 先找到id在_timers中if (it != _timers.end()){_timers.erase(it); // 删除掉it所指向的位置,也就是id的位置}}static int CreateTimerFd(){// 创建一个定时器int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);if (timerfd < 0){ERRLOG("CREATE TIMERFD ERROR!!");abort();}struct itimerspec iem; // 使用该结构体/* struct timespec {time_t tv_sec; // Secondslong tv_nsec; // Nanoseconds};struct itimerspec {struct timespec it_interval; // 第 次之后的超时间隔时间 struct timespec it_value; // 第 次超时时间 };*/iem.it_value.tv_sec = 2; // 第一次的超时时间(秒)iem.it_value.tv_nsec = 0; // 第一次的超时时间(毫秒)iem.it_interval.tv_sec = 2; // 这次过后的每隔多长时间超时(秒)iem.it_interval.tv_nsec = 0; // 这次过后的每隔多长时间超时(毫秒)// 启动定时器timerfd_settime(timerfd, 0, &iem, NULL); // 启动定时器return timerfd;}void ReadTimerFd(){uint64_t temp;int fd = read(_timerfd, &temp, 8);if (fd < 0){ERRLOG("READ FAILED!!");abort();}return;}void RunTimerTask() // 时钟滴答滴答往后走{_tick = (_tick + 1) % _capacity;_wheel[_tick].clear(); // 清空一下}void OnTimer() // 时间到了的函数{ReadTimerFd(); // 读取一下RunTimerTask(); // 任务运行起来}void TimerAddInLoop(uint64_t id, uint32_t delay, const FuncTask& cb) // 添加到定时任务{SharedPtrTask pt(new TimerTask(id, delay, cb)); // 先建立一个shared_ptrpt->SetRelease(std::bind(&WheelTask::RemovePtr, this, id)); // 有一个this指针int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt); // 将这个pt先插入到哈希表中_timers[id] = WeakPtrTask(pt); // 对象中不能用shared_ptr,因为shared_ptr不会减减引用计数,所以用weak_ptr}void TimerRefreshInLoop(uint64_t id) // 刷新/延迟定时时间{auto it = _timers.find(id);if (it == _timers.end()){//perror("_timers find error!");ERRLOG("TIMERS FREASH ERROR!!");return;}SharedPtrTask pt = it->second.lock();//lock获取weak_ptr管理的对象对应的shared_ptrint delay = pt->DelayTimer();int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);}void CancelTimerInloop(uint64_t id) // 取消定时任务{auto it = _timers.find(id);if (it == _timers.end()){//perror("_timers not find!");ERRLOG("CANCELTIMER ERROR!!!");return;}SharedPtrTask pt = it->second.lock();if (pt) pt->Cancel();}public:WheelTask(EventLoop* loop): _tick(0), _capacity(60), _wheel(_capacity), _loop(loop), _timerfd(CreateTimerFd()), _timerchannel(new Channel(_loop, _timerfd)){_timerchannel->SetReadCallback(std::bind(&WheelTask::OnTimer, this));_timerchannel->EnableRead(); // 启动可读事件}// 定时器中有个_timers的成员,定时器信息的操作是有可能在多个线程中进行,因此需要考虑线程安全的问题// 如果不想加锁,那么就将定时器的所有操作都放在同一个线程中进行,放到EventLoop中即可void TimerAdd(uint64_t id, uint32_t delay, const FuncTask& cb);void TimerRefresh(uint64_t id);void CancelTimer(uint64_t id);// 这个接口是有线程安全的问题,不能外界使用者调用,只能在内部,在EventLoop线程内执行bool HasTimer(uint64_t id){auto ret = _timers.find(id);if (ret == _timers.end()){ERRLOG("HASTIMER FIND FAILED!!");return false;}return true;}
};

3、修改EventLoop

在这里插入图片描述

4、运行代码及结果

// tcp_svr.cc:
#include "../source/server.hpp"void HandleClose(Channel* channel) 
{//std::cout << "close channel fd is sucessful!" << channel->Fd() << std::endl;DEBLOG("%d", channel->Fd());channel->Move();delete channel;
}
void HandleRead(Channel* channel)
{int fd = channel->Fd();char buff[1024] = {0};// ssize_t recv(int sockfd, void *buf, size_t len, int flags);int ret = recv(fd, buff, 1023, 0);if (ret <= 0){return HandleClose(channel); // 关闭释放}DEBLOG("%s", buff);//std::cout << buff << std::endl;channel->EnableWrite(); // 启动可写事件
}
void HandleWrite(Channel* channel) 
{int fd = channel->Fd();const char *data = "I love you!";int ret = send(fd, data, strlen(data), 0);if (ret < 0){return HandleClose(channel); // 关闭释放}channel->DisableWrite(); // 关闭释放
}
void HandleError(Channel* channel) 
{return HandleClose(channel);
}
void HandleAllEvent(EventLoop* loop, Channel* channel, uint64_t timerid) 
{loop->TimerRefresh(timerid); // 刷新//std::cout << "have a new channel connection" << std::endl;
}
void Acceptor(EventLoop* loop, Channel* ls_channel)
{int fd = ls_channel->Fd();int newfd = accept(fd, NULL, NULL);if (newfd < 0) { return; }uint64_t timerid = rand() % 10000;Channel* channel = new Channel(loop, newfd);channel->SetReadCallback(std::bind(HandleRead, channel)); // 可读事件的回调函数channel->SetWriteCallback(std::bind(HandleWrite, channel)); // 可写事件回调函数channel->SetCloseCallback(std::bind(HandleClose, channel));  // 关闭事件回调函数channel->SetErrorCallback(std::bind(HandleError, channel));  // 错误事件回调函数channel->SetAlleventCallback(std::bind(HandleAllEvent, loop, channel, timerid));  // 任意事件回调函数// 非活跃连接超时释放操作// 注意的是:定时任务一定在读事件启动之前,因为有可能启动事件监控后,立即就有了事件,但是这时候还没有任务产生loop->TimerAdd(timerid, 10, std::bind(HandleClose, channel));channel->EnableRead();
}// for test
int main()
{srand(time(NULL)); // 生成一个随机数种子// Epoll poller;EventLoop loop;Socket l_socket;l_socket.Server(8889);Channel channel(&loop, l_socket.Fd()); // 监听套接字的channel.SetReadCallback(std::bind(Acceptor, &loop, &channel)); // 回调中,获取新连接,为新连接创建Channel并添加监控channel.EnableRead(); // 启动可读事件监控while (1){loop.Start();}l_socket.Close();return 0;
}
// tcp_cli.cc
#include "../source/server.hpp"int main()
{Socket clent_socket;clent_socket.Client(8889, "127.0.0.1");//while(1)for (int i = 0; i < 5; i++){std::string str = "xiangshuijiaole";clent_socket.Send(str.c_str(), str.size());char buff[1024] = {0};clent_socket.Recv(buff, 1023);DEBLOG("%s", buff);sleep(1);}while(1){sleep(1);}return 0;
}

在这里插入图片描述

十四、Eventloop的模块流程图

在这里插入图片描述

十五、Connection

1、Connection设计思路

(1)目的

对连接进行全方位的管理,对通信连接的所有操作都是通过这个模块提供的功能完成的。

(2)管理

套接字的管理,能够进行套接字的操作,socket
连接事件的管理,可读、可写、错误、挂断、任意
缓冲区的管理,便于socket数据的接收和发送
协议上下文的管理,记录请求数据的处理过程
回调函数的管理
因为连接接收到数据之后该如何处理,都要由用户决定,因此必须有业务处理回调函数
一个连接建立成立后,该如何处理,由用户决定,因此必须有连接建立成功的回调函数。
一个连接关闭前,该如何处理,由用户决定,因此必须由关闭连接回调函数。
任意事件的产生,有没有某些处理,由用户决定,因此必须有任意事件的回调函数。

(3)功能

发送数据:给用户提供的发送数据接口,并不是真正的发送接口,而只是把数据放到发送缓冲区,然后启动写事件监控。
关闭连接:给用户提供的关闭连接接口,应该在实际释放连接之前,看看输入输出缓冲区是否有数据待处理
启动非活跃连接的超时销毁功能
取消非活跃连接的超时功能
协议切换:一个连接接收到数据后如何进行业务处理,取决于上下文以及数据的业务处理回调功能。

(4)场景

Connection模块是对连接的管理模块,对于连接的所有操作都是通过这个模块完成的场景:对连接进行操作的时候,但是连接已经被释放,导致内存访问错误,最终程序崩溃解决方案:使用智能指针shared ptr对Connection对象进行管理,这样就能保证任意一个地方对Connection对象进行操作的时候保存了一份shared ptr,因此就算其他地方进行释放操作,也只是对shared ptr的计数器-1,而不会导致Connection的实际释放

2、预备工作:加上Any类

// 先搭个框架--通用容器的函数
class Any
{private:class Holder{public:virtual ~Holder(){} // 析构函数// 纯虚函数的定义在于只让它的子类对象刻画出对象,// 不让父类对象刻画出对象,就像动物包含着老虎狮子等的,老虎狮子是对象,而动物不是对象virtual const std::type_info& type() = 0; virtual Holder* clone() = 0;};template<class T>class PlaceHolder : public Holder{public:// 构造函数PlaceHolder(const T& val) :_val(val) {}// 类型virtual const std::type_info& type(){return typeid(T); // 直接返回我们模版实例化的参数的类型即可}// 针对当前的函数对象,克隆出一个新的子类对象virtual Holder* clone(){return new PlaceHolder(_val);}public:T _val; // 构造一个模版类型的数据};Holder *_content; // 父类的指针可以指向任何类型的数据(这个是Any类型的私有函数)public:// 空的构造函数Any():_content(NULL){}// 模版T类型的构造函数template<class T>Any(const T& val):_content(new PlaceHolder<T>(val)){}// 通过其他的通用容器来构造我们自己的容器Any(const Any& other):_content(other._content ? other._content->clone() : NULL){}// 析构函数~Any(){delete _content; // 释放掉这个_content}// 得到子类对象保存的数据的指针template<class T>T* get(){// 当前对象的类型不能和传进来的实例化模版的类型是一样的assert(_content->type() == typeid(T));// 先强转,再指向_val,再取地址return &((PlaceHolder<T>*)_content)->_val;}Any& Swap(Any& other){std::swap(_content, other._content); // 调用算法函数直接交换this和other对象的指针return *this; // 返回实例化的this解引用}// operator=的任意模板类型的运算符重载函数template<class T>Any& operator=(const T& val){//为val构造一个临时的通用容器,然后与当前容器自身进行指针交换,临时对象释放的时候,原先保存的数据也就被释放Any(val).Swap(*this);return *this;}// operator=的对象的运算符重载函数Any& operator=(const Any& other){Any(other).Swap(*this);return *this;}
};

3、Connection代码

class Connection;
// DISCONNECTED -- 连接关闭状态;   CONNECTING -- 连接建立成功-待处理状态;
// CONNECTED -- 连接建立完成,各种设置已完成,可以通信的状态;  DISCONNECTING -- 待关闭状态;
typedef enum { DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING }Constatus;
using PtrConnection = std::shared_ptr<Connection>; // 用智能指针管理Connection
class Connection : public std::enable_shared_from_this<Connection> // 为了给下面用
{private:uint64_t _con_id; // 连接的唯一ID,便于连接的管理和查找,同时也是定时器的ID,因为连接的唯一ID是唯一的,可以作为定时器唯一ID处理int _sockfd; // 连接关联的文件描述符bool _is_enable_released; // 连接是否启动非活跃销毁的判断标志,默认为falseEventLoop* _loop; // 连接所关联的EventLoop,也就是只在同一个线程中跑Constatus _constatus; // 连接状态Socket _socket; // 套接字的操作管理Channel _channel; // 连接的事件管理Buffer _buffer_in; // 输入缓冲区---存放从socket中读取到的数据Buffer _buffer_out; // 输出缓冲区---存放要发送给对端的数据Any _context; // 请求连接的上下文(这里需要保存的)// 下面四个回调函数是由组件者(我们使用者)来完成的using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer*)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyCallback = std::function<void(const PtrConnection&)>;ConnectedCallback _conn_callback;MessageCallback _msg_callback;ClosedCallback _clo_callback;AnyCallback _any_callback;// 组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭// 就应该从管理的地方移除掉自己的信息ClosedCallback _server_clo_callback;private:// 五个channel的事件回调函数// 描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用_message_callbackvoid HandleRead(){// 1、接收socket数据放到接收缓冲区char buff[65536];ssize_t ret = _socket.NonBlockRecv(buff, 65535);if (ret < 0){// 此时是出错了,我们不能调用实际的关闭连接的操作,而应当调用Shutdown的操作return ShutdownInLoop();}// 将数据放入缓冲区,顺便将写偏移向后移动_buffer_in.WriteAndPush(buff, ret);// 2、调用_message_callbackif (_buffer_in.ReadAbleSize() > 0){// shared_from_this--从当前对象自身获取自身的shared_ptr管理对象return _msg_callback(shared_from_this(), &_buffer_in);}}// 描述符可写事件触发后调用的函数,将发送缓冲区中的数据进行发送void HandleWrite(){// _out_buffer中保存的数据就是要发送的数据ssize_t ret = _socket.NonBlockSend(_buffer_out.ReadPos(), _buffer_out.ReadAbleSize());if (ret < 0){// 发送错误就需要实际意义的关闭了// 有数据先读出去if (_buffer_in.ReadAbleSize() > 0){_msg_callback(shared_from_this(), &_buffer_in);}return Release();}// 将读事件的偏移往后移动_buffer_out.ReadOffset(ret);if (_buffer_out.ReadAbleSize() == 0){_channel.DisableWrite(); // 没有事件发送了,关闭写事件if (_constatus == DISCONNECTING){return Release();}}return;}// 描述符触发关闭void HandleClose(){// 一旦连接挂断了,套接字就什么都干不了了,因此有数据待处理就处理一下,完毕关闭连接if (_buffer_in.ReadAbleSize() > 0){_msg_callback(shared_from_this(), &_buffer_in);}return Release();}// 描述符触发出错void HandleError(){return HandleClose();}// 描述符触发任意事件: 1. 刷新连接的活跃度--延迟定时销毁任务;  2. 调用组件使用者的任意事件回调void HandleAllEvent(){if (_is_enable_released == true)  {  _loop->TimerRefresh(_con_id); }if (_any_callback)  {  _any_callback(shared_from_this()); }}// 连接获取之后,所处的状态下要进行各种设置(启动读监控, 调用回调函数)void EstablishedInloop(){// 1、修改连接状态 2、启动读事件监控 3、调用回调函数assert(_constatus == CONNECTING); // 当前的状态必须一定是上层的半连接状态_constatus = CONNECTED;// 一旦启动读事件监控就有可能会立即触发读事件,如果这时候启动了非活跃连接销毁_channel.EnableRead();if (_conn_callback) {_conn_callback(shared_from_this());}}// 发送数据到缓冲区,这个接口并不是实际要发送的接口,而是把数据放到了发送缓冲区并启动可写事件监控void SendInLoop(Buffer &buff){if (_constatus == DISCONNECTED) return;_buffer_out.WriteBufferAndPush(buff);if (_channel.WriteAble() == false){_channel.EnableWrite();}}// 实际释放接口void ReleaseInLoop(){// 1. 修改连接状态,将其置为DISCONNECTED_constatus = DISCONNECTED;// 2. 移除连接的事件监控_channel.Move();// 3. 关闭描述符_socket.Close();// 4. 如果当前定时器队列中还有定时销毁任务,则取消任务if (_loop->HasTimer(_con_id)) CancelInactiveReleaseInLoop();// 5. 调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,再去处理会出错,因此先调用用户的回调函数if (_clo_callback) _clo_callback(shared_from_this());// 移除服务器内部管理的连接信息if (_server_clo_callback) _server_clo_callback(shared_from_this());}// 关闭接口的操作并不是实际的连接释放的操作,我们需要判断是否还有数据待处理、待发送void ShutdownInLoop(){_constatus = DISCONNECTING; // 设置连接为半关闭状态if (_buffer_in.ReadAbleSize() > 0){if (_msg_callback) _msg_callback(shared_from_this(), &_buffer_in);}// 写入数据出错或者没有发送数据导致的关闭if (_buffer_out.ReadAbleSize() > 0){if (_channel.WriteAble() == false){_channel.EnableWrite();}}if (_buffer_out.ReadAbleSize() == 0){Release();}}// 启动非活跃任务销毁,启动定时任务,定义多长时间无通信就是非活跃并销毁void SetInactiveReleaseInLoop(int sec){// 1. 将判断标志 _is_enable_released 置为true_is_enable_released = true;// 2. 如果当前定时销毁任务已经存在,那就刷新延迟一下即可if (_loop->HasTimer(_con_id)){return _loop->TimerRefresh(_con_id);}// 3. 如果不存在定时销毁任务,则新增_loop->TimerAdd(_con_id, sec, std::bind(&Connection::Release, this));}// 取消非活跃任务的销毁void CancelInactiveReleaseInLoop(){_is_enable_released = false;if (_loop->HasTimer(_con_id)){_loop->CancelTimer(_con_id);}}// 切换协议的接口void UpgradeInLoop(const Any& content, const ConnectedCallback& conn, const MessageCallback& msg, const ClosedCallback& clo, const AnyCallback& any){_context = content;_conn_callback = conn;_msg_callback = msg;_clo_callback = clo;_any_callback = any;}public:// 构造函数Connection(EventLoop *loop, uint64_t conn_id, int sockfd): _con_id(conn_id), _sockfd(sockfd), _is_enable_released(false), _loop(loop), _constatus(CONNECTING), _socket(_sockfd), _channel(loop, _sockfd){_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetAlleventCallback(std::bind(&Connection::HandleAllEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));}// 析构函数~Connection() { std::cout << "~Connection" << std::endl;DEBLOG("RELEASE CONNECTION:%p", this);}// 获取管理文件描述符int Fd() { return _sockfd; }// 获取连接IDint Id() { return _con_id; }// 判断是否处于CONNECTED状态bool IsConnected() { return (_constatus == CONNECTED); }// 设置上下文--连接建立完成时进行调用void SetContext(const Any& context) { _context = context; }// 获取上下文,返回指针Any* GetContext() { return &_context; }// 以下是五个回调函数(四个组件使用者使用的回调函数+一个组件内的连接关闭回调)--设置void SetConnectedCallback(const ConnectedCallback& cb) { _conn_callback = cb; }void SetMessageCallback(const MessageCallback& cb) { _msg_callback = cb; }void SetClosedCallback(const ClosedCallback& cb) { _clo_callback = cb; }void SetAnyCallback(const AnyCallback& cb) { _any_callback = cb; }void SerServerClosedCallback(const ClosedCallback& cb) { _server_clo_callback = cb; }// 连接建立就绪后,进行channel回调设置,启动读监控,调用_connected_callbackvoid Established(){_loop->RunInLoop(std::bind(&Connection::EstablishedInloop, this));}// Send,发送数据到缓冲区,启动写事件监控void Send(const char* data, size_t len){// 外界传入的data,可能是个临时的空间,我们现在只是把发送操作压入了任务池,有可能并没有被立即执行// 因此有可能执行的时候,data指向的空间有可能已经被释放了Buffer buff;buff.WriteAndPush(data, len);_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buff)));}// 提供给组件使用者的关闭接口--并不实际关闭,需要判断有没有数据待处理void Shutdown(){_loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));}void Release(){_loop->QueueInTask(std::bind(&Connection::ReleaseInLoop, this));}// 启动非活跃任务销毁,启动定时任务,定义多长时间无通信就是非活跃并销毁void SetInactiveRelease(int sec){_loop->RunInLoop(std::bind(&Connection::SetInactiveReleaseInLoop, this, sec));}// 取消非活跃任务的销毁void CancelInactiveRelease(){_loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));}// 切换协议,重置上下文以及阶段性的函数--用四个回调函数(因为是有线程安全,所以必须放到线程中执行)void Upgrade(const Any& content, const ConnectedCallback& conn, const MessageCallback& msg, const ClosedCallback& clo, const AnyCallback& any){// 防备新的事件触发后,处理的时候,切换任务还没有被执行--会导致数据使用原协议处理了。_loop->AssertLoop(); // 必须在EventLoop中立即执行_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, content, conn, msg, clo, any));}
};

4、纠错修改(必看)

大家看一下下面的Socket代码中的Recv函数,是不是就缺了个小于等于号???这有什么问题???其实问题大着呢!我们在客户端ctrl+c中断程序时候,如果对方正在发送数据,并且此时recv函数正处于阻塞状态等待数据到达,而你又中断了程序,导致链接关闭,那么recv函数会返回0,表示连接已经关闭,接收端不再接收数据,所以所以,切记我们在客户端ctrl+c的时候,recv返回的是0,而不是负数!!!所以这里必须将前面的Socket的值改成<=!!!

在这里插入图片描述
在这里插入图片描述

5、运行代码及结果

// tcp_svr.cc
#include "../source/server.hpp"std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t con_id = 0;
void DestroyConnection(const PtrConnection& ptrc)
{_conns.erase(ptrc->Id());
}
void OnConnection(const PtrConnection& ptrc)
{DEBLOG("NEW CONNECTION:%p", ptrc.get());
}
void OnMessage(const PtrConnection& conn, Buffer* buff)
{DEBLOG("%s", buff->ReadPos());buff->ReadOffset(buff->ReadAbleSize());std::string str = "hello linux";conn->Send(str.c_str(), str.size());// conn->Shutdown();
}void Acceptor(EventLoop* loop, Channel* ls_channel)
{int fd = ls_channel->Fd();int newfd = accept(fd, NULL, NULL);if (newfd < 0) { return; }con_id++;PtrConnection conn(new Connection(loop, con_id, newfd));conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));conn->SerServerClosedCallback(std::bind(DestroyConnection, std::placeholders::_1));conn->SetConnectedCallback(std::bind(OnConnection, std::placeholders::_1)); conn->SetInactiveRelease(5); // 非活跃连接10秒后销毁conn->Established(); // 就绪初始化_conns.insert(std::make_pair(con_id, conn));
}// for test
int main()
{//srand(time(NULL)); // 生成一个随机数种子// Epoll poller;EventLoop loop;Socket l_socket;l_socket.Server(8889);Channel channel(&loop, l_socket.Fd()); // 监听套接字的channel.SetReadCallback(std::bind(Acceptor, &loop, &channel)); // 回调中,获取新连接,为新连接创建Channel并添加监控channel.EnableRead(); // 启动可读事件监控while (1){loop.Start();}l_socket.Close();return 0;
}
// tcp_cli.cc
#include "../source/server.hpp"int main()
{Socket clent_socket;clent_socket.Client(8889, "127.0.0.1");//while(1)for (int i = 0; i < 5; i++){std::string str = "xiangshuijiaole";clent_socket.Send(str.c_str(), str.size());char buff[1024] = {0};clent_socket.Recv(buff, 1023);DEBLOG("%s", buff);sleep(1);}while(1){sleep(1);}return 0;
}

在这里插入图片描述

在这里插入图片描述

// tcp_svr.cc
#include "../source/server.hpp"std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t con_id = 0;
void DestroyConnection(const PtrConnection& ptrc)
{_conns.erase(ptrc->Id());
}
void OnConnection(const PtrConnection& ptrc)
{DEBLOG("NEW CONNECTION:%p", ptrc.get());
}
void OnMessage(const PtrConnection& conn, Buffer* buff)
{DEBLOG("%s", buff->ReadPos());buff->ReadOffset(buff->ReadAbleSize());std::string str = "hello linux";conn->Send(str.c_str(), str.size());conn->Shutdown();
}void Acceptor(EventLoop* loop, Channel* ls_channel)
{int fd = ls_channel->Fd();int newfd = accept(fd, NULL, NULL);if (newfd < 0) { return; }con_id++;PtrConnection conn(new Connection(loop, con_id, newfd));conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));conn->SerServerClosedCallback(std::bind(DestroyConnection, std::placeholders::_1));conn->SetConnectedCallback(std::bind(OnConnection, std::placeholders::_1)); conn->SetInactiveRelease(5); // 非活跃连接10秒后销毁conn->Established(); // 就绪初始化_conns.insert(std::make_pair(con_id, conn));
}// for test
int main()
{//srand(time(NULL)); // 生成一个随机数种子// Epoll poller;EventLoop loop;Socket l_socket;l_socket.Server(8889);Channel channel(&loop, l_socket.Fd()); // 监听套接字的channel.SetReadCallback(std::bind(Acceptor, &loop, &channel)); // 回调中,获取新连接,为新连接创建Channel并添加监控channel.EnableRead(); // 启动可读事件监控while (1){loop.Start();}l_socket.Close();return 0;
}
// tcp_cli.cc
#include "../source/server.hpp"int main()
{Socket clent_socket;clent_socket.Client(8889, "127.0.0.1");//while(1)for (int i = 0; i < 5; i++){std::string str = "xiangshuijiaole";clent_socket.Send(str.c_str(), str.size());char buff[1024] = {0};clent_socket.Recv(buff, 1023);DEBLOG("%s", buff);sleep(1);}while(1){sleep(1);}return 0;
}

在这里插入图片描述

十六、Acceptor

1、Acceptor设计思路

Acceptor模块说实在的就是对监听套接字进行管理。我们这样就不用在服务端用listen_socket套接字了,而只需要用这一个Acceptor模块进行操作就可以了。

(1)创建一个监听套接字

用Socket _socket:
在这里插入图片描述

(2)启动读事件监控

在这里插入图片描述

(3)事件触发后,获取新连接

(4)调用新连接获取成功后的回调函数

  也就是为新连接创建Connection进行管理,而这个创建Connection是服务器模块的操作,并不是Acceptor模块的操作。因为Acceptor模块只进行监听链接的管理,因此获取到新连接的描述符后,对于新连接如何处理根本不关心,你交给服务器模块处理就可以了。

而服务器模块实现了一个对于新连接描述符处理的函数,将这个函数设置给Acceptor模块中的回调函数。
在这里插入图片描述

2、代码

class Acceptor
{private:Socket _socket; // 创建套接字EventLoop* _loop; // 对_socket套接字进行事件监控Channel _channel; // 对_socket套接字进行事件管理using AcceptorCallback = std::function<void(int)>;AcceptorCallback _acceptor_callback;private:// 监听套接字的读事件回调处理函数--获取新连接,调用_acceptor_callback函数进行新连接处理void HandleRead(){int newfd = _socket.Accept();if (newfd < 0){ERRLOG("NEWFD ACCEPT ERROR!");return;}if (_acceptor_callback) _acceptor_callback(newfd);}int CreateServerFd(int port){bool ret = _socket.Server(port);if (ret == false){ERRLOG("RET SERVER ERROR!!");abort();}return _socket.Fd(); // 返回文件描述符}public:// 构造函数// 不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动// 否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没设置:新连接得不到处理,且资源泄漏Acceptor(EventLoop* loop, int port) :_socket(CreateServerFd(port)), _loop(loop), _channel(_loop, _socket.Fd()){_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));}void SetAcceptorCallback(const AcceptorCallback& cb) { _acceptor_callback = cb;  }void Listen(){_channel.EnableRead();}
};

3、运行代码及结果

// tcp_svr.cc
#include "../source/server.hpp"std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t con_id = 0;
EventLoop loop;void DestroyConnection(const PtrConnection& ptrc)
{_conns.erase(ptrc->Id());
}
void OnConnection(const PtrConnection& ptrc)
{DEBLOG("NEW CONNECTION:%p", ptrc.get());
}
void OnMessage(const PtrConnection& conn, Buffer* buff)
{DEBLOG("%s", buff->ReadPos());buff->ReadOffset(buff->ReadAbleSize());std::string str = "hello linux";conn->Send(str.c_str(), str.size());conn->Shutdown();
}void NewAcceptor(int fd)
{con_id++;PtrConnection conn(new Connection(&loop, con_id, fd));conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));conn->SerServerClosedCallback(std::bind(DestroyConnection, std::placeholders::_1));conn->SetConnectedCallback(std::bind(OnConnection, std::placeholders::_1)); conn->SetInactiveRelease(5); // 非活跃连接10秒后销毁conn->Established(); // 就绪初始化_conns.insert(std::make_pair(con_id, conn));
}// for test
int main()
{srand(time(NULL)); // 生成一个随机数种子Acceptor acceptor(&loop, 8889);acceptor.SetAcceptorCallback(std::bind(NewAcceptor, std::placeholders::_1)); // 回调中,获取新连接,为新连接创建Channel并添加监控acceptor.Listen();while (1){loop.Start();}return 0;
}
// tcp_cli.cc
#include "../source/server.hpp"int main()
{Socket clent_socket;clent_socket.Client(8889, "127.0.0.1");//while(1)for (int i = 0; i < 5; i++){std::string str = "xiangshuijiaole";clent_socket.Send(str.c_str(), str.size());char buff[1024] = {0};clent_socket.Recv(buff, 1023);DEBLOG("%s", buff);sleep(1);}while(1){sleep(1);}return 0;
}

在这里插入图片描述

十七、LoopThread

1、设计思路

在这里插入图片描述

2、代码

顺带先把EventLoop中的Start()中先进行循环打印。
在这里插入图片描述

class LoopThread
{private:// 用于实现_1oop获取的同步关系,避免线程创建了,但是_1oop还没有实例化之前去获取_1oop// 因为此时_loop为空std::mutex _mutex; // 互斥锁std::condition_variable _con; // 条件变量EventLoop* _loop; // EventLoop指针变量,这个对象需要在线程内实例化std::thread _thread; // EventLoop对应的线程private:// 这个函数就是EventLoop模块中的Start,三步走:事件监控->就绪事件处理->执行任务void ThreadEntry(){EventLoop loop; // 实例化对象// 规定作用域{std::unique_lock<std::mutex> lock(_mutex); // 加锁_loop = &loop;_con.notify_all(); // 唤醒loop阻塞的线程}loop.Start(); // 一直循环打印}public:// 构造函数,创建线程,设置函数入口即ThreadEntry()LoopThread(): _loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)){}// 返回当前线程关联的EventLoop对象的指针EventLoop* GetLoop(){EventLoop* loop = NULL;// 规定作用域{std::unique_lock<std::mutex> lock(_mutex); // 加锁_con.wait(lock, [&](){ return _loop != NULL; }); // loop为NULL就一直循环阻塞loop = _loop; // 赋值}return loop;}
};

3、运行结果

// tcp_svr.cc
// tcp_svr.cc
#include "../source/server.hpp"std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t con_id = 0;
EventLoop base_loop;
std::vector<LoopThread> threads(2);
int next_loop = 0;void DestroyConnection(const PtrConnection& ptrc)
{_conns.erase(ptrc->Id());
}
void OnConnection(const PtrConnection& ptrc)
{DEBLOG("NEW CONNECTION:%p", ptrc.get());
}
void OnMessage(const PtrConnection& conn, Buffer* buff)
{DEBLOG("%s", buff->ReadPos());buff->ReadOffset(buff->ReadAbleSize());std::string str = "hello linux";conn->Send(str.c_str(), str.size());conn->Shutdown();
}void NewAcceptor(int fd)
{con_id++;next_loop = (next_loop + 1) % 2;PtrConnection conn(new Connection(threads[next_loop].GetLoop(), con_id, fd));conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));conn->SerServerClosedCallback(std::bind(DestroyConnection, std::placeholders::_1));conn->SetConnectedCallback(std::bind(OnConnection, std::placeholders::_1)); conn->SetInactiveRelease(5); // 非活跃连接10秒后销毁conn->Established(); // 就绪初始化_conns.insert(std::make_pair(con_id, conn));
}// for test
int main()
{srand(time(NULL)); // 生成一个随机数种子Acceptor acceptor(&base_loop, 8888);acceptor.SetAcceptorCallback(std::bind(NewAcceptor, std::placeholders::_1)); // 回调中,获取新连接,为新连接创建Channel并添加监控acceptor.Listen();while (1){base_loop.Start();}return 0;
}

tcp_cli和之前一样。

在这里插入图片描述

十八、LoopThreadPool

1、设计思路

在这里插入图片描述

2、代码

class LoopThreadPool
{private:int _reactor_count; // 从属线程数量int _next_loop_idx; // 下一个从属线程的idEventLoop* _base_loop; // 如果从属线程为0的时候,就用这个EventLoop的单个线程咯std::vector<LoopThread*> _threads; // 保存所有的EventLoop对象std::vector<EventLoop*> _loops; // 从属线程大于0则从_loops中进行EventLoop分配public:// 构造函数LoopThreadPool(EventLoop* baseloop): _reactor_count(0), _next_loop_idx(0), _base_loop(baseloop){}// 设置从属线程数量void SetThreadCount(int count){_reactor_count = count;}// 创建从属线程void Create(){// 只有在主线程有的情况下并且从属线程也有的情况下,也就是数量大于0的时候if (_reactor_count > 0){_threads.resize(_reactor_count);_loops.resize(_reactor_count);for (int i = 0; i < _reactor_count; i++){_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();}}return;}// 下一个从属线程EventLoop* NextLoop(){// 只有一个主线程的情况下,就直接返回主线程即可if (_reactor_count == 0){return _base_loop;}// 轮转_next_loop_idx = (_next_loop_idx + 1) % _reactor_count;return _loops[_next_loop_idx];}
};

3、运行结果

// tcp_svr.cc
// tcp_svr.cc
#include "../source/server.hpp"std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t con_id = 0;
EventLoop base_loop;
LoopThreadPool* loop_pool;void DestroyConnection(const PtrConnection& ptrc)
{_conns.erase(ptrc->Id());
}
void OnConnection(const PtrConnection& ptrc)
{DEBLOG("NEW CONNECTION:%p", ptrc.get());
}
void OnMessage(const PtrConnection& conn, Buffer* buff)
{DEBLOG("%s", buff->ReadPos());buff->ReadOffset(buff->ReadAbleSize());std::string str = "hello linux";conn->Send(str.c_str(), str.size());conn->Shutdown();
}void NewAcceptor(int fd)
{con_id++;PtrConnection conn(new Connection(loop_pool->NextLoop(), con_id, fd));conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));conn->SerServerClosedCallback(std::bind(DestroyConnection, std::placeholders::_1));conn->SetConnectedCallback(std::bind(OnConnection, std::placeholders::_1)); conn->SetInactiveRelease(5); // 非活跃连接10秒后销毁conn->Established(); // 就绪初始化_conns.insert(std::make_pair(con_id, conn));DEBLOG("New-------------------");
}// for test
int main()
{loop_pool = new LoopThreadPool(&base_loop);loop_pool->SetThreadCount(2);loop_pool->Create();srand(time(NULL)); // 生成一个随机数种子Acceptor acceptor(&base_loop, 8888);acceptor.SetAcceptorCallback(std::bind(NewAcceptor, std::placeholders::_1)); // 回调中,获取新连接,为新连接创建Channel并添加监控acceptor.Listen();base_loop.Start();return 0;
}

tcp_cli.cc不变
在这里插入图片描述

十九、TcpServer

用来整合上面十八个操作。

1、设计思路

在这里插入图片描述

2、代码

class TcpServer
{private:uint64_t _next_id; // 自动增加的IDint _port; // 端口int _timeout; // 非活跃连接的存在时长,也就是超过这个非活跃的时间就销毁bool _enable_inactive_release;// 是否启动非活跃连接释放EventLoop _baseloop;    // 主线程的EventLoop,如果没有从属线程的话就单线程EventLoopAcceptor _acceptor;    // 监听套接字的管理对象LoopThreadPool _pool;   // 从属EventLoop线程池std::unordered_map<uint64_t, PtrConnection> _conns; // 保存管理所有连接对应的shared_ptr对象// 回调函数using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;private:// 线程后再跑,因为不能让信息先进来线程再跑吧void RunAfterInLoop(const Functor &task, int delay) {_next_id++;_baseloop.TimerAdd(_next_id, delay, task);}// 为新连接构造一个Connection进行管理(与我们Connection模块测试的tcp_svr.cc一样)void NewConnection(int fd) {_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyCallback(_event_callback);conn->SerServerClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if (_enable_inactive_release) conn->SetInactiveRelease(_timeout); // 启动非活跃超时销毁conn->Established();_conns.insert(std::make_pair(_next_id, conn));}// 将连接从线程池中移除出来void RemoveConnectionInLoop(const PtrConnection &conn) {int id = conn->Id();auto it = _conns.find(id);if (it != _conns.end()) {_conns.erase(it);}}// 从管理Connection的_conns中移除连接信息void RemoveConnection(const PtrConnection &conn) {_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));}public:// 构造函数TcpServer(int port):_port(port), _next_id(0), _enable_inactive_release(false), _acceptor(&_baseloop, port),_pool(&_baseloop) {_acceptor.SetAcceptorCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen();//将监听套接字挂到baseloop上}// 设置线程个数void SetThreadCount(int count) {return _pool.SetThreadCount(count);}// 设置四个回调函数void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }// 启动非活跃释放连接void EnableInactiveRelease(int timeout) {_timeout = timeout;_enable_inactive_release = true; }//用于添加一个定时任务void RunAfter(const Functor &task, int delay) {_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));}// 启动void Start() { _pool.Create();  _baseloop.Start(); }
};

3、运行代码及结果

(1)运行结果1

// server.cc
#include "../source/server.hpp"void OnClosed(const PtrConnection& ptrc)
{DEBLOG("CLOSED CONNECTION:%p", ptrc.get());
}
void OnConnection(const PtrConnection& ptrc)
{DEBLOG("NEW CONNECTION:%p", ptrc.get());
}
void OnMessage(const PtrConnection& conn, Buffer* buff)
{DEBLOG("%s", buff->ReadPos());buff->ReadOffset(buff->ReadAbleSize());std::string str = "hello linux";conn->Send(str.c_str(), str.size());conn->Shutdown();
}int main()
{TcpServer server(8888);server.SetThreadCount(2);server.EnableInactiveRelease(10);server.SetClosedCallback(OnClosed);server.SetConnectedCallback(OnConnection);server.SetMessageCallback(OnMessage);server.Start();return 0;
}

tcp_cli.cc不变
在这里插入图片描述

(2)运行结果2

// server.cc
#include "../source/server.hpp"void OnClosed(const PtrConnection& ptrc)
{DEBLOG("CLOSED CONNECTION:%p", ptrc.get());
}
void OnConnection(const PtrConnection& ptrc)
{DEBLOG("NEW CONNECTION:%p", ptrc.get());
}
void OnMessage(const PtrConnection& conn, Buffer* buff)
{DEBLOG("%s", buff->ReadPos());buff->ReadOffset(buff->ReadAbleSize());std::string str = "hello linux";conn->Send(str.c_str(), str.size());//conn->Shutdown();
}int main()
{TcpServer server(8888);server.SetThreadCount(2);server.EnableInactiveRelease(10);server.SetClosedCallback(OnClosed);server.SetConnectedCallback(OnConnection);server.SetMessageCallback(OnMessage);server.Start();return 0;
}

在这里插入图片描述

(3)运行结果3

// server.cc
#include "../source/server.hpp"void OnClosed(const PtrConnection& ptrc)
{DEBLOG("CLOSED CONNECTION:%p", ptrc.get());
}
void OnConnection(const PtrConnection& ptrc)
{DEBLOG("NEW CONNECTION:%p", ptrc.get());
}
void OnMessage(const PtrConnection& conn, Buffer* buff)
{DEBLOG("%s", buff->ReadPos());buff->ReadOffset(buff->ReadAbleSize());std::string str = "hello linux";conn->Send(str.c_str(), str.size());//conn->Shutdown();
}int main()
{TcpServer server(8888);server.SetThreadCount(2);//server.EnableInactiveRelease(10);server.SetClosedCallback(OnClosed);server.SetConnectedCallback(OnConnection);server.SetMessageCallback(OnMessage);server.Start();return 0;
}

在这里插入图片描述

二十、NetWork

class NetWork 
{public:NetWork() {DEBLOG("SIGPIPE INIT");signal(SIGPIPE, SIG_IGN);}
};
static NetWork nw;

二十一、EchoServer

1、直接上代码(TcpServer二次封装)

// echo.hpp
#include "../server.hpp"
class EchoServer
{private:TcpServer _server;private:void OnClosed(const PtrConnection& ptrc){DEBLOG("CLOSED CONNECTION:%p", ptrc.get());}void OnConnection(const PtrConnection& ptrc){DEBLOG("NEW CONNECTION:%p", ptrc.get());}void OnMessage(const PtrConnection& conn, Buffer* buff){conn->Send(buff->ReadPos(), buff->ReadAbleSize());buff->ReadOffset(buff->ReadAbleSize());conn->Shutdown();}public:EchoServer(int port): _server(port){_server.SetThreadCount(2);_server.EnableInactiveRelease(10);_server.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1));_server.SetConnectedCallback(std::bind(&EchoServer::OnConnection, this, std::placeholders::_1));_server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));_server.Start();}void Start(){_server.Start();}
};
//main.cc
#include "echo.hpp"int main()
{EchoServer server(8888);server.Start();return 0;
}

2、测试结果

在这里插入图片描述

3、简单的EchoServer压力测试

(1)找测试文件

github上找一个叫WebBench的项目并压缩下来
在这里插入图片描述

(2)unzip

在这里插入图片描述

(3)make

在这里插入图片描述

(4)./webbench

在这里插入图片描述

(5)./webbench -c 500 -t 60 http://127.0.0.1:8888/hello

在这里插入图片描述

(6)跑完以后的结果:

在这里插入图片描述

二十二、EchoServer关系图

在这里插入图片描述

这篇关于【项目】仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器(TcpServer板块)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/925291

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

这15个Vue指令,让你的项目开发爽到爆

1. V-Hotkey 仓库地址: github.com/Dafrok/v-ho… Demo: 戳这里 https://dafrok.github.io/v-hotkey 安装: npm install --save v-hotkey 这个指令可以给组件绑定一个或多个快捷键。你想要通过按下 Escape 键后隐藏某个组件,按住 Control 和回车键再显示它吗?小菜一碟: <template

如何用Docker运行Django项目

本章教程,介绍如何用Docker创建一个Django,并运行能够访问。 一、拉取镜像 这里我们使用python3.11版本的docker镜像 docker pull python:3.11 二、运行容器 这里我们将容器内部的8080端口,映射到宿主机的80端口上。 docker run -itd --name python311 -p

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

Andrej Karpathy最新采访:认知核心模型10亿参数就够了,AI会打破教育不公的僵局

夕小瑶科技说 原创  作者 | 海野 AI圈子的红人,AI大神Andrej Karpathy,曾是OpenAI联合创始人之一,特斯拉AI总监。上一次的动态是官宣创办一家名为 Eureka Labs 的人工智能+教育公司 ,宣布将长期致力于AI原生教育。 近日,Andrej Karpathy接受了No Priors(投资博客)的采访,与硅谷知名投资人 Sara Guo 和 Elad G

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo