Linux | 进程池技术解析:利用无名管道实现并发任务处理(含实现代码)

本文主要是介绍Linux | 进程池技术解析:利用无名管道实现并发任务处理(含实现代码),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在血海里游泳,一直游到海水变蓝。——何小鹏

2024.8.31

目录

一、进程池

二、使用匿名管道实现进程池的核心

前置知识:管道的四种情况和五个特征

三、代码实现

四、实现代码详解

main()

2、loadTask()

3、channelInit()

问题:为什么要将子进程的管道读端重定向至标准输入?

4、ctrlProcess()

5、channelClose()


一、进程池

多任务处理是提高系统性能和响应速度的关键。进程池技术作为一种有效的资源管理和任务调度策略,已经成为并发编程中不可或缺的一部分。本文探讨进程池的工作原理,以及如何通过使用无名管道来实现高效的并发任务处理。

  • 定义:进程池由一组预先创建的空闲进程(资源进程)和管理这些进程的管理进程组成。
  • 作用:优化资源管理和提高系统效率,通过预先创建进程减少频繁创建和销毁进程的开销。
  • 预期并发效果:虽然进程池中的进程数量固定,但可以并行处理多个任务,实现并发效果。
二、使用匿名管道实现进程池的核心
  • 主函数逻辑:创建任务,创建子进程池,发送任务,关闭写端和回收子进程。
  • 父进程创建进程池:使用fork()函数创建子进程,子进程阻塞在read()函数处等待任务。
  • 子进程退出时机:当所有管道写端关闭时,子进程通过read()函数返回0值判断退出。
前置知识:管道的四种情况和五个特征

管道的四种情况:

  1. 如果管道是空的,则读取端被阻塞
  2. 如果管道是满的,则写入端被阻塞
  3. 如果关闭了管道的读端,那管道没必要存在,被13号信号杀死
  4. 如果关闭了管道的写端,读取完毕后管道返回0,表示读到了文件末尾

管道的五个特征:

  1. 匿名管道只能用于有血缘关系的进程通信,常用于父子间通信。
  2. 管道内部实现了同步机制,读写具有明显的顺序性。
  3. 管道的生命周期是随进程的,随着进程使用管道而创建缓冲区, 随进程的退出而释放销毁。
  4. 管道通通信是面向字节流的,读写次数是可以不匹配的,读到的数据可能是单次残缺的,也可能是多次堆积的
  5. 管道通信是特殊的半双工模式,半双工是指支持读写,但不能同时读写,特殊在只支持信息的单向传递。

