JDK1.7 ConcurrentHashMap源码分析

2024-01-08 09:18

本文主要是介绍JDK1.7 ConcurrentHashMap源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

HashMap是java编程中最常用的数据结构之一,由于HashMap非线程安全,因此不适用于并发访问的场景。JDK1.5之前,通常使用HashTable作为HashMap的线程安全版本,HashTable对读写进行全局加锁,在高并发情况下会造成严重的锁竞争和等待,极大地降低了系统的吞吐量,ConcurrentHashMap应运而生。

相比于Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap在线程安全的基础上提供了更好的写并发能力,并且读操作( get)通常不会阻塞,使得读写操作可并行执行,支持客户端修改ConcurrentHashMap的并发访问度,迭代期间也不会抛出 ConcurrentModificationException等等,ConcurrentHashMap有这么多优点,那么它有什么缺点吗?有,一致性问题,这是当前所有分布式系统都面临的问题。

1、Unsafe类和内存屏障简介

关于ConcurrentHashMap的实现,不论是在jdk1.7还是jdk1.8版本中ConcurrentHashMap中使用的最为核心也是最为频繁的就是Unsafe类中的各种native本地方法。所以这里有必要先介绍一下其中用的最多的几个Unsafe类中的核心方法。主要的几个方法是Unsafe.putObjectVolatile(obj,long,obj2)、 Unsafe.getObjectVolatile、 Unsafe.putOrderedObject等,具体这几个方法的区别通过对应的c的源代码可以略知一二:

void sun::misc::Unsafe::putObjectVolatile (jobject obj, jlong offset, jobject value)
{write_barrier ();volatile jobject *addr = (jobject *) ((char *) obj + offset);*addr = value;
}void sun::misc::Unsafe::putObject (jobject obj, jlong offset, jobject value)
{jobject *addr = (jobject *) ((char *) obj + offset);*addr = value;
}//用于和putObjectVolatile进行对比jobject sun::misc::Unsafe::getObjectVolatile (jobject obj, jlong offset)
{volatile jobject *addr = (jobject *) ((char *) obj + offset);jobject result = *addr;read_barrier ();return result;
}void sun::misc::Unsafe::putOrderedObject (jobject obj, jlong offset, jobject value)
{volatile jobject *addr = (jobject *) ((char *) obj + offset);*addr = value;
}

在上述Unsafe几个方法的源代码中,可以看到有write_barrier和read_barrier这两个内存屏障,这两个就是对应的硬件中的写屏障和读屏障,java内存模型中使用的所谓的LoadLoad、LoadStore、StoreStore、StoreLoad这几个屏障就是基于这两个屏障实现的。

写屏障的作用就是禁止了指令的重排序,并且配合C语言中的volatile关键字(C中的volatile关键字只能保证可见性不能保证有序性),通过添加内存屏障+C中的Volatile实现了类似Java中的Volatile关键字语义,即在putObjectVolatile方法中通过内存屏障保证了有序性,再通过volatile保证将对指定地址的操作是马上写入到共享的主存中而不是线程自身的本地工作内存中,这样配合下面的getObjectVolatile方法,就可以确保每次读取到的就是最新的数据。

对于getObjectVolatile而言,可以看到它在返回前加了read_barrier,这个读屏障的作用就是强制去读取主存中的数据而不是线程自己的本地工作内存,这样就确保了读取到的一定是最新的数据。
  
最后就是putOrderedObject,这个方法和putObjectVolatile的区别源码中在于没有加write_barrier,这个方法只保证了更新数据的可见性,但是无法保证有序性,因为没有添加屏障可能会导致最终生成的汇编指令被重排序优化,不过在ConcurrentHashMap中使用到这个方法的地方主要是在put方法更新数据的时候用到了,而关于put是加锁了的,所以在加锁的代码区域,用putOrderedObject比putObjectVolatile好在不需要添加屏障,因为只会有一个线程进行操作,从而允许进行指令优化重排序,性能会更好。

