并发编程实战14-LongAdder统计加法器-计数器jdk8

2024-01-24 15:38

本文主要是介绍并发编程实战14-LongAdder统计加法器-计数器jdk8,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

传统的原子锁AtomicLong/AtomicInt虽然也可以处理大量并发情况下的计数器,但是由于使用了自旋等待,当存在大量竞争时,会存在大量自旋等待,而导致CPU浪费,而有效计算很少,降低了计算效率。

而LongAdder是根据ConcurrentHashMap这类为并发设计的类的基本原理——锁分段,通过维护一个计数数组cells来保存多个计数副本,每个线程只对自己的副本进行操作,最后汇总来得到最终结果。

Adder的英文意思为加法器,从字面意思上就可以理解,LongAdder的作用是用来进行统计计算的。比如,我们需要一个计数器,加入计数值为N,当大量的线程访问时,N的值将会出现并发安全,但是,我们并不打算在每个单独的子线程中去查看这个N值,只是在所有的子线程完毕后,才会统计N的总数,即相当于一种并行计算,但是我们不在乎中间结果,只在乎,中间计算都进行了就可以。因此,我们对可以变量创建一个副本N’,每个线程都有这个副本,只对这个副本进行操作,最后,将所有的副本进行汇总就是最终的N值,即:N=N1'+N2'+N3'...+Nn'

详细原理:内部源码使用了一个cells的数组来保存每个线程的副本,第一个线程会初始化数组cells。新来到一个线程,会指向第一个cell,并检测是否有其他线程使用CAS,如果没有,则所有线程公用一个cel副本;否则,如果自旋等待,则当前线程新建一个cell副本,加入到数组cells,对副本进行操作。当所有操作完成后,使用sum函数就可以统计所有的操作。

这里也用到了cas操作,主要是为了防止多线程公用一个cell,导致一个cell数据并发出错,另外注意LongAdder的并不是细粒度的,所以不能使用中间值

  1. 线程副本cell
    LongAdder继承自Striped64类,主要逻辑方法在Striped64中。
    其中cell即为每个线程的副本变量,使用的是一个静态内部类Cell,维护了一个value变量
@sun.misc.Contended static final class Cell {volatile long value;Cell(long x) { value = x; }final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long valueOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> ak = Cell.class;valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));} catch (Exception e) {throw new Error(e);}}}

问题:既然Cell这么简单,为什么不直接用long value?
我的理解:可能要设计到Java值引用问题,如果直接使用long value,在线程B进行数组扩容后,线程A修改的value值将无法反映到线程B;而使用Cell对象对value进行包装后,由于线程A只会修改Cell对象的value值,不会修改Cell对象,所以线程B的Cell中的value也会跟着改变,就不会出现值问题。更具体的了解,可能需要了解一下java的值传递(java没有引用传递)

2.Striped64主体代码
主方法为longAccumulate方法,主要处理数组的扩容和冲突检查

