本文主要是介绍多线程篇(阻塞队列- ArrayBlockingQueue)(持续更新迭代),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
目录
一、源码分析
1. 先看个关系图
2. 构造方法
3. 核心属性
4. 核心功能
入队(放入数据)
出队(取出数据)
5. 总结
一、源码分析
1. 先看个关系图
PS:先看个关系图
ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,
初始化时需要指定容量大小利用 ReentrantLock 实现线程安全。
在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用
ArrayBlockingQueue是个不错选择;
当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞。
使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个
线程可以进行入队或者出队操作;
这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。
PS:从图中可以看有2个指针,插入和获取。
ArrayBlockingQueue使用
BlockingQueue<String> queue = new ArrayBlockingQueue(1024);queue.put("1"); //向队列中添加元素String take = queue.take(); // 获取元素
ArrayBlockingQueue特性
2. 构造方法
public ArrayBlockingQueue(int capacity) {this(capacity, false);}public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}
参数:
- capacity:定义数组大小
- fair:true/false(公平、非公平锁)
默认是非公平锁的实现
3. 核心属性
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/** The queued items */final Object[] items;/** items index for next take, poll, peek or remove */int takeIndex;/** items index for next put, offer, or add */int putIndex;/** Number of elements in the queue */int count;/** Concurrency control uses the classic two-condition algorithm* found in any textbook.*//** Main lock guarding all access */final ReentrantLock lock;/** Condition for waiting takes */private final Condition notEmpty;/** Condition for waiting puts */private final Condition notFull;
}
属性说明
- items:存放元素的数组。
- takeIndex:获取的指针位置。
- putIndex:插入的指针位置。
- count:队列中的元素个数。
- lock:内部锁。
- notEmpty:消费者
- notFull:生产者
4. 核心功能
入队(放入数据)
public void put(E e) throws InterruptedException {//检查是否为空checkNotNull(e);final ReentrantLock lock = this.lock;//加锁,如果线程中断抛出异常 lock.lockInterruptibly();try {//阻塞队列已满,则将生产者挂起,等待消费者唤醒//设计注意点: 用while不用if是为了防止虚假唤醒while (count == items.length)notFull.await(); //队列满了,使用notFull等待(生产者阻塞)// 入队enqueue(e);} finally {lock.unlock(); // 唤醒消费者线程}
}private void enqueue(E x) {final Object[] items = this.items;//入队 使用的putIndexitems[putIndex] = x;if (++putIndex == items.length) putIndex = 0; //设计的精髓: 环形数组,putIndex指针到数组尽头了,返回头部count++;//notEmpty条件队列转同步队列,准备唤醒消费者线程,因为入队了一个元素,肯定不为空了notEmpty.signal();
}
put主要做了几件事情
1、加中断锁lockInterruptibly(和普通lock一样,只不过多了个判断,如果被中断,则抛异常)。
2、判断队列中的数组长度元素已满,满了则让生产者阻塞。
3、没满,则入队,调用enqueue方法
4、解锁
enqueue主要做了几件事情
1、入队时,把元素放在当前putIndex指针的位置上。
2、看下一指针位置是不是到了队尾,如果是,则改为0(设计的精髓: 环形数组,putIndex指针
到数组尽头了,返回头部)。
3、数组元素+1
4、唤醒消费者线程,因为入队了一个元素,肯定不为空了。
出队(取出数据)
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//加锁,如果线程中断抛出异常 lock.lockInterruptibly();try {//如果队列为空,则消费者挂起while (count == 0)notEmpty.await();//出队return dequeue();} finally {lock.unlock();// 唤醒生产者线程}
}
private E dequeue() {final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex]; //取出takeIndex位置的元素items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0; //设计的精髓: 环形数组,takeIndex 指针到数组尽头了,返回头部count--;if (itrs != null)itrs.elementDequeued();//notFull条件队列转同步队列,准备唤醒生产者线程,此时队列有空位notFull.signal();return x;
}
take主要做了几件事情
1、添加中断锁。
2、判断队列是否为空,为空,消费者则挂起
3、如果不为空,则出队,调用dequeue方法
4、解锁
dequeue主要做了几件事情
1、取出takeIndex指针位置的元素。
2、看指针位置是不是到了队尾,如果是,则置0,下一次从头在拿。
3、唤醒生产者线程,此时队列有空位
5. 总结
ArrayBlockingQueue 是一个环形数组,通过维护队首、队尾的指针,来优化(避免了数组的插入
和删除,带来的数组位置移动)插入、删除,从而使时间复杂度为O(1).
这篇关于多线程篇(阻塞队列- ArrayBlockingQueue)(持续更新迭代)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!