本文主要是介绍【JAVA并发包】,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Java 并发包是 Java 标准库中提供的一套用于支持多线程编程的类和接口。这些工具帮助开发者更容易地编写、调试和维护并发程序。Java 并发包位于 java.util.concurrent 及其子包中,包含了用于线程管理、任务调度、同步控制、并发数据结构等多种功能。
Executor 框架
概念:
Executor 框架是 Java 并发包中的核心组件之一,它通过定义一个标准化的任务执行机制,将任务的提交和执行解耦。开发者无需直接管理线程的生命周期,可以通过 Executor 接口提交任务,由框架负责执行。
关键类:
- Executor:核心接口,定义了任务执行的标准方法 execute(Runnable command)。
- ExecutorService:扩展了 Executor 接口,增加了管理线程池的功能,如关闭、任务提交及任务取消。
- ThreadPoolExecutor:线程池的具体实现类,灵活配置线程池的大小、任务队列等。
- ScheduledExecutorService:扩展了 ExecutorService,支持任务调度(如定时执行任务)。
代码
ThreadPoolExecutor
import java.util.concurrent.*;public class ThreadPoolExecutorExample {public static void main(String[] args) {// 核心线程数为2,最大线程数为4,线程空闲时间为10秒,队列最大容量为100ThreadPoolExecutor executor = new ThreadPoolExecutor(2, // corePoolSize: 核心线程数4, // maximumPoolSize: 最大线程数10, // keepAliveTime: 空闲线程等待新任务的时间TimeUnit.SECONDS, // 时间单位new LinkedBlockingQueue<>(100), // 工作队列,用于存放等待执行的任务new ThreadPoolExecutor.AbortPolicy() // 拒绝策略,当任务太多而无法处理时抛出异常);// 提交任务到线程池for (int i = 0; i < 10; i++) {final int index = i;executor.submit(() -> {System.out.println("Task " + index + " is running by " + Thread.currentThread().getName());try {Thread.sleep(2000); // 模拟任务执行时间} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}// 关闭线程池,当所有任务完成后终止executor.shutdown();try {// 等待线程池终止if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow(); // 强制终止}} catch (InterruptedException e) {executor.shutdownNow();Thread.currentThread().interrupt();}}
}
- 核心线程数和最大线程数:corePoolSize 和 maximumPoolSize 控制了线程池中核心线程和最大线程的数量。在任务量较小时,线程池会保持 corePoolSize 数量的线程,当任务量增加时,可以增加到 maximumPoolSize 数量的线程。
- 线程空闲时间:keepAliveTime 指定了多余的空闲线程(即超过 corePoolSize 的线程)在终止前等待新任务的最长时间。
- 工作队列:使用 LinkedBlockingQueue 来存放等待执行的任务。
- 拒绝策略:当任务太多而无法处理时,ThreadPoolExecutor.AbortPolicy 会抛出异常。其他策略包括 CallerRunsPolicy(调用者线程执行任务)、DiscardPolicy(丢弃任务)、DiscardOldestPolicy(丢弃最老的任务)。
- shutdown 和 awaitTermination:shutdown 会让线程池停止接受新任务,并在完成已提交任务后终止。awaitTermination 用于等待线程池完全终止。
ScheduledExecutorService
import java.util.concurrent.*;public class ScheduledExecutorServiceExample {public static void main(String[] args) {// 创建一个 ScheduledExecutorService,线程池大小为1ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);// 创建一个 Runnable 任务Runnable task = () -> System.out.println("Task executed at " + System.currentTimeMillis());// 延迟5秒后执行任务scheduledExecutorService.schedule(task, 5, TimeUnit.SECONDS);// 每3秒执行一次任务,延迟2秒后开始执行scheduledExecutorService.scheduleAtFixedRate(task, 2, 3, TimeUnit.SECONDS);// 每5秒执行一次任务,但每次任务执行完后,等待5秒再执行下一次scheduledExecutorService.scheduleWithFixedDelay(task, 2, 5, TimeUnit.SECONDS);// 为了演示,运行10秒后关闭 ScheduledExecutorServiceExecutors.newSingleThreadScheduledExecutor().schedule(() -> {scheduledExecutorService.shutdown();try {if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {scheduledExecutorService.shutdownNow();}} catch (InterruptedException e) {scheduledExecutorService.shutdownNow();}}, 10, TimeUnit.SECONDS);}
}
同步工具类
概念:
Java 并发包提供了一系列的同步工具类,用于管理线程之间的协调和控制,避免竞争条件和死锁。
关键类:
- CountDownLatch:允许一个或多个线程等待其他线程完成操作。通过一个计数器,线程可以在计数器减到零之前等待。
- CyclicBarrier:让一组线程彼此等待,直到所有线程都到达一个共同的屏障点,再继续执行。
- Semaphore:控制对资源的访问许可数。适用于限制并发访问资源的数量(如数据库连接数)。
- Exchanger:两个线程之间交换数据的同步点。
Phaser:一个更加灵活和强大的 CyclicBarrier 替代品,允许动态调整屏障点。
代码
// 使用 CountDownLatch
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {new Thread(() -> {try {// 模拟任务Thread.sleep(1000);System.out.println("Task completed");} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {latch.countDown();}}).start();
}
latch.await(); // 主线程等待所有任务完成
System.out.println("All tasks completed");
并发数据结构
概念:
并发数据结构是在多线程环境下设计的线程安全的集合类,避免了手动同步的复杂性。
关键类:
- ConcurrentHashMap:线程安全的哈希表,支持高效的并发读写操作。
- CopyOnWriteArrayList:用于读取频繁、写入较少的场景。写操作会创建数组副本,以避免并发修改问题。
- ConcurrentLinkedQueue:高效的无锁并发队列,适用于高并发场景下的任务队列。
- BlockingQueue:支持阻塞操作的队列,如 ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue 等。
代码
ConcurrentHashMap
// 使用 ConcurrentHashMap
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ConcurrentHashMapExample {public static void main(String[] args) {// 创建一个 ConcurrentHashMap 实例ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();// 创建一个线程池ExecutorService executor = Executors.newFixedThreadPool(5);// 创建多个任务来更新和读取 mapRunnable writeTask = () -> {for (int i = 0; i < 5; i++) {String key = Thread.currentThread().getName() + "-" + i;map.put(key, i);System.out.println(Thread.currentThread().getName() + " put: " + key + " -> " + i);try {Thread.sleep(100); // 模拟写操作的延迟} catch (InterruptedException e) {Thread.currentThread().interrupt();}}};Runnable readTask = () -> {for (int i = 0; i < 5; i++) {String key = Thread.currentThread().getName() + "-" + i;Integer value = map.get(key);System.out.println(Thread.currentThread().getName() + " get: " + key + " -> " + value);try {Thread.sleep(100); // 模拟读操作的延迟} catch (InterruptedException e) {Thread.currentThread().interrupt();}}};// 提交写任务到线程池for (int i = 0; i < 3; i++) {executor.submit(writeTask);}// 提交读任务到线程池for (int i = 0; i < 3; i++) {executor.submit(readTask);}// 关闭线程池executor.shutdown();}
}
ConcurrentHashMap 采用了一种分段锁机制来确保线程安全,能够在高并发环境下提供良好的性能。
读取操作(如 get)通常不需要锁定,性能较高。写入操作(如 put)会锁定部分数据结构,以减少锁竞争。
原子操作类
概念:
Java 并发包提供了多种原子操作类,确保基本类型和对象引用的操作是线程安全的,这些类使用了底层的硬件指令(如 CAS)实现高效的线程安全操作。
关键类:
- AtomicInteger:支持原子性的 int 类型操作。
- AtomicLong:支持原子性的 long 类型操作。
- AtomicBoolean:支持原子性的 boolean 类型操作。
- AtomicReference:支持原子性的对象引用操作。
- AtomicStampedReference:解决 ABA 问题的原子类,通过版本号(戳)跟踪对象引用的变化。
代码
import java.util.concurrent.atomic.AtomicInteger;public class AtomicIntegerExample {public static void main(String[] args) {// 创建一个 AtomicInteger 对象,初始值为0AtomicInteger atomicInteger = new AtomicInteger(0);// 创建多个线程来操作 AtomicIntegerThread t1 = new Thread(() -> {for (int i = 0; i < 1000; i++) {atomicInteger.incrementAndGet(); // 原子递增}});Thread t2 = new Thread(() -> {for (int i = 0; i < 1000; i++) {atomicInteger.incrementAndGet(); // 原子递增}});t1.start();t2.start();// 等待两个线程结束try {t1.join();t2.join();} catch (InterruptedException e) {e.printStackTrace();}// 输出最终的值System.out.println("Final value: " + atomicInteger.get());}
}// 使用 ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
RecursiveTask<Integer> task = new RecursiveTask<Integer>() {@Overrideprotected Integer compute() {// 分解任务并合并结果return 1 + 2; // 示例任务}
};
Integer result = pool.invoke(task);
System.out.println("ForkJoin result: " + result);
锁和同步器
概念:
相比于传统的 synchronized,Java 并发包提供了更灵活和功能强大的锁和同步器。
关键类:
- ReentrantLock:可重入锁,提供了与 synchronized 类似的锁机制,但具备更灵活的锁获取和释放控制。
- ReentrantReadWriteLock:读写锁,允许多个线程并发读取或单个线程进行写入操作,提升了读多写少场景的性能。
- Lock:提供了锁机制的标准接口,允许显式的锁定和解锁操作。
- Condition:与 Lock 配合使用,替代 Object 的 wait/notify 机制,提供了更细粒度的线程等待/通知功能。
- StampedLock:增强版的读写锁,提供了乐观读锁以进一步提高性能。
代码
// 使用 ReentrantLock
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {// 线程安全的操作System.out.println("Locked section");
} finally {lock.unlock();
}
Fork/Join 框架
概念:
Fork/Join 框架是 Java 7 引入的用于并行处理任务的框架,特别适合将大任务拆分为多个小任务,并在多个处理器核心上并行执行。
关键类:
- ForkJoinPool:执行并行任务的线程池,自动管理任务的拆分和合并。
- RecursiveTask:用于有返回值的任务。
- RecursiveAction:用于无返回值的任务。
代码
// 使用 ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
RecursiveTask<Integer> task = new RecursiveTask<Integer>() {@Overrideprotected Integer compute() {// 分解任务并合并结果return 1 + 2; // 示例任务}
};
Integer result = pool.invoke(task);
System.out.println("ForkJoin result: " + result);
这篇关于【JAVA并发包】的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!