本文主要是介绍Java AQS 阻塞式锁和相关同步器工具的框架,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
8 J.U.C
Java 并发工具包
AQS 原理
AQS:AbstractQueuedSynchronizer(抽象队列同步器),阻塞式锁和相关同步器工具的框架
特点:
- 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
- getState - 获取 state 状态
- setState - 设置 state 状态
- compareAndSetState - cas 机制设置 state 状态
- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
- 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
- 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
子类主要实现这样一些方法(不重写的话是会默认抛出 UnsupportedOperationException)
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
获取锁的姿势
// 如果获取锁失败
if (!tryAcquire(arg)) {// 入队, 可以选择阻塞当前线程 park unpark
}
释放锁的姿势
// 如果释放锁成功
if (tryRelease(arg)) {// 让阻塞线程恢复运行
}
实现不可重入锁
package com.rainsun.d7_thread_pool;import lombok.extern.slf4j.Slf4j;import java.net.MulticastSocket;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;@Slf4j(topic = "c.d3_TestAqs")
public class d3_TestAqs {public static void main(String[] args) {MyLock lock = new MyLock();new Thread(()->{lock.lock();try {log.debug("locking...");Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {log.debug("unlocking...");lock.unlock();}}, "t1").start();new Thread(()->{lock.lock();try {log.debug("locking...");} finally {log.debug("unlocking...");lock.unlock();}}, "t2").start();}
}// 自定义锁(不可重入锁)
class MyLock implements Lock {private MySync sync = new MySync();class MySync extends AbstractQueuedSynchronizer{@Override // 独占锁protected boolean tryAcquire(int arg) {if(compareAndSetState(0, 1)){// 加上了锁,设置 owner 为当前线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}@Overrideprotected boolean tryRelease(int arg) {setExclusiveOwnerThread(null);setState(0);return true;}@Override // 是否持有独占锁protected boolean isHeldExclusively() {return getState() == 1;}public Condition newCondition(){return new ConditionObject();}}@Overridepublic void lock() {sync.acquire(1);}@Overridepublic void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}@Overridepublic boolean tryLock() {return sync.tryAcquire(1);}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}@Overridepublic void unlock() {sync.release(1);}@Overridepublic Condition newCondition() {return sync.newCondition();}
}
17:19:48 [t1] c.d3_TestAqs - locking...
17:19:49 [t1] c.d3_TestAqs - unlocking...
17:19:49 [t2] c.d3_TestAqs - locking...
17:19:49 [t2] c.d3_TestAqs - unlocking...
ReentrantLock 原理
ReentrantLock 实现了 Lock 接口,内部有一个 Sync 的类继承自 AbstractQueuedSynchronizer (AQS),内部还有两个锁实现非公平锁和公平锁都继承了 Sync 类
非公平锁实现原理
默认创建非公平锁,当没有竞争时:执行的线程为当前线程
final boolean initialTryLock() {Thread current = Thread.currentThread();if (compareAndSetState(0, 1)) { // first attempt is unguardedsetExclusiveOwnerThread(current);return true;} else if (getExclusiveOwnerThread() == current) {int c = getState() + 1;if (c < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(c);return true;} elsereturn false;
}
第一个竞争出现时:CAS 修改 state 失败,进入FIFO队列等待
Thread-1 执行了
-
CAS 尝试将 state 由 0 改为 1,结果失败
@ReservedStackAccess final void lock() {if (!initialTryLock())acquire(1); }
-
进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
public final void acquire(int arg) {if (!tryAcquire(arg))acquire(null, arg, false, false, false, 0L); }protected final boolean tryAcquire(int acquires) {if (getState() == 0 && compareAndSetState(0, acquires)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false; }
-
接下来进入 addWaiter 逻辑,将 Thread 封装为Node,构造 Node 队列
- 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
- Node 的创建是懒惰的
- 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
当前线程进入 acquireQueued 逻辑
- acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
- 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
- shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
- 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
再次有多个线程经历上述过程竞争失败,变成这个样子
Thread-0 释放锁,进入 tryRelease 流程,如果成功
- 设置 exclusiveOwnerThread 为 null
- state = 0
当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程
找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
回到 Thread-1 的 acquireQueued 流程
如果加锁成功(没有竞争),会设置
- exclusiveOwnerThread 为 Thread-1,state = 1
- head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
- 原本的 head 因为从链表断开,而可被垃圾回收
如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了
如果不巧又被 Thread-4 占了先
- Thread-4 被设置为 exclusiveOwnerThread,state = 1
- Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
可重入原理
加锁时,state 加1:
final boolean initialTryLock() {Thread current = Thread.currentThread();if (compareAndSetState(0, 1)) { // first attempt is unguardedsetExclusiveOwnerThread(current);return true;} else if (getExclusiveOwnerThread() == current) {int c = getState() + 1;if (c < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(c);return true;} elsereturn false;
}
getExclusiveOwnerThread() == current
表示当前线程已经获得锁了
这时会 int c = getState() + 1;
state 的值加1
解锁时,state 减 1:
@ReservedStackAccess
protected final boolean tryRelease(int releases) {int c = getState() - releases; // state - 1if (getExclusiveOwnerThread() != Thread.currentThread())throw new IllegalMonitorStateException();boolean free = (c == 0); // 只有 state 等于 0 才会 free,释放锁if (free)setExclusiveOwnerThread(null);setState(c);return free;
}
可打断原理
不可打断模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了
final int acquire(Node node, int arg, boolean shared,boolean interruptible, boolean timed, long time) {Thread current = Thread.currentThread();// ...for (;;) {Node t;if ((t = tail) == null) {// ...} else {long nanos;spins = postSpins = (byte)((postSpins << 1) | 1);if (!timed)LockSupport.park(this);else if ((nanos = time - System.nanoTime()) > 0L)LockSupport.parkNanos(this, nanos);elsebreak;node.clearStatus();if ((interrupted |= Thread.interrupted()) && interruptible) // Thread.interrupted()打断了,但是如果interruptible为false 并不会 break 停止,只将 interrupted标记 置为 true 了break;}}return cancelAcquire(node, interrupted, interruptible);
}private int cancelAcquire(Node node, boolean interrupted,boolean interruptible) {if (node != null) {node.waiter = null;node.status = CANCELLED;if (node.prev != null)cleanQueue();}if (interrupted) {if (interruptible)return CANCELLED;elseThread.currentThread().interrupt();}return 0;
}
公平锁实现原理
final boolean initialTryLock() {Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 队列中没有元素才会竞争if (!hasQueuedThreads() && compareAndSetState(0, 1)) {setExclusiveOwnerThread(current);return true;}} else if (getExclusiveOwnerThread() == current) {if (++c < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(c);return true;}return false;
}/*** Acquires only if thread is first waiter or empty*/
protected final boolean tryAcquire(int acquires) {// hasQueuedPredecessors 会检查当前 AQS 队列是否有前驱节点,没有才会竞争if (getState() == 0 && !hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;
}
读写锁
ReentrantReadWriteLock
当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。 类似于数据库中的 select … from … lock in share mode
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
package com.rainsun.d8_JUC;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.locks.ReentrantReadWriteLock;@Slf4j(topic = "c.d2_ReadWriteLockTest")
public class d2_ReadWriteLockTest {public static void main(String[] args) throws InterruptedException {DataContainer dataContainer = new DataContainer();new Thread(()->{dataContainer.read();},"t1").start();Thread.sleep(100);new Thread(()->{dataContainer.write();},"t2").start();}
}@Slf4j(topic = "c.DataContainer")
class DataContainer{private Object data;private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();private ReentrantReadWriteLock.ReadLock r = rw.readLock();private ReentrantReadWriteLock.WriteLock w = rw.writeLock();public Object read(){log.debug("get read lock ...");r.lock();try {log.debug("read");Thread.sleep(1000);return data;} catch (InterruptedException e) {throw new RuntimeException(e);} finally {r.unlock();log.debug("release read lock...");}}public void write(){log.debug("get write lock");w.lock();try {log.debug("write");}finally {log.debug("release write lock");w.lock();}}
}
11:22:49 [t1] c.DataContainer - get read lock ...
11:22:49 [t1] c.DataContainer - read
11:22:49 [t2] c.DataContainer - get write lock
11:22:50 [t1] c.DataContainer - release read lock...
11:22:50 [t2] c.DataContainer - write
11:22:50 [t2] c.DataContainer - release write lock
读完才能写,读读是并发的
注意事项
- 读锁不支持条件变量
- 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
升级:读锁里包含写锁,则写锁会一直等待读完,但是读在等待写完
r.lock();
try {// ...w.lock();try {// ...} finally{w.unlock();}
} finally{r.unlock();
}
- 重入时降级支持:即持有写锁的情况下去获取读锁
写锁后,在内部加上读锁,这样当写锁释放的时候,就获得了读锁。就能保证自己写的数据不会被其他线程获得写锁而篡改(因为我获取了读锁,与其他线程的写锁互斥,其他线程的写锁就不能改变我刚改的值)
class CachedData {Object data;// 是否有效,如果失效,需要重新计算 datavolatile boolean cacheValid;final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();void processCachedData() {rwl.readLock().lock();if (!cacheValid) {// 获取写锁前必须释放读锁rwl.readLock().unlock();rwl.writeLock().lock();try {// 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新if (!cacheValid) {data = ...cacheValid = true;}// 降级为读锁, 释放写锁时可以获取读锁,这样能够让其它线程读取缓存,但写不进去rwl.readLock().lock();} finally {rwl.writeLock().unlock();}}// 自己用完数据, 释放读锁try {use(data);} finally {rwl.readLock().unlock();}}
}
读写锁原理
读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个
不同的是,state 中的低 16 为用于给写锁状态,高 16 用于给读锁状态计数
加锁原理
(1)t1 w.lock
t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位。
即 state 低 16 位为 1,ownerThread 为当前的 t1 线程
(2)t2. r.lock
t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)acquire(null, arg, true, false, false, 0L);
}
@ReservedStackAccess
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();int c = getState();// 有写锁,且不是当前线程的写锁,那就不可以加读锁// 如果有写锁但是当前线程的写锁,那可以加读锁,因为可以锁降级if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;int r = sharedCount(c);if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {// ...return 1;}return fullTryAcquireShared(current);
}
3)这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态
4)t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁
5)如果没有成功,在 doAcquireShared 内 for (;😉 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;😉 循环一次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park
(6)t3 r.lock,t4 w.lock
这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子
解锁原理
t1 w.unlock:
这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子
接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行
这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一
这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点
事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用 doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行
这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一
这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点
下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点
t2 r.unlock,t3 r.unlock
t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零
t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入 doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即
之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;😉 这次自己是老二,并且没有其他 竞争,tryAcquire(1) 成功,修改头结点,流程结束
StampedLock 配合戳使用的读写锁
该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
Semaphore 信号量
Semaphore 用来限制能同时访问共享资源的线程上限。
@Slf4j(topic = "c.d3_SemaphoreTest")
public class d3_SemaphoreTest {public static void main(String[] args) {// 1. 创建 semaphore 对象Semaphore semaphore = new Semaphore(3);// 2. 10 线程同时执行for (int i = 0; i < 10; i++) {new Thread(()->{// 3. acquire 获得许可try {semaphore.acquire();} catch (InterruptedException e) {throw new RuntimeException(e);}try {log.debug("running ... ");try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("end...");} finally {// 4. 释放许可semaphore.release();}}).start();}}
}
Semaphore 原理
Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一
刚开始,permits(state)为 3,这时 5 个线程来获取资源
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列 park 阻塞
这时 Thread-4 释放了 permits,接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
CountdownLatch 倒计时锁
用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一
@Slf4j(topic = "c.d4_CountDownLatchTest")
public class d4_CountDownLatchTest {public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(3);new Thread(()->{log.debug("begin...");try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}countDownLatch.countDown();log.debug("end...");}).start();new Thread(()->{log.debug("begin...");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}countDownLatch.countDown();log.debug("end...");}).start();new Thread(()->{log.debug("begin...");try {Thread.sleep(1500);} catch (InterruptedException e) {throw new RuntimeException(e);}countDownLatch.countDown();log.debug("end...");}).start();log.debug("waiting...");countDownLatch.await();log.debug("waiting end...");}
}
配合线程池使用:
@Slf4j(topic = "c.d4_CountDownLatchTest")
public class d4_CountDownLatchTest {public static void main(String[] args) throws InterruptedException {ExecutorService service = Executors.newFixedThreadPool(4);CountDownLatch countDownLatch = new CountDownLatch(3);service.submit(()->{log.debug("begin...");try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}countDownLatch.countDown();log.debug("end...{}", countDownLatch.getCount());});service.submit(()->{log.debug("begin...");try {Thread.sleep(1500);} catch (InterruptedException e) {throw new RuntimeException(e);}countDownLatch.countDown();log.debug("end...{}", countDownLatch.getCount());});service.submit(()->{log.debug("begin...");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}countDownLatch.countDown();log.debug("end...{}", countDownLatch.getCount());});service.submit(()->{log.debug("waiting...");try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("waiting end...");});}
}
16:38:58 [pool-1-thread-4] c.d4_CountDownLatchTest - waiting...
16:38:58 [pool-1-thread-2] c.d4_CountDownLatchTest - begin...
16:38:58 [pool-1-thread-1] c.d4_CountDownLatchTest - begin...
16:38:58 [pool-1-thread-3] c.d4_CountDownLatchTest - begin...
16:38:59 [pool-1-thread-1] c.d4_CountDownLatchTest - end...2
16:38:59 [pool-1-thread-2] c.d4_CountDownLatchTest - end...1
16:39:00 [pool-1-thread-3] c.d4_CountDownLatchTest - end...0
16:39:00 [pool-1-thread-4] c.d4_CountDownLatchTest - waiting end...
等待其他线程完成的 demo:
public class d5_CountDownLatchDemo {public static void main(String[] args) throws InterruptedException {ExecutorService service = Executors.newFixedThreadPool(10);CountDownLatch latch = new CountDownLatch(10);String[] allstate = new String[10];for(int i = 0; i < 10; ++ i){int finalI = i;new Thread(()->{for(int load = 0; load <= 100; ++ load){allstate[finalI] = load + "%";System.out.print("\r" + Arrays.toString(allstate));}latch.countDown();}).start();}latch.await();System.out.println("\nGame begin !");service.shutdown();}
}
CyclicBarrier 同步屏障
循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,每等待一个线程计数个数就加1
当等待的线程数满足『计数个数』时,即最后一个线程到达同步点的屏障时,屏障解除,线程继续执行
CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的
CyclicBarrier 可以被比喻为『人满发车』,没坐满,前面已上车的线程就需要继续等待
@Slf4j(topic = "c.d6_CyclicBarrierTest")
public class d6_CyclicBarrierTest {public static void main(String[] args) {ExecutorService service = Executors.newFixedThreadPool(2);CyclicBarrier barrier = new CyclicBarrier(2, ()->{log.debug("task1 task2 finish...");});for (int i = 0; i < 3; i++) {service.submit(()->{log.debug("task1 begin...");try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}try {// 初始计数-1barrier.await();log.debug("task1 end...");} catch (Exception e) {e.printStackTrace();}});service.submit(()->{log.debug("task2 begin...");try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}try {// 初始计数-1barrier.await();log.debug("task2 end...");} catch (Exception e) {e.printStackTrace();}});}service.shutdown();}
}
这篇关于Java AQS 阻塞式锁和相关同步器工具的框架的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!