《探索C++多线程》:condition_variable源码(一)

2024-04-27 23:48

本文主要是介绍《探索C++多线程》:condition_variable源码(一),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《探索C++多线程》:condition_variable源码(一)

。 https://blog.csdn.net/hujingshuang/article/details/70596630

        现在接着学习关于多线程编程的特征,在这一节,将会了解到多线程中的condition_variable(条件变量)的相关知识。

        在头文件<condition_variable>中有两种条件变量的类声明与定义:condition_variable、condition_variable_any,在这一节中先来了解condition_variable。

 

condiction_variable

        条件变量是一种多线程的同步机制,它能够阻塞线程,直到某一条件满足。条件变量要与互斥量联合使用,以避免出现竞争的情况,当调用condition_variable的一个等待函数时,它使用一个unique_lock对象来锁定线程。

        源码之前,了无秘密。下面就将来看一下condition_variable源码,如下:

 

 
  1. class condition_variable {

  2. public:

  3. typedef _Cnd_t native_handle_type;

  4.  
  5. condition_variable() { // 构造函数,初始化条件变量,所有的条件变量必须初始化后才能使用。

  6. _Cnd_initX(&_Cnd);

  7. }

  8.  
  9. ~condition_variable() _NOEXCEPT { // 析构函数

  10. _Cnd_destroy(&_Cnd);

  11. }

  12.  
  13. condition_variable(const condition_variable&) = delete;

  14. condition_variable& operator=(const condition_variable&) = delete;

  15.  
  16. void notify_one() _NOEXCEPT { // 唤醒一个在等待线程

  17. _Cnd_signalX(&_Cnd);

  18. }

  19.  
  20. void notify_all() _NOEXCEPT { // 唤醒所有在等待的线程

  21. _Cnd_broadcastX(&_Cnd);

  22. }

  23.  
  24. void wait(unique_lock<mutex>& _Lck) { // 等待

  25. _Cnd_waitX(&_Cnd, &_Lck.mutex()->_Mtx);

  26. }

  27.  
  28. template<class _Predicate>

  29. void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) { // 等待,带有描述式

  30. while (!_Pred())

  31. wait(_Lck);

  32. }

  33.  
  34. template<class _Rep, class _Period>

  35. _Cv_status wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time) {

  36. stdext::threads::xtime _Tgt = _To_xtime(_Rel_time);

  37. return (wait_until(_Lck, &_Tgt));

  38. }

  39.  
  40. template<class _Rep, class _Period, class _Predicate>

  41. bool wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time, _Predicate _Pred) {

  42. stdext::threads::xtime _Tgt = _To_xtime(_Rel_time);

  43. return (wait_until(_Lck, &_Tgt, _Pred));

  44. }

  45.  
  46. template<class _Clock, class _Duration>

  47. _Cv_status wait_until( unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time) {

  48. typename chrono::time_point<_Clock, _Duration>::duration

  49. _Rel_time = _Abs_time - _Clock::now();

  50. return (wait_for(_Lck, _Rel_time));

  51. }

  52.  
  53. template<class _Clock, class _Duration, class _Predicate>

  54. bool wait_until(unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time, _Predicate _Pred) {

  55. typename chrono::time_point<_Clock, _Duration>::duration

  56. _Rel_time = _Abs_time - _Clock::now();

  57. return (wait_for(_Lck, _Rel_time, _Pred));

  58. }

  59.  
  60. _Cv_status wait_until( unique_lock<mutex>& _Lck, const xtime *_Abs_time) {

  61. if (!_Mtx_current_owns(&_Lck.mutex()->_Mtx))

  62. _Throw_Cpp_error(_OPERATION_NOT_PERMITTED);

  63. int _Res = _Cnd_timedwaitX(&_Cnd, &_Lck.mutex()->_Mtx, _Abs_time);

  64. return (_Res == _Thrd_timedout ? cv_status::timeout : cv_status::no_timeout);

  65. }

  66.  
  67. template<class _Predicate>

  68. bool wait_until(unique_lock<mutex>& _Lck, const xtime *_Abs_time, _Predicate _Pred) {

  69. bool _Res = true;

  70. while (_Res && !_Pred())

  71. _Res = wait_until(_Lck, _Abs_time)

  72. != cv_status::timeout;

  73. return (_Pred());

  74. }

  75.  
  76. native_handle_type native_handle() { // 返回条件变量的句柄

  77. return (_Cnd);

  78. }

  79.  
  80. void _Register(unique_lock<mutex>& _Lck, int *_Ready) {

  81. _Cnd_register_at_thread_exit(&_Cnd, &_Lck.release()->_Mtx, _Ready);

  82. }

  83.  
  84. void _Unregister(mutex& _Mtx) {

  85. _Cnd_unregister_at_thread_exit(&_Mtx._Mtx);

  86. }

  87.  
  88. private:

  89. _Cnd_t _Cnd;

  90. };

        condition_variable是一个类,它的定义有一点长,不过我们可以注意到,不外乎就是:私有成员变量、构造函数、析构函数、等待与唤醒方法。

 

        首先,我们来看构造函数和析构函数:

 

 
  1. condition_variable() { // 构造函数,初始化条件变量,所有的条件变量必须初始化后才能使用。

  2. _Cnd_initX(&_Cnd);

  3. }

  4.  
  5. ~condition_variable() _NOEXCEPT { // 析构函数

  6. _Cnd_destroy(&_Cnd);

  7. }

        构造函数只有一个,且不带任何参数;而且该类中禁止了拷贝构造函数和赋值函数,这样一来我们要构造一个对象就比较简单了。
        其次,我们来分析condition_variable的等待、唤醒操作:

 

                等待操作:wait()、wait_for()、wait_until()

                唤醒操作:notify_one()、notify_all()

        在详细讲解之前,先来看一段代码,然大家了解是如何使用condition_variable的,如下:

 

 
  1. #include <iostream> // std::cout

  2. #include <thread> // std::thread

  3. #include <mutex> // std::mutex, std::unique_lock

  4. #include <condition_variable> // std::condition_variable

  5.  
  6. using namespace std;

  7.  
  8. mutex mtx; // 互斥量

  9. condition_variable cv; // 条件变量

  10. bool ready = false; // 标志量

  11.  
  12. void print_id(int id) {

  13. unique_lock<mutex> lck(mtx); // 上锁

  14. while (!ready) {

  15. cv.wait(lck); // 线程等待直到被唤醒(释放锁 + 等待,唤醒,在函数返回之前重新上锁)

  16. }

  17. cout << "thread " << id << '\n';

  18. }

  19.  
  20. void go() {

  21. unique_lock<mutex> lck(mtx); // 上锁

  22. ready = true;

  23. cv.notify_all(); // 唤醒所有正在等待(挂起)的线程(在这里面要释放锁,为了在wait函数返回之前能成功的重新上锁)

  24. }

  25.  
  26. int main() {

  27. thread threads[10];

  28. for (int i = 0; i<10; ++i) {

  29. threads[i] = thread(print_id, i);

  30. }

  31.  
  32. cout << "10 threads ready to race...\n";

  33. go();

  34.  
  35. for (auto& th : threads) {

  36. th.join();

  37. }

  38.  
  39. return 0;

  40. }

        接下来基于上述代码,我们来分析一下多线程代码时如何运行的。

 

        首先,定义了几个全局变量:互斥量(mtx)、条件变量(cv)、标志量(ready)。整个代码的作用就是,当10个线程都准备好了之后,再并发执行(就好像赛马,把马牵出来,当所有马匹在赛道上就绪之后,再开始跑),也就是使用条件变量起到同步的作用。

        在代码中结合使用了mutex与condition_variable,请大家仔细阅读一下代码,然后再仔细阅读下面的解析:

                1、在print_id中,线程先将互斥量上锁(使用的unique_lock<mutex>),再判断ready,若ready为false,说明条件不满足,那么调用条件变量的wait()方法将线程挂起;

                2、当所有的线程都在等待状态时,说明所有线程已就绪,此时在主线程中将ready设为true,并调用notify_all()将所有挂起线程唤醒;

                3、所有线程被唤醒之后,并发执行打印自己的id;

                4、使用thread::join()方法,等所有线程都执行完毕,主线程才接着执行,直到出现结束。

        我们举例代码的运行过程就是这样,但是我们仔细想一想第1个过程:当有一个线程一来,就将互斥量先上了锁,然后发现条件不满足,就进入了挂起状态,此时代码中并没有解锁操作,那么该线程就一直持有锁(锁被独占了),这样的话其他线程根本就没有机会获取锁,那还谈什么后面的全部线程挂起、并发执行呢?

        实际上,上述分析并没有错,确实是这样的道理。但是这种问题是怎么解决的呢,其实这就是条件变量对象中wait()、notify()方法要处理的了,过程如下:

                1、线程A一来,就将互斥量上锁(持有了锁),ready为false,那么线程A将调用条件变量的wait()方法;

                2、在wait()方法中,做的第一件事就是将互斥量解锁(释放持有权),并进入等待状态(在wait()中阻塞,线程A挂起);

                3、现在线程B来了,互斥量是没有上锁的,所以线程B能持有锁,同理,接下来线程B也会挂起;

                4、当所有线程都挂起了(就绪),此时互斥量也没有被上锁,在主线程中将ready置为true,并调用notify_all()将所有挂起的线程都唤醒;

                5、此时所有线程将从wait()方法中返回,比如线程C先返回,在return之前,wait()方法做的最后一件事就是自动将互斥量上锁(线程C重新持有锁,以配合unique_lock的析构函数);

               6、由于while循环,此时再判断到ready为true,那么线程C将执行打印id的语句,由于此时只有线程C持有锁,不存在线程竞争问题,执行完打印之后,线程C就结束了,此时由unique_lock的析构函数解锁,释放所有权。

                7、由于在wait()方法return之前,会自动重新去持有锁,若此时锁由线程C持有,则其他线程将继续阻塞,直到线程C释放锁;若线程C执行完毕后释放了锁,那么其他线程将会争取锁的持有权,争取到锁的就会像之前的线程C一样;没有争取到的就继续阻塞;

                8、以此类推,由于每个线程都join,那么当所有线程执行完毕后,主线程才会继续执行;

        实际上,条件变量的wait()、wait_for()、wait_until()方法中所作的事是:解锁 + 等待、唤醒、加锁,这三个是有序发生的。综上所述,这就是condition_variable与mutex联合使用的大致过程,这也是条件变量同步机制的原理。可能写得有些啰嗦。

        下面,我们再来看一个例子,代码如下:

 

 
  1. #include <iostream>

  2. #include <string>

  3. #include <thread>

  4. #include <mutex>

  5. #include <condition_variable>

  6.  
  7. using namespace std;

  8.  
  9. mutex m;

  10. condition_variable cv;

  11. string data;

  12.  
  13. bool ready = false;

  14. bool processed = false;

  15.  
  16. void worker_thread() {

  17. unique_lock<mutex> lk(m); // 【1】

  18. cv.wait(lk, []{return ready;}); // 等价于 while(!ready) { cv.wait(lk); }

  19.  
  20. cout << "Worker thread is processing data\n";

  21. data += " after processing";

  22.  
  23. processed = true;

  24. cout << "Worker thread signals data processing completed\n";

  25.  
  26. //lk.unlock(); // 手动解锁

  27. cv.notify_one();

  28. }

  29.  
  30. int main() {

  31. thread worker(worker_thread);

  32.  
  33. data = "Example data";

  34.  
  35. {

  36. lock_guard<mutex> lk(m); // 【2】

  37. ready = true;

  38. cout << "main() signals data ready for processing\n";

  39. }

  40.  
  41. cv.notify_one(); // 唤醒线程worker

  42.  
  43. {

  44. unique_lock<mutex> lk(m); // 【3】

  45. cv.wait(lk, []{return processed; }); // 等价于 while(!processed) { cv.wait(lk); }

  46. }

  47.  
  48. cout << "Back in main(), data = " << data << '\n';

  49.  
  50. worker.join();

  51.  
  52. getchar();

  53. return 0;

  54. }

 