abstract class Striped64 extends Number {  @sun.misc.Contended static final class Cell { ... }  /** Number of CPUS, to place bound on table size */  static final int NCPU = Runtime.getRuntime().availableProcessors();  // cell数组,长度一样要是2^n,可以类比为jdk1.7的ConcurrentHashMap中的segments数组  transient volatile Cell[] cells;  // 累积器的基本值,在两种情况下会使用:  // 1、没有遇到并发的情况,直接使用base,速度更快;  // 2、多线程并发初始化table数组时,必须要保证table数组只被初始化一次,因此只有一个线程能够竞争成功,这种情况下竞争失败的线程会尝试在base上进行一次累积操作  transient volatile long base;  // 自旋标识,在对cells进行初始化,或者后续扩容时,需要通过CAS操作把此标识设置为1(busy,忙标识,相当于加锁),取消busy时可以直接使用cellsBusy = 0,相当于释放锁  transient volatile int cellsBusy;  Striped64() {  }  // 使用CAS更新base的值  final boolean casBase(long cmp, long val) {  return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);  }  // 使用CAS将cells自旋标识更新为1  // 更新为0时可以不用CAS,直接使用cellsBusy就行  final boolean casCellsBusy() {  return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);  }  // 下面这两个方法是ThreadLocalRandom中的方法,不过因为包访问关系,这里又重新写一遍  // probe翻译过来是探测/探测器/探针这些,不好理解,它是ThreadLocalRandom里面的一个属性,  // 不过并不影响对Striped64的理解,这里可以把它理解为线程本身的hash值  static final int getProbe() {  return UNSAFE.getInt(Thread.currentThread(), PROBE);  }  // 相当于rehash,重新算一遍线程的hash值  static final int advanceProbe(int probe) {  probe ^= probe << 13;   // xorshift  probe ^= probe >>> 17;  probe ^= probe << 5;  UNSAFE.putInt(Thread.currentThread(), PROBE, probe);  return probe;  }  /** * 核心方法的实现,此方法建议在外部进行一次CAS操作(cell != null时尝试CAS更新base值,cells != null时,CAS更新hash值取模后对应的cell.value) * @param x the value 前面我说的二元运算中的第二个操作数,也就是外部提供的那个操作数 * @param fn the update function, or null for add (this convention avoids the need for an extra field or function in LongAdder). *     外部提供的二元算术操作,实例持有并且只能有一个,生命周期内保持不变,null代表LongAdder这种特殊但是最常用的情况,可以减少一次方法调用 * @param wasUncontended false if CAS failed before call 如果为false,表明调用者预先调用的一次CAS操作都失败了 */  final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {  int h;  // 这个if相当于给线程生成一个非0的hash值  if ((h = getProbe()) == 0) {  ThreadLocalRandom.current(); // force initialization  h = getProbe();  wasUncontended = true;  }  boolean collide = false; // True if last slot nonempty 如果hash取模映射得到的Cell单元不是null,则为true,此值也可以看作是扩容意向,感觉这个更好理解  for (;;) {  Cell[] as; Cell a; int n; long v;  if ((as = cells) != null && (n = as.length) > 0) { // cells已经被初始化了  if ((a = as[(n - 1) & h]) == null) { // hash取模映射得到的Cell单元还为null(为null表示还没有被使用)  if (cellsBusy == 0) {       // Try to attach new Cell 如果没有线程正在执行扩容  Cell r = new Cell(x);   // Optimistically create 先创建新的累积单元  if (cellsBusy == 0 && casCellsBusy()) { // 尝试加锁  boolean created = false;  try {               // Recheck under lock 在有锁的情况下再检测一遍之前的判断  Cell[] rs; int m, j;  if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 考虑别的线程可能执行了扩容,这里重新赋值重新判断  rs[j] = r; // 对没有使用的Cell单元进行累积操作(第一次赋值相当于是累积上一个操作数,求和时再和base执行一次运算就得到实际的结果)  created = true;  }  } finally {  cellsBusy = 0; 清空自旋标识,释放锁  }  if (created) // 如果原本为null的Cell单元是由自己进行第一次累积操作,那么任务已经完成了,所以可以退出循环  break;  continue;           // Slot is now non-empty 不是自己进行第一次累积操作,重头再来  }  }  collide = false; // 执行这一句是因为cells被加锁了,不能往下继续执行第一次的赋值操作(第一次累积),所以还不能考虑扩容  }  else if (!wasUncontended) // CAS already known to fail 前面一次CAS更新a.value(进行一次累积)的尝试已经失败了,说明已经发生了线程竞争  wasUncontended = true; // Continue after rehash 情况失败标识,后面去重新算一遍线程的hash值  else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // 尝试CAS更新a.value(进行一次累积) ------ 标记为分支A  break; // 成功了就完成了累积任务,退出循环  else if (n >= NCPU || cells != as) // cell数组已经是最大的了,或者中途发生了扩容操作。因为NCPU不一定是2^n,所以这里用 >=  collide = false; // At max size or stale 长度n是递增的,执行到了这个分支,说明n >= NCPU会永远为true,下面两个else if就永远不会被执行了,也就永远不会再进行扩容  // CPU能够并行的CAS操作的最大数量是它的核心数(CAS在x86中对应的指令是cmpxchg,多核需要通过锁缓存来保证整体原子性),当n >= NCPU时,再出现几个线程映射到同一个Cell导致CAS竞争的情况,那就真不关扩容的事了,完全是hash值的锅了  else if (!collide) // 映射到的Cell单元不是null,并且尝试对它进行累积时,CAS竞争失败了,这时候把扩容意向设置为true  // 下一次循环如果还是跟这一次一样,说明竞争很严重,那么就真正扩容  collide = true; // 把扩容意向设置为true,只有这里才会给collide赋值为true,也只有执行了这一句,才可能执行后面一个else if进行扩容  else if (cellsBusy == 0 && casCellsBusy()) { // 最后再考虑扩容,能到这一步说明竞争很激烈,尝试加锁进行扩容 ------ 标记为分支B  try {  if (cells == as) {      // Expand table unless stale 检查下是否被别的线程扩容了(CAS更新锁标识,处理不了ABA问题,这里再检查一遍)  Cell[] rs = new Cell[n << 1]; // 执行2倍扩容  for (int i = 0; i < n; ++i)  rs[i] = as[i];  cells = rs;  }  } finally {  cellsBusy = 0; // 释放锁  }  collide = false; // 扩容意向为false  continue; // Retry with expanded table 扩容后重头再来  }  h = advanceProbe(h); // 重新给线程生成一个hash值,降低hash冲突,减少映射到同一个Cell导致CAS竞争的情况  }  else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // cells没有被加锁,并且它没有被初始化,那么就尝试对它进行加锁,加锁成功进入这个else if  boolean init = false;  try {                           // Initialize table  if (cells == as) { // CAS避免不了ABA问题,这里再检测一次,如果还是null,或者空数组,那么就执行初始化  Cell[] rs = new Cell[2]; // 初始化时只创建两个单元  rs[h & 1] = new Cell(x); // 对其中一个单元进行累积操作,另一个不管,继续为null  cells = rs;  init = true;  }  } finally {  cellsBusy = 0; // 清空自旋标识,释放锁  }  if (init) // 如果某个原本为null的Cell单元是由自己进行第一次累积操作,那么任务已经完成了,所以可以退出循环  break;  }  else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // cells正在进行初始化时,尝试直接在base上进行累加操作  break;                          // Fall back on using base 直接在base上进行累积操作成功了,任务完成,可以退出循环了  }  }  // double的不讲,更long的逻辑基本上是一样的  final void doubleAccumulate(double x, DoubleBinaryOperator fn, boolean wasUncontended);  // Unsafe mechanics Unsafe初始化  private static final sun.misc.Unsafe UNSAFE;  private static final long BASE;  private static final long CELLSBUSY;  private static final long PROBE;  static {  try {  UNSAFE = sun.misc.Unsafe.getUnsafe();  Class<?> sk = Striped64.class;  BASE = UNSAFE.objectFieldOffset  (sk.getDeclaredField("base"));  CELLSBUSY = UNSAFE.objectFieldOffset  (sk.getDeclaredField("cellsBusy"));  Class<?> tk = Thread.class;  PROBE = UNSAFE.objectFieldOffset  (tk.getDeclaredField("threadLocalRandomProbe"));  } catch (Exception e) {  throw new Error(e);  }  }  } 

3.LongAdder的add()方法
add(long x) 方法不仅仅实现加,参数为负就为减;调用longAccumulate来进行操作