下面开始分析ConcurrentHashMap的实现原理。

2、ConcurrentHashMap数据结构

在这里插入图片描述
ConcurrentHashMap的基本策略是将table细分为多个Segment保存在数组segments中,每个Segment本身又是一个可并发的哈希表,同时每个Segment都是一把ReentrantLock锁,只有在同一个Segment内才存在竞态关系,不同的Segment之间没有锁竞争,这就是分段锁机制。Segment内部拥有一个HashEntry数组,数组中的每个元素又是一个链表。

为了减少占用空间,除了第一个Segment之外,剩余的Segment采用的是延迟初始化的机制,仅在第一次需要时才会创建(通过ensureSegment实现)。为了保证延迟初始化存在的可见性,访问segments数组及table数组中的元素均通过volatile访问,主要借助于Unsafe中原子操作getObjectVolatile来实现,此外,segments中segment的写入,以及table中元素和next域的写入均使用UNSAFE.putOrderedObject来完成。这些操作提供了AtomicReferenceArrays的功能。

3、成员变量

下面介绍ConcurrentHashMap中用到的成员域:

/*** 默认初始容量*/
static final int DEFAULT_INITIAL_CAPACITY = 16;/*** 默认加载因子*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;/*** 默认并发度,该参数会影响segments数组的长度*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;/*** 最大容量,构造ConcurrentHashMap时指定的大小超过该值则会使用该值替换,* ConcurrentHashMap的大小必须是2的幂,且小于等于1<<30,以确保可使用int索引条目*/
static final int MAXIMUM_CAPACITY = 1 << 30;/*** 每个segment中table数组的最小长度,必须是2的幂,至少为2,以免延迟构造后立即调整大小*/
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;/*** 允许的最大segment数量,用于限定构造函数参数concurrencyLevel的边界,必须是2的幂*/
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative/*** 非锁定情况下调用size和containsValue方法的重试次数,避免由于table连续被修改导致无限重试*/
static final int RETRIES_BEFORE_LOCK = 2;/*** 与当前实例相关联的,用于key哈希码的随机值,以减少哈希冲突*/
private transient final int hashSeed = randomHashSeed(this);private static int randomHashSeed(ConcurrentHashMap instance) {if (sun.misc.VM.isBooted() && Holder.ALTERNATIVE_HASHING) {return sun.misc.Hashing.randomHashSeed(instance);}return 0;
}
/*** 用于索引segment的掩码值,key哈希码的高位用于选择segment*/
final int segmentMask;/*** 用于索引segment偏移值*/
final int segmentShift;/*** segments数组*/
final Segment<K,V>[] segments;

4、构造方法

@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (concurrencyLevel > MAX_SEGMENTS)//concurrencyLevel = MAX_SEGMENTS;// 寻找与给定参数concurrencyLevel匹配的最佳Segment数组ssize,必须是2的幂// 如果concurrencyLevel是2的幂,那么最后选定的ssize就是concurrencyLevel// 否则concurrencyLevel,ssize为大于concurrencyLevel最小2的幂值// concurrencyLevel为7,则ssize为2的3次幂,为8int sshift = 0;int ssize = 1;while (ssize < concurrencyLevel) {++sshift;ssize <<= 1;}this.segmentShift = 32 - sshift;this.segmentMask = ssize - 1;if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;//计算每个Segment中,table数组的初始大小int c = initialCapacity / ssize;if (c * ssize < initialCapacity)++c;int cap = MIN_SEGMENT_TABLE_CAPACITY;while (cap < c)cap <<= 1;// 创建segments和第一个segmentSegment<K,V> s0 =new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]);Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];UNSAFE.putOrderedObject(ss, SBASE, s0); //原子按顺序写入segments[0]this.segments = ss;
}/*** map转化为ConcurrentHashMap* @param m the map*/
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,DEFAULT_INITIAL_CAPACITY),DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);putAll(m);
}

