多线程无锁循环队列

2024-03-11 16:48
文章标签 多线程 队列 循环 无锁

本文主要是介绍多线程无锁循环队列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

转自:https://blog.csdn.net/weixin_40825228/article/details/80783860

1、Lamport提出的无锁SPSC队列。

在其论文【论文】中证明了,在遵守【顺序一致性】内存模型的计算机中,单生产者单消费者(SPSC)先进先出队列中的锁是可以去除的,从而得到了一个无锁队列,并第一次给出了并发无锁先进先出(Concurrent Lock-free FIFO,CLF)队列的实现。通过去除队列中的锁,生产者、消费者可以并发的访问队列,从而提高了系统的并发执行度。

一种实现:

 
  1. #define DEFAULT_SIZE 30

  2. template<typename T>

  3. class CircuBuffer

  4. {

  5. public:

  6. CircuBuffer(unsigned size):

  7. size_(size)

  8. {

  9. writeIndex_=0;

  10. readIndex_=0;

  11. buffer_=new T[size_];

  12. }

  13.  
  14. CircuBuffer():

  15. size_(DEFAULT_SIZE)

  16. {

  17. writeIndex_=0;

  18. readIndex_=0;

  19. buffer_=new T[size_];

  20. }

  21.  
  22. ~CircuBuffer()

  23. {

  24. delete [] buffer_;

  25. }

  26.  
  27. unsigned getReadIndex() const

  28. {

  29. return readIndex_;

  30. }

  31.  
  32. unsigned getWriteIndex() const

  33. {

  34. return writeIndex_;

  35. }

  36.  
  37. bool isEmpty()

  38. {

  39. return writeIndex_==readIndex_;

  40. }

  41.  
  42. bool isFull()

  43. {

  44. return (readIndex_+1)%size_==writeIndex_;

  45. }

  46.  
  47. bool pushAnEle(T element)

  48. {

  49. //队列不为满

  50. if(!isFull())

  51. {

  52. buffer_[writeIndex_]=element;

  53. writeIndex_=(writeIndex_+1)%size_;

  54. return true;

  55. }

  56. else

  57. {

  58. return false;

  59. }

  60.  
  61. };

  62.  
  63. T* getAnEle()

  64. {

  65. //队列不为空

  66. if (!isEmpty())

  67. {

  68. T* temp=buffer_+readIndex_;

  69. readIndex_=(readIndex_+1)%size_;

  70. return temp;

  71. }

  72. else

  73. {

  74. return nullptr;

  75. }

  76. }

  77. private:

  78.  
  79. //写指针

  80. unsigned writeIndex_;

  81. //读指针

  82. unsigned readIndex_;

  83. //环形队列首地址

  84. T* buffer_;

  85. //环形队列的尺寸

  86. unsigned size_;

  87. };

往SPSC型的队列中放入数据的生产者改变【写指针】,消费者改变【读指针】。然而,生产者放入数据至队列中需要判满,这就需要读取【读指针】;同理消费者从队列中取数据,需要判空,这也需要读取【写指针】。若读指针与写指针在同一缓存行,而读写线程分别在不同的核上,这在多核平台上会产生严重的缓存颠簸。所谓的缓存颠簸是运行于多个核上的线程同时修改位于某个缓存行中的不同位置的数据时,导致该缓存行频繁地在多个核上被写无效的现象,这种现象会极大地损害系统的性能。

2、对于Lamport提出的无锁SPSC队列的改进型——FastForward队列。

第一节阐明了Lamport提出的无锁队列存在以下两个缺点:<1>由于生产者与消费者需要使用共享变量头指针与尾指针来同步信息,保存有头指针与尾指针的cacheline会频繁的分别被生产者与消费者修改,产生缓存颠簸,伤害系统的性能;<2>Lamport的CLF队列不能运行在支持弱内存一致性模型的机器中。

【改进1】针对上述去缺点,Join Giacomoni等人提出了针对缓存友好的CLF队列【缓存友好的CLF队列】。

