【Linux】生产者消费者模型{基于BlockingQueue的PC模型/RAII风格的加锁方式/串行,并行,并发}

本文主要是介绍【Linux】生产者消费者模型{基于BlockingQueue的PC模型/RAII风格的加锁方式/串行,并行,并发},希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 1.认识PC模型
  • 2.基于BlockingQueue的PC模型
    • 2.1串行,并行,并发
    • 2.2理解linux下的并发
    • 2.2RAII风格的加锁方式
    • 2.3阻塞队列
    • 2.4深入理解pthread_cond_wait
    • 2.5整体代码
      • 1.Task.hpp
      • 2.lockGuard.hpp
      • 3.BlockQueue.hpp
      • 4.pcModel.cc
  • 3.总结PC模型

1.认识PC模型

知乎好文

「生产者 - 消费者」模型

图解PC模型

在这里插入图片描述

通常所说的条件满足时去唤醒线程,如何知道条件满足

生产者产生数据,所以能确定“有”数据
消费者消费数据,所以能确定“没有”数据

3种关系:生产者和生产者【互斥】,消费者和消费者【互斥】,生产者和消费者【互斥/同步】

生产者和生产者:竞争和互斥关系,多个生产者不能同时进行生产操作。
消费者和消费者:竞争和互斥关系,多个消费者不能同时进行消费操作。
生产者和消费者:互斥和同步调关系,生产者和消费者不能同时进行生产和消费操作;生产者和消费者要能够互相通知,来确定资源是否就绪。

2种角色:生产者/消费者

生产者线程和消费者线程

1个交易场所:超市

数据结构实现的缓冲区

目的/意义

  1. 完成对生产者和消费者的解耦,提高整个过程的效率。
  2. 生产者和消费者模型实现了线程的角色化,使不同角色的线程执行特定的工作。
  3. 生产者能够确定容器中有数据,而消费者能够确定容器中没有数据。以此来为条件变量发送信号唤醒线程提供依据。
  4. 生产者消费者模型中的容器实际上就是以一定形式组织的(数据结构)一段内存空间(缓冲区)
    生产者消费者模型的优点
  5. 解耦:通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列;消费者不找生产者要数据,而是直接从阻塞队列里取。阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
  6. 提高效率:缓冲区的存在,实现了生产者和消费者的并发执行。多生产多消费模型,实现了生产者和生产者,消费者和消费者的并发执行。

2.基于BlockingQueue的PC模型

2.1串行,并行,并发

串行、并行和并发是计算机科学中常用来描述任务执行方式的重要概念。它们各自有着独特的特性和应用场景。以下是这三个概念的简要解释:

串行(Serial):
定义:在串行执行中,任务按照严格的顺序一个接一个地执行,前一个任务完成之前,下一个任务不会开始。
特性:没有两个任务会同时执行;资源利用率较低,因为处理器或其他资源在大部分时间可能处于空闲状态。
应用:在单核心处理器上,或者当任务之间存在严格的依赖关系时,通常会使用串行执行。
并行(Parallel):
定义:并行执行指的是多个任务同时执行,没有相互等待的情况。每个任务都在其自己的执行单元(如处理器核心)上独立运行。
特性:可以显著提高整体执行速度,因为多个任务可以同时进行;需要更多的硬件资源(如多核心处理器)来支持。
应用:适用于可以独立执行且没有相互依赖关系的任务。在科学计算、图形处理、大数据分析等领域广泛应用。
并发(Concurrency):
定义:并发执行指的是多个任务看起来是同时进行的,但实际上可能由于任务切换、资源争用等原因,并不是真正的同时执行。
特性:通过任务切换来模拟并行执行,使得多个任务能够在单个处理器核心上交替执行;并发控制机制(如锁、信号量等)用于管理对共享资源的访问,以避免数据不一致。
应用:在操作系统、多线程编程、网络编程等领域广泛应用。它允许程序在等待I/O操作(如网络请求或磁盘读写)时继续执行其他任务,从而提高整体性能。
区别与联系:

