死磕Java并发编程(8):CurrentHashMap如何实现高效地线程安全?在Java8中有哪些设计实现的演进?

本文主要是介绍死磕Java并发编程(8):CurrentHashMap如何实现高效地线程安全?在Java8中有哪些设计实现的演进?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

这篇文章一开始我以为会比较简单,但是在深入源码分析时,遇到了很大的阻碍,比前面我们分析AQS以及读写锁的源码要难理解的多,断断续续也写了4天了。 如果你看完还是没有理解的话,那我在这里表示深深的歉意,同时也欢迎你和我一起沟通。

本文是死磕Java并发编程系列文章的第 8 篇,主角就是 java 并发包中提供的 CurrentHashMap 这是一个线程安全且高效的HashMap ,也是面试的高频考点。本文主要围绕 ConcurrentHashMap 如何实现高效地线程安全?以及在Java8中它从设计实现上有哪些演进?

网上关于 HashMap 和 ConcurrentHashMap 的文章确实不少,不过目前的很多分析资料还是基于其早期版本,所以才想自己也写一篇,把细节说清楚说透,尤其像 Java8 中的 ConcurrentHashMap 的演进设计实现,大部分文章都说不清楚。希望能降低大家学习的成本,不希望大家看了一篇又一篇文章,最终还是模模糊糊。

阅读前提:

本文会涉及源码分析,所以至少读者要熟悉它们的接口使用,同时,对于并发,读者至少要知道 CAS、ReentrantLock、UNSAFE 操作这几个基本的知识,文中不会对这些知识进行介绍,如果不了解可以看看笔者的前面对应文章.

为什么需要 ConcurrentHashMap?

在并发编程中使用HashMap可能导致程序死循环。而使用线程安全的HashTable效率又非常低下(它的实现就是将put、get、size等方法加上 synchronized 关键字),基于以上两个原因,便有了ConcurrentHashMap的登场机会。

可能有的同学对 HashMap 为什么会在并发中出现死循环从而导致 cpu 占用达到100% 不太了解,这里直接展示一段示例代码,运行它就会出现死循环。

static final HashMap<String, String> map = new HashMap<String, String>(2);
Thread t = new Thread(new Runnable() {@Overridepublic void run() {for (int i = 0; i < 100000; i++) {int finalI1 = i;new Thread(new Runnable() {@Overridepublic void run() {map.put(String.valueOf(finalI1), "");}}, "ftf" + i).start();}}
}, "ftf");
t.start();
t.join();

死循环的概率还是非常低的,比较难以重现。为了提高出现概率,采用多次迭代测试。笔者在测试时 出现在 128次。

感兴趣的同学可以用 jstack 分析下,网上有很多教程,这里就不展开 排查过程了。原因就是:HashMap 在并发执行 put 操作时会引起死循环,是因为多线程会导致 HashMap 的 Entry 链表形成环形数据结构,一旦形成环形数据结构,Entry 的 next 节点永远不为空,就会产生死循环获 取 Entry 。从而导致CPU占用将近100%。

Java7中ConcurrentHashMap分析

首先,我这里强调,ConcurrentHashMap 的设计实现其实一直在演化,比如在 Java 8 中就发生了非常大的变化(Java 7 其实也有不少更新),所以,我这里将比较分析结构、实现机制等方面,对比不同版本的主要区别。

在 Java7 中的实现是基于:

  • 分离锁,也就是将内部进行分段(Segment),里面则是 HashEntry 的数组,和 HashMap 类似,哈希相同的条目也是以链表形式存放。
  • HashEntry 内部使用 volatile 的 value 字段来保证可见性,也利用了不可变对象的机制以改进利用 Unsafe 提供的底层能力,比如 volatile access,去直接完成部分操作,以最优化性能,毕竟 Unsafe 中的很多操作都是 JVM intrinsic 优化过的。

