本文主要是介绍ConcurrentHashMap扩容原理 | 存储流程 | 源码探究,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
新人写手,代码菜鸡;笔下生涩,诚惶诚恐。
初试锋芒,尚显青涩;望君指点,愿受教诲。
本篇文章将从源码的层面,探讨ConcurrentHashMap的存储流程以及扩容原理
Java版本为JDK17,源代码可能与其他版本略有不同
推荐阅读:HashMap实现原理、扩容机制
一、构造函数
1.1 无参构造函数
ConcurrentHashMap的无参构造函数是一个空方法
public ConcurrentHashMap() {
}
1.2 指定容量、负载因子
多了一个并发级别concurrencyLevel,没看出来大用途emm......
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (initialCapacity < concurrencyLevel) // Use at least as many binsinitialCapacity = concurrencyLevel; // as estimated threadslong size = (long)(1.0 + (long)initialCapacity / loadFactor);// 找到大于等于 容量 / 负载因子 + 1 的2的幂int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);this.sizeCtl = cap;
}
1.3 传入Map
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {this.sizeCtl = DEFAULT_CAPACITY;putAll(m);
}
二、关键成员变量
扩容相关的变量都用到了volatile关键字作修饰,保证变量并发可见性
// 默认大小,树化阈值等与HashMap相同
// ...
// 存储元素的Node数组
transient volatile Node<K,V>[] table;
// 协助扩容会用到
private transient volatile Node<K,V>[] nextTable;
// 大于0时,为扩容阈值,等于0时,为初始状态,小于0时,表示Map处于扩容中等一些特殊状态
// 扩容时,该值的低十六位用于表示共同扩容线程数
private transient volatile int sizeCtl;
// 扩容下标指针,用于记录数组扩容到了哪一个元素,从数组末尾开始
private transient volatile int transferIndex;
// 使用Unsafe高效读取和修改变量值
private static final Unsafe U = Unsafe.getUnsafe();
private static final long SIZECTL= U.objectFieldOffset(test.ConcurrentHashMap.class, "sizeCtl");
private static final long TRANSFERINDEX= U.objectFieldOffset(test.ConcurrentHashMap.class, "transferIndex");
// 扩容标志位数
private static final int RESIZE_STAMP_BITS = 16;
// 最大允许扩容线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 扩容标志位左移位数,扩容时用于将sizeCtl置为负数
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// 表示节点的状态,一般扩容时会将正在扩容的节点hash值修改为以下值
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
// 系统可用线程数,用于计算扩容步长
static final int NCPU = Runtime.getRuntime().availableProcessors();
三、存储流程
3.1 put流程
put方法会调用putVal方法,业务逻辑都在putVal中
public V put(K key, V value) {return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {// key 和 value都不能为nullif (key == null || value == null) throw new NullPointerException();// key 的 hash 值int hash = spread(key.hashCode());int binCount = 0;// 并发场景基本都用到了for循环,执行插入时,若map正在扩容导致插入失败// 待扩容完成后可在此尝试插入for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh; K fk; V fv;// table为null或长度为0,执行初始化if (tab == null || (n = tab.length) == 0)tab = initTable();// 如果当前table数组i位置没有元素,直接以cas方式插入当前待插入节点else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))break; // no lock when adding to empty bin}// 如果当前当前节点元素hash值为-1,表示正在扩容(扩容时,扩容节点hash值会置为-1)else if ((fh = f.hash) == MOVED)// 则辅助进行扩容,具体后面说tab = helpTransfer(tab, f);// 走到这里,说明table[i]位置节点存在,并且数据不在扩容// 和HashMap类似,onlyIfAbsent为true时,// 仅当节点不存在时,才会插入;// 如果存在,不会更新节点值// 同样的,判断节点相同,需要hash值和key值都一致else if (onlyIfAbsent // check first node without acquiring lock&& fh == hash&& ((fk = f.key) == key || (fk != null && key.equals(fk)))&& (fv = f.val) != null)return fv;else {V oldVal = null;// 锁住当前节点synchronized (f) {// 再次判断当前节点是否发生了变化,防止被其他线程提前修改if (tabAt(tab, i) == f) {// fh < 0时,代表三个特殊状态// static final int MOVED = -1; 当前节点正在扩容// static final int TREEBIN = -2; 当前节点是一颗树// static final int RESERVED = -3; 保留当前位置// 这里fh >= 0表示当前节点f为单节点(不是树或链表),或者为链表if (fh >= 0) {// 链表长度binCount = 1;// 开始寻找待插入节点在链表中的位置// 找不到则直接挂载到链表尾部for (Node<K,V> e = f;; ++binCount) {K ek;// 找到了,直接更新数据(当然onlyIfAbsent为false)if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;// 如果链表最后都没有找到,新建一个Node挂到后面if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key, value);break;}}}// 如果为树,则遍历树,更新或插入节点else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}// 防止递归更新产生死锁else if (f instanceof ReservationNode)throw new IllegalStateException("Recursive update");}}if (binCount != 0) {// 如果链表长度大于等于8,尝试将链表转化为树// 只有table容量大于等于64才会尝试转化,否则优先扩容if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}// 计数+1,并检查是否需要扩容(是否达到扩容阈值)addCount(1L, binCount);return null;
}
- 由于需要保证线程安全,所以并不是每一个插入操作都是一次成功(如插入时Map正在扩容),所以用到了for循环;
- 插入元素时,用到了Synchronized锁,锁粒度具体到Node[]数组中的某个Node节点;
- 加锁后又判断了一次if (tabAt(tab, i) == f),类似双重校验锁,ConcurrentHashMap大量使用到了这种处理方式。
3.2 get流程
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;// 获取hash值int h = spread(key.hashCode());// 确保能查到元素,否则直接返回nullif ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {// hash和key都相同,返回值if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}// 节点hash值小于0,表示可能处于扩容状态else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;// 遍历链表,继续查找key相同的节点,返回值while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;
}// 类似,遍历链表查找key
Node<K,V> find(int h, Object k) {Node<K,V> e = this;if (k != null) {do {K ek;if (e.hash == h &&((ek = e.key) == k || (ek != null && k.equals(ek))))return e;} while ((e = e.next) != null);}return null;
}
四、扩容原理
在说传参为Map的构造函数的时候,见到过这样的方法
public void putAll(Map<? extends K, ? extends V> m) {tryPresize(m.size());for (Map.Entry<? extends K, ? extends V> e : m.entrySet())putVal(e.getKey(), e.getValue(), false);
}
接下来就从tryPresize方法入手
4.1 tryPresize 尝试扩容
private final void tryPresize(int size) {// 如果size >= max / 2,扩容后容量就用max// 否则使用1.5 * size + 1向上扩为2的幂次方int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);int sc;// sizeCtl >= 0表示不处于扩容等异常状态while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;// putAll方法可能会执行此分支,table为空if (tab == null || (n = tab.length) == 0) {// 初始化长度sc够就用sc,否则用前面算出来的cn = (sc > c) ? sc : c;// cas把当前map状态置为扩容中,确保只有一个线程进入if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {try {// 确保table引用没被修改// 创建新数组// 将sizeCtl赋值为容量的0.75倍if (table == tab) {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;sc = n - (n >>> 2);}} finally {sizeCtl = sc;}}}// 数组容量已经大于所需要的容量,或者容量达到最大值// 无需扩容,直接退出循环else if (c <= sc || n >= MAXIMUM_CAPACITY)break;else if (tab == table) {// 获取扩容标志// 假设n = 16// 此时rs为1000 0000 0001 1011int rs = resizeStamp(n);// 扩容标志会使得低到高第16位为1,左移16位,保证sc为负数// 此时sc修改为1000 0000 0001 1011 0000 0000 0000 0010// 即-2145714174if (U.compareAndSetInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);}}
}
- if (U.compareAndSetInt(this, SIZECTL, sc, -1))保证只有一个线程执行new Node[]操作
- 扩容标志,用在协助扩容的时候,识别两个线程处于同一个扩容任务中
4.2 transfer 扩容方法
计算步长与初始化
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;// 每个线程迁移数据的步长,数组长度 / 8 / cpu可用线程数// 最小为16if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range// 初始化走这if (nextTab == null) { // initiatingtry {@SuppressWarnings("unchecked")// 新建数组,容量为之前的两倍,并赋值Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}// 赋值给volatile变量// private transient volatile Node<K,V>[] nextTable;nextTable = nextTab;// private transient volatile int transferIndex;transferIndex = n;}// ......
}
线程接受扩容任务
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {// ......// 新数组长度int nextn = nextTab.length;// 扩容节点,挂载的是新数组ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);// 任务标识符,为true时表示需要接受扩容任务boolean advance = true;// 结束标识符,为true时表示扩容结束boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// 需要接受任务时while (advance) {int nextIndex, nextBound;// 第三个分支i被赋值后或扩容结束走此分支// 表示当前线程已接受任务if (--i >= bound || finishing)advance = false;// 数组所有节点扩容任务均分配完成else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}// 修改扩容下标,假设需要扩容的数组长度为32,步长16// 则每次循环transferIndex会减16,直到减为0// nextBound记录当前循环到的transferIndex下标// i赋值为nextIndex-1,(bound - i)即为当前线程需要数据迁移的数组下标// 接受过任务,将advance置为false,不再进入下一循环else if (U.compareAndSetInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}}
}
bound -> i即为当前线程需要进行数据迁移的数组下标。循环结束时,若此时没有其他线程协助扩容,当前线程会再次接受新的一批迁移任务
扩容后处理
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {// ......// ......for (int i = 0, bound = 0;;) {// ......// 当前线程没有分配到任务时会满足i < 0条件if (i < 0 || i >= n || i + n >= nextn) {int sc;// 扩容结束,nextTable置为null// nextTab赋值给table// 阈值置为新数组容量的0.75倍// 第一次进入为false,之后finishing被赋值为true,执行finishing逻辑if (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}// cas使sizeCtl = sizeCtl - 1if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 第一个线程进入此,不会走下面的if// 进入transfer方法时,满足(sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT// 第二次进入,sc = sizeCtl比原来的值少1,可以进入if条件,执行return;// 这里的判断,是为了检查所有辅助扩容线程是否均已扩容完毕// 每个协助扩容线程都会将sc + 1if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;// 扩容结束finishing = advance = true;// i重新赋值为数组长度,让最后一个线程检查数组是否迁移完毕,并赋值sc为新的扩容阈值i = n; // recheck before commit}}// ......}
}
数据迁移
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {// ......// ......for (int i = 0, bound = 0;;) {// ......// ......// 走到之后的分支意味着线程接收到了任务// 如果数组i位置为null,不需要迁移,并将数组i位置hash置为MOVED,代表已经迁移过else if ((f = tabAt(tab, i)) == null)/*ForwardingNode(Node<K,V>[] tab) {super(MOVED, null, null);this.nextTable = tab;}fwd hash为-1,key、value均为null*/advance = casTabAt(tab, i, null, fwd);// hash为-1,说明该节点已经迁移过else if ((fh = f.hash) == MOVED)advance = true; // already processed// 进入节点迁移逻辑else {// 上锁,锁的是node数组f节点synchronized (f) {// 校验引用if (tabAt(tab, i) == f) {Node<K,V> ln, hn;// 单节点或链表部分if (fh >= 0) {// 和HashMap类似,将节点hash与n进行按位与分为两组// 拆成低位链表和高位链表// 低位链表放在新数组i位置,高位链表则为n + i位置int runBit = fh & n;Node<K,V> lastRun = f;// 这里的循环是为了找到链表最后面的节点族的第一个节点// 即保证lastRun节点及其之后的节点同属于高/低位链表// 如此设计,最后遍历链表遍历到lastRun即可,因为lastRun之后的节点都和lastRun同属于一批for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}// 先将最后的节点族赋值if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}// 再次遍历f节点链表,开始构建高/低位链表for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;// 除了lastRun及其之后的链表是尾插法,其余遍历的链表采用头插法if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}// 新数组赋值,并将旧书组i位置置为fwd,标记已经迁移setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}// 树节点同样分为高低位数else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}// 拆分成两棵树后,判断长度是否小于6,决定是否需要红黑数链表化ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}// 递归更新抛出异常else if (f instanceof ReservationNode)throw new IllegalStateException("Recursive update");}}}}
}
这里的链表迁移与HashMap略有不同:
HashMap的高低位链表,相对顺序与原顺序相同;
而ConcurrentHashMap,lastRun及其之后的节点顺序保持一致,在链表尾部。其他的节点会以头插法的方式加入到新数组中,如下图,橙色1 4 5为低位,2 3 6 7为高位
4.3 helpTransfer 协助扩容
putVal方法中,当前节点hash为-1时,线程会协助进行扩容
// 如果当前当前节点元素hash值为-1,表示正在扩容(扩容时,扩容节点hash值会置为-1)
else if ((fh = f.hash) == MOVED)// 则辅助进行扩容,具体后面说tab = helpTransfer(tab, f);
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;// 旧数组不为空,并且当前f节点为ForwardingNode// 并且f.nextTable也不为空,transfer方法中,nextTable为新数组if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {// 扩容戳int rs = resizeStamp(tab.length) << RESIZE_STAMP_SHIFT;// 新老数组都是同一个,并且sizeCtl处于扩容状态,协助进行扩容// 不相同可能是扩容完成了,或者发生了新的扩容while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {// 扩容时,sizeCtl低16位代表扩容的线程数,首次进入时被设置为了2// 当此时同时扩容数达到上限,或者开始执行最终扩容检查时(最终检查时,sizeCtl == rs + 1)// 或者扩容下标为0(任务全部分配)时,退出if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||transferIndex <= 0)break;// sizeCtl + 1,表示此时扩容线程+1if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) {// 协助扩容transfer(tab, nextTab);break;}}return nextTab;}return table;
}
transfer方法中,将nextTab赋值给了volatile变量nextTable,旧数组长度n赋值给了transferIndex,此处用来判断是否是同一个扩容任务以及快速判断扩容是否完成。
结语:相对于HashMap,ConcurrentHashMap源码稍微绕一点。本文起到辅助理解作用,其他方法(remove等)原理类似,感兴趣可自行阅读源码。
这篇关于ConcurrentHashMap扩容原理 | 存储流程 | 源码探究的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!