Linux使用匿名管道实现进程池得以高效通信

2024-01-29 22:52

本文主要是介绍Linux使用匿名管道实现进程池得以高效通信,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

                                               🎬慕斯主页修仙—别有洞天

                                              ♈️今日夜电波:Nonsense—Sabrina Carpenter

                                                                0:50━━━━━━️💟──────── 2:43
                                                                    🔄   ◀️   ⏸   ▶️    ☰  

                                      💗关注👍点赞🙌收藏您的每一次鼓励都是对我莫大的支持😍


目录

思路梳理

匿名管道知识回忆

匿名管道实现进程池思路

池化技术怎么提高效率?

具体实现

进程以及管道的创建操作

分发任务操作

回收资源操作

解决上述所提到Bug

总体代码及代码效果

Makefile

Task.hpp

mulpipe.cpp

实现效果


思路梳理

匿名管道知识回忆

        上一篇的文章中,详细介绍了管道的知识点,下面还是复习一下对于匿名管道相关的知识点。如下是匿名管道的一个示例图,这表现了两个“有血缘关系”的两个进程之间通过匿名管道进行通信的过程,我们通过控制struct files *fd_array[]中读或者写的struct files的开关来实现两个进程间的通信:

        我们主要通过如下的接口创建匿名管道来实现以上的匿名管道通信:

#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码

匿名管道实现进程池思路

        说大白话(●—●):我们使用一个父进程创建很多的管道,再创建对应数量的进程,然后这些管道分别与其他的进程进行直接的连接。这样我们就提前创建和联系好了一定数量的进程。我们想让父进程向其中一个子进程发消息就可通过选择管道直接发消息,而如果不发消息其它进程只会等待。通过信息的发送,我们可以就通过选择管道从而让子进程分别执行对应的任务。大致的图解如下:

池化技术怎么提高效率?

        池化技术通过资源共享和优化来提高效率,具体表现在以下几个方面:

  • 资源复用:池化技术通过重用已创建的资源,减少了频繁创建和销毁资源的开销。例如,线程池中的线程可以被多个任务重复使用,这样可以避免每次任务执行时都创建新线程的开销。
  • 减少等待时间:池化技术可以减少请求的等待时间。因为资源是预先分配好的,当有新的请求到来时,可以立即使用池中的资源,而不需要等待资源的创建过程。
  • 提高响应速度:由于资源已经准备好,池化技术可以快速响应请求,提高了处理速度。这对于需要快速响应的系统来说尤其重要。
  • 统一管理:池化技术提供了对资源的集中管理,这有助于监控系统资源的使用情况,及时回收不再使用的资源,避免资源浪费。
  • 优化性能:在大数据处理等场景中,池化技术可以通过合并多个请求来减少数据处理的时间和空间复杂度,从而提高数据处理的性能。

具体实现

进程以及管道的创建操作

        创建一个channel类用来来存储管道的读文件描述符ctrlfd以及进程描述符workerid,完成初始化操作以及后续的销毁操作。

static int number = 1;//标识对应的进程和管道class channel
{
public:channel(int fd, pid_t id) : ctrlfd(fd), workerid(id){name = "channel-" + std::to_string(number++);}public:int ctrlfd;pid_t workerid;std::string name;
};

        再根据先描述在组织的原则,我们在主函数使用一个std::vector<channel> channels来管理上面的结构体,可以根据需求进行增删查改等等操作。在完成这些预备操作后,创建对应数量的进程以及管道。

        特别注意:如下函数中的std::vector<int> old;以及如下代码是为了解决父子进程继承而产生的一些bug,这个将在最后解释:

        	std::vector<int> old;if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}old.push_back(pipefd[1]);

        如下函数的操作为创建管道以及进程,可以先忽略上述所提到的解决bug的代码,通过循环创建对应的子进程、关闭父子进程对应的读写文件,并且存储到vector<channel> *c(也就是主函数的中channels),下面的work()函数为子进程要做的工作,可以理解了后面的分发任务操作再来理解。

