网络编程: reactor模式的步步探索与实现

2024-06-08 04:04

本文主要是介绍网络编程: reactor模式的步步探索与实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

网络编程: reactor模式的步步探索与实现

  • 一.步步探索
    • 1.先看一下之前的BUG的影响
    • 2.解决拼接式读取问题
    • 3.进一步的探索
    • 4.Connection的提出
    • 5.EpollServer的修改并将监听套接字添加进去
    • 6.小演示
  • 二.协议与业务登场
    • 1.协议,业务,解决粘包,序列反序列化等等的函数模块实现
    • 2.读写异常事件的关心策略
    • 3.handler.hpp的修改
    • 4.client.cpp的快速编写
    • 5.演示
    • 6.事件派发与提出reactor
    • 7.完整代码
      • 1.Reactor.hpp
      • 2. Connection.hpp
      • 3. Epoller.hpp
      • 4. handler.hpp
      • 5. PacketProcesser.hpp
      • 6.协议和业务: Protocal.hpp Translate.hpp
      • 7.cpp文件: server和client
  • 三.画图进一步理解reactor
    • 1.版本1
    • 2.版本2
    • 3.小总结

回顾上文留下的疑问,这是一个待处理的问题
read的数据粘包,序列反序列化,写事件和异常事件怎么处理?

一.步步探索

1.先看一下之前的BUG的影响

之前我们都是多线程给每个用户提供服务,每个线程对应于一个用户,都在线程函数当中维护了对应的输入输出缓冲区,
所以没有遇到过这种BUG

多路转接使得服务器能够不依赖多线程即可完成同时为多个用户提供服务,那么它就要解决多线程解决过的问题,这是理所当然的
在这里插入图片描述
我们先简单地解决数据包粘包问题, 就规定 每条消息固定长度:15字节
buf数组我们搞成大小为20
依旧是echo服务器(发回完整的一条消息(15字节))

因为我们发的消息是字符流,所以没有序列反序列化问题

演示一下:

|Live on hope.|
|Give me time.|
|Show me yours|
|Take it easy!|
|Just a moment|
|Cat dog house|
|Tea pot stand|这是长度为13的短语,加上两边的|正好15个长度我们到时候就发这个

在这里插入图片描述
这种bug是不能容忍的,因此下面我们来解决这一问题

2.解决拼接式读取问题

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
这一点充分地说明了只要为每个fd维护一个输入输出缓冲区即可修复对应的BUG
大佬在这个角度上又有了更深的思考
在这里插入图片描述
某个功能不能写死,如何才能办到呢?
回调函数!!

3.进一步的探索

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
于是:
在这里插入图片描述

4.Connection的提出

每个fd对应于一个用户,也就是一个通信连接,也要有自己的inbuff,outbuff
还要能够访问Epoll_server的Epoller _epoll对象来设置取消关系等等!!

因此大佬对上述的成员进行封装,提出了Connection
在这里插入图片描述
在这里插入图片描述
写事件的问题我们解决了,但是仔细想想: 这种方法还是写死了啊,只不过是封装一层之后写死的啊
reader和writer应该要由使用方来提供,因此reader,writer,accepter,excepter都要在搞一个回调

因此connection就此成型
在这里插入图片描述

#pragma once
#include <functional>
#include <memory>
using namespace std;class EpollServer; // 前置声明
class Connection;
using func_t = function<void(shared_ptr<Connection>)>;// connection负责维护一个具体的连接
// 拥有自己的sockfd,用户级输入输出缓冲区, 读写异常事件的注册方法/回调函数
// 还有一个回指向EpollServer的指针/EpollServer设置的对应的回调函数class Connection
{
public:Connection(int fd, EpollServer *epoll_server): _sockfd(fd), _epoll_server(epoll_server) {}~Connection(){close(_sockfd);}void add_outbuffer(const string &buf){_out_buffer += buf;}string &get_inbuffer(){return _in_buffer;}string &get_outbuffer(){return _out_buffer;}func_t getreader(){return _reader;}func_t getwriter(){return _writer;}func_t getexcepter(){return _excepter;}void registerCallback(func_t reader = nullptr, func_t writer = nullptr, func_t excepter = nullptr){_reader = reader ? reader : _reader;_writer = writer ? writer : _writer;_excepter = _excepter ? excepter : _excepter;}void deregisterCallback(bool reader = false, bool writer = false, bool excepter = false){if (reader)_reader = nullptr;if (writer)_writer = nullptr;if (excepter)_excepter = nullptr;}int getfd(){return _sockfd;}EpollServer *getEpollServer(){return _epoll_server;}private:int _sockfd;string _in_buffer;  // 有问题,只能处理文本,无法处理二进制(比如:图片,视频....)string _out_buffer; // TO BE MODIFY// 回调函数: 读,写,异常func_t _reader;func_t _writer;func_t _excepter;EpollServer *_epoll_server;
};

我们先暂且把Callback,Request,Response,BusinessProcessing放到同一个文件中: handler.hpp
不过我们要知道,它们将来必须是要分开的
在这里插入图片描述

5.EpollServer的修改并将监听套接字添加进去

Connection封装了一个单独的连接,并提供了回调函数的get和set方法,我们的EpollServer也就要修改了
又因为:
将connection和handler提出去之后,我们就可以在.cpp文件当中提前创建/绑定监听套接字,然后设置回调,添加到EpollServer当中

