Java并发编程与技术内幕:ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue源码解析

本文主要是介绍Java并发编程与技术内幕:ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、BlockingQueue介绍与常用方法

BlockingQueue是一个阻塞队列。在高并发场景是用得非常多的,在线程池中。如果运行线程数目大于核心线程数目时,也会尝试把新加入的线程放到一个BlockingQueue中去。队列的特性就是先进先出很容易理解,在java里头它的实现类主要有下图的几种,其中最常用到的是ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue这三种,这三个也是今天主要讲的类。

它主要的方法有

BlockingQueue的核心方法:
1、放入数据

  (1) add(object)

    队列没满的话,放入成功。否则抛出异常。

 (2)offer(object):

    表示如果可能的话,将object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
 (3)offer(E o, long timeout, TimeUnit unit)

      可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
(4)put(object)

       把object加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程阻塞。直到BlockingQueue里面有空间再继续.
2、获取数据
(1)poll(time)

   取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
(2)poll(long timeout, TimeUnit unit)

   从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。

(3)take()

  取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入; 
(4)drainTo()

   一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

二、ArrayBlockingQueue

     一个基本数组的阻塞队列。可以设置列队的大小。

ArrayBlockingQueue的源码是比较简单的,下面是笔者抽取了一部分源码并加以注释。它的基本原理实际还是数组,只不过存、取、删时都要做队列是否满或空的判断。然后加锁访问。

 

 
  1. package java.util.concurrent;

  2. import java.util.concurrent.locks.Condition;

  3. import java.util.concurrent.locks.ReentrantLock;

  4. import java.util.AbstractQueue;

  5. import java.util.Collection;

  6. import java.util.Iterator;

  7. import java.util.NoSuchElementException;

  8. import java.lang.ref.WeakReference;

  9. import java.util.Spliterators;

  10. import java.util.Spliterator;

  11.  
  12.  
  13. public class ArrayBlockingQueue<E> extends AbstractQueue<E>

  14. implements BlockingQueue<E>, java.io.Serializable {

  15.  
  16. private static final long serialVersionUID = -817911632652898426L;

  17.  
  18. /** 真正存入数据的数组*/

  19. final Object[] items;

  20.  
  21. /** take, poll, peek or remove的下一个索引 */

  22. int takeIndex;

  23.  
  24. /** put, offer, or add的下一个索引 */

  25. int putIndex;

  26.  
  27. /**队列中元素个数*/

  28. int count;

  29.  
  30.  
  31. /**可重入锁 */

  32. final ReentrantLock lock;

  33.  
  34. /** 队列不为空的条件 */

  35. private final Condition notEmpty;

  36.  
  37. /** 队列未满的条件 */

  38. private final Condition notFull;

  39.  
  40. transient Itrs itrs = null;

  41.  
  42.  
  43. /**

  44. *当前元素个数-1

  45. */

  46. final int dec(int i) {

  47. return ((i == 0) ? items.length : i) - 1;

  48. }

  49.  
  50. /**

  51. * 返回对应索引上的元素

  52. */

  53. @SuppressWarnings("unchecked")

  54. final E itemAt(int i) {

  55. return (E) items[i];

  56. }

  57.  
  58. /**

  59. * 非空检查

  60. *

  61. * @param v the element

  62. */

  63. private static void checkNotNull(Object v) {

  64. if (v == null)

  65. throw new NullPointerException();

  66. }

  67.  
  68. /**

  69. * 元素放入队列,注意调用这个方法时都要先加锁

  70. *

  71. */

  72. private void enqueue(E x) {

  73. final Object[] items = this.items;

  74. items[putIndex] = x;

  75. if (++putIndex == items.length)

  76. putIndex = 0;

  77. count++;//当前拥有元素个数加1

  78. notEmpty.signal();//有一个元素加入成功,那肯定队列不为空

  79. }

  80.  
  81. /**

  82. * 元素出队,注意调用这个方法时都要先加锁

  83. *

  84. */

  85. private E dequeue() {

  86. final Object[] items = this.items;

  87. @SuppressWarnings("unchecked")

  88. E x = (E) items[takeIndex];

  89. items[takeIndex] = null;

  90. if (++takeIndex == items.length)

  91. takeIndex = 0;

  92. count--;/当前拥有元素个数减1

  93. if (itrs != null)

  94. itrs.elementDequeued();

  95. notFull.signal();//有一个元素取出成功,那肯定队列不满

  96. return x;

  97. }

  98.  
  99. /**

  100. * 指定删除索引上的元素

  101. *

  102. */

  103. void removeAt(final int removeIndex) {

  104. final Object[] items = this.items;

  105. if (removeIndex == takeIndex) {

  106. items[takeIndex] = null;

  107. if (++takeIndex == items.length)

  108. takeIndex = 0;

  109. count--;

  110. if (itrs != null)

  111. itrs.elementDequeued();

  112. } else {

  113. final int putIndex = this.putIndex;

  114. for (int i = removeIndex;;) {

  115. int next = i + 1;

  116. if (next == items.length)

  117. next = 0;

  118. if (next != putIndex) {

  119. items[i] = items[next];

  120. i = next;

  121. } else {

  122. items[i] = null;

  123. this.putIndex = i;

  124. break;

  125. }

  126. }

  127. count--;

  128. if (itrs != null)

  129. itrs.removedAt(removeIndex);

  130. }

  131. notFull.signal();//有一个元素删除成功,那肯定队列不满

  132. }

  133.  
  134. /**

  135. *

  136. * 构造函数,设置队列的初始容量

  137. */

  138. public ArrayBlockingQueue(int capacity) {

  139. this(capacity, false);

  140. }

  141.  
  142. /**

  143. * 构造函数。capacity设置数组大小 ,fair设置是否为公平锁

  144. * capacity and the specified access policy.

  145. */

  146. public ArrayBlockingQueue(int capacity, boolean fair) {

  147. if (capacity <= 0)

  148. throw new IllegalArgumentException();

  149. this.items = new Object[capacity];

  150. lock = new ReentrantLock(fair);//是否为公平锁,如果是的话,那么先到的线程先获得锁对象。

  151. //否则,由操作系统调度由哪个线程获得锁,一般为false,性能会比较高

  152. notEmpty = lock.newCondition();

  153. notFull = lock.newCondition();

  154. }

  155.  
  156. /**

  157. *构造函数,带有初始内容的队列

  158. */

  159. public ArrayBlockingQueue(int capacity, boolean fair,

  160. Collection<? extends E> c) {

  161. this(capacity, fair);

  162.  
  163. final ReentrantLock lock = this.lock;

  164. lock.lock(); //要给数组设置内容,先上锁

  165. try {

  166. int i = 0;

  167. try {

  168. for (E e : c) {

  169. checkNotNull(e);

  170. items[i++] = e;//依次拷贝内容

  171. }

  172. } catch (ArrayIndexOutOfBoundsException ex) {

  173. throw new IllegalArgumentException();

  174. }

  175. count = i;

  176. putIndex = (i == capacity) ? 0 : i;//如果putIndex大于数组大小 ,那么从0重新开始

  177. } finally {

  178. lock.unlock();//最后一定要释放锁

  179. }

  180. }

  181.  
  182. /**

  183. * 添加一个元素,其实super.add里面调用了offer方法

  184. */

  185. public boolean add(E e) {

  186. return super.add(e);

  187. }

  188.  
  189. /**

  190. *加入成功返回true,否则返回false

  191. *

  192. */

  193. public boolean offer(E e) {

  194. checkNotNull(e);

  195. final ReentrantLock lock = this.lock;

  196. lock.lock();//上锁

  197. try {

  198. if (count == items.length) //超过数组的容量

  199. return false;

  200. else {

  201. enqueue(e); //放入元素

  202. return true;

  203. }

  204. } finally {

  205. lock.unlock();

  206. }

  207. }

  208.  
  209. /**

  210. * 如果队列已满的话,就会等待

  211. */

  212. public void put(E e) throws InterruptedException {

  213. checkNotNull(e);

  214. final ReentrantLock lock = this.lock;

  215. lock.lockInterruptibly();//和lock()方法的区别是让它在阻塞时也可抛出异常跳出

  216. try {

  217. while (count == items.length)

  218. notFull.await(); //这里就是阻塞了,要注意。如果运行到这里,那么它会释放上面的锁,一直等到notify

  219. enqueue(e);

  220. } finally {

  221. lock.unlock();

  222. }

  223. }

  224.  
  225. /**

  226. * 带有超时时间的插入方法,unit表示是按秒、分、时哪一种

  227. */

  228. public boolean offer(E e, long timeout, TimeUnit unit)

  229. throws InterruptedException {

  230.  
  231. checkNotNull(e);

  232. long nanos = unit.toNanos(timeout);

  233. final ReentrantLock lock = this.lock;

  234. lock.lockInterruptibly();

  235. try {

  236. while (count == items.length) {

  237. if (nanos <= 0)

  238. return false;

  239. nanos = notFull.awaitNanos(nanos);//带有超时等待的阻塞方法

  240. }

  241. enqueue(e);//入队

  242. return true;

  243. } finally {

  244. lock.unlock();

  245. }

  246. }

  247.  
  248. //实现的方法,如果当前队列为空,返回null

  249. public E poll() {

  250. final ReentrantLock lock = this.lock;

  251. lock.lock();

  252. try {

  253. return (count == 0) ? null : dequeue();

  254. } finally {

  255. lock.unlock();

  256. }

  257. }

  258. //实现的方法,如果当前队列为空,一直阻塞

  259. public E take() throws InterruptedException {

  260. final ReentrantLock lock = this.lock;

  261. lock.lockInterruptibly();

  262. try {

  263. while (count == 0)

  264. notEmpty.await();//队列为空,阻塞方法

  265. return dequeue();

  266. } finally {

  267. lock.unlock();

  268. }

  269. }

  270. //带有超时时间的取元素方法,否则返回Null

  271. public E poll(long timeout, TimeUnit unit) throws InterruptedException {

  272. long nanos = unit.toNanos(timeout);

  273. final ReentrantLock lock = this.lock;

  274. lock.lockInterruptibly();

  275. try {

  276. while (count == 0) {

  277. if (nanos <= 0)

  278. return null;

  279. nanos = notEmpty.awaitNanos(nanos);//超时等待

  280. }

  281. return dequeue();//取得元素

  282. } finally {

  283. lock.unlock();

  284. }

  285. }

  286. //只是看一个队列最前面的元素,取出是不删除队列中的原来元素。队列为空时返回null

  287. public E peek() {

  288. final ReentrantLock lock = this.lock;

  289. lock.lock();

  290. try {

  291. return itemAt(takeIndex); // 队列为空时返回null

  292. } finally {

  293. lock.unlock();

  294. }

  295. }

  296.  
  297. /**

  298. * 返回队列当前元素个数

  299. *

  300. */

  301. public int size() {

  302. final ReentrantLock lock = this.lock;

  303. lock.lock();

  304. try {

  305. return count;

  306. } finally {

  307. lock.unlock();

  308. }

  309. }

  310.  
  311. /**

  312. * 返回当前队列再放入多少个元素就满队

  313. */

  314. public int remainingCapacity() {

  315. final ReentrantLock lock = this.lock;

  316. lock.lock();

  317. try {

  318. return items.length - count;

  319. } finally {

  320. lock.unlock();

  321. }

  322. }

  323.  
  324. /**

  325. * 从队列中删除一个元素的方法。删除成功返回true,否则返回false

  326. */

  327. public boolean remove(Object o) {

  328. if (o == null) return false;

  329. final Object[] items = this.items;

  330. final ReentrantLock lock = this.lock;

  331. lock.lock();

  332. try {

  333. if (count > 0) {

  334. final int putIndex = this.putIndex;

  335. int i = takeIndex;

  336. do {

  337. if (o.equals(items[i])) {

  338. removeAt(i); //真正删除的方法

  339. return true;

  340. }

  341. if (++i == items.length)

  342. i = 0;

  343. } while (i != putIndex);//一直不断的循环取出来做判断

  344. }

  345. return false;

  346. } finally {

  347. lock.unlock();

  348. }

  349. }

  350.  
  351. /**

  352. * 是否包含一个元素

  353. */

  354. public boolean contains(Object o) {

  355. if (o == null) return false;

  356. final Object[] items = this.items;

  357. final ReentrantLock lock = this.lock;

  358. lock.lock();

  359. try {

  360. if (count > 0) {

  361. final int putIndex = this.putIndex;

  362. int i = takeIndex;

  363. do {

  364. if (o.equals(items[i]))

  365. return true;

  366. if (++i == items.length)

  367. i = 0;

  368. } while (i != putIndex);

  369. }

  370. return false;

  371. } finally {

  372. lock.unlock();

  373. }

  374. }

  375.  
  376. /**

  377. * 清空队列

  378. *

  379. */

  380. public void clear() {

  381. final Object[] items = this.items;

  382. final ReentrantLock lock = this.lock;

  383. lock.lock();

  384. try {

  385. int k = count;

  386. if (k > 0) {

  387. final int putIndex = this.putIndex;

  388. int i = takeIndex;

  389. do {

  390. items[i] = null;

  391. if (++i == items.length)

  392. i = 0;

  393. } while (i != putIndex);

  394. takeIndex = putIndex;

  395. count = 0;

  396. if (itrs != null)

  397. itrs.queueIsEmpty();

  398. for (; k > 0 && lock.hasWaiters(notFull); k--)

  399. notFull.signal();

  400. }

  401. } finally {

  402. lock.unlock();

  403. }

  404. }

  405.  
  406. /**

  407. * 取出所有元素到集合

  408. */

  409. public int drainTo(Collection<? super E> c) {

  410. return drainTo(c, Integer.MAX_VALUE);

  411. }

  412.  
  413. /**

  414. * 取出所有元素到集合

  415. */

  416. public int drainTo(Collection<? super E> c, int maxElements) {

  417. checkNotNull(c);

  418. if (c == this)

  419. throw new IllegalArgumentException();

  420. if (maxElements <= 0)

  421. return 0;

  422. final Object[] items = this.items;

  423. final ReentrantLock lock = this.lock;

  424. lock.lock();

  425. try {

  426. int n = Math.min(maxElements, count);

  427. int take = takeIndex;

  428. int i = 0;

  429. try {

  430. while (i < n) {

  431. @SuppressWarnings("unchecked")

  432. E x = (E) items[take];

  433. c.add(x);

  434. items[take] = null;

  435. if (++take == items.length)

  436. take = 0;

  437. i++;

  438. }

  439. return n;

  440. } finally {

  441. // Restore invariants even if c.add() threw

  442. if (i > 0) {

  443. count -= i;

  444. takeIndex = take;

  445. if (itrs != null) {

  446. if (count == 0)

  447. itrs.queueIsEmpty();

  448. else if (i > take)

  449. itrs.takeIndexWrapped();

  450. }

  451. for (; i > 0 && lock.hasWaiters(notFull); i--)

  452. notFull.signal();

  453. }

  454. }

  455. } finally {

  456. lock.unlock();

  457. }

  458. }

  459.  
  460.  
  461. }

 

