Java并发编程与技术内幕:ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue源码解析

本文主要是介绍Java并发编程与技术内幕:ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、BlockingQueue介绍与常用方法

BlockingQueue是一个阻塞队列。在高并发场景是用得非常多的,在线程池中。如果运行线程数目大于核心线程数目时,也会尝试把新加入的线程放到一个BlockingQueue中去。队列的特性就是先进先出很容易理解,在java里头它的实现类主要有下图的几种,其中最常用到的是ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue这三种,这三个也是今天主要讲的类。

它主要的方法有

BlockingQueue的核心方法:
1、放入数据

  (1) add(object)

    队列没满的话,放入成功。否则抛出异常。

 (2)offer(object):

    表示如果可能的话,将object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
 (3)offer(E o, long timeout, TimeUnit unit)

      可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
(4)put(object)

       把object加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程阻塞。直到BlockingQueue里面有空间再继续.
2、获取数据
(1)poll(time)

   取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
(2)poll(long timeout, TimeUnit unit)

   从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。

(3)take()

  取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入; 
(4)drainTo()

   一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

二、ArrayBlockingQueue

     一个基本数组的阻塞队列。可以设置列队的大小。

ArrayBlockingQueue的源码是比较简单的,下面是笔者抽取了一部分源码并加以注释。它的基本原理实际还是数组,只不过存、取、删时都要做队列是否满或空的判断。然后加锁访问。

 

 
  1. package java.util.concurrent;

  2. import java.util.concurrent.locks.Condition;

  3. import java.util.concurrent.locks.ReentrantLock;

  4. import java.util.AbstractQueue;

  5. import java.util.Collection;

  6. import java.util.Iterator;

  7. import java.util.NoSuchElementException;

  8. import java.lang.ref.WeakReference;

  9. import java.util.Spliterators;

  10. import java.util.Spliterator;

  11.  
  12.  
  13. public class ArrayBlockingQueue<E> extends AbstractQueue<E>

  14. implements BlockingQueue<E>, java.io.Serializable {

  15.  
  16. private static final long serialVersionUID = -817911632652898426L;

  17.  
  18. /** 真正存入数据的数组*/

  19. final Object[] items;

  20.  
  21. /** take, poll, peek or remove的下一个索引 */

  22. int takeIndex;

  23.  
  24. /** put, offer, or add的下一个索引 */

  25. int putIndex;

  26.  
  27. /**队列中元素个数*/

  28. int count;

  29.  
  30.  
  31. /**可重入锁 */

  32. final ReentrantLock lock;

  33.  
  34. /** 队列不为空的条件 */

  35. private final Condition notEmpty;

  36.  
  37. /** 队列未满的条件 */

  38. private final Condition notFull;

  39.  
  40. transient Itrs itrs = null;

  41.  
  42.  
  43. /**

  44. *当前元素个数-1

  45. */

  46. final int dec(int i) {

  47. return ((i == 0) ? items.length : i) - 1;

  48. }

  49.  
  50. /**

  51. * 返回对应索引上的元素

  52. */

  53. @SuppressWarnings("unchecked")

  54. final E itemAt(int i) {

  55. return (E) items[i];

  56. }

  57.  
  58. /**

  59. * 非空检查

  60. *

  61. * @param v the element

  62. */

  63. private static void checkNotNull(Object v) {

  64. if (v == null)

  65. throw new NullPointerException();

  66. }

  67.  
  68. /**

  69. * 元素放入队列,注意调用这个方法时都要先加锁

  70. *

  71. */

  72. private void enqueue(E x) {

  73. final Object[] items = this.items;

  74. items[putIndex] = x;

  75. if (++putIndex == items.length)

  76. putIndex = 0;

  77. count++;//当前拥有元素个数加1

  78. notEmpty.signal();//有一个元素加入成功,那肯定队列不为空

  79. }

  80.  
  81. /**

  82. * 元素出队,注意调用这个方法时都要先加锁

  83. *

  84. */

  85. private E dequeue() {

  86. final Object[] items = this.items;

  87. @SuppressWarnings("unchecked")

  88. E x = (E) items[takeIndex];

  89. items[takeIndex] = null;

  90. if (++takeIndex == items.length)

  91. takeIndex = 0;

  92. count--;/当前拥有元素个数减1

  93. if (itrs != null)

  94. itrs.elementDequeued();

  95. notFull.signal();//有一个元素取出成功,那肯定队列不满

  96. return x;

  97. }

  98.  
  99. /**

  100. * 指定删除索引上的元素

  101. *

  102. */

  103. void removeAt(final int removeIndex) {

  104. final Object[] items = this.items;

  105. if (removeIndex == takeIndex) {

  106. items[takeIndex] = null;

  107. if (++takeIndex == items.length)

  108. takeIndex = 0;

  109. count--;

  110. if (itrs != null)

  111. itrs.elementDequeued();

  112. } else {

  113. final int putIndex = this.putIndex;

  114. for (int i = removeIndex;;) {

  115. int next = i + 1;

  116. if (next == items.length)

  117. next = 0;

  118. if (next != putIndex) {

  119. items[i] = items[next];

  120. i = next;

  121. } else {

  122. items[i] = null;

  123. this.putIndex = i;

  124. break;

  125. }

  126. }

  127. count--;

  128. if (itrs != null)

  129. itrs.removedAt(removeIndex);

  130. }

  131. notFull.signal();//有一个元素删除成功,那肯定队列不满

  132. }

  133.  
  134. /**

  135. *

  136. * 构造函数,设置队列的初始容量

  137. */

  138. public ArrayBlockingQueue(int capacity) {

  139. this(capacity, false);

  140. }

  141.  
  142. /**

  143. * 构造函数。capacity设置数组大小 ,fair设置是否为公平锁

  144. * capacity and the specified access policy.

  145. */

  146. public ArrayBlockingQueue(int capacity, boolean fair) {

  147. if (capacity <= 0)

  148. throw new IllegalArgumentException();

  149. this.items = new Object[capacity];

  150. lock = new ReentrantLock(fair);//是否为公平锁,如果是的话,那么先到的线程先获得锁对象。

  151. //否则,由操作系统调度由哪个线程获得锁,一般为false,性能会比较高

  152. notEmpty = lock.newCondition();

  153. notFull = lock.newCondition();

  154. }

  155.  
  156. /**

  157. *构造函数,带有初始内容的队列

  158. */

  159. public ArrayBlockingQueue(int capacity, boolean fair,

  160. Collection<? extends E> c) {

  161. this(capacity, fair);

  162.  
  163. final ReentrantLock lock = this.lock;

  164. lock.lock(); //要给数组设置内容,先上锁

  165. try {

  166. int i = 0;

  167. try {

  168. for (E e : c) {

  169. checkNotNull(e);

  170. items[i++] = e;//依次拷贝内容

  171. }

  172. } catch (ArrayIndexOutOfBoundsException ex) {

  173. throw new IllegalArgumentException();

  174. }

  175. count = i;

  176. putIndex = (i == capacity) ? 0 : i;//如果putIndex大于数组大小 ,那么从0重新开始

  177. } finally {

  178. lock.unlock();//最后一定要释放锁

  179. }

  180. }

  181.  
  182. /**

  183. * 添加一个元素,其实super.add里面调用了offer方法

  184. */

  185. public boolean add(E e) {

  186. return super.add(e);

  187. }

  188.  
  189. /**

  190. *加入成功返回true,否则返回false

  191. *

  192. */

  193. public boolean offer(E e) {

  194. checkNotNull(e);

  195. final ReentrantLock lock = this.lock;

  196. lock.lock();//上锁

  197. try {

  198. if (count == items.length) //超过数组的容量

  199. return false;

  200. else {

  201. enqueue(e); //放入元素

  202. return true;

  203. }

  204. } finally {

  205. lock.unlock();

  206. }

  207. }

  208.  
  209. /**

  210. * 如果队列已满的话,就会等待

  211. */

  212. public void put(E e) throws InterruptedException {

  213. checkNotNull(e);

  214. final ReentrantLock lock = this.lock;

  215. lock.lockInterruptibly();//和lock()方法的区别是让它在阻塞时也可抛出异常跳出

  216. try {

  217. while (count == items.length)

  218. notFull.await(); //这里就是阻塞了,要注意。如果运行到这里,那么它会释放上面的锁,一直等到notify

  219. enqueue(e);

  220. } finally {

  221. lock.unlock();

  222. }

  223. }

  224.  
  225. /**

  226. * 带有超时时间的插入方法,unit表示是按秒、分、时哪一种

  227. */

  228. public boolean offer(E e, long timeout, TimeUnit unit)

  229. throws InterruptedException {

  230.  
  231. checkNotNull(e);

  232. long nanos = unit.toNanos(timeout);

  233. final ReentrantLock lock = this.lock;

  234. lock.lockInterruptibly();

  235. try {

  236. while (count == items.length) {

  237. if (nanos <= 0)

  238. return false;

  239. nanos = notFull.awaitNanos(nanos);//带有超时等待的阻塞方法

  240. }

  241. enqueue(e);//入队

  242. return true;

  243. } finally {

  244. lock.unlock();

  245. }

  246. }

  247.  
  248. //实现的方法,如果当前队列为空,返回null

  249. public E poll() {

  250. final ReentrantLock lock = this.lock;

  251. lock.lock();

  252. try {

  253. return (count == 0) ? null : dequeue();

  254. } finally {

  255. lock.unlock();

  256. }

  257. }

  258. //实现的方法,如果当前队列为空,一直阻塞

  259. public E take() throws InterruptedException {

  260. final ReentrantLock lock = this.lock;

  261. lock.lockInterruptibly();

  262. try {

  263. while (count == 0)

  264. notEmpty.await();//队列为空,阻塞方法

  265. return dequeue();

  266. } finally {

  267. lock.unlock();

  268. }

  269. }

  270. //带有超时时间的取元素方法,否则返回Null

  271. public E poll(long timeout, TimeUnit unit) throws InterruptedException {

  272. long nanos = unit.toNanos(timeout);

  273. final ReentrantLock lock = this.lock;

  274. lock.lockInterruptibly();

  275. try {

  276. while (count == 0) {

  277. if (nanos <= 0)

  278. return null;

  279. nanos = notEmpty.awaitNanos(nanos);//超时等待

  280. }

  281. return dequeue();//取得元素

  282. } finally {

  283. lock.unlock();

  284. }

  285. }

  286. //只是看一个队列最前面的元素,取出是不删除队列中的原来元素。队列为空时返回null

  287. public E peek() {

  288. final ReentrantLock lock = this.lock;

  289. lock.lock();

  290. try {

  291. return itemAt(takeIndex); // 队列为空时返回null

  292. } finally {

  293. lock.unlock();

  294. }

  295. }

  296.  
  297. /**

  298. * 返回队列当前元素个数

  299. *

  300. */

  301. public int size() {

  302. final ReentrantLock lock = this.lock;

  303. lock.lock();

  304. try {

  305. return count;

  306. } finally {

  307. lock.unlock();

  308. }

  309. }

  310.  
  311. /**

  312. * 返回当前队列再放入多少个元素就满队

  313. */

  314. public int remainingCapacity() {

  315. final ReentrantLock lock = this.lock;

  316. lock.lock();

  317. try {

  318. return items.length - count;

  319. } finally {

  320. lock.unlock();

  321. }

  322. }

  323.  
  324. /**

  325. * 从队列中删除一个元素的方法。删除成功返回true,否则返回false

  326. */

  327. public boolean remove(Object o) {

  328. if (o == null) return false;

  329. final Object[] items = this.items;

  330. final ReentrantLock lock = this.lock;

  331. lock.lock();

  332. try {

  333. if (count > 0) {

  334. final int putIndex = this.putIndex;

  335. int i = takeIndex;

  336. do {

  337. if (o.equals(items[i])) {

  338. removeAt(i); //真正删除的方法

  339. return true;

  340. }

  341. if (++i == items.length)

  342. i = 0;

  343. } while (i != putIndex);//一直不断的循环取出来做判断

  344. }

  345. return false;

  346. } finally {

  347. lock.unlock();

  348. }

  349. }

  350.  
  351. /**

  352. * 是否包含一个元素

  353. */

  354. public boolean contains(Object o) {

  355. if (o == null) return false;

  356. final Object[] items = this.items;

  357. final ReentrantLock lock = this.lock;

  358. lock.lock();

  359. try {

  360. if (count > 0) {

  361. final int putIndex = this.putIndex;

  362. int i = takeIndex;

  363. do {

  364. if (o.equals(items[i]))

  365. return true;

  366. if (++i == items.length)

  367. i = 0;

  368. } while (i != putIndex);

  369. }

  370. return false;

  371. } finally {

  372. lock.unlock();

  373. }

  374. }

  375.  
  376. /**

  377. * 清空队列

  378. *

  379. */

  380. public void clear() {

  381. final Object[] items = this.items;

  382. final ReentrantLock lock = this.lock;

  383. lock.lock();

  384. try {

  385. int k = count;

  386. if (k > 0) {

  387. final int putIndex = this.putIndex;

  388. int i = takeIndex;

  389. do {

  390. items[i] = null;

  391. if (++i == items.length)

  392. i = 0;

  393. } while (i != putIndex);

  394. takeIndex = putIndex;

  395. count = 0;

  396. if (itrs != null)

  397. itrs.queueIsEmpty();

  398. for (; k > 0 && lock.hasWaiters(notFull); k--)

  399. notFull.signal();

  400. }

  401. } finally {

  402. lock.unlock();

  403. }

  404. }

  405.  
  406. /**

  407. * 取出所有元素到集合

  408. */

  409. public int drainTo(Collection<? super E> c) {

  410. return drainTo(c, Integer.MAX_VALUE);

  411. }

  412.  
  413. /**

  414. * 取出所有元素到集合

  415. */

  416. public int drainTo(Collection<? super E> c, int maxElements) {

  417. checkNotNull(c);

  418. if (c == this)

  419. throw new IllegalArgumentException();

  420. if (maxElements <= 0)

  421. return 0;

  422. final Object[] items = this.items;

  423. final ReentrantLock lock = this.lock;

  424. lock.lock();

  425. try {

  426. int n = Math.min(maxElements, count);

  427. int take = takeIndex;

  428. int i = 0;

  429. try {

  430. while (i < n) {

  431. @SuppressWarnings("unchecked")

  432. E x = (E) items[take];

  433. c.add(x);

  434. items[take] = null;

  435. if (++take == items.length)

  436. take = 0;

  437. i++;

  438. }

  439. return n;

  440. } finally {

  441. // Restore invariants even if c.add() threw

  442. if (i > 0) {

  443. count -= i;

  444. takeIndex = take;

  445. if (itrs != null) {

  446. if (count == 0)

  447. itrs.queueIsEmpty();

  448. else if (i > take)

  449. itrs.takeIndexWrapped();

  450. }

  451. for (; i > 0 && lock.hasWaiters(notFull); i--)

  452. notFull.signal();

  453. }

  454. }

  455. } finally {

  456. lock.unlock();

  457. }

  458. }

  459.  
  460.  
  461. }

 