我们的EpollServer要能够提供这么几个功能:

  1. 建立和关闭连接 – connection
  2. 设置和取消关心 – epoll
  3. 注册和取消回调 – connection当中的reader,writer,excepter
    在这里插入图片描述
    这些代码以大家现在的水平应该一看就能看懂,我们重点是在思想上和代码上,走一下大佬们曾经走过的路
    (当然,大佬是边走边建立道路, 我们是边走边欣赏,感叹道路建设当中的优雅与强大)

下面我们添加监听套接字: 建立一个.cpp文件

  • 温馨提示: 我们用epoll的ET模式
    因此: 我们的accept,read,write都要用非阻塞方式来进行
    所以我们拿出我们之前一劳永逸的代码: 直接将fd设置为非阻塞状态
// 非阻塞式IO
void ModifyFdToNonBlock(int fd)
{int fl = fcntl(fd, F_GETFL); // 获取fd的状态给flif (fl < 0){LOG_SCREEN(FATAL) << "ModifyFdToNonBlock(int fd) error , errno: " << errno << " , strerror: " << strerror(errno) << "\n";return;}fcntl(fd, F_SETFL, fl | O_NONBLOCK); // 将fd设置为非阻塞状态
}

在这里插入图片描述
我们就不用Socket了,直接用原生套接字接口
在这里插入图片描述

6.小演示

跟我们一开始的样子一样,发一下|Live on hope.||Give me time.|等等这些短语验证一下,没问题我们就开始写一个小业务了(主要是自定义应用层协议解决一下粘包问题和序列反序列化问题)
在这里插入图片描述
完美,下面我们把它们分开吧
在这里插入图片描述

二.协议与业务登场

1.协议,业务,解决粘包,序列反序列化等等的函数模块实现

我们要实现一个简单的英汉互译服务器,
协议是这样的,采用的是LV(length+Value)方式来解决粘包问题 len\n0 单词\n
0和单词之间以空格作为分割符,进行序列反序列化

徒说无益,直接给代码了,没啥难的