并行是真正的同时执行多个任务,而并发则更侧重于多个任务在逻辑上的同时执行,实际上可能由于任务切换等原因并不是真正的同时执行。
串行是最基本的执行方式,而并行和并发则是为了提高性能而采用的技术。
在多核心处理器上,可以实现真正的并行执行,而在单核心处理器上,则通常通过并发技术来模拟并行执行。
需要注意的是,虽然并行和并发都能提高性能,但它们也带来了复杂性,如同步问题、资源争用等,需要开发者仔细考虑和处理。

2.2理解linux下的并发

Linux下的并发主要涉及到多任务、多线程和进程间通信等多个方面。要理解Linux下的并发,我们可以从以下几个关键点出发:

多任务处理:Linux是一个多任务操作系统,这意味着它可以同时处理多个任务或进程。这些任务可以是用户级别的程序,也可以是系统级别的服务。操作系统通过时间片轮转或优先级调度等方式,使得这些任务看起来是同时执行的。

进程与线程:

进程:在Linux中,进程是资源分配的基本单位。每个进程都有自己的地址空间、文件描述符、信号处理器等。进程之间的通信和同步通常通过管道、消息队列、共享内存等方式实现。
线程:线程是CPU调度的基本单位。与进程相比,线程共享进程的地址空间和资源,因此线程间的通信和同步相对简单,但也需要小心处理共享数据的访问冲突。Linux通过POSIX线程(pthread)库支持多线程编程。
并发控制:当多个任务或线程需要访问共享资源时,就需要进行并发控制。这主要通过互斥锁(mutex)、读写锁、条件变量、信号量等机制实现。这些机制可以确保同一时间只有一个任务或线程能够访问共享资源,从而避免数据竞争和不一致。

异步编程:异步编程是处理并发的一种常见方法。在Linux中,可以使用异步I/O(如epoll)、事件驱动编程等方式实现异步处理。这种方式允许程序在等待某些操作(如网络请求或文件读写)完成时,继续执行其他任务,从而提高整体性能。

进程间通信(IPC):在Linux中,进程间通信是并发编程中不可或缺的一部分。常见的IPC机制包括管道、命名管道(FIFO)、消息队列、共享内存、信号和套接字等。这些机制允许不同进程之间交换数据和协调行动。

调度器与内核:Linux内核负责管理和调度进程和线程的执行。调度器根据进程的优先级、状态和资源需求等因素,决定哪个进程或线程应该获得CPU的使用权。内核还提供了各种系统调用和接口,支持并发编程的各种需求。

综上所述,Linux下的并发是一个复杂而丰富的领域,涉及到多任务处理、进程与线程、并发控制、异步编程、进程间通信以及内核调度等多个方面。理解这些概念和机制,对于编写高效、稳定的并发程序至关重要。

2.2RAII风格的加锁方式

RAII(Resource Acquisition Is Initialization)风格的加锁方式是一种在C++等编程语言中常见的资源管理技术,其核心思想是将资源的获取(如加锁)与对象的初始化绑定在一起,资源的释放(如解锁)则与对象的析构绑定在一起。通过这种方式,可以确保在对象的生命周期内,资源始终保持有效状态,并且无需显式地调用释放资源的函数。

在加锁的场景中,RAII风格的加锁方式通常是通过创建一个锁对象来实现的。当这个锁对象被创建时,它会自动地获取锁,而当对象离开其作用域或被销毁时,它会自动地释放锁。这样,程序员就无需在每个可能的退出路径上都显式地调用解锁函数,从而减少了出错的可能性,并提高了代码的可读性和可维护性。

例如,在C++中,可以使用智能指针或自定义的锁类来实现RAII风格的加锁。当智能指针或锁类的对象被创建时,它们会自动地获取锁;当这些对象离开其作用域或被销毁时(例如,在函数返回或异常发生时),它们的析构函数会自动地释放锁。

这种加锁方式不仅简化了代码,还提高了程序的健壮性。它可以帮助程序员避免忘记解锁或多次解锁等常见的并发问题,从而减少了死锁和竞态条件的风险。同时,由于锁的获取和释放是自动进行的,因此也可以提高程序的性能。

需要注意的是,虽然RAII风格的加锁方式可以简化并发编程,但并不能完全解决所有的并发问题。在编写并发程序时,还需要考虑其他因素,如线程间的通信、同步机制的选择等。因此,在使用RAII风格的加锁方式时,应结合具体的应用场景和需求来选择合适的解决方案。