三、LinkedBlockingQueue

接下来看看LinkedBlockingQueue的部分源码。

 

 
  1. package java.util.concurrent;

  2. import java.util.concurrent.atomic.AtomicInteger;

  3. import java.util.concurrent.locks.Condition;

  4. import java.util.concurrent.locks.ReentrantLock;

  5. import java.util.AbstractQueue;

  6. import java.util.Collection;

  7. import java.util.Iterator;

  8. import java.util.NoSuchElementException;

  9. import java.util.Spliterator;

  10. import java.util.Spliterators;

  11. import java.util.function.Consumer;

  12.  
  13. public class LinkedBlockingQueue<E> extends AbstractQueue<E>

  14. implements BlockingQueue<E>, java.io.Serializable {

  15. private static final long serialVersionUID = -6903933977591709194L;

  16.  
  17.  
  18.  
  19. /**

  20. * 链表节点类

  21. */

  22. static class Node<E> {

  23. E item;

  24. Node<E> next;//下一节点

  25. Node(E x) { item = x; }

  26. }

  27.  
  28. /** 链表大小 ,默认大小 是Integer.MAX_VALUE */

  29. private final int capacity;

  30.  
  31. /**当前队列中存放的元素个数,注意是原子类*/

  32. private final AtomicInteger count = new AtomicInteger();

  33.  
  34. /**

  35. * 链表队列头节点

  36. */

  37. transient Node<E> head;

  38.  
  39. /**

  40. * 链表队列尾节点

  41. */

  42. private transient Node<E> last;

  43.  
  44. /** 取元素时的可重入锁 */

  45. private final ReentrantLock takeLock = new ReentrantLock();

  46.  
  47. /**不为空条件*/

  48. private final Condition notEmpty = takeLock.newCondition();

  49.  
  50. /**放元素是时的重入锁 */

  51. private final ReentrantLock putLock = new ReentrantLock();

  52.  
  53. /** 不为满的条件 */

  54. private final Condition notFull = putLock.newCondition();

  55.  
  56. /**

  57. * 不为空通知方法

  58. */

  59. private void signalNotEmpty() {

  60. final ReentrantLock takeLock = this.takeLock;

  61. takeLock.lock();

  62. try {

  63. notEmpty.signal();

  64. } finally {

  65. takeLock.unlock();

  66. }

  67. }

  68.  
  69. /**

  70. * 不为满通知方法

  71. */

  72. private void signalNotFull() {

  73. final ReentrantLock putLock = this.putLock;

  74. putLock.lock();

  75. try {

  76. notFull.signal();

  77. } finally {

  78. putLock.unlock();

  79. }

  80. }

  81.  
  82. /**

  83. * 进队

  84. *

  85. * @param node the node

  86. */

  87. private void enqueue(Node<E> node) {

  88. last = last.next = node;

  89. }

  90.  
  91. /**

  92. * 出队

  93. */

  94. private E dequeue() {

  95. Node<E> h = head;

  96. Node<E> first = h.next;

  97. h.next = h; // help GC

  98. head = first;

  99. E x = first.item;

  100. first.item = null;

  101. return x;

  102. }

  103.  
  104. /**

  105. * 取和入都上锁,此时无法取和放

  106. */

  107. void fullyLock() {

  108. putLock.lock();

  109. takeLock.lock();

  110. }

  111.  
  112. /**

  113. * 释放锁

  114. */

  115. void fullyUnlock() {

  116. takeLock.unlock();

  117. putLock.unlock();

  118. }

  119.  
  120.  
  121. /**

  122. * 构造函数

  123. */

  124. public LinkedBlockingQueue() {

  125. this(Integer.MAX_VALUE);

  126. }

  127.  
  128. /**

  129. * 构造函数

  130. *

  131. */

  132. public LinkedBlockingQueue(int capacity) {

  133. if (capacity <= 0) throw new IllegalArgumentException();

  134. this.capacity = capacity;

  135. last = head = new Node<E>(null);

  136. }

  137.  
  138. /**

  139. * 构造函数

  140. */

  141. public LinkedBlockingQueue(Collection<? extends E> c) {

  142. this(Integer.MAX_VALUE);

  143. final ReentrantLock putLock = this.putLock;

  144. putLock.lock(); //取得放入锁

  145. try {

  146. int n = 0;

  147. for (E e : c) {

  148. if (e == null)

  149. throw new NullPointerException();

  150. if (n == capacity)

  151. throw new IllegalStateException("Queue full");

  152. enqueue(new Node<E>(e));

  153. ++n;

  154. }

  155. count.set(n);

  156. } finally {

  157. putLock.unlock();

  158. }

  159. }

  160.  
  161.  
  162. //阻塞等待放入

  163. public void put(E e) throws InterruptedException {

  164. if (e == null) throw new NullPointerException();

  165. int c = -1;

  166. Node<E> node = new Node<E>(e);

  167. final ReentrantLock putLock = this.putLock;

  168. final AtomicInteger count = this.count;

  169. putLock.lockInterruptibly(); //取得放入锁

  170. try {

  171. while (count.get() == capacity) {//队列已满

  172. notFull.await();

  173. }

  174. enqueue(node);//入队

  175. c = count.getAndIncrement();//当前队列中元素个数加1

  176. if (c + 1 < capacity)

  177. notFull.signal();

  178. } finally {

  179. putLock.unlock();

  180. }

  181. if (c == 0)

  182. signalNotEmpty();

  183. }

  184.  
  185. /**

  186. *带超时时间的阻塞等待放入,队列不满。放入成功返回true,否则返回fasle

  187. */

  188. public boolean offer(E e, long timeout, TimeUnit unit)

  189. throws InterruptedException {

  190.  
  191. if (e == null) throw new NullPointerException();

  192. long nanos = unit.toNanos(timeout);

  193. int c = -1;

  194. final ReentrantLock putLock = this.putLock;

  195. final AtomicInteger count = this.count;

  196. putLock.lockInterruptibly();

  197. try {

  198. while (count.get() == capacity) {

  199. if (nanos <= 0)

  200. return false;

  201. nanos = notFull.awaitNanos(nanos);

  202. }

  203. enqueue(new Node<E>(e));

  204. c = count.getAndIncrement();

  205. if (c + 1 < capacity)

  206. notFull.signal();

  207. } finally {

  208. putLock.unlock();

  209. }

  210. if (c == 0)

  211. signalNotEmpty();

  212. return true;

  213. }

  214.  
  215. /**

  216. * 非阻塞放入。队列不满放入成功返回true,否则返回fasle

  217. */

  218. public boolean offer(E e) {

  219. if (e == null) throw new NullPointerException();

  220. final AtomicInteger count = this.count;

  221. if (count.get() == capacity)

  222. return false;

  223. int c = -1;

  224. Node<E> node = new Node<E>(e);

  225. final ReentrantLock putLock = this.putLock;

  226. putLock.lock();

  227. try {

  228. if (count.get() < capacity) {

  229. enqueue(node);

  230. c = count.getAndIncrement();

  231. if (c + 1 < capacity)

  232. notFull.signal();

  233. }

  234. } finally {

  235. putLock.unlock();

  236. }

  237. if (c == 0)

  238. signalNotEmpty();

  239. return c >= 0;

  240. }

  241. //阻塞等待取出元素

  242. public E take() throws InterruptedException {

  243. E x;

  244. int c = -1;

  245. final AtomicInteger count = this.count;

  246. final ReentrantLock takeLock = this.takeLock;

  247. takeLock.lockInterruptibly();

  248. try {

  249. while (count.get() == 0) {

  250. notEmpty.await();

  251. }

  252. x = dequeue();

  253. c = count.getAndDecrement();

  254. if (c > 1)

  255. notEmpty.signal();

  256. } finally {

  257. takeLock.unlock();

  258. }

  259. if (c == capacity)

  260. signalNotFull();

  261. return x;

  262. }

  263. //带有超时时间等待的取出元素

  264. public E poll(long timeout, TimeUnit unit) throws InterruptedException {

  265. E x = null;

  266. int c = -1;

  267. long nanos = unit.toNanos(timeout);

  268. final AtomicInteger count = this.count;

  269. final ReentrantLock takeLock = this.takeLock;

  270. takeLock.lockInterruptibly();//等待时可抛出异常跳出

  271. try {

  272. while (count.get() == 0) {

  273. if (nanos <= 0)

  274. return null;

  275. nanos = notEmpty.awaitNanos(nanos);//超时等待

  276. }

  277. x = dequeue();

  278. c = count.getAndDecrement();

  279. if (c > 1)

  280. notEmpty.signal();//不这空条件成立

  281. } finally {

  282. takeLock.unlock();

  283. }

  284. if (c == capacity)

  285. signalNotFull();

  286. return x;

  287. }

  288. //取队头元素。没有的话返回null,有的话返回元素,并将队列中删除此元素

  289. public E poll() {

  290. final AtomicInteger count = this.count;

  291. if (count.get() == 0)

  292. return null;

  293. E x = null;

  294. int c = -1;

  295. final ReentrantLock takeLock = this.takeLock;

  296. takeLock.lock();//获得取得锁

  297. try {

  298. if (count.get() > 0) {

  299. x = dequeue();//出队

  300. c = count.getAndDecrement();//当前队列中元素个数减去1

  301. if (c > 1)

  302. notEmpty.signal();//不为空条件成功

  303. }

  304. } finally {

  305. takeLock.unlock();

  306. }

  307. if (c == capacity)

  308. signalNotFull();

  309. return x;

  310. }

  311. //取队头元素,但不从队列中删除 ,没有的话返回null,不阻塞

  312. public E peek() {

  313. if (count.get() == 0)

  314. return null;

  315. final ReentrantLock takeLock = this.takeLock;

  316. takeLock.lock();//获得取得锁

  317. try {

  318. Node<E> first = head.next;

  319. if (first == null)

  320. return null;

  321. else

  322. return first.item;

  323. } finally {

  324. takeLock.unlock();

  325. }

  326. }

  327.  
  328.  
  329. /**

  330. * 删除时要同时取得放入锁和取得锁

  331. */

  332. public boolean remove(Object o) {

  333. if (o == null) return false;

  334. fullyLock();//同时取得放入锁和取得锁

  335. try {

  336. for (Node<E> trail = head, p = trail.next;

  337. p != null;

  338. trail = p, p = p.next) {

  339. if (o.equals(p.item)) {

  340. unlink(p, trail);

  341. return true;

  342. }

  343. }

  344. return false;

  345. } finally {

  346. fullyUnlock();

  347. }

  348. }

  349.  
  350. /**

  351. * 是否包含

  352. */

  353. public boolean contains(Object o) {

  354. if (o == null) return false;

  355. fullyLock();//同时取得放入锁和取得锁

  356. try {

  357. for (Node<E> p = head.next; p != null; p = p.next)

  358. if (o.equals(p.item))

  359. return true;

  360. return false;

  361. } finally {

  362. fullyUnlock();

  363. }

  364. }

  365. }

  366.  


