linux下I/O模型并发的epoll多进程池协程实现

2024-06-23 20:44

本文主要是介绍linux下I/O模型并发的epoll多进程池协程实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

方法1

主要思路:

  1. 定义了一个EventData结构体,用于存储事件相关的数据,如文件描述符、epoll 文件描述符、协程 ID 等。
  2. EchoDeal函数用于处理请求消息,并生成响应消息。
  3. handlerClient函数是协程的执行函数,用于处理客户端连接。它通过循环读取数据、解析请求、执行业务处理、发送响应等步骤,实现了对客户端请求的处理。
  4. handler函数是主函数,用于创建监听套接字、初始化 epoll、设置非阻塞模式、添加读事件等操作。然后进入一个循环,通过 epoll_wait 等待事件发生,并根据事件类型进行相应的处理,如接受新连接、处理客户端请求等。
  5. 在主函数中,通过 fork 创建多个子进程,每个子进程都执行handler函数,从而实现多进程并发处理。

示例代码: 

#include <arpa/inet.h>  // 包含网络地址转换相关的头文件
#include <assert.h>  // 包含断言相关的头文件
#include <fcntl.h>  // 包含文件控制相关的头文件
#include <netinet/in.h>  // 包含网络协议相关的头文件
#include <stdio.h>  // 包含标准输入输出相关的头文件
#include <stdlib.h>  // 包含标准库相关的头文件
#include <sys/epoll.h>  // 包含 epoll 相关的头文件
#include <sys/socket.h>  // 包含套接字相关的头文件
#include <unistd.h>  // 包含 Unix 标准相关的头文件#include <iostream>  // 包含 C++ 的输入输出流头文件#include "../coroutine.h"  // 包含自定义的协程相关头文件
#include "../epollctl.hpp"  // 包含自定义的 epoll 控制相关头文件struct EventData {  // 定义事件数据结构体EventData(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd){};  // 构造函数,初始化成员变量int fd_{0};  // 文件描述符int epoll_fd_{0};  // epoll 文件描述符int cid_{MyCoroutine::INVALID_ROUTINE_ID};  // 协程 IDMyCoroutine::Schedule *schedule_{nullptr};  // 协程调度器指针
};  // 结构体定义结束void EchoDeal(const std::string reqMessage, std::string &respMessage) { respMessage = reqMessage; }  // 处理请求并生成响应的函数void handlerClient(void *arg) {  // 处理客户端连接的函数EventData *eventData = (EventData *)arg;  // 获取事件数据指针auto releaseConn = [&eventData]() {  // 定义释放连接的匿名函数EchoServer::ClearEvent(eventData->epoll_fd_, eventData->fd_);  // 清除事件delete eventData;  // 释放内存};ssize_t ret = 0;  // 读取结果EchoServer::Codec codec;  // 编解码器对象std::string reqMessage;  // 请求消息std::string respMessage;  // 响应消息while (true) {  // 读操作循环uint8_t data[100];  // 数据缓冲区ret = read(eventData->fd_, data, 100);  // 尝试读取数据if (ret == 0) {  // 对端关闭连接perror("peer close connection");  // 打印错误信息releaseConn();  // 释放连接return;  // 函数返回}if (ret < 0) {  // 读取错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 无数据可读MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;  // 继续下一次循环}perror("read failed");  // 打印读取失败的错误信息releaseConn();  // 释放连接return;  // 函数返回}codec.DeCode(data, ret);  // 解码数据if (codec.GetMessage(reqMessage)) {  // 获取完整请求break;  // 跳出循环}}// 执行到这里说明已经读取到一个完整的请求EchoDeal(reqMessage, respMessage);  // 处理请求生成响应EchoServer::Packet pkt;  // 数据包对象codec.EnCode(respMessage, pkt);  // 编码响应EchoServer::ModToWriteEvent(eventData->epoll_fd_, eventData->fd_, eventData);  // 切换为监听可写事件ssize_t sendLen = 0;  // 已发送长度while (sendLen!= pkt.Len()) {  // 写操作循环ret = write(eventData->fd_, pkt.Data() + sendLen, pkt.Len() - sendLen);  // 尝试写入if (ret < 0) {  // 写入错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 不可写MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;  // 继续下一次循环}perror("write failed");  // 打印写入失败的错误信息releaseConn();  // 释放连接return;  // 函数返回}sendLen += ret;  // 更新已发送长度}releaseConn();  // 释放连接资源
}void handler(char *argv[]) {  // 主处理函数int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), true);  // 创建监听套接字if (sockFd < 0) {  // 如果创建失败return;  // 函数返回}epoll_event events[2048];  // epoll 事件数组int epollFd = epoll_create(1024);  // 创建 epoll 实例if (epollFd < 0) {  // 如果创建失败perror("epoll_create failed");  // 打印错误信息return;  // 函数返回}EventData eventData(sockFd, epollFd);  // 创建事件数据对象EchoServer::SetNotBlock(sock

方法2:

思路:

  1. main 函数首先检查命令行参数数量是否正确。
  2. 然后通过循环创建子进程。
  3. 子进程调用 handler 函数。
  4. 在 handler 函数中,创建监听套接字和 epoll 实例,设置套接字为非阻塞并添加可读事件,初始化协程调度器。
  5. 进入一个无限循环,通过 epoll_wait 等待事件发生。
  6. 根据事件的类型进行处理:
    • 如果是监听套接字的事件,处理新的连接。
    • 对于客户端连接的事件,如果是第一次事件则创建协程并唤醒,否则唤醒已有的协程。
  7. 协程中的 handlerClient 函数处理客户端的读写操作。
#include <arpa/inet.h>
#include <assert.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>#include <iostream>#include "../coroutine.h"
#include "../epollctl.hpp"// 定义事件数据结构体
struct EventData {EventData(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd){};  // 构造函数int fd_{0};  // 文件描述符int epoll_fd_{0};  // epoll 文件描述符int cid_{MyCoroutine::INVALID_ROUTINE_ID};  // 协程 IDMyCoroutine::Schedule *schedule_{nullptr};  // 协程调度器指针
};// 处理请求并生成响应的函数
void EchoDeal(const std::string reqMessage, std::string &respMessage) { respMessage = reqMessage; }// 处理客户端的函数
void handlerClient(void *arg) {EventData *eventData = (EventData *)arg;  // 获取事件数据指针auto releaseConn = [&eventData]() {  // 定义释放连接的匿名函数EchoServer::ClearEvent(eventData->epoll_fd_, eventData->fd_);delete eventData;  // 释放内存};ssize_t ret = 0;  // 读取结果EchoServer::Codec codec;  // 编解码器对象std::string reqMessage;  // 请求消息std::string respMessage;  // 响应消息while (true) {  // 读操作循环uint8_t data[100];  // 数据缓冲区ret = read(eventData->fd_, data, 100);  // 尝试读取数据if (ret == 0) {  // 对端关闭连接perror("peer close connection");releaseConn();return;}if (ret < 0) {  // 读取错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 无数据可读MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;}perror("read failed");releaseConn();return;}codec.DeCode(data, ret);  // 解码数据if (codec.GetMessage(reqMessage)) {  // 获取完整请求break;}}// 执行到这里说明已经读取到一个完整的请求EchoDeal(reqMessage, respMessage);  // 处理请求生成响应EchoServer::Packet pkt;codec.EnCode(respMessage, pkt);  // 编码响应EchoServer::ModToWriteEvent(eventData->epoll_fd_, eventData->fd_, eventData);  // 切换为监听可写事件ssize_t sendLen = 0;  // 已发送长度while (sendLen!= pkt.Len()) {  // 写操作循环ret = write(eventData->fd_, pkt.Data() + sendLen, pkt.Len() - sendLen);  // 尝试写入if (ret < 0) {  // 写入错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 不可写MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;}perror("write failed");releaseConn();return;}sendLen += ret;  // 更新已发送长度}releaseConn();  // 释放连接资源
}// 主处理函数
void handler(char *argv[]) {int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), true);  // 创建监听套接字if (sockFd < 0) {return;}epoll_event events[2048];  // epoll 事件数组int epollFd = epoll_create(1024);  // 创建 epoll 实例if (epollFd < 0) {perror("epoll_create failed");return;}EventData eventData(sockFd, epollFd);  // 创建事件数据对象EchoServer::SetNotBlock(sockFd);  // 设置套接字为非阻塞EchoServer::AddReadEvent(epollFd, sockFd, &eventData);  // 添加可读事件MyCoroutine::Schedule schedule;  // 协程调度器MyCoroutine::ScheduleInit(schedule, 10000);  // 初始化协程池int msec = -1;  // 超时时间while (true) {int num = epoll_wait(epollFd, events, 2048, msec);  // 等待 epoll 事件if (num < 0) {  // 等待失败perror("epoll_wait failed");continue;} else if (num == 0) {  // 超时无事件sleep(0);  // 让出 CPUmsec = -1;  // 下次超时时间设置为 -1continue;}msec = 0;  // 有事件,下次超时时间设置为 0for (int i = 0; i < num; i++) {  // 处理事件EventData *eventData = (EventData *)events[i].data.ptr;if (eventData->fd_ == sockFd) {  // 是监听套接字的事件EchoServer::LoopAccept(sockFd, 2048, [epollFd](int clientFd) {EventData *eventData = new EventData(clientFd, epollFd);EchoServer::SetNotBlock(clientFd);EchoServer::AddReadEvent(epollFd, clientFd, eventData);  // 处理新连接});continue;}if (eventData->cid_ == MyCoroutine::INVALID_ROUTINE_ID) {  // 第一次事件if (MyCoroutine::CoroutineCanCreate(schedule)) {  // 可以创建协程eventData->schedule_ = &schedule;eventData->cid_ = MyCoroutine::CoroutineCreate(schedule, handlerClient, eventData, 0);  // 创建协程MyCoroutine::CoroutineResumeById(schedule, eventData->cid_);  // 唤醒协程} else {std::cout << "MyCoroutine is full" << std::endl;}} else {MyCoroutine::CoroutineResumeById(schedule, eventData->cid_);  // 唤醒已有协程}}MyCoroutine::ScheduleTryReleaseMemory(schedule);  // 尝试释放协程内存}
}int main(int argc, char *argv[]) {if (argc!= 3) {  // 检查命令行参数数量std::cout << "invalid input" << std::endl;std::cout << "example:./EpollReactorProcessPoolCoroutine 0.0.0.0 1688" << std::endl;return -1;}for (int i = 0; i < EchoServer::GetNProcs(); i++) {  // 循环创建子进程pid_t pid = fork();  // 创建进程if (pid < 0) {  // 创建失败perror("fork failed");continue;}if (0 == pid) {  // 子进程handler(argv);  // 处理客户端请求exit(0);}}while (true) sleep(1);  // 父进程进入死循环return 0;
}

总的来说,程序通过多进程和协程的结合,实现了对客户端连接的处理和高效的 I/O 操作。

这篇关于linux下I/O模型并发的epoll多进程池协程实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1088233

相关文章

SpringBoot3实现Gzip压缩优化的技术指南

《SpringBoot3实现Gzip压缩优化的技术指南》随着Web应用的用户量和数据量增加,网络带宽和页面加载速度逐渐成为瓶颈,为了减少数据传输量,提高用户体验,我们可以使用Gzip压缩HTTP响应,... 目录1、简述2、配置2.1 添加依赖2.2 配置 Gzip 压缩3、服务端应用4、前端应用4.1 N

Linux换行符的使用方法详解

《Linux换行符的使用方法详解》本文介绍了Linux中常用的换行符LF及其在文件中的表示,展示了如何使用sed命令替换换行符,并列举了与换行符处理相关的Linux命令,通过代码讲解的非常详细,需要的... 目录简介检测文件中的换行符使用 cat -A 查看换行符使用 od -c 检查字符换行符格式转换将

SpringBoot实现数据库读写分离的3种方法小结

《SpringBoot实现数据库读写分离的3种方法小结》为了提高系统的读写性能和可用性,读写分离是一种经典的数据库架构模式,在SpringBoot应用中,有多种方式可以实现数据库读写分离,本文将介绍三... 目录一、数据库读写分离概述二、方案一:基于AbstractRoutingDataSource实现动态

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

Linux系统配置NAT网络模式的详细步骤(附图文)

《Linux系统配置NAT网络模式的详细步骤(附图文)》本文详细指导如何在VMware环境下配置NAT网络模式,包括设置主机和虚拟机的IP地址、网关,以及针对Linux和Windows系统的具体步骤,... 目录一、配置NAT网络模式二、设置虚拟机交换机网关2.1 打开虚拟机2.2 管理员授权2.3 设置子

Java枚举类实现Key-Value映射的多种实现方式

《Java枚举类实现Key-Value映射的多种实现方式》在Java开发中,枚举(Enum)是一种特殊的类,本文将详细介绍Java枚举类实现key-value映射的多种方式,有需要的小伙伴可以根据需要... 目录前言一、基础实现方式1.1 为枚举添加属性和构造方法二、http://www.cppcns.co

使用Python实现快速搭建本地HTTP服务器

《使用Python实现快速搭建本地HTTP服务器》:本文主要介绍如何使用Python快速搭建本地HTTP服务器,轻松实现一键HTTP文件共享,同时结合二维码技术,让访问更简单,感兴趣的小伙伴可以了... 目录1. 概述2. 快速搭建 HTTP 文件共享服务2.1 核心思路2.2 代码实现2.3 代码解读3.

Linux系统中卸载与安装JDK的详细教程

《Linux系统中卸载与安装JDK的详细教程》本文详细介绍了如何在Linux系统中通过Xshell和Xftp工具连接与传输文件,然后进行JDK的安装与卸载,安装步骤包括连接Linux、传输JDK安装包... 目录1、卸载1.1 linux删除自带的JDK1.2 Linux上卸载自己安装的JDK2、安装2.1

MySQL双主搭建+keepalived高可用的实现

《MySQL双主搭建+keepalived高可用的实现》本文主要介绍了MySQL双主搭建+keepalived高可用的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、测试环境准备二、主从搭建1.创建复制用户2.创建复制关系3.开启复制,确认复制是否成功4.同

Java实现文件图片的预览和下载功能

《Java实现文件图片的预览和下载功能》这篇文章主要为大家详细介绍了如何使用Java实现文件图片的预览和下载功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... Java实现文件(图片)的预览和下载 @ApiOperation("访问文件") @GetMapping("