构造器中各个参数的含义:

initialCapacity:创建ConccurentHashMap对象的初始容量,即ConccurentHashMap中HashEntity的总数量,创建时未指定initialCapacity则默认为16,最大容量为MAXIMUM_CAPACITY。

loadFactor:负载因子,用于计算Segment的threshold域,

concurrencyLevel:即ConccurentHashMap的并发度,支持同时更新ConccurentHashMap且不发生锁竞争的最大线程数。concurrencyLevel不能代表ConccurentHashMap实际并发度,ConccurentHashMap会使用大于等于该值的2的幂指数的最小值作为实际并发度,实际并发度即为segments数组的长度。创建时未指定concurrencyLevel则默认为16。

并发度对ConccurentHashMap性能具有举足轻重的作用,如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。

5、原子方法

先看源码:

/*** 获取给定table的第i个元素,使用volatile读语义*/
@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {return (tab == null) ? null :(HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)i << TSHIFT) + TBASE);
}/*** 设置给定table的第i个元素,使用volatile写入语义*/
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,HashEntry<K,V> e) {UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
/*** 通过Unsafe提供的具有volatile元素访问语义的操作获取给定Segment数组的第j个元素(如果ss非空)* 注意:因为Segment数组的每个元素只能设置一次(使用完全有序的写入),* 所以一些性能敏感的方法只能依靠此方法作为对空读取的重新检查。*/
@SuppressWarnings("unchecked")
static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) {long u = (j << SSHIFT) + SBASE;return ss == null ? null :(Segment<K,V>) UNSAFE.getObjectVolatile(ss, u);
}
/*** 根据给定hash获取segment*/
@SuppressWarnings("unchecked")
private Segment<K,V> segmentForHash(int h) {long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
}/*** 根据给定segment和hash获取table entry*/
@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) {HashEntry<K,V>[] tab;return (seg == null || (tab = seg.table) == null) ? null :(HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
}

ConcurrentHashMap主要使用以上几个方法对segments数组和table数组进行读写,并保证线程安全性。

其主要使用了UNSAFE.getObjectVolatile提供Volatile读语义,UNSAFE.putOrderedObject提供了Volatile写语义。下面分析这两个方法带来的好处:

UNSAFE.getObjectVolatile使得非volatile声明的对象具有volatile读的语义,那么要使非volatile声明的对象具有volatile写的语义则需要借助操作UNSAFE.putObjectvolatile。

那么UNSAFE.putOrderedObject操作的含义和作用又是什么呢?

为了控制特定条件下的指令重排序和内存可见性问题,Java编译器使用一种叫内存屏障(Memory Barrier,或叫做内存栅栏,Memory Fence)的CPU指令来禁止指令重排序。java中volatile写入使用了内存屏障中的LoadStore屏障规则,对于这样的语句Load1; LoadStore; Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。volatile的写所插入的storeLoad是一个耗时的操作,因此出现了一个对volatile写的升级版本,利用lazySet方法进行性能优化,在实现上对volatile的写只会在之前插入StoreStore屏障,对于这样的语句Store1; StoreStore; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见,也就是按顺序的写入。UNSAFE.putOrderedObject正是提供了这样的语义,避免了写写指令重排序,但不保证内存可见性,因此读取时需借助volatile读保证可见性

ConcurrentHashMap正是利用了这些高性能的原子读写来避免加锁带来的开销,从而大幅度提高了性能。

6、ensureSegment方法

ensureSegment用于确定指定的Segment是否存在,不存在则会创建。源码如下:

@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {final Segment<K,V>[] ss = this.segments;long u = (k << SSHIFT) + SBASE; // raw offsetSegment<K,V> seg;if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {// 以初始化时创建的第一个坑位的ss[0]作为模版进行创建Segment<K,V> proto = ss[0]; // use segment 0 as prototypeint cap = proto.table.length;float lf = proto.loadFactor;int threshold = (int)(cap * lf);HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];// 二次检查是否有其它线程创建了这个Segmentif ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // recheckSegment<K,V> s = new Segment<K,V>(lf, threshold, tab);//这里通过自旋的CAS方式对segments数组中偏移量为u位置设置值为s,这是一种不加锁的方式,//万一有多个线程同时执行这一步,那么只会有一个成功,而其它线程在看到第一个执行成功的线程结果后//会获取到最新的数据从而发现需要更新的坑位已经不为空了,那么就跳出while循环并返回最新的segwhile ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) {if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))break;}}}return seg;
}

使用getObjectVolatile()方法提供的原子读语义获取指定Segment,如果为空,以构造ConcurrentHashMap对象时创建的Segment为模板,创建新的Segment。ensureSegment在创建Segment期间为不断使用getObjectVolatile()检查指定Segment是否为空,防止其他线程已经创建了Segment。

7、HashEntry

开始介绍Segment之前,我们先看看HashEntry的结构:

static final class HashEntry<K,V> {final int hash;final K key;volatile V value;volatile HashEntry<K,V> next;HashEntry(int hash, K key, V value, HashEntry<K,V> next) {this.hash = hash;this.key = key;this.value = value;this.next = next;}/*** 使用volatile写入语义设置next域*/final void setNext(HashEntry<K,V> n) {UNSAFE.putOrderedObject(this, nextOffset, n);}// Unsafe mechanicsstatic final sun.misc.Unsafe UNSAFE;static final long nextOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class k = HashEntry.class;nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}}
}

HashEntry将value和next声明为volatile ,是为了保证内存可见性,也就是每次读取都将从内存读取最新的值,而不会从缓存读取。同时,写入next域也使用volatile写入语义保证原子性。写入使用原子性操作,读取使用volatile,从而保证了多线程访问的线程安全性。

8、Segment

Segment作为ConccurentHashMap的专用数据结构,同时扩展了ReentrantLock,使得Segment本身就是一把可重入锁,方便执行锁定。Segment内部持有一个始终处于一致状态的entry列表,使得读取操作无需加锁(通过volatile读table数组)。调整tables大小期间通过复制节点实现,使得旧版本的table仍然可以遍历。

Segment仅定义需要加锁的可变方法,针对ConcurrentHashMap中相应方法的调用都会被代理到Segment中的方法。这些可变方法使用scanAndLock和scanAndLockForPut在竞争中使用受控旋转,也就是自旋次数受限制的自旋锁。由于线程的阻塞与唤醒通常伴随着上下文切换、CPU抢占等,都是开销比较大的操作,使用自旋次数受限制的自旋锁,可以提高获取锁的概率,降低线程阻塞的概率,这样可极大地提升性能。**之所以自旋次数受限制,是因为自旋会不断的消耗CPU的时间,无限制的自旋会导致开销增长。因此自旋锁适用于多核CPU下,同时线程等待锁的时间非常短;若等待某个锁需要的时间较长,让线程尽早进入阻塞才是正确的选择。

9、Segment成员变量

/*** 对segment加锁时,在阻塞之前进行自旋的最大次数。* 在多处理器上,使用有限数量的重试来维护在定位节点时获取的高速缓存。*/
static final int MAX_SCAN_RETRIES =Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;/*** 每个segment的table数组,访问数组中的元素通过entryAt/setEntryAt提供的volatile语义来完成*/
transient volatile HashEntry<K,V>[] table;/*** 元素的数量,只能在锁中或其他volatile读保证可见性之间进行访问*/
transient int count;/*** 当前segment中可变操作发生的次数,put,remove等,可能会溢出32位* 它为CHM isEmpty()和size()方法中的稳定性检查提供了足够的准确性。* 只能在锁中或其他volatile读保证可见性之间进行访问*/
transient int modCount;/*** 当table大小超过此阈值时,对table进行扩容,值为(int)(capacity *loadFactor)*/
transient int threshold;/*** 负载因子*/
final float loadFactor;/*** 构造方法*/
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {this.loadFactor = lf;this.threshold = threshold;this.table = tab;
}

10、scanAndLockForPut方法

 private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {HashEntry<K,V> first = entryForHash(this, hash);//根据key的hash值查找头节点HashEntry<K,V> e = first;HashEntry<K,V> node = null;int retries = -1; // negative while locating nodewhile (!tryLock()) {//尝试获取锁,成功则直接返回,失败则开始自旋HashEntry<K,V> f; // 用于后续重新检查头节点if (retries < 0) {if (e == null) {//结束遍历节点if (node == null) // 创建节点node = new HashEntry<K,V>(hash, key, value, null);retries = 0;}else if (key.equals(e.key))//找到节点,结束遍历retries = 0;elsee = e.next;}else if (++retries > MAX_SCAN_RETRIES) {//达到最大尝试次数lock();//进入加锁方法,失败则会进入排队,阻塞当前线程break;}else if ((retries & 1) == 0 &&(f = entryForHash(this, hash)) != first) {e = first = f; // 头节点变化,需要重新遍历,说明有新节点加入或被移除retries = -1;}}return node;
}

分析:while循环每执行一次,都会尝试获取锁,成功则会返回。retries 初始值设为-1是为了遍历当前hash对应桶的链表,找到则停止遍历,未找到则会预创建一个节点;同时,如果头节点发生变化,则会重新进行遍历,直到自旋次数大于MAX_SCAN_RETRIES,使用lock加锁,获取锁失败则会进入等待队列。

为什么scanAndLockForPut中要遍历一次链表?

前面已经提过scanAndLockForPut使用自旋次数受限制的自旋锁进行优化加锁的方式,此外,遍历一次链表也是一种优化方法,主要是尽可能使当前链表中的节点进入CPU高速缓存,提高缓存命中率,以便获取锁定后的遍历速度更快。实际上加锁后并没有使用已经找到的节点,因为它们必须在锁定下重新获取,以确保更新的顺序一致性,但是遍历一次后通常可以更快地重新定位。这是一种预热优化的方式,scanAndLock中也使用了该优化方式。

scanAndLock内部实现方式与scanAndLockForPut相似,但比scanAndLockForPut更简单,scanAndLock不需要预创建节点。因此scanAndLock主要用于remove和replace操作,而scanAndLockForPut则用于put,这里就不再贴源码。

11、Segment put方法

final V put(K key, int hash, V value, boolean onlyIfAbsent) {HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);V oldValue;try {HashEntry<K,V>[] tab = table;int index = (tab.length - 1) & hash;HashEntry<K,V> first = entryAt(tab, index);for (HashEntry<K,V> e = first;;) {if (e != null) {K k;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {oldValue = e.value;if (!onlyIfAbsent) {e.value = value;++modCount;}break;}e = e.next;}else {if (node != null)node.setNext(first);elsenode = new HashEntry<K,V>(hash, key, value, first);int c = count + 1;if (c > threshold && tab.length < MAXIMUM_CAPACITY)rehash(node);elsesetEntryAt(tab, index, node);++modCount;count = c;oldValue = null;break;}}} finally {unlock();}return oldValue;
}

