本文主要是介绍Linux进程间通信 pipe 实现线程池 命名管道 实现打印日志 共享内存代码验证 消息队列 信号量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 前言
- 管道
- 匿名管道
- pipe
- 测试管道接口 --> 代码验证
- 管道的4种情况
- 管道的5种特征
- 线程池案例
- 代码实现:
- ProcessPool.cc
- Task.hpp
- 检测脚本
- makefile
- 命名管道
- 代码演示:
- makefile
- namedPipe.hpp
- server.cc
- client.cc
- 实现日志
- Log.hpp
- 共享内存
- 共享内存原理
- 补充指令集(IPC的指令)
- shmget
- 谈谈key
- ftok
- ipcs
- shmat
- shmdt
- shmctl
- 代码验证(使用共享内存的相关接口)
- Shm.hpp
- client.cc
- server.cc
- system V消息队列
- system V信号量
- 进程互斥
前言
- 进程为什么要通信?
- 进程也是需要某种协同的,所以如何协同的前提条件就是通信
- 数据是由类别的,通知就绪的,有一些就是单纯的要传递给我数据,控制相关的信息
事实:进程是具有独立性的。进程 = 内核数据结构 + 代码和数据
- 进程如何通信?
- 进程间通信的本质,必须让不同的进程看到同一份“资源”
- “资源”就是特定形式的内存空间
- 这个资源是由操作系统来提供的,那么为什么不是我们两个进程中的一个?假设一个进程提供,这个资源属于谁,这个进程独有,破坏进程独立性,第三方空间。
- 我们进程访问这个空间,进行通信,本质就是访问操作系统!
- 进程代表的就是用户,“资源”从创建,使用(一般),释放资源我们需要使用系统调用接口
- 从底层设计,从接口设计,都要由操作系统独立设计,一般操作系统会有一个独立的通信模块,属于文件系统 (IPC通信模块),标准(system V && posix)
- system V:三种方式:消息队列、共享内存、信号量
还有一种就是基于文件级别的通信方式---->管道
管道
- 管道是Unix中最古老的进程间通信的形式。
- 我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
匿名管道
-
一个文件被打开两次
struct_file
是要被创建两个的 -
第二次打开同一个文件的时候,不需要再次加载文件
-
在创建一个子进程的时候,不会再次加载文件,因为进程要保持独立性,和文件没有关系
为什么父子进程会向同一个显示器终端打印数据
- 子进程会继承父进程的文件描述符表,会指向同一个文件
- 进程会默认打开三个标准输入输出错误:0,1,2,怎么做到的?
- 其实我们所有的在命令行中都是bash的子进程,bash打开了,所有的子进程默认也就打开了,我们只要做好约定即可
为什么我们主动close(0/1/2),不影响父进程继续使用显示器文件呢?
- 其实在struct_file里面包含了一个引用计数,是一个内核级的,这也就能解释了
进程间通信的本质,必须让不同的进程看到同一份“资源”,这个资源是由操作系统来分配的,我们看到的同一份“资源”就是内核级的文件缓冲区
-
管道只允许单向通信,因为它简单
-
那么如何通信呢?
- 子进程想写就关闭读的文件描述符(3),父进程就关闭写的文件描述符(4),此时,父进程就可以通过3号描述符进行读,子进程就可以通过4号文件描述符进行写,双方就可以写入同一个管道文件了。
父子既然要关闭不需要的fd,为什么曾经要打开呢?可以不关吗?
- 如果只打开一个文件描述符的话,未来子进程继承的时候也就会继承一个,那么以读方式打开,继承只能继承读,一个管道不能同时存在读写,我们也不能以读写的方式打开,因为管道是单向通信的,万一失误了呢?这个方式很不好~~
- 所以总的来说就是为了让子进程继承下去!
- 可以不关吗?可以~~,建议关了,万一读误写了呢?
还有就是为什么我们两个进程通信的时候,只是在内核级文件缓冲区,而不需要刷新到磁盘,所有虽然管道可以复用,但是还是要重新设计一下
pipe
- 接下来我们可以使用pipe来打开管道
int pipe(int pipefd[2]);
pipefd
是一个输出形参数- 不需要文件路径和文件名(匿名文件/匿名管道)
- 成功返回
0
,失败返回-1
,错误码被设置
如果我想要双向通信呢?
- 两个管道
为什么单向通信?
- 因为简单,只让它进行单向通信,符合这样的特点所以就叫管道
测试管道接口 --> 代码验证
#include <iostream>
#include <unistd.h>
#include <cerrno>
#include <cstring>
#include <string>
#include <sys/wait.h>
#include <sys/types.h>const int size = 1024;std::string getOtherMessage()
{// 计数static int cnt = 0;std::string massageid = std::to_string(cnt);cnt++;// 获取pidpid_t self_id = getpid();std::string id = std::to_string(self_id);std::string massage = "massage:";massage += massageid;massage += " my pid is: ";massage += id;return massage;
}void SubProcessWrite(int wfd)
{int pipesize = 0;std::string massage = "father, I am your son prcess!";char c = 'A';while (true){std::cerr << "++++++++++++++++++++++++++++++++++++++++++" << std::endl;std::string info = massage + getOtherMessage(); // 子进程写给父进程的消息write(wfd, info.c_str(), info.size());std::cerr << info << std::endl;// write(wfd,&c,1);// std::cout << "pipesize: " << ++pipesize << std::endl;// c++;// if(c == 'G') break;sleep(1);}std::cout << "child quit..." << std::endl;
}void FatherProcessRead(int rfd)
{char inbuffer[size];while (true){sleep(2);std::cout << "-------------------------------------------" << std::endl;ssize_t n = read(rfd, inbuffer, sizeof(inbuffer) - 1); // 注意是sizeofif (n > 0){inbuffer[n] = 0;std::cout << "father get massage: " << inbuffer << std::endl;}else if (n == 0){// read返回值为0表示写端关闭了,读到了文件的结尾std::cout << "client quit, father get return val: " << n << "father quit tool!" << std::endl;break;}else if (n < 0){std::cerr << " read error" << std::endl;break;}}
}int main()
{// 1.创建管道int pipefd[2]; // pipefd里的0号下标保存的是读,1号下标保存的是写int n = pipe(pipefd);if (n != 0){std::cerr << "errno:" << errno << ":"<< "errstring:" << strerror(errno) << std::endl;return 1;}std::cout << "pipefd[0]: " << pipefd[0] << " pipefd[1]: " << pipefd[1] << std::endl;// 2.fork创建出父子进程pid_t id = fork();if (id == 0){std::cout << "子进程关闭不需要的fd了,准备发消息了" << std::endl;sleep(1);// 子进程 --- write// 3.关闭不需要的文件秒速符close(pipefd[0]);SubProcessWrite(pipefd[1]);// 不用了就关闭close(pipefd[1]);exit(0);}std::cout << "父进程关闭不需要的fd了,准备收消息了" << std::endl;// 父进程 --- readsleep(1);// 3.关闭不需要的文件秒速符close(pipefd[1]);FatherProcessRead(pipefd[0]);// 不用了就关闭close(pipefd[0]);int status = 0;pid_t rid = waitpid(id, nullptr, 0);if (rid > 0){std::cout << "wait child process done, exit sig: " << (status & 0x7f) << std::endl;std::cout << "wait child process done, exit code(ign): " << ((status >> 8) & 0xFF) << std::endl;}return 0;
}
管道的4种情况
- 如果管道内部是空的 && write fd没有关闭,读取条件不具备,读进程会被阻塞(wait–>读取条件具备<–写入数据)
- 管道被写满 && read fd不读取且没有关闭,管道被写满,写进程会被阻塞(管道被写满—>条件不具备) —> wait 写条件具备就读数据
- 管道一直在读 && 写端关闭了wfd,读端read返回值会读到0,表示读到了文件末尾
- rfd直接关闭写端wfd一直在进行写入,写端进程会被操作系统直接使用13信号关掉,相当于进程出现了异常
管道的5种特征
- 匿名管道,只是用来进行具有血缘关系的进程之间进行通信,具有明显的顺序性
- 管道内部,自带进程之间的同步机制(同步机制就是多执行流执行代码的时候,具有明显顺序性)
- 管道文件的生命周期是随进程的
- 管道文件在通信的时候,是面向字节流的,write的次数和读取的次数不是一一匹配的
- 管道的通信模式,是一种特殊的半双工模式
可以通过上面的代码进行测试在ubuntu20.04管道的大小是4kb
我们平时在命令行中使用的|
就是匿名管道
线程池案例
-
管道里没有数据,worker进程就在阻塞等待,等待任务的到来
-
master向哪一个管道进行写入,就是唤醒哪一个子进程来处理任务
-
父进程要进行后端任务划分的负载均衡
代码实现:
ProcessPool.cc
#include "Task.hpp"class Channel
{
public:Channel(int wfd, pid_t id, const std::string &name): _wfd(wfd), _subprocessid(id), _name(name) {}int GetWfd() { return _wfd; }pid_t GetProcessId() { return _subprocessid; }std::string GetName() { return _name; }// 等待子进程void Wait(){pid_t rid = waitpid(_subprocessid, nullptr, 0);if (rid > 0){std::cout << "wait " << rid << " success" << std::endl;}}// 关闭void CloseChannel(){close(_wfd);}~Channel() {}private:int _wfd;pid_t _subprocessid;std::string _name;
};void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{for (int i = 0; i < num; i++){// 1. 创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)exit(-1);// 2. 创建子进程pid_t id = fork();if (id == 0){// 关闭多余的写端if (!channels->empty()){for (auto &channel : *channels){// 关闭等待channel.CloseChannel();channel.Wait();}}// child --- readclose(pipefd[1]);dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入task();close(pipefd[0]);exit(0);}// parent// 3.构建一个channel名称std::string channel_name = "Channel-" + std::to_string(i);// a. 子进程的pid b. 父进程关心的管道的w端channels->push_back(Channel(pipefd[1], id, channel_name));close(pipefd[0]); // 父进程关心的管道的w端}
}int NextChannel(int channels)
{static int next = 0;int channel = next;next++;next %= channels;return channel;
}void SendTaskCommand(Channel &channel, int taskcommand)
{write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));
}void ctrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a. 选择一个任务int taskCommand = SelectTask();// b. 选择一个信道和进程int channel_index = NextChannel(channels.size());// c. 发送任务SendTaskCommand(channels[channel_index], taskCommand);std::cout << std::endl;std::cout << "taskcommand: " << taskCommand << " channel: "<< channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}void ctrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){ctrlProcessOnce(channels);}}else{while (true){ctrlProcessOnce(channels);}}
}void CleanUpChannel(std::vector<Channel> &channels)
{// 方法一:// for (auto &ch : channels)// {// ch.CloseChannel();// }// for (auto &ch : channels)// {// ch.Wait();// }// 方法二:// int last = channels.size() - 1;// for (int i = last; i >= 0; i--)// {// close(channels[i].GetWfd());// waitpid(channels[i].GetProcessId(), nullptr, 0);// }// 方法三:for (auto &ch : channels){ch.CloseChannel();ch.Wait();}
}int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;return 1;}int num = std::stoi(argv[1]);LoadTask();std::vector<Channel> channels;// 1. 创建信道和子进程CreateChannelAndSub(num, &channels, work);// 2. 通过channel控制子进程ctrlProcess(channels, num);// 3. 回收管道和子进程. a. 关闭所有的写端 b. 回收子进程CleanUpChannel(channels);// sleep(100);return 0;
}
Task.hpp
#pragma once#include <iostream>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>#define TaskNum 3typedef void (*task_t)(); // 函数指针
task_t tasks[TaskNum]; // 4个任务void Print()
{std::cout << "I am print task" << std::endl;
}void DownLoad()
{std::cout << "I am DownLoad task" << std::endl;
}void Flush()
{std::cout << "I am Flush task" << std::endl;
}void LoadTask()
{srand(time(nullptr) ^ getpid() ^ 17777);tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush;
}int SelectTask()
{return rand() % TaskNum;
}void ExcuteTask(int number)
{if (number < 0 || number > 2)return;tasks[number]();
}void work()
{while (true){int command = 0;int n = read(0, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is : " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process : " << getpid() << " quit" << std::endl;break;}}
}void work1()
{while (true){int command = 0;int n = read(0, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is : " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process : " << getpid() << " quit" << std::endl;break;}}
}void work2()
{while (true){int command = 0;int n = read(0, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is : " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process : " << getpid() << " quit" << std::endl;break;}}
}
检测脚本
while :; do ps ajx | head -1 && ps ajx | grep -i Processpool | grep -v grep; echo "------------------------"; sleep 1;done
makefile
processpool: ProcessPool.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f processpool
命名管道
- 命名管道可以从命令行上创建,命令行方法是使用下面这个命令:
mkfifo filename
- 文件名+路径就可以看到同一份资源
代码演示:
makefile
.PHONY:all
all:server clientserver:server.ccg++ -o $@ $^ -g -std=c++11
client:client.ccg++ -o $@ $^ -g -std=c++11.PHONY:clean
clean:rm -rf server client
namedPipe.hpp
#include <iostream>
#include <cstdio>
#include <cerrno>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>const std::string comm_path = "./myfifo";
#define DefaultFd -1
#define Creater 1
#define User 2
#define Read O_RDONLY
#define Write O_WRONLY
#define BaseSize 4096class NamePiped
{
private:bool OpenNamedPipe(int mode){_fd = open(_fifo_path.c_str(), mode);if (_fd < 0)return false;return true;}public:NamePiped(const std::string &path, int who): _fifo_path(path), _id(who), _fd(DefaultFd){if (_id == Creater){int res = mkfifo(_fifo_path.c_str(), 0666);if (res != 0)perror("mkfifo");std::cout << "creater create named pipe" << std::endl;}}bool OpenForRead(){return OpenNamedPipe(Read);}bool OpenForWrite(){return OpenNamedPipe(Write);}// const &: const std::string &XXX 输入// * : std::string * 输出// & : std::string & 输入输出int WriteNamedPipe(const std::string &in){return write(_fd, in.c_str(), in.size());}int ReadNamedPipe(std::string *out){char buffer[BaseSize];int n = read(_fd, buffer, sizeof(buffer));if (n > 0){buffer[n] = 0;*out = buffer;}return n;}~NamePiped(){if (_id == Creater){int res = unlink(_fifo_path.c_str());if (res != 0)perror("unlink");std::cout << "creater free named pipe" << std::endl;}if (_fd != DefaultFd)close(_fd);}private:const std::string _fifo_path;int _id;int _fd;
};
server.cc
#include "namedPipe.hpp"// 读read
int main()
{// 对于读端而言,如果我们打开文件,但是写还没来,我会阻塞在open调用中,直到对方打开// 也就是相当于进程同步NamePiped fifo(comm_path, Creater);if (fifo.OpenForRead()){std::cout << "server open named pipe done" << std::endl;sleep(3);while (true){std::string message;int n = fifo.ReadNamedPipe(&message);if (n > 0){std::cout << "Client Say> " << message << std::endl;}else if (n == 0){std::cout << "Client quit, Server Too!" << std::endl;break;}else{std::cout << "fifo.ReadNamedPipe Error" << std::endl;break;}}}return 0;
}
client.cc
#include "namedPipe.hpp"// 写Write
int main()
{NamePiped fifo(comm_path, User);if (fifo.OpenForWrite()){std::cout << "client open namd pipe done" << std::endl;while (true){std::cout << "Please Enter> ";std::string message;std::getline(std::cin, message);fifo.WriteNamedPipe(message);}}return 0;
}
- 首先启动程序,对于读端而言,如果我们打开文件,但是写还没来,我会阻塞在open调用中,直到对方打开
- 如何关闭写端,读端会读到0,程序会结束
- 这次我们先关闭读端,这个时候写端不会立即结束程序,当我们再次输入的时候程序才会退出
实现日志
Log.hpp
#pragma once#include <iostream>
#include <time.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>#define SIZE 1024#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4#define Screen 1
#define Onefile 2
#define Classfile 3#define LogFile "log.txt"class Log
{
public:Log(){_printMethod = Screen;_path = "./log/";}// 选择将日志打印到哪void Enable(int method){_printMethod = method;}// 头信息std::string levelToString(int level){switch (level){case Info:return "Info";case Debug:return "Debug";case Warning:return "Warning";case Error:return "Error";case Fatal:return "Fatal";default:return "None";}}// 打印日志void printLog(int level, const std::string &logtxt){switch (_printMethod){case Screen:std::cout << logtxt << std::endl;break;case Onefile:printOneFile(LogFile, logtxt);break;case Classfile:printClassFile(level, logtxt);break;default:break;}}// 单文件打印void printOneFile(const std::string &logname, const std::string &logtxt){std::string _logname = _path + logname;int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666);if (fd < 0)return;write(fd, logtxt.c_str(), logtxt.size());close(fd);}// 多文件打印void printClassFile(int level, const std::string &logtxt){std::string filename = LogFile;filename += '.';filename += levelToString(level);printOneFile(filename, logtxt);}~Log(){}// 日志格式控制void operator()(int level, const char *format, ...){time_t t = time(nullptr);struct tm *ctime = localtime(&t);char leftbuffer[SIZE];snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(), ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday, ctime->tm_hour, ctime->tm_min, ctime->tm_sec);va_list s;va_start(s, format);char rightbuffer[SIZE];vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);va_end(s);char logtxt[SIZE * 2];snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);printLog(level, logtxt);}private:int _printMethod;std::string _path;
};
共享内存
共享内存原理
- 都是OS做的
- OS提供上面的1,2 步骤的系统调用,供用户进程A,B进行调用 (系统调用)
- AB,CD,EF,XY----->共享内存在系统中存在多份,供不同个数,不同对进程同时通信
- OS注定了要对共享内存进行管理(先描述,再组织),共享内存不是简单的一段内存空间,也要有描述并管理共享内存的数据结构和匹配的算法
- 共享内存 = 内存空间(数据) + 共享内存的属性
补充指令集(IPC的指令)
shmget
- 创建共享内存
int shmget(key_t key, size_t size, int shmflg);
- 第一个参数是key(后面介绍)
- 第二个参数创建共享内存的大小(单位是字节)
- 第三个参数就是位图(下面介绍)
- 成功返回共享内存的标识符,失败返回
-1
,错误码被设置
- IPC_CREAT:如果申请的共享内存不存在就创建,存在就获取共享内存并返回
- IPC_EXCL:单独使用没有意义,只有和IPC_CREAT组合才有意义
- IPC_CREAT | IPC_EXCL:如果要创建的共享内存不存在就创建,如果存在就出错返回,如果返回成功就意味着这个shm是全新的
- 那么如何保证让不同的进程看到同一个共享内存?
- 怎么保证这个共享内存是存在还是不存在呢?
就是通过 第一个参数
key
谈谈key
- key是一个数字,这个数字是几,不重要。关键在于必须在内核中具有唯一性,能够让不同的进程进行唯一标识
- 第一个进程key通过key创建共享内存,第二个之后的进程, 只要拿着同一个
key
,就可以和第一个进程看到同一个共享内存了 - 对于一个已经创建好的共享内存,key在哪? ----> key在共享内存的描述对象中
- 第一次创建的时候,必须有一个key,怎么有?(一会谈)
- key 类似之前的路径,都是唯一的
ftok
- 形成key就使用下面的接口
key_t ftok(const char *pathname, int proj_id);
- 第一个参数是路径名字符串
- 第二个参数是项目id
这两个参数由用户自由指定
那么这个key值能不能由操作系统自动生成,为什么要用户去设置,主要原因是因为操作系统形成了一个key,另一个进程要用这个key,但是我们不知道,所以是由用户约定的,必须由用户层下达到操作系统
key:操作系统内标定的唯一性
shmid:只在你的进程内,用来表示资源的唯一性
ipcs
- 共享内存的生命周期是随内核的,用户不主动关闭,共享内存会一直存在,除非内核重启或者用户关闭
查看共享内存:
ipcs -m
关闭共享内存:
- 这里的shmid是要关闭的共享内存,而不是key,在用户层只能使用shmid,内核层用的key
ipcrm -m shmid
- 共享内存的大小一般建议是4096的整数倍
shmat
- 将共享内存挂接到程序地址空间当中
void *shmat(int shmid, const void *shmaddr, int shmflg);
- 第一个参数是shmid
- 第二个参数是要挂接到哪个地址上,一般是nullptr
- 第三个参数是挂接内存的访问权限,默认我们设置为0
返回值:失败返回nullptr,成功返回共享内存的起始地址
shmdt
- 从进程的地址空间中分离一个共享内存段
int shmdt(const void *shmaddr);
shmctl
- 删除共享内存&&获取共享内存的属性…
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
参数说明:
- shmid:共享内存段的标识符,通常由 shmget 函数返回。
- cmd:要执行的操作的命令,可以是以下值之一:
- IPC_STAT:获取共享内存段的状态,并将结果写入 buf 所指向的 shmid_ds 结构。
- IPC_SET:设置共享内存段的 shmid_ds 结构中的 shm_perm 字段,通常用于更改权限。
- IPC_RMID:立即删除共享内存段。注意,只有当共享内存段的引用计数(即附加到它的进程数)为 0 时,该命令才会成功
- buf:一个指向 shmid_ds 结构的指针,该结构用于传递或接收关于共享内存段的信息。对于 - - IPC_STAT 命令,该结构用于接收信息;对于 IPC_SET 命令,该结构包含要设置的权限信息。
返回值:
如果成功,返回 0。
如果失败,返回 -1,并设置 errno 以指示错误原因。
- 共享内存不提供对共享内存的任何保护机制,这会导致数据不一致
- 共享内存是所有进程IPC,速度最快的,因为共享内存大大减少了数据的拷贝次数!
代码验证(使用共享内存的相关接口)
- 这里用上前面的log打印日志
Shm.hpp
#ifndef __SHM_HPP__
#define __SHM_HPP__#include <iostream>
#include <string>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <unistd.h>#include "Log.hpp"
const int gCreater = 1;
const int gUser = 2;const std::string gpathname = "/root/111/code-exercise/Linux/lesson07";
const int gproj_id = 0x666;
const int gshmSize = 4096;Log log;class Shm
{
public:Shm(const std::string &pathname, int proj_id, int who): _pathname(pathname), _proj_id(proj_id), _who(who), _addrshm(nullptr){_key = GetCommKey(); // 获取key// 通过不同身份创建共享内存,先创建if (_who == gCreater)GetShmUseCreate();else if (_who == gUser)GetShmForUse();std::cout << "shmid: " << _shmid << std::endl;std::cout << "key: " << ToHex(_key) << std::endl;_addrshm = AttachShm(); // 后挂接}// 创建bool GetShmUseCreate(){if (_who == gCreater){_shmid = GetShmHelper(_key, gshmSize, IPC_CREAT | IPC_EXCL | 0666);if (_shmid < 0)return false;log(Info, "GetShmUseCreate share memory success, shmid: %d", _shmid);}return true;}// 使用bool GetShmForUse(){if (_who == gUser){_shmid = GetShmHelper(_key, gshmSize, IPC_CREAT | 0666);if (_shmid < 0)return false;log(Info, "GetShmForUse share memory success, shmid: %d", _shmid);}return true;}std::string ToHex(key_t key){char buffer[128];snprintf(buffer, sizeof(buffer), "%#x", key);return buffer;}~Shm(){if (_who == gCreater){int res = shmctl(_shmid, IPC_RMID, nullptr);if (res < 0){log(Fatal, "shmctl fail! : %s", strerror(errno));exit(3);}}log(Info, "shm remove done ...");}void *Addr(){return _addrshm;}// 将共享内存全部清空void Zero(){if (_addrshm){memset(_addrshm, 0, gshmSize);}}void DebugShm(){// 获取共享内存的属性struct shmid_ds ds;int n = shmctl(_shmid, IPC_STAT, &ds);if (n < 0)return;std::cout << "ds.shm_perm.__key : " << ToHex(ds.shm_perm.__key) << std::endl;std::cout << "ds.shm_nattch: " << ds.shm_nattch << std::endl;}private:// 创建keykey_t GetCommKey(){key_t k = ftok(_pathname.c_str(), _proj_id);if (k < 0){log(Fatal, "ftok, error: %s", strerror(errno));exit(1);}log(Info, "ftok success, key is %#x", k);return k;}int GetShmHelper(key_t key, int size, int flag){int shmid = shmget(key, size, flag); // 获取keyif (shmid < 0){log(Fatal, "create share memory error: %s", strerror(errno));exit(2);}log(Info, "create share memory success, shmid: %d", shmid);std::cout << "create share memory success, key: " << ToHex(_key) << std::endl;return shmid;}std::string RoleToString(int who){if (who == gCreater)return "Creater";else if (who == gUser)return "gUser";elsereturn "None";}void DetachShm(void *shmaddr){if (shmaddr == nullptr)return;shmdt(shmaddr); // 从进程的地址空间中分离一个共享内存段log(Info, "who: %s detach shm...", RoleToString(_who).c_str());}void *AttachShm(){if (_addrshm != nullptr)DetachShm(_addrshm);void *shaddr = shmat(_shmid, nullptr, 0); // 将共享内存挂接到程序地址空间当中if (shaddr == nullptr){log(Fatal, "shmat fail!");}//std::cout << "who: " << RoleToString(_who) << " attach shm..." << std::endl;log(Info, "who: %s attach shm...", RoleToString(_who).c_str());return shaddr;}private:key_t _key;int _shmid;std::string _pathname;int _proj_id;int _who;void *_addrshm;
};#endif
client.cc
#include "Shm.hpp"
#include "Log.hpp"
#include "namedPipe.hpp"int main()
{// 1. 创建共享内存Shm shm(gpathname, gproj_id, gUser);shm.Zero(); // 清空共享内存char *shmaddr = (char *)shm.Addr();// 打印信息shm.DebugShm();// 2. 打开管道NamePiped fifo(comm_path, User);fifo.OpenForWrite();// 当成stringchar ch = 'A';while (ch <= 'Z'){shmaddr[ch - 'A'] = ch;std::string temp = "wakeup";std::cout << "add " << ch << " into Shm, "<< "wakeup reader" << std::endl;fifo.WriteNamedPipe(temp);sleep(2);ch++;}return 0;
}
server.cc
#include "Shm.hpp"
#include "Log.hpp"
#include "namedPipe.hpp"int main()
{Shm shm(gpathname, gproj_id, gCreater);char *shmaddr = (char *)shm.Addr();// 查看信息shm.DebugShm();// 2. 创建管道NamePiped fifo(comm_path, Creater);fifo.OpenForRead();while (true){std::string temp;fifo.ReadNamedPipe(&temp);std::cout << "shm memory content: " << shmaddr << std::endl;}return 0;
}
system V消息队列
-
消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法
-
每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值
特性方面:IPC资源必须删除,否则不会自动清除,除非重启,所以system V IPC资源的生命周期随内核 -
消息队列是由操作系统来提供的
system V信号量
5个概念:
- 多个执行流(进程),能看到的一份资源:共享资源
- 被保护起来的资源 —>临界资源,同步和互斥,用互斥的方式保护共享资源,临界资源
- 互斥:任何时刻只能有一个进程在访问公共资源
- 资源:要被程序员访问,资源被访问也就是通过代码来访问(代码 = 访问共享资源的代码(临界区) + 不访问共享资源的代码(非临界区))
- 所谓的对共享资源进行保护(临界资源)本质是对共享资源的代码进行保护
- 这里的信号量也就相当于是一个计数器
- 申请计数器成功,就表示具有访问资源的权限了
- 申请了计数器资源,我当前访问我要的资源了吗?没有,申请了计数器资源是对资源的预定机制
- 计数器可以有效保证进入共享资源的执行流的数量
- 所以每一个执行流,想访问共享资源中的一部分资源,不是直接访问,而是先申请计数器!
程序员把这个计数器,叫做信号量
进程互斥
- 由于各进程要求共享资源,而且有些资源需要互斥使用,因此各进程间竞争使用这些资源,进程的这种关系为进程的互斥
- 系统中某些资源一次只允许一个进程使用,称这样的资源为临界资源或互斥资源。
- 在进程中涉及到互斥资源的程序段叫临界区
特性方面:
- IPC资源必须删除,否则不会自动清除,除非重启,所以system V IPC资源的生命周期随内核
这篇关于Linux进程间通信 pipe 实现线程池 命名管道 实现打印日志 共享内存代码验证 消息队列 信号量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!