2.3阻塞队列

在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进行操作时会被阻塞)。在进程间管道通信中,管道就是一个简单的阻塞队列,当管道中没有数据时,读端需要阻塞等待;当管道中的数据存满时,写端需要阻塞等待。

2.4深入理解pthread_cond_wait

  1. 一个要访问临界资源的线程,在访问前先检测当前的临界资源是否满足访问条件。如果不满足,则去等待,在临界区中调用pthread_cond_wait时是持有锁的状态,当成功调用wait之后,会自动释放所持有的锁,这也是设计这个接口的设计者考虑周到的一方面:持有锁的线程检测到资源未就绪而去等待,他去等待了需要保证其他线程可以去访问临界资源,而访问临界资源就需要锁,所以调用wait的线程等待前会先释放所持有的锁,不影响其他线程获取锁
  2. 等待的线程被唤醒时是从被阻塞的地方唤醒,即被唤醒时仍处于临界区,wait函数的另一功能就是被唤醒的时候,会自动帮助线程获取锁,但是只要是一个函数,就可能调用失败,如果调用失败,该线程在应该等待的情况下没有执行等待,即发生了不合理的情况:队列已满还去执行push。也可能存在 伪唤醒 的情况,即对方线程在不应该唤醒的情况下唤醒了当前线程。为了防止上述两种情况,我们把if ⇒ while,即如果函数调用失败此时while条件仍满足,那么他就再次调用。如果被伪唤醒,那么它在被唤醒后,再调用while如果条件满足,再次去等待,直到被真正唤醒。

伪唤醒

条件变量的伪唤醒是指在没有满足特定条件的情况下,条件变量的等待操作被唤醒。这种情况可能是由于操作系统的调度策略或其他因素导致的,而不是由于满足了条件。
伪唤醒可能会导致程序的错误行为或不正确的结果。当一个线程被伪唤醒时,它会重新获得互斥锁并从等待操作的地方继续执行,但实际上条件并没有满足。这可能会导致线程在不正确的状态下执行,或者导致竞争条件的发生。
为了防止条件变量的伪唤醒,通常需要将条件的检查与等待操作放在一个while循环中。在等待操作被唤醒后,线程会再次检查条件是否满足,如果条件不满足,则继续等待。这样可以确保线程只在满足条件时才继续执行,避免了伪唤醒带来的问题。

signa放在解锁前和解锁后的问题:

放在解锁前,向目标进程发送唤醒信号,【此时未执行解锁】,目标线程被唤醒后,等待锁,即这个情况没有影响,目标线程从等待条件变量变成了等待锁。如果在向目标进程发送唤醒信号前,有其他线程处于被唤醒且等锁状态,那么目标线程被唤醒后,也没有影响:和其他线程竞争锁即可。
放在解锁后,在锁已经被释放的前提下,目标线程被唤醒,获取对应的锁即可。

2.5整体代码

1.Task.hpp

#pragma once#include <iostream>
#include <functional>typedef std::function<int(int, int)> func_t;class Task
{
public:Task() {}Task(int x, int y, func_t func): _x(x),_y(y),_func(func){}int operator()(){return _func(_x, _y);}public:int _x;int _y;func_t _func;
};

2.lockGuard.hpp

