本文主要是介绍Java并发编程与技术内幕:ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、BlockingQueue介绍与常用方法
BlockingQueue是一个阻塞队列。在高并发场景是用得非常多的,在线程池中。如果运行线程数目大于核心线程数目时,也会尝试把新加入的线程放到一个BlockingQueue中去。队列的特性就是先进先出很容易理解,在java里头它的实现类主要有下图的几种,其中最常用到的是ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue这三种,这三个也是今天主要讲的类。
它主要的方法有
BlockingQueue的核心方法:
1、放入数据
(1) add(object)
队列没满的话,放入成功。否则抛出异常。
(2)offer(object):
表示如果可能的话,将object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
(3)offer(E o, long timeout, TimeUnit unit)
可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
(4)put(object)
把object加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程阻塞。直到BlockingQueue里面有空间再继续.
2、获取数据
(1)poll(time)
取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
(2)poll(long timeout, TimeUnit unit)
从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
(3)take()
取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
(4)drainTo()
一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
二、ArrayBlockingQueue
一个基本数组的阻塞队列。可以设置列队的大小。
ArrayBlockingQueue的源码是比较简单的,下面是笔者抽取了一部分源码并加以注释。它的基本原理实际还是数组,只不过存、取、删时都要做队列是否满或空的判断。然后加锁访问。
-
package java.util.concurrent;
-
import java.util.concurrent.locks.Condition;
-
import java.util.concurrent.locks.ReentrantLock;
-
import java.util.AbstractQueue;
-
import java.util.Collection;
-
import java.util.Iterator;
-
import java.util.NoSuchElementException;
-
import java.lang.ref.WeakReference;
-
import java.util.Spliterators;
-
import java.util.Spliterator;
-
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
-
implements BlockingQueue<E>, java.io.Serializable {
-
private static final long serialVersionUID = -817911632652898426L;
-
/** 真正存入数据的数组*/
-
final Object[] items;
-
/** take, poll, peek or remove的下一个索引 */
-
int takeIndex;
-
/** put, offer, or add的下一个索引 */
-
int putIndex;
-
/**队列中元素个数*/
-
int count;
-
/**可重入锁 */
-
final ReentrantLock lock;
-
/** 队列不为空的条件 */
-
private final Condition notEmpty;
-
/** 队列未满的条件 */
-
private final Condition notFull;
-
transient Itrs itrs = null;
-
/**
-
*当前元素个数-1
-
*/
-
final int dec(int i) {
-
return ((i == 0) ? items.length : i) - 1;
-
}
-
/**
-
* 返回对应索引上的元素
-
*/
-
@SuppressWarnings("unchecked")
-
final E itemAt(int i) {
-
return (E) items[i];
-
}
-
/**
-
* 非空检查
-
*
-
* @param v the element
-
*/
-
private static void checkNotNull(Object v) {
-
if (v == null)
-
throw new NullPointerException();
-
}
-
/**
-
* 元素放入队列,注意调用这个方法时都要先加锁
-
*
-
*/
-
private void enqueue(E x) {
-
final Object[] items = this.items;
-
items[putIndex] = x;
-
if (++putIndex == items.length)
-
putIndex = 0;
-
count++;//当前拥有元素个数加1
-
notEmpty.signal();//有一个元素加入成功,那肯定队列不为空
-
}
-
/**
-
* 元素出队,注意调用这个方法时都要先加锁
-
*
-
*/
-
private E dequeue() {
-
final Object[] items = this.items;
-
@SuppressWarnings("unchecked")
-
E x = (E) items[takeIndex];
-
items[takeIndex] = null;
-
if (++takeIndex == items.length)
-
takeIndex = 0;
-
count--;/当前拥有元素个数减1
-
if (itrs != null)
-
itrs.elementDequeued();
-
notFull.signal();//有一个元素取出成功,那肯定队列不满
-
return x;
-
}
-
/**
-
* 指定删除索引上的元素
-
*
-
*/
-
void removeAt(final int removeIndex) {
-
final Object[] items = this.items;
-
if (removeIndex == takeIndex) {
-
items[takeIndex] = null;
-
if (++takeIndex == items.length)
-
takeIndex = 0;
-
count--;
-
if (itrs != null)
-
itrs.elementDequeued();
-
} else {
-
final int putIndex = this.putIndex;
-
for (int i = removeIndex;;) {
-
int next = i + 1;
-
if (next == items.length)
-
next = 0;
-
if (next != putIndex) {
-
items[i] = items[next];
-
i = next;
-
} else {
-
items[i] = null;
-
this.putIndex = i;
-
break;
-
}
-
}
-
count--;
-
if (itrs != null)
-
itrs.removedAt(removeIndex);
-
}
-
notFull.signal();//有一个元素删除成功,那肯定队列不满
-
}
-
/**
-
*
-
* 构造函数,设置队列的初始容量
-
*/
-
public ArrayBlockingQueue(int capacity) {
-
this(capacity, false);
-
}
-
/**
-
* 构造函数。capacity设置数组大小 ,fair设置是否为公平锁
-
* capacity and the specified access policy.
-
*/
-
public ArrayBlockingQueue(int capacity, boolean fair) {
-
if (capacity <= 0)
-
throw new IllegalArgumentException();
-
this.items = new Object[capacity];
-
lock = new ReentrantLock(fair);//是否为公平锁,如果是的话,那么先到的线程先获得锁对象。
-
//否则,由操作系统调度由哪个线程获得锁,一般为false,性能会比较高
-
notEmpty = lock.newCondition();
-
notFull = lock.newCondition();
-
}
-
/**
-
*构造函数,带有初始内容的队列
-
*/
-
public ArrayBlockingQueue(int capacity, boolean fair,
-
Collection<? extends E> c) {
-
this(capacity, fair);
-
final ReentrantLock lock = this.lock;
-
lock.lock(); //要给数组设置内容,先上锁
-
try {
-
int i = 0;
-
try {
-
for (E e : c) {
-
checkNotNull(e);
-
items[i++] = e;//依次拷贝内容
-
}
-
} catch (ArrayIndexOutOfBoundsException ex) {
-
throw new IllegalArgumentException();
-
}
-
count = i;
-
putIndex = (i == capacity) ? 0 : i;//如果putIndex大于数组大小 ,那么从0重新开始
-
} finally {
-
lock.unlock();//最后一定要释放锁
-
}
-
}
-
/**
-
* 添加一个元素,其实super.add里面调用了offer方法
-
*/
-
public boolean add(E e) {
-
return super.add(e);
-
}
-
/**
-
*加入成功返回true,否则返回false
-
*
-
*/
-
public boolean offer(E e) {
-
checkNotNull(e);
-
final ReentrantLock lock = this.lock;
-
lock.lock();//上锁
-
try {
-
if (count == items.length) //超过数组的容量
-
return false;
-
else {
-
enqueue(e); //放入元素
-
return true;
-
}
-
} finally {
-
lock.unlock();
-
}
-
}
-
/**
-
* 如果队列已满的话,就会等待
-
*/
-
public void put(E e) throws InterruptedException {
-
checkNotNull(e);
-
final ReentrantLock lock = this.lock;
-
lock.lockInterruptibly();//和lock()方法的区别是让它在阻塞时也可抛出异常跳出
-
try {
-
while (count == items.length)
-
notFull.await(); //这里就是阻塞了,要注意。如果运行到这里,那么它会释放上面的锁,一直等到notify
-
enqueue(e);
-
} finally {
-
lock.unlock();
-
}
-
}
-
/**
-
* 带有超时时间的插入方法,unit表示是按秒、分、时哪一种
-
*/
-
public boolean offer(E e, long timeout, TimeUnit unit)
-
throws InterruptedException {
-
checkNotNull(e);
-
long nanos = unit.toNanos(timeout);
-
final ReentrantLock lock = this.lock;
-
lock.lockInterruptibly();
-
try {
-
while (count == items.length) {
-
if (nanos <= 0)
-
return false;
-
nanos = notFull.awaitNanos(nanos);//带有超时等待的阻塞方法
-
}
-
enqueue(e);//入队
-
return true;
-
} finally {
-
lock.unlock();
-
}
-
}
-
//实现的方法,如果当前队列为空,返回null
-
public E poll() {
-
final ReentrantLock lock = this.lock;
-
lock.lock();
-
try {
-
return (count == 0) ? null : dequeue();
-
} finally {
-
lock.unlock();
-
}
-
}
-
//实现的方法,如果当前队列为空,一直阻塞
-
public E take() throws InterruptedException {
-
final ReentrantLock lock = this.lock;
-
lock.lockInterruptibly();
-
try {
-
while (count == 0)
-
notEmpty.await();//队列为空,阻塞方法
-
return dequeue();
-
} finally {
-
lock.unlock();
-
}
-
}
-
//带有超时时间的取元素方法,否则返回Null
-
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
-
long nanos = unit.toNanos(timeout);
-
final ReentrantLock lock = this.lock;
-
lock.lockInterruptibly();
-
try {
-
while (count == 0) {
-
if (nanos <= 0)
-
return null;
-
nanos = notEmpty.awaitNanos(nanos);//超时等待
-
}
-
return dequeue();//取得元素
-
} finally {
-
lock.unlock();
-
}
-
}
-
//只是看一个队列最前面的元素,取出是不删除队列中的原来元素。队列为空时返回null
-
public E peek() {
-
final ReentrantLock lock = this.lock;
-
lock.lock();
-
try {
-
return itemAt(takeIndex); // 队列为空时返回null
-
} finally {
-
lock.unlock();
-
}
-
}
-
/**
-
* 返回队列当前元素个数
-
*
-
*/
-
public int size() {
-
final ReentrantLock lock = this.lock;
-
lock.lock();
-
try {
-
return count;
-
} finally {
-
lock.unlock();
-
}
-
}
-
/**
-
* 返回当前队列再放入多少个元素就满队
-
*/
-
public int remainingCapacity() {
-
final ReentrantLock lock = this.lock;
-
lock.lock();
-
try {
-
return items.length - count;
-
} finally {
-
lock.unlock();
-
}
-
}
-
/**
-
* 从队列中删除一个元素的方法。删除成功返回true,否则返回false
-
*/
-
public boolean remove(Object o) {
-
if (o == null) return false;
-
final Object[] items = this.items;
-
final ReentrantLock lock = this.lock;
-
lock.lock();
-
try {
-
if (count > 0) {
-
final int putIndex = this.putIndex;
-
int i = takeIndex;
-
do {
-
if (o.equals(items[i])) {
-
removeAt(i); //真正删除的方法
-
return true;
-
}
-
if (++i == items.length)
-
i = 0;
-
} while (i != putIndex);//一直不断的循环取出来做判断
-
}
-
return false;
-
} finally {
-
lock.unlock();
-
}
-
}
-
/**
-
* 是否包含一个元素
-
*/
-
public boolean contains(Object o) {
-
if (o == null) return false;
-
final Object[] items = this.items;
-
final ReentrantLock lock = this.lock;
-
lock.lock();
-
try {
-
if (count > 0) {
-
final int putIndex = this.putIndex;
-
int i = takeIndex;
-
do {
-
if (o.equals(items[i]))
-
return true;
-
if (++i == items.length)
-
i = 0;
-
} while (i != putIndex);
-
}
-
return false;
-
} finally {
-
lock.unlock();
-
}
-
}
-
/**
-
* 清空队列
-
*
-
*/
-
public void clear() {
-
final Object[] items = this.items;
-
final ReentrantLock lock = this.lock;
-
lock.lock();
-
try {
-
int k = count;
-
if (k > 0) {
-
final int putIndex = this.putIndex;
-
int i = takeIndex;
-
do {
-
items[i] = null;
-
if (++i == items.length)
-
i = 0;
-
} while (i != putIndex);
-
takeIndex = putIndex;
-
count = 0;
-
if (itrs != null)
-
itrs.queueIsEmpty();
-
for (; k > 0 && lock.hasWaiters(notFull); k--)
-
notFull.signal();
-
}
-
} finally {
-
lock.unlock();
-
}
-
}
-
/**
-
* 取出所有元素到集合
-
*/
-
public int drainTo(Collection<? super E> c) {
-
return drainTo(c, Integer.MAX_VALUE);
-
}
-
/**
-
* 取出所有元素到集合
-
*/
-
public int drainTo(Collection<? super E> c, int maxElements) {
-
checkNotNull(c);
-
if (c == this)
-
throw new IllegalArgumentException();
-
if (maxElements <= 0)
-
return 0;
-
final Object[] items = this.items;
-
final ReentrantLock lock = this.lock;
-
lock.lock();
-
try {
-
int n = Math.min(maxElements, count);
-
int take = takeIndex;
-
int i = 0;
-
try {
-
while (i < n) {
-
@SuppressWarnings("unchecked")
-
E x = (E) items[take];
-
c.add(x);
-
items[take] = null;
-
if (++take == items.length)
-
take = 0;
-
i++;
-
}
-
return n;
-
} finally {
-
// Restore invariants even if c.add() threw
-
if (i > 0) {
-
count -= i;
-
takeIndex = take;
-
if (itrs != null) {
-
if (count == 0)
-
itrs.queueIsEmpty();
-
else if (i > take)
-
itrs.takeIndexWrapped();
-
}
-
for (; i > 0 && lock.hasWaiters(notFull); i--)
-
notFull.signal();
-
}
-
}
-
} finally {
-
lock.unlock();
-
}
-
}
-
}
三、LinkedBlockingQueue
接下来看看LinkedBlockingQueue的部分源码。
-
package java.util.concurrent;
-
import java.util.concurrent.atomic.AtomicInteger;
-
import java.util.concurrent.locks.Condition;
-
import java.util.concurrent.locks.ReentrantLock;
-
import java.util.AbstractQueue;
-
import java.util.Collection;
-
import java.util.Iterator;
-
import java.util.NoSuchElementException;
-
import java.util.Spliterator;
-
import java.util.Spliterators;
-
import java.util.function.Consumer;
-
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
-
implements BlockingQueue<E>, java.io.Serializable {
-
private static final long serialVersionUID = -6903933977591709194L;
-
/**
-
* 链表节点类
-
*/
-
static class Node<E> {
-
E item;
-
Node<E> next;//下一节点
-
Node(E x) { item = x; }
-
}
-
/** 链表大小 ,默认大小 是Integer.MAX_VALUE */
-
private final int capacity;
-
/**当前队列中存放的元素个数,注意是原子类*/
-
private final AtomicInteger count = new AtomicInteger();
-
/**
-
* 链表队列头节点
-
*/
-
transient Node<E> head;
-
/**
-
* 链表队列尾节点
-
*/
-
private transient Node<E> last;
-
/** 取元素时的可重入锁 */
-
private final ReentrantLock takeLock = new ReentrantLock();
-
/**不为空条件*/
-
private final Condition notEmpty = takeLock.newCondition();
-
/**放元素是时的重入锁 */
-
private final ReentrantLock putLock = new ReentrantLock();
-
/** 不为满的条件 */
-
private final Condition notFull = putLock.newCondition();
-
/**
-
* 不为空通知方法
-
*/
-
private void signalNotEmpty() {
-
final ReentrantLock takeLock = this.takeLock;
-
takeLock.lock();
-
try {
-
notEmpty.signal();
-
} finally {
-
takeLock.unlock();
-
}
-
}
-
/**
-
* 不为满通知方法
-
*/
-
private void signalNotFull() {
-
final ReentrantLock putLock = this.putLock;
-
putLock.lock();
-
try {
-
notFull.signal();
-
} finally {
-
putLock.unlock();
-
}
-
}
-
/**
-
* 进队
-
*
-
* @param node the node
-
*/
-
private void enqueue(Node<E> node) {
-
last = last.next = node;
-
}
-
/**
-
* 出队
-
*/
-
private E dequeue() {
-
Node<E> h = head;
-
Node<E> first = h.next;
-
h.next = h; // help GC
-
head = first;
-
E x = first.item;
-
first.item = null;
-
return x;
-
}
-
/**
-
* 取和入都上锁,此时无法取和放
-
*/
-
void fullyLock() {
-
putLock.lock();
-
takeLock.lock();
-
}
-
/**
-
* 释放锁
-
*/
-
void fullyUnlock() {
-
takeLock.unlock();
-
putLock.unlock();
-
}
-
/**
-
* 构造函数
-
*/
-
public LinkedBlockingQueue() {
-
this(Integer.MAX_VALUE);
-
}
-
/**
-
* 构造函数
-
*
-
*/
-
public LinkedBlockingQueue(int capacity) {
-
if (capacity <= 0) throw new IllegalArgumentException();
-
this.capacity = capacity;
-
last = head = new Node<E>(null);
-
}
-
/**
-
* 构造函数
-
*/
-
public LinkedBlockingQueue(Collection<? extends E> c) {
-
this(Integer.MAX_VALUE);
-
final ReentrantLock putLock = this.putLock;
-
putLock.lock(); //取得放入锁
-
try {
-
int n = 0;
-
for (E e : c) {
-
if (e == null)
-
throw new NullPointerException();
-
if (n == capacity)
-
throw new IllegalStateException("Queue full");
-
enqueue(new Node<E>(e));
-
++n;
-
}
-
count.set(n);
-
} finally {
-
putLock.unlock();
-
}
-
}
-
//阻塞等待放入
-
public void put(E e) throws InterruptedException {
-
if (e == null) throw new NullPointerException();
-
int c = -1;
-
Node<E> node = new Node<E>(e);
-
final ReentrantLock putLock = this.putLock;
-
final AtomicInteger count = this.count;
-
putLock.lockInterruptibly(); //取得放入锁
-
try {
-
while (count.get() == capacity) {//队列已满
-
notFull.await();
-
}
-
enqueue(node);//入队
-
c = count.getAndIncrement();//当前队列中元素个数加1
-
if (c + 1 < capacity)
-
notFull.signal();
-
} finally {
-
putLock.unlock();
-
}
-
if (c == 0)
-
signalNotEmpty();
-
}
-
/**
-
*带超时时间的阻塞等待放入,队列不满。放入成功返回true,否则返回fasle
-
*/
-
public boolean offer(E e, long timeout, TimeUnit unit)
-
throws InterruptedException {
-
if (e == null) throw new NullPointerException();
-
long nanos = unit.toNanos(timeout);
-
int c = -1;
-
final ReentrantLock putLock = this.putLock;
-
final AtomicInteger count = this.count;
-
putLock.lockInterruptibly();
-
try {
-
while (count.get() == capacity) {
-
if (nanos <= 0)
-
return false;
-
nanos = notFull.awaitNanos(nanos);
-
}
-
enqueue(new Node<E>(e));
-
c = count.getAndIncrement();
-
if (c + 1 < capacity)
-
notFull.signal();
-
} finally {
-
putLock.unlock();
-
}
-
if (c == 0)
-
signalNotEmpty();
-
return true;
-
}
-
/**
-
* 非阻塞放入。队列不满放入成功返回true,否则返回fasle
-
*/
-
public boolean offer(E e) {
-
if (e == null) throw new NullPointerException();
-
final AtomicInteger count = this.count;
-
if (count.get() == capacity)
-
return false;
-
int c = -1;
-
Node<E> node = new Node<E>(e);
-
final ReentrantLock putLock = this.putLock;
-
putLock.lock();
-
try {
-
if (count.get() < capacity) {
-
enqueue(node);
-
c = count.getAndIncrement();
-
if (c + 1 < capacity)
-
notFull.signal();
-
}
-
} finally {
-
putLock.unlock();
-
}
-
if (c == 0)
-
signalNotEmpty();
-
return c >= 0;
-
}
-
//阻塞等待取出元素
-
public E take() throws InterruptedException {
-
E x;
-
int c = -1;
-
final AtomicInteger count = this.count;
-
final ReentrantLock takeLock = this.takeLock;
-
takeLock.lockInterruptibly();
-
try {
-
while (count.get() == 0) {
-
notEmpty.await();
-
}
-
x = dequeue();
-
c = count.getAndDecrement();
-
if (c > 1)
-
notEmpty.signal();
-
} finally {
-
takeLock.unlock();
-
}
-
if (c == capacity)
-
signalNotFull();
-
return x;
-
}
-
//带有超时时间等待的取出元素
-
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
-
E x = null;
-
int c = -1;
-
long nanos = unit.toNanos(timeout);
-
final AtomicInteger count = this.count;
-
final ReentrantLock takeLock = this.takeLock;
-
takeLock.lockInterruptibly();//等待时可抛出异常跳出
-
try {
-
while (count.get() == 0) {
-
if (nanos <= 0)
-
return null;
-
nanos = notEmpty.awaitNanos(nanos);//超时等待
-
}
-
x = dequeue();
-
c = count.getAndDecrement();
-
if (c > 1)
-
notEmpty.signal();//不这空条件成立
-
} finally {
-
takeLock.unlock();
-
}
-
if (c == capacity)
-
signalNotFull();
-
return x;
-
}
-
//取队头元素。没有的话返回null,有的话返回元素,并将队列中删除此元素
-
public E poll() {
-
final AtomicInteger count = this.count;
-
if (count.get() == 0)
-
return null;
-
E x = null;
-
int c = -1;
-
final ReentrantLock takeLock = this.takeLock;
-
takeLock.lock();//获得取得锁
-
try {
-
if (count.get() > 0) {
-
x = dequeue();//出队
-
c = count.getAndDecrement();//当前队列中元素个数减去1
-
if (c > 1)
-
notEmpty.signal();//不为空条件成功
-
}
-
} finally {
-
takeLock.unlock();
-
}
-
if (c == capacity)
-
signalNotFull();
-
return x;
-
}
-
//取队头元素,但不从队列中删除 ,没有的话返回null,不阻塞
-
public E peek() {
-
if (count.get() == 0)
-
return null;
-
final ReentrantLock takeLock = this.takeLock;
-
takeLock.lock();//获得取得锁
-
try {
-
Node<E> first = head.next;
-
if (first == null)
-
return null;
-
else
-
return first.item;
-
} finally {
-
takeLock.unlock();
-
}
-
}
-
/**
-
* 删除时要同时取得放入锁和取得锁
-
*/
-
public boolean remove(Object o) {
-
if (o == null) return false;
-
fullyLock();//同时取得放入锁和取得锁
-
try {
-
for (Node<E> trail = head, p = trail.next;
-
p != null;
-
trail = p, p = p.next) {
-
if (o.equals(p.item)) {
-
unlink(p, trail);
-
return true;
-
}
-
}
-
return false;
-
} finally {
-
fullyUnlock();
-
}
-
}
-
/**
-
* 是否包含
-
*/
-
public boolean contains(Object o) {
-
if (o == null) return false;
-
fullyLock();//同时取得放入锁和取得锁
-
try {
-
for (Node<E> p = head.next; p != null; p = p.next)
-
if (o.equals(p.item))
-
return true;
-
return false;
-
} finally {
-
fullyUnlock();
-
}
-
}
-
}
从LinkedBlockingQueue的源码中,我们可以看出他和ArrayBlockingQueue主要有以下两点区别:
1、ArrayBlockingQueue数据是放在一个数组中。LinkedBlockingQueue是放在一个Node节点中,构成一个链接。
2、ArrayBlockingQueue取元素和放元素都是同一个锁,而LinkedBlockingQueue有两个锁,一个放入锁,一个取得锁。分别对应放入元素和取得元素时的操作。这是由链表的结构所确定的。但是删除一个元素时,要同时获得放入锁和取得锁。
四、SynchronousQueue
SynchronousQueue 这个队列实现了 BlockingQueue接口。该队列的特点
1.容量为0,无论何时 size方法总是返回0
2. put操作阻塞, 直到另外一个线程取走队列的元素。
3.take操作阻塞,直到另外的线程put某个元素到队列中。
4. 任何线程只能取得其他线程put进去的元素,而不会取到自己put进去的元素
-
public SynchronousQueue(boolean fair) {
-
transferer = fair ? new TransferQueue() : new TransferStack();
-
}
构造方法上接收boolean参数,表示这是一个公平的基于队列的排队模式,还是一个非公平的基于栈的排队模式。
这篇关于Java并发编程与技术内幕:ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!