从LinkedBlockingQueue的源码中,我们可以看出他和ArrayBlockingQueue主要有以下两点区别:

 

1、ArrayBlockingQueue数据是放在一个数组中。LinkedBlockingQueue是放在一个Node节点中,构成一个链接。

2、ArrayBlockingQueue取元素和放元素都是同一个锁,而LinkedBlockingQueue有两个锁,一个放入锁,一个取得锁。分别对应放入元素和取得元素时的操作。这是由链表的结构所确定的。但是删除一个元素时,要同时获得放入锁和取得锁。

 

四、SynchronousQueue 

SynchronousQueue 这个队列实现了 BlockingQueue接口。该队列的特点 
1.容量为0,无论何时 size方法总是返回0
2. put操作阻塞, 直到另外一个线程取走队列的元素。
3.take操作阻塞,直到另外的线程put某个元素到队列中。
4. 任何线程只能取得其他线程put进去的元素,而不会取到自己put进去的元素

 

 
  1. public SynchronousQueue(boolean fair) {

  2. transferer = fair ? new TransferQueue() : new TransferStack();

  3. }

构造方法上接收boolean参数,表示这是一个公平的基于队列的排队模式,还是一个非公平的基于栈的排队模式。 

这篇关于Java并发编程与技术内幕:ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/755242