Segment中put的大体流程是先对Segment加锁,然后根据(tab.length-1)&hash找到对应的slot,然后遍历slot对应的链表,如果key对应的entry已经存在,根据onlyIfAbsent标志决定是否替换旧值,如果key对应的entry不存在,创建新节点插入链表头部,期间若容量超过限制,判断是否需要进行rehash。

put实现还是比较简单的,下面谈谈其中主要的几个优化点。

scanAndLockForPut的作用已经介绍过了,如果锁能很快的获取,有限次数的自旋可防止线程进入阻塞,有助于提升性能;此外,自旋期间会遍历链表,希望遍历的链表被CPU Cache所缓存,为后续实际put过程中的链表遍历操作提升性能;最后scanAndLockForPut还会预创建节点。

HashEntry<K,V>[] tab = table有什么好处?

从Segment源码可知,table被声明为volatile,为了保证内存可见性,table上的修改都必须立即更新到主存,volatile写实际是具有一定开销的。由于put中的代码都在加锁区执行,锁既能保证可见性,也能保证原子性,因此,不需要针对table进行volatile写,将table引用赋值给局部变量以实现编译、运行时的优化。

node.setNext(first)也是同样的道理,HashEntry的next同样被声明为volatile,因此这里使用优化的方式UNSAFE.putOrderedObject进行volatile写入。

