并发容器之BlockingQueue阻塞队列

2024-03-26 14:04

本文主要是介绍并发容器之BlockingQueue阻塞队列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

BlockingQueue阻塞队列

BlockingQueue接口是在jdk5版本提供的,在线程池中用到了阻塞队列来实现,阻塞队列是深入学习线程池的基础,该队列通常是有限的容量,如果队列已满添加操作就会阻塞,如果队列为空,移除操作就会阻塞。

public interface BlockingQueue<Eextends Queue<E{
    // add/offer/put  插入数据,插入到队列尾部
  // add添加元素,如果队列已满,直接抛出异常IllegalArgumentException
    boolean add(E e);
  // 添加元素,如果队列已满,返回false
    boolean offer(E e);
    // 添加元素,如果队列已满,阻塞
    void put(E e) throws InterruptedException;
    // 超时
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException
;

  
    // take/poll/remove移除数据,移除队列头部
  // 移除元素并返回头元素,如果队列为空,阻塞
    take() throws InterruptedException;
  // 移除元素并返回头元素,如果队列为空,则返回null
  void put(E e) throws InterruptedException;
  // 超时
    poll(long timeout, TimeUnit unit)
        throws InterruptedException
;
  // 移除元素并返回头元素,如果队列为空,抛出NoSuchElementException异常
    boolean remove(Object o);
  
   int remainingCapacity();

    public boolean contains(Object o);

    int drainTo(Collection<? super E> c);

    int drainTo(Collection<? super E> c, int maxElements);
}

在使用阻塞队列时最好使用put()、take()以及可定时的offer()和poll(),而不要使用Queue接口中的方法,否则就丢失了阻塞的效果

BlockingQueue实现类

BlockingQueue接口有多个实现类:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue等

阻塞队列
阻塞队列
ArrayBlockingQueue

ArrayBlockingQueue类底层是由数组支持的有界阻塞队列,创建时需要指定容量,实现了FIFO(先进先出)排序机制。新添加的元素都在队列的尾部,获取操作是从队列头部进行,可以设置公平策略,默认是非公平的

源码分析

内部没有实现读写分离,生产和消费不能完全并行,长度需要定义

// 存储队列元素
final Object[] items;

/** items index for next take, poll, peek or remove */
// 出队的数组下标(下一个待取出的元素索引)
int takeIndex;

/** items index for next put, offer, or add */
// 入队的数组下标(下一个待添加的元素索引)
int putIndex;

// 队列中的元素数量
int count;
// 锁
final ReentrantLock lock;

/** Condition for waiting takes */
// 出队的条件变量(消费者监视器)
private final Condition notEmpty;

/** Condition for waiting puts */
// 入队的条件变量(生产者监视器)
private final Condition notFull;
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  // 默认是非公平锁
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull =  lock.newCondition();
}
重要方法

offer方法

offer方法向队列尾部添加元素,如果队列未满,则添加成功,返回true;如果队列已满则丢弃当前元素,返回false。该方法是不阻塞的

public boolean offer(E e) {
  // 如果元素为null,抛出EPN,队列中不可存储null值
    checkNotNull(e);
  // 获取锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      // 队列已满,直接返回false
        if (count == items.length)
            return false;
        else {
          // 添加元素
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
  // assert lock.getHoldCount() == 1;
  // assert items[putIndex] == null;
  final Object[] items = this.items;
  items[putIndex] = x;
  // 计算下一个元素存放的下标位置
  if (++putIndex == items.length)
    putIndex = 0;
  count++;
  // 因为添加了一个元素,所以唤醒因为没有元素而被阻塞的take方法的一个线程
  notEmpty.signal();
}

put方法

put方法向队列尾部添加元素,如果队列未满,则添加成功,返回true;如果队列已满则阻塞当前线程直到队列有空闲为止。该方法是阻塞的

public void put(E e) throws InterruptedException {
  // 如果元素为null,抛出EPN,队列中不可存储null值
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
  // 获取可中断锁
    lock.lockInterruptibly();
    try {
      // 队列已满,阻塞
        while (count == items.length)
          // 需要等待notFull.notify的唤醒
            notFull.await();
      // 如果没有满,添加元素进入队列
        enqueue(e);
    } finally { // 释放锁
        lock.unlock();
    }
}
  • 所有执行put操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁
  • 判断阻塞队列是否满了,如果满了,则调用await方法阻塞这个线程,并标记为notFull条件变量,同时释放lock锁,等待被消费者线程唤醒
  • 如果没有满,则调用enqueue方法将元素put进阻塞队列
  • 唤醒notEmpty条件变量

poll方法

从队列头获取并移除元素,如果队列为空,则直接返回null,该方法是不阻塞的

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      // 如果队列为空,则直接返回null
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
  // assert lock.getHoldCount() == 1;
  // assert items[takeIndex] != null;
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  E x = (E) items[takeIndex];
  // 会移除元素
  items[takeIndex] = null;
  // 队列头指针计算
  if (++takeIndex == items.length)
    takeIndex = 0;
  count--;
  if (itrs != null)
    itrs.elementDequeued();
  // 因为移除了一个元素,所以唤醒因为队列已满而被阻塞的put方法的一个线程
  notFull.signal();
  return x;
}

