生产者消费者模式保姆级教程 (阻塞队列解除耦合性) 一文帮你从C语言版本到C++ 版本, 从理论到实现 (一文足以)

本文主要是介绍生产者消费者模式保姆级教程 (阻塞队列解除耦合性) 一文帮你从C语言版本到C++ 版本, 从理论到实现 (一文足以),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

一. 图解,步步分解理论基础

1.1 使用锁 + 条件变量实现生产者消费者模式理论基础

1.2 使用信号量实现生产者消费者模式的理论基础

二. Linux环境下对于基于阻塞队列的两种实现方式  (C版本)

2.1 条件变量 + 锁实现版本

2.2  信号量实现版本

三. C++11 新特性产生的线程库实现版本

四. 总结本章

一. 图解,步步分解理论基础

1.1 使用锁 + 条件变量实现生产者消费者模式理论基础

队列为空 和 队列满的时候需要阻塞住队列, 需要使用条件变量来实现判断是否需要阻塞队列

思考1 :  生产者生产需要的是空位置存储生产的产品, 消费者消费需要的是队列中存在产品, 于是如下

  • condition1 :  free (空位, 表示队列不是满的, 存在空位)
  • condition2 :  full : (装有产平, 代表队列不是空的, 存在产品)

伪代码逻辑:

  • 当队列为空的时候, 需要等待队列中含有产品, 等待消费 ,  同时通知唤醒一下生产者, 催一下, 你快点生产喔, 队列中绝对满足有空位条件的, 你快点生产吧哥
  • 当队列为满的时候, 需要等待队列中含有空位, 等待生产,   同时通知唤醒一下消费者,  催一下,你快点消费喔, 队列中绝对满足有产品条件的, 你快点消费吧哥
  • 解释while :  防止伪唤醒  :  消费线程在等待生产者线程生产出产品唤醒自己进行消费, 但是生产者可能仅仅只是生产出来一个产品, 但是它广播告诉所有的消费者线程你们醒过来争抢这一个消费名额吧, 其实最终只能有一个消费者得偿所愿, 其他的消费者相当于被骗了, 自己刚醒过来, 但是产品别别人抢走了, 现在还是空的, 那咋办? 继续睡, 所以需要循环的进行判断自己是不是被伪唤醒了, 是的话, 继续等
while (Isempty(bq)) {Wait(full);Notify(free);           
} while (IsFull(bq)) {Wait(free);Notify(full); 
}

思考2: 队列中的产品和空位是什么样的资源??  是可能别多个线程同时争抢的资源, 所以这种多个线程争抢的写入操作的资源是什么资源?? 是 临界资源, 于是乎需要的是加锁进行原子操作. 在多个线程都想对阻塞队列进行写入操作的过程中保护阻塞队列这个临界资源的操

//伪代码
Lock();
...  //临界资源处理的逻辑代码块
Unlock();

1.2 使用信号量实现生产者消费者模式的理论基础

  • 其实大体逻辑和上述是差不多的, 只是此处没有了条件变量和锁, 换之代替的是信号量
  • 信号量:  我的理解就是具有保护特性的临界资源...   啥意思呢,   一个信号量就是一个临界资源, -pthread  链接库中包含了一系列封装好的使用这个信号量临界资源的函数.....
  • 这些函数方法本身就包含了对于多线程同步访问临界资源(共享资源的处理机制)

        int sem_init(sem_t *sem, int pshared, unsigned int value);           //初始化信号量, pshared : 0 用于线程间同步, 1 用于进程间同步, value 初始化值

       int sem_destroy(sem_t *sem);    //销毁信号量

        int sem_wait(sem_t *sem);        //消费一个临界资源信号量, 如果信号量 val 值 > 0 直接 val -= 1;  如果信号量值 == 0 了 阻塞起来等待它 > 0 再行消费

       int sem_post(sem_t *sem);       //生产一个临界资源信号量, 对应临界资源信号量 += 1, 归还资源, 同时如果之前   sem 的 val == 0 阻塞等待消费, 这个时候会相当于唤醒消费者消费

//其实信号量本身就是生产者消费者之间共享的临界资源, 通过对它的操作, 来制约了多线程的同步互斥访问这个临界资源的有条不紊的有序进行

  • 此处竟然说到了同步互斥访问临界资源了, 想必特别多的人对于同步  异步, 互斥 这些是分不清的, 此处图解一下:
  •  上述仅仅个人己见, 对于这种抽象概念的看发, 本就是随着理解层次的深入而深入的
  • 同步事件之间相互是需要等待制约的,  比如线程1 对于临界资源做写入操作的时候, 其他线程需要等待他用完之后放回去才能用....     所以同步 (暗藏相互的一种等待关系)

