本文主要是介绍CountDownLatch、CyclicBarrier 和 Semaphore,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 一、CountDownLatch
- 1、实现原理
- 2、使用场景
- 3、代码
- 二、CyclicBarrier
- 1、实现原理
- 2、使用场景
- 3、代码
- 4、CountDownLatch与CyclicBarrier区别
- 三、Semaphore
- 1、实现原理
- 2、使用场景
- 3、代码
- 四、总结
一、CountDownLatch
CountDownLatch计数器
1、实现原理
主要基于计数器和阻塞队列。
CountDownLatch 内部维护一个计数器,这个计数器的初始值通常设置为需要等待的线程数量。当一个线程调用 CountDownLatch 的 await() 方法时,如果计数器的值大于 0,则该线程会被放入一个阻塞队列中等待,并处于挂起状态。每当一个线程完成了自己的任务后,它会调用 CountDownLatch 的 countDown() 方法,使计数器递减。当计数器的值递减到 0 时,CountDownLatch 会唤醒阻塞队列中所有等待的线程,使它们能够继续执行后续的任务。
2、使用场景
用于等待多个线程完成后进行指定操作。
常见场景:
- 服务启动时要等待多个资源初始化
- 并行任务处理,有多个并行处理的任务,并且需要在任务都处理完毕后,再做其他处理。比如:并行计算成绩,最终汇总分数;分开去多个服务查询前置数据,然后进行校验
- 模拟高并发测试,在测试一个多线程并发访问的共享资源时,可以使用 CountDownLatch 来确保所有线程都准备好访问共享资源后再进行实际测试。
- 异步编程中的等待机制,等待某个异步操作完成后才继续执行后续代码
3、代码
伪代码如下:
// 初始化 CountDownLatch,计数器设为 N
CountDownLatch latch = new CountDownLatch(N); // 在 N 个线程中
for (int i = 0; i < N; i++) { new Thread(() -> { // 执行一些任务 // ... // 任务完成后,计数器减一 latch.countDown(); }).start();
} // 在主线程中等待所有线程完成任务
latch.await(); // 阻塞直到计数器为0 // 所有任务都已完成,继续执行后续代码
// ...
案例:主要模拟3个任务并行,然后主线程阻塞等待
static ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
static CountDownLatch countDownLatch = new CountDownLatch(3);
public static void main(String[] args) throws InterruptedException {// 多个任务并发执行,都执行完毕后,再执行主线程,也就是await的线程System.out.println("主业务开始执行");sleep(1000);executor.execute(CountDownLatchTest::a);executor.execute(CountDownLatchTest::b);executor.execute(CountDownLatchTest::c);System.out.println("三个任务并行执行,主业务线程等待");// 死等任务结束// countDownLatch.await();// 如果在规定时间内,任务没有结束,返回falseif (countDownLatch.await(2, TimeUnit.SECONDS)) {System.out.println("三个任务处理完毕,主业务线程继续执行");} else {System.out.println("三个任务没有全部处理完毕,执行其他的操作");}
}private static void a() {System.out.println("A任务开始");sleep(3000);System.out.println("A任务结束");countDownLatch.countDown();
}
private static void b() {System.out.println("B任务开始");sleep(1500);System.out.println("B任务结束");countDownLatch.countDown();
}
private static void c() {System.out.println("C任务开始");sleep(2000);System.out.println("C任务结束");countDownLatch.countDown();
}
private static void sleep(long timeout) {try {Thread.sleep(timeout);} catch (InterruptedException e) {e.printStackTrace();}
}
其中await()是死等任务结束,不限制时间;await(long timeout, TimeUnit unit)是在规定时间内任务没有结束,就返回false
二、CyclicBarrier
CyclicBarrier栅栏
1、实现原理
主要基于计数器、等待队列、循环栅栏。
CyclicBarrier 内部维护一个计数器,用于记录当前到达屏障点的线程数量,就是我们创建时指定的线程数。当一个线程到达屏障点时,如果计数器的值大于 0,则该线程会被放入一个等待队列中等待,并处于挂起状态,如果计数器的值变为 0,则说明所有线程都已到达屏障点,直接唤醒等待队列中的所有线程,并继续执行后续任务。在 CyclicBarrier 的构造函数中,可以指定一个可选的栅栏动作。当所有线程都到达屏障点时,这个栅栏动作会被执行一次,然后重置回初始状态并再次使用。
- Barrier屏障:让一个或多个线程达到一个屏障点,会被阻塞。屏障点会有一个数值,当一个线程到达屏障点时,就会对屏障点的数值进行-1操作,当屏障点数值减为0时,屏障就会打开,唤醒所有阻塞在屏障点的线程。在释放屏障点之后,可以先执行一个任务,再让所有阻塞被唤醒的线程继续之后的任务。基于ReentrantLock锁的await方法阻塞在屏障点。
- Cyclic循环:所有线程被释放后,屏障点的数值可以再次被重置。
2、使用场景
用于让一组线程在某个屏障点相互等待,直到所有线程都到达该屏障点,然后它们才能继续执行。
常见场景:
- 将任务分解成多个阶段,每个阶段由一组线程执行,并且需要在所有阶段完成后才能继续下一个阶段,比如游戏中所有人到达终点,才开启下一关
3、代码
伪代码如下:
// 初始化 CyclicBarrier,参与线程数为 N,可选的屏障动作(barrierAction)
CyclicBarrier barrier = new CyclicBarrier(N, () -> { // 所有线程到达屏障点时执行的代码 // ...
}); // 在 N 个线程中
for (int i = 0; i < N; i++) { new Thread(() -> { // 执行一些任务 // ... // 到达屏障点,等待其他线程 barrier.await(); // 阻塞直到所有线程到达 // 所有线程都已到达屏障点,继续执行后续代码 // ... }).start();
}
案例:大家集合完毕后,再一起出发
CyclicBarrier barrier = new CyclicBarrier(3,() -> {System.out.println("各位大佬集合完毕,发护照准备出发!");
});
new Thread(() -> {System.out.println("Tom到位!!!");try {barrier.await();} catch (Exception e) {System.out.println("悲剧,人没到齐!");return;}System.out.println("Tom出发!!!");
}).start();
Thread.sleep(100);
new Thread(() -> {System.out.println("Jack到位!!!");try {barrier.await();} catch (Exception e) {System.out.println("悲剧,人没到齐!");return;}System.out.println("Jack出发!!!");
}).start();
Thread.sleep(100);
new Thread(() -> {System.out.println("Rose到位!!!");try {barrier.await();} catch (Exception e) {System.out.println("悲剧,人没到齐!");return;}System.out.println("Rose出发!!!");
}).start();
4、CountDownLatch与CyclicBarrier区别
- 底层实现不同:CountDownLatch基于AQS。CyclicBarrier基于ReentrantLock。
- 应用场景不同:CountDownLatch的计数器只能使用一次。而CyclicBarrier在计数器达到0之后,可以重置计数器,可以实现相比CountDownLatch更复杂的业务,如果执行业务时出现了错误,可以重置CyclicBarrier计数器,再次执行一次。
- 等待对象不同:CountDownLatch一般是让主线程等待,让子线程对计数器–。CyclicBarrier更多的让子线程也一起计数和等待,等待的线程达到数值后,再统一唤醒
三、Semaphore
Semaphore(信号量),保证x个资源可以被多个线程同时访问
1、实现原理
Semaphore底层也是基于AQS的state属性做一个计数器的维护。state的值就代表当前共享资源的个数。如果一个线程需要获取的x个资源,
直接查看state的标识的资源个数是否足够,如果足够的,直接对state-x拿到当前资源。如果资源不够,当前线程就需要挂起等待。
知道持有资源的线程释放资源后,会归还给Semaphore中的state属性,挂起的线程就可以被唤醒。
2、使用场景
用于控制对共享资源的并发访问数量。它维护了一个可用的许可证数量,并允许线程通过获取(acquire)和释放(release)许可证来访问资源。当没有可用许可证时,线程会等待。
常见场景:
- 数据库连接池管理,限制同时访问数据库连接的线程数量
- 线程池管理,限制同时执行的线程数量
- 实现互斥锁,Semaphore的初始值设置为1,确保同一时间只能有一个线程可以访问
- 流量控制,平衡系统的负载和资源利用
3、代码
伪代码如下:
// 初始化 Semaphore,允许同时访问的线程数为 M
Semaphore semaphore = new Semaphore(M); // 在多个线程中
for (int i = 0; i < 任意数量; i++) { new Thread(() -> { // 请求一个许可 semaphore.acquire(); // 阻塞直到有一个许可可用 try { // 进入临界区,执行受保护的代码 // ... // 临界区结束 } finally { // 释放一个许可 semaphore.release(); } // 继续执行其他代码 // ... }).start();
}
案例:环球影城,每天接受的人流量是固定的,每有一个人购票后,就对信号量进行–操作,如果信号量已经达到了0,或者是资源不足,此时就不能买票。
Semaphore semaphore = new Semaphore(10);
new Thread(() -> {System.out.println("一家三口来了");try {semaphore.acquire(3);System.out.println("一家三口进去了~~~");Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("一家三口走了~~~");semaphore.release(3);}
}).start();
for (int i = 0; i < 7; i++) {int j = i;new Thread(() -> {System.out.println(j + "大哥来了");try {semaphore.acquire();System.out.println(j + "大哥进去了~~~");Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println(j + "大哥走了~~~");semaphore.release();}}).start();
}
Thread.sleep(2000);
System.out.println("main大哥来了");
if (semaphore.tryAcquire()) {System.out.println("main大哥进去了~~~");
} else {System.out.println("资源不够,main大哥停止进去");
}
Thread.sleep(3000);
System.out.println("main大哥又来了");
if (semaphore.tryAcquire()) {System.out.println("main大哥进去了~~~");semaphore.release();
} else {System.out.println("资源不够,main大哥停止进去");
}
四、总结
总的来说,CountDownLatch、CyclicBarrier 和 Semaphore 是 JVM 级别的同步工具,它们的状态是存储在JVM 的内存中的,主要用于单个 JVM 进程内的线程同步和协作,主要用于解决多线程编程中的一些问题,例如等待多个线程完成某些任务、让一组线程在某个点同步继续执行,或者限制对共享资源的并发访问数量。而在分布式场景下并不直接适用,可以考虑其它解决方案来实现类似的功能,比如:分布式锁、数据库事务、外部存储系统等
这篇关于CountDownLatch、CyclicBarrier 和 Semaphore的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!