具体实现可以理解为:ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。Segment是一种可重入锁(继承了ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry 则用于存储键值对数据。一个 ConcurrentHashMap 里包含一个 Segment 数组。Segment 的结构和 HashMap 类似,是一种 数组和链表结构。一个 Segment 里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元 素,每个Segment 守护着一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时, 必须首先获得与它对应的Segment锁。

在这里插入图片描述

初始化

在构造的时候,Segment 的数量由所谓的 concurrentcyLevel 决定,默认是 16,所以理论上,这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。也可以在相应构造函数直接指定。注意,Java 需要它是 2 的幂数值,如果输入是类似 15 这种非幂值,会被自动调整到 16 之类 2 的幂数值。并且一旦初始化后,它是不可以扩容的。

ConcurrentHashMap 初始化方法是通过 initialCapacity 、loadFactor 和 concurrencyLevel 等几个参数来初始化segment数组、段偏移量 segmentShift 、段掩码 segmentMask 和每个 segment 里的 HashEntry 数组来实现的。

下面结合源代码一起来看下,为方便理解,我直接注释在代码段里:

public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (concurrencyLevel > MAX_SEGMENTS)concurrencyLevel = MAX_SEGMENTS;// Find power-of-two sizes best matching argumentsint sshift = 0;int ssize = 1;// 计算并行级别 ssize,因为要保持并行级别是 2 的 n 次方while (ssize < concurrencyLevel) {++sshift;ssize <<= 1;}// 我们这里先不要那么烧脑,用默认值,concurrencyLevel 为 16,sshift 为 4// 那么计算出 segmentShift 为 28,segmentMask 为 15,后面会用到这两个值this.segmentShift = 32 - sshift;this.segmentMask = ssize - 1;if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;// initialCapacity 是设置整个 map 初始的大小,// 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小// 如 initialCapacity 为 64,那么每个 Segment 可以分到 4 个int c = initialCapacity / ssize;if (c * ssize < initialCapacity)++c;// 默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,因为这样的话,对于具体的槽上,插入一个元素不至于扩容,插入第二个的时候才会扩容int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c)cap <<= 1;// 创建 Segment 数组,// 并创建数组的第一个元素 segment[0]Segment<K,V> s0 =new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]);Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];// 往数组写入 segment[0]UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]this.segments = ss;
}

初始化完成,我们得到了一个 Segment 数组。这里之所以 segments 数组的长度必须是2的N次幂,主要是为了能通过按位与的散列算法来定位 segments 数组的索引。

注意:concurrencyLevel 的最大值是65535,这意味着 segments 数组的长度最大为65536, 对应的二进制是16位。

为了加深读者理解,下面来分析下,当我们用 new ConcurrentHashMap() 无参构造函数进行初始化的,那么初始化完成后:

  • Segment 数组长度为 16,不可以扩容
  • Segment[i] 的默认大小为 2,负载因子是 0.75,得出初始阈值为 1.5,也就是以后插入第一个元素不会触发扩容,插入第二个会进行第一次扩容
  • 这里初始化了 segment[0],其他位置还是 null,至于为什么要初始化 segment[0],后面的代码会介绍
  • 当前段偏移量 segmentShift 的值为 32 - 4 = 28,段掩码 segmentMask 为 16 - 1 = 15,这两个值马上就会用到

get 操作

get 操作需要保证的是可见性,所以并没有什么同步逻辑。

  1. 计算 hash 值,找到 segment 数组中的具体位置
  2. segment 中也是一个数组(HashEntry数组),根据 hash 找到数组中具体的位置
  3. 到这里是链表了,HashEntry 是链表中的元素,顺着链表进行查找即可
public V get(Object key) {Segment<K,V> s; // manually integrate access methods to reduce overheadHashEntry<K,V>[] tab;// 1. hash 值,32位int h = hash(key);// 利用位操作替换普通数学运算,将hash值无符号左移段偏移量位,即取高四位,在与上段掩码(15二进制位1111)long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;// 2. 根据 hash 找到对应的 segment,利用Unsafe直接进行volatile accessif ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {// 3. 找到segment 内部数组相应位置的链表,遍历for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);e != null; e = e.next) {K k;if ((k = e.key) == key || (e.hash == h && key.equals(k)))return e.value;}}return null;
}

put操作

对于 put 操作,首先是通过二次哈希避免哈希冲突,然后以 Unsafe 调用方式,直接获取相应的 Segment,然后进行线程安全的 put 操作:

