Linux | 进程池技术解析:利用无名管道实现并发任务处理(含实现代码)

本文主要是介绍Linux | 进程池技术解析:利用无名管道实现并发任务处理(含实现代码),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在血海里游泳,一直游到海水变蓝。——何小鹏

2024.8.31

目录

一、进程池

二、使用匿名管道实现进程池的核心

前置知识:管道的四种情况和五个特征

三、代码实现

四、实现代码详解

main()

2、loadTask()

3、channelInit()

问题:为什么要将子进程的管道读端重定向至标准输入?

4、ctrlProcess()

5、channelClose()


一、进程池

多任务处理是提高系统性能和响应速度的关键。进程池技术作为一种有效的资源管理和任务调度策略,已经成为并发编程中不可或缺的一部分。本文探讨进程池的工作原理,以及如何通过使用无名管道来实现高效的并发任务处理。

  • 定义:进程池由一组预先创建的空闲进程(资源进程)和管理这些进程的管理进程组成。
  • 作用:优化资源管理和提高系统效率,通过预先创建进程减少频繁创建和销毁进程的开销。
  • 预期并发效果:虽然进程池中的进程数量固定,但可以并行处理多个任务,实现并发效果。
二、使用匿名管道实现进程池的核心
  • 主函数逻辑:创建任务,创建子进程池,发送任务,关闭写端和回收子进程。
  • 父进程创建进程池:使用fork()函数创建子进程,子进程阻塞在read()函数处等待任务。
  • 子进程退出时机:当所有管道写端关闭时,子进程通过read()函数返回0值判断退出。
前置知识:管道的四种情况和五个特征

管道的四种情况:

  1. 如果管道是空的,则读取端被阻塞
  2. 如果管道是满的,则写入端被阻塞
  3. 如果关闭了管道的读端,那管道没必要存在,被13号信号杀死
  4. 如果关闭了管道的写端,读取完毕后管道返回0,表示读到了文件末尾

管道的五个特征:

  1. 匿名管道只能用于有血缘关系的进程通信,常用于父子间通信。
  2. 管道内部实现了同步机制,读写具有明显的顺序性。
  3. 管道的生命周期是随进程的,随着进程使用管道而创建缓冲区, 随进程的退出而释放销毁。
  4. 管道通通信是面向字节流的,读写次数是可以不匹配的,读到的数据可能是单次残缺的,也可能是多次堆积的
  5. 管道通信是特殊的半双工模式,半双工是指支持读写,但不能同时读写,特殊在只支持信息的单向传递。