#pragma once#include <iostream>
#include <pthread.h>class Mutex
{
public:Mutex(pthread_mutex_t *mtx): pmtx_(mtx){}void lock(){//std::cout << "加锁中..." << std::endl;pthread_mutex_lock(pmtx_);}void unlock(){//std::cout << "解锁中..." << std::endl;pthread_mutex_unlock(pmtx_);}~Mutex(){}private:pthread_mutex_t *pmtx_;
};// RAII风格的加锁方式
class lockGuard
{
public:lockGuard(pthread_mutex_t *mtx): _mtx(mtx){_mtx.lock();}~lockGuard(){_mtx.unlock();}private:Mutex _mtx;
};

3.BlockQueue.hpp

#pragma once#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>
#include "lockGuard.hpp"// #define INI_MTX(mtx) pthread_mutex_init(&mtx, nullptr)
// #define INI_COND(cond) pthread_cond_init(&_Empty, nullptr);
// #define INI_COND(cond) pthread_cond_init(&_Full, nullptr);const int g_DefaultCap = 5;template <class T>
class BlockQueue
{
private:bool isQueueEmpty(){return _bq.size() == 0;}bool isQueueFull(){return _bq.size() == _capacity;}public:BlockQueue(int capacity = g_DefaultCap): _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_Empty, nullptr);pthread_cond_init(&_Full, nullptr);}void push(const T &in){lockGuard lockguard(&_mtx); // 自动调用构造函数申请锁while (isQueueFull()) // 用while而非if==>访问临界资源100%确定资源就绪pthread_cond_wait(&_Full, &_mtx);_bq.push(in);pthread_cond_signal(&_Empty);} // 自动调用lockguard析构函数解锁void pop(T *out){lockGuard lockguard(&_mtx);while (isQueueEmpty())pthread_cond_wait(&_Empty, &_mtx);*out = _bq.front();_bq.pop();pthread_cond_signal(&_Full);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_Empty);pthread_cond_destroy(&_Full);}private:std::queue<T> _bq;     // 阻塞队列int _capacity;         // 队列容量pthread_mutex_t _mtx;  // 互斥锁pthread_cond_t _Empty; // 用它来表示bq 是否空的条件pthread_cond_t _Full;  // 用它来表示bq 是否满的条件
};

4.pcModel.cc