public V put(K key, V value) {Segment<K,V> s;if (value == null)throw new NullPointerException();// 1. 二次哈希,以保证数据的分散性,避免哈希冲突int hash = hash(key.hashCode());// 2. 根据 hash 值找到 Segment 数组中的位置 j//    hash 是 32 位,无符号右移 segmentShift(28) 位,剩下高 4 位,//    然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的高 4 位,也就是segment的数组下标int j = (hash >>> segmentShift) & segmentMask;// 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null,// ensureSegment(j) 对 segment[j] 进行初始化if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck(segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegments = ensureSegment(j);// 3. 插入新值到 槽 s 中return s.put(key, hash, value, false);
}

其核心逻辑实现在下面的内部方法中:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {// 在往该 segment 写入前,需要先获取该 segment 的独占锁//    先看主流程,后面还会具体介绍这部分内容HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);V oldValue;try {// 这个是 segment 内部的数组HashEntry<K,V>[] tab = table;// 再利用 hash 值,求应该放置的数组下标int index = (tab.length - 1) & hash;// first 是数组该位置处的链表的表头HashEntry<K,V> first = entryAt(tab, index);// 下面这串 for 循环虽然很长,不过也很好理解,想象当前位置链表不为空则先遍历找是否存在,如果存在则覆盖,否则放到合适的位置for (HashEntry<K,V> e = first;;) {if (e != null) {K k;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {oldValue = e.value;if (!onlyIfAbsent) {// 覆盖旧值e.value = value;++modCount;}break;}// 继续顺着链表走e = e.next;}else {// node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。// 如果不为 null,那就直接将它设置为链表表头;如果是null,初始化并设置为链表表头。if (node != null)node.setNext(first);elsenode = new HashEntry<K,V>(hash, key, value, first);int c = count + 1;// 如果超过了该 segment 的阈值,这个 segment 需要扩容if (c > threshold && tab.length < MAXIMUM_CAPACITY)rehash(node); // 扩容后面也会具体分析else// 没有达到阈值,将 node 放到数组 tab 的 index 位置,// 其实就是将新的节点设置成原链表的表头setEntryAt(tab, index, node);++modCount;count = c;oldValue = null;break;}}} finally {// 解锁unlock();}return oldValue;
}

rehash:扩容操作

重复一下,segment 数组不能扩容,扩容是 segment 数组某个位置内部的数组 HashEntry<K,V>[] 进行扩容,扩容后,容量为原来的 2 倍。

首先,我们要回顾一下触发扩容的地方,put 的时候,如果判断该值的插入会导致该 segment 的元素个数超过阈值,那么先进行扩容,再插值,读者这个时候可以回去 put 方法看一眼。

该方法不需要考虑并发,因为到这里的时候,是持有该 segment 的独占锁的。

// 方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。
private void rehash(HashEntry<K,V> node) {HashEntry<K,V>[] oldTable = table;int oldCapacity = oldTable.length;// 2 倍int newCapacity = oldCapacity << 1;threshold = (int)(newCapacity * loadFactor);// 创建新数组HashEntry<K,V>[] newTable =(HashEntry<K,V>[]) new HashEntry[newCapacity];// 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’int sizeMask = newCapacity - 1;// 遍历原数组,老套路,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置for (int i = 0; i < oldCapacity ; i++) {// e 是链表的第一个元素HashEntry<K,V> e = oldTable[i];if (e != null) {HashEntry<K,V> next = e.next;// 计算应该放置在新数组中的位置,// 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 = 19int idx = e.hash & sizeMask;// 该位置处只有一个元素,那比较好办,直接放到新数组中对应的位置if (next == null)   newTable[idx] = e;else { // Reuse consecutive sequence at same slot// e 是链表表头HashEntry<K,V> lastRun = e;// idx 是当前链表的头结点 e 的新位置int lastIdx = idx;// 下面这个 for 循环会找到一个 lastRun 节点,这个节点之后的所有元素是将要放到一起的for (HashEntry<K,V> last = next;last != null;last = last.next) {int k = last.hash & sizeMask;if (k != lastIdx) {lastIdx = k;lastRun = last;}}// 将 lastRun 及其之后的所有节点组成的这个链表放到 lastIdx 这个位置newTable[lastIdx] = lastRun;// 下面的操作是处理 lastRun 之前的节点,//    这些节点可能分配在另一个链表中,也可能分配到上面的那个链表中for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {V v = p.value;int h = p.hash;int k = h & sizeMask;HashEntry<K,V> n = newTable[k];newTable[k] = new HashEntry<K,V>(h, p.key, v, n);}}}}// 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部int nodeIndex = node.hash & sizeMask; // add the new nodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex] = node;table = newTable;
}