三、代码实现
#include<iostream>
#include<unistd.h>
#include<vector>
#include<string>
#include<sys/wait.h>
#include<sys/types.h>// 定义任务函数指针类型
typedef void (*task_t)();
#define taskNum 3 // 定义任务数量
int n; // 进程数量
task_t tasks[taskNum]; // 任务函数指针数组
struct channel
{int _wfd; // 管道写端文件描述符pid_t _task; // 子进程IDstd::string _name; // 通道名称// 构造函数channel(int wfd, pid_t task, const std::string& name = "channel"):_wfd(wfd),_task(task),_name(name){}// 关闭管道写端void Close(){close(_wfd);}};
std::vector<channel> v; // 通道向量// 打印任务
void print()
{std::cout << "this is print task" << std::endl;
}// 下载任务
void download()
{std::cout << "this is download task" << std::endl;
}// 刷新任务
void flush()
{std::cout << "this is flush task" << std::endl;
}// 加载任务
void loadTask()
{tasks[0] = print;tasks[1] = download;tasks[2] = flush;
}// 执行任务
void ExecuteTask()
{int read_num = 0;int task_index = 0;while (true){read_num = read(0, &task_index, sizeof(int));if (read_num == -1){std::cerr << "管道读取失败!错误码:" << errno << std::endl;exit(-1);}else if (read_num == 0) // 读到0代表写端关闭,直接停止{std::cout << "子进程 " << getpid() << " 任务读取完成或管道关闭,退出。" << std::endl;exit(0);}else{if (task_index >= 0 && task_index < 3){std::cout << "子进程 " << getpid() << " 执行任务 " << task_index << std::endl;tasks[task_index]();}else{std::cerr << "无效的任务索引:" << task_index << std::endl;}}}
}// 初始化通道
void channelInit()
{for(int i = 0; i < n; i++){int pipefd[2];pipe(pipefd);pid_t id = fork();if(id < 0)return;else if(id == 0){close(pipefd[1]);dup2(pipefd[0], 0);for(int i = 0; i < v.size(); i++){v[i].Close();}ExecuteTask();}std::string channel_name = "channel_"+std::to_string(i);close(pipefd[0]);v.push_back(channel(pipefd[1], id, channel_name));}
}// 获取下一个通道索引
int nextChannel()
{static int next = 0;int channel = next;next++;next %= n;return channel;
}// 向通道发送任务命令
void sendTaskCommand(channel& chan, int taskCommand)
{write(chan._wfd, &taskCommand, sizeof(taskCommand));
}// 选择任务
int selectTask()
{return rand() % taskNum;
}// 控制进程执行一次
void ctrlProcessOnce()
{int taskCommand = selectTask();int channel_index = nextChannel();sendTaskCommand(v[channel_index], taskCommand);
}// 控制进程执行多次
void ctrlProcess(int times = -1)
{while(times--){ctrlProcessOnce();sleep(1);}
}// 关闭通道并等待子进程退出
void channelClose()
{int status = 0;for(int i = 0; i < n; i++){v[i].Close();wait(&status);int exit_status = (status&0x7f);int exit_code = ((status>>8)&0xff);std::cout << "子进程 " << getpid() << " 的退出状态为 " << exit_status << " 退出码为 " << exit_code << std::endl;std::cout << std::endl;}
}// 主函数
int main()
{   loadTask();std::cout << "请输入你要创建的进程个数"<<std::endl;std::cin >> n;int count = -1;std::cout << "请输入要执行程序的次数" << std::endl;std::cin >> count;channelInit();ctrlProcess(count);channelClose();sleep(3);return 0;
}
四、实现代码详解
main()

负责执行初始化任务、创建进程池、控制进程执行任务,以及关闭通道并等待子进程退出。

#include<iostream>
#include<unistd.h>
#include<vector>
#include<string>
#include<sys/types.h>typedef void (*task_t)();
#define taskNum 3
int n;
task_t tasks[taskNum];
struct channel
{int _wfd;pid_t _task;std::string _name;channel(int wfd, pid_t task, const std::string& name = "channel"):_wfd(wfd),_task(task),_name(name){}void Close(){close(_wfd);}};
std::vector<channel> v; // 存储创建的子进程信息,包括读wid/进程id/进程名称// 需要执行的三个函数
void print()
{std::cout << "this is print task" << std::endl;
}void download()
{std::cout << "this is download task" << std::endl;
}void flush()
{std::cout << "this is flush task" << std::endl;
}int main()
{   std::cout << "请输入你要创建的进程个数"<<std::endl;std::cin >> n;loadTask(); // 通过函数指针解耦调用函数,将需要运行的函数放入函数指针数组中channelInit();ctrlProcess();return 0;
}
2、loadTask()

将函数通过函数指针将不同的任务函数加载到数组中,以便后续可以通过索引来调用相应的任务

// 通过函数指针解耦调用函数,将需要运行的函数放入函数指针数组中
void loadTask()
{tasks[0] = print;tasks[1] = download;tasks[2] = flush;
}
3、channelInit()

创建子进程并让子进程read阻塞等待任务。子进程通过管道的读端来接收任务索引,并执行相应的任务函数。

void channelInit()
{for(int i = 0; i < n; i++){int pipefd[2];pipe(pipefd);pid_t id = fork();if(id < 0)return ;else if(id == 0){close(pipefd[1]);dup2(pipefd[0], 0); // 为什么要重定向?——实现监听通道和子进程的解耦// 重定向后只需要固定监听0号文件描述符收到的文件即可,无需根据特定子进程监听不同的widfor(int i = 0; i < v.size(); i++){v[i].Close(); //子进程必须关闭继承父进程的、多余的wfd}ExecuteTask(); // 子进程在此阻塞等待}std::string channel_name = "channel_"+std::to_string(i);close(pipefd[0]);v.push_back(channel(pipefd[1], id, channel_name));}
}// 监听被重定向的wid 获得函数指针数组下标,执行相应函数
void ExecuteTask()
{int read_num = 0;int task_index = 0;while (true){read_num = read(0, &task_index, sizeof(int));if (read_num == -1){std::cerr << "管道读取失败!错误码:" << errno << std::endl;exit(-1);}else if (read_num == 0) // 读到0代表写端关闭,直接停止{std::cout << "子进程 " << getpid() << " 任务读取完成或管道关闭,退出。" << std::endl;break;}else{if (task_index >= 0 && task_index < 3){std::cout << "子进程 " << getpid() << " 执行任务 " << task_index << std::endl;tasks[task_index]();}else{std::cerr << "无效的任务索引:" << task_index << std::endl;}}}
}
问题:为什么要将子进程的管道读端重定向至标准输入?

实现监听通道和子进程的解耦。每个子进程的读端rid不同,后续read监听需要根据不同子进程调整rid。而重定向后只需要固定监听0号文件描述符收到的文件即可,实现了不同子进程监听同一个文件描述符的解耦。

4、ctrlProcess()

父进程根据轮询策略向子进程发送任务,控制子进程执行任务。

// 获取下一个通道索引
int nextChannel()
{static int next = 0;int channel = next;next++;next %= n;return channel;
}// 向通道发送任务命令
void sendTaskCommand(channel& chan, int taskCommand)
{write(chan._wfd, &taskCommand, sizeof(taskCommand));
}// 选择任务
int selectTask()
{return rand() % taskNum;
}// 控制进程执行一次
void ctrlProcessOnce()
{int taskCommand = selectTask();int channel_index = nextChannel();sendTaskCommand(v[channel_index], taskCommand);
}// 控制进程执行多次
void ctrlProcess(int times = -1)
{while(times--){ctrlProcessOnce();sleep(1);}
}
5、channelClose()

用于关闭所有通道,并等待所有子进程退出,同时获取子进程的退出状态和退出码。

// 关闭通道并等待子进程退出
void channelClose()
{int status = 0;for(int i = 0; i < n; i++){v[i].Close();wait(&status);int exit_status = (status&0x7f);int exit_code = ((status>>8)&0xff);std::cout << "子进程 " << getpid() << " 的退出状态为 " << exit_status << " 退出码为 " << exit_code << std::endl;std::cout << std::endl;}
}

这篇关于Linux | 进程池技术解析:利用无名管道实现并发任务处理(含实现代码)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1124235

相关文章

Linux中shell解析脚本的通配符、元字符、转义符说明

《Linux中shell解析脚本的通配符、元字符、转义符说明》:本文主要介绍shell通配符、元字符、转义符以及shell解析脚本的过程,通配符用于路径扩展,元字符用于多命令分割,转义符用于将特殊... 目录一、linux shell通配符(wildcard)二、shell元字符(特殊字符 Meta)三、s

Linux之软件包管理器yum详解

《Linux之软件包管理器yum详解》文章介绍了现代类Unix操作系统中软件包管理和包存储库的工作原理,以及如何使用包管理器如yum来安装、更新和卸载软件,文章还介绍了如何配置yum源,更新系统软件包... 目录软件包yumyum语法yum常用命令yum源配置文件介绍更新yum源查看已经安装软件的方法总结软

linux报错INFO:task xxxxxx:634 blocked for more than 120 seconds.三种解决方式

《linux报错INFO:taskxxxxxx:634blockedformorethan120seconds.三种解决方式》文章描述了一个Linux最小系统运行时出现的“hung_ta... 目录1.问题描述2.解决办法2.1 缩小文件系统缓存大小2.2 修改系统IO调度策略2.3 取消120秒时间限制3

Linux alias的三种使用场景方式

《Linuxalias的三种使用场景方式》文章介绍了Linux中`alias`命令的三种使用场景:临时别名、用户级别别名和系统级别别名,临时别名仅在当前终端有效,用户级别别名在当前用户下所有终端有效... 目录linux alias三种使用场景一次性适用于当前用户全局生效,所有用户都可调用删除总结Linux

Linux:alias如何设置永久生效

《Linux:alias如何设置永久生效》在Linux中设置别名永久生效的步骤包括:在/root/.bashrc文件中配置别名,保存并退出,然后使用source命令(或点命令)使配置立即生效,这样,别... 目录linux:alias设置永久生效步骤保存退出后功能总结Linux:alias设置永久生效步骤

Oracle查询优化之高效实现仅查询前10条记录的方法与实践

《Oracle查询优化之高效实现仅查询前10条记录的方法与实践》:本文主要介绍Oracle查询优化之高效实现仅查询前10条记录的相关资料,包括使用ROWNUM、ROW_NUMBER()函数、FET... 目录1. 使用 ROWNUM 查询2. 使用 ROW_NUMBER() 函数3. 使用 FETCH FI

Python脚本实现自动删除C盘临时文件夹

《Python脚本实现自动删除C盘临时文件夹》在日常使用电脑的过程中,临时文件夹往往会积累大量的无用数据,占用宝贵的磁盘空间,下面我们就来看看Python如何通过脚本实现自动删除C盘临时文件夹吧... 目录一、准备工作二、python脚本编写三、脚本解析四、运行脚本五、案例演示六、注意事项七、总结在日常使用

Java实现Excel与HTML互转

《Java实现Excel与HTML互转》Excel是一种电子表格格式,而HTM则是一种用于创建网页的标记语言,虽然两者在用途上存在差异,但有时我们需要将数据从一种格式转换为另一种格式,下面我们就来看看... Excel是一种电子表格格式,广泛用于数据处理和分析,而HTM则是一种用于创建网页的标记语言。虽然两

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

使用Python实现在Word中添加或删除超链接

《使用Python实现在Word中添加或删除超链接》在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能,本文将为大家介绍一下Python如何实现在Word中添加或... 在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能。通过添加超