    public void add(long x) {  Cell[] as; long b, v; int m; Cell a;  if ((as = cells) != null || !casBase(b = base, b + x)) {  boolean uncontended = true;  if (as == null || (m = as.length - 1) < 0 ||  (a = as[getProbe() & m]) == null ||  !(uncontended = a.cas(v = a.value, v + x)))  longAccumulate(x, null, uncontended);  }  }  

jdk1.8的ConcurrentHashMap中,没有再使用Segment,使用了一个简单的仿造LongAdder实现的计数器,这样能够保证计数效率不低于使用Segment的效率。

jdk1.8 LongAdder源码学习

这篇关于并发编程实战14-LongAdder统计加法器-计数器jdk8的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/640151

相关文章

Spring Security基于数据库的ABAC属性权限模型实战开发教程

《SpringSecurity基于数据库的ABAC属性权限模型实战开发教程》:本文主要介绍SpringSecurity基于数据库的ABAC属性权限模型实战开发教程,本文给大家介绍的非常详细,对大... 目录1. 前言2. 权限决策依据RBACABAC综合对比3. 数据库表结构说明4. 实战开始5. MyBA

揭秘Python Socket网络编程的7种硬核用法

《揭秘PythonSocket网络编程的7种硬核用法》Socket不仅能做聊天室,还能干一大堆硬核操作,这篇文章就带大家看看Python网络编程的7种超实用玩法,感兴趣的小伙伴可以跟随小编一起... 目录1.端口扫描器:探测开放端口2.简易 HTTP 服务器:10 秒搭个网页3.局域网游戏:多人联机对战4.

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

Java并发编程必备之Synchronized关键字深入解析

《Java并发编程必备之Synchronized关键字深入解析》本文我们深入探索了Java中的Synchronized关键字,包括其互斥性和可重入性的特性,文章详细介绍了Synchronized的三种... 目录一、前言二、Synchronized关键字2.1 Synchronized的特性1. 互斥2.

MyBatis 动态 SQL 优化之标签的实战与技巧(常见用法)

《MyBatis动态SQL优化之标签的实战与技巧(常见用法)》本文通过详细的示例和实际应用场景,介绍了如何有效利用这些标签来优化MyBatis配置,提升开发效率,确保SQL的高效执行和安全性,感... 目录动态SQL详解一、动态SQL的核心概念1.1 什么是动态SQL?1.2 动态SQL的优点1.3 动态S

Pandas使用SQLite3实战

《Pandas使用SQLite3实战》本文主要介绍了Pandas使用SQLite3实战,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学... 目录1 环境准备2 从 SQLite3VlfrWQzgt 读取数据到 DataFrame基础用法:读

Python异步编程中asyncio.gather的并发控制详解

《Python异步编程中asyncio.gather的并发控制详解》在Python异步编程生态中,asyncio.gather是并发任务调度的核心工具,本文将通过实际场景和代码示例,展示如何结合信号量... 目录一、asyncio.gather的原始行为解析二、信号量控制法:给并发装上"节流阀"三、进阶控制

Python实战之屏幕录制功能的实现

《Python实战之屏幕录制功能的实现》屏幕录制,即屏幕捕获,是指将计算机屏幕上的活动记录下来,生成视频文件,本文主要为大家介绍了如何使用Python实现这一功能,希望对大家有所帮助... 目录屏幕录制原理图像捕获音频捕获编码压缩输出保存完整的屏幕录制工具高级功能实时预览增加水印多平台支持屏幕录制原理屏幕

Redis中高并发读写性能的深度解析与优化

《Redis中高并发读写性能的深度解析与优化》Redis作为一款高性能的内存数据库,广泛应用于缓存、消息队列、实时统计等场景,本文将深入探讨Redis的读写并发能力,感兴趣的小伙伴可以了解下... 目录引言一、Redis 并发能力概述1.1 Redis 的读写性能1.2 影响 Redis 并发能力的因素二、

JDK多版本共存并自由切换的操作指南(本文为JDK8和JDK17)

《JDK多版本共存并自由切换的操作指南(本文为JDK8和JDK17)》本文介绍了如何在Windows系统上配置多版本JDK(以JDK8和JDK17为例),并通过图文结合的方式给大家讲解了详细步骤,具有... 目录第一步 下载安装JDK第二步 配置环境变量第三步 切换JDK版本并验证可能遇到的问题前提:公司常