上面有两个挨着的 for 循环,第一个 for 有什么用呢?

这块代码我看的时候真的是很难理解,反复看了好几遍,主要原因还是对链表操作不太熟悉,这里为大家在解释下,帮助理解。这里需要进行第一个 for 循环,主要是因为扩容后,原来数组位置 i 的 HashEntry 是一个链表,那么这个链表的元素对应扩容后的数组位置必然是 i 或 i+oldCap。第一个循环就是为遍历当前位置 i 的链表找到最后一个在新数组中位置相同的节点 lastRun。

如果没有第一个 for 循环,也是可以工作的,但是,这个 for 循环下来,如果 lastRun 的后面还有比较多的节点,那么这次就是值得的。因为我们只需要克隆 lastRun 前面的节点,后面的一串节点跟着 lastRun 进行赋值就可以了,不需要做任何操作。

Doug Lea 大神这块的想法一般人可能是想不到的,毕竟作为并发包中的基础类 都是为了将并发性能做到极致的。但是也有最差的情况,就是找到的 lastRun 是链表的最后一个元素,或者排在倒数,那么这次遍历就显得多余了,而且浪费了性能。不过 Doug Lea 也说了,根据统计,如果使用默认的阈值,大约只有 1/6 的节点需要克隆

size 操作

知道了 ConcurrentHashMap 通过分段锁实现高性能且线程安全的原理。试想,如果不进行同步,简单的计算所有 Segment 的总值,可能会因为并发 put,导致结果不准确,但是直接锁定所有 Segment 进行计算,就会变得非常昂贵。

所以,ConcurrentHashMap 的实现是通过重试机制(RETRIES_BEFORE_LOCK,指定重试次数 2),来试图获得可靠值。如果没有监控到发生变化(通过对比 Segment.modCount),就直接返回,否则获取锁进行操作。

Java8中ConcurrentHashMap分析

在 Java 8 和之后的版本中,ConcurrentHashMap 发生了哪些变化呢?

  • Java8 对 HashMap 进行了一些修改,最大的不同就是利用了红黑树,所以其由 数组+链表+红黑树 组成。

  • 因为不再使用 Segment,初始化操作大大简化,修改为 lazy-load 形式,这样可以有效避免初始开销,解决了老版本很多人抱怨的这一点。

  • 数据存储利用 volatile 来保证可见性。

  • 使用 CAS 等操作,在特定场景进行无锁并发操作。

这里介绍一个最常问的问题:Java8 为什么使用红黑树呢?

根据 Java7 HashMap 的介绍,我们知道,查找的时候,根据 hash 值我们能够快速定位到数组的具体下标,但是之后的话,需要顺着链表一个个比较下去才能找到我们需要的,时间复杂度取决于链表的长度,为 O(n)

为了降低这部分的开销,在 Java8 中,当链表中的元素达到了 8 个时,会将链表转换为红黑树,在这些位置进行查找的时候可以降低时间复杂度为 O(logN)

在这里插入图片描述

注意,上图是示意图,主要是描述结构,不会达到这个状态的,因为这么多数据的时候早就扩容了。

Java7 中使用 HashEntry 来代表每个 HashMap 中的数据节点,Java8 中使用 Node,基本没有区别,都是 key,value,hash 和 next 这四个属性,不过,Node 只能用于链表的情况,红黑树的情况需要使用 TreeNode

先看看现在的数据存储内部实现,我们可以发现 Key 是 final 的,因为在生命周期中,一个条目的 Key 发生变化是不可能的;与此同时 val,则声明为 volatile,以保证可见性。

static class Node<K,V> implements Map.Entry<K,V> {final int hash;final K key;volatile V val;volatile Node<K,V> next;// …
}

为了提高大家的阅读体验,我这里就不再介绍 get 方法和构造函数了,相对比较简单,相信你如果看懂了 Java7 的实现一定没有啥问题的。直接看并发的 put 是如何实现的。

put操作