#include <pthread.h>
#include <unistd.h>
#include <ctime>#include "Task.hpp"
#include "BlockQueue.hpp"int myAdd(int x, int y)
{return x + y;
}// 生产者启动例程
void *producer(void *args)
{BlockQueue<Task> *bq = (BlockQueue<Task> *)args;while (true){//生产数据(自己生产/网络获取/用户输入)int x = rand() % 10 + 1;usleep(rand() % 1000);int y = rand() % 5 + 1;Task t(x, y, myAdd);//发送数据bq->push(t);//for debug:输出消息std::cout << pthread_self() << " productor: " << t._x << "+" << t._y << "= ?" << std::endl;sleep(1);}return nullptr;
}// 消费者启动例程
void *consumer(void *args)
{BlockQueue<Task> *bq = (BlockQueue<Task> *)args;while (true){//接收数据Task t;bq->pop(&t);//处理数据std::cout << pthread_self() << " consumer : " << t._x << "+" << t._y << "=" << t() << std::endl;}return nullptr;
}int main()
{srand((uint64_t)time(nullptr) ^ getpid() ^ 0x13254);BlockQueue<Task> *bq = new BlockQueue<Task>();pthread_t consmr[2], prodcr[2];pthread_create(consmr, nullptr, consumer, bq);pthread_create(consmr + 1, nullptr, consumer, bq);pthread_create(prodcr, nullptr, producer, bq);pthread_create(prodcr + 1, nullptr, producer, bq);pthread_join(consmr[0], nullptr);pthread_join(consmr[1], nullptr);pthread_join(prodcr[0], nullptr);pthread_join(prodcr[1], nullptr);delete bq;return 0;
}

在这里插入图片描述

单线程生产/单线程消费PC模型生产/消费速度对结果影响

  1. 速度相当
    在这里插入图片描述
    在这里插入图片描述
  2. 生产速度快
    在这里插入图片描述
  3. 生产者通过用户输入的信息获取数据
    在这里插入图片描述

3.总结PC模型

  1. 生产者获取数据(发送数据数据前即他生产数据/创造数据)需要花时间,例如访问网络获取数据,读取磁盘数据;消费者处理数据(消费数据后)也需要花时间,例如进行数据计算,或者IO操作。
  2. 发送数据和接收数据必须串行执行(互斥),但是生产数据和处理数据可以并行执行。当消费者处理数据时,生产者可以向缓冲区发送数据,也可以生产数据;当生产者生产数据时,消费者可以从缓冲区接收数据,也可以处理数据。
  3. 缓冲区是通过实现生产者线程和消费者线程的并发执行来提高效率的
  4. 多个生产者线程在发送数据时必须串行执行(互斥),但在生产数据时(生产前)可以并发执行:当一个生产者正在生产数据时,其他生产者可以生产数据,也可以发送数据。
  5. 多个消费者线程在接收数据时必须串行执行(互斥),但在处理数据时(消费后)可以并发执行:当一个消费者正在处理数据时,其他消费者可以接收数据,也可以处理数据。
  6. 多生产多消费模型是通过实现生产者和生产者,消费者和消费者的并发执行来提高效率的。
  7. 多生产多消费模型适用于生产数据或处理数据比较花时间的场景,如IO操作,访问网络等。如果只是简单的处理过程就没有必要使用多生产多消费,因为线程切换反而成为效率的影响方。

这篇关于【Linux】生产者消费者模型{基于BlockingQueue的PC模型/RAII风格的加锁方式/串行,并行,并发}的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/852308

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

linux-基础知识3

打包和压缩 zip 安装zip软件包 yum -y install zip unzip 压缩打包命令: zip -q -r -d -u 压缩包文件名 目录和文件名列表 -q:不显示命令执行过程-r:递归处理,打包各级子目录和文件-u:把文件增加/替换到压缩包中-d:从压缩包中删除指定的文件 解压:unzip 压缩包名 打包文件 把压缩包从服务器下载到本地 把压缩包上传到服务器(zip

Andrej Karpathy最新采访:认知核心模型10亿参数就够了,AI会打破教育不公的僵局

夕小瑶科技说 原创  作者 | 海野 AI圈子的红人,AI大神Andrej Karpathy,曾是OpenAI联合创始人之一,特斯拉AI总监。上一次的动态是官宣创办一家名为 Eureka Labs 的人工智能+教育公司 ,宣布将长期致力于AI原生教育。 近日,Andrej Karpathy接受了No Priors(投资博客)的采访,与硅谷知名投资人 Sara Guo 和 Elad G

Linux 网络编程 --- 应用层

一、自定义协议和序列化反序列化 代码: 序列化反序列化实现网络版本计算器 二、HTTP协议 1、谈两个简单的预备知识 https://www.baidu.com/ --- 域名 --- 域名解析 --- IP地址 http的端口号为80端口,https的端口号为443 url为统一资源定位符。CSDNhttps://mp.csdn.net/mp_blog/creation/editor

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

内核启动时减少log的方式

内核引导选项 内核引导选项大体上可以分为两类:一类与设备无关、另一类与设备有关。与设备有关的引导选项多如牛毛,需要你自己阅读内核中的相应驱动程序源码以获取其能够接受的引导选项。比如,如果你想知道可以向 AHA1542 SCSI 驱动程序传递哪些引导选项,那么就查看 drivers/scsi/aha1542.c 文件,一般在前面 100 行注释里就可以找到所接受的引导选项说明。大多数选项是通过"_

Retrieval-based-Voice-Conversion-WebUI模型构建指南

一、模型介绍 Retrieval-based-Voice-Conversion-WebUI(简称 RVC)模型是一个基于 VITS(Variational Inference with adversarial learning for end-to-end Text-to-Speech)的简单易用的语音转换框架。 具有以下特点 简单易用:RVC 模型通过简单易用的网页界面,使得用户无需深入了

透彻!驯服大型语言模型(LLMs)的五种方法,及具体方法选择思路

引言 随着时间的发展,大型语言模型不再停留在演示阶段而是逐步面向生产系统的应用,随着人们期望的不断增加,目标也发生了巨大的变化。在短短的几个月的时间里,人们对大模型的认识已经从对其zero-shot能力感到惊讶,转变为考虑改进模型质量、提高模型可用性。 「大语言模型(LLMs)其实就是利用高容量的模型架构(例如Transformer)对海量的、多种多样的数据分布进行建模得到,它包含了大量的先验

图神经网络模型介绍(1)

我们将图神经网络分为基于谱域的模型和基于空域的模型,并按照发展顺序详解每个类别中的重要模型。 1.1基于谱域的图神经网络         谱域上的图卷积在图学习迈向深度学习的发展历程中起到了关键的作用。本节主要介绍三个具有代表性的谱域图神经网络:谱图卷积网络、切比雪夫网络和图卷积网络。 (1)谱图卷积网络 卷积定理:函数卷积的傅里叶变换是函数傅里叶变换的乘积,即F{f*g}