既然put已在加锁区运行,为何访问tab中的元素不直接通过数组索引,而是用entryAt(tab, index)?

加锁保证了table引用的同步语义,但是对table数组中元素的写入使用UNSAFE.putOrderedObject进行顺序写,该操作只是禁止写写指令重排序,不能保证写入后的内存可见性。因此,必须使用entryAt(tab, index)提供的volatile读来获取最新的数据。

12、rehash方法

rehash主要的作用的是扩容,将扩容前table中的节点重新分配到新的table。由于table的capacity都是2的幂,按照2的幂次方扩容为原来的一倍,扩容前在slot i中的元素,扩容后要么还是在slot i里,或者i+扩容前table的capacity的slot中,这样使得只需要移动原来桶中的部分元素即可将所有节点分配到新的table。

为了提高效率,rehash首先找到第一个后续所有节点在扩容后index都保持不变的节点,将这个节点加入扩容后的table的index对应的slot中,然后将这个节点之前的所有节点重排即可。

@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {HashEntry<K,V>[] oldTable = table;int oldCapacity = oldTable.length;int newCapacity = oldCapacity << 1;threshold = (int)(newCapacity * loadFactor);HashEntry<K,V>[] newTable =(HashEntry<K,V>[]) new HashEntry[newCapacity];int sizeMask = newCapacity - 1;for (int i = 0; i < oldCapacity ; i++) {HashEntry<K,V> e = oldTable[i];if (e != null) {HashEntry<K,V> next = e.next;int idx = e.hash & sizeMask;if (next == null)   // slot中只有一个元素newTable[idx] = e;else { // Reuse consecutive sequence at same slotHashEntry<K,V> lastRun = e;int lastIdx = idx;for (HashEntry<K,V> last = next;last != null;last = last.next) {int k = last.hash & sizeMask;if (k != lastIdx) {lastIdx = k;lastRun = last;}}newTable[lastIdx] = lastRun;//复制lastRun之前的所有节点for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {V v = p.value;int h = p.hash;int k = h & sizeMask;HashEntry<K,V> n = newTable[k];newTable[k] = new HashEntry<K,V>(h, p.key, v, n);}}}}int nodeIndex = node.hash & sizeMask; //添加新节点node.setNext(newTable[nodeIndex]);newTable[nodeIndex] = node;table = newTable;
}

最好的情况,每个slot链表的所有节点在扩容后index都保持不变,那么只需移动头节点,不用创建新节点即可完成扩容和节点重新分配;最差的情况,每个链表的倒数两个节点在扩容后index不同,那么需要重建并复制所有节点。

13、get方法

get与containsKey两个方法的实现几乎完全一致,都不需要加锁读数据,下面以get源码说明:

public V get(Object key) {Segment<K,V> s; HashEntry<K,V>[] tab;int h = hash(key);long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);e != null; e = e.next) {K k;if ((k = e.key) == key || (e.hash == h && key.equals(k)))return e.value;}}return null;
}