final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();// 得到 hash 值int hash = spread(key.hashCode());// 用于记录相应链表的长度int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 如果数组"空",进行数组初始化if (tab == null || (n = tab.length) == 0)// 初始化数组tab = initTable();// 找该 hash 值对应的数组下标,得到第一个节点 felse if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 如果数组该位置为空,// 用一次 CAS 操作将这个新值放入其中即可,这个 put 操作差不多就结束了,可以拉到最后面// 如果 CAS 失败,那就是有并发操作,进到下一个循环就好了if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break;                   // no lock when adding to empty bin}// hash 居然可以等于 MOVED,这个需要到后面才能看明白,不过从名字上也能猜到,肯定是因为在扩容else if ((fh = f.hash) == MOVED)// 帮助数据迁移,这个等到看完数据迁移部分的介绍后,再理解这个就很简单了tab = helpTransfer(tab, f);else { // 到这里就是说,f 是该位置的头结点,而且不为空V oldVal = null;// 获取数组该位置的头结点的监视器锁synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) { // 头结点的 hash 值大于 0,说明是链表// 用于累加,记录链表的长度binCount = 1;// 遍历链表for (Node<K,V> e = f;; ++binCount) {K ek;// 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}// 到了链表的最末端,将这个新值放到链表的最后面Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) { // 红黑树Node<K,V> p;binCount = 2;// 调用红黑树的插值方法插入新节点if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {// 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8if (binCount >= TREEIFY_THRESHOLD)// 这个方法和 HashMap 中稍微有一点点不同,那就是它不是一定会进行红黑树转换,// 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树// 具体源码我们就不看了,扩容部分后面说treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}// addCount(1L, binCount);return null;
}

put 的主流程看完了,但是至少留下了几个问题,第一个是初始化,第二个是扩容,第三个是帮助数据迁移,这些我们都会在后面进行一一介绍。

初始化数组:initTable

从上面的 put 操作可以看到,数组初始化是在 put 操作时进行的,采用的 lazy-load 形式。

这个比较简单,主要就是初始化一个合适大小的数组,然后会设置 sizeCtl。

初始化方法中的并发问题是通过对 sizeCtl 进行一个 CAS 操作来控制的。

private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {// 初始化的"功劳"被其他线程"抢去"了if ((sc = sizeCtl) < 0)Thread.yield(); // lost initialization race; just spin// CAS 一下,将 sizeCtl 设置为 -1,代表抢到了锁else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if ((tab = table) == null || tab.length == 0) {// DEFAULT_CAPACITY 默认初始容量是 16int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")// 初始化数组,长度为 16 或初始化时提供的长度Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];// 将这个数组赋值给 table,table 是 volatile 的table = tab = nt;// 如果 n 为 16 的话,那么这里 sc = 12// 其实就是 0.75 * nsc = n - (n >>> 2);}} finally {// 设置 sizeCtl 为 sc,我们就当是 12 吧sizeCtl = sc;}break;}}return tab;
}

链表转为红黑树:treeifyBin

这里需要注意:前面我们在 put 源码分析也说过,treeifyBin 不一定就会进行红黑树转换,也可能是仅仅做数组扩容。我们还是进行源码分析吧。

private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;if (tab != null) {if ((n = tab.length) < MIN_TREEIFY_CAPACITY)// 所以,如果数组长度小于 64 的时候,其实也就是 32 或者 16 或者更小的时候,会进行数组扩容tryPresize(n << 1);// b 是头结点else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {// 加锁synchronized (b) {if (tabAt(tab, index) == b) {TreeNode<K,V> hd = null, tl = null;// 下面就是遍历链表,建立一颗红黑树for (Node<K,V> e = b; e != null; e = e.next) {TreeNode<K,V> p =new TreeNode<K,V>(e.hash, e.key, e.val,null, null);if ((p.prev = tl) == null)hd = p;elsetl.next = p;tl = p;}// 将红黑树设置到数组相应位置中setTabAt(tab, index, new TreeBin<K,V>(hd));}}}}
}

扩容:tryPresize

如果说 Java8 ConcurrentHashMap 的源码不简单,那么说的就是扩容操作和迁移操作。

这里的扩容也是做翻倍扩容的,扩容后数组容量为原来的 2 倍。

这个方法要完完全全看懂还需要看之后的 transfer 方法,读者应该提前知道这点。