take方法

从队列头获取并移除元素,如果队列为空,则阻塞当前线程直到不为空返回元素,该方法是阻塞的

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
  // 获取可中断的锁
    lock.lockInterruptibly();
    try {
      // 队列为空,阻塞
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
  • 所有执行take操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁
  • 判断阻塞队列是否为空,如果是空,则调用await方法阻塞这个线程,并notEmpty.await条件变量,同时释放locak锁,等待被生产者线程唤醒
  • 如果没有空,则调用dequeue方法
  • 唤醒notFull.notify条件变量

peek方法

与poll方法类似,从队列头获取元素(不会移除元素),如果队列为空,则直接返回null,该方法是不阻塞的

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}
使用案例

logback异步打印日志使用的ArrayBlockQueue

LinkedBlockingQueue

LinkedBlockingQueue类是底层为单向链表的有界阻塞队列,默认容量为Integer.MAX_VALUE,使用先进先出FIFO,线程池中newFixedThreadPool线程池就是使用了该队列

源码分析

可以很好的处理并发数据,其内部实现采用分离锁(读写分离两个锁),可以实现生产和消费操作的完全并行运行。

// 头节点
transient Node<E> head;
//尾结点
private transient Node<E> last;
// 记录队列的个数
private final AtomicInteger count = new AtomicInteger();
// take、poll方法时获取该锁,控制元素出队,同时只有一个线程可以从队列头部获取元素,其他线程阻塞
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
// 条件变量,存放出栈被阻塞的线程
// 当队列为空时,执行出队操作的线程会被放入这个条件队列中阻塞
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
// put、offer方法时获取该锁,控制元素入队,同时只有一个线程可以在队尾添加元素,其他线程阻塞
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
// 条件变量,存放入栈被阻塞的线程
// 当队列满时,执行入队操作的线程会被放入这个条件队列中阻塞
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
  if (capacity <= 0throw new IllegalArgumentException();
  this.capacity = capacity;
  last = head = new Node<E>(null);
}
重要方法

offer方法

向尾部插入元素,成功插入返回true,队列已满则丢弃当前元素,返回false,该方法是不阻塞的

