本文主要是介绍Linux | 进程池技术解析:利用无名管道实现并发任务处理(含实现代码),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
在血海里游泳,一直游到海水变蓝。——何小鹏
2024.8.31
目录
一、进程池
二、使用匿名管道实现进程池的核心
前置知识:管道的四种情况和五个特征
三、代码实现
四、实现代码详解
main()
2、loadTask()
3、channelInit()
问题:为什么要将子进程的管道读端重定向至标准输入?
4、ctrlProcess()
5、channelClose()
一、进程池
多任务处理是提高系统性能和响应速度的关键。进程池技术作为一种有效的资源管理和任务调度策略,已经成为并发编程中不可或缺的一部分。本文探讨进程池的工作原理,以及如何通过使用无名管道来实现高效的并发任务处理。
- 定义:进程池由一组预先创建的空闲进程(资源进程)和管理这些进程的管理进程组成。
- 作用:优化资源管理和提高系统效率,通过预先创建进程减少频繁创建和销毁进程的开销。
- 预期并发效果:虽然进程池中的进程数量固定,但可以并行处理多个任务,实现并发效果。
二、使用匿名管道实现进程池的核心
- 主函数逻辑:创建任务,创建子进程池,发送任务,关闭写端和回收子进程。
- 父进程创建进程池:使用
fork()
函数创建子进程,子进程阻塞在read()
函数处等待任务。 - 子进程退出时机:当所有管道写端关闭时,子进程通过
read()
函数返回0值判断退出。
前置知识:管道的四种情况和五个特征
管道的四种情况:
- 如果管道是空的,则读取端被阻塞
- 如果管道是满的,则写入端被阻塞
- 如果关闭了管道的读端,那管道没必要存在,被13号信号杀死
- 如果关闭了管道的写端,读取完毕后管道返回0,表示读到了文件末尾
管道的五个特征:
- 匿名管道只能用于有血缘关系的进程通信,常用于父子间通信。
- 管道内部实现了同步机制,读写具有明显的顺序性。
- 管道的生命周期是随进程的,随着进程使用管道而创建缓冲区, 随进程的退出而释放销毁。
- 管道通通信是面向字节流的,读写次数是可以不匹配的,读到的数据可能是单次残缺的,也可能是多次堆积的
- 管道通信是特殊的半双工模式,半双工是指支持读写,但不能同时读写,特殊在只支持信息的单向传递。
三、代码实现
#include<iostream>
#include<unistd.h>
#include<vector>
#include<string>
#include<sys/wait.h>
#include<sys/types.h>// 定义任务函数指针类型
typedef void (*task_t)();
#define taskNum 3 // 定义任务数量
int n; // 进程数量
task_t tasks[taskNum]; // 任务函数指针数组
struct channel
{int _wfd; // 管道写端文件描述符pid_t _task; // 子进程IDstd::string _name; // 通道名称// 构造函数channel(int wfd, pid_t task, const std::string& name = "channel"):_wfd(wfd),_task(task),_name(name){}// 关闭管道写端void Close(){close(_wfd);}};
std::vector<channel> v; // 通道向量// 打印任务
void print()
{std::cout << "this is print task" << std::endl;
}// 下载任务
void download()
{std::cout << "this is download task" << std::endl;
}// 刷新任务
void flush()
{std::cout << "this is flush task" << std::endl;
}// 加载任务
void loadTask()
{tasks[0] = print;tasks[1] = download;tasks[2] = flush;
}// 执行任务
void ExecuteTask()
{int read_num = 0;int task_index = 0;while (true){read_num = read(0, &task_index, sizeof(int));if (read_num == -1){std::cerr << "管道读取失败!错误码:" << errno << std::endl;exit(-1);}else if (read_num == 0) // 读到0代表写端关闭,直接停止{std::cout << "子进程 " << getpid() << " 任务读取完成或管道关闭,退出。" << std::endl;exit(0);}else{if (task_index >= 0 && task_index < 3){std::cout << "子进程 " << getpid() << " 执行任务 " << task_index << std::endl;tasks[task_index]();}else{std::cerr << "无效的任务索引:" << task_index << std::endl;}}}
}// 初始化通道
void channelInit()
{for(int i = 0; i < n; i++){int pipefd[2];pipe(pipefd);pid_t id = fork();if(id < 0)return;else if(id == 0){close(pipefd[1]);dup2(pipefd[0], 0);for(int i = 0; i < v.size(); i++){v[i].Close();}ExecuteTask();}std::string channel_name = "channel_"+std::to_string(i);close(pipefd[0]);v.push_back(channel(pipefd[1], id, channel_name));}
}// 获取下一个通道索引
int nextChannel()
{static int next = 0;int channel = next;next++;next %= n;return channel;
}// 向通道发送任务命令
void sendTaskCommand(channel& chan, int taskCommand)
{write(chan._wfd, &taskCommand, sizeof(taskCommand));
}// 选择任务
int selectTask()
{return rand() % taskNum;
}// 控制进程执行一次
void ctrlProcessOnce()
{int taskCommand = selectTask();int channel_index = nextChannel();sendTaskCommand(v[channel_index], taskCommand);
}// 控制进程执行多次
void ctrlProcess(int times = -1)
{while(times--){ctrlProcessOnce();sleep(1);}
}// 关闭通道并等待子进程退出
void channelClose()
{int status = 0;for(int i = 0; i < n; i++){v[i].Close();wait(&status);int exit_status = (status&0x7f);int exit_code = ((status>>8)&0xff);std::cout << "子进程 " << getpid() << " 的退出状态为 " << exit_status << " 退出码为 " << exit_code << std::endl;std::cout << std::endl;}
}// 主函数
int main()
{ loadTask();std::cout << "请输入你要创建的进程个数"<<std::endl;std::cin >> n;int count = -1;std::cout << "请输入要执行程序的次数" << std::endl;std::cin >> count;channelInit();ctrlProcess(count);channelClose();sleep(3);return 0;
}
四、实现代码详解
main()
负责执行初始化任务、创建进程池、控制进程执行任务,以及关闭通道并等待子进程退出。
#include<iostream>
#include<unistd.h>
#include<vector>
#include<string>
#include<sys/types.h>typedef void (*task_t)();
#define taskNum 3
int n;
task_t tasks[taskNum];
struct channel
{int _wfd;pid_t _task;std::string _name;channel(int wfd, pid_t task, const std::string& name = "channel"):_wfd(wfd),_task(task),_name(name){}void Close(){close(_wfd);}};
std::vector<channel> v; // 存储创建的子进程信息,包括读wid/进程id/进程名称// 需要执行的三个函数
void print()
{std::cout << "this is print task" << std::endl;
}void download()
{std::cout << "this is download task" << std::endl;
}void flush()
{std::cout << "this is flush task" << std::endl;
}int main()
{ std::cout << "请输入你要创建的进程个数"<<std::endl;std::cin >> n;loadTask(); // 通过函数指针解耦调用函数,将需要运行的函数放入函数指针数组中channelInit();ctrlProcess();return 0;
}
2、loadTask()
将函数通过函数指针将不同的任务函数加载到数组中,以便后续可以通过索引来调用相应的任务
// 通过函数指针解耦调用函数,将需要运行的函数放入函数指针数组中
void loadTask()
{tasks[0] = print;tasks[1] = download;tasks[2] = flush;
}
3、channelInit()
创建子进程并让子进程read阻塞等待任务。子进程通过管道的读端来接收任务索引,并执行相应的任务函数。
void channelInit()
{for(int i = 0; i < n; i++){int pipefd[2];pipe(pipefd);pid_t id = fork();if(id < 0)return ;else if(id == 0){close(pipefd[1]);dup2(pipefd[0], 0); // 为什么要重定向?——实现监听通道和子进程的解耦// 重定向后只需要固定监听0号文件描述符收到的文件即可,无需根据特定子进程监听不同的widfor(int i = 0; i < v.size(); i++){v[i].Close(); //子进程必须关闭继承父进程的、多余的wfd}ExecuteTask(); // 子进程在此阻塞等待}std::string channel_name = "channel_"+std::to_string(i);close(pipefd[0]);v.push_back(channel(pipefd[1], id, channel_name));}
}// 监听被重定向的wid 获得函数指针数组下标,执行相应函数
void ExecuteTask()
{int read_num = 0;int task_index = 0;while (true){read_num = read(0, &task_index, sizeof(int));if (read_num == -1){std::cerr << "管道读取失败!错误码:" << errno << std::endl;exit(-1);}else if (read_num == 0) // 读到0代表写端关闭,直接停止{std::cout << "子进程 " << getpid() << " 任务读取完成或管道关闭,退出。" << std::endl;break;}else{if (task_index >= 0 && task_index < 3){std::cout << "子进程 " << getpid() << " 执行任务 " << task_index << std::endl;tasks[task_index]();}else{std::cerr << "无效的任务索引:" << task_index << std::endl;}}}
}
问题:为什么要将子进程的管道读端重定向至标准输入?
实现监听通道和子进程的解耦。每个子进程的读端rid不同,后续read监听需要根据不同子进程调整rid。而重定向后只需要固定监听0号文件描述符收到的文件即可,实现了不同子进程监听同一个文件描述符的解耦。
4、ctrlProcess()
父进程根据轮询策略向子进程发送任务,控制子进程执行任务。
// 获取下一个通道索引
int nextChannel()
{static int next = 0;int channel = next;next++;next %= n;return channel;
}// 向通道发送任务命令
void sendTaskCommand(channel& chan, int taskCommand)
{write(chan._wfd, &taskCommand, sizeof(taskCommand));
}// 选择任务
int selectTask()
{return rand() % taskNum;
}// 控制进程执行一次
void ctrlProcessOnce()
{int taskCommand = selectTask();int channel_index = nextChannel();sendTaskCommand(v[channel_index], taskCommand);
}// 控制进程执行多次
void ctrlProcess(int times = -1)
{while(times--){ctrlProcessOnce();sleep(1);}
}
5、channelClose()
用于关闭所有通道,并等待所有子进程退出,同时获取子进程的退出状态和退出码。
// 关闭通道并等待子进程退出
void channelClose()
{int status = 0;for(int i = 0; i < n; i++){v[i].Close();wait(&status);int exit_status = (status&0x7f);int exit_code = ((status>>8)&0xff);std::cout << "子进程 " << getpid() << " 的退出状态为 " << exit_status << " 退出码为 " << exit_code << std::endl;std::cout << std::endl;}
}
这篇关于Linux | 进程池技术解析:利用无名管道实现并发任务处理(含实现代码)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!