// 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了
private final void tryPresize(int size) {// c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);// 目前容器大小int sc;while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;// 这个 if 分支和之前说的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码if (tab == null || (n = tab.length) == 0) {n = (sc > c) ? sc : c;if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if (table == tab) {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;// 16-4=12sc = n - (n >>> 2);}} finally {sizeCtl = sc;}}}// 小于目前大小或者达到最大值直接返回else if (c <= sc || n >= MAXIMUM_CAPACITY)break;// 说明是tab过程中没有发生变化,类似于懒加载的双重检查else if (tab == table) {// value = 32795int rs = resizeStamp(n);if (sc < 0) {Node<K,V>[] nt;if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;// 2. 用 CAS 将 sizeCtl 加 1,然后执行 transfer 方法 此时 nextTab 不为 nullif (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}// 1. 将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2)// 我是没看懂这个值真正的意义是什么?不过可以计算出来的是,结果是一个比较大的负数// 调用 transfer 方法,此时 nextTab 参数为 nullelse if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);}}
}

这个方法的核心在于 sizeCtl 值的操作,首先将其设置为一个负数,然后执行 transfer(tab, null),再下一个循环将 sizeCtl 加 1,并执行 transfer(tab, nt),之后可能是继续 sizeCtl 加 1,并执行 transfer(tab, nt)。

所以,可能的操作就是执行 1 次 transfer(tab, null) + 多次 transfer(tab, nt),这里怎么结束循环的需要看完 transfer 源码才清楚。

数据迁移:transfer

下面这个方法有点长,将原来的 tab 数组的元素迁移到新的 nextTab 数组中。

虽然我们之前说的 tryPresize 方法中多次调用 transfer 不涉及多线程,但是这个 transfer 方法可以在其他地方被调用,典型地,我们之前在说 put 方法的时候就说过了,请往上看 put 方法,是不是有个地方调用了 helpTransfer 方法,helpTransfer 方法会调用 transfer 方法的。

此方法支持多线程执行,外围调用此方法的时候,会保证第一个发起数据迁移的线程,nextTab 参数为 null,之后再调用此方法的时候,nextTab 不会为 null。

阅读源码之前,先要理解并发操作的机制。原数组长度为 n,所以我们有 n 个迁移任务,让每个线程每次负责一个小任务是最简单的,每做完一个任务再检测是否有其他没做完的任务,帮助迁移就可以了,而 Doug Lea 使用了一个 stride,简单理解就是步长,每个线程每次负责迁移其中的一部分,如每次迁移 16 个小任务。所以,我们就需要一个全局的调度者来安排哪个线程执行哪几个任务,这个就是属性 transferIndex 的作用。