const int num = 5;//全局定义创建管道以及进程数
void CreateChannels(std::vector<channel> *c)
{std::vector<int> old;for (int i = 0; i < num; i++){// 1. 定义并创建管道int pipefd[2];int n = pipe(pipefd);assert(n == 0);(void)n;// 2. 创建进程pid_t id = fork();assert(id != -1);// 3. 构建单向通信信道if (id == 0) // child{if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}close(pipefd[1]);//关闭写dup2(pipefd[0], 0);//重定向写入Work();//子进程工作exit(0); // 会自动关闭自己打开的所有的fd}// fatherclose(pipefd[0]);c->push_back(channel(pipefd[1], id));old.push_back(pipefd[1]);// childid, pipefd[1]}
}

        因为前面我们已经重定向了管道的写入作为子进程的写入,接下来通过read就会读取对应的操作,需要注意的是:我们是通过一个int类型的变量来控制要完成的任务,后续再task.hpp中会定义对应要完成的任务。通过read的返回值来判断是要执行任务还是退出,我们在管道的知识中知道,read会等待write,也就是等待数据的输入。当写进程退出,读进程会跟着退出,我们根据以上特性来判断是要执行任务还是等待任务还是退出进程:

void Work()
{while (true){int code = 0;ssize_t n = read(0, &code, sizeof(code));if (n == sizeof(code)){if (!init.CheckSafe(code))continue;init.RunTask(code);}else if (n == 0){break;}else{// do nothing}}std::cout << "child quit" << std::endl;
}

分发任务操作

        当我们创建好管道以及进程后,可以注意到接下来的都是没有经过 if (id == 0) 控制下的程序,也就是说接下来的程序都是父进程运行的程序,因此我们就可以让父进程来分配任务使得子进程完成任务。接下来,我们创建一个task.hpp来模拟创建的任务以及对应的封装、任务分发等等操作如下:

#pragma once#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>
#include <stdlib.h>// using task_t = std::function<void()>;
typedef std::function<void()> task_t;void Download()
{std::cout << "我是一个下载任务"<< " 处理者: " << getpid() << std::endl;
}void PrintLog()
{std::cout << "我是一个打印日志的任务"<< " 处理者: " << getpid() << std::endl;
}void PushVideoStream()
{std::cout << "这是一个推送视频流的任务"<< " 处理者: " << getpid() << std::endl;
}// void ProcessExit()
// {
//     exit(0);
// }class Init
{
public:// 任务码const static int g_download_code = 0;const static int g_printlog_code = 1;const static int g_push_videostream_code = 2;// 任务集合std::vector<task_t> tasks;public:Init(){tasks.push_back(Download);tasks.push_back(PrintLog);tasks.push_back(PushVideoStream);srand(time(nullptr) ^ getpid());}bool CheckSafe(int code){if (code >= 0 && code < tasks.size())return true;elsereturn false;}void RunTask(int code){return tasks[code]();}int SelectTask(){return rand() % tasks.size();}std::string ToDesc(int code){switch (code){case g_download_code:return "Download";case g_printlog_code:return "PrintLog";case g_push_videostream_code:return "PushVideoStream";default:return "Unknow";}}
};Init init; // 定义对象

        传入主函数用于管理的channels,flag用于控制进程执行完任务后是否需要退出(1表示要退出,0表示不退出),num为要执行任务的次数,需要注意的是num是要在flag为1的前提下才能有效的,如果不传则程序只会执行一次。在选择完任务以及进程后,通过write来向指定的管道写入。具体读写操作可看:Linux进程间通信(IPC)机制之一:管道(Pipes)详解:匿名管道的特性与情况

void SendCommand(const std::vector<channel> &c, bool flag, int num = -1)
{int pos = 0;while (true){// 1. 选择任务int command = init.SelectTask();// 2. 选择信道(进程)const auto &channel = c[pos++];pos %= c.size();// debugstd::cout << "send command " << init.ToDesc(command) << "[" << command << "]"<< " in "<< channel.name << " worker is : " << channel.workerid << std::endl;// 3. 发送任务write(channel.ctrlfd, &command, sizeof(command));// 4. 判断是否要退出if (!flag){num--;if (num <= 0)break;}sleep(1);}std::cout << "SendCommand done..." << std::endl;
}

回收资源操作

        通过close关闭对应的管道,waitpid等待子进程的结束。

const int num = 5;//全局定义创建管道以及进程数
void ReleaseChannels(std::vector<channel> c)
{// version 2// int num = c.size() - 1;// for (; num >= 0; num--)// {//     close(c[num].ctrlfd);//     waitpid(c[num].workerid, nullptr, 0);// }// version 1for (const auto &channel : c){close(channel.ctrlfd);waitpid(channel.workerid, nullptr, 0);}// for (const auto &channel : c)// {//     pid_t rid = waitpid(channel.workerid, nullptr, 0);//     if (rid == channel.workerid)//     {//         std::cout << "wait child: " << channel.workerid << " success" << std::endl;//     }// }
}

解决上述所提到Bug

        如下代码:

        	std::vector<int> old;if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}old.push_back(pipefd[1]);

        这是个什么Bug呢?如果不加上上这段代码,那么关闭进程及管道就必须从最后面生成的进程和管道向前关闭。为啥呢?这是因为当我们父进程依次创建子进程,其中的对于管道的读写操作struct file也被继承了下来,也就是说有着上一个生成的子进程会被下一个子进程的写操作struct file指向,而下下个生成的子进程会指向前面两个子进程,以此类推...因此,我们需要在创建的时候关闭新创建子进程对应的写操作。

        具体的子进程继承写操作的struct file例子如下:

总体代码及代码效果

Makefile

processpool:mulpipe.cppg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f processpool

Task.hpp

#pragma once#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>
#include <stdlib.h>// using task_t = std::function<void()>;
typedef std::function<void()> task_t;void Download()
{std::cout << "我是一个下载任务"<< " 处理者: " << getpid() << std::endl;
}void PrintLog()
{std::cout << "我是一个打印日志的任务"<< " 处理者: " << getpid() << std::endl;
}void PushVideoStream()
{std::cout << "这是一个推送视频流的任务"<< " 处理者: " << getpid() << std::endl;
}// void ProcessExit()
// {
//     exit(0);
// }class Init
{
public:// 任务码const static int g_download_code = 0;const static int g_printlog_code = 1;const static int g_push_videostream_code = 2;// 任务集合std::vector<task_t> tasks;public:Init(){tasks.push_back(Download);tasks.push_back(PrintLog);tasks.push_back(PushVideoStream);srand(time(nullptr) ^ getpid());}bool CheckSafe(int code){if (code >= 0 && code < tasks.size())return true;elsereturn false;}void RunTask(int code){return tasks[code]();}int SelectTask(){return rand() % tasks.size();}std::string ToDesc(int code){switch (code){case g_download_code:return "Download";case g_printlog_code:return "PrintLog";case g_push_videostream_code:return "PushVideoStream";default:return "Unknow";}}
};Init init; // 定义对象

mulpipe.cpp

#include <iostream>
#include <string>
#include <vector>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"const int num = 5;
static int number = 1;class channel
{
public:channel(int fd, pid_t id) : ctrlfd(fd), workerid(id){name = "channel-" + std::to_string(number++);}public:int ctrlfd;pid_t workerid;std::string name;
};void Work()
{while (true){int code = 0;ssize_t n = read(0, &code, sizeof(code));if (n == sizeof(code)){if (!init.CheckSafe(code))continue;init.RunTask(code);}else if (n == 0){break;}else{// do nothing}}std::cout << "child quit" << std::endl;
}void PrintFd(const std::vector<int> &fds)
{std::cout << getpid() << " close fds: ";for(auto fd : fds){std::cout << fd << " ";}std::cout << std::endl;
}// 传参形式:
// 1. 输入参数:const &
// 2. 输出参数:*
// 3. 输入输出参数:&
void CreateChannels(std::vector<channel> *c)
{// bugstd::vector<int> old;for (int i = 0; i < num; i++){// 1. 定义并创建管道int pipefd[2];int n = pipe(pipefd);assert(n == 0);(void)n;// 2. 创建进程pid_t id = fork();assert(id != -1);// 3. 构建单向通信信道if (id == 0) // child{if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}close(pipefd[1]);//关闭写dup2(pipefd[0], 0);//重定向写入Work();exit(0); // 会自动关闭自己打开的所有的fd}// fatherclose(pipefd[0]);c->push_back(channel(pipefd[1], id));old.push_back(pipefd[1]);// childid, pipefd[1]}
}void PrintDebug(const std::vector<channel> &c)
{for (const auto &channel : c){std::cout << channel.name << ", " << channel.ctrlfd << ", " << channel.workerid << std::endl;}
}void SendCommand(const std::vector<channel> &c, bool flag, int num = -1)
{int pos = 0;while (true){// 1. 选择任务int command = init.SelectTask();// 2. 选择信道(进程)const auto &channel = c[pos++];pos %= c.size();// debugstd::cout << "send command " << init.ToDesc(command) << "[" << command << "]"<< " in "<< channel.name << " worker is : " << channel.workerid << std::endl;// 3. 发送任务write(channel.ctrlfd, &command, sizeof(command));// 4. 判断是否要退出if (!flag){num--;if (num <= 0)break;}sleep(1);}std::cout << "SendCommand done..." << std::endl;
}
void ReleaseChannels(std::vector<channel> c)
{// version 2// int num = c.size() - 1;// for (; num >= 0; num--)// {//     close(c[num].ctrlfd);//     waitpid(c[num].workerid, nullptr, 0);// }// version 1for (const auto &channel : c){close(channel.ctrlfd);waitpid(channel.workerid, nullptr, 0);}// for (const auto &channel : c)// {//     pid_t rid = waitpid(channel.workerid, nullptr, 0);//     if (rid == channel.workerid)//     {//         std::cout << "wait child: " << channel.workerid << " success" << std::endl;//     }// }
}
int main()
{std::vector<channel> channels;// 1. 创建信道,创建进程CreateChannels(&channels);// 2. 开始发送任务const bool g_always_loop = true;// SendCommand(channels, g_always_loop);SendCommand(channels, !g_always_loop, 10);// 3. 回收资源,想让子进程退出,并且释放管道,只要关闭写端ReleaseChannels(channels);return 0;
}

实现效果


                   感谢你耐心的看到这里ღ( ´・ᴗ・` )比心,如有哪里有错误请踢一脚作者o(╥﹏╥)o! 

                                       

                                                                        给个三连再走嘛~  

这篇关于Linux使用匿名管道实现进程池得以高效通信的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/658422

相关文章

通俗易懂的Java常见限流算法具体实现

《通俗易懂的Java常见限流算法具体实现》:本文主要介绍Java常见限流算法具体实现的相关资料,包括漏桶算法、令牌桶算法、Nginx限流和Redis+Lua限流的实现原理和具体步骤,并比较了它们的... 目录一、漏桶算法1.漏桶算法的思想和原理2.具体实现二、令牌桶算法1.令牌桶算法流程:2.具体实现2.1

Python使用Pandas对比两列数据取最大值的五种方法

《Python使用Pandas对比两列数据取最大值的五种方法》本文主要介绍使用Pandas对比两列数据取最大值的五种方法,包括使用max方法、apply方法结合lambda函数、函数、clip方法、w... 目录引言一、使用max方法二、使用apply方法结合lambda函数三、使用np.maximum函数

MySQL8.0设置redo缓存大小的实现

《MySQL8.0设置redo缓存大小的实现》本文主要在MySQL8.0.30及之后版本中使用innodb_redo_log_capacity参数在线更改redo缓存文件大小,下面就来介绍一下,具有一... mysql 8.0.30及之后版本可以使用innodb_redo_log_capacity参数来更改

Qt 中集成mqtt协议的使用方法

《Qt中集成mqtt协议的使用方法》文章介绍了如何在工程中引入qmqtt库,并通过声明一个单例类来暴露订阅到的主题数据,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录一,引入qmqtt 库二,使用一,引入qmqtt 库我是将整个头文件/源文件都添加到了工程中进行编译,这样 跨平台

C++使用栈实现括号匹配的代码详解

《C++使用栈实现括号匹配的代码详解》在编程中,括号匹配是一个常见问题,尤其是在处理数学表达式、编译器解析等任务时,栈是一种非常适合处理此类问题的数据结构,能够精确地管理括号的匹配问题,本文将通过C+... 目录引言问题描述代码讲解代码解析栈的状态表示测试总结引言在编程中,括号匹配是一个常见问题,尤其是在

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

Java中String字符串使用避坑指南

《Java中String字符串使用避坑指南》Java中的String字符串是我们日常编程中用得最多的类之一,看似简单的String使用,却隐藏着不少“坑”,如果不注意,可能会导致性能问题、意外的错误容... 目录8个避坑点如下:1. 字符串的不可变性:每次修改都创建新对象2. 使用 == 比较字符串,陷阱满

Python使用国内镜像加速pip安装的方法讲解

《Python使用国内镜像加速pip安装的方法讲解》在Python开发中,pip是一个非常重要的工具,用于安装和管理Python的第三方库,然而,在国内使用pip安装依赖时,往往会因为网络问题而导致速... 目录一、pip 工具简介1. 什么是 pip?2. 什么是 -i 参数?二、国内镜像源的选择三、如何

使用C++实现链表元素的反转

《使用C++实现链表元素的反转》反转链表是链表操作中一个经典的问题,也是面试中常见的考题,本文将从思路到实现一步步地讲解如何实现链表的反转,帮助初学者理解这一操作,我们将使用C++代码演示具体实现,同... 目录问题定义思路分析代码实现带头节点的链表代码讲解其他实现方式时间和空间复杂度分析总结问题定义给定

Linux使用nload监控网络流量的方法

《Linux使用nload监控网络流量的方法》Linux中的nload命令是一个用于实时监控网络流量的工具,它提供了传入和传出流量的可视化表示,帮助用户一目了然地了解网络活动,本文给大家介绍了Linu... 目录简介安装示例用法基础用法指定网络接口限制显示特定流量类型指定刷新率设置流量速率的显示单位监控多个