Java并发-同步器原理

2024-01-19 22:32
文章标签 java 并发 原理 同步器

本文主要是介绍Java并发-同步器原理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

CountDownLatch

在我们实际做开发的时候可能会遇到这样的需求,主线程需要等待所有子线程完成任务之后再执行操作,我们可以用join来实现这个功能,但是join不够灵活,主线程只能等子线程完全执行完毕才能从join返回,而不能在子线程某个位置就执行返回。
为了解决这个问题,JDK开发组提供了CountDownLatch这个类,这样的需求通过CountDownLatch来实现会更加优雅、灵活。

CountDownLatch的使用

public static final CountDownLatch countDownLatch = new CountDownLatch(2);public static void main(String[] args) throws InterruptedException {Thread threadOne = new Thread(new Runnable() {@Overridepublic void run() {System.out.println("threadOne start");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("threadOne end");countDownLatch.countDown();}});Thread threadTwo = new Thread(new Runnable() {@Overridepublic void run() {System.out.println("threadTwo start");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("threadTwo end");countDownLatch.countDown();}});threadOne.start();threadTwo.start();countDownLatch.await();System.out.println("main end");
}

运行结果:

threadOne start
threadTwo start
threadOne end
threadTwo end
main end

原理解析

基于AQS

CountDownLatch有一个Sync静态内部类继承了AQS,我们调用CountDownLatch的构造函数的时候会生成这个Sync的一个实例对象,我们设置的参数count其实就是设置AQS的state属性。

public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count);}int getCount() {return getState();}// 执行CountDownLatch的await方法最后会跳转到这个方法,state为0返回1,不为0就返回-1。protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}// 执行CountDownLatch的countDown方法最终会跳转到这个方法,该方法通过CAS + 自旋的方式对count进行修改,如果cas成功并且state == 0,就会返回true。protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();// 如果修改之前的state已经为0了,直接返回,这里可以防止countDown方法一直调用,导致state变为负数,state最低为0。if (c == 0)return false;int nextc = c-1;// CAS修改state,让state减一,如果减完之后state为0,就返回true。if (compareAndSetState(c, nextc))return nextc == 0;}}
}

await

调用await方法的线程,如果在CountDownLatch的count不为0的时候会阻塞,知道count为0的时候才会被唤醒。

public void await() throws InterruptedException {// 调用AQS的acquireSharedInterruptibly方法sync.acquireSharedInterruptibly(1);
}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 调用Sync的tryAcquireShared方法,该方法根据state是否为0,会返回1,或者-1// 返回1时,直接返回方法,执行任何操作// 返回-1时,表示state不为0,那么就把当前线程加到AQS队列中,同时阻塞if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}

countDown

调用countDown方法,CountDownLatch的count就会减一,当count为0的时候就会去唤醒所有调用了await阻塞的线程。

public void countDown() {// 调用AQS的releaseShared方法sync.releaseShared(1);
}public final boolean releaseShared(int arg) {// 调用Sync的tryReleaseShared方法,如果为true,表示最后一个线程调用countDown之后,count为0了,这时候要唤醒所有调用了await被阻塞的线程。if (tryReleaseShared(arg)) {// 唤醒队列中所有阻塞的线程doReleaseShared();return true;}return false;
}private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {// 设置waitStatusif (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;// 唤醒线程unparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}if (h == head)break;}
}

CountDownLatch的计数器是一次性的,也就是等到计数器值变为0后,再调用CountDownLatch的await和 countdown方法都会立刻返回。

CyclicBarrier

CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。之所以叫作屏障是因为线程调用await方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行。同时CyclicBarrier的计数器是可以重置。

CyclicBarrier的使用

线程在调用了CyclicBarrier的await,如果当前没有足够的线程调用了await,就会阻塞直到有足够的线程调用了await才会返回,并且会重置计数器。