首先计算key的hash码,计算Segment的index,使用getObjectVolatile()方法提供的原子读语义获取Segment,再计算Segment中slot的索引,使用getObjectVolatile()方法提供的原子读语义获取slot头节点,遍历链表,判断是否存在key相同的节点以及获得该节点的value。由于遍历过程中其他线程可能对链表结构做了调整,因此get和containsKey返回的可能是过时的数据,这就是ConcurrentHashMap的弱一致性。如果要求强一致性,那么必须使用Collections.synchronizedMap()。

14、size方法

public int size() {final Segment<K,V>[] segments = this.segments;int size;boolean overflow; // 为true表示size溢出32位long sum;         // 所有segment中modCount的总和long last = 0L;  int retries = -1; // 第一次迭代不计入重试,因此总共会重试3次try {for (;;) {if (retries++ == RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j)ensureSegment(j).lock(); // force creation}sum = 0L;size = 0;overflow = false;for (int j = 0; j < segments.length; ++j) {Segment<K,V> seg = segmentAt(segments, j);if (seg != null) {sum += seg.modCount;int c = seg.count;if (c < 0 || (size += c) < 0)overflow = true;}}if (sum == last)break;last = sum;}} finally {//由于只有在retries等于RETRIES_BEFORE_LOCK时才会执行强制加锁,并且由于是用的retries++,//所以强制加锁完毕后,retries的值是一定会大于RETRIES_BEFORE_LOCKif (retries > RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j)segmentAt(segments, j).unlock();}}return overflow ? Integer.MAX_VALUE : size;
}