// 英汉互译服务器
#pragma once
#include <string>
#include <iostream>
using namespace std;const char Sep = '\n';//  len\n字符串\n
class Codec
{
public:static void Encode(string &str){string encode_str = to_string(str.size()) + Sep + str;str = encode_str;}static bool Decode(string &str, string *return_str){// 先找\nsize_t start = 0, pos = str.find(Sep);if (pos == string::npos){return false;}// 1. 取出lensize_t len = stoi(str.substr(start, pos - start));// 2. start往后走,越过\r\nstart = pos + 1;// 3. pos往后走len个长度pos = start + len;// 4. 看pos是否不够if (pos >= str.size()+1){return false;}// 5.没有越界,则截取字符串,并erase str*return_str = str.substr(start, len);str.erase(0,pos);return true;}
};enum TranslateMode
{ETOC, // 英译汉CTOE  // 汉译英
};// len\n0 字符串\n
//"0"表示英译汉
//"1"表示汉译英
// 其余的直接丢弃
class Request
{
public:Request() = default;Request(TranslateMode mode, const string &str): _mode(mode){if (_mode == ETOC)_english = str;else_chinese = str;}string Serialize(){if (_mode == ETOC){return "0 " + _english;}elsereturn "1 " + _chinese;}bool Deserialize(const string &str){// 找空格,分割即可size_t pos = str.find(" ");if (pos == string::npos || str.size() <= 2)return false;if (str.substr(0, pos) == "0"){_english = str.substr(pos + 1);_mode = ETOC;return true;}else if (str.substr(0, pos) == "1"){_chinese = str.substr(pos + 1);_mode = CTOE;return true;}return false;}bool etoc() const{return _mode == ETOC;}const string &str() const{if (etoc())return _english;return _chinese;}void setmode(TranslateMode mode){_mode = mode;}void setstr(const string &s){if (etoc())_english = s;else_chinese = s;}private:string _english;string _chinese;TranslateMode _mode;
};class Response
{
public:Response() = default;Response(const string &str): _str(str) {}string Serialize(){return _str;}bool Deserialize(const string &str){_str = str;return true;}void Setans(const string &str){_str = str;}private:string _str;
};

至于业务:

#pragma once
#include <unordered_map>
using namespace std;
#include "Protocal.hpp"class Translater
{
public:static Response Translate(const Request &req){static unordered_map<string, string> umap_etoc = {{"hello", "你好"}, {"dp", "一生之敌"}, {"ac", "恭喜"}, {"hello world", "你好,世界"}};static unordered_map<string, string> umap_ctoe = {{"你好", "hello"}, {"一生之敌", "dp"}, {"恭喜", "ac"}, {"你好,世界", "hello world"}};bool etoc = req.etoc();string ans = "Not Found";if (etoc){if (umap_etoc.count(req.str())){ans = umap_etoc[req.str()];}}else{if (umap_ctoe.count(req.str())){ans = umap_etoc[req.str()];}}Response resp;resp.Setans(ans);return resp;}
};

至于解决粘包,反序列化,拿到Request,交给业务层处理,拿到Response,序列化,封装报头这些任务依然需要搞一个文件
PacketProcessor.hpp

#pragma once#include "Translate.hpp"
#include "Log.hpp"
#include <unistd.h>
class PacketProcessor
{
public:static string getProcessedMessage(string &inbuffer){string outstr;string tmpstr;// 1.解决粘包while (Codec::Decode(inbuffer, &tmpstr)){// 2.反序列化Request req;if (!req.Deserialize(tmpstr)){LOG_SCREEN(FATAL) << "request 反序列化失败,该报文直接丢弃\n";return outstr;}// 3.交由业务层处理Response resp = Translater::Translate(req);// 4.encodestring s = resp.Serialize();Codec::Encode(s);// 5.将返回结果添加到outstr当中outstr += s;}return outstr;}
};

业务相关代码处理完了,下面重点就是reader,writer,excepter函数的修改了

2.读写异常事件的关心策略

大家也都写了一年多代码了,结合我们的编程经验,我们也都可以得出: IO操作当中,读是最最最容易阻塞的,
因此读事件一般都要关心,而写事件很少阻塞(除了学管道的时候见过),因此写事件很少关心,而异常事件可以转为读写事件的关心,
而IO在读/写时也会发生异常,所以统一集中在读写时处理

对于写而言:
我们一般就是直接非阻塞式写,如果遇到errno==EAGAIN,说明发送缓冲区满了,此时才会设置写事件关心
如果用户级发送缓冲区outbuff空了,说明全发过去了,取消写事件关心注意: 一定不要影响到读事件的关心和回调

3.handler.hpp的修改

在这里插入图片描述

#pragma once
#include <string>
using namespace std;
#include "Epoll_server.hpp"
#include "common.hpp"
#include "PacketProcessor.hpp"#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>struct Callback
{static void reader(shared_ptr<Connection> conn){int fd = conn->getfd();string &inbuff = conn->get_inbuffer();char buf[1024];int num;while (true){errno = 0;num = recv(fd, buf, sizeof(buf) - 1, 0);if (num >= 0){buf[num] = 0;inbuff += buf; // 不要忘了拼接到接收缓冲区中if (num == 0)  // 说明client关闭了写,那么我们只需要把该发给client的数据发完就OK了,无需在读了break;}else{if (errno == EAGAIN) // 说明缓冲区为空了,此时退出循环即可break;if (errno == EINTR) // 说明读过程收到了信号打断,continue重新读continue;// 说明recv失败,打印日志LOG_SCREEN(ERROR) << "recv message fail!, errno: " << errno << " strerror: " << strerror(errno) << "\n";excepter(conn); // 调用excepter异常处理(断开连接)return;         // 回来之后马上return}}// 走到这里,说明recv成功,要不然就是接收缓冲区空了,要不然就是client关闭写了// 无论是哪种,先交给PackProcessor.hpp解决粘包,反序列化拿到Request,交由业务层处理,拿到Response,序列化,封装报头发回来,// 我们放到发送缓冲区调用writer进行发送string send_str = PacketProcessor::getProcessedMessage(inbuff);conn->add_outbuffer(send_str);writer(conn);// 还会回来 , 如果num==0 并且发送缓冲区为空,那么通常情况下断开连接即可if (num == 0){while (!conn->get_outbuffer().empty())writer(conn); // 只要发送缓冲区还有数据就一直调用LOG_SCREEN(INFO) << "client exit, send him message finish...\n";excepter(conn);}}static void writer(shared_ptr<Connection> conn){int fd = conn->getfd();string &outbuff = conn->get_outbuffer();while (true){errno = 0;// 发回去int num = send(fd, outbuff.c_str(), outbuff.size(), 0);if (num >= 0){if (num == 0) // 说明client关闭写,通常情况下我们直接关闭跟client的连接即可{LOG_SCREEN(INFO) << "client close his writer_interface\n";excepter(conn);return; // 回来时直接返回}outbuff.erase(0, num); // 发成功的话,要把实际发出的信息从outbuff删除掉if (outbuff.empty())break; // 全发完了,直接break即可}else{if (errno == EAGAIN) // 说明发送缓冲区满了,break即可break;if (errno == EINTR)continue;// 说明send失败,打印日志LOG_SCREEN(ERROR) << "send message fail!, errno: " << errno << " strerror: " << strerror(errno) << "\n";excepter(conn); // 调用excepter异常处理(断开连接)return;         // 回来之后马上return}}// 走到这里只有2种情况: 要么发送缓冲区满了 我们需要设置写事件关心,要么发完了 我们需要取消写事件关心if (outbuff.empty()){LOG_SCREEN(INFO) << "outbuff发完了,去取消写事件关心与回调\n";conn->getEpollServer()->deregisterCallback(fd, false, true);}else{LOG_SCREEN(INFO) << "缓冲区满了,设置写事件关心与回调\n";conn->getEpollServer()->setupCare(fd, true, true);conn->getEpollServer()->registerCallback(fd, &Callback::reader, &Callback::writer);}}static void excepter(shared_ptr<Connection> conn){int fd = conn->getfd();EpollServer *es = conn->getEpollServer();// 1. 取消回调es->deregisterCallback(fd, true, true, true);// 2. 取消关心es->teardownCare(fd);// 3. 关闭连接es->shutdownConnection(fd);LOG_SCREEN(INFO) << "关闭连接, client's fd: " << fd << "\n";}static void accepter(shared_ptr<Connection> conn){int listenfd = conn->getfd();while (true){errno = 0;sockaddr_in srcaddr;socklen_t len = sizeof(srcaddr);int newfd = ::accept(listenfd, Conv(&srcaddr), &len);if (newfd >= 0){LOG_SCREEN(INFO) << "accept success, newfd: " << newfd << "\n";ModifyFdToNonBlock(newfd);// 说明有新套接字出现,要用Epoll_server来添加conn->getEpollServer()->buildConnection(newfd);                     // 1.建议连接conn->getEpollServer()->setupCare(newfd, true);                     // 2.设置读事件的关心conn->getEpollServer()->registerCallback(newfd, &Callback::reader); // 3.注册回调}else{if (errno == EAGAIN)return;if (errno == EINTR)continue;else{LOG_SCREEN(ERROR) << "accept fail, errno: "<< errno << ", strerror(errno) : " << strerror(errno) << "\n";break;}}}}
};

4.client.cpp的快速编写

因为telnet的每次回车都会给我们加上/r/n,所以就不好演示,因此与其费劲调telnet和协议,还不如自己写一个简单的普普通通的套接字client呢

我们采用两个新线程,一个负责读,一个负责写
直接上代码了,没啥难的

#include "Protocal.hpp"
#include <iostream>
using namespace std;
#include <vector>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstring>
#include "common.hpp"
#include <thread>void sender(int sockfd)
{int command;vector<string> inv = {"0hello", "0dp", "0ac"};while (true){cout << "是否发送请求, 0代表不发送,直接退出,1代表发送: [0/1]\n";cin >> command;if (command == 0)break;for (auto &in : inv){//  1.构建请求Request req;if (in[0] == '0'){req.setmode(ETOC);}elsereq.setmode(CTOE);req.setstr(in.substr(1));// 2.序列化string send_str = req.Serialize();// 3.encodeCodec::Encode(send_str);//cout << "发送数据: " << send_str;send(sockfd, send_str.c_str(), send_str.size(), 0);}}// 仅关闭写端// int shutdown(int sockfd, int how); how 设为 SHUT_WRLOG_SCREEN(INFO) << "发完数据啦,关闭套接字的写端!\n";shutdown(sockfd, SHUT_WR);
}
void reader(int sockfd)
{while (true){string out;int n;// 1.读取响应while (true){char buf[1024];n = recv(sockfd, buf, sizeof(buf) - 1, MSG_DONTWAIT);if (n >= 0){buf[n] = 0;out += buf;// cout << out << endl;if (n == 0)break;}else{if (errno == EAGAIN)break;if (errno == EINTR)continue;else{cout << "recv error, errno: " << errno << ", strerror(errno): " << strerror(errno) << "\n";return;}}}// 2.解决数据包粘包string return_str;while (Codec::Decode(out, &return_str)){// 3.反序列化Response resp;if (!resp.Deserialize(return_str)){cout << "response 反序列化失败,该报文直接丢弃\n";break;}// 打印即可cout << "server回复: " << return_str << "\n";}if (n == 0)break;}
}int main(int argc, char *argv[])
{if (argc != 3){cout << "Usage: " << argv[0] << " server_ip, server_port" << endl;return 1;}string ip = argv[1];uint16_t port = stoi(argv[2]);// 1. 创建socketint sockfd = socket(AF_INET, SOCK_STREAM, 0);// 2. connectstruct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = inet_addr(ip.c_str());server_addr.sin_port = htons(port);int n = connect(sockfd, Conv(&server_addr), sizeof(server_addr));if (n < 0){cout << "connect fail\n";return 1;}thread t1(sender, sockfd);thread t2(reader, sockfd);t1.join();t2.join();return 0;
}

5.演示

验证成功
在这里插入图片描述

6.事件派发与提出reactor

到现在,我们成功解决了 本文一开始提出的 read的数据粘包,序列反序列化,写事件和异常事件怎么处理?的这个问题

下面我们回过头来看一下我们当初的EpollSever如今变成了什么样子
在这里插入图片描述
在这里插入图片描述
而正因如此,刚才的EpollServer被称为reactor模式(反应堆模式,也叫做半同步半异步模式)当中最核心的反应堆

反应堆体现在: 利用epoll的事件驱动机制+内部注册的回调函数实现回调函数调用的"自动化"与"连续化" , 就类似于核反应的感觉

半同步半异步模式体现在: 半异步: 事件的监听和通知与注册的函数调用是异步的
半同步体现在: 对应的recv和send接口依旧需要执行流主动调用,是一种IO的同步机制

因此: 我们改一下名字

7.完整代码

1.Reactor.hpp

#pragma once
#include "Log.hpp"
#include <memory>
#include <vector>
#include <cstring>
#include "Connection.hpp"
#include "Epoller.hpp"class Reactor
{
public:Epoller &epoll(){return _epoll;}Reactor(uint16_t port) : _port(port) {}void dispatch(){while (true){int n = _epoll.wait();if (n > 0){eventloop(n);}else{LOG_SCREEN(ERROR) << "epoll wait error, errno: " << errno << ", strerror: " << strerror(errno) << "\n";}}}void buildConnection(int fd){_connection_map[fd] = make_shared<Connection>(fd, this);}void shutdownConnection(int fd){_connection_map.erase(fd);}void setupCare(int fd, bool in = false, bool out = false, bool except = false){_epoll.add_Epoll(fd, in, out, except);}void teardownCare(int fd){_epoll.removefromEpoll(fd);}void registerCallback(int fd, func_t reader = nullptr, func_t writer = nullptr, func_t excepter = nullptr){// 添加/修改if (!isConnected(fd)){_connection_map[fd] = make_shared<Connection>(fd, this);}_connection_map[fd]->registerCallback(reader, writer, excepter);}void deregisterCallback(int fd, bool reader = false, bool writer = false, bool excepter = false){// 查看在不在if (!isConnected(fd))return;// 删除_connection_map[fd]->deregisterCallback(reader, writer, excepter);}private:bool isConnected(int fd){return _connection_map.count(fd) == 1;}void eventloop(int n){for (int i = 0; i < n; i++){int fd = _epoll.getfd(i);uint32_t event = _epoll.getevent(i);if (!isConnected(fd))continue;shared_ptr<Connection> conn = _connection_map[fd];// 面对异常,我们能做的只有断开连接// 而读写时也可能会发生异常啊,因此我们把异常统一到一个地方去处理,更加优雅if (event & EPOLLERR | event & EPOLLHUP){event |= (EPOLLIN | EPOLLOUT);}if (event & EPOLLIN){func_t reader = conn->getreader();//不区分fd是监听套接字还是普通套接字,监听套接字对应的connection绑定的reader函数其实是accepterif (reader == nullptr)continue;reader(conn);}if (event & EPOLLOUT){func_t writer = conn->getwriter();if (writer == nullptr)continue;writer(conn);}}}uint16_t _port;Epoller _epoll;unordered_map<int, shared_ptr<Connection>> _connection_map;
};

2. Connection.hpp

#pragma once
#include <functional>
#include <unistd.h>
#include <memory>
using namespace std;class Reactor; // 前置声明
class Connection;
using func_t = function<void(shared_ptr<Connection>)>;// connection负责维护一个具体的连接
// 拥有自己的sockfd,用户级输入输出缓冲区, 读写异常事件的注册方法/回调函数
// 还有一个回指向EpollServer的指针/EpollServer设置的对应的回调函数class Connection
{
public:Connection(int fd, Reactor *reactor): _sockfd(fd), _reactor(reactor) {}~Connection(){close(_sockfd);}void add_outbuffer(const string &buf){_out_buffer += buf;}string &get_inbuffer(){return _in_buffer;}string &get_outbuffer(){return _out_buffer;}func_t getreader(){return _reader;}func_t getwriter(){return _writer;}func_t getexcepter(){return _excepter;}void registerCallback(func_t reader = nullptr, func_t writer = nullptr, func_t excepter = nullptr){_reader = reader ? reader : _reader;_writer = writer ? writer : _writer;_excepter = _excepter ? excepter : _excepter;}void deregisterCallback(bool reader = false, bool writer = false, bool excepter = false){if (reader)_reader = nullptr;if (writer)_writer = nullptr;if (excepter)_excepter = nullptr;}int getfd(){return _sockfd;}Reactor *reactor(){return _reactor;}private:int _sockfd;string _in_buffer;  // 有问题,只能处理文本,无法处理二进制(比如:图片,视频....)string _out_buffer; // TO BE MODIFY// 回调函数: 读,写,异常func_t _reader;func_t _writer;func_t _excepter;Reactor *_reactor;
};

3. Epoller.hpp

#pragma once
#include <sys/epoll.h>
#include <vector>
#include <string>
using namespace std;class Epoller
{
public:// 默认阻塞式等待Epoller(int timeout = -1) : _timeout(timeout){_epfd = epoll_create(1);}void add_Epoll(int fd, bool in = false, bool out = false, bool except = false){int i = _events_arr.size();if (!_invalids.empty()){i = _invalids.back() - '0';_invalids.pop_back();}else_events_arr.push_back(epoll_event());_events_arr[i].events = EPOLLET | (in ? EPOLLIN : 0) | (out ? EPOLLOUT : 0) | (except ? EPOLLERR | EPOLLHUP : 0);_events_arr[i].data.fd = fd;epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &_events_arr[i]);}void removefromEpoll(int fd){for (int i = 0; i < _events_arr.size(); i++){if (_events_arr[i].data.fd == fd){_events_arr[i].data.fd = -1;_events_arr[i].events = 0;_invalids.push_back(i); // 删除的时候添加到invalids当中break;}}epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);}int wait(){return epoll_wait(_epfd, &_events_arr[0], _events_arr.size(), _timeout);}int getfd(int index){return _events_arr[index].data.fd;}uint32_t getevent(int index){return _events_arr[index].events;}private:int _epfd;vector<struct epoll_event> _events_arr;string _invalids;int _timeout;
};

4. handler.hpp

#pragma once
#include <string>
using namespace std;
#include "Reactor.hpp"
#include "common.hpp"
#include "PacketProcessor.hpp"#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>struct Handler
{static void reader(shared_ptr<Connection> conn){int fd = conn->getfd();string &inbuff = conn->get_inbuffer();char buf[1024];int num;while (true){errno = 0;num = recv(fd, buf, sizeof(buf) - 1, 0);if (num >= 0){buf[num] = 0;inbuff += buf; // 不要忘了拼接到接收缓冲区中if (num == 0)  // 说明client关闭了写,那么我们只需要把该发给client的数据发完就OK了,无需在读了break;}else{if (errno == EAGAIN) // 说明缓冲区为空了,此时退出循环即可break;if (errno == EINTR) // 说明读过程收到了信号打断,continue重新读continue;// 说明recv失败,打印日志LOG_SCREEN(ERROR) << "recv message fail!, errno: " << errno << " strerror: " << strerror(errno) << "\n";excepter(conn); // 调用excepter异常处理(断开连接)return;         // 回来之后马上return}}// 走到这里,说明recv成功,要不然就是接收缓冲区空了,要不然就是client关闭写了// 无论是哪种,先交给PackProcessor.hpp解决粘包,反序列化拿到Request,交由业务层处理,拿到Response,序列化,封装报头发回来,// 我们放到发送缓冲区调用writer进行发送string send_str = PacketProcessor::getProcessedMessage(inbuff);conn->add_outbuffer(send_str);writer(conn);// 还会回来 , 如果num==0 并且发送缓冲区为空,那么通常情况下断开连接即可if (num == 0){while (!conn->get_outbuffer().empty())writer(conn); // 只要发送缓冲区还有数据就一直调用LOG_SCREEN(INFO) << "client exit, send him message finish...\n";excepter(conn);}}static void writer(shared_ptr<Connection> conn){int fd = conn->getfd();string &outbuff = conn->get_outbuffer();while (true){errno = 0;// 发回去int num = send(fd, outbuff.c_str(), outbuff.size(), 0);if (num >= 0){if (num == 0) // 说明client关闭写,通常情况下我们直接关闭跟client的连接即可{LOG_SCREEN(INFO) << "client close his writer_interface\n";excepter(conn);return; // 回来时直接返回}outbuff.erase(0, num); // 发成功的话,要把实际发出的信息从outbuff删除掉if (outbuff.empty())break; // 全发完了,直接break即可}else{if (errno == EAGAIN) // 说明发送缓冲区满了,break即可break;if (errno == EINTR)continue;// 说明send失败,打印日志LOG_SCREEN(ERROR) << "send message fail!, errno: " << errno << " strerror: " << strerror(errno) << "\n";excepter(conn); // 调用excepter异常处理(断开连接)return;         // 回来之后马上return}}// 走到这里只有2种情况: 要么发送缓冲区满了 我们需要设置写事件关心,要么发完了 我们需要取消写事件关心if (outbuff.empty()){LOG_SCREEN(INFO) << "outbuff发完了,去取消写事件关心与回调\n";conn->reactor()->deregisterCallback(fd, false, true);}else{LOG_SCREEN(INFO) << "缓冲区满了,设置写事件关心与回调\n";conn->reactor()->setupCare(fd, true, true);conn->reactor()->registerCallback(fd, &Handler::reader, &Handler::writer);}}static void excepter(shared_ptr<Connection> conn){int fd = conn->getfd();// 1. 取消回调conn->reactor()->deregisterCallback(fd, true, true, true);// 2. 取消关心conn->reactor()->teardownCare(fd);// 3. 关闭连接conn->reactor()->shutdownConnection(fd);LOG_SCREEN(INFO) << "关闭连接, client's fd: " << fd << "\n";}static void accepter(shared_ptr<Connection> conn){int listenfd = conn->getfd();while (true){errno = 0;sockaddr_in srcaddr;socklen_t len = sizeof(srcaddr);int newfd = ::accept(listenfd, Conv(&srcaddr), &len);if (newfd >= 0){LOG_SCREEN(INFO) << "accept success, newfd: " << newfd << "\n";ModifyFdToNonBlock(newfd);// 说明有新套接字出现,要用Epoll_server来添加conn->reactor()->buildConnection(newfd);                     // 1.建议连接conn->reactor()->setupCare(newfd, true);                     // 2.设置读事件的关心conn->reactor()->registerCallback(newfd, &Handler::reader); // 3.注册回调}else{if (errno == EAGAIN)return;if (errno == EINTR)continue;else{LOG_SCREEN(ERROR) << "accept fail, errno: "<< errno << ", strerror(errno) : " << strerror(errno) << "\n";break;}}}}
};

5. PacketProcesser.hpp

#pragma once
#include "Translate.hpp"
#include "Log.hpp"
#include <unistd.h>
class PacketProcessor
{
public:static string getProcessedMessage(string &inbuffer){string outstr;string tmpstr;// 1.解决粘包while (Codec::Decode(inbuffer, &tmpstr)){// 2.反序列化Request req;if (!req.Deserialize(tmpstr)){LOG_SCREEN(FATAL) << "request 反序列化失败,该报文直接丢弃\n";return outstr;}// 3.交由业务层处理Response resp = Translater::Translate(req);// 4.encodestring s = resp.Serialize();Codec::Encode(s);// 5.将返回结果添加到outstr当中outstr += s;}return outstr;}
};

6.协议和业务: Protocal.hpp Translate.hpp

Protocal.hpp

// 英汉互译服务器
#pragma once
#include <string>
#include <iostream>
using namespace std;const char Sep = '\n';//  len\n字符串\n
class Codec
{
public:static void Encode(string &str){string encode_str = to_string(str.size()) + Sep + str;str = encode_str;}static bool Decode(string &str, string *return_str){// 先找\nsize_t start = 0, pos = str.find(Sep);if (pos == string::npos){return false;}// 1. 取出lensize_t len = stoi(str.substr(start, pos - start));// 2. start往后走,越过\r\nstart = pos + 1;// 3. pos往后走len个长度pos = start + len;// 4. 看pos是否不够if (pos >= str.size()+1){return false;}// 5.没有越界,则截取字符串,并erase str*return_str = str.substr(start, len);str.erase(0,pos);return true;}
};enum TranslateMode
{ETOC, // 英译汉CTOE  // 汉译英
};// len\n0 字符串\n
//"0"表示英译汉
//"1"表示汉译英
// 其余的直接丢弃
class Request
{
public:Request() = default;Request(TranslateMode mode, const string &str): _mode(mode){if (_mode == ETOC)_english = str;else_chinese = str;}string Serialize(){if (_mode == ETOC){return "0 " + _english;}elsereturn "1 " + _chinese;}bool Deserialize(const string &str){// 找空格,分割即可size_t pos = str.find(" ");if (pos == string::npos || str.size() <= 2)return false;if (str.substr(0, pos) == "0"){_english = str.substr(pos + 1);_mode = ETOC;return true;}else if (str.substr(0, pos) == "1"){_chinese = str.substr(pos + 1);_mode = CTOE;return true;}return false;}bool etoc() const{return _mode == ETOC;}const string &str() const{if (etoc())return _english;return _chinese;}void setmode(TranslateMode mode){_mode = mode;}void setstr(const string &s){if (etoc())_english = s;else_chinese = s;}private:string _english;string _chinese;TranslateMode _mode;
};class Response
{
public:Response() = default;Response(const string &str): _str(str) {}string Serialize(){return _str;}bool Deserialize(const string &str){_str = str;return true;}void Setans(const string &str){_str = str;}private:string _str;
};

Translate.hpp:

#pragma once
#include <unordered_map>
using namespace std;
#include "Protocal.hpp"class Translater
{
public:static Response Translate(const Request &req){static unordered_map<string, string> umap_etoc = {{"hello", "你好"}, {"dp", "一生之敌"}, {"ac", "恭喜"}, {"hello world", "你好,世界"}};static unordered_map<string, string> umap_ctoe = {{"你好", "hello"}, {"一生之敌", "dp"}, {"恭喜", "ac"}, {"你好,世界", "hello world"}};bool etoc = req.etoc();string ans = "Not Found";if (etoc){if (umap_etoc.count(req.str())){ans = umap_etoc[req.str()];}}else{if (umap_ctoe.count(req.str())){ans = umap_etoc[req.str()];}}Response resp;resp.Setans(ans);return resp;}
};

7.cpp文件: server和client

server.cpp

#include "handler.hpp"
#include "common.hpp"const int defaultBacklog = 5;int getListenSock(uint16_t port)
{// 1.设置监听套接字int listensock = socket(AF_INET, SOCK_STREAM, 0);// 2.绑定监听套接字struct sockaddr_in addr;memset(&addr, 0, sizeof(addr));addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = INADDR_ANY;if (::bind(listensock, Conv(&addr), sizeof(addr)) == -1){LOG_SCREEN(FATAL) << "bind fail , port: " << port << "\n";exit(1);}LOG_SCREEN(DEBUG) << "bind success\n";// 3.进行监听if (listen(listensock, defaultBacklog) == -1){LOG_SCREEN(FATAL) << "listen fail , port: " << port << "\n";exit(1);}LOG_SCREEN(DEBUG) << "listen success\n";return listensock;
}int main(int argc, char *argv[])
{if (argc != 2){cout << "Usage: " << argv[0] << " server_port\n";return 1;}uint16_t port = stoi(argv[1]);// 1. 创建监听套接字,并设为非阻塞int listensock = getListenSock(port);ModifyFdToNonBlock(listensock);// 2. 注册监听套接字Reactor svr(port);svr.setupCare(listensock, true);svr.registerCallback(listensock, &Handler::accepter);// 3. dispatch就完事了svr.dispatch();return 0;
}

client.cpp

#include "Protocal.hpp"
#include <iostream>
using namespace std;
#include <vector>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstring>
#include "common.hpp"
#include <thread>void sender(int sockfd)
{int command;vector<string> inv = {"0hello", "0dp", "0ac"};while (true){cout << "是否发送请求, 0代表不发送,直接退出,1代表发送: [0/1]\n";cin >> command;if (command == 0)break;for (auto &in : inv){//  1.构建请求Request req;if (in[0] == '0'){req.setmode(ETOC);}elsereq.setmode(CTOE);req.setstr(in.substr(1));// 2.序列化string send_str = req.Serialize();// 3.encodeCodec::Encode(send_str);//cout << "发送数据: " << send_str;send(sockfd, send_str.c_str(), send_str.size(), 0);}}// 仅关闭写端// int shutdown(int sockfd, int how); how 设为 SHUT_WRLOG_SCREEN(INFO) << "发完数据啦,关闭套接字的写端!\n";shutdown(sockfd, SHUT_WR);
}
void reader(int sockfd)
{while (true){string out;int n;// 1.读取响应while (true){char buf[1024];n = recv(sockfd, buf, sizeof(buf) - 1, MSG_DONTWAIT);if (n >= 0){buf[n] = 0;out += buf;// cout << out << endl;if (n == 0)break;}else{if (errno == EAGAIN)break;if (errno == EINTR)continue;else{cout << "recv error, errno: " << errno << ", strerror(errno): " << strerror(errno) << "\n";return;}}}// 2.解决数据包粘包string return_str;while (Codec::Decode(out, &return_str)){// 3.反序列化Response resp;if (!resp.Deserialize(return_str)){cout << "response 反序列化失败,该报文直接丢弃\n";break;}// 打印即可cout << "server回复: " << return_str << "\n";}if (n == 0)break;}
}int main(int argc, char *argv[])
{if (argc != 3){cout << "Usage: " << argv[0] << " server_ip, server_port" << endl;return 1;}string ip = argv[1];uint16_t port = stoi(argv[2]);// 1. 创建socketint sockfd = socket(AF_INET, SOCK_STREAM, 0);// 2. connectstruct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = inet_addr(ip.c_str());server_addr.sin_port = htons(port);int n = connect(sockfd, Conv(&server_addr), sizeof(server_addr));if (n < 0){cout << "connect fail\n";return 1;}thread t1(sender, sockfd);thread t2(reader, sockfd);t1.join();t2.join();return 0;
}

common.cpp

#pragma once
#include <unistd.h>
#include <fcntl.h>
#include <cstring>#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include "Log.hpp"
#define Conv(addr) (reinterpret_cast<struct sockaddr *>(addr))
// 非阻塞式IO
void ModifyFdToNonBlock(int fd)
{int fl = fcntl(fd, F_GETFL); // 获取fd的状态给flif (fl < 0){LOG_SCREEN(FATAL) << "ModifyFdToNonBlock(int fd) error , errno: " << errno << " , strerror: " << strerror(errno) << "\n";return;}fcntl(fd, F_SETFL, fl | O_NONBLOCK); // 将fd设置为非阻塞状态
}

三.画图进一步理解reactor

画了两张图,大家看一下

1.版本1

在这里插入图片描述

2.版本2

在这里插入图片描述
在这里插入图片描述
仅仅1087行代码就能实现一个简单的reactor模式了

3.小总结

reactor模式主要包括:

  1. Reactor反应堆(核心)
  2. Connection(维护每个连接[fd]与其用户级inbuff,outbuff,还有回调函数)
  3. Epoller(封装epoll进行多路转接)
  4. Handler(Connection对应的回调函数)

再往上就是具体的业务处理模块了: 协议层和业务层

以上就是网络编程: reactor模式的步步探索与实现的全部内容,希望能对大家有所帮助!!

这篇关于网络编程: reactor模式的步步探索与实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1041154

相关文章

SpringBoot3实现Gzip压缩优化的技术指南

《SpringBoot3实现Gzip压缩优化的技术指南》随着Web应用的用户量和数据量增加,网络带宽和页面加载速度逐渐成为瓶颈,为了减少数据传输量,提高用户体验,我们可以使用Gzip压缩HTTP响应,... 目录1、简述2、配置2.1 添加依赖2.2 配置 Gzip 压缩3、服务端应用4、前端应用4.1 N

SpringBoot实现数据库读写分离的3种方法小结

《SpringBoot实现数据库读写分离的3种方法小结》为了提高系统的读写性能和可用性,读写分离是一种经典的数据库架构模式,在SpringBoot应用中,有多种方式可以实现数据库读写分离,本文将介绍三... 目录一、数据库读写分离概述二、方案一:基于AbstractRoutingDataSource实现动态

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

Linux系统配置NAT网络模式的详细步骤(附图文)

《Linux系统配置NAT网络模式的详细步骤(附图文)》本文详细指导如何在VMware环境下配置NAT网络模式,包括设置主机和虚拟机的IP地址、网关,以及针对Linux和Windows系统的具体步骤,... 目录一、配置NAT网络模式二、设置虚拟机交换机网关2.1 打开虚拟机2.2 管理员授权2.3 设置子

揭秘Python Socket网络编程的7种硬核用法

《揭秘PythonSocket网络编程的7种硬核用法》Socket不仅能做聊天室,还能干一大堆硬核操作,这篇文章就带大家看看Python网络编程的7种超实用玩法,感兴趣的小伙伴可以跟随小编一起... 目录1.端口扫描器:探测开放端口2.简易 HTTP 服务器:10 秒搭个网页3.局域网游戏:多人联机对战4.

Java枚举类实现Key-Value映射的多种实现方式

《Java枚举类实现Key-Value映射的多种实现方式》在Java开发中,枚举(Enum)是一种特殊的类,本文将详细介绍Java枚举类实现key-value映射的多种方式,有需要的小伙伴可以根据需要... 目录前言一、基础实现方式1.1 为枚举添加属性和构造方法二、http://www.cppcns.co

使用Python实现快速搭建本地HTTP服务器

《使用Python实现快速搭建本地HTTP服务器》:本文主要介绍如何使用Python快速搭建本地HTTP服务器,轻松实现一键HTTP文件共享,同时结合二维码技术,让访问更简单,感兴趣的小伙伴可以了... 目录1. 概述2. 快速搭建 HTTP 文件共享服务2.1 核心思路2.2 代码实现2.3 代码解读3.

MySQL双主搭建+keepalived高可用的实现

《MySQL双主搭建+keepalived高可用的实现》本文主要介绍了MySQL双主搭建+keepalived高可用的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、测试环境准备二、主从搭建1.创建复制用户2.创建复制关系3.开启复制,确认复制是否成功4.同

Java实现文件图片的预览和下载功能

《Java实现文件图片的预览和下载功能》这篇文章主要为大家详细介绍了如何使用Java实现文件图片的预览和下载功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... Java实现文件(图片)的预览和下载 @ApiOperation("访问文件") @GetMapping("

Java并发编程必备之Synchronized关键字深入解析

《Java并发编程必备之Synchronized关键字深入解析》本文我们深入探索了Java中的Synchronized关键字,包括其互斥性和可重入性的特性,文章详细介绍了Synchronized的三种... 目录一、前言二、Synchronized关键字2.1 Synchronized的特性1. 互斥2.