第一个发起数据迁移的线程会将 transferIndex 指向原数组最后的位置,然后从后往前的 stride 个任务属于第一个线程,然后将 transferIndex 指向新的位置,再往前的 stride 个任务属于第二个线程,依此类推。当然,这里说的第二个线程不是真的一定指代了第二个线程,也可以是同一个线程,这个读者应该能理解吧。其实就是将一个大的迁移任务分为了一个个任务包。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;// stride 在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16// stride 可以理解为”步长“,有 n 个位置是需要进行迁移的,// 将这 n 个任务分为多个任务包,每个任务包有 stride 个任务if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range// 如果 nextTab 为 null,先进行一次初始化// 前面我们说了,外围会保证第一个发起迁移的线程调用此方法时,参数 nextTab 为 null// 之后参与迁移的线程调用此方法时,nextTab 不会为 nullif (nextTab == null) {            // initiatingtry {@SuppressWarnings("unchecked")// 容量翻倍Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) {      // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}// nextTable 是 ConcurrentHashMap 中的属性nextTable = nextTab;// transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置transferIndex = n;}int nextn = nextTab.length;// ForwardingNode 翻译过来就是正在被迁移的 Node// 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED// 后面我们会看到,原数组中位置 i 处的节点完成迁移工作后,// 就会将位置 i 处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了// 所以它其实相当于是一个标志。ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);// advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了boolean advance = true;boolean finishing = false; // to ensure sweep before committing nextTab// 下面这个 for 循环,最难理解的在前面,而要看懂它们,应该先看懂后面的,然后再倒回来// i 是位置索引,bound 是边界,注意是从后往前for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// 下面这个 while 真的是不好理解// advance 为 true 表示可以进行下一个位置的迁移了// 简单理解为:i 指向了 transferIndex,bound 指向了 transferIndex-stridewhile (advance) {int nextIndex, nextBound;if (--i >= bound || finishing)advance = false;// 将 transferIndex 值赋给 nextIndex// 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {// 看括号中的代码,nextBound 是这次迁移任务的边界,注意,是从后往前bound = nextBound;i = nextIndex - 1;advance = false;}}if (i < 0 || i >= n || i + n >= nextn) {int sc;// 所有的迁移操作已经完成if (finishing) {nextTable = null;// 将新的 nextTab 赋值给 table 属性,完成迁移table = nextTab;// 重新计算 sizeCtl:n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍sizeCtl = (n << 1) - (n >>> 1);return;}// 之前我们说过,sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2// 然后,每有一个线程参与迁移就会将 sizeCtl 加 1,// 这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 任务结束,方法退出if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;// 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,// 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了finishing = advance = true;i = n; // recheck before commit}}// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);// 该位置处是一个 ForwardingNode,代表该位置已经迁移过了else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {// 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作synchronized (f) {if (tabAt(tab, i) == f) {Node<K,V> ln, hn;// 头结点的 hash 大于 0,说明是链表的 Node 节点if (fh >= 0) {// 下面这一块和 Java7 中的 ConcurrentHashMap 迁移是差不多的,// 需要将链表一分为二,// 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的// lastRun 之前的节点需要进行克隆,然后分到两个链表中int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}// 其中的一个链表放在新数组的位置 isetTabAt(nextTab, i, ln);// 另一个链表放在新数组的位置 i+nsetTabAt(nextTab, i + n, hn);// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了setTabAt(tab, i, fwd);// advance 设置为 true,代表该位置已经迁移完毕advance = true;}else if (f instanceof TreeBin) {// 红黑树的迁移TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}// 如果一分为二后,节点数少于 8,那么将红黑树转换回链表ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;// 将 ln 放置在新数组的位置 isetTabAt(nextTab, i, ln);// 将 hn 放置在新数组的位置 i+nsetTabAt(nextTab, i + n, hn);// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了setTabAt(tab, i, fwd);// advance 设置为 true,代表该位置已经迁移完毕advance = true;}}}}}
}

说到底,transfer 这个方法并没有实现所有的迁移任务,每次调用这个方法只实现了 transferIndex 往前 stride 个位置的迁移工作,其他的需要由外围来控制。

这个时候,再回去仔细看 tryPresize 方法可能就会更加清晰一些了。

总结

今天我从线程安全问题开始,分析为什么要使用ConcurrentHashMap,进而分析了 Java 7 和 Java 8 中 ConcurrentHashMap 是如何设计实现的,从源码层面说明白了具体的实现逻辑。其实仔细认真读懂后你会发现其实也不是太难。希望本文让你对 ConcurrentHashMap 面试相关问题轻松的应对,同时作为并发编程技巧对你在日常开发可以有所帮助。

笔者水平有限,文章难免会有纰漏,如有错误欢迎一起交流探讨,我会第一时间更正的。都看到这里了,码字不易,可爱的你记得 “点赞” 哦,我需要你的正向反馈。

(全文完)fighting!

参考资料:

  1. 周志明:《深入理解 Java 虚拟机》
  2. 方腾飞:《Java 并发编程的艺术》
  3. https://www.javadoop.com/

个人公众号

在这里插入图片描述

  • 觉得写得还不错的小伙伴麻烦动手点赞+关注
  • 文章如果存在不正确的地方,麻烦指出,非常感谢您的阅读;
  • 推荐大家关注我的公众号,会为你定期推送原创干货文章,拉你进优质学习社群;
  • github地址:github.com/coderluojust/qige_blogs

这篇关于死磕Java并发编程(8):CurrentHashMap如何实现高效地线程安全?在Java8中有哪些设计实现的演进?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/825059

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

不懂推荐算法也能设计推荐系统

本文以商业化应用推荐为例,告诉我们不懂推荐算法的产品,也能从产品侧出发, 设计出一款不错的推荐系统。 相信很多新手产品,看到算法二字,多是懵圈的。 什么排序算法、最短路径等都是相对传统的算法(注:传统是指科班出身的产品都会接触过)。但对于推荐算法,多数产品对着网上搜到的资源,都会无从下手。特别当某些推荐算法 和 “AI”扯上关系后,更是加大了理解的难度。 但,不了解推荐算法,就无法做推荐系

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu