linux下I/O模型并发的epoll多进程池协程实现

2024-06-23 20:44

本文主要是介绍linux下I/O模型并发的epoll多进程池协程实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

方法1

主要思路:

  1. 定义了一个EventData结构体,用于存储事件相关的数据,如文件描述符、epoll 文件描述符、协程 ID 等。
  2. EchoDeal函数用于处理请求消息,并生成响应消息。
  3. handlerClient函数是协程的执行函数,用于处理客户端连接。它通过循环读取数据、解析请求、执行业务处理、发送响应等步骤,实现了对客户端请求的处理。
  4. handler函数是主函数,用于创建监听套接字、初始化 epoll、设置非阻塞模式、添加读事件等操作。然后进入一个循环,通过 epoll_wait 等待事件发生,并根据事件类型进行相应的处理,如接受新连接、处理客户端请求等。
  5. 在主函数中,通过 fork 创建多个子进程,每个子进程都执行handler函数,从而实现多进程并发处理。

示例代码: 

#include <arpa/inet.h>  // 包含网络地址转换相关的头文件
#include <assert.h>  // 包含断言相关的头文件
#include <fcntl.h>  // 包含文件控制相关的头文件
#include <netinet/in.h>  // 包含网络协议相关的头文件
#include <stdio.h>  // 包含标准输入输出相关的头文件
#include <stdlib.h>  // 包含标准库相关的头文件
#include <sys/epoll.h>  // 包含 epoll 相关的头文件
#include <sys/socket.h>  // 包含套接字相关的头文件
#include <unistd.h>  // 包含 Unix 标准相关的头文件#include <iostream>  // 包含 C++ 的输入输出流头文件#include "../coroutine.h"  // 包含自定义的协程相关头文件
#include "../epollctl.hpp"  // 包含自定义的 epoll 控制相关头文件struct EventData {  // 定义事件数据结构体EventData(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd){};  // 构造函数,初始化成员变量int fd_{0};  // 文件描述符int epoll_fd_{0};  // epoll 文件描述符int cid_{MyCoroutine::INVALID_ROUTINE_ID};  // 协程 IDMyCoroutine::Schedule *schedule_{nullptr};  // 协程调度器指针
};  // 结构体定义结束void EchoDeal(const std::string reqMessage, std::string &respMessage) { respMessage = reqMessage; }  // 处理请求并生成响应的函数void handlerClient(void *arg) {  // 处理客户端连接的函数EventData *eventData = (EventData *)arg;  // 获取事件数据指针auto releaseConn = [&eventData]() {  // 定义释放连接的匿名函数EchoServer::ClearEvent(eventData->epoll_fd_, eventData->fd_);  // 清除事件delete eventData;  // 释放内存};ssize_t ret = 0;  // 读取结果EchoServer::Codec codec;  // 编解码器对象std::string reqMessage;  // 请求消息std::string respMessage;  // 响应消息while (true) {  // 读操作循环uint8_t data[100];  // 数据缓冲区ret = read(eventData->fd_, data, 100);  // 尝试读取数据if (ret == 0) {  // 对端关闭连接perror("peer close connection");  // 打印错误信息releaseConn();  // 释放连接return;  // 函数返回}if (ret < 0) {  // 读取错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 无数据可读MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;  // 继续下一次循环}perror("read failed");  // 打印读取失败的错误信息releaseConn();  // 释放连接return;  // 函数返回}codec.DeCode(data, ret);  // 解码数据if (codec.GetMessage(reqMessage)) {  // 获取完整请求break;  // 跳出循环}}// 执行到这里说明已经读取到一个完整的请求EchoDeal(reqMessage, respMessage);  // 处理请求生成响应EchoServer::Packet pkt;  // 数据包对象codec.EnCode(respMessage, pkt);  // 编码响应EchoServer::ModToWriteEvent(eventData->epoll_fd_, eventData->fd_, eventData);  // 切换为监听可写事件ssize_t sendLen = 0;  // 已发送长度while (sendLen!= pkt.Len()) {  // 写操作循环ret = write(eventData->fd_, pkt.Data() + sendLen, pkt.Len() - sendLen);  // 尝试写入if (ret < 0) {  // 写入错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 不可写MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;  // 继续下一次循环}perror("write failed");  // 打印写入失败的错误信息releaseConn();  // 释放连接return;  // 函数返回}sendLen += ret;  // 更新已发送长度}releaseConn();  // 释放连接资源
}void handler(char *argv[]) {  // 主处理函数int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), true);  // 创建监听套接字if (sockFd < 0) {  // 如果创建失败return;  // 函数返回}epoll_event events[2048];  // epoll 事件数组int epollFd = epoll_create(1024);  // 创建 epoll 实例if (epollFd < 0) {  // 如果创建失败perror("epoll_create failed");  // 打印错误信息return;  // 函数返回}EventData eventData(sockFd, epollFd);  // 创建事件数据对象EchoServer::SetNotBlock(sock

方法2:

思路:

  1. main 函数首先检查命令行参数数量是否正确。
  2. 然后通过循环创建子进程。
  3. 子进程调用 handler 函数。
  4. 在 handler 函数中,创建监听套接字和 epoll 实例,设置套接字为非阻塞并添加可读事件,初始化协程调度器。
  5. 进入一个无限循环,通过 epoll_wait 等待事件发生。
  6. 根据事件的类型进行处理:
    • 如果是监听套接字的事件,处理新的连接。
    • 对于客户端连接的事件,如果是第一次事件则创建协程并唤醒,否则唤醒已有的协程。
  7. 协程中的 handlerClient 函数处理客户端的读写操作。
#include <arpa/inet.h>
#include <assert.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>#include <iostream>#include "../coroutine.h"
#include "../epollctl.hpp"// 定义事件数据结构体
struct EventData {EventData(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd){};  // 构造函数int fd_{0};  // 文件描述符int epoll_fd_{0};  // epoll 文件描述符int cid_{MyCoroutine::INVALID_ROUTINE_ID};  // 协程 IDMyCoroutine::Schedule *schedule_{nullptr};  // 协程调度器指针
};// 处理请求并生成响应的函数
void EchoDeal(const std::string reqMessage, std::string &respMessage) { respMessage = reqMessage; }// 处理客户端的函数
void handlerClient(void *arg) {EventData *eventData = (EventData *)arg;  // 获取事件数据指针auto releaseConn = [&eventData]() {  // 定义释放连接的匿名函数EchoServer::ClearEvent(eventData->epoll_fd_, eventData->fd_);delete eventData;  // 释放内存};ssize_t ret = 0;  // 读取结果EchoServer::Codec codec;  // 编解码器对象std::string reqMessage;  // 请求消息std::string respMessage;  // 响应消息while (true) {  // 读操作循环uint8_t data[100];  // 数据缓冲区ret = read(eventData->fd_, data, 100);  // 尝试读取数据if (ret == 0) {  // 对端关闭连接perror("peer close connection");releaseConn();return;}if (ret < 0) {  // 读取错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 无数据可读MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;}perror("read failed");releaseConn();return;}codec.DeCode(data, ret);  // 解码数据if (codec.GetMessage(reqMessage)) {  // 获取完整请求break;}}// 执行到这里说明已经读取到一个完整的请求EchoDeal(reqMessage, respMessage);  // 处理请求生成响应EchoServer::Packet pkt;codec.EnCode(respMessage, pkt);  // 编码响应EchoServer::ModToWriteEvent(eventData->epoll_fd_, eventData->fd_, eventData);  // 切换为监听可写事件ssize_t sendLen = 0;  // 已发送长度while (sendLen!= pkt.Len()) {  // 写操作循环ret = write(eventData->fd_, pkt.Data() + sendLen, pkt.Len() - sendLen);  // 尝试写入if (ret < 0) {  // 写入错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 不可写MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;}perror("write failed");releaseConn();return;}sendLen += ret;  // 更新已发送长度}releaseConn();  // 释放连接资源
}// 主处理函数
void handler(char *argv[]) {int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), true);  // 创建监听套接字if (sockFd < 0) {return;}epoll_event events[2048];  // epoll 事件数组int epollFd = epoll_create(1024);  // 创建 epoll 实例if (epollFd < 0) {perror("epoll_create failed");return;}EventData eventData(sockFd, epollFd);  // 创建事件数据对象EchoServer::SetNotBlock(sockFd);  // 设置套接字为非阻塞EchoServer::AddReadEvent(epollFd, sockFd, &eventData);  // 添加可读事件MyCoroutine::Schedule schedule;  // 协程调度器MyCoroutine::ScheduleInit(schedule, 10000);  // 初始化协程池int msec = -1;  // 超时时间while (true) {int num = epoll_wait(epollFd, events, 2048, msec);  // 等待 epoll 事件if (num < 0) {  // 等待失败perror("epoll_wait failed");continue;} else if (num == 0) {  // 超时无事件sleep(0);  // 让出 CPUmsec = -1;  // 下次超时时间设置为 -1continue;}msec = 0;  // 有事件,下次超时时间设置为 0for (int i = 0; i < num; i++) {  // 处理事件EventData *eventData = (EventData *)events[i].data.ptr;if (eventData->fd_ == sockFd) {  // 是监听套接字的事件EchoServer::LoopAccept(sockFd, 2048, [epollFd](int clientFd) {EventData *eventData = new EventData(clientFd, epollFd);EchoServer::SetNotBlock(clientFd);EchoServer::AddReadEvent(epollFd, clientFd, eventData);  // 处理新连接});continue;}if (eventData->cid_ == MyCoroutine::INVALID_ROUTINE_ID) {  // 第一次事件if (MyCoroutine::CoroutineCanCreate(schedule)) {  // 可以创建协程eventData->schedule_ = &schedule;eventData->cid_ = MyCoroutine::CoroutineCreate(schedule, handlerClient, eventData, 0);  // 创建协程MyCoroutine::CoroutineResumeById(schedule, eventData->cid_);  // 唤醒协程} else {std::cout << "MyCoroutine is full" << std::endl;}} else {MyCoroutine::CoroutineResumeById(schedule, eventData->cid_);  // 唤醒已有协程}}MyCoroutine::ScheduleTryReleaseMemory(schedule);  // 尝试释放协程内存}
}int main(int argc, char *argv[]) {if (argc!= 3) {  // 检查命令行参数数量std::cout << "invalid input" << std::endl;std::cout << "example:./EpollReactorProcessPoolCoroutine 0.0.0.0 1688" << std::endl;return -1;}for (int i = 0; i < EchoServer::GetNProcs(); i++) {  // 循环创建子进程pid_t pid = fork();  // 创建进程if (pid < 0) {  // 创建失败perror("fork failed");continue;}if (0 == pid) {  // 子进程handler(argv);  // 处理客户端请求exit(0);}}while (true) sleep(1);  // 父进程进入死循环return 0;
}

总的来说,程序通过多进程和协程的结合,实现了对客户端连接的处理和高效的 I/O 操作。

这篇关于linux下I/O模型并发的epoll多进程池协程实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1088233

相关文章

Python实现终端清屏的几种方式详解

《Python实现终端清屏的几种方式详解》在使用Python进行终端交互式编程时,我们经常需要清空当前终端屏幕的内容,本文为大家整理了几种常见的实现方法,有需要的小伙伴可以参考下... 目录方法一:使用 `os` 模块调用系统命令方法二:使用 `subprocess` 模块执行命令方法三:打印多个换行符模拟

SpringBoot+EasyPOI轻松实现Excel和Word导出PDF

《SpringBoot+EasyPOI轻松实现Excel和Word导出PDF》在企业级开发中,将Excel和Word文档导出为PDF是常见需求,本文将结合​​EasyPOI和​​Aspose系列工具实... 目录一、环境准备与依赖配置1.1 方案选型1.2 依赖配置(商业库方案)二、Excel 导出 PDF

Python实现MQTT通信的示例代码

《Python实现MQTT通信的示例代码》本文主要介绍了Python实现MQTT通信的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 安装paho-mqtt库‌2. 搭建MQTT代理服务器(Broker)‌‌3. pytho

使用zip4j实现Java中的ZIP文件加密压缩的操作方法

《使用zip4j实现Java中的ZIP文件加密压缩的操作方法》本文介绍如何通过Maven集成zip4j1.3.2库创建带密码保护的ZIP文件,涵盖依赖配置、代码示例及加密原理,确保数据安全性,感兴趣的... 目录1. zip4j库介绍和版本1.1 zip4j库概述1.2 zip4j的版本演变1.3 zip4

python生成随机唯一id的几种实现方法

《python生成随机唯一id的几种实现方法》在Python中生成随机唯一ID有多种方法,根据不同的需求场景可以选择最适合的方案,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来一起学习学习... 目录方法 1:使用 UUID 模块(推荐)方法 2:使用 Secrets 模块(安全敏感场景)方法

Spring StateMachine实现状态机使用示例详解

《SpringStateMachine实现状态机使用示例详解》本文介绍SpringStateMachine实现状态机的步骤,包括依赖导入、枚举定义、状态转移规则配置、上下文管理及服务调用示例,重点解... 目录什么是状态机使用示例什么是状态机状态机是计算机科学中的​​核心建模工具​​,用于描述对象在其生命

Spring Boot 结合 WxJava 实现文章上传微信公众号草稿箱与群发

《SpringBoot结合WxJava实现文章上传微信公众号草稿箱与群发》本文将详细介绍如何使用SpringBoot框架结合WxJava开发工具包,实现文章上传到微信公众号草稿箱以及群发功能,... 目录一、项目环境准备1.1 开发环境1.2 微信公众号准备二、Spring Boot 项目搭建2.1 创建

Linux进程CPU绑定优化与实践过程

《Linux进程CPU绑定优化与实践过程》Linux支持进程绑定至特定CPU核心,通过sched_setaffinity系统调用和taskset工具实现,优化缓存效率与上下文切换,提升多核计算性能,适... 目录1. 多核处理器及并行计算概念1.1 多核处理器架构概述1.2 并行计算的含义及重要性1.3 并

IntelliJ IDEA2025创建SpringBoot项目的实现步骤

《IntelliJIDEA2025创建SpringBoot项目的实现步骤》本文主要介绍了IntelliJIDEA2025创建SpringBoot项目的实现步骤,文中通过示例代码介绍的非常详细,对大家... 目录一、创建 Spring Boot 项目1. 新建项目2. 基础配置3. 选择依赖4. 生成项目5.

Linux线程之线程的创建、属性、回收、退出、取消方式

《Linux线程之线程的创建、属性、回收、退出、取消方式》文章总结了线程管理核心知识:线程号唯一、创建方式、属性设置(如分离状态与栈大小)、回收机制(join/detach)、退出方法(返回/pthr... 目录1. 线程号2. 线程的创建3. 线程属性4. 线程的回收5. 线程的退出6. 线程的取消7.