首先不加锁循环所有的Segment(通过Unsafe的getObjectVolatile()以保证原子读语义),计算所有Segment的count之后,同时计算所有Segment的modcount之和sum,如果sum与last相等,说明迭代期间没有发生其他线程修改ConcurrentHashMap的情况,返回size;当重试次数超过预定义的值(RETRIES_BEFORE_LOCK为2)时,对所有的Segment依次进行加锁,再计算size的值。需要注意的是,加锁过程中会强制创建所有的不存在Segment,否则容易出现其他线程创建Segment并进行put,remove等操作。由于retries初始值为-1,因此会尝试3次才会对所有的Segment加锁。

15、弱一致性

ConcurrentHashMap是弱一致的。 ConcurrentHashMap的get,containsKey,clear,iterator 都是弱一致性的。

get和containsKey都是无锁的操作,均通过getObjectVolatile()提供的原子读语义来获得Segment以及对应的链表,然后遍历链表。由于遍历期间其他线程可能对链表结构做了调整,因此get和containsKey返回的可能是过时的数据。如在get执行UNSAFE.getObjectVolatile(segments, u)之后,其他线程若执行了clear操作,则get将读到失效的数据。

由于clear没有使用全局的锁(采用的是分段锁),当清除完一个segment之后,开始清理下一个segment的时候,已经清理segments可能又被加入了数据,因此clear返回的时候,ConcurrentHashMap中是可能存在数据的。因此,clear方法是弱一致的。

ConcurrentHashMap中的迭代器主要包括entrySet、keySet、values方法,迭代器在遍历期间如果已经遍历的table上的内容变化了,迭代器不会抛出ConcurrentModificationException异常。如果未遍历的数组上的内容发生了变化,则有可能反映到迭代过程中,这就是ConcurrentHashMap迭代器弱一致的表现。

到此,jdk1.7中ConcurrentHashMap的源码分析完成。

欢迎小伙伴们留言交流~~
浏览更多文章可关注微信公众号:diggkr

这篇关于JDK1.7 ConcurrentHashMap源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/582989

相关文章

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

衡石分析平台使用手册-单机安装及启动

单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud

线性因子模型 - 独立分量分析(ICA)篇

序言 线性因子模型是数据分析与机器学习中的一类重要模型,它们通过引入潜变量( latent variables \text{latent variables} latent variables)来更好地表征数据。其中,独立分量分析( ICA \text{ICA} ICA)作为线性因子模型的一种,以其独特的视角和广泛的应用领域而备受关注。 ICA \text{ICA} ICA旨在将观察到的复杂信号

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。