针对缺点2的改进的CLF队列的一种实现:

 
  1. #ifndef SPSC_QUEUE_H

  2. #define SPSC_QUEUE_H

  3. #include <stdint.h>

  4. #include <string.h>

  5. #define ELE_ZERO 0

  6.  
  7. class BaseSPSCLockFreeQueue

  8. {

  9.  
  10. enum QueueState

  11. {

  12. FULL=-1,

  13. SUCCESS,

  14. EMPTY

  15. };

  16.  
  17. public:

  18. BaseSPSCLockFreeQueue(int* dataArr,uint32_t maxSize)

  19. {

  20. head_=0;

  21. tail_=0;

  22. maxQueueSize_=maxSize;

  23. dataArray_=dataArr;

  24. memset(dataArray_,ELE_ZERO,maxSize);

  25. }

  26.  
  27. ~BaseSPSCLockFreeQueue()

  28. {

  29. }

  30.  
  31. //注意 data!=0

  32. QueueState pushData2Queue(int& dataIn)

  33. {

  34. if(dataArray_[head_]!=ELE_ZERO)

  35. return FULL;

  36.  
  37. dataArray_[head_]=dataIn;

  38. head_=(head_+1)/maxQueueSize_;

  39. return SUCCESS;

  40. }

  41.  
  42. QueueState getDataFromQueue(int& dataOut)

  43. {

  44. if(dataArray_[tail_]==ELE_ZERO)

  45. return EMPTY;

  46.  
  47. dataOut=dataArray_[tail_];

  48. tail_=(tail_+1)/maxQueueSize_;

  49. return SUCCESS;

  50. }

  51.  
  52. private:

  53. uint32_t head_;

  54. uint32_t tail_;

  55. uint32_t maxQueueSize_;

  56. int* dataArray_;

  57. };

  58.  
  59. #endif // SPSC_QUEUE_H

上述针对缺点2的改进队列。在弱内存一致型的机器上也适用。情形一:假定生产者还未将数据放入但已将写指针往后移了一格,且初始时写指针与读指针在同一位置,由于消费者有判据,因此不会出现错误。情形二:假定消费者还未取走新的数据,但读指针已经后移了一格,由于写指针也有判据,因此也不会出现错误。

【改进2】针对存在的缓存行颠簸的问题。主要消除两个方面的缓存行颠簸:1、写指针与读指针错误的缓存行共享;2、数据的错误共享。

针对缺点1的改进的CLF队列的一种实现:

 
  1. #ifndef MCRINGBUFFER_H

  2. #define MCRINGBUFFER_H

  3.  
  4. #include <stdint.h>

  5. #include <assert.h>

  6. //x86-64 计算机中为 8*8=64Bytes

  7. #define CacheLineLength 8

  8. #define ELE_ZERO 0

  9. class MCRingBuffer

  10. {

  11.  
  12. public:

  13. MCRingBuffer(int* array,size_t maxSize):

  14. maxSize_(maxSize)

  15. {

  16. head_=0;

  17. tail_=0;

  18. count_=0;

  19. buffer_=array;

  20.  
  21. for(size_t i=0;i<maxSize;++i)

  22. buffer_[i]=ELE_ZERO;

  23. }

  24.  
  25. ~MCRingBuffer()

  26. {

  27. delete buffer_;

  28. }

  29.  
  30. bool pushData(int& dataIn)

  31. {

  32. tempArray[count_++]=dataIn;

  33. if(count_==CacheLineLength*2)

  34. {

  35. for(int i=0;i<CacheLineLength*2;++i)

  36. {

  37. if(buffer_[head_]!=ELE_ZERO)

  38. {

  39. count_=0;

  40. return false;

  41. }

  42. else

  43. {

  44. buffer_[head_]=tempArray[i];

  45. }

  46. head_=(head_+1)/maxSize_;

  47. }

  48. count_=0;

  49. }

  50. return true;

  51. }

  52.  
  53. bool getData(int& dataOut)

  54. {

  55. if(buffer_[tail_]==ELE_ZERO)

  56. return false;

  57.  
  58. dataOut=buffer_[tail_];

  59. buffer_[tail_]=ELE_ZERO;

  60. tail_=(tail_+1)/maxSize_;

  61. return true;

  62. }

  63.  
  64. private:

  65.  
  66. //读指针

  67. volatile unsigned long tail_;

  68. long tailPadding_[CacheLineLength-1];

  69. //写指针

  70. volatile unsigned long head_;

  71. long headPadding_[CacheLineLength-1];

  72.  
  73. size_t maxSize_;

  74. int* buffer_;

  75.  
  76. unsigned short count_;

  77. int tempArray[CacheLineLength*2];

  78.  
  79.  
  80. };

  81.  
  82. #endif // MCRINGBUFFER_H

