本文主要是介绍并发之BlockingQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
并发包中BlockingQueue的类关系图如下:
public interface BlockingQueue<E> extends Queue<E> {/*** 往队尾增加元素,如果队列已满则抛出IllegalStateException异常*/boolean add(E e);/*** 往队尾增加元素,如果队列满则返回false*/boolean offer(E e);/*** 往队尾添加元素,如果队列满则阻塞*/void put(E e) throws InterruptedException;/*** 往队尾增加元素,如果队列满等待timeout时间,如果仍无法添加,则返回false*/boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;/*** 从队列中取出队头元素,如果队列为空则阻塞*/E take() throws InterruptedException;/*** 从队列中取出队头元素,如果为空则返回false*/E poll();/*** 从队列中取出队头元素,如果队列空等待timeout时间,如果仍无法获取,则返回false*/E poll(long timeout, TimeUnit unit)throws InterruptedException;/*** 从队列中取出队头元素,如果队列为空则抛出NoSuchElementException异常*/E remove();/*** 从队列中移除o对象(调用equals()比较),如果不存在或者队列为空,则返回false。如队列中存在多个o对象,则移除第一个*/boolean remove(Object o);/*** 取出队头元素但不删除,如果队列为空则抛出NoSuchElementException异常*/E element();/*** 取出队头元素但不删除,如果队列为空则返回null*/E peek();/*** 返回队列剩余空间*/int remainingCapacity();/*** 查询队列中是否存在o对象*/public boolean contains(Object o);/*** 删除队列中于c的公共元素*/int drainTo(Collection<? super E> c);/*** 删除队列中于c的公共元素,最多删除maxElements个*/int drainTo(Collection<? super E> c, int maxElements);
}
操作 | 抛出异常 | 返回boolean/null | 阻塞 |
---|---|---|---|
入队 | add | offer | put |
出队 | remove | poll | take |
查看队头 | element | peek |
- ArrayBlockingQueue
使用ReentrantLock进行同步,用数组实现双向队列来保存元素,创建ArrayBlockingQueue需要指定队列的大小,不像ArrayList可以拓容,是一种有界队列。
ArrayBlockingQueue重要包含如下几个成员变量
/** 保存元素的数组 */final Object[] items;/** 队头元素位置 */int takeIndex;/** 队尾元素位置(待入队位置) */int putIndex;/** 队列中的元素个数 */int count;
- LinkedBlockingQueue
使用有头结点链表实现队列,类似LinkedList,其最大节点数可达到Integer.MAX_VALUE。
- LinkedBlockingDeque
使用链表实现的双向队列,最大节点数为Integer.MAX_VALUE。
- PriorityBlockingQueue
优先阻塞队列使用数组存储队列元素,实则使用了大根堆来实现。大根堆的性质可以保证有线队列的堆顶元素一定为整个堆中最大的(但是不保证堆内有序)。
队列的初始大小为11,最大容量为Integer.MAX_VALUE - 8,每当原数组容量不够时,将会调用拓容方法,尤其注意,拓容前会释放队列的锁。
private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;if (allocationSpinLock == 0 &&UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0;}}if (newArray == null) // back off if another thread is allocatingThread.yield();lock.lock();//queue == array这个判断条件是为了判断此时队列是否已经被其他线程拓容if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}
}
拓容具体逻辑为当原容量小于64时,需要快速增长,所以每次增长100%+2,当原容量大于等于64时则增长50%。
由于拓容过程没有使用同步锁,所以有可能多个线程同时拓容,所以拓容完需要判断此时队列的数组是否还是原来拓容前的数组,如果是,则将原来的元素拷贝到新数组中(拷贝前需要重新获取队列的锁)
由于队列内部使用的数据结构为大根堆,所以队列的入队和出队相当于堆排序中的插入和删除操作,在此不再赘述。
- DelayQueue
使用了优先队列(小根堆)来保存每个元素的时延,时延最小的将会最先出队。当调用出队方法时,会取出队头元素,并获取其时延delay,如果delay小于等于0,则表示时延已到,则出队,否则当前调用await(delay)方法,等待delay纳秒以后被唤醒。
- SynchronousQueue
- LinkedTransferQueue
未完
这篇关于并发之BlockingQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!