三、代码实现
#include<iostream>
#include<unistd.h>
#include<vector>
#include<string>
#include<sys/wait.h>
#include<sys/types.h>// 定义任务函数指针类型
typedef void (*task_t)();
#define taskNum 3 // 定义任务数量
int n; // 进程数量
task_t tasks[taskNum]; // 任务函数指针数组
struct channel
{int _wfd; // 管道写端文件描述符pid_t _task; // 子进程IDstd::string _name; // 通道名称// 构造函数channel(int wfd, pid_t task, const std::string& name = "channel"):_wfd(wfd),_task(task),_name(name){}// 关闭管道写端void Close(){close(_wfd);}};
std::vector<channel> v; // 通道向量// 打印任务
void print()
{std::cout << "this is print task" << std::endl;
}// 下载任务
void download()
{std::cout << "this is download task" << std::endl;
}// 刷新任务
void flush()
{std::cout << "this is flush task" << std::endl;
}// 加载任务
void loadTask()
{tasks[0] = print;tasks[1] = download;tasks[2] = flush;
}// 执行任务
void ExecuteTask()
{int read_num = 0;int task_index = 0;while (true){read_num = read(0, &task_index, sizeof(int));if (read_num == -1){std::cerr << "管道读取失败!错误码:" << errno << std::endl;exit(-1);}else if (read_num == 0) // 读到0代表写端关闭,直接停止{std::cout << "子进程 " << getpid() << " 任务读取完成或管道关闭,退出。" << std::endl;exit(0);}else{if (task_index >= 0 && task_index < 3){std::cout << "子进程 " << getpid() << " 执行任务 " << task_index << std::endl;tasks[task_index]();}else{std::cerr << "无效的任务索引:" << task_index << std::endl;}}}
}// 初始化通道
void channelInit()
{for(int i = 0; i < n; i++){int pipefd[2];pipe(pipefd);pid_t id = fork();if(id < 0)return;else if(id == 0){close(pipefd[1]);dup2(pipefd[0], 0);for(int i = 0; i < v.size(); i++){v[i].Close();}ExecuteTask();}std::string channel_name = "channel_"+std::to_string(i);close(pipefd[0]);v.push_back(channel(pipefd[1], id, channel_name));}
}// 获取下一个通道索引
int nextChannel()
{static int next = 0;int channel = next;next++;next %= n;return channel;
}// 向通道发送任务命令
void sendTaskCommand(channel& chan, int taskCommand)
{write(chan._wfd, &taskCommand, sizeof(taskCommand));
}// 选择任务
int selectTask()
{return rand() % taskNum;
}// 控制进程执行一次
void ctrlProcessOnce()
{int taskCommand = selectTask();int channel_index = nextChannel();sendTaskCommand(v[channel_index], taskCommand);
}// 控制进程执行多次
void ctrlProcess(int times = -1)
{while(times--){ctrlProcessOnce();sleep(1);}
}// 关闭通道并等待子进程退出
void channelClose()
{int status = 0;for(int i = 0; i < n; i++){v[i].Close();wait(&status);int exit_status = (status&0x7f);int exit_code = ((status>>8)&0xff);std::cout << "子进程 " << getpid() << " 的退出状态为 " << exit_status << " 退出码为 " << exit_code << std::endl;std::cout << std::endl;}
}// 主函数
int main()
{   loadTask();std::cout << "请输入你要创建的进程个数"<<std::endl;std::cin >> n;int count = -1;std::cout << "请输入要执行程序的次数" << std::endl;std::cin >> count;channelInit();ctrlProcess(count);channelClose();sleep(3);return 0;
}
四、实现代码详解
main()

负责执行初始化任务、创建进程池、控制进程执行任务,以及关闭通道并等待子进程退出。

#include<iostream>
#include<unistd.h>
#include<vector>
#include<string>
#include<sys/types.h>typedef void (*task_t)();
#define taskNum 3
int n;
task_t tasks[taskNum];
struct channel
{int _wfd;pid_t _task;std::string _name;channel(int wfd, pid_t task, const std::string& name = "channel"):_wfd(wfd),_task(task),_name(name){}void Close(){close(_wfd);}};
std::vector<channel> v; // 存储创建的子进程信息,包括读wid/进程id/进程名称// 需要执行的三个函数
void print()
{std::cout << "this is print task" << std::endl;
}void download()
{std::cout << "this is download task" << std::endl;
}void flush()
{std::cout << "this is flush task" << std::endl;
}int main()
{   std::cout << "请输入你要创建的进程个数"<<std::endl;std::cin >> n;loadTask(); // 通过函数指针解耦调用函数,将需要运行的函数放入函数指针数组中channelInit();ctrlProcess();return 0;
}
2、loadTask()

将函数通过函数指针将不同的任务函数加载到数组中,以便后续可以通过索引来调用相应的任务

// 通过函数指针解耦调用函数,将需要运行的函数放入函数指针数组中
void loadTask()
{tasks[0] = print;tasks[1] = download;tasks[2] = flush;
}
3、channelInit()

创建子进程并让子进程read阻塞等待任务。子进程通过管道的读端来接收任务索引,并执行相应的任务函数。

void channelInit()
{for(int i = 0; i < n; i++){int pipefd[2];pipe(pipefd);pid_t id = fork();if(id < 0)return ;else if(id == 0){close(pipefd[1]);dup2(pipefd[0], 0); // 为什么要重定向?——实现监听通道和子进程的解耦// 重定向后只需要固定监听0号文件描述符收到的文件即可,无需根据特定子进程监听不同的widfor(int i = 0; i < v.size(); i++){v[i].Close(); //子进程必须关闭继承父进程的、多余的wfd}ExecuteTask(); // 子进程在此阻塞等待}std::string channel_name = "channel_"+std::to_string(i);close(pipefd[0]);v.push_back(channel(pipefd[1], id, channel_name));}
}// 监听被重定向的wid 获得函数指针数组下标,执行相应函数
void ExecuteTask()
{int read_num = 0;int task_index = 0;while (true){read_num = read(0, &task_index, sizeof(int));if (read_num == -1){std::cerr << "管道读取失败!错误码:" << errno << std::endl;exit(-1);}else if (read_num == 0) // 读到0代表写端关闭,直接停止{std::cout << "子进程 " << getpid() << " 任务读取完成或管道关闭,退出。" << std::endl;break;}else{if (task_index >= 0 && task_index < 3){std::cout << "子进程 " << getpid() << " 执行任务 " << task_index << std::endl;tasks[task_index]();}else{std::cerr << "无效的任务索引:" << task_index << std::endl;}}}
}
问题:为什么要将子进程的管道读端重定向至标准输入?

实现监听通道和子进程的解耦。每个子进程的读端rid不同,后续read监听需要根据不同子进程调整rid。而重定向后只需要固定监听0号文件描述符收到的文件即可,实现了不同子进程监听同一个文件描述符的解耦。

4、ctrlProcess()

父进程根据轮询策略向子进程发送任务,控制子进程执行任务。

// 获取下一个通道索引
int nextChannel()
{static int next = 0;int channel = next;next++;next %= n;return channel;
}// 向通道发送任务命令
void sendTaskCommand(channel& chan, int taskCommand)
{write(chan._wfd, &taskCommand, sizeof(taskCommand));
}// 选择任务
int selectTask()
{return rand() % taskNum;
}// 控制进程执行一次
void ctrlProcessOnce()
{int taskCommand = selectTask();int channel_index = nextChannel();sendTaskCommand(v[channel_index], taskCommand);
}// 控制进程执行多次
void ctrlProcess(int times = -1)
{while(times--){ctrlProcessOnce();sleep(1);}
}
5、channelClose()

用于关闭所有通道,并等待所有子进程退出,同时获取子进程的退出状态和退出码。

// 关闭通道并等待子进程退出
void channelClose()
{int status = 0;for(int i = 0; i < n; i++){v[i].Close();wait(&status);int exit_status = (status&0x7f);int exit_code = ((status>>8)&0xff);std::cout << "子进程 " << getpid() << " 的退出状态为 " << exit_status << " 退出码为 " << exit_code << std::endl;std::cout << std::endl;}
}

这篇关于Linux | 进程池技术解析:利用无名管道实现并发任务处理(含实现代码)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1124235

相关文章

openCV中KNN算法的实现

《openCV中KNN算法的实现》KNN算法是一种简单且常用的分类算法,本文主要介绍了openCV中KNN算法的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的... 目录KNN算法流程使用OpenCV实现KNNOpenCV 是一个开源的跨平台计算机视觉库,它提供了各

OpenCV图像形态学的实现

《OpenCV图像形态学的实现》本文主要介绍了OpenCV图像形态学的实现,包括腐蚀、膨胀、开运算、闭运算、梯度运算、顶帽运算和黑帽运算,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来一起... 目录一、图像形态学简介二、腐蚀(Erosion)1. 原理2. OpenCV 实现三、膨胀China编程(

通过Spring层面进行事务回滚的实现

《通过Spring层面进行事务回滚的实现》本文主要介绍了通过Spring层面进行事务回滚的实现,包括声明式事务和编程式事务,具有一定的参考价值,感兴趣的可以了解一下... 目录声明式事务回滚:1. 基础注解配置2. 指定回滚异常类型3. ​不回滚特殊场景编程式事务回滚:1. ​使用 TransactionT

Android实现打开本地pdf文件的两种方式

《Android实现打开本地pdf文件的两种方式》在现代应用中,PDF格式因其跨平台、稳定性好、展示内容一致等特点,在Android平台上,如何高效地打开本地PDF文件,不仅关系到用户体验,也直接影响... 目录一、项目概述二、相关知识2.1 PDF文件基本概述2.2 android 文件访问与存储权限2.

使用Python实现全能手机虚拟键盘的示例代码

《使用Python实现全能手机虚拟键盘的示例代码》在数字化办公时代,你是否遇到过这样的场景:会议室投影电脑突然键盘失灵、躺在沙发上想远程控制书房电脑、或者需要给长辈远程协助操作?今天我要分享的Pyth... 目录一、项目概述:不止于键盘的远程控制方案1.1 创新价值1.2 技术栈全景二、需求实现步骤一、需求

Spring Shell 命令行实现交互式Shell应用开发

《SpringShell命令行实现交互式Shell应用开发》本文主要介绍了SpringShell命令行实现交互式Shell应用开发,能够帮助开发者快速构建功能丰富的命令行应用程序,具有一定的参考价... 目录引言一、Spring Shell概述二、创建命令类三、命令参数处理四、命令分组与帮助系统五、自定义S

SpringBatch数据写入实现

《SpringBatch数据写入实现》SpringBatch通过ItemWriter接口及其丰富的实现,提供了强大的数据写入能力,本文主要介绍了SpringBatch数据写入实现,具有一定的参考价值,... 目录python引言一、ItemWriter核心概念二、数据库写入实现三、文件写入实现四、多目标写入

SpringQuartz定时任务核心组件JobDetail与Trigger配置

《SpringQuartz定时任务核心组件JobDetail与Trigger配置》Spring框架与Quartz调度器的集成提供了强大而灵活的定时任务解决方案,本文主要介绍了SpringQuartz定... 目录引言一、Spring Quartz基础架构1.1 核心组件概述1.2 Spring集成优势二、J

Android Studio 配置国内镜像源的实现步骤

《AndroidStudio配置国内镜像源的实现步骤》本文主要介绍了AndroidStudio配置国内镜像源的实现步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、修改 hosts,解决 SDK 下载失败的问题二、修改 gradle 地址,解决 gradle

SpringSecurity JWT基于令牌的无状态认证实现

《SpringSecurityJWT基于令牌的无状态认证实现》SpringSecurity中实现基于JWT的无状态认证是一种常见的做法,本文就来介绍一下SpringSecurityJWT基于令牌的无... 目录引言一、JWT基本原理与结构二、Spring Security JWT依赖配置三、JWT令牌生成与