本文主要是介绍LinkedBlockingQueue源码学习,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
首先来看一个例子,例子来源于网上:
/*** 多线程模拟实现生产者/消费者模型* */
public class BlockingQueueTest2 {/*** * 定义装苹果的篮子* */public class Basket {// 篮子,能够容纳3个苹果BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);// 生产苹果,放入篮子public void produce() throws InterruptedException {// put方法放入一个苹果,若basket满了,等到basket有位置basket.put("An apple");}// 消费苹果,从篮子中取走public String consume() throws InterruptedException {// take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)return basket.take();}}// 定义苹果生产者class Producer implements Runnable {private String instance;private Basket basket;public Producer(String instance, Basket basket) {this.instance = instance;this.basket = basket;}public void run() {try {while (true) {// 生产苹果System.out.println("生产者准备生产苹果:" + instance);basket.produce();System.out.println("!生产者生产苹果完毕:" + instance);// 休眠300msThread.sleep(300);}} catch (InterruptedException ex) {System.out.println("Producer Interrupted");}}}// 定义苹果消费者class Consumer implements Runnable {private String instance;private Basket basket;public Consumer(String instance, Basket basket) {this.instance = instance;this.basket = basket;}public void run() {try {while (true) {// 消费苹果System.out.println("消费者准备消费苹果:" + instance);System.out.println(basket.consume());System.out.println("!消费者消费苹果完毕:" + instance);// 休眠1000msThread.sleep(1000);}} catch (InterruptedException ex) {System.out.println("Consumer Interrupted");}}}public static void main(String[] args) {BlockingQueueTest2 test = new BlockingQueueTest2();// 建立一个装苹果的篮子Basket basket = test.new Basket();ExecutorService service = Executors.newCachedThreadPool();Producer producer = test.new Producer("生产者001", basket);Producer producer2 = test.new Producer("生产者002", basket);Consumer consumer = test.new Consumer("消费者001", basket);service.submit(producer);service.submit(producer2);service.submit(consumer);// 程序运行5s后,所有任务停止
// try {
// Thread.sleep(1000 * 5);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// service.shutdownNow();}}
采用线程池和阻塞队列实现生产/消费者模型。其中LinkedBlockingQueue是阻塞队列,同时线程安全,其特点:
采用链表数据结构Node的方式进行节点数据的记录,
同时其进行入队和出队的计数器采用原子性的AtomicInteger
其出队和入队采用采用两把锁,putLock和takeLock,同时进行删除的时候,采用fullLock
其与LinkedBlockingQueue相比,其可以无界可以有界,而ArrayBlockingQueue是有界的,同时实现的数据结构不通过,一个采用数组、一个采用链表,同时采用的锁的方式不同,ArrayBlockingQueue采用一把锁,没有对生产和消费消息进行锁的分离。
1.相关变量
//容量,为空时使用Integer.MAX_VALUE=2^31-1
private final int capacity;/** Current number of elements */
//计数,队列中的元素个数
private final AtomicInteger count = new AtomicInteger();//头结点,head.item==null,首节点不存放元素
transient Node<E> head;//尾节点,last.next==null
private transient Node<E> last;/** Lock held by take, poll, etc */
//消费队列锁
private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */
//消费队列等待消费,用于队满时,进行消费
private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */
//生产队列锁
private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */
//生产队列等待生产,用于队空时,进行生产
private final Condition notFull = putLock.newCondition();//节点信息:数据、后继点击
static class Node<E> {E item;/*** One of:* - the real successor Node* - this Node, meaning the successor is head.next* - null, meaning there is no successor (this is the last node)*///下一个节点,分为三种情况:// 指向真正的节点、指向自己,后继节点为head.next、为空,表示当前节点为尾节点Node<E> next;Node(E x) { item = x; }
}
2.构造方法
//构造方法,空参构造默认队列容量为2^31-1
public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
}//构造方法,带指定容量
public LinkedBlockingQueue(int capacity) {//对容量进行校验if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;//创建节点信息last = head = new Node<E>(null);
}//构造方法,放入带指定集合的元素信息入队
//首先采用默认大小,进行上锁操作,
// 放入元素到队列中,进行遍历,放入
public LinkedBlockingQueue(Collection<? extends E> c) {//默认队列大小,2^31-1this(Integer.MAX_VALUE);//进行上锁操作final ReentrantLock putLock = this.putLock;putLock.lock(); // Never contended, but necessary for visibilitytry {//放入元素,进行计数int n = 0;for (E e : c) {if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {//释放锁putLock.unlock();}
}
3.方法
生产方法
put
//入队操作
//首先获取锁,再检查队列是否满了,如果满了,则进行阻塞等待,
// 如果队列没有满,则进行生产操作,同时计数器进行计数
//生产后的元素个数如果还没有达到容量时,会继续唤醒其他生产线程
//当生产的元素是元素的第一个元素时唤醒阻塞等待消费的线程
public void put(E e) throws InterruptedException {//非空校验if (e == null) throw new NullPointerException();// Note: convention in all put/take/etc is to preset local var// holding count negative to indicate failure unless set.//设置计数为0,失败的时候返回int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//中断上锁putLock.lockInterruptibly();try {/** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from capacity. Similarly* for all other uses of count in other wait guards.*///检查队列是否满了,满了进行阻塞操作while (count.get() == capacity) {notFull.await();}//入队操作,将节点信息插入到队尾//last=last.next=nodeenqueue(node);c = count.getAndIncrement();//元素没有满,则唤醒被阻塞的线程,增加线程if (c + 1 < capacity)notFull.signal();} finally {//释放锁putLock.unlock();}//插入的是一个元素时唤醒阻塞等待的线程if (c == 0)signalNotEmpty();
}
offer
//阻塞带超时时间的offer操作
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {//如果时间<0,则表示超时返回了,此时队列未满,直接返回if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}//否者进行入队操作enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true;
}//首先进行非空校验,如果队满了,直接返回false
//如果没有满,则进行上锁,同时进行判断,
// 如果计数<容量,则进行入队操作
//最后释放锁
public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;if (count.get() == capacity)return false;int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;putLock.lock();try {if (count.get() < capacity) {enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0;
}
消费者操作
take操作
//take操作 消费消息
//如果队列为非空或者被唤醒,进行消费操作,计数器-1
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}//开始消费if (c == capacity)signalNotFull();return x;
}
//进行消费操作 poll,带超时时间
public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x = null;int c = -1;long nanos = unit.toNanos(timeout);final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;
}//进行poll操作public E poll() {final AtomicInteger count = this.count;if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {if (count.get() > 0) {x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}
remove操作:
//删除操作,释放指定节点信息
public boolean remove(Object o) {if (o == null) return false;//对生产消息和消费消息进行上锁fullyLock();try {for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {if (o.equals(p.item)) {//释放节点unlink(p, trail);return true;}}return false;} finally {fullyUnlock();}
}
drainTo操作
drainTo操作
public int drainTo(Collection<? super E> c) {return drainTo(c, Integer.MAX_VALUE);}//一次性地将队列中的全部元素消费完同时返回指定集合的信息,避免多次加锁造成的性能开销//其中c和maxElement表示返回的集合、要获取的元素个数public int drainTo(Collection<? super E> c, int maxElements) {if (c == null)throw new NullPointerException();if (c == this)throw new IllegalArgumentException();if (maxElements <= 0)return 0;boolean signalNotFull = false;//进行上锁final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {//拿到两者之间的最小的一个int n = Math.min(maxElements, count.get());// count.get provides visibility to first n NodesNode<E> h = head;int i = 0;try {//将元素添加中集合c中while (i < n) {Node<E> p = h.next;c.add(p.item);p.item = null;h.next = h;h = p;++i;}return n;} finally {// Restore invariants even if c.add() threwif (i > 0) {// assert h.item == null;head = h;signalNotFull = (count.getAndAdd(-i) == capacity);}}} finally {takeLock.unlock();if (signalNotFull)signalNotFull();}}
这篇关于LinkedBlockingQueue源码学习的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!