本文主要是介绍【Linux】生产者消费者模型{基于BlockingQueue的PC模型/RAII风格的加锁方式/串行,并行,并发},希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 1.认识PC模型
- 2.基于BlockingQueue的PC模型
- 2.1串行,并行,并发
- 2.2理解linux下的并发
- 2.2RAII风格的加锁方式
- 2.3阻塞队列
- 2.4深入理解pthread_cond_wait
- 2.5整体代码
- 1.Task.hpp
- 2.lockGuard.hpp
- 3.BlockQueue.hpp
- 4.pcModel.cc
- 3.总结PC模型
1.认识PC模型
知乎好文
「生产者 - 消费者」模型
图解PC模型
通常所说的条件满足时去唤醒线程,如何知道条件满足了
生产者产生数据,所以能确定“有”数据
消费者消费数据,所以能确定“没有”数据
3种关系:生产者和生产者【互斥】,消费者和消费者【互斥】,生产者和消费者【互斥/同步】
生产者和生产者:竞争和互斥关系,多个生产者不能同时进行生产操作。
消费者和消费者:竞争和互斥关系,多个消费者不能同时进行消费操作。
生产者和消费者:互斥和同步调关系,生产者和消费者不能同时进行生产和消费操作;生产者和消费者要能够互相通知,来确定资源是否就绪。
2种角色:生产者/消费者
生产者线程和消费者线程
1个交易场所:超市
数据结构实现的缓冲区
目的/意义
- 完成对生产者和消费者的解耦,提高整个过程的效率。
- 生产者和消费者模型实现了线程的角色化,使不同角色的线程执行特定的工作。
- 生产者能够确定容器中有数据,而消费者能够确定容器中没有数据。以此来为条件变量发送信号唤醒线程提供依据。
- 生产者消费者模型中的容器实际上就是以一定形式组织的(数据结构)一段内存空间(缓冲区)
生产者消费者模型的优点 - 解耦:通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列;消费者不找生产者要数据,而是直接从阻塞队列里取。阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
- 提高效率:缓冲区的存在,实现了生产者和消费者的并发执行。多生产多消费模型,实现了生产者和生产者,消费者和消费者的并发执行。
2.基于BlockingQueue的PC模型
2.1串行,并行,并发
串行、并行和并发是计算机科学中常用来描述任务执行方式的重要概念。它们各自有着独特的特性和应用场景。以下是这三个概念的简要解释:
串行(Serial):
定义:在串行执行中,任务按照严格的顺序一个接一个地执行,前一个任务完成之前,下一个任务不会开始。
特性:没有两个任务会同时执行;资源利用率较低,因为处理器或其他资源在大部分时间可能处于空闲状态。
应用:在单核心处理器上,或者当任务之间存在严格的依赖关系时,通常会使用串行执行。
并行(Parallel):
定义:并行执行指的是多个任务同时执行,没有相互等待的情况。每个任务都在其自己的执行单元(如处理器核心)上独立运行。
特性:可以显著提高整体执行速度,因为多个任务可以同时进行;需要更多的硬件资源(如多核心处理器)来支持。
应用:适用于可以独立执行且没有相互依赖关系的任务。在科学计算、图形处理、大数据分析等领域广泛应用。
并发(Concurrency):
定义:并发执行指的是多个任务看起来是同时进行的,但实际上可能由于任务切换、资源争用等原因,并不是真正的同时执行。
特性:通过任务切换来模拟并行执行,使得多个任务能够在单个处理器核心上交替执行;并发控制机制(如锁、信号量等)用于管理对共享资源的访问,以避免数据不一致。
应用:在操作系统、多线程编程、网络编程等领域广泛应用。它允许程序在等待I/O操作(如网络请求或磁盘读写)时继续执行其他任务,从而提高整体性能。
区别与联系:
并行是真正的同时执行多个任务,而并发则更侧重于多个任务在逻辑上的同时执行,实际上可能由于任务切换等原因并不是真正的同时执行。
串行是最基本的执行方式,而并行和并发则是为了提高性能而采用的技术。
在多核心处理器上,可以实现真正的并行执行,而在单核心处理器上,则通常通过并发技术来模拟并行执行。
需要注意的是,虽然并行和并发都能提高性能,但它们也带来了复杂性,如同步问题、资源争用等,需要开发者仔细考虑和处理。
2.2理解linux下的并发
Linux下的并发主要涉及到多任务、多线程和进程间通信等多个方面。要理解Linux下的并发,我们可以从以下几个关键点出发:
多任务处理:Linux是一个多任务操作系统,这意味着它可以同时处理多个任务或进程。这些任务可以是用户级别的程序,也可以是系统级别的服务。操作系统通过时间片轮转或优先级调度等方式,使得这些任务看起来是同时执行的。
进程与线程:
进程:在Linux中,进程是资源分配的基本单位。每个进程都有自己的地址空间、文件描述符、信号处理器等。进程之间的通信和同步通常通过管道、消息队列、共享内存等方式实现。
线程:线程是CPU调度的基本单位。与进程相比,线程共享进程的地址空间和资源,因此线程间的通信和同步相对简单,但也需要小心处理共享数据的访问冲突。Linux通过POSIX线程(pthread)库支持多线程编程。
并发控制:当多个任务或线程需要访问共享资源时,就需要进行并发控制。这主要通过互斥锁(mutex)、读写锁、条件变量、信号量等机制实现。这些机制可以确保同一时间只有一个任务或线程能够访问共享资源,从而避免数据竞争和不一致。
异步编程:异步编程是处理并发的一种常见方法。在Linux中,可以使用异步I/O(如epoll)、事件驱动编程等方式实现异步处理。这种方式允许程序在等待某些操作(如网络请求或文件读写)完成时,继续执行其他任务,从而提高整体性能。
进程间通信(IPC):在Linux中,进程间通信是并发编程中不可或缺的一部分。常见的IPC机制包括管道、命名管道(FIFO)、消息队列、共享内存、信号和套接字等。这些机制允许不同进程之间交换数据和协调行动。
调度器与内核:Linux内核负责管理和调度进程和线程的执行。调度器根据进程的优先级、状态和资源需求等因素,决定哪个进程或线程应该获得CPU的使用权。内核还提供了各种系统调用和接口,支持并发编程的各种需求。
综上所述,Linux下的并发是一个复杂而丰富的领域,涉及到多任务处理、进程与线程、并发控制、异步编程、进程间通信以及内核调度等多个方面。理解这些概念和机制,对于编写高效、稳定的并发程序至关重要。
2.2RAII风格的加锁方式
RAII(Resource Acquisition Is Initialization)风格的加锁方式是一种在C++等编程语言中常见的资源管理技术,其核心思想是将资源的获取(如加锁)与对象的初始化绑定在一起,资源的释放(如解锁)则与对象的析构绑定在一起。通过这种方式,可以确保在对象的生命周期内,资源始终保持有效状态,并且无需显式地调用释放资源的函数。
在加锁的场景中,RAII风格的加锁方式通常是通过创建一个锁对象来实现的。当这个锁对象被创建时,它会自动地获取锁,而当对象离开其作用域或被销毁时,它会自动地释放锁。这样,程序员就无需在每个可能的退出路径上都显式地调用解锁函数,从而减少了出错的可能性,并提高了代码的可读性和可维护性。
例如,在C++中,可以使用智能指针或自定义的锁类来实现RAII风格的加锁。当智能指针或锁类的对象被创建时,它们会自动地获取锁;当这些对象离开其作用域或被销毁时(例如,在函数返回或异常发生时),它们的析构函数会自动地释放锁。
这种加锁方式不仅简化了代码,还提高了程序的健壮性。它可以帮助程序员避免忘记解锁或多次解锁等常见的并发问题,从而减少了死锁和竞态条件的风险。同时,由于锁的获取和释放是自动进行的,因此也可以提高程序的性能。
需要注意的是,虽然RAII风格的加锁方式可以简化并发编程,但并不能完全解决所有的并发问题。在编写并发程序时,还需要考虑其他因素,如线程间的通信、同步机制的选择等。因此,在使用RAII风格的加锁方式时,应结合具体的应用场景和需求来选择合适的解决方案。
2.3阻塞队列
在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进行操作时会被阻塞)。在进程间管道通信中,管道就是一个简单的阻塞队列,当管道中没有数据时,读端需要阻塞等待;当管道中的数据存满时,写端需要阻塞等待。
2.4深入理解pthread_cond_wait
- 一个要访问临界资源的线程,在访问前先检测当前的临界资源是否满足访问条件。如果不满足,则去等待,在临界区中调用pthread_cond_wait时是持有锁的状态,当成功调用wait之后,会自动释放所持有的锁,这也是设计这个接口的设计者考虑周到的一方面:持有锁的线程检测到资源未就绪而去等待,他去等待了需要保证其他线程可以去访问临界资源,而访问临界资源就需要锁,所以调用wait的线程等待前会先释放所持有的锁,不影响其他线程获取锁
- 等待的线程被唤醒时是从被阻塞的地方唤醒,即被唤醒时仍处于临界区,wait函数的另一功能就是被唤醒的时候,会自动帮助线程获取锁,但是只要是一个函数,就可能调用失败,如果调用失败,该线程在应该等待的情况下没有执行等待,即发生了不合理的情况:队列已满还去执行push。也可能存在 伪唤醒 的情况,即对方线程在不应该唤醒的情况下唤醒了当前线程。为了防止上述两种情况,我们把if ⇒ while,即如果函数调用失败此时while条件仍满足,那么他就再次调用。如果被伪唤醒,那么它在被唤醒后,再调用while如果条件满足,再次去等待,直到被真正唤醒。
伪唤醒
条件变量的伪唤醒是指在没有满足特定条件的情况下,条件变量的等待操作被唤醒。这种情况可能是由于操作系统的调度策略或其他因素导致的,而不是由于满足了条件。
伪唤醒可能会导致程序的错误行为或不正确的结果。当一个线程被伪唤醒时,它会重新获得互斥锁并从等待操作的地方继续执行,但实际上条件并没有满足。这可能会导致线程在不正确的状态下执行,或者导致竞争条件的发生。
为了防止条件变量的伪唤醒,通常需要将条件的检查与等待操作放在一个while循环中。在等待操作被唤醒后,线程会再次检查条件是否满足,如果条件不满足,则继续等待。这样可以确保线程只在满足条件时才继续执行,避免了伪唤醒带来的问题。
signa放在解锁前和解锁后的问题:
放在解锁前,向目标进程发送唤醒信号,【此时未执行解锁】,目标线程被唤醒后,等待锁,即这个情况没有影响,目标线程从等待条件变量变成了等待锁。如果在向目标进程发送唤醒信号前,有其他线程处于被唤醒且等锁状态,那么目标线程被唤醒后,也没有影响:和其他线程竞争锁即可。
放在解锁后,在锁已经被释放的前提下,目标线程被唤醒,获取对应的锁即可。
2.5整体代码
1.Task.hpp
#pragma once#include <iostream>
#include <functional>typedef std::function<int(int, int)> func_t;class Task
{
public:Task() {}Task(int x, int y, func_t func): _x(x),_y(y),_func(func){}int operator()(){return _func(_x, _y);}public:int _x;int _y;func_t _func;
};
2.lockGuard.hpp
#pragma once#include <iostream>
#include <pthread.h>class Mutex
{
public:Mutex(pthread_mutex_t *mtx): pmtx_(mtx){}void lock(){//std::cout << "加锁中..." << std::endl;pthread_mutex_lock(pmtx_);}void unlock(){//std::cout << "解锁中..." << std::endl;pthread_mutex_unlock(pmtx_);}~Mutex(){}private:pthread_mutex_t *pmtx_;
};// RAII风格的加锁方式
class lockGuard
{
public:lockGuard(pthread_mutex_t *mtx): _mtx(mtx){_mtx.lock();}~lockGuard(){_mtx.unlock();}private:Mutex _mtx;
};
3.BlockQueue.hpp
#pragma once#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>
#include "lockGuard.hpp"// #define INI_MTX(mtx) pthread_mutex_init(&mtx, nullptr)
// #define INI_COND(cond) pthread_cond_init(&_Empty, nullptr);
// #define INI_COND(cond) pthread_cond_init(&_Full, nullptr);const int g_DefaultCap = 5;template <class T>
class BlockQueue
{
private:bool isQueueEmpty(){return _bq.size() == 0;}bool isQueueFull(){return _bq.size() == _capacity;}public:BlockQueue(int capacity = g_DefaultCap): _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_Empty, nullptr);pthread_cond_init(&_Full, nullptr);}void push(const T &in){lockGuard lockguard(&_mtx); // 自动调用构造函数申请锁while (isQueueFull()) // 用while而非if==>访问临界资源100%确定资源就绪pthread_cond_wait(&_Full, &_mtx);_bq.push(in);pthread_cond_signal(&_Empty);} // 自动调用lockguard析构函数解锁void pop(T *out){lockGuard lockguard(&_mtx);while (isQueueEmpty())pthread_cond_wait(&_Empty, &_mtx);*out = _bq.front();_bq.pop();pthread_cond_signal(&_Full);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_Empty);pthread_cond_destroy(&_Full);}private:std::queue<T> _bq; // 阻塞队列int _capacity; // 队列容量pthread_mutex_t _mtx; // 互斥锁pthread_cond_t _Empty; // 用它来表示bq 是否空的条件pthread_cond_t _Full; // 用它来表示bq 是否满的条件
};
4.pcModel.cc
#include <pthread.h>
#include <unistd.h>
#include <ctime>#include "Task.hpp"
#include "BlockQueue.hpp"int myAdd(int x, int y)
{return x + y;
}// 生产者启动例程
void *producer(void *args)
{BlockQueue<Task> *bq = (BlockQueue<Task> *)args;while (true){//生产数据(自己生产/网络获取/用户输入)int x = rand() % 10 + 1;usleep(rand() % 1000);int y = rand() % 5 + 1;Task t(x, y, myAdd);//发送数据bq->push(t);//for debug:输出消息std::cout << pthread_self() << " productor: " << t._x << "+" << t._y << "= ?" << std::endl;sleep(1);}return nullptr;
}// 消费者启动例程
void *consumer(void *args)
{BlockQueue<Task> *bq = (BlockQueue<Task> *)args;while (true){//接收数据Task t;bq->pop(&t);//处理数据std::cout << pthread_self() << " consumer : " << t._x << "+" << t._y << "=" << t() << std::endl;}return nullptr;
}int main()
{srand((uint64_t)time(nullptr) ^ getpid() ^ 0x13254);BlockQueue<Task> *bq = new BlockQueue<Task>();pthread_t consmr[2], prodcr[2];pthread_create(consmr, nullptr, consumer, bq);pthread_create(consmr + 1, nullptr, consumer, bq);pthread_create(prodcr, nullptr, producer, bq);pthread_create(prodcr + 1, nullptr, producer, bq);pthread_join(consmr[0], nullptr);pthread_join(consmr[1], nullptr);pthread_join(prodcr[0], nullptr);pthread_join(prodcr[1], nullptr);delete bq;return 0;
}
单线程生产/单线程消费PC模型生产/消费速度对结果影响
- 速度相当
- 生产速度快
- 生产者通过用户输入的信息获取数据
3.总结PC模型
- 生产者获取数据(发送数据数据前即他生产数据/创造数据)需要花时间,例如访问网络获取数据,读取磁盘数据;消费者处理数据(消费数据后)也需要花时间,例如进行数据计算,或者IO操作。
- 发送数据和接收数据必须串行执行(互斥),但是生产数据和处理数据可以并行执行。当消费者处理数据时,生产者可以向缓冲区发送数据,也可以生产数据;当生产者生产数据时,消费者可以从缓冲区接收数据,也可以处理数据。
- 缓冲区是通过实现生产者线程和消费者线程的并发执行来提高效率的
- 多个生产者线程在发送数据时必须串行执行(互斥),但在生产数据时(生产前)可以并发执行:当一个生产者正在生产数据时,其他生产者可以生产数据,也可以发送数据。
- 多个消费者线程在接收数据时必须串行执行(互斥),但在处理数据时(消费后)可以并发执行:当一个消费者正在处理数据时,其他消费者可以接收数据,也可以处理数据。
- 多生产多消费模型是通过实现生产者和生产者,消费者和消费者的并发执行来提高效率的。
- 多生产多消费模型适用于生产数据或处理数据比较花时间的场景,如IO操作,访问网络等。如果只是简单的处理过程就没有必要使用多生产多消费,因为线程切换反而成为效率的影响方。
这篇关于【Linux】生产者消费者模型{基于BlockingQueue的PC模型/RAII风格的加锁方式/串行,并行,并发}的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!