linux下I/O模型并发的epoll多进程池协程实现

2024-06-23 20:44

本文主要是介绍linux下I/O模型并发的epoll多进程池协程实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

方法1

主要思路:

  1. 定义了一个EventData结构体,用于存储事件相关的数据,如文件描述符、epoll 文件描述符、协程 ID 等。
  2. EchoDeal函数用于处理请求消息,并生成响应消息。
  3. handlerClient函数是协程的执行函数,用于处理客户端连接。它通过循环读取数据、解析请求、执行业务处理、发送响应等步骤,实现了对客户端请求的处理。
  4. handler函数是主函数,用于创建监听套接字、初始化 epoll、设置非阻塞模式、添加读事件等操作。然后进入一个循环,通过 epoll_wait 等待事件发生,并根据事件类型进行相应的处理,如接受新连接、处理客户端请求等。
  5. 在主函数中,通过 fork 创建多个子进程,每个子进程都执行handler函数,从而实现多进程并发处理。

示例代码: 

#include <arpa/inet.h>  // 包含网络地址转换相关的头文件
#include <assert.h>  // 包含断言相关的头文件
#include <fcntl.h>  // 包含文件控制相关的头文件
#include <netinet/in.h>  // 包含网络协议相关的头文件
#include <stdio.h>  // 包含标准输入输出相关的头文件
#include <stdlib.h>  // 包含标准库相关的头文件
#include <sys/epoll.h>  // 包含 epoll 相关的头文件
#include <sys/socket.h>  // 包含套接字相关的头文件
#include <unistd.h>  // 包含 Unix 标准相关的头文件#include <iostream>  // 包含 C++ 的输入输出流头文件#include "../coroutine.h"  // 包含自定义的协程相关头文件
#include "../epollctl.hpp"  // 包含自定义的 epoll 控制相关头文件struct EventData {  // 定义事件数据结构体EventData(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd){};  // 构造函数,初始化成员变量int fd_{0};  // 文件描述符int epoll_fd_{0};  // epoll 文件描述符int cid_{MyCoroutine::INVALID_ROUTINE_ID};  // 协程 IDMyCoroutine::Schedule *schedule_{nullptr};  // 协程调度器指针
};  // 结构体定义结束void EchoDeal(const std::string reqMessage, std::string &respMessage) { respMessage = reqMessage; }  // 处理请求并生成响应的函数void handlerClient(void *arg) {  // 处理客户端连接的函数EventData *eventData = (EventData *)arg;  // 获取事件数据指针auto releaseConn = [&eventData]() {  // 定义释放连接的匿名函数EchoServer::ClearEvent(eventData->epoll_fd_, eventData->fd_);  // 清除事件delete eventData;  // 释放内存};ssize_t ret = 0;  // 读取结果EchoServer::Codec codec;  // 编解码器对象std::string reqMessage;  // 请求消息std::string respMessage;  // 响应消息while (true) {  // 读操作循环uint8_t data[100];  // 数据缓冲区ret = read(eventData->fd_, data, 100);  // 尝试读取数据if (ret == 0) {  // 对端关闭连接perror("peer close connection");  // 打印错误信息releaseConn();  // 释放连接return;  // 函数返回}if (ret < 0) {  // 读取错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 无数据可读MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;  // 继续下一次循环}perror("read failed");  // 打印读取失败的错误信息releaseConn();  // 释放连接return;  // 函数返回}codec.DeCode(data, ret);  // 解码数据if (codec.GetMessage(reqMessage)) {  // 获取完整请求break;  // 跳出循环}}// 执行到这里说明已经读取到一个完整的请求EchoDeal(reqMessage, respMessage);  // 处理请求生成响应EchoServer::Packet pkt;  // 数据包对象codec.EnCode(respMessage, pkt);  // 编码响应EchoServer::ModToWriteEvent(eventData->epoll_fd_, eventData->fd_, eventData);  // 切换为监听可写事件ssize_t sendLen = 0;  // 已发送长度while (sendLen!= pkt.Len()) {  // 写操作循环ret = write(eventData->fd_, pkt.Data() + sendLen, pkt.Len() - sendLen);  // 尝试写入if (ret < 0) {  // 写入错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 不可写MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;  // 继续下一次循环}perror("write failed");  // 打印写入失败的错误信息releaseConn();  // 释放连接return;  // 函数返回}sendLen += ret;  // 更新已发送长度}releaseConn();  // 释放连接资源
}void handler(char *argv[]) {  // 主处理函数int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), true);  // 创建监听套接字if (sockFd < 0) {  // 如果创建失败return;  // 函数返回}epoll_event events[2048];  // epoll 事件数组int epollFd = epoll_create(1024);  // 创建 epoll 实例if (epollFd < 0) {  // 如果创建失败perror("epoll_create failed");  // 打印错误信息return;  // 函数返回}EventData eventData(sockFd, epollFd);  // 创建事件数据对象EchoServer::SetNotBlock(sock

方法2:

思路:

  1. main 函数首先检查命令行参数数量是否正确。
  2. 然后通过循环创建子进程。
  3. 子进程调用 handler 函数。
  4. 在 handler 函数中,创建监听套接字和 epoll 实例,设置套接字为非阻塞并添加可读事件,初始化协程调度器。
  5. 进入一个无限循环,通过 epoll_wait 等待事件发生。
  6. 根据事件的类型进行处理:
    • 如果是监听套接字的事件,处理新的连接。
    • 对于客户端连接的事件,如果是第一次事件则创建协程并唤醒,否则唤醒已有的协程。
  7. 协程中的 handlerClient 函数处理客户端的读写操作。
#include <arpa/inet.h>
#include <assert.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>#include <iostream>#include "../coroutine.h"
#include "../epollctl.hpp"// 定义事件数据结构体
struct EventData {EventData(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd){};  // 构造函数int fd_{0};  // 文件描述符int epoll_fd_{0};  // epoll 文件描述符int cid_{MyCoroutine::INVALID_ROUTINE_ID};  // 协程 IDMyCoroutine::Schedule *schedule_{nullptr};  // 协程调度器指针
};// 处理请求并生成响应的函数
void EchoDeal(const std::string reqMessage, std::string &respMessage) { respMessage = reqMessage; }// 处理客户端的函数
void handlerClient(void *arg) {EventData *eventData = (EventData *)arg;  // 获取事件数据指针auto releaseConn = [&eventData]() {  // 定义释放连接的匿名函数EchoServer::ClearEvent(eventData->epoll_fd_, eventData->fd_);delete eventData;  // 释放内存};ssize_t ret = 0;  // 读取结果EchoServer::Codec codec;  // 编解码器对象std::string reqMessage;  // 请求消息std::string respMessage;  // 响应消息while (true) {  // 读操作循环uint8_t data[100];  // 数据缓冲区ret = read(eventData->fd_, data, 100);  // 尝试读取数据if (ret == 0) {  // 对端关闭连接perror("peer close connection");releaseConn();return;}if (ret < 0) {  // 读取错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 无数据可读MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;}perror("read failed");releaseConn();return;}codec.DeCode(data, ret);  // 解码数据if (codec.GetMessage(reqMessage)) {  // 获取完整请求break;}}// 执行到这里说明已经读取到一个完整的请求EchoDeal(reqMessage, respMessage);  // 处理请求生成响应EchoServer::Packet pkt;codec.EnCode(respMessage, pkt);  // 编码响应EchoServer::ModToWriteEvent(eventData->epoll_fd_, eventData->fd_, eventData);  // 切换为监听可写事件ssize_t sendLen = 0;  // 已发送长度while (sendLen!= pkt.Len()) {  // 写操作循环ret = write(eventData->fd_, pkt.Data() + sendLen, pkt.Len() - sendLen);  // 尝试写入if (ret < 0) {  // 写入错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 不可写MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;}perror("write failed");releaseConn();return;}sendLen += ret;  // 更新已发送长度}releaseConn();  // 释放连接资源
}// 主处理函数
void handler(char *argv[]) {int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), true);  // 创建监听套接字if (sockFd < 0) {return;}epoll_event events[2048];  // epoll 事件数组int epollFd = epoll_create(1024);  // 创建 epoll 实例if (epollFd < 0) {perror("epoll_create failed");return;}EventData eventData(sockFd, epollFd);  // 创建事件数据对象EchoServer::SetNotBlock(sockFd);  // 设置套接字为非阻塞EchoServer::AddReadEvent(epollFd, sockFd, &eventData);  // 添加可读事件MyCoroutine::Schedule schedule;  // 协程调度器MyCoroutine::ScheduleInit(schedule, 10000);  // 初始化协程池int msec = -1;  // 超时时间while (true) {int num = epoll_wait(epollFd, events, 2048, msec);  // 等待 epoll 事件if (num < 0) {  // 等待失败perror("epoll_wait failed");continue;} else if (num == 0) {  // 超时无事件sleep(0);  // 让出 CPUmsec = -1;  // 下次超时时间设置为 -1continue;}msec = 0;  // 有事件,下次超时时间设置为 0for (int i = 0; i < num; i++) {  // 处理事件EventData *eventData = (EventData *)events[i].data.ptr;if (eventData->fd_ == sockFd) {  // 是监听套接字的事件EchoServer::LoopAccept(sockFd, 2048, [epollFd](int clientFd) {EventData *eventData = new EventData(clientFd, epollFd);EchoServer::SetNotBlock(clientFd);EchoServer::AddReadEvent(epollFd, clientFd, eventData);  // 处理新连接});continue;}if (eventData->cid_ == MyCoroutine::INVALID_ROUTINE_ID) {  // 第一次事件if (MyCoroutine::CoroutineCanCreate(schedule)) {  // 可以创建协程eventData->schedule_ = &schedule;eventData->cid_ = MyCoroutine::CoroutineCreate(schedule, handlerClient, eventData, 0);  // 创建协程MyCoroutine::CoroutineResumeById(schedule, eventData->cid_);  // 唤醒协程} else {std::cout << "MyCoroutine is full" << std::endl;}} else {MyCoroutine::CoroutineResumeById(schedule, eventData->cid_);  // 唤醒已有协程}}MyCoroutine::ScheduleTryReleaseMemory(schedule);  // 尝试释放协程内存}
}int main(int argc, char *argv[]) {if (argc!= 3) {  // 检查命令行参数数量std::cout << "invalid input" << std::endl;std::cout << "example:./EpollReactorProcessPoolCoroutine 0.0.0.0 1688" << std::endl;return -1;}for (int i = 0; i < EchoServer::GetNProcs(); i++) {  // 循环创建子进程pid_t pid = fork();  // 创建进程if (pid < 0) {  // 创建失败perror("fork failed");continue;}if (0 == pid) {  // 子进程handler(argv);  // 处理客户端请求exit(0);}}while (true) sleep(1);  // 父进程进入死循环return 0;
}

总的来说,程序通过多进程和协程的结合,实现了对客户端连接的处理和高效的 I/O 操作。

这篇关于linux下I/O模型并发的epoll多进程池协程实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1088233

相关文章

mybatis执行insert返回id实现详解

《mybatis执行insert返回id实现详解》MyBatis插入操作默认返回受影响行数,需通过useGeneratedKeys+keyProperty或selectKey获取主键ID,确保主键为自... 目录 两种方式获取自增 ID:1. ​​useGeneratedKeys+keyProperty(推

Spring Boot集成Druid实现数据源管理与监控的详细步骤

《SpringBoot集成Druid实现数据源管理与监控的详细步骤》本文介绍如何在SpringBoot项目中集成Druid数据库连接池,包括环境搭建、Maven依赖配置、SpringBoot配置文件... 目录1. 引言1.1 环境准备1.2 Druid介绍2. 配置Druid连接池3. 查看Druid监控

Linux在线解压jar包的实现方式

《Linux在线解压jar包的实现方式》:本文主要介绍Linux在线解压jar包的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux在线解压jar包解压 jar包的步骤总结Linux在线解压jar包在 Centos 中解压 jar 包可以使用 u

linux解压缩 xxx.jar文件进行内部操作过程

《linux解压缩xxx.jar文件进行内部操作过程》:本文主要介绍linux解压缩xxx.jar文件进行内部操作,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、解压文件二、压缩文件总结一、解压文件1、把 xxx.jar 文件放在服务器上,并进入当前目录#

Linux系统性能检测命令详解

《Linux系统性能检测命令详解》本文介绍了Linux系统常用的监控命令(如top、vmstat、iostat、htop等)及其参数功能,涵盖进程状态、内存使用、磁盘I/O、系统负载等多维度资源监控,... 目录toppsuptimevmstatIOStatiotopslabtophtopdstatnmon

c++ 类成员变量默认初始值的实现

《c++类成员变量默认初始值的实现》本文主要介绍了c++类成员变量默认初始值,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录C++类成员变量初始化c++类的变量的初始化在C++中,如果使用类成员变量时未给定其初始值,那么它将被

Javaee多线程之进程和线程之间的区别和联系(最新整理)

《Javaee多线程之进程和线程之间的区别和联系(最新整理)》进程是资源分配单位,线程是调度执行单位,共享资源更高效,创建线程五种方式:继承Thread、Runnable接口、匿名类、lambda,r... 目录进程和线程进程线程进程和线程的区别创建线程的五种写法继承Thread,重写run实现Runnab

Qt使用QSqlDatabase连接MySQL实现增删改查功能

《Qt使用QSqlDatabase连接MySQL实现增删改查功能》这篇文章主要为大家详细介绍了Qt如何使用QSqlDatabase连接MySQL实现增删改查功能,文中的示例代码讲解详细,感兴趣的小伙伴... 目录一、创建数据表二、连接mysql数据库三、封装成一个完整的轻量级 ORM 风格类3.1 表结构

基于Python实现一个图片拆分工具

《基于Python实现一个图片拆分工具》这篇文章主要为大家详细介绍了如何基于Python实现一个图片拆分工具,可以根据需要的行数和列数进行拆分,感兴趣的小伙伴可以跟随小编一起学习一下... 简单介绍先自己选择输入的图片,默认是输出到项目文件夹中,可以自己选择其他的文件夹,选择需要拆分的行数和列数,可以通过

Python中将嵌套列表扁平化的多种实现方法

《Python中将嵌套列表扁平化的多种实现方法》在Python编程中,我们常常会遇到需要将嵌套列表(即列表中包含列表)转换为一个一维的扁平列表的需求,本文将给大家介绍了多种实现这一目标的方法,需要的朋... 目录python中将嵌套列表扁平化的方法技术背景实现步骤1. 使用嵌套列表推导式2. 使用itert