图解异步解耦合,  使用阻塞队列:

  •  解释异步事件, 还有很多人会从函数阻塞等待还是函数直接返回的角度来解释,  其实我觉得使用的是同样的思想, 函数不需要等待直接返回, 其实就解除了两者之间的耦合关联了, 我不需要等你准备好, 你妹准备好管我屁事, 我直接返回就是了, 你约束不了我
  • 同步事件 :   正好相反, 你没准备好我就不能直接返回,需要阻塞等待你准备好, 比如等待IO事件的发生

二. Linux环境下对于基于阻塞队列的两种实现方式  (C版本)

  • 队列的形式形式其实不重要, 重要的是阻塞, 于是我都直接使用环形队列来实现,简单方便....

2.1 条件变量 + 锁实现版本

  • 函数方法刨析:

 

#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <signal.h>//定义阻塞队列
typedef  struct BlockQueue {size_t cap;//容量size_t size;//当前产品个数int front;//队头游标int tail;//队尾游标int* data; //数据域pthread_mutex_t lock;pthread_cond_t full;pthread_cond_t free;
} BlockQueue;BlockQueue* bqp;//定义全局方便信号处理时候销毁
void DestroyBlockQueue(BlockQueue* bqp) {free(bqp->data);pthread_mutex_destroy(&bqp->lock);pthread_cond_destroy(&bqp->full);pthread_cond_destroy(&bqp->free);
}
//信号处理, 销毁整个阻塞队列。 做结束处理S
void handler(int signo) {printf("ByeBye\n");DestroyBlockQueue(bqp);exit(EXIT_SUCCESS);
}//初始化阻塞队列
BlockQueue* InitQueue(int n) {BlockQueue* bqp = (BlockQueue*)malloc(sizeof(BlockQueue));bqp->cap = n;bqp->size = 0;bqp->front = bqp->tail = 0;bqp->data = (int*)malloc(sizeof(int) * n);pthread_mutex_init(&bqp->lock, NULL);pthread_cond_init(&bqp->free, NULL);pthread_cond_init(&bqp->full, NULL);return bqp;
}int IsEmpty(BlockQueue* bqp) {return bqp->size == 0;
}int IsFull(BlockQueue* bqp) {return bqp->size == bqp->cap;
}void WaitConsume(BlockQueue* bqp) {//等待消费, 等队列有产品pthread_cond_wait(&bqp->full, &bqp->lock);
}void WaitProduct(BlockQueue* bqp) {//等待生产, 等队列有空位pthread_cond_wait(&bqp->free, &bqp->lock);
}void NotifyConsume(BlockQueue* bqp) {//通知消费, 队列中有产品了pthread_cond_signal(&bqp->full);
}void NotifyProduct(BlockQueue* bqp) {//通知生产, 队列中有空位了pthread_cond_signal(&bqp->free);
}void Lock(BlockQueue* bqp) {pthread_mutex_lock(&bqp->lock);
}void Unlock(BlockQueue* bqp) {pthread_mutex_unlock(&bqp->lock);}void Push(BlockQueue* bqp, int val) {Lock(bqp);while (IsFull(bqp)) {WaitProduct(bqp);//没有空位等待生产NotifyConsume(bqp);//催促消费}bqp->data[bqp->tail++] = val;bqp->tail %= bqp->cap;Unlock(bqp);bqp->size += 1;NotifyConsume(bqp);//有产品了通知消费
}void Pop(BlockQueue* bqp, int* popval) {Lock(bqp);while (IsEmpty(bqp)) {WaitConsume(bqp);//没有产品等待消费NotifyProduct(bqp);//催促生产}*popval = bqp->data[bqp->front++];bqp->front %= bqp->cap;Unlock(bqp);bqp->size -= 1;NotifyProduct(bqp);//有空位了通知生产
}void* ConsumeRoutine(void* args) {//消费者线程执行函数BlockQueue* bqp = (BlockQueue*)args;int popval = 0;for ( ;; ) {Pop(bqp, &popval);//取出消费产品printf("PopVal is %d, and has %d Products\n", popval, bqp->size); sleep(rand() % 3);}return (void*)0;
}void* ProductRoutine(void* args) {//生产者线程执行函数BlockQueue* bqp = (BlockQueue*)args;int pushval = 0;for ( ;;  ) {pushval = rand() % 1024;//随机一个数组当作产品Push(bqp, pushval);//装入产品printf("PushVal is %d, and has %d Products\n", pushval, bqp->size); sleep(rand() % 3);}return (void*)0;
}int main() {signal(SIGINT, handler);//注册信号处理函数srand((unsigned int)time(NULL));bqp = InitQueue(30);pthread_t consume1, consume2, product1, product2; pthread_create(&product1, NULL, ProductRoutine, (void*)bqp);pthread_create(&product2, NULL, ProductRoutine, (void*)bqp);pthread_create(&consume1, NULL, ConsumeRoutine, (void*)bqp);pthread_create(&consume2, NULL, ConsumeRoutine, (void*)bqp);pthread_join(product1, NULL);pthread_join(product2, NULL);pthread_join(consume1, NULL);pthread_join(consume2, NULL);return 0;
}

2.2  信号量实现版本

#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <signal.h>
#include <semaphore.h>//定义阻塞队列
typedef  struct BlockQueue {size_t cap;//容量size_t size;//当前产品个数int front;//队头游标int tail;//队尾游标int* data; //数据域sem_t data_sem;//产品信号量, 阻塞队列中有多少产品sem_t space_sem;//空位信号量. 阻塞队列中有多少空位
} BlockQueue;BlockQueue* bqp;//定义全局方便信号处理时候销毁
void DestroyBlockQueue(BlockQueue* bqp) {free(bqp->data);sem_destroy(&bqp->data_sem);sem_destroy(&bqp->space_sem);
}
//信号处理, 销毁整个阻塞队列。 做结束处理S
void handler(int signo) {printf("ByeBye\n");DestroyBlockQueue(bqp);exit(EXIT_SUCCESS);
}//初始化阻塞队列
BlockQueue* InitQueue(int n) {BlockQueue* bqp = (BlockQueue*)malloc(sizeof(BlockQueue));bqp->cap = n;bqp->size = 0;bqp->front = bqp->tail = 0;bqp->data = (int*)malloc(sizeof(int) * n);sem_init(&bqp->data_sem, 0, 0);//初始化产品量0sem_init(&bqp->space_sem, 0, n);//初始化空位量为队列容量return bqp;
}void Push(BlockQueue* bqp, int val) {sem_wait(&bqp->space_sem);//P 申请一个空位资源bqp->data[bqp->tail++] = val;bqp->tail %= bqp->cap;bqp->size += 1;sem_post(&bqp->data_sem); //V 制作或者说产生一个产品资源
}void Pop(BlockQueue* bqp, int* popval) {sem_wait(&bqp->data_sem); //P 申请一个产品*popval = bqp->data[bqp->front++];bqp->front %= bqp->cap;bqp->size -= 1;sem_post(&bqp->space_sem);//V 归还一个空位
}void* ConsumeRoutine(void* args) {//消费者线程执行函数BlockQueue* bqp = (BlockQueue*)args;int popval = 0;for ( ;; ) {Pop(bqp, &popval);//取出消费产品printf("PopVal is %d, and has %d Products\n", popval, bqp->size); sleep(rand() % 3);}return (void*)0;
}void* ProductRoutine(void* args) {//生产者线程执行函数BlockQueue* bqp = (BlockQueue*)args;int pushval = 0;for ( ;;  ) {pushval = rand() % 1024;//随机一个数组当作产品Push(bqp, pushval);//装入产品printf("PushVal is %d, and has %d Products\n", pushval, bqp->size); sleep(rand() % 3);}return (void*)0;
}int main() {signal(SIGINT, handler);srand((unsigned int)time(NULL));bqp = InitQueue(30);pthread_t consume1, consume2, product1, product2; pthread_create(&product1, NULL, ProductRoutine, (void*)bqp);pthread_create(&product2, NULL, ProductRoutine, (void*)bqp);pthread_create(&consume1, NULL, ConsumeRoutine, (void*)bqp);pthread_create(&consume2, NULL, ConsumeRoutine, (void*)bqp);pthread_join(product1, NULL);pthread_join(product2, NULL);pthread_join(consume1, NULL);pthread_join(consume2, NULL);return 0;
}

三. C++11 新特性产生的线程库实现版本

  • 首先是基础理论支撑, 函数分析什么的, 如果不清楚的可以去看如下的博客刨析

https://blog.csdn.net/weixin_53695360/article/details/122969157?spm=1001.2014.3001.5502

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <windows.h>
using namespace std;
#define NUM 10class BlockQueue {
public:BlockQueue(int cap = NUM): _cap(cap){}void Push(int val) {unique_lock<mutex> _lock(mtx);while (IsFull()) {_free.wait(_lock);//没有空位生产等待NotifyConsume();//催促消费}_q.push(val);//生产产品NotifyConsume();//有产品了通知消费}void Pop(int& popval) {unique_lock<mutex> _lock(mtx);while (IsEmpty()) {_full.wait(_lock);//没有产品消费等待NotifyProduct();//催促生产}popval = _q.front();_q.pop();//消费产品NotifyProduct();//有空位了通知生产}int Size() {return _q.size();}
private:queue<int> _q;//存储队列int _cap;//容量mutex mtx;condition_variable _full;//存在产品condition_variable _free;//存在空位
private:void NotifyConsume() {_full.notify_one();//通知消费}void NotifyProduct() {_free.notify_one();//通知生产}bool IsEmpty() {return _q.size() == 0;}bool IsFull() {return _q.size() == _cap;}
};void ConsumeRoutine(void* args) {//消费者线程执行函数BlockQueue* bqp = (BlockQueue*)args;int popval = 0;for (;;) {bqp->Pop(popval);//取出消费产品printf("PopVal is %d, and has %d Products\n", popval, bqp->Size());Sleep((rand() % 3) * 100);}
}void ProductRoutine(void* args) {//生产者线程执行函数BlockQueue* bqp = (BlockQueue*)args;int pushval = 0;for (;;) {pushval = rand() % 1024;//随机一个数组当作产品bqp->Push(pushval);//装入产品printf("PushVal is %d, and has %d Products\n", pushval, bqp->Size());Sleep((rand() % 3) * 100);}
}int main() {srand((unsigned long)time(NULL));BlockQueue bq(30);thread t1(ProductRoutine, &bq);thread t2(ProductRoutine, &bq);thread t3(ConsumeRoutine, &bq);thread t4(ConsumeRoutine, &bq);t1.join();t2.join();t3.join();t4.join();return 0;
}

四. 总结本章

  • 本章主要介绍了基于阻塞队列的生产者消费者模式, 阻塞队列的作用是解除掉生产者和消费者之间的耦合关系, 消费者从阻塞队列中拿取产品,  生产者向阻塞队列中Push生产产品
  • 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而 通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者 要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队 列就是用来给生产者和消费者解耦的。
  • 然后还介绍了一下同步等待事件  和  异步直接返回不等待的事件的区别...
  • 同步阻塞IO : 用户主线程在发起一个IO操作以后, 必须挂起等待IO 操作完成, 才能继续运行下去
  • 同步非阻塞IO :  用户主线程发起一个IO操作之后直接返回, 但是需要不是循环是否IO就绪,轮询操作。。。
  • 异步阻塞IO   :  用户主线程发起 一个 IO 操作之后还是直接返回干自己的事情, 但是不需要进行轮询了,     其他的线程会阻塞起来帮我们监视IO。 满足了就通知主线程  大哥 IO就绪了你可以干事情了
  • 最后就是使用不同的方式完成生产者消费者模式的实现

这篇关于生产者消费者模式保姆级教程 (阻塞队列解除耦合性) 一文帮你从C语言版本到C++ 版本, 从理论到实现 (一文足以)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/734594

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

【C++ Primer Plus习题】13.4

大家好,这里是国中之林! ❥前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。有兴趣的可以点点进去看看← 问题: 解答: main.cpp #include <iostream>#include "port.h"int main() {Port p1;Port p2("Abc", "Bcc", 30);std::cout <<

C++包装器

包装器 在 C++ 中,“包装器”通常指的是一种设计模式或编程技巧,用于封装其他代码或对象,使其更易于使用、管理或扩展。包装器的概念在编程中非常普遍,可以用于函数、类、库等多个方面。下面是几个常见的 “包装器” 类型: 1. 函数包装器 函数包装器用于封装一个或多个函数,使其接口更统一或更便于调用。例如,std::function 是一个通用的函数包装器,它可以存储任意可调用对象(函数、函数

2024年流动式起重机司机证模拟考试题库及流动式起重机司机理论考试试题

题库来源:安全生产模拟考试一点通公众号小程序 2024年流动式起重机司机证模拟考试题库及流动式起重机司机理论考试试题是由安全生产模拟考试一点通提供,流动式起重机司机证模拟考试题库是根据流动式起重机司机最新版教材,流动式起重机司机大纲整理而成(含2024年流动式起重机司机证模拟考试题库及流动式起重机司机理论考试试题参考答案和部分工种参考解析),掌握本资料和学校方法,考试容易。流动式起重机司机考试技

C++11第三弹:lambda表达式 | 新的类功能 | 模板的可变参数

🌈个人主页: 南桥几晴秋 🌈C++专栏: 南桥谈C++ 🌈C语言专栏: C语言学习系列 🌈Linux学习专栏: 南桥谈Linux 🌈数据结构学习专栏: 数据结构杂谈 🌈数据库学习专栏: 南桥谈MySQL 🌈Qt学习专栏: 南桥谈Qt 🌈菜鸡代码练习: 练习随想记录 🌈git学习: 南桥谈Git 🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈�

Centos7安装JDK1.8保姆版

工欲善其事,必先利其器。这句话同样适用于学习Java编程。在开始Java的学习旅程之前,我们必须首先配置好适合的开发环境。 通过事先准备好这些工具和配置,我们可以避免在学习过程中遇到因环境问题导致的代码异常或错误。一个稳定、高效的开发环境能够让我们更加专注于代码的学习和编写,提升学习效率,减少不必要的困扰和挫折感。因此,在学习Java之初,投入一些时间和精力来配置好开发环境是非常值得的。这将为我

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象