public static final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);public static void main(String[] args) throws InterruptedException {Thread threadOne = new Thread(new Runnable() {@Overridepublic void run() {try {System.out.println("threadOne step1");cyclicBarrier.await();System.out.println("threadOne step2");cyclicBarrier.await();System.out.println("threadOne step3");cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}});Thread threadTwo = new Thread(new Runnable() {@Overridepublic void run() {try {System.out.println("threadTwo step1");cyclicBarrier.await();System.out.println("threadTwo step2");cyclicBarrier.await();System.out.println("threadTwo step3");cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}});threadOne.start();threadTwo.start();threadOne.join();threadTwo.join();System.out.println("main end");
}

运行结果如下:

threadOne step1
threadTwo step1
threadTwo step2
threadOne step2
threadOne step3
threadTwo step3
main end

CyclicBarrier的原理

基于锁的内部原理

CyclicBarrier有以下的属性,其内部原理是通过锁来完成的。

// 线程await的时候要获得这个锁才可以执行操作
private final ReentrantLock lock = new ReentrantLock();
// 线程await,如果没有足够的线程到达屏障就会通过Condition来阻塞
private final Condition trip = lock.newCondition();
// 初始化的时候parties和count是一样的,等count为0的时候,CyclicBarrier会利用parties来重置count(计数器)。
private final int parties;
// 计数器,每当有线程调用await的时候,count就会减一,当count为0的时候就会唤醒之前阻塞的线程。
private int count;
// 构造CyclicBarrier的时候可以传入一个barrierCommand,在count为0的时候就会调用里面的run方法。
private final Runnable barrierCommand;
// Generation里面有一个boolean类型的broke,broke为true的时候就可以中断CyclicBarrier。
private Generation generation = new Generation();

CyclicBarrier的构造函数

public CyclicBarrier(int parties) {this(parties, null);
}// parties和count是相等的
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;
}

dowait

无论是await()还是await(long timeout, TimeUnit unit)最终都会调用到dowait方法。

public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe);}
}public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {return dowait(true, unit.toNanos(timeout));
}// 参数timed为false时,nacos就没用了
// 参数timed为true时,nacos是过期的时间
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 判断Generation的broke是否为true,如果是true就抛出异常final Generation g = generation;if (g.broken)throw new BrokenBarrierException();// 判断线程是否中断if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// count减一int index = --count;// 如果count为0,会执行下面的逻辑if (index == 0) {boolean ranAction = false;try {// 调用barrierCommand的run方法,这个是我们在构造函数中传入的final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;// 这里面会唤醒所有阻塞的线程,同时重置计数器nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}// 减完之后,count不为0就会执行下面的逻辑for (;;) {try {// 如果不是带超时时间的就直接用condition的await阻塞if (!timed)trip.await();else if (nanos > 0L)// 带超时时间的用condition的awaitNanos阻塞nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}
}private void nextGeneration() {// 唤醒所有通过Condition阻塞的线程trip.signalAll();// 重置计数器count = parties;generation = new Generation();
}

Semaphore

Semaphore是线程同步的辅助类,可以控制当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数,例如通过 Semaphore 限流。

Semaphore的使用

private final static int POOL_SIZE = 10;private final Semaphore useful,useless;//useful表示可用的数据库连接,useless表示已用的数据库连接public DBPoolSemaphore() {this. useful = new Semaphore(POOL_SIZE);this.useless = new Semaphore(0);
}//存放数据库连接的容器
private static LinkedList<Connection> pool = new LinkedList<Connection>();
//初始化池
static {for (int i = 0; i < POOL_SIZE; i++) {pool.addLast(SqlConnectImpl.fetchConnection());}
}/*归还连接*/
public void returnConnect(Connection connection) throws InterruptedException {if(connection!=null) {System.out.println("当前有"+useful.getQueueLength()+"个线程等待数据库连接!!"+"可用连接数:"+useful.availablePermits());useless.acquire();synchronized (pool) {pool.addLast(connection);}useful.release();}
}/*从池子拿连接*/
public Connection takeConnect() throws InterruptedException {useful.acquire();Connection conn;synchronized (pool) {conn = pool.removeFirst();}useless.release();return conn;
}

这篇关于Java并发-同步器原理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/623902

相关文章

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

Java中String字符串使用避坑指南

《Java中String字符串使用避坑指南》Java中的String字符串是我们日常编程中用得最多的类之一,看似简单的String使用,却隐藏着不少“坑”,如果不注意,可能会导致性能问题、意外的错误容... 目录8个避坑点如下:1. 字符串的不可变性:每次修改都创建新对象2. 使用 == 比较字符串,陷阱满

Java判断多个时间段是否重合的方法小结

《Java判断多个时间段是否重合的方法小结》这篇文章主要为大家详细介绍了Java中判断多个时间段是否重合的方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录判断多个时间段是否有间隔判断时间段集合是否与某时间段重合判断多个时间段是否有间隔实体类内容public class D

IDEA编译报错“java: 常量字符串过长”的原因及解决方法

《IDEA编译报错“java:常量字符串过长”的原因及解决方法》今天在开发过程中,由于尝试将一个文件的Base64字符串设置为常量,结果导致IDEA编译的时候出现了如下报错java:常量字符串过长,... 目录一、问题描述二、问题原因2.1 理论角度2.2 源码角度三、解决方案解决方案①:StringBui

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在

Java中ArrayList和LinkedList有什么区别举例详解

《Java中ArrayList和LinkedList有什么区别举例详解》:本文主要介绍Java中ArrayList和LinkedList区别的相关资料,包括数据结构特性、核心操作性能、内存与GC影... 目录一、底层数据结构二、核心操作性能对比三、内存与 GC 影响四、扩容机制五、线程安全与并发方案六、工程

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

如何使用Java实现请求deepseek

《如何使用Java实现请求deepseek》这篇文章主要为大家详细介绍了如何使用Java实现请求deepseek功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.deepseek的api创建2.Java实现请求deepseek2.1 pom文件2.2 json转化文件2.2

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

Spring AI集成DeepSeek的详细步骤

《SpringAI集成DeepSeek的详细步骤》DeepSeek作为一款卓越的国产AI模型,越来越多的公司考虑在自己的应用中集成,对于Java应用来说,我们可以借助SpringAI集成DeepSe... 目录DeepSeek 介绍Spring AI 是什么?1、环境准备2、构建项目2.1、pom依赖2.2