可以仔细瞧瞧上述代码,拢共两个线程:主线程、线程worker。接着我们来分析一下:

        1、主线程执行,并创建了线程worker,此时便有两个线程在运行了;

        2、可以看到代码中标出的【1】、【2】位置处都对互斥量进行上锁,由于线程的执行是不确定的,所以并不清楚先执行【1】还是【2】,也就是说有可能主线程先持有锁,也有可能是worker先持有锁;

        3、分为两种情况:

                情况一:worker先持有了锁(先执行【1】),那么主线程将会被阻塞。此时由于ready为false,根据之前讲的,worker执行wait()方法时会解锁,并进入等待状态;解锁后,主线程就会争取持有锁,由于worker在等待状态,没有其他线程跟主线程竞争,主线程会立即获取锁的持有权(执行【2】),并将ready置为true,在【2】所在的代码块结束后,lock_guard的析构函数会释放锁,接下来主线程调用notify_one()方法唤醒一个正在等待的线程(这里只有worker线程处于等待状态);当worker线程被唤醒后,也就意味着要离开wait()了,在离开之前要做的最后一件事是上锁,也就是使worker重新持有锁。那么此时问题来了,由于worker已经醒来,与主线程并发执行,那到底是worker先重新持有锁,还是主线程的【3】先持有锁呢?这是一个要考虑的问题;

                情况二:主线程先持有了锁(先执行【2】),那么worker将会被阻塞。在主线程中会将ready置为true,并结束【2】所在的语句块,释放锁的持有权;由于线程worker一直阻塞的,一旦互斥量解锁了,线程worker就会争取锁的持有权,那到底是worker先持有锁,还是主线程的【3】先持有锁呢?这是一个要考虑的问题。

        4、

                在3的情况一中:如果线程worker在wait()中争取到了锁的持有权,也就是在wait()方法中先重新上锁了,那么主线程将与worker并发执行;

                        ① 若在worker线程手动解锁之前,执行了主线程的【3】,那么主线程将被阻塞(因为worker正持有锁),worker将processed置为true,然后手动解锁;此时主线程与worker并发执行,而且主线程将立即持有锁(没有线程竞争 了),由于此时processed为ture,主线程就不会进入等待状态,而是继续执行,直到worker执行完毕后,主线程才会结束;

                        ② 若在worker线程手动解锁之后,执行了主线程的【3】,那么主线程不会进入等待状态(worker中processed已置为true了),而是继续执行,直到worker执行完毕后,主线程才会结束;

                在3的情况一中:如果主线程争取到了锁的持有权,也就是在wait()方法中重新上锁之前,执行了主线程的【3】,那么worker将继续阻塞在wait()方法中;此时processed为false,主线程调用wait()方法,释放锁,并进入等待状态,此时worker将在wait()中重新上锁成功(无线程竞争了),worker继续执行,将processed置为true,并进行手动解锁,最后调用notify_one()唤醒一个等待的线程(有且只有主线程);主线程被唤醒后,继续执行,直到worker执行完毕后,主线程才会结束。

 

                在3的情况二中,如果主线程争取到了锁的持有权,也就是【3】在【1】之前执行了,那么worker将继续阻塞,由于此时processed为false,主线程将调用wait()方法,解锁并进入等待状态;互斥量解锁之后,worker将会立即持有锁(由于主线程处于等待状态,没有竞争的线程了);此时,worker中判断ready,发现ready已经是true了,那么worker将继续执行不进入等待状态,执行中将processed置为true,并进行了手动解锁,最后调用notify_one()方法来唤醒一个等待状态的线程(有且只有主线程被唤醒);主线程被唤醒之后,将从wait()中返回,在返回之前做的最后一件事是重新持有锁(此时直接就持有了,没有竞争的线程);最后继续执行,直到worker执行完毕,主线程才会结束。

 