4、实验对比。Lamport与FastForward队列性能对比。

这篇关于多线程无锁循环队列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/798419

相关文章

Redis延迟队列的实现示例

《Redis延迟队列的实现示例》Redis延迟队列是一种使用Redis实现的消息队列,本文主要介绍了Redis延迟队列的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习... 目录一、什么是 Redis 延迟队列二、实现原理三、Java 代码示例四、注意事项五、使用 Redi

JAVA中while循环的使用与注意事项

《JAVA中while循环的使用与注意事项》:本文主要介绍while循环在编程中的应用,包括其基本结构、语句示例、适用场景以及注意事项,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录while循环1. 什么是while循环2. while循环的语句3.while循环的适用场景以及优势4. 注意

Python中的异步:async 和 await以及操作中的事件循环、回调和异常

《Python中的异步:async和await以及操作中的事件循环、回调和异常》在现代编程中,异步操作在处理I/O密集型任务时,可以显著提高程序的性能和响应速度,Python提供了asyn... 目录引言什么是异步操作?python 中的异步编程基础async 和 await 关键字asyncio 模块理论

好题——hdu2522(小数问题:求1/n的第一个循环节)

好喜欢这题,第一次做小数问题,一开始真心没思路,然后参考了网上的一些资料。 知识点***********************************无限不循环小数即无理数,不能写作两整数之比*****************************(一开始没想到,小学没学好) 此题1/n肯定是一个有限循环小数,了解这些后就能做此题了。 按照除法的机制,用一个函数表示出来就可以了,代码如下

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

poj 3190 优先队列+贪心

题意: 有n头牛,分别给他们挤奶的时间。 然后每头牛挤奶的时候都要在一个stall里面,并且每个stall每次只能占用一头牛。 问最少需要多少个stall,并输出每头牛所在的stall。 e.g 样例: INPUT: 51 102 43 65 84 7 OUTPUT: 412324 HINT: Explanation of the s

poj 2431 poj 3253 优先队列的运用

poj 2431: 题意: 一条路起点为0, 终点为l。 卡车初始时在0点,并且有p升油,假设油箱无限大。 给n个加油站,每个加油站距离终点 l 距离为 x[i],可以加的油量为fuel[i]。 问最少加几次油可以到达终点,若不能到达,输出-1。 解析: 《挑战程序设计竞赛》: “在卡车开往终点的途中,只有在加油站才可以加油。但是,如果认为“在到达加油站i时,就获得了一

poj3750约瑟夫环,循环队列

Description 有N个小孩围成一圈,给他们从1开始依次编号,现指定从第W个开始报数,报到第S个时,该小孩出列,然后从下一个小孩开始报数,仍是报到S个出列,如此重复下去,直到所有的小孩都出列(总人数不足S个时将循环报数),求小孩出列的顺序。 Input 第一行输入小孩的人数N(N<=64) 接下来每行输入一个小孩的名字(人名不超过15个字符) 最后一行输入W,S (W < N),用

POJ2010 贪心优先队列

c头牛,需要选n头(奇数);学校总共有f的资金, 每头牛分数score和学费cost,问合法招生方案中,中间分数(即排名第(n+1)/2)最高的是多少。 n头牛按照先score后cost从小到大排序; 枚举中间score的牛,  预处理左边与右边的最小花费和。 预处理直接优先队列贪心 public class Main {public static voi

多线程解析报表

假如有这样一个需求,当我们需要解析一个Excel里多个sheet的数据时,可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。 Way1 join import java.time.LocalTime;public class Main {public static void main(String[] args) thro