public boolean offer(E e) {
  // 如果元素为null,则抛出NPE,队列中不可存储null值
    if (e == nullthrow new NullPointerException();
  // 获取当前队列元素数量
    final AtomicInteger count = this.count;
  // 如果已满,则直接返回false
    if (count.get() == capacity)
        return false;
  
    int c = -1;
  // 构造新节点
    Node<E> node = new Node<E>(e);
  // 获取putLock独占锁
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
      // 队列不满,则进队
        if (count.get() < capacity) {
          // 进队
            enqueue(node);
          // 队列元素数量+1
            c = count.getAndIncrement();
            if (c + 1 < capacity) // 新元素入队后,队列依然没满,则唤醒notFull条件队列中的线程,通知可以进行入队操作了
                notFull.signal();
        }
    } finally {
      // 释放锁
        putLock.unlock();
    }
    if (c == 0// 说明该元素入队之前,队列为空,此时加入了一个元素,应该通知notEmpty条件队列中的线程,可以进行出队操作了
        signalNotEmpty();
    return c >= 0;
}

put方法

put方法与offer方法类似,只是如果队列已满,则阻塞当前线程,知道队列有空闲才会插入成功返回,该方法是阻塞的

public void put(E e) throws InterruptedException {
  // 如果元素为null,则抛出NPE,队列中不可存储null值
    if (e == nullthrow new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
  // 构建新节点
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
  // 获取当前队列元素数量
    final AtomicInteger count = this.count;
  // 获取可中断的putLock锁
    putLock.lockInterruptibly();
    try {
        // 如果队列已满,则进行等待,使用while循环防止被虚假唤醒
        while (count.get() == capacity) {
            notFull.await();
        }
       // 入队
        enqueue(node);
      // 队列中元素数量+1
        c = count.getAndIncrement();
        if (c + 1 < capacity)// 新元素入队后,队列依然没满,则唤醒notFull条件队列中的线程,通知可以进行入队操作了
            notFull.signal();
    } finally { // 释放锁
        putLock.unlock();
    }
    if (c == 0)// 说明该元素入队之前,队列为空,此时加入了一个元素,应该通知notEmpty条件队列中的线程,可以进行出队操作了
        signalNotEmpty();
}

poll方法

从队列头部获取元素,并移除该元素,如果队列为空,则直接返回null,该方法是不阻塞的

public E poll() {
  // 获取队列元素数量
    final AtomicInteger count = this.count;
  // 队列为空,则直接返回null
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
  // 获取takeLock锁
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
      // 队列不为空
        if (count.get() > 0) {
          // 出队,该方法会移除所取出的元素
            x = dequeue();
          // 队列元素数量-1
            c = count.getAndDecrement();
            if (c > 1// c>1,表示该元素出队之后,至少还有一个元素,可以通知notEmpty条件队列中的线程进行唤醒,来执行出队操作
                notEmpty.signal();
        }
    } finally { // 释放锁
        takeLock.unlock();
    }
    if (c == capacity) // 在进行出队操作前队列中元素是满的,此时可能NotFull是被阻塞的,所以通知NotFull条件队列中的线程进行唤醒,来执行入队操作
        signalNotFull();
    return x;
}

// Removes a node from head of queue.
private E dequeue() {
  // assert takeLock.isHeldByCurrentThread();
  // assert head.item == null;
  Node<E> h = head;
  Node<E> first = h.next;
  h.next = h; // help GC
  head = first;
  E x = first.item;
  first.item = null;
  return x;
}

peek方法

从队列头部获取元素,但是不会移除该元素,如果队列为空,则直接返回null,该方法是不阻塞的

public E peek() {
  // 队列为空,直接返回null
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
  // 获取takeLock锁
    takeLock.lock();
    try {
      // 获取头部元素
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

take方法

与poll方法类似,从队列头部获取元素,并移除该元素,但是如果队列为空,则会阻塞当前线程直到队列不为空然后返回元素,该方法是阻塞的

public E take() throws InterruptedException {
    E x;
    int c = -1;
  // 队列元素数量
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
  // 获取可中断的takeLock锁
    takeLock.lockInterruptibly();
    try {
      // 队列为空,则进行阻塞,使用while循环防止虚假唤醒
        while (count.get() == 0) {
            notEmpty.await();
        }
      // 出队并移除元素
        x = dequeue();
      // 队列元素数量-1
        c = count.getAndDecrement();
        if (c > 1)// c>1,表示该元素出队之后,至少还有一个元素,可以通知notEmpty条件队列中的线程进行唤醒,来执行出队操作
            notEmpty.signal();
    } finally { // 释放锁
        takeLock.unlock();
    }
    if (c == capacity)// 在进行出队操作前队列中元素是满的,此时可能NotFull是被阻塞的,所以通知NotFull条件队列中的线程进行唤醒,来执行入队操作
        signalNotFull();
    return x;
}
PriorityBlockingQueue

PriorityBlockingQueue类是一个无界阻塞队列,与LinkedBlockingQueue类似,只是排序是基于优先级的阻塞队列,可以决定元素的优先顺序(使用自然排序或者比较器来进行排序,传入的对象必须实现Comparable接口),会自动进行扩容,内部控制线程同步的锁是公平锁,存储使用的是平衡二叉堆实现的

源码分析
// 存放队列元素
private transient Object[] queue;

// 队列元素数量
private transient int size;


// 比较器,使用该比较器来比较元素大小进行排序,如果为null,则使用元素的自然排序
private transient Comparator<? super E> comparator;

// 控制只能有一个线程进行入队、出队操作
private final ReentrantLock lock;

// 条件变量用来实现take方法阻塞
private final Condition notEmpty;

// 自旋锁,使用cas操作来保证只有一个线程可以扩容队列,状态为0表示当前没有进行扩容,1表示当前正在扩容
private transient volatile int allocationSpinLock;

// 默认队列大小为11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 最大队列大小
 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
     * A plain PriorityQueue used only for serialization,
     * to maintain compatibility with previous versions
     * of this class. Non-null only during serialization/deserialization.
     */

private PriorityQueue<E> q;
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator)
 
{
  if (initialCapacity < 1)
    throw new IllegalArgumentException();
  this.lock = new ReentrantLock();
  this.notEmpty = lock.newCondition();
  this.comparator = comparator;
  this.queue = new Object[initialCapacity];
}
重要方法

offer方法

offer方法在队列中添加添加元素,由于该队列是无界的,所以不会阻塞

public boolean offer(E e) {
   // 如果元素为null,则抛出NPE,队列中不可存储null值
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
  // 获取锁
    lock.lock();
    int n, cap;
    Object[] array;
  // 当前元素个数>=队列容量,进行扩容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null// 比较器为null,进行元素的自然排序
            siftUpComparable(n, e, array);
        else // 有自定义的比较器,使用自定义比较器进行排序
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        notEmpty.signal(); // 唤醒由于队列没有元素导致的take操作阻塞的一个线程
    } finally {
        lock.unlock();
    }
    return true;
}

// 扩容操作
private void tryGrow(Object[] array, int oldCap) {
  // 先释放掉主锁,由于扩容比较费时,释放锁可以让其他线程可以做其他的操作
  lock.unlock(); // must release and then re-acquire main lock
  Object[] newArray = null;
  // 进行cas操作,成功则进行扩容,保证只有一个线程可以进行扩容操作
  if (allocationSpinLock == 0 &&
      UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                               01)) {
    try {
      // oldCap小于64,则执行oldCap + 2,否则乘以2
      int newCap = oldCap + ((oldCap < 64) ?
                             (oldCap + 2) : // grow faster if small
                             (oldCap >> 1));
      // 最大容量不能超过MAX_ARRAY_SIZE
      if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
        int minCap = oldCap + 1;
        if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
          throw new OutOfMemoryError();
        newCap = MAX_ARRAY_SIZE;
      }
      if (newCap > oldCap && queue == array)
        newArray = new Object[newCap];
    } finally {
      allocationSpinLock = 0;
    }
  }
  // 在某线程进行扩容时,可能会有其他线程也来进行扩容,由于CAS操作失败,会执行该代码,让出cpu
  if (newArray == null// back off if another thread is allocating
    Thread.yield();
  lock.lock();
  // 扩容成功
  if (newArray != null && queue == array) {
    queue = newArray;
    System.arraycopy(array, 0, newArray, 0, oldCap);
  }
}

// 二叉堆操作
// k传入的是队列的元素数量,也就是所要添加的元素可能会放入的数组下标,x是所要添加的元素,array是当前的队列元素
private static <T> void siftUpComparable(int k, T x, Object[] array) {
  Comparable<? super T> key = (Comparable<? super T>) x;
  // 第一个元素添加进来时不需要进入while循环,直接添加元素即可,因为只有一个元素,不需要排序
  while (k > 0) {
    // 因为是树形,找到该元素的父节点
    int parent = (k - 1) >>> 1;
    Object e = array[parent];
    // 比较该节点和父节点
    if (key.compareTo((T) e) >= 0)
      break;
    // 与父节点交换位置
    array[k] = e;
    k = parent;
  }
  array[k] = key;
}

put方法

与offer方法相同

public void put(E e) {
    offer(e); // never need to block
}

poll方法

poll方法获取第一个元素,如果队列为空,则返回null

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
  int n = size - 1;
  // 队列为空,则直接返回null
  if (n < 0)
    return null;
  else {
    Object[] array = queue;
    // 获取队头元素
    E result = (E) array[0];
    // 获取队尾元素,并赋值为null
    E x = (E) array[n];
    array[n] = null;
    Comparator<? super E> cmp = comparator;
    // 根节点没有了,需要重新进行二叉堆的构建
    if (cmp == null)
      siftDownComparable(0, x, array, n);
    else
      siftDownUsingComparator(0, x, array, n, cmp);
    size = n;
    return result;
  }
}