三、LinkedBlockingQueue

接下来看看LinkedBlockingQueue的部分源码。

 

 
  1. package java.util.concurrent;

  2. import java.util.concurrent.atomic.AtomicInteger;

  3. import java.util.concurrent.locks.Condition;

  4. import java.util.concurrent.locks.ReentrantLock;

  5. import java.util.AbstractQueue;

  6. import java.util.Collection;

  7. import java.util.Iterator;

  8. import java.util.NoSuchElementException;

  9. import java.util.Spliterator;

  10. import java.util.Spliterators;

  11. import java.util.function.Consumer;

  12.  
  13. public class LinkedBlockingQueue<E> extends AbstractQueue<E>

  14. implements BlockingQueue<E>, java.io.Serializable {

  15. private static final long serialVersionUID = -6903933977591709194L;

  16.  
  17.  
  18.  
  19. /**

  20. * 链表节点类

  21. */

  22. static class Node<E> {

  23. E item;

  24. Node<E> next;//下一节点

  25. Node(E x) { item = x; }

  26. }

  27.  
  28. /** 链表大小 ,默认大小 是Integer.MAX_VALUE */

  29. private final int capacity;

  30.  
  31. /**当前队列中存放的元素个数,注意是原子类*/

  32. private final AtomicInteger count = new AtomicInteger();

  33.  
  34. /**

  35. * 链表队列头节点

  36. */

  37. transient Node<E> head;

  38.  
  39. /**

  40. * 链表队列尾节点

  41. */

  42. private transient Node<E> last;

  43.  
  44. /** 取元素时的可重入锁 */

  45. private final ReentrantLock takeLock = new ReentrantLock();

  46.  
  47. /**不为空条件*/

  48. private final Condition notEmpty = takeLock.newCondition();

  49.  
  50. /**放元素是时的重入锁 */

  51. private final ReentrantLock putLock = new ReentrantLock();

  52.  
  53. /** 不为满的条件 */

  54. private final Condition notFull = putLock.newCondition();

  55.  
  56. /**

  57. * 不为空通知方法

  58. */

  59. private void signalNotEmpty() {

  60. final ReentrantLock takeLock = this.takeLock;

  61. takeLock.lock();

  62. try {

  63. notEmpty.signal();

  64. } finally {

  65. takeLock.unlock();

  66. }

  67. }

  68.  
  69. /**

  70. * 不为满通知方法

  71. */

  72. private void signalNotFull() {

  73. final ReentrantLock putLock = this.putLock;

  74. putLock.lock();

  75. try {

  76. notFull.signal();

  77. } finally {

  78. putLock.unlock();

  79. }

  80. }

  81.  
  82. /**

  83. * 进队

  84. *

  85. * @param node the node

  86. */

  87. private void enqueue(Node<E> node) {

  88. last = last.next = node;

  89. }

  90.  
  91. /**

  92. * 出队

  93. */

  94. private E dequeue() {

  95. Node<E> h = head;

  96. Node<E> first = h.next;

  97. h.next = h; // help GC

  98. head = first;

  99. E x = first.item;

  100. first.item = null;

  101. return x;

  102. }

  103.  
  104. /**

  105. * 取和入都上锁,此时无法取和放

  106. */

  107. void fullyLock() {

  108. putLock.lock();

  109. takeLock.lock();

  110. }

  111.  
  112. /**

  113. * 释放锁

  114. */

  115. void fullyUnlock() {

  116. takeLock.unlock();

  117. putLock.unlock();

  118. }

  119.  
  120.  
  121. /**

  122. * 构造函数

  123. */

  124. public LinkedBlockingQueue() {

  125. this(Integer.MAX_VALUE);

  126. }

  127.  
  128. /**

  129. * 构造函数

  130. *

  131. */

  132. public LinkedBlockingQueue(int capacity) {

  133. if (capacity <= 0) throw new IllegalArgumentException();

  134. this.capacity = capacity;

  135. last = head = new Node<E>(null);

  136. }

  137.  
  138. /**

  139. * 构造函数

  140. */

  141. public LinkedBlockingQueue(Collection<? extends E> c) {

  142. this(Integer.MAX_VALUE);

  143. final ReentrantLock putLock = this.putLock;

  144. putLock.lock(); //取得放入锁

  145. try {

  146. int n = 0;

  147. for (E e : c) {

  148. if (e == null)

  149. throw new NullPointerException();

  150. if (n == capacity)

  151. throw new IllegalStateException("Queue full");

  152. enqueue(new Node<E>(e));

  153. ++n;

  154. }

  155. count.set(n);

  156. } finally {

  157. putLock.unlock();

  158. }

  159. }

  160.  
  161.  
  162. //阻塞等待放入

  163. public void put(E e) throws InterruptedException {

  164. if (e == null) throw new NullPointerException();

  165. int c = -1;

  166. Node<E> node = new Node<E>(e);

  167. final ReentrantLock putLock = this.putLock;

  168. final AtomicInteger count = this.count;

  169. putLock.lockInterruptibly(); //取得放入锁

  170. try {

  171. while (count.get() == capacity) {//队列已满

  172. notFull.await();

  173. }

  174. enqueue(node);//入队

  175. c = count.getAndIncrement();//当前队列中元素个数加1

  176. if (c + 1 < capacity)

  177. notFull.signal();

  178. } finally {

  179. putLock.unlock();

  180. }

  181. if (c == 0)

  182. signalNotEmpty();

  183. }

  184.  
  185. /**

  186. *带超时时间的阻塞等待放入,队列不满。放入成功返回true,否则返回fasle

  187. */

  188. public boolean offer(E e, long timeout, TimeUnit unit)

  189. throws InterruptedException {

  190.  
  191. if (e == null) throw new NullPointerException();

  192. long nanos = unit.toNanos(timeout);

  193. int c = -1;

  194. final ReentrantLock putLock = this.putLock;

  195. final AtomicInteger count = this.count;

  196. putLock.lockInterruptibly();

  197. try {

  198. while (count.get() == capacity) {

  199. if (nanos <= 0)

  200. return false;

  201. nanos = notFull.awaitNanos(nanos);

  202. }

  203. enqueue(new Node<E>(e));

  204. c = count.getAndIncrement();

  205. if (c + 1 < capacity)

  206. notFull.signal();

  207. } finally {

  208. putLock.unlock();

  209. }

  210. if (c == 0)

  211. signalNotEmpty();

  212. return true;

  213. }

  214.  
  215. /**

  216. * 非阻塞放入。队列不满放入成功返回true,否则返回fasle

  217. */

  218. public boolean offer(E e) {

  219. if (e == null) throw new NullPointerException();

  220. final AtomicInteger count = this.count;

  221. if (count.get() == capacity)

  222. return false;

  223. int c = -1;

  224. Node<E> node = new Node<E>(e);

  225. final ReentrantLock putLock = this.putLock;

  226. putLock.lock();

  227. try {

  228. if (count.get() < capacity) {

  229. enqueue(node);

  230. c = count.getAndIncrement();

  231. if (c + 1 < capacity)

  232. notFull.signal();

  233. }

  234. } finally {

  235. putLock.unlock();

  236. }

  237. if (c == 0)

  238. signalNotEmpty();

  239. return c >= 0;

  240. }

  241. //阻塞等待取出元素

  242. public E take() throws InterruptedException {

  243. E x;

  244. int c = -1;

  245. final AtomicInteger count = this.count;

  246. final ReentrantLock takeLock = this.takeLock;

  247. takeLock.lockInterruptibly();

  248. try {

  249. while (count.get() == 0) {

  250. notEmpty.await();

  251. }

  252. x = dequeue();

  253. c = count.getAndDecrement();

  254. if (c > 1)

  255. notEmpty.signal();

  256. } finally {

  257. takeLock.unlock();

  258. }

  259. if (c == capacity)

  260. signalNotFull();

  261. return x;

  262. }

  263. //带有超时时间等待的取出元素

  264. public E poll(long timeout, TimeUnit unit) throws InterruptedException {

  265. E x = null;

  266. int c = -1;

  267. long nanos = unit.toNanos(timeout);

  268. final AtomicInteger count = this.count;

  269. final ReentrantLock takeLock = this.takeLock;

  270. takeLock.lockInterruptibly();//等待时可抛出异常跳出

  271. try {

  272. while (count.get() == 0) {

  273. if (nanos <= 0)

  274. return null;

  275. nanos = notEmpty.awaitNanos(nanos);//超时等待

  276. }

  277. x = dequeue();

  278. c = count.getAndDecrement();

  279. if (c > 1)

  280. notEmpty.signal();//不这空条件成立

  281. } finally {

  282. takeLock.unlock();

  283. }

  284. if (c == capacity)

  285. signalNotFull();

  286. return x;

  287. }

  288. //取队头元素。没有的话返回null,有的话返回元素,并将队列中删除此元素

  289. public E poll() {

  290. final AtomicInteger count = this.count;

  291. if (count.get() == 0)

  292. return null;

  293. E x = null;

  294. int c = -1;

  295. final ReentrantLock takeLock = this.takeLock;

  296. takeLock.lock();//获得取得锁

  297. try {

  298. if (count.get() > 0) {

  299. x = dequeue();//出队

  300. c = count.getAndDecrement();//当前队列中元素个数减去1

  301. if (c > 1)

  302. notEmpty.signal();//不为空条件成功

  303. }

  304. } finally {

  305. takeLock.unlock();

  306. }

  307. if (c == capacity)

  308. signalNotFull();

  309. return x;

  310. }

  311. //取队头元素,但不从队列中删除 ,没有的话返回null,不阻塞

  312. public E peek() {

  313. if (count.get() == 0)

  314. return null;

  315. final ReentrantLock takeLock = this.takeLock;

  316. takeLock.lock();//获得取得锁

  317. try {

  318. Node<E> first = head.next;

  319. if (first == null)

  320. return null;

  321. else

  322. return first.item;

  323. } finally {

  324. takeLock.unlock();

  325. }

  326. }

  327.  
  328.  
  329. /**

  330. * 删除时要同时取得放入锁和取得锁

  331. */

  332. public boolean remove(Object o) {

  333. if (o == null) return false;

  334. fullyLock();//同时取得放入锁和取得锁

  335. try {

  336. for (Node<E> trail = head, p = trail.next;

  337. p != null;

  338. trail = p, p = p.next) {

  339. if (o.equals(p.item)) {

  340. unlink(p, trail);

  341. return true;

  342. }

  343. }

  344. return false;

  345. } finally {

  346. fullyUnlock();

  347. }

  348. }

  349.  
  350. /**

  351. * 是否包含

  352. */

  353. public boolean contains(Object o) {

  354. if (o == null) return false;

  355. fullyLock();//同时取得放入锁和取得锁

  356. try {

  357. for (Node<E> p = head.next; p != null; p = p.next)

  358. if (o.equals(p.item))

  359. return true;

  360. return false;

  361. } finally {

  362. fullyUnlock();

  363. }

  364. }

  365. }

  366.  