再来说一说为什么需要做手动解锁这一步:

        其实在这个例子中,做不做手动解锁这一步,没有太大的影响,也不会影响多线程的运行结果,根据上述分析,我所能想到的影响就是,存在这么一种情况:如果不手动释放锁(worker中的lk.unlock()这条语句被注释掉了),那么主线程在等待状态时,如果worker调用了notify_one()将主线程唤醒,此时由于worker还持有锁,所以主线程的wait()方法在重新上锁时,依然会被阻塞,这种情况直到线程worker运行完毕,unique_lock的析构函数释放了锁,那么此时主线程才能重新上锁成功;也就是说,手动解锁避免了主线程被唤醒后还要被阻塞的情况。

        另外,还有一点:为什么要使用unique_lock对象来对加锁、解锁呢?请参见我的下一篇文章:《探索C++多线程》:condition_variable源码(二)。

        这一节花了大篇幅分析了两个例子,我觉得这样才能更好理解和运用condition_variable,可能存在很多口水话的地方,希望不要介意。

condition_variable的方法:

        condition_variable::notify_one():唤醒一个处于等待状态的线程;

        condition_variable::notify_all():唤醒所有处于等待状态的线程;

        condition_variable::wait():将线程置于等待状态,直到被notify_xxx()唤醒;

        condition_variable::wait_for():将线程置于等待状态,直到一段时间结束后自动醒来或被notify_xxx()唤醒;

        condition_variable::wait_until():将线程置于等待状态,直到指定的时间点到来自动唤醒或被notify_xxx()唤醒;