// 由于根节点被移除了,所以需要重新构建二叉堆
// k为0,x为之前的队尾元素,array为队列,n为队尾数组下标
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n)
 
{
  if (n > 0) {
    Comparable<? super T> key = (Comparable<? super T>)x;
    int half = n >>> 1;           // loop while a non-leaf
    while (k < half) {
      int child = (k << 1) + 1// assume left child is least
      Object c = array[child];
      int right = child + 1;
      if (right < n &&
          ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
        c = array[child = right];
      if (key.compareTo((T) c) <= 0)
        break;
      array[k] = c;
      k = child;
    }
    array[k] = key;
  }
}

take方法

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
  // 获取可中断的锁
    lock.lockInterruptibly();
    E result;
    try {
      // 如果队列为空,则阻塞
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}
DelayQueue

DelayQueue类是一种延迟队列,带有延迟时间,只有当延迟时间到了,才能从队列中获取到该元素。DelayQueue中的元素必须实现Delayed接口,该队列也是一个没有大小限制的队列,可以用做对缓存超时的数据移除、任务超时处理和空闲连接关闭等。

源码分析
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
// leader变量的使用基于Leader-Follower模式的变体,用于减少不必要的线程等待
// 当一个线程调用队列的take方法变为leader线程后,它会调用条件变量available.awaitNanos等待delay时间,但是其他线程则会调用available.await()进行无限等待
private Thread leader = null;
// 条件变量
private final Condition available = lock.newCondition();
public DelayQueue() {}
重要方法

offer方法

插入元素到队列

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      // 入队操作,如果为null会抛出NPE
        q.offer(e);
        if (q.peek() == e) { // 使用的是PriorityQueue,优先级队列,获取的是最先要过期的,所以当前元素时第一个元素,之前的队列没有元素
            leader = null;
          // 入队成功,通知被阻塞的线程进行唤醒
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

take方法

获取并移除队列里面延迟时间过期的元素,如果没有过期元素则等待,会阻塞

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
          // 获取但不移除元素 1
            E first = q.peek();
            if (first == null// 队列中没有元素
              // 会进行阻塞,等待唤醒
                available.await();
            else {
              // 获取该元素的过期时间
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0// 已经过期,取出
                    return q.poll();
                first = null// 走到这里说明没有过期 don't retain ref while waiting
                if (leader != null// leader不为null,说明有其他线程在进行take操作,进行阻塞等待
                    available.await();
                else { // 当前没有其他线程在进行take操作,选取当前线程作为leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread; 
                    try {
                        available.awaitNanos(delay); // 等待该元素过期,然后重新竞争锁
                    } finally {
                      // 重置leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null// 当前线程已经移除了过期元素,并且队列中还有元素,唤醒被阻塞的线程
            available.signal();
        lock.unlock();
    }
}

poll方法

获取并移除队列中的过期元素,没有则返回null,不会阻塞

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
      // 队列为空,或者不为空但是没有过期,直接返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}
SynchronousQueue

SynchronousQueue类是一个没有缓冲的队列,不会在队列中维护任何的存储空间,没有存储能力,生产者生产的数据直接会被消费者获取并消费,只会在没有可消费的数据时,阻塞数据的消费者,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间,newCachedThreadPool线程池使用了该队列

TransferQueue

TransferQueue类主要新增了tryTransfer方法和transfer方法,实现类有LinkedTransferQueue

public interface TransferQueue<Eextends BlockingQueue<E{
   
    boolean tryTransfer(E e);
  // 生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费
    void transfer(E e) throws InterruptedException;

    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException
;

    boolean hasWaitingConsumer();

   
    int getWaitingConsumerCount();
}

阻塞方法与非阻塞方法

非阻塞方法
  • add 将元素插入到队列尾部,插入成功返回true, 插入失败抛出异常
  • remove 移除队首元素,移除成功返回true, 移除失败,抛出异常
  • offer(E e) 将元素插入到队尾,插入成功,返回true, 插入失败返回false
  • poll 移除并获取队首元素, 成功返回队首元素,否则返回null
  • peek 获取队首元素, 成功返回队首元素,否则返回null
阻塞方法
  • put 将元素插入到队尾, 如果队列已满,则等待
  • take 从队首取元素, 如果队列为空,则等待
  • offer(E e, long timeout, TimeUnit unit) 向队尾存入元素,如果队列已满,则等待一定的时间,如果时间已到,还是没有插入成功,则返回false,否则返回true
  • E poll(long timeout, TimeUnit unit) 从队首取元素,如果队列为空,则等待一段时间,当时间已到,如果没有取到,,则返回null,否则返回取得的元素

https://zhhll.icu/2022/多线程/并发容器/4.BlockingQueue阻塞队列/

本文由 mdnice 多平台发布

这篇关于并发容器之BlockingQueue阻塞队列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/848744

相关文章

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

poj 3190 优先队列+贪心

题意: 有n头牛,分别给他们挤奶的时间。 然后每头牛挤奶的时候都要在一个stall里面,并且每个stall每次只能占用一头牛。 问最少需要多少个stall,并输出每头牛所在的stall。 e.g 样例: INPUT: 51 102 43 65 84 7 OUTPUT: 412324 HINT: Explanation of the s

poj 2431 poj 3253 优先队列的运用

poj 2431: 题意: 一条路起点为0, 终点为l。 卡车初始时在0点,并且有p升油,假设油箱无限大。 给n个加油站,每个加油站距离终点 l 距离为 x[i],可以加的油量为fuel[i]。 问最少加几次油可以到达终点,若不能到达,输出-1。 解析: 《挑战程序设计竞赛》: “在卡车开往终点的途中,只有在加油站才可以加油。但是,如果认为“在到达加油站i时,就获得了一

高并发环境中保持幂等性

在高并发环境中保持幂等性是一项重要的挑战。幂等性指的是无论操作执行多少次,其效果都是相同的。确保操作的幂等性可以避免重复执行带来的副作用。以下是一些保持幂等性的常用方法: 唯一标识符: 请求唯一标识:在每次请求中引入唯一标识符(如 UUID 或者生成的唯一 ID),在处理请求时,系统可以检查这个标识符是否已经处理过,如果是,则忽略重复请求。幂等键(Idempotency Key):客户端在每次

K8S(Kubernetes)开源的容器编排平台安装步骤详解

K8S(Kubernetes)是一个开源的容器编排平台,用于自动化部署、扩展和管理容器化应用程序。以下是K8S容器编排平台的安装步骤、使用方式及特点的概述: 安装步骤: 安装Docker:K8S需要基于Docker来运行容器化应用程序。首先要在所有节点上安装Docker引擎。 安装Kubernetes Master:在集群中选择一台主机作为Master节点,安装K8S的控制平面组件,如AP

Spring框架5 - 容器的扩展功能 (ApplicationContext)

private static ApplicationContext applicationContext;static {applicationContext = new ClassPathXmlApplicationContext("bean.xml");} BeanFactory的功能扩展类ApplicationContext进行深度的分析。ApplicationConext与 BeanF

poj3750约瑟夫环,循环队列

Description 有N个小孩围成一圈,给他们从1开始依次编号,现指定从第W个开始报数,报到第S个时,该小孩出列,然后从下一个小孩开始报数,仍是报到S个出列,如此重复下去,直到所有的小孩都出列(总人数不足S个时将循环报数),求小孩出列的顺序。 Input 第一行输入小孩的人数N(N<=64) 接下来每行输入一个小孩的名字(人名不超过15个字符) 最后一行输入W,S (W < N),用

POJ2010 贪心优先队列

c头牛,需要选n头(奇数);学校总共有f的资金, 每头牛分数score和学费cost,问合法招生方案中,中间分数(即排名第(n+1)/2)最高的是多少。 n头牛按照先score后cost从小到大排序; 枚举中间score的牛,  预处理左边与右边的最小花费和。 预处理直接优先队列贪心 public class Main {public static voi

容器编排平台Kubernetes简介

目录 什么是K8s 为什么需要K8s 什么是容器(Contianer) K8s能做什么? K8s的架构原理  控制平面(Control plane)         kube-apiserver         etcd         kube-scheduler         kube-controller-manager         cloud-controlle

Java并发编程之——BlockingQueue(队列)

一、什么是BlockingQueue BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种: 1. 当队列满了的时候进行入队列操作2. 当队列空了的时候进行出队列操作123 因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空