从LinkedBlockingQueue的源码中,我们可以看出他和ArrayBlockingQueue主要有以下两点区别:

 

1、ArrayBlockingQueue数据是放在一个数组中。LinkedBlockingQueue是放在一个Node节点中,构成一个链接。

2、ArrayBlockingQueue取元素和放元素都是同一个锁,而LinkedBlockingQueue有两个锁,一个放入锁,一个取得锁。分别对应放入元素和取得元素时的操作。这是由链表的结构所确定的。但是删除一个元素时,要同时获得放入锁和取得锁。

 

四、SynchronousQueue 

SynchronousQueue 这个队列实现了 BlockingQueue接口。该队列的特点 
1.容量为0,无论何时 size方法总是返回0
2. put操作阻塞, 直到另外一个线程取走队列的元素。
3.take操作阻塞,直到另外的线程put某个元素到队列中。
4. 任何线程只能取得其他线程put进去的元素,而不会取到自己put进去的元素

 

 
  1. public SynchronousQueue(boolean fair) {

  2. transferer = fair ? new TransferQueue() : new TransferStack();

  3. }

构造方法上接收boolean参数,表示这是一个公平的基于队列的排队模式,还是一个非公平的基于栈的排队模式。 

这篇关于Java并发编程与技术内幕:ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/755242

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听