本文主要是介绍Linux--IO模型_多路转接,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
目录
0.往期文章
1.五种IO模型介绍
概念
调用函数(非阻塞IO)
2.详解多路转接 之select
select函数介绍
设置文件描述符
写一个基于select的TCP服务器
辅助库
基于TCP的Socket封装
服务器代码
测试服务器
小结
3.详解多路转接 之poll
poll函数介绍
pollfd 结构
写一个基于poll的TCP服务器
小结
0.往期文章
Linux--应用层协议HTTP协议(http服务器构建)-CSDN博客
Linux--传输层协议UDP-CSDN博客
Linux--传输层协议TCP-CSDN博客
1.五种IO模型介绍
概念
1. 阻塞IO模型
- 特点:在阻塞IO模型中,应用程序发起一个IO请求后会一直阻塞等待操作完成,直到数据准备好或者超时才返回结果。在等待IO完成期间,应用程序会处于阻塞状态,无法执行其他任务。
- 典型应用:阻塞socket、Java BIO等。
- 优点:实现难度低,开发应用较容易。
- 缺点:不适用并发量大的应用,因为每个请求IO都会阻塞进程,需要为每个请求分配一个处理进程(线程),系统开销大。
2. 非阻塞IO模型
- 特点:应用程序发起一个IO请求后会立即返回,无需等待操作完成。应用程序需要不断轮询或者使用事件通知来检查操作是否完成。
- 典型应用:socket设置为NONBLOCK模式。
- 优点:在等待数据的过程中可以立即返回,用户线程不会被阻塞,实时性较好。
- 缺点:进程轮询调用会消耗CPU资源,且实现难度和复杂度相对较高。
3. IO多路复用/多路转接模型
- 特点:使用操作系统提供的select、poll或epoll等多路复用机制,允许应用程序同时监视多个IO事件。应用程序可以将多个IO请求注册到一个多路复用器上,然后通过轮询或者阻塞等待多路复用器通知事件的发生。
- 典型应用:JAVA7 AIO、高性能服务器应用等。
- 优点:不阻塞,数据一步到位,提高了系统的并发性能。
- 缺点:需要操作系统的底层支持,且对单个连接的处理速度可能不如其他模型。
4. 信号驱动的IO模型
- 特点:使用信号机制来实现异步IO,应用程序通过向内核注册信号处理函数来处理IO事件。当IO操作完成时,内核会发送一个信号通知应用程序,然后由应用程序在信号处理函数中处理该事件。
- 优点:相比阻塞IO和非阻塞IO更为灵活,适用于需要处理多个IO事件的场景。
- 缺点:在Linux中信号队列是有限制的,如果超过限制可能导致无法读取数据。此外,信号处理函数的执行可能会受到系统调用的限制。
5. 异步IO模型
- 特点:通过操作系统提供的异步IO接口来实现,应用程序发起一个IO请求后会立即返回,并且在操作完成后会通过回调或事件通知的方式通知应用程序。应用程序无需等待操作完成,可以继续执行其他任务。
- 典型应用:需要高并发、高性能的场景,如网络服务器、大规模并行计算等。
- 优点:真正实现了非阻塞IO,提高了系统的并发性能和吞吐量。
- 缺点:实现难度和复杂度较高,需要操作系统和应用程序的紧密配合。
前面四种,都是同步IO,因为它们都参与了IO的过程。
调用函数(非阻塞IO)
非阻塞IO
fcntl函数:一个文件描述符, 默认都是阻塞 IO,通过fcntl可以改变已打开的文件性质。
其中,
fd
参数代表欲设置的文件描述符,cmd
参数代表打算操作的指令,根据cmd
的值,fcntl
可以接受第三个参数。
- 复制一个现有的描述符(cmd=F_DUPFD) .
- 获得/设置文件描述符标记(cmd=F_GETFD 或 F_SETFD).
- 获得/设置文件状态标记(cmd=F_GETFL 或 F_SETFL).
- 获得/设置异步 I/O 所有权(cmd=F_GETOWN 或 F_SETOWN).
- 获得/设置记录锁(cmd=F_GETLK,F_SETLK 或 F_SETLKW)
我们此处只是用第三种功能, 获取/设置文件状态标记, 就可以将一个文件描述符设置为
非阻塞。下面是一个示例:
Comm.hpp
#include <iostream> #include <unistd.h> #include <fcntl.h>void SetNonBlock(int fd) {int fl = ::fcntl(fd, F_GETFL);if(fl < 0){std::cout << "fcntl error" << std::endl;return;}::fcntl(fd, F_SETFL, fl | O_NONBLOCK); }
fcntl
函数和F_GETFL
命令来获取与fd
关联的文件状态标志。这些标志包括文件是否以只读、只写或读写模式打开,以及是否设置了非阻塞模式等。再次调用fcntl
函数,但这次使用F_SETFL
命令来设置文件描述符的标志。它将之前获取的标志fl
与O_NONBLOCK
标志进行按位或操作,然后将结果作为新的标志集传递给fcntl
。
O_NONBLOCK
标志指定对文件描述符非阻塞,当设置了这个标志后,如果某个 I/O 操作不能立即完成,调用该操作的函数将不会使调用线程进入睡眠状态,而是立即返回一个错误,通常是EAGAIN
或EWOULDBLOCK
。Main.cc
#include <iostream> #include <cstdio> #include <unistd.h> #include "Comm.hpp"#include <sys/select.h>int main() {char buffer[1024];SetNonBlock(0);while(true){ssize_t n = ::read(0, buffer, sizeof(buffer)-1);if(n > 0){buffer[n] = 0;printf("echo# %s", buffer);}else if(n == 0) // ctrl + d{printf("read done\n");break;}else{// 如果是非阻塞,底层数据没有就绪,IO接口,会以出错形式返回// 所以,如何区分 底层不就绪 vs 真的出错了? 根据errno错误码if(errno == EWOULDBLOCK){sleep(1);std::cout << "底层数据没有就绪,开始轮询检测" << std::endl;std::cout << "do other thing" << std::endl;continue;}else if(errno == EINTR)//被信号中断{continue; }else{perror("read");//读写错误break;}}}return 0; }
有输入的时候,就向显示器输出,没有的时候,进程可以做其他的事情。
2.详解多路转接 之select
多路转接的作用:为了等待多个fd,等该fd上面的新事件就绪(OS底层有数据了->读事件就绪;OS底层有看见了->写事件就绪了),通知程序员,事件已经就绪,可以就绪IO拷贝了!(IO = 等 + 拷贝,多路转接的作用就是在等上)
select函数介绍
定位:只负责进行等,不进行拷贝。
作用:select 系统调用是用来让我们的程序监视多个文件描述符的状态变化的;程序会停在 select 这里等待, 直到被监视的文件描述符有一个或多个发生了状态改变。参数:
- nfds:这是一个整数值,指定了被检查的文件描述符的数量。它应该设置为文件描述符集合中的最大值加1。不过,在实际应用中,这个参数常常被设置为文件描述符集合中最大的文件描述符加1,但这并不是严格要求的,因为内核会忽略大于最大文件描述符的值。
- readfds:这是一个指向
fd_set
的指针,用于指定哪些文件描述符应该被检查可读性。如果设置为NULL
,则不检查可读性。- writefds:这是一个指向
fd_set
的指针,用于指定哪些文件描述符应该被检查可写性。如果设置为NULL
,则不检查可写性。- exceptfds:这是一个指向
fd_set
的指针,用于指定哪些文件描述符应该被检查异常条件(如带外数据到达)。如果设置为NULL
,则不检查异常条件。- timeout:这是一个指向
timeval
结构的指针,指定了等待的最大时间。如果设置为NULL
,则调用将无限期阻塞,直到至少有一个文件描述符就绪。如果timeout
中的秒数和微秒数都设置为0,则select
将立即返回,而不会等待文件描述符就绪。eg:timeval timeout={5,0},表示5秒内阻塞等待,5秒过后超时;timeval timeout={0,0},非阻塞轮询struct timeval的结构体类型:
struct timeval {long tv_sec; /* seconds */long tv_usec; /* microseconds */ };
返回值:
- 成功时,
select
返回就绪(可读、可写或异常)的文件描述符数量。- 如果在调用时没有任何文件描述符就绪,并且
timeout
指定的时间已经过去,则返回0。- 如果发生错误,则返回-1,并设置
errno
以指示错误类型。使用
fd_set:大小128字节,1024个bit位(32位机器)
fd_set
是一个位向量,表示文件描述符集,其中每一位对应一个文件描述符。使用以下宏来操作fd_set
:
FD_ZERO(fd_set *set)
:将set
中的所有位清零。FD_SET(int fd, fd_set *set)
:将set
中对应于fd
的位设置为1。FD_CLR(int fd, fd_set *set)
:将set
中对应于fd
的位清零。FD_ISSET(int fd, fd_set *set)
:如果set
中对应于fd
的位被设置,则返回非零值(真)。设置文件描述符
select可以同时监视多个文件描述符(套接字)。
此时需要先将文件描述符集中到一起。集中时也要按照监视项(接收,传输,异常)进行区分,即按照上述3种监视项分成三类。
使用fd_set数组变量执行此项操作,该数组是存有0和1的位数组。
数组是从下标0开始,最左端的位表示文件描述符0。如果该位值为1,则表示该文件描述符是监视对象。
图上显然监视对象为fd1和fd3。“是否应当通过文件描述符的数字直接将值注册到fd_set变量?”
当然不是!操作fd_set的值由如下宏来完成:
写一个基于select的TCP服务器
辅助库
用于封装和处理 IP 地址及其端口号:InetAddr.hpp
#include <iostream> #include <string> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h>class InetAddr { private:void ToHost(const struct sockaddr_in &addr){_port = ntohs(addr.sin_port);// _ip = inet_ntoa(addr.sin_addr);char ip_buf[32];// inet_p to n// p: process// n: net// inet_pton(int af, const char *src, void *dst);// inet_pton(AF_INET, ip.c_str(), &addr.sin_addr.s_addr);::inet_ntop(AF_INET, &addr.sin_addr, ip_buf, sizeof(ip_buf));_ip = ip_buf;}public:InetAddr(const struct sockaddr_in &addr):_addr(addr){ToHost(addr);}InetAddr(){}bool operator == (const InetAddr &addr){return (this->_ip == addr._ip && this->_port == addr._port);}std::string Ip(){return _ip;}uint16_t Port(){return _port;}struct sockaddr_in Addr(){return _addr;}std::string AddrStr(){return _ip + ":" + std::to_string(_port);}~InetAddr(){}private:std::string _ip;uint16_t _port;struct sockaddr_in _addr; };
日志库:Log.hpp
#include <iostream> #include <sys/types.h> #include <unistd.h> #include <ctime> #include <cstdarg> #include <fstream> #include <cstring> #include <pthread.h> #include "LockGuard.hpp"namespace log_ns {enum{DEBUG = 1,INFO,WARNING,ERROR,FATAL};std::string LevelToString(int level){switch (level){case DEBUG:return "DEBUG";case INFO:return "INFO";case WARNING:return "WARNING";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return "UNKNOWN";}}std::string GetCurrTime(){time_t now = time(nullptr);struct tm *curr_time = localtime(&now);char buffer[128];snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",curr_time->tm_year + 1900,curr_time->tm_mon + 1,curr_time->tm_mday,curr_time->tm_hour,curr_time->tm_min,curr_time->tm_sec);return buffer;}class logmessage{public:std::string _level;pid_t _id;std::string _filename;int _filenumber;std::string _curr_time;std::string _message_info;};#define SCREEN_TYPE 1 #define FILE_TYPE 2const std::string glogfile = "./log.txt";pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;// log.logMessage("", 12, INFO, "this is a %d message ,%f, %s hellwrodl", x, , , );class Log{public:Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE){}void Enable(int type){_type = type;}void FlushLogToScreen(const logmessage &lg){printf("[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time.c_str(),lg._message_info.c_str());}void FlushLogToFile(const logmessage &lg){std::ofstream out(_logfile, std::ios::app);if (!out.is_open())return;char logtxt[2048];snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time.c_str(),lg._message_info.c_str());out.write(logtxt, strlen(logtxt));out.close();}void FlushLog(const logmessage &lg){// 加过滤逻辑 --- TODOLockGuard lockguard(&glock);switch (_type){case SCREEN_TYPE:FlushLogToScreen(lg);break;case FILE_TYPE:FlushLogToFile(lg);break;}}void logMessage(std::string filename, int filenumber, int level, const char *format, ...){logmessage lg;lg._level = LevelToString(level);lg._id = getpid();lg._filename = filename;lg._filenumber = filenumber;lg._curr_time = GetCurrTime();va_list ap;va_start(ap, format);char log_info[1024];vsnprintf(log_info, sizeof(log_info), format, ap);va_end(ap);lg._message_info = log_info;// 打印出来日志FlushLog(lg);}~Log(){}private:int _type;std::string _logfile;};Log lg;#define LOG(Level, Format, ...) \do \{ \lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \} while (0) #define EnableScreen() \do \{ \lg.Enable(SCREEN_TYPE); \} while (0) #define EnableFILE() \do \{ \lg.Enable(FILE_TYPE); \} while (0) };
给日志库上锁,保证线程安全:LockGuard.hpp
#include <pthread.h>class LockGuard { public:LockGuard(pthread_mutex_t *mutex):_mutex(mutex){pthread_mutex_lock(_mutex);}~LockGuard(){pthread_mutex_unlock(_mutex);} private:pthread_mutex_t *_mutex; };
基于TCP的Socket封装
使得Socket的使用更加面向对象。
#include <iostream> #include <cstring> #include <functional> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/wait.h> #include <pthread.h> #include <memory>#include "Log.hpp" #include "InetAddr.hpp" //以下是对socket的封装,方便面向对象式的使用socket namespace socket_ns {using namespace log_ns;class Socket;using SockSPtr = std::shared_ptr<Socket>;//Socket是虚基类,实际上是拿TcpSocket//定义的对象enum//创建失败的常量{SOCKET_ERROR = 1,BIND_ERROR,LISTEN_ERR};const static int gblcklog = 8;//监听队列默认大小。// 模版方法模式class Socket{public:virtual void CreateSocketOrDie() = 0;virtual void CreateBindOrDie(uint16_t port) = 0;virtual void CreateListenOrDie(int backlog = gblcklog) = 0;virtual SockSPtr Accepter(InetAddr *cliaddr) = 0;virtual bool Conntecor(const std::string &peerip, uint16_t peerport) = 0;virtual int Sockfd() = 0;virtual void Close() = 0;virtual ssize_t Recv(std::string *out) = 0;//进行读取virtual ssize_t Send(const std::string &in) = 0;//进行发送public:void BuildListenSocket(uint16_t port)//创建监听套接字{CreateSocketOrDie();CreateBindOrDie(port);CreateListenOrDie();}//创建客户端套接字bool BuildClientSocket(const std::string &peerip, uint16_t peerport){CreateSocketOrDie();return Conntecor(peerip, peerport);}// void BuildUdpSocket()// {}};class TcpSocket : public Socket{public:TcpSocket(){}//监听套接字初始化/构造函数式的初始化TcpSocket(int sockfd) : _sockfd(sockfd){}~TcpSocket(){}void CreateSocketOrDie() override{// 1. 创建socket_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){LOG(FATAL, "socket create error\n");exit(SOCKET_ERROR);}LOG(INFO, "socket create success, sockfd: %d\n", _sockfd); // 3}void CreateBindOrDie(uint16_t port) override//bind{struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;// 2. bind sockfd 和 Socket addrif (::bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0){LOG(FATAL, "bind error\n");exit(BIND_ERROR);}LOG(INFO, "bind success, sockfd: %d\n", _sockfd); // 3}//监听void CreateListenOrDie(int backlog) override{// 3. 因为tcp是面向连接的,tcp需要未来不断地能够做到获取连接if (::listen(_sockfd, gblcklog) < 0){LOG(FATAL, "listen error\n");exit(LISTEN_ERR);}LOG(INFO, "listen success\n");}//方便获取客户端地址,accept获取一个新的文件描述符//而该文件描述符本质就是ip+端口号//之前我们使用文件描述符都是面向过程,都是作为函数参数进行传递的//我们需要面向对象的使用套接字,我们将得到的IO文件描述符设置进套接字里面//返回该套接字//using SockSPtr = std::shared_ptr<Socket>;//Socket是虚基类,实际上是拿TcpSocket//定义的对象SockSPtr Accepter(InetAddr *cliaddr) override{struct sockaddr_in client;socklen_t len = sizeof(client);// 4. 获取新连接:得到一个新的文件描述符,得到新的客户端int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);if (sockfd < 0){LOG(WARNING, "accept error\n");return nullptr;}*cliaddr = InetAddr(client);LOG(INFO, "get a new link, client info : %s, sockfd is : %d\n", cliaddr->AddrStr().c_str(), sockfd);return std::make_shared<TcpSocket>(sockfd); // C++14}//连接目标服务器(是否成功)//客户端ip和端口号bool Conntecor(const std::string &peerip, uint16_t peerport) override{struct sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(peerport);//将IPv4地址的字符串形式转换为网络字节顺序的二进制形式,//并将其存储在server.sin_addr中::inet_pton(AF_INET, peerip.c_str(), &server.sin_addr);int n = ::connect(_sockfd, (struct sockaddr *)&server, sizeof(server));if (n < 0){ return false;}return true;}int Sockfd()//文件描述符{return _sockfd;}void Close(){if (_sockfd > 0){::close(_sockfd);}}ssize_t Recv(std::string *out) override//读到的消息{char inbuffer[4096];//从sockfd中读ssize_t n = ::recv(_sockfd, inbuffer, sizeof(inbuffer) - 1, 0);if (n > 0){inbuffer[n] = 0;//这里不能是=,不可以覆盖式的读取,因为每一次可能并不是读取到一条完整的报文// "len"\r\n// "len"\r\n"{json}"\r\n//向上面的情况如果覆盖的读取将读取不到完整的报文了//所以要用+=*out += inbuffer;}return n;}ssize_t Send(const std::string &in) override{return ::send(_sockfd, in.c_str(), in.size(), 0);}private:int _sockfd; // 可以是listensock,普通socketfd};// class UdpSocket : public Socket// {}; } // namespace socket_n
代码逻辑:
- 命名空间和类定义:
- 定义了一个命名空间
socket_ns
,用于封装Socket相关的类和函数。- 定义了一个基类
Socket
,它是一个抽象类,提供了Socket操作的基本接口,如创建、绑定、监听、接收连接、发送和接收数据等。- 定义了一个派生类
TcpSocket
,它继承自Socket
类,并实现了所有虚函数,提供了TCP Socket的具体实现。- Socket基类:
- 定义了多个纯虚函数,包括创建Socket、绑定、监听、接受连接、连接服务器、获取文件描述符、关闭Socket、接收和发送数据等。
- 提供了一个构建监听Socket的成员函数
BuildListenSocket
,它依次调用创建Socket、绑定和监听函数来初始化监听Socket。- 提供了一个构建客户端Socket的成员函数
BuildClientSocket
,它调用创建Socket和连接服务器函数来初始化客户端Socket。- TcpSocket类:
- 实现了
Socket
类中的所有纯虚函数,提供了TCP Socket的具体实现。- 在构造函数中,可以初始化一个已存在的文件描述符,或者通过调用
CreateSocketOrDie
函数创建一个新的Socket文件描述符。CreateSocketOrDie
函数用于创建一个新的Socket文件描述符。CreateBindOrDie
函数用于将Socket绑定到一个指定的端口上。CreateListenOrDie
函数用于将Socket设置为监听模式,以便接受连接。Accepter
函数用于接受一个新的连接,并返回一个表示该连接的TcpSocket
对象。Conntecor
函数用于连接到一个指定的服务器。Sockfd
函数用于获取Socket的文件描述符。Close
函数用于关闭Socket。Recv
函数用于从Socket接收数据。Send
函数用于向Socket发送数据。- 日志和错误处理:
- 使用了自定义的日志系统(
log_ns
命名空间中的LOG
宏)来记录日志和错误信息。- 在发生错误时,使用
exit
函数终止程序,并传递一个错误码。- 内存管理:
- 使用了智能指针(
std::shared_ptr
)来管理TcpSocket
对象的内存,以避免内存泄漏。服务器代码
该服务器仅用于对select应用的测试, 没有上层逻辑,不完整。
#pragma once#include <iostream> #include <sys/select.h> #include "Socket.hpp" #include "Log.hpp" #include "InetAddr.hpp"using namespace socket_ns;class SelectServer {//位图有多少个bit位,就定义多大const static int gnum = sizeof(fd_set) * 8;const static int gdefaultfd = -1;public:SelectServer(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>()){_listensock->BuildListenSocket(_port);}void InitServer(){for (int i = 0; i < gnum; i++){fd_array[i] = gdefaultfd;//初始化辅助数组}fd_array[0] = _listensock->Sockfd(); // 默认直接添加listensock到数组中}// 处理新连接的void Accepter(){// 我们叫做连接事件就绪,等价于读事件就绪InetAddr addr;int sockfd = _listensock->Accepter(&addr); // 会不会被阻塞!一定不会!if (sockfd > 0){LOG(DEBUG, "get a new link, client info %s:%d\n", addr.Ip().c_str(), addr.Port());// 已经获得了一个新的sockfd// 接下来我们可以读取吗?绝对不能读!读取的时候,条件不一定满足// 谁最清楚底层fd的数据是否就绪了呢??通过select!// select 为什么等待的fd会越来越多?//listensockt在获取新链接的同时,要把新链接添加到select当中// 想办法把新的fd添加给select,由select统一进行监管。怎么做到??// 只要将新的fd,添加到fd_array中即可!bool flag = false;for (int pos = 1; pos < gnum; pos++){if (fd_array[pos] == gdefaultfd){flag = true;fd_array[pos] = sockfd;//添加fdLOG(INFO, "add %d to fd_array success!\n", sockfd);break;}}if (!flag)//表示没有缺省值,已经添加满了{LOG(WARNING, "Server Is Full!\n");::close(sockfd);//select无法监管,关闭fd}}}// 处理普通的fd就绪的void HandlerIO(int i){// 下面的读写对吗?// 普通的文件描述符,正常的读写char buffer[1024];ssize_t n = ::recv(fd_array[i], buffer, sizeof(buffer) - 1, 0); // 这里读取会阻塞吗?不会,已经就绪if (n > 0){buffer[n] = 0;std::cout << "client say# " << buffer << std::endl;std::string content = "<html><body><h1>hello Linux</h1></body></html>";std::string echo_str = "HTTP/1.0 200 OK\r\n";echo_str += "Content-Type: text/html\r\n";echo_str += "Content-Length: " + std::to_string(content.size()) + "\r\n\r\n";echo_str += content;// echo_str += buffer;//一个fd被新的accept创建的时候,读写缓冲区基本都是空的,所以在这可以直接向//fd中写::send(fd_array[i], echo_str.c_str(), echo_str.size(), 0); // 临时方案}else if (n == 0)//连接关闭了{LOG(INFO, "client quit...\n");// 关闭fd::close(fd_array[i]);// select 不要在关心这个fd了fd_array[i] = gdefaultfd;}else//读出错了{LOG(ERROR, "recv error\n");// 关闭fd::close(fd_array[i]);// select 不要在关心这个fd了fd_array[i] = gdefaultfd;}}// 一定会存在大量的fd就绪,可能是普通sockfd,也可能是listensockfdvoid HandlerEvent(fd_set &rfds) {// 事件派发for (int i = 0; i < gnum; i++){if (fd_array[i] == gdefaultfd)continue;// fd一定是合法的fd// 合法的fd不一定就绪, 判断fd是否就绪?if (FD_ISSET(fd_array[i], &rfds))//看看文件描述符在不在rfds中{// 读事件就绪// 1. listensockfd 2. normal sockfd就绪?if (_listensock->Sockfd() == fd_array[i]){Accepter();}else {HandlerIO(i);}}}}void Loop(){while (true){// 1. 文件描述符进行初始化fd_set rfds;//读文件fd集FD_ZERO(&rfds);//将set中的所有位清零int max_fd = gdefaultfd;// 2. 合法的fd 添加到rfds集合中for (int i = 0; i < gnum; i++){if (fd_array[i] == gdefaultfd)continue;FD_SET(fd_array[i], &rfds);// 2.1 更新出最大的文件fd的值if (max_fd < fd_array[i]){max_fd = fd_array[i];}}struct timeval timeout = {30, 0};//超时时间// _listensock->Accepter();// 不能,listensock && accept 我们把他也看做IO类的函数。// 只关心新链接到来,等价于读事件就绪!// 只关心读事件,监控监听套接字(socket)的读事件->是否有新链接int n = ::select(max_fd + 1, &rfds, nullptr, nullptr, nullptr /*&timeout*/); // 临时switch (n){case 0://服务器select超时//timeout.tv_sec:这个成员变量表示超时时间中的秒数部分//timeout.tv_usec:这个成员变量表示超时时间中的微秒数部分。LOG(DEBUG, "time out, %d.%d\n", timeout.tv_sec, timeout.tv_usec);break; case -1:LOG(ERROR, "select error\n");break;default://LOG(DEBUG, "time out, %d.%d\n", timeout.tv_sec, timeout.tv_usec);LOG(INFO, "haved event ready, n : %d\n", n); // 如果事件就绪,但是不处理,select会一直通知我,直到我处理了!HandlerEvent(rfds);//处理事件 PrintDebug();// sleep(1);break;}}}void PrintDebug()//打印出所有合法的fd{std::cout << "fd list: ";for (int i = 0; i < gnum; i++){if (fd_array[i] == gdefaultfd)continue;std::cout << fd_array[i] << " ";}std::cout << "\n";}~SelectServer() {}private:uint16_t _port;std::unique_ptr<Socket> _listensock;// select要正常工作,需要借助一个辅助数组,来保存所有合法fd//方便对rfds进行重置int fd_array[gnum]; };
该代码实现了一个基于
select
方法的TCP服务器,其主要逻辑可以分为以下几个部分:
- 初始化服务器:
- 在构造函数中,通过
_listensock
成员(一个std::unique_ptr<TcpSocket>
)创建一个监听套接字,并绑定到指定的端口上。InitServer
方法用于初始化一个固定大小的fd_array
数组,用于存储所有当前被select
监控的文件描述符(包括监听套接字和已接受的客户端连接)。监听套接字的文件描述符被直接放入数组的第一个位置。- 接受新连接:
Accepter
方法用于处理监听套接字上的新连接。当有新连接到来时,它会接受这个连接,并将新连接的文件描述符添加到fd_array
数组中(如果有空位的话)。如果没有空位,则关闭新连接的文件描述符。- 处理IO事件:
HandlerIO
方法用于处理普通文件描述符(即客户端连接)的就绪事件。它读取客户端发送的数据,并回复一个简单的HTTP响应。如果读取到0字节(表示连接关闭),或者读取出错,则关闭文件描述符,并从fd_array
中移除它。- 事件循环:
Loop
方法是服务器的主循环,它不断使用select
函数来等待文件描述符的就绪事件。每次循环,它都会重新构建rfds
集合,只包含当前fd_array
中有效的文件描述符。然后,它调用select
等待这些文件描述符的就绪事件。- 当
select
返回时,HandlerEvent
方法被调用以处理就绪的事件。如果是监听套接字就绪,则调用Accepter
接受新连接;如果是普通文件描述符就绪,则调用HandlerIO
处理IO事件。- 调试和日志:
PrintDebug
方法用于打印当前所有被select
监控的文件描述符,以便于调试。- 使用
LOG
宏进行日志记录,帮助追踪服务器的运行状态。- 资源管理:
- 使用
std::unique_ptr<Socket>
自动管理监听套接字的生命周期。- 在
Accepter
和HandlerIO
中,如果无法将新连接添加到fd_array
或遇到读取错误,会关闭相应的文件描述符,并从fd_array
中移除它。测试服务器
#include "SelectServer.hpp" #include <memory>int main(int argc, char *argv[]) {if (argc != 2){std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;exit(0);}uint16_t port = std::stoi(argv[1]);EnableScreen();std::unique_ptr<SelectServer> svr = std::make_unique<SelectServer>(port);svr->InitServer();svr->Loop();return 0; }
- 使用
std::make_unique<SelectServer>(port)
创建一个SelectServer
类型的std::unique_ptr
智能指针svr
,并将命令行参数指定的端口号传递给SelectServer
的构造函数。std::make_unique
是一个C++14引入的函数模板,用于创建并返回一个拥有给定类型对象的std::unique_ptr
。- 调用
svr->InitServer()
初始化服务器。这个函数的具体实现应该包括设置监听端口、创建socket等准备工作。- 调用
svr->Loop()
进入服务器的事件循环。在这个循环中,服务器将等待并处理客户端的连接请求、接收数据、发送响应等。使用浏览器访问,服务器收到请求,并处理返回。通过
select
方法,它能够在单个线程中高效地管理多个客户端连接。然而,需要注意的是,由于fd_array
的大小是固定的,这限制了服务器能够同时处理的客户端连接数量。在实际应用中,可能需要采用更高级的多路复用技术(如poll
、epoll
)或引入线程池来处理更多的并发连接。小结
特点
- 可监控的文件描述符个数取决于 sizeof(fd_set)的值. 我这边服务器上sizeof(fd_set)= 512, 每 bit 表示一个文件描述符, 则我服务器上支持的最大文件描述符是 512*8=4096.
- 将 fd 加入 select 监控集的同时, 还要再使用一个数据结构 array 保存放到 select监控集中的 fd:一是用于再 select 返回后, array 作为源数据和 fd_set 进行 FD_ISSET 判断;二是 select 返回后会把以前加入的但并无事件发生的 fd 清空, 则每次开始select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO 最先), 扫描 array 的同时取得 fd 最大值 maxfd, 用于 select 的第一个参数。
缺点
- 每次调用 select, 都需要手动设置 fd 集合, 从接口使用角度来说也非常不便.
- 每次调用 select, 都需要把 fd 集合从用户态拷贝到内核态, 这个开销在 fd 很多时会很大
- 同时每次调用 select 都需要在内核遍历传递进来的所有 fd, 这个开销在 fd 很多时也很大
- select 支持的文件描述符数量太小
3.详解多路转接 之poll
poll函数介绍
定位:只负责进行等,不进行拷贝。
作用:与select一样,等待多个fd,事件一旦就绪,就进行IO
fds
是一个指向pollfd
结构数组的指针,每个pollfd
结构都指定了一个要监视的文件描述符和感兴趣的事件。nfds
是数组fds
中元素的数量,即要监视的文件描述符的数量。timeout
指定了函数等待 I/O 事件发生的超时时间(以毫秒为单位)。如果timeout
为-1
,则poll
将无限期地等待,直到至少有一个文件描述符就绪;如果timeout
为0
,为非阻塞IO,poll
将立即返回,不会等待任何文件描述符就绪。返回值
- 正整数(>0):
- 表示在调用期间,至少有一个文件描述符的状态发生了指定的变化(如可读、可写或出现错误)。具体地说,这个正整数表示状态发生变化的文件描述符的数量。此时,调用者需要通过检查
pollfd
结构体数组的revents
字段来确定哪些文件描述符的状态发生了变化。- 0:
- 表示在指定的超时时间内,没有任何文件描述符的状态发生变化。这通常意味着所有被监控的文件描述符都处于非就绪状态,或者指定的超时时间已经到达。
- -1:
- 表示
poll
函数调用过程中发生了错误。此时,可以通过检查全局变量errno
来获取具体的错误原因。常见的错误包括无效的文件描述符、系统资源不足等。pollfd 结构
不同于 select 使用三个位图来表示三个 fdset 的方式, poll 使用一个 pollfd 的指针实现
pollfd
结构体用于指定要监视的文件描述符和事件:struct pollfd { int fd; /* 文件描述符 */ short events; /* 感兴趣的事件 */ short revents; /* 返回的事件 */ };
fd
是要监视的文件描述符。events
是请求监视的事件集合,可以通过位或操作组合多个事件,如POLLIN
(有数据可读)、POLLOUT
(写操作不再阻塞)等。revents
是由poll
函数返回时设置的事件集合,表示在fd
上实际发生了哪些事件。events 和 revents 的取值:每个事件都是宏
使用 poll
使用
poll
时,你首先需要准备一个pollfd
结构体数组,每个元素都指定了要监视的文件描述符和感兴趣的事件。然后,调用poll
函数并传入这个数组。poll
函数会阻塞等待(除非timeout
指定为 0),直到至少有一个文件描述符就绪,或者超时发生。最后,你可以通过检查每个pollfd
结构体的revents
字段来确定哪些文件描述符就绪,并据此执行相应的操作。1.用户告诉内核,你要我关心哪些fd(设置参数fd)上的哪些事件(设置参数events);
2.内核告诉用户,你要我关心哪些fd(设置参数fd)上的哪些事件(设置参数revents);
因为接口设置的好,就无需对fd的事件进行重新设定了
优点和缺点
优点:
- 相比
select
,poll
没有文件描述符数量的硬限制(尽管实际上仍然受到系统资源的限制)。poll
的接口更加清晰和灵活,可以指定对每个文件描述符感兴趣的具体事件。缺点:
- 当监视的文件描述符数量非常多时,
poll
的效率可能会下降,因为它仍然需要遍历整个pollfd
数组来检查哪些文件描述符就绪。poll
的可移植性可能不如select
,因为并非所有系统都提供了poll
函数。
写一个基于poll的TCP服务器
该服务器实现思路与select一样,只是用了poll函数:
#include <iostream> #include <poll.h> #include "Socket.hpp" #include "Log.hpp" #include "InetAddr.hpp"using namespace socket_ns;class PollServer {const static int gnum = sizeof(fd_set) * 8;const static int gdefaultfd = -1;public:PollServer(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>()){_listensock->BuildListenSocket(_port);}void InitServer(){for (int i = 0; i < gnum; i++){fd_events[i].fd = gdefaultfd;fd_events[i].events = 0;fd_events[i].revents = 0;}fd_events[0].fd = _listensock->Sockfd(); // 默认直接添加listensock到数组中fd_events[0].events = POLLIN;//关心读事件}// 处理新连接的void Accepter(){// 我们叫做连接事件就绪,等价于读事件就绪InetAddr addr;int sockfd = _listensock->Accepter(&addr); // 会不会被阻塞!一定不会!if (sockfd > 0){LOG(DEBUG, "get a new link, client info %s:%d\n", addr.Ip().c_str(), addr.Port());// 已经获得了一个新的sockfd// 接下来我们可以读取吗?绝对不能读!读取的时候,条件不一定满足// 谁最清楚底层fd的数据是否就绪了呢??通过select!// 想办法把新的fd添加给select,由select统一进行监管。怎么做到??// select 为什么等待的fd会越来越多??// 只要将新的fd,添加到fd_array中即可!bool flag = false;for (int pos = 1; pos < gnum; pos++){if (fd_events[pos].fd == gdefaultfd){flag = true;fd_events[pos].fd = sockfd;fd_events[pos].events = POLLIN;LOG(INFO, "add %d to fd_array success!\n", sockfd);break;}}if (!flag){LOG(WARNING, "Server Is Full!\n");::close(sockfd);// 扩容// 添加}}}// 处理普通的fd就绪的void HandlerIO(int i){// 下面的读写对吗?// 普通的文件描述符,正常的读写char buffer[1024];ssize_t n = ::recv(fd_events[i].fd, buffer, sizeof(buffer) - 1, 0); // 这里读取会阻塞吗?不会if (n > 0){buffer[n] = 0;std::cout << "client say# " << buffer << std::endl;std::string content = "<html><body><h1>hello bite</h1></body></html>";std::string echo_str = "HTTP/1.0 200 OK\r\n";echo_str += "Content-Type: text/html\r\n";echo_str += "Content-Length: " + std::to_string(content.size()) + "\r\n\r\n";echo_str += content;// echo_str += buffer;::send(fd_events[i].fd, echo_str.c_str(), echo_str.size(), 0); // 临时方案}else if (n == 0){LOG(INFO, "client quit...\n");// 关闭fd::close(fd_events[i].fd);// select 不要在关心这个fd了fd_events[i].fd = gdefaultfd;fd_events[i].events = 0;fd_events[i].revents = 0;}else{LOG(ERROR, "recv error\n");// 关闭fd::close(fd_events[i].fd);// select 不要在关心这个fd了fd_events[i].fd = gdefaultfd;fd_events[i].events = 0;fd_events[i].revents = 0;}}// 一定会存在大量的fd就绪,可能是普通sockfd,也可能是listensockfdvoid HandlerEvent(){// 事件派发for (int i = 0; i < gnum; i++){if (fd_events[i].fd == gdefaultfd)continue;// fd一定是合法的fd// 合法的fd不一定就绪, 判断fd是否就绪?if (fd_events[i].revents & POLLIN){// 读事件就绪// 1. listensockfd 2. normal sockfd就绪?if (_listensock->Sockfd() == fd_events[i].fd){Accepter();}else{HandlerIO(i);}}}}void Loop(){int timeout = 1000;while (true){// _listensock->Accepter();// 不能,listensock && accept 我们把他也看做IO类的函数。只关心新链接到来,等价于读事件就绪!int n = ::poll(fd_events, gnum, timeout); // 临时switch (n){case 0:LOG(DEBUG, "time out\n");break;case -1:LOG(ERROR, "poll error\n");break;default:// LOG(DEBUG, "time out, %d.%d\n", timeout.tv_sec, timeout.tv_usec);LOG(INFO, "haved event ready, n : %d\n", n); // 如果事件就绪,但是不处理,select会一直通知我,直到我处理了!HandlerEvent();PrintDebug();// sleep(1);break;}}}void PrintDebug(){std::cout << "fd list: ";for (int i = 0; i < gnum; i++){if (fd_events[i].fd == gdefaultfd)continue;std::cout << fd_events[i].fd << " ";}std::cout << "\n";}~PollServer() {}private:uint16_t _port;std::unique_ptr<Socket> _listensock;// 1. poll要正常工作,需要借助一个辅助数组,来保存所有合法fdstruct pollfd fd_events[gnum]; };
- 构造函数 (
PollServer(uint16_t port)
):
- 初始化服务器端口和监听套接字(
_listensock
),并绑定和监听该端口。- 初始化 (
InitServer
):
- 准备
poll
所需的fd_events
数组,将监听套接字(_listensock
)的文件描述符添加到数组中,并设置其事件为POLLIN
(表示对读事件感兴趣)。- 接受新连接 (
Accepter
):
- 当监听套接字的读事件就绪时(即有新连接到来),接受该连接,并尝试将新连接的文件描述符添加到
fd_events
数组中(如果数组未满)。- 如果数组已满,则关闭新连接并打印警告信息。
- 处理IO事件 (
HandlerIO
):
- 对除监听套接字外的其他文件描述符(即已连接的客户端套接字)的读事件进行处理。
- 读取客户端发送的数据,并回显一个简单的HTTP响应。
- 如果读取到EOF(
n == 0
),则关闭该连接,并从fd_events
数组中移除其文件描述符。- 如果读取发生错误,则同样关闭连接并移除其文件描述符。
- 处理事件 (
HandlerEvent
):
- 遍历
fd_events
数组,检查哪些文件描述符的就绪事件(revents
)与期望的事件(events
)相匹配。- 对于监听套接字的读就绪事件,调用
Accepter
方法接受新连接。- 对于其他套接字的读就绪事件,调用
HandlerIO
方法处理数据。- 主循环 (
Loop
):
- 使用
poll
函数等待文件描述符集合中的任何文件描述符就绪。- 根据
poll
的返回值(就绪的文件描述符数量),调用HandlerEvent
方法处理就绪的事件。- 如果
poll
超时,则打印超时信息。- 如果
poll
调用失败,则打印错误信息。- 打印调试信息 (
PrintDebug
):
- 打印当前
fd_events
数组中所有非默认(非-1
)文件描述符的值,用于调试目的。小结
虽然poll能 挂的fd没有上限,但是poll的底层,也需要遍历所有的fd,因此不够高效,为了解决这个问题,就有了epoll。
· 请看下篇文章Linux——IO模型_多路转接(epoll)-CSDN博客
这篇关于Linux--IO模型_多路转接的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!