本文主要是介绍linux下I/O模型并发的epoll多进程池协程实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
方法1
主要思路:
- 定义了一个
EventData
结构体,用于存储事件相关的数据,如文件描述符、epoll 文件描述符、协程 ID 等。 EchoDeal
函数用于处理请求消息,并生成响应消息。handlerClient
函数是协程的执行函数,用于处理客户端连接。它通过循环读取数据、解析请求、执行业务处理、发送响应等步骤,实现了对客户端请求的处理。handler
函数是主函数,用于创建监听套接字、初始化 epoll、设置非阻塞模式、添加读事件等操作。然后进入一个循环,通过 epoll_wait 等待事件发生,并根据事件类型进行相应的处理,如接受新连接、处理客户端请求等。- 在主函数中,通过 fork 创建多个子进程,每个子进程都执行
handler
函数,从而实现多进程并发处理。
示例代码:
#include <arpa/inet.h> // 包含网络地址转换相关的头文件
#include <assert.h> // 包含断言相关的头文件
#include <fcntl.h> // 包含文件控制相关的头文件
#include <netinet/in.h> // 包含网络协议相关的头文件
#include <stdio.h> // 包含标准输入输出相关的头文件
#include <stdlib.h> // 包含标准库相关的头文件
#include <sys/epoll.h> // 包含 epoll 相关的头文件
#include <sys/socket.h> // 包含套接字相关的头文件
#include <unistd.h> // 包含 Unix 标准相关的头文件#include <iostream> // 包含 C++ 的输入输出流头文件#include "../coroutine.h" // 包含自定义的协程相关头文件
#include "../epollctl.hpp" // 包含自定义的 epoll 控制相关头文件struct EventData { // 定义事件数据结构体EventData(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd){}; // 构造函数,初始化成员变量int fd_{0}; // 文件描述符int epoll_fd_{0}; // epoll 文件描述符int cid_{MyCoroutine::INVALID_ROUTINE_ID}; // 协程 IDMyCoroutine::Schedule *schedule_{nullptr}; // 协程调度器指针
}; // 结构体定义结束void EchoDeal(const std::string reqMessage, std::string &respMessage) { respMessage = reqMessage; } // 处理请求并生成响应的函数void handlerClient(void *arg) { // 处理客户端连接的函数EventData *eventData = (EventData *)arg; // 获取事件数据指针auto releaseConn = [&eventData]() { // 定义释放连接的匿名函数EchoServer::ClearEvent(eventData->epoll_fd_, eventData->fd_); // 清除事件delete eventData; // 释放内存};ssize_t ret = 0; // 读取结果EchoServer::Codec codec; // 编解码器对象std::string reqMessage; // 请求消息std::string respMessage; // 响应消息while (true) { // 读操作循环uint8_t data[100]; // 数据缓冲区ret = read(eventData->fd_, data, 100); // 尝试读取数据if (ret == 0) { // 对端关闭连接perror("peer close connection"); // 打印错误信息releaseConn(); // 释放连接return; // 函数返回}if (ret < 0) { // 读取错误if (EINTR == errno) continue; // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) { // 无数据可读MyCoroutine::CoroutineYield(*eventData->schedule_); // 让出 CPUcontinue; // 继续下一次循环}perror("read failed"); // 打印读取失败的错误信息releaseConn(); // 释放连接return; // 函数返回}codec.DeCode(data, ret); // 解码数据if (codec.GetMessage(reqMessage)) { // 获取完整请求break; // 跳出循环}}// 执行到这里说明已经读取到一个完整的请求EchoDeal(reqMessage, respMessage); // 处理请求生成响应EchoServer::Packet pkt; // 数据包对象codec.EnCode(respMessage, pkt); // 编码响应EchoServer::ModToWriteEvent(eventData->epoll_fd_, eventData->fd_, eventData); // 切换为监听可写事件ssize_t sendLen = 0; // 已发送长度while (sendLen!= pkt.Len()) { // 写操作循环ret = write(eventData->fd_, pkt.Data() + sendLen, pkt.Len() - sendLen); // 尝试写入if (ret < 0) { // 写入错误if (EINTR == errno) continue; // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) { // 不可写MyCoroutine::CoroutineYield(*eventData->schedule_); // 让出 CPUcontinue; // 继续下一次循环}perror("write failed"); // 打印写入失败的错误信息releaseConn(); // 释放连接return; // 函数返回}sendLen += ret; // 更新已发送长度}releaseConn(); // 释放连接资源
}void handler(char *argv[]) { // 主处理函数int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), true); // 创建监听套接字if (sockFd < 0) { // 如果创建失败return; // 函数返回}epoll_event events[2048]; // epoll 事件数组int epollFd = epoll_create(1024); // 创建 epoll 实例if (epollFd < 0) { // 如果创建失败perror("epoll_create failed"); // 打印错误信息return; // 函数返回}EventData eventData(sockFd, epollFd); // 创建事件数据对象EchoServer::SetNotBlock(sock
方法2:
思路:
main
函数首先检查命令行参数数量是否正确。- 然后通过循环创建子进程。
- 子进程调用
handler
函数。 - 在
handler
函数中,创建监听套接字和 epoll 实例,设置套接字为非阻塞并添加可读事件,初始化协程调度器。 - 进入一个无限循环,通过
epoll_wait
等待事件发生。 - 根据事件的类型进行处理:
- 如果是监听套接字的事件,处理新的连接。
- 对于客户端连接的事件,如果是第一次事件则创建协程并唤醒,否则唤醒已有的协程。
- 协程中的
handlerClient
函数处理客户端的读写操作。
#include <arpa/inet.h>
#include <assert.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>#include <iostream>#include "../coroutine.h"
#include "../epollctl.hpp"// 定义事件数据结构体
struct EventData {EventData(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd){}; // 构造函数int fd_{0}; // 文件描述符int epoll_fd_{0}; // epoll 文件描述符int cid_{MyCoroutine::INVALID_ROUTINE_ID}; // 协程 IDMyCoroutine::Schedule *schedule_{nullptr}; // 协程调度器指针
};// 处理请求并生成响应的函数
void EchoDeal(const std::string reqMessage, std::string &respMessage) { respMessage = reqMessage; }// 处理客户端的函数
void handlerClient(void *arg) {EventData *eventData = (EventData *)arg; // 获取事件数据指针auto releaseConn = [&eventData]() { // 定义释放连接的匿名函数EchoServer::ClearEvent(eventData->epoll_fd_, eventData->fd_);delete eventData; // 释放内存};ssize_t ret = 0; // 读取结果EchoServer::Codec codec; // 编解码器对象std::string reqMessage; // 请求消息std::string respMessage; // 响应消息while (true) { // 读操作循环uint8_t data[100]; // 数据缓冲区ret = read(eventData->fd_, data, 100); // 尝试读取数据if (ret == 0) { // 对端关闭连接perror("peer close connection");releaseConn();return;}if (ret < 0) { // 读取错误if (EINTR == errno) continue; // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) { // 无数据可读MyCoroutine::CoroutineYield(*eventData->schedule_); // 让出 CPUcontinue;}perror("read failed");releaseConn();return;}codec.DeCode(data, ret); // 解码数据if (codec.GetMessage(reqMessage)) { // 获取完整请求break;}}// 执行到这里说明已经读取到一个完整的请求EchoDeal(reqMessage, respMessage); // 处理请求生成响应EchoServer::Packet pkt;codec.EnCode(respMessage, pkt); // 编码响应EchoServer::ModToWriteEvent(eventData->epoll_fd_, eventData->fd_, eventData); // 切换为监听可写事件ssize_t sendLen = 0; // 已发送长度while (sendLen!= pkt.Len()) { // 写操作循环ret = write(eventData->fd_, pkt.Data() + sendLen, pkt.Len() - sendLen); // 尝试写入if (ret < 0) { // 写入错误if (EINTR == errno) continue; // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) { // 不可写MyCoroutine::CoroutineYield(*eventData->schedule_); // 让出 CPUcontinue;}perror("write failed");releaseConn();return;}sendLen += ret; // 更新已发送长度}releaseConn(); // 释放连接资源
}// 主处理函数
void handler(char *argv[]) {int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), true); // 创建监听套接字if (sockFd < 0) {return;}epoll_event events[2048]; // epoll 事件数组int epollFd = epoll_create(1024); // 创建 epoll 实例if (epollFd < 0) {perror("epoll_create failed");return;}EventData eventData(sockFd, epollFd); // 创建事件数据对象EchoServer::SetNotBlock(sockFd); // 设置套接字为非阻塞EchoServer::AddReadEvent(epollFd, sockFd, &eventData); // 添加可读事件MyCoroutine::Schedule schedule; // 协程调度器MyCoroutine::ScheduleInit(schedule, 10000); // 初始化协程池int msec = -1; // 超时时间while (true) {int num = epoll_wait(epollFd, events, 2048, msec); // 等待 epoll 事件if (num < 0) { // 等待失败perror("epoll_wait failed");continue;} else if (num == 0) { // 超时无事件sleep(0); // 让出 CPUmsec = -1; // 下次超时时间设置为 -1continue;}msec = 0; // 有事件,下次超时时间设置为 0for (int i = 0; i < num; i++) { // 处理事件EventData *eventData = (EventData *)events[i].data.ptr;if (eventData->fd_ == sockFd) { // 是监听套接字的事件EchoServer::LoopAccept(sockFd, 2048, [epollFd](int clientFd) {EventData *eventData = new EventData(clientFd, epollFd);EchoServer::SetNotBlock(clientFd);EchoServer::AddReadEvent(epollFd, clientFd, eventData); // 处理新连接});continue;}if (eventData->cid_ == MyCoroutine::INVALID_ROUTINE_ID) { // 第一次事件if (MyCoroutine::CoroutineCanCreate(schedule)) { // 可以创建协程eventData->schedule_ = &schedule;eventData->cid_ = MyCoroutine::CoroutineCreate(schedule, handlerClient, eventData, 0); // 创建协程MyCoroutine::CoroutineResumeById(schedule, eventData->cid_); // 唤醒协程} else {std::cout << "MyCoroutine is full" << std::endl;}} else {MyCoroutine::CoroutineResumeById(schedule, eventData->cid_); // 唤醒已有协程}}MyCoroutine::ScheduleTryReleaseMemory(schedule); // 尝试释放协程内存}
}int main(int argc, char *argv[]) {if (argc!= 3) { // 检查命令行参数数量std::cout << "invalid input" << std::endl;std::cout << "example:./EpollReactorProcessPoolCoroutine 0.0.0.0 1688" << std::endl;return -1;}for (int i = 0; i < EchoServer::GetNProcs(); i++) { // 循环创建子进程pid_t pid = fork(); // 创建进程if (pid < 0) { // 创建失败perror("fork failed");continue;}if (0 == pid) { // 子进程handler(argv); // 处理客户端请求exit(0);}}while (true) sleep(1); // 父进程进入死循环return 0;
}
总的来说,程序通过多进程和协程的结合,实现了对客户端连接的处理和高效的 I/O 操作。
这篇关于linux下I/O模型并发的epoll多进程池协程实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!