本文主要是介绍七:ReentrantReadWriteLock —— 读写锁,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
目录
- 1、ReentrantReadWriteLock 入门
- 1.1、概念
- 1.2、案例
- 1.2.1、写写互斥
- 1.2.2 锁降级
- 2、ReentrantReadWriteLock 源码解析
- 2.1、属性
- 2.2、构造方法
- 2.3、内部类
- 2.4、读写状态的设计 —— 按位切割使用
- 2.5、【写锁】加锁方法 `lock()` —— ReentrantReadWriteLock.WriteLock
- 2.4.1、`acquire()` 方法 —— AQS
- 2.4.1.1、`tryAcquire()` 方法 —— AQS,由 ReentrantReadWriteLock.Sync 实现
- 2.4.1.1.1 `writerShouldBlock()` 方法 —— ReentrantReadWriteLock.Sync,由子类实现
- 2.6、【写锁】解锁方法 `unlock()` —— ReentrantReadWriteLock.WriteLock
- 2.6.1、`release()` 方法 —— AQS
- 2.6.1.1、`tryRelease()` 方法 —— AQS,由 ReentrantReadWriteLock.Sync 实现
- 2.7、【读锁】加锁方法 lock() —— ReentrantReadWriteLock.ReadLock
- 2.7.1、`acquireShared()` 方法 —— AQS
- 2.7.1.1、`tryAcquireShared()` 方法 —— AQS,由 ReentrantReadWriteLock.Sync 实现
- 2.7.1.1.1、`readerShouldBlock()` 方法 —— ReentrantReadWriteLock.Sync,由子类实现
- 2.7.1.1.1.1、`apparentlyFirstQueuedIsExclusive()` 方法 —— AQS
- 2.7.1.1.2、`fullTryAcquireShared()` 方法 —— ReentrantReadWriteLock.Sync
- 2.8、【读锁】解锁方法 `unlock()` —— ReentrantReadWriteLock.ReadLock
1、ReentrantReadWriteLock 入门
1.1、概念
ReentrantReadWriteLock
:读写锁。它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁。适用于读多写少的场景(如果用独占锁,效率及其低下),提高并发性能,但也需要更多的内存和处理器时间来维护状态信息
在没有写操作的情况下,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是,如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了
- 并发:读读
- 互斥:读写、写读、写写
ReentrantLock
与 ReentrantReadWriteLock
区别:
ReentrantLock
:互斥锁,它允许同一线程对共享资源进行重入,即该线程在获得锁后可以再次获得该锁而不被阻塞ReentrantReadWriteLock
:由一个读锁和一个写锁组成,它允许多个线程同时读取共享资源,但只允许一个线程写入共享资源
当前线程获取读锁的条件:
- 没有其它线程的写锁
- 没有写请求或者有写请求,但当前线程和持有锁的线程是同一个线程
当前线程获取写锁的条件:
- 没有其它线程的读锁
- 没有其它线程的写锁
三个重要的特性:
- 公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平
- 可重入:读锁和写锁都支持线程重入。读线程获取读锁后,能够再次获取读锁,但是不能获取写锁;写线程在获取写锁之后能够再次获取写锁,同时也可以获取读锁
- 锁降级:线程获取写锁之后获取读锁,再释放写锁,这样实现了写锁变为读锁,也叫锁降级
1.2、案例
1.2.1、写写互斥
public class Test {private static Map<String, Object> map = new HashMap<>();private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();public static void put(String key, Object value) throws InterruptedException {System.out.println("开始----" +Thread.currentThread().getName());lock.writeLock().lock();try {System.out.println("执行------" +Thread.currentThread().getName());map.put(key, value);Thread.sleep(1000);} finally {lock.writeLock().unlock();System.out.println("结束------" +Thread.currentThread().getName());}}public static Object get(String key) {System.out.println("开始----" +Thread.currentThread().getName());lock.readLock().lock();try {System.out.println("执行------" +Thread.currentThread().getName());return map.get(key);} finally {lock.readLock().unlock();System.out.println("执行------" +Thread.currentThread().getName());}}public static void main(String[] args) throws InterruptedException {Runnable writeTask = () -> {try {put("1", 1);} catch (InterruptedException e) {throw new RuntimeException(e);}};new Thread(writeTask).start();new Thread(writeTask).start();}
}
执行结果:
开始----Thread-1
开始----Thread-0
执行------Thread-1
结束------Thread-1
执行------Thread-0
结束------Thread-0
根据结果可以:线程 1、0 同时执行,但线程 1 获取写锁后,线程 0 阻塞,直至线程 1 执行完释放写锁之后,线程 0 获取写锁执行,最后释放写锁
当然,读者也可以去测试 读写、读读 等。
1.2.2 锁降级
锁降级: 先获取写锁,然后再获取读锁,最后释放写锁的过程。在这个过程中,线程可以先访问共享资源,然后放弃写权限,转而访问读资源【可以避免写操作期间读操作的阻塞,提高并发性能;由写锁降为读锁,释放写锁后,仍然持有读锁】。
还是上述那个例子:
public static void put(String key, Object value) {lock.writeLock().lock();try {map.put(key, value);// 获取读锁lock.readLock().lock();} finally {lock.writeLock().unlock();// 此处如果不释放读锁,其他线程获取写锁时将被阻塞//lock.readLock().unlock();}
}
2、ReentrantReadWriteLock 源码解析
2.1、属性
public class ReentrantReadWriteLock implements ReadWriteLock {// 读锁private final ReentrantReadWriteLock.ReadLock readerLock;// 写锁private final ReentrantReadWriteLock.WriteLock writerLock;// 同步机制final Sync sync;
}
2.2、构造方法
public ReentrantReadWriteLock() {this(false);
}public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();readerLock = new ReadLock(this);writerLock = new WriteLock(this);
}
默认非公平机制
2.3、内部类
// AQS:分公平、非公平
abstract static class Sync extends AbstractQueuedSynchronizer {// 尝试获取锁protected final boolean tryAcquire(int acquires) {}// 尝试释放锁protected final boolean tryRelease(int releases) {}protected final int tryAcquireShared(int unused) {}// 写锁是否应该阻塞abstract boolean writerShouldBlock();abstract boolean readerShouldBlock();// 每个读线程持有的计数:Sync 构造函数中初始化;读线程计数为 0 时删除private transient ThreadLocalHoldCounter readHolds;// 缓存最后一个读锁的计数private transient HoldCounter cachedHoldCounter;// 记录第一个读锁private transient Thread firstReader = null;// 第一个读锁的计数器private transient int firstReaderHoldCount;// 内部类,继承了 ThreadLocal,和当前线程绑定static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {return new HoldCounter();}}// 内部类:读线程的计数器static final class HoldCounter {int count = 0;final long tid = getThreadId(Thread.currentThread());}
}// 非公平
static final class NonfairSync extends Sync {final boolean writerShouldBlock() {return false;}
}// 公平
static final class FairSync extends Sync {final boolean writerShouldBlock() {return hasQueuedPredecessors();}
}// 读锁
public static class ReadLock implements Lock {
}// 写锁
public static class WriteLock implements Lock {private final Sync sync;// 加锁:通过 Sync#acquire() 方法加锁public void lock() {}
}
2.4、读写状态的设计 —— 按位切割使用
在分析 ReentrantLock
的时候,Sync
内部类类是继承于 AQS,以 int state
为线程加锁状态:state == 0
表示未加锁;state > 0
表示已加锁
同样,ReentrantReadWriteLock
也是继承于 AQS 来实现同步,那 int state
是如何同时来区分读锁和写锁的呢?
如果要用一个变量维护多种状态,需要采用 “按位切割使用” 的方式来维护这个变量,将其切分为两部分:
高 16 为表示读
,低 16 为表示写
查看 Sync
类中的静态变量:
abstract static class Sync extends AbstractQueuedSynchronizer {// 高16位为读锁,低16位为写锁static final int SHARED_SHIFT = 16;// 读锁单位static final int SHARED_UNIT = (1 << SHARED_SHIFT);// 读锁最大数量static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;// 写锁最大数量static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 获取读锁的数量【高 16 位:移位】static int sharedCount(int c) { return c >>> SHARED_SHIFT; }// 获取写锁的数量【& 与运算】static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }}
移位操作符:在二进制下进行移位【十进制转化为二进制】
<<
:左移。如果左移过程中超过了 32 位,高位就会舍弃,低位补零【最终结果:十进制数 * (2 ^ n)】。如:3 << 2 = 3 * (2 ^ 2) = 12>>
:右移。低位会舍弃,高位补零【最终结果:十进制数 / (2 ^ n)】>>>
:右移零填充运算符【用来执行无符号位移的位运算符】:将一个数的二进制表示向右移动,并用 0 填充左侧的空位
所以,通过移位操作符:
- state【高 16:读;低 16:写】:00000000 00000000 00000000 00000000
- SHARED_UNIT: 00000000 00000001 00000000 00000000
- MAX_COUNT: 00000000 00000000 11111111 11111111
- EXCLUSIVE_MASK: 00000000 00000000 11111111 11111111
2.5、【写锁】加锁方法 lock()
—— ReentrantReadWriteLock.WriteLock
public void lock() {sync.acquire(1);
}
调用 AQS#acquire()
方法【Sync
没有重写此方法,父类有】
2.4.1、acquire()
方法 —— AQS
public final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {selfInterrupt();}
}
acquire()
方法在之前的文章 六:ReentrantLock —— 可重入锁 中已经分析过:只有 tryAcquire()
方法供子类实现。这里实现的子类是 ReentrantReadWriteLock.Sync
2.4.1.1、tryAcquire()
方法 —— AQS,由 ReentrantReadWriteLock.Sync 实现
// 尝试获取写锁
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread();// 获取 state 的值int c = getState();// 获取写锁的数量int w = exclusiveCount(c);if (c != 0) {// 要么就是读锁,要么就是写锁// 1.如果没有写锁【有读锁】,不管是否当前线程是持有锁线程,直接返回【持有读锁线程无法再持有写锁:读写互斥】// 2.如果是写锁,但是不是当前持有锁线程,直接返回【写写互斥】if (w == 0 || current != getExclusiveOwnerThread()) {return false;}// 是写锁,且是当前线程if (w + exclusiveCount(acquires) > MAX_COUNT) {throw new Error("Maximum lock count exceeded");}// 重入setState(c + acquires);return true;}// 没有读锁、写锁,竞争写锁if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {return false;}setExclusiveOwnerThread(current);return true;
}
2.4.1.1.1 writerShouldBlock()
方法 —— ReentrantReadWriteLock.Sync,由子类实现
分公平锁、非公平锁
- 公平锁:由
AQS#hasQueuedPredecessors()
方法决定【同步等待队列中有线程等待(返回 true),则不竞争锁,直接返回 false;否则,竞争写锁,进行 CAS 操作,返回 true】 - 非公平锁:直接抢占,进行 CAS 操作,返回 true
2.6、【写锁】解锁方法 unlock()
—— ReentrantReadWriteLock.WriteLock
public void unlock() {sync.release(1);
}
调用 AQS#release()
方法【Sync
没有重写此方法,父类有】
2.6.1、release()
方法 —— AQS
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}
tryRelease()
方法被 Sync
类实现
2.6.1.1、tryRelease()
方法 —— AQS,由 ReentrantReadWriteLock.Sync 实现
protected final boolean tryRelease(int releases) {if (!isHeldExclusively()) {// 如果当前线程不是持有锁线程,则抛异常throw new IllegalMonitorStateException();} int nextc = getState() - releases;// boolean free = (exclusiveCount(nextc) == 0);// 判断写锁数量是否为 0【重入】boolean free = exclusiveCount(nextc) == 0;if (free) {setExclusiveOwnerThread(null);}setState(nextc);return free;
}
2.7、【读锁】加锁方法 lock() —— ReentrantReadWriteLock.ReadLock
public void lock() {sync.acquireShared(1);
}
调用 AQS#acquireShared()
方法
2.7.1、acquireShared()
方法 —— AQS
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0) {doAcquireShared(arg);}
}
2.7.1.1、tryAcquireShared()
方法 —— AQS,由 ReentrantReadWriteLock.Sync 实现
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();int c = getState();if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) {// 如果有写锁,且当前线程不是持有写锁线程,则返回 -1【写读互斥】return -1;}int r = sharedCount(c);// 如果获取到读锁,则返回 1// 1.readerShouldBlock() 方法// 公平锁:有线程排队,返回 true,直接跳过;否则,返回 false// 非公平锁:为防止写线程饥饿问题,所以判断 head 节点的后驱节点是否为写锁,如果是,返回 true,直接跳过;否则,返回 false// 2.compareAndSetState(c, c + SHARED_UNIT):CAS 操作 state 的高 16 位(读锁)if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {if (r == 0) {// 如果之前没有读锁,则这是第一个线程获取读锁,计数器记为 1,且用 firstReader 执向firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {// 如果之前有读锁,且当前线程等于 firstReader(第一个获取读锁的线程),则自加firstReaderHoldCount++;} else {// 缓存最后一个读线程的计数HoldCounter rh = cachedHoldCounter;// 我是第 2 个拿到读锁的(rh == null)| 我是第 N(N > 2)个拿到读锁的,且当前线程不是最后一个线程,那么将 cachedHoldCounter 设置为最后一个线程if (rh == null || rh.tid != getThreadId(current)) {cachedHoldCounter = rh = readHolds.get();} else if (rh.count == 0) {// 我是第 N(N > 2)个拿到读锁的,当前线程是最后一个线程,且计数器为 0 【读锁释放】readHolds.set(rh);}// 计数器自增rh.count++;}return 1;}// 用于 CAS 操作失败【N (N > 2)个线程同时执行 CAS 操作】return fullTryAcquireShared(current);
}
- 如果写锁被其它线程持有,则获取读锁失败,返回 -1
- 判断是否因同步队列策略而阻塞【公平锁:有线程排队,跳过逻辑;否则,执行逻辑;非公平锁:为防止写线程饥饿问题,所以判断 head 节点的后驱节点是否为写锁,如果是,跳过逻辑;否则,执行逻辑】。如果不阻塞,就执行 CAS 操作更新 state
- 如果步骤 2 失败,因为线程显然不符合条件【CAS 失败或计数饱和】,则使用完整的重试循环链接到版本
2.7.1.1.1、readerShouldBlock()
方法 —— ReentrantReadWriteLock.Sync,由子类实现
公平锁:由 hasQueuedPredecessors()
方法决定【同步等待队列中有线程等待(返回 true),则不竞争锁,直接返回 false;否则,竞争写锁,进行 CAS 操作,返回 true】
final boolean readerShouldBlock() {return hasQueuedPredecessors();
}
非公平锁:由 apparentlyFirstQueuedIsExclusive()
方法决定【如果同步等待队列中不为空,且 head 节点的后继节点为【独占式】节点,则返回 true;否则,返回 false;如果有读锁,写锁是无法获取的,防止写线程饥饿,一直阻塞(读锁是共享的,可以一直获取)】
final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();
}
2.7.1.1.1.1、apparentlyFirstQueuedIsExclusive()
方法 —— AQS
final boolean apparentlyFirstQueuedIsExclusive() {Node h, s;return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null;
}
如果同步等待队列中不为空,且 head 节点的后继节点为【独占式】节点,则返回 true;否则,返回 false
2.7.1.1.2、fullTryAcquireShared()
方法 —— ReentrantReadWriteLock.Sync
final int fullTryAcquireShared(Thread current) {HoldCounter rh = null;// 自旋for (;;) {int c = getState();// 是否存在写锁if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current) {// 如果存在写锁,且当前线程不是持有写锁的线程,返回 -1;如果是,就会去持有读锁return -1;}// 公平:有排队的,进入逻辑;没排队的,过!// 非公平:head 的 next 是写不,是,进入逻辑;如果不是,过!} else if (readerShouldBlock()) {if (firstReader == current) {// assert firstReaderHoldCount > 0;} else {// 需要阻塞:去掉最后一个计数为 0 的if (rh == null) {rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get();if (rh.count == 0) {readHolds.remove();}}}if (rh.count == 0) {return -1;}}}if (sharedCount(c) == MAX_COUNT) {throw new Error("Maximum lock count exceeded");}// 尝试获取读锁if (compareAndSetState(c, c + SHARED_UNIT)) {if (sharedCount(c) == 0) {// 第一次获取读锁firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null) {rh = cachedHoldCounter;}if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get(); } else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh;}return 1;}}
}
2.8、【读锁】解锁方法 unlock()
—— ReentrantReadWriteLock.ReadLock
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();if (firstReader == current) {if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))return nextc == 0;}
}
- 判断当前线程是否是第一个获取读锁的线程。如果是,它将检查读锁的持有次数,如果持有次数为1,则将第一个读者置为null,否则将持有次数减1
- 如果当前线程不是第一个获取读锁的线程,则会尝试从缓存中获取HoldCounter对象,该对象记录了线程持有的读锁次数。如果缓存中的HoldCounter对象与当前线程不匹配,则从readHolds中获取与当前线程对应的HoldCounter对象。接着,它将该对象的持有次数减1,并在线程持有次数不大于1时,从readHolds中移除该对象。如果持有次数小于等于0,会抛出一个异常
- 最后,该函数进入一个循环,不断尝试将锁的状态减去SHARED_UNIT(表示读锁的单位),并使用compareAndSetState方法原子地更新锁的状态。如果更新成功,它会检查更新后的状态是否为0,如果是,则返回true,表示成功释放了读锁
这篇关于七:ReentrantReadWriteLock —— 读写锁的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!