以上就是本篇博文所剖析的知识点,如果有表达不清或者有误的地方,请各位大佬多多指教~

这篇关于《探索C++多线程》:condition_variable源码(一)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/941819

相关文章

【C++ Primer Plus习题】13.4

大家好,这里是国中之林! ❥前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。有兴趣的可以点点进去看看← 问题: 解答: main.cpp #include <iostream>#include "port.h"int main() {Port p1;Port p2("Abc", "Bcc", 30);std::cout <<

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

C++包装器

包装器 在 C++ 中,“包装器”通常指的是一种设计模式或编程技巧,用于封装其他代码或对象,使其更易于使用、管理或扩展。包装器的概念在编程中非常普遍,可以用于函数、类、库等多个方面。下面是几个常见的 “包装器” 类型: 1. 函数包装器 函数包装器用于封装一个或多个函数,使其接口更统一或更便于调用。例如,std::function 是一个通用的函数包装器,它可以存储任意可调用对象(函数、函数

C++11第三弹:lambda表达式 | 新的类功能 | 模板的可变参数

🌈个人主页: 南桥几晴秋 🌈C++专栏: 南桥谈C++ 🌈C语言专栏: C语言学习系列 🌈Linux学习专栏: 南桥谈Linux 🌈数据结构学习专栏: 数据结构杂谈 🌈数据库学习专栏: 南桥谈MySQL 🌈Qt学习专栏: 南桥谈Qt 🌈菜鸡代码练习: 练习随想记录 🌈git学习: 南桥谈Git 🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈�

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

06 C++Lambda表达式

lambda表达式的定义 没有显式模版形参的lambda表达式 [捕获] 前属性 (形参列表) 说明符 异常 后属性 尾随类型 约束 {函数体} 有显式模版形参的lambda表达式 [捕获] <模版形参> 模版约束 前属性 (形参列表) 说明符 异常 后属性 尾随类型 约束 {函数体} 含义 捕获:包含零个或者多个捕获符的逗号分隔列表 模板形参:用于泛型lambda提供个模板形参的名

6.1.数据结构-c/c++堆详解下篇(堆排序,TopK问题)

上篇:6.1.数据结构-c/c++模拟实现堆上篇(向下,上调整算法,建堆,增删数据)-CSDN博客 本章重点 1.使用堆来完成堆排序 2.使用堆解决TopK问题 目录 一.堆排序 1.1 思路 1.2 代码 1.3 简单测试 二.TopK问题 2.1 思路(求最小): 2.2 C语言代码(手写堆) 2.3 C++代码(使用优先级队列 priority_queue)

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get