相关文章

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

Java 字符数组转字符串的常用方法

《Java字符数组转字符串的常用方法》文章总结了在Java中将字符数组转换为字符串的几种常用方法,包括使用String构造函数、String.valueOf()方法、StringBuilder以及A... 目录1. 使用String构造函数1.1 基本转换方法1.2 注意事项2. 使用String.valu

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.

Spring MVC如何设置响应

《SpringMVC如何设置响应》本文介绍了如何在Spring框架中设置响应,并通过不同的注解返回静态页面、HTML片段和JSON数据,此外,还讲解了如何设置响应的状态码和Header... 目录1. 返回静态页面1.1 Spring 默认扫描路径1.2 @RestController2. 返回 html2

Spring常见错误之Web嵌套对象校验失效解决办法

《Spring常见错误之Web嵌套对象校验失效解决办法》:本文主要介绍Spring常见错误之Web嵌套对象校验失效解决的相关资料,通过在Phone对象上添加@Valid注解,问题得以解决,需要的朋... 目录问题复现案例解析问题修正总结  问题复现当开发一个学籍管理系统时,我们会提供了一个 API 接口去

Java操作ElasticSearch的实例详解

《Java操作ElasticSearch的实例详解》Elasticsearch是一个分布式的搜索和分析引擎,广泛用于全文搜索、日志分析等场景,本文将介绍如何在Java应用中使用Elastics... 目录简介环境准备1. 安装 Elasticsearch2. 添加依赖连接 Elasticsearch1. 创

Spring核心思想之浅谈IoC容器与依赖倒置(DI)

《Spring核心思想之浅谈IoC容器与依赖倒置(DI)》文章介绍了Spring的IoC和DI机制,以及MyBatis的动态代理,通过注解和反射,Spring能够自动管理对象的创建和依赖注入,而MyB... 目录一、控制反转 IoC二、依赖倒置 DI1. 详细概念2. Spring 中 DI 的实现原理三、

SpringBoot 整合 Grizzly的过程

《SpringBoot整合Grizzly的过程》Grizzly是一个高性能的、异步的、非阻塞的HTTP服务器框架,它可以与SpringBoot一起提供比传统的Tomcat或Jet... 目录为什么选择 Grizzly?Spring Boot + Grizzly 整合的优势添加依赖自定义 Grizzly 作为