java.util.concurrent包 (备忘)

2024-05-16 03:18
文章标签 java util 备忘 concurrent

本文主要是介绍java.util.concurrent包 (备忘),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

newFixedThreadPool

创建一个固定大小的线程池。

shutdown():用于关闭启动线程,如果不调用该语句,jvm不会关闭。

awaitTermination():用于等待子线程结束,再继续执行下面的代码。该例中我设置一直等着子线程结束。

Java代码   收藏代码
  1. public class Test {  
  2.   
  3.     public static void main(String[] args) throws IOException, InterruptedException {  
  4.         ExecutorService service = Executors.newFixedThreadPool(2);  
  5.         for (int i = 0; i < 4; i++) {  
  6.             Runnable run = new Runnable() {  
  7.                 @Override  
  8.                 public void run() {  
  9.                     System.out.println("thread start");  
  10.                 }  
  11.             };  
  12.             service.execute(run);  
  13.         }  
  14.         service.shutdown();  
  15.         service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);  
  16.         System.out.println("all thread complete");  
  17.     }  
  18. }  

 

输出:
thread start
thread start
thread start
thread start
all thread complete

newScheduledThreadPool

这个先不说,我喜欢用spring quartz.

CyclicBarrier

假设有只有的一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个人没有准备好,大家都等待.

 

Java代码   收藏代码
  1. import java.io.IOException;  
  2. import java.util.Random;  
  3. import java.util.concurrent.BrokenBarrierException;  
  4. import java.util.concurrent.CyclicBarrier;  
  5. import java.util.concurrent.ExecutorService;  
  6. import java.util.concurrent.Executors;  
  7.   
  8. class Runner implements Runnable {  
  9.   
  10.     private CyclicBarrier barrier;  
  11.   
  12.     private String name;  
  13.   
  14.     public Runner(CyclicBarrier barrier, String name) {  
  15.         super();  
  16.         this.barrier = barrier;  
  17.         this.name = name;  
  18.     }  
  19.   
  20.     @Override  
  21.     public void run() {  
  22.         try {  
  23.             Thread.sleep(1000 * (new Random()).nextInt(8));  
  24.             System.out.println(name + " 准备OK.");  
  25.             barrier.await();  
  26.         } catch (InterruptedException e) {  
  27.             e.printStackTrace();  
  28.         } catch (BrokenBarrierException e) {  
  29.             e.printStackTrace();  
  30.         }  
  31.         System.out.println(name + " Go!!");  
  32.     }  
  33. }  
  34.   
  35. public class Race {  
  36.   
  37.     public static void main(String[] args) throws IOException, InterruptedException {  
  38.         CyclicBarrier barrier = new CyclicBarrier(3);  
  39.   
  40.         ExecutorService executor = Executors.newFixedThreadPool(3);  
  41.         executor.submit(new Thread(new Runner(barrier, "zhangsan")));  
  42.         executor.submit(new Thread(new Runner(barrier, "lisi")));  
  43.         executor.submit(new Thread(new Runner(barrier, "wangwu")));  
  44.   
  45.         executor.shutdown();  
  46.     }  
  47.   
  48. }  

 

输出:
wangwu 准备OK.
zhangsan 准备OK.
lisi 准备OK.
lisi Go!!
zhangsan Go!!
wangwu Go!!

ThreadPoolExecutor

 

newFixedThreadPool生成一个固定的线程池,顾名思义,线程池的线程是不会释放的,即使它是Idle。这就会产生性能问题,比如如 果线程池的大小为200,当全部使用完毕后,所有的线程会继续留在池中,相应的内存和线程切换(while(true)+sleep循环)都会增加。如果 要避免这个问题,就必须直接使用ThreadPoolExecutor()来构造。可以像Tomcat的线程池一样设置“最大线程数”、“最小线程数”和 “空闲线程keepAlive的时间”。

 

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)  


corePoolSize:池中所保存的线程数,包括空闲线程(非最大同时干活的线程数)。如果池中线程数多于 corePoolSize,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止。

maximumPoolSize:线程池中最大线程数

keepAliveTime:线程空闲回收的时间

unit:keepAliveTime的单位

workQueue:保存任务的队列,可以如下选择:

 

  •   无界队列: new LinkedBlockingQueue<Runnable>();
  •   有界队列: new ArrayBlockingQueue<Runnable>(8);你不想让客户端无限的请求吃光你的CPU和内存吧,那就用有界队列

handler:当提交任务数大于队列size会抛出RejectedExecutionException,可选的值为:

 

  • ThreadPoolExecutor.CallerRunsPolicy 等待队列空闲
  • ThreadPoolExecutor.DiscardPolicy:丢弃要插入队列的任务
  • ThreadPoolExecutor.DiscardOldestPolicy:删除队头的任务

关于corePoolSize和maximumPoolSize:

 

Java官方Docs写道:
当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求(即使存在空闲线程)。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列(queue)满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。

 

Java代码   收藏代码
  1. public class Test {  
  2.   
  3.     public static void main(String[] args) {  
  4.         BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();  
  5.         ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1, TimeUnit.DAYS, queue);  
  6.   
  7.         for (int i = 0; i < 20; i++) {  
  8.             final int index = i;  
  9.             executor.execute(new Runnable() {  
  10.                 public void run() {  
  11.                     try {  
  12.                         Thread.sleep(4000);  
  13.                     } catch (InterruptedException e) {  
  14.                         e.printStackTrace();  
  15.                     }  
  16.                     System.out.println(String.format("thread %d finished", index));  
  17.                 }  
  18.             });  
  19.         }  
  20.         executor.shutdown();  
  21.     }  
  22. }  

 

原子变量(Atomic )

并发库中的BlockingQueue是一个比较好玩的类,顾名思义,就是阻塞队列。该类主要提供了两个方法put()和take(),前者将一个 对象放到队列中,如果队列已经满了,就等待直到有空闲节点;后者从head取一个对象,如果没有对象,就等待直到有可取的对象。

 

下面的例子比较简单,一个读线程,用于将要处理的文件对象添加到阻塞队列中,另外四个写线程用于取出文件对象,为了模拟写操作耗时长的特点,特让线 程睡眠一段随机长度的时间。另外,该Demo也使用到了线程池和原子整型(AtomicInteger),AtomicInteger可以在并发情况下达 到原子化更新,避免使用了synchronized,而且性能非常高。由于阻塞队列的put和take操作会阻塞,为了使线程退出,在队列中添加了一个 “标识”,算法中也叫“哨兵”,当发现这个哨兵后,写线程就退出。

 

Java代码   收藏代码
  1. import java.io.File;  
  2. import java.io.FileFilter;  
  3. import java.util.concurrent.BlockingQueue;  
  4. import java.util.concurrent.ExecutorService;  
  5. import java.util.concurrent.Executors;  
  6. import java.util.concurrent.LinkedBlockingQueue;  
  7. import java.util.concurrent.atomic.AtomicInteger;  
  8.   
  9. public class Test {  
  10.   
  11.     static long randomTime() {  
  12.         return (long) (Math.random() * 1000);  
  13.     }  
  14.   
  15.     public static void main(String[] args) {  
  16.         // 能容纳100个文件  
  17.         final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);  
  18.         // 线程池  
  19.         final ExecutorService exec = Executors.newFixedThreadPool(5);  
  20.         final File root = new File("D:\\dist\\blank");  
  21.         // 完成标志  
  22.         final File exitFile = new File("");  
  23.         // 读个数  
  24.         final AtomicInteger rc = new AtomicInteger();  
  25.         // 写个数  
  26.         final AtomicInteger wc = new AtomicInteger();  
  27.         // 读线程  
  28.         Runnable read = new Runnable() {  
  29.             public void run() {  
  30.                 scanFile(root);  
  31.                 scanFile(exitFile);  
  32.             }  
  33.   
  34.             public void scanFile(File file) {  
  35.                 if (file.isDirectory()) {  
  36.                     File[] files = file.listFiles(new FileFilter() {  
  37.                         public boolean accept(File pathname) {  
  38.                             return pathname.isDirectory() || pathname.getPath().endsWith(".log");  
  39.                         }  
  40.                     });  
  41.                     for (File one : files)  
  42.                         scanFile(one);  
  43.                 } else {  
  44.                     try {  
  45.                         int index = rc.incrementAndGet();  
  46.                         System.out.println("Read0: " + index + " " + file.getPath());  
  47.                         queue.put(file);  
  48.                     } catch (InterruptedException e) {  
  49.                     }  
  50.                 }  
  51.             }  
  52.         };  
  53.         exec.submit(read);  
  54.         // 四个写线程  
  55.         for (int index = 0; index < 4; index++) {  
  56.             // write thread  
  57.             final int num = index;  
  58.             Runnable write = new Runnable() {  
  59.                 String threadName = "Write" + num;  
  60.   
  61.                 public void run() {  
  62.                     while (true) {  
  63.                         try {  
  64.                             Thread.sleep(randomTime());  
  65.                             int index = wc.incrementAndGet();  
  66.                             File file = queue.take();  
  67.                             // 队列已经无对象  
  68.                             if (file == exitFile) {  
  69.                                 // 再次添加"标志",以让其他线程正常退出  
  70.                                 queue.put(exitFile);  
  71.                                 break;  
  72.                             }  
  73.                             System.out.println(threadName + ": " + index + " " + file.getPath());  
  74.                         } catch (InterruptedException e) {  
  75.                         }  
  76.                     }  
  77.                 }  
  78.   
  79.             };  
  80.             exec.submit(write);  
  81.         }  
  82.         exec.shutdown();  
  83.     }  
  84.   
  85. }  
 

CountDownLatch

 

从名字可以看出,CountDownLatch是一个倒数计数的锁,当倒数到0时触发事件,也就是开锁,其他人就可以进入了。在一些应用场合中,需要等待某个条件达到要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作。 

CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。

一个CountDouwnLatch实例是不能重复使用的,也就是说它是一次性的,锁一经被打开就不能再关闭使用了,如果想重复使用,请考虑使用CyclicBarrier。

下面的例子简单的说明了CountDownLatch的使用方法,模拟了100米赛跑,10名选手已经准备就绪,只等裁判一声令下。当所有人都到达终点时,比赛结束。

 

Java代码   收藏代码
  1. import java.util.concurrent.CountDownLatch;  
  2. import java.util.concurrent.ExecutorService;  
  3. import java.util.concurrent.Executors;  
  4.   
  5. public class Test {  
  6.   
  7.     public static void main(String[] args) throws InterruptedException {  
  8.   
  9.         // 开始的倒数锁  
  10.         final CountDownLatch begin = new CountDownLatch(1);  
  11.   
  12.         // 结束的倒数锁  
  13.         final CountDownLatch end = new CountDownLatch(10);  
  14.   
  15.         // 十名选手  
  16.         final ExecutorService exec = Executors.newFixedThreadPool(10);  
  17.   
  18.         for (int index = 0; index < 10; index++) {  
  19.             final int NO = index + 1;  
  20.             Runnable run = new Runnable() {  
  21.                 public void run() {  
  22.                     try {  
  23.                         begin.await();  
  24.                         Thread.sleep((long) (Math.random() * 10000));  
  25.                         System.out.println("No." + NO + " arrived");  
  26.                     } catch (InterruptedException e) {  
  27.                     } finally {  
  28.                         end.countDown();  
  29.                     }  
  30.                 }  
  31.             };  
  32.             exec.submit(run);  
  33.         }  
  34.         System.out.println("Game Start");  
  35.         begin.countDown();  
  36.         end.await();  
  37.         System.out.println("Game Over");  
  38.         exec.shutdown();  
  39.     }  
  40.   
  41. }  

使用Callable和Future实现线程等待和多线程返回值

假设在main线程启动一个线程,然后main线程需要等待子线程结束后,再继续下面的操作,我们会通过join方法阻塞main线程,代码如下:
Java代码   收藏代码
  1. Runnable runnable = ...;  
  2. Thread t = new Thread(runnable);  
  3. t.start();  
  4. t.join();  
  5. ......  
通过JDK1.5线程池管理的线程可以使用Callable和Future实现(join()方法无法应用到在线程池线程)
Java代码   收藏代码
  1. import java.util.concurrent.Callable;  
  2. import java.util.concurrent.ExecutionException;  
  3. import java.util.concurrent.ExecutorService;  
  4. import java.util.concurrent.Executors;  
  5. import java.util.concurrent.Future;  
  6.   
  7. public class Test {  
  8.   
  9.     public static void main(String[] args) throws InterruptedException, ExecutionException {  
  10.         System.out.println("start main thread");  
  11.         final ExecutorService exec = Executors.newFixedThreadPool(5);  
  12.           
  13.         Callable<String> call = new Callable<String>() {  
  14.             public String call() throws Exception {  
  15.                 System.out.println("  start new thread.");  
  16.                 Thread.sleep(1000 * 5);  
  17.                 System.out.println("  end new thread.");  
  18.                 return "some value.";  
  19.             }  
  20.         };  
  21.         Future<String> task = exec.submit(call);  
  22.         Thread.sleep(1000 * 2);  
  23.         task.get(); // 阻塞,并待子线程结束,  
  24.         exec.shutdown();  
  25.         exec.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);  
  26.         System.out.println("end main thread");  
  27.     }  
  28.   
  29. }  
 
Java代码   收藏代码
  1. import java.util.ArrayList;  
  2. import java.util.List;  
  3. import java.util.concurrent.Callable;  
  4. import java.util.concurrent.ExecutionException;  
  5. import java.util.concurrent.ExecutorService;  
  6. import java.util.concurrent.Executors;  
  7. import java.util.concurrent.Future;  
  8.   
  9. /** 
  10. * 多线程返回值测试 
  11. */  
  12. public class ThreadTest {  
  13.   
  14.     public static void main(String[] args) throws InterruptedException, ExecutionException {  
  15.         System.out.println("start main thread");  
  16.         int threadCount = 5;  
  17.         final ExecutorService exec = Executors.newFixedThreadPool(threadCount);  
  18.   
  19.         List<Future<Integer>> tasks = new ArrayList<Future<Integer>>();  
  20.         for (int i = 0; i < threadCount; i++) {  
  21.             Callable<Integer> call = new Callable<Integer>() {  
  22.                 public Integer call() throws Exception {  
  23.                     Thread.sleep(1000);  
  24.                     return 1;  
  25.                 }  
  26.             };  
  27.             tasks.add(exec.submit(call));  
  28.         }  
  29.         long total = 0;  
  30.         for (Future<Integer> future : tasks) {  
  31.             total += future.get();  
  32.         }  
  33.         exec.shutdown();  
  34.         System.out.println("total: " + total);  
  35.         System.out.println("end main thread");  
  36.     }  
  37. }  
 

CompletionService

这个东西的使用上很类似上面的example,不同的是,它会首先取完成任务的线程。下面的参考文章里,专门提到这个,大家有兴趣可以看下,例子:

 

 

Java代码   收藏代码
  1. import java.util.concurrent.Callable;  
  2. import java.util.concurrent.CompletionService;  
  3. import java.util.concurrent.ExecutionException;  
  4. import java.util.concurrent.ExecutorCompletionService;  
  5. import java.util.concurrent.ExecutorService;  
  6. import java.util.concurrent.Executors;  
  7. import java.util.concurrent.Future;  
  8.   
  9. public class Test {  
  10.     public static void main(String[] args) throws InterruptedException,  
  11.     ExecutionException {  
  12.         ExecutorService exec = Executors.newFixedThreadPool(10);  
  13.         CompletionService<String> serv =  
  14.         new ExecutorCompletionService<String>(exec);  
  15.         for (int index = 0; index < 5; index++) {  
  16.             final int NO = index;  
  17.             Callable<String> downImg = new Callable<String>() {  
  18.                 public String call() throws Exception {  
  19.                     Thread.sleep((long) (Math.random() * 10000));  
  20.                     return "Downloaded Image " + NO;  
  21.                 }  
  22.             };  
  23.             serv.submit(downImg);  
  24.         }  
  25.         Thread.sleep(1000 * 2);  
  26.         System.out.println("Show web content");  
  27.         for (int index = 0; index < 5; index++) {  
  28.             Future<String> task = serv.take();  
  29.             String img = task.get();  
  30.             System.out.println(img);  
  31.         }  
  32.         System.out.println("End");  
  33.         // 关闭线程池  
  34.         exec.shutdown();  
  35.     }  
  36. }  

 

 

Semaphore信号量

 

拿到信号量的线程可以进入代码,否则就等待。通过acquire()和release()获取和释放访问许可。下面的例子只允许5个线程同时进入执行acquire()和release()之间的代码

 

 

Java代码   收藏代码
  1. import java.util.concurrent.ExecutorService;  
  2. import java.util.concurrent.Executors;  
  3. import java.util.concurrent.Semaphore;  
  4.   
  5. public class Test {  
  6.   
  7.     public static void main(String[] args) {  
  8.         // 线程池  
  9.         ExecutorService exec = Executors.newCachedThreadPool();  
  10.         // 只能5个线程同时访问  
  11.         final Semaphore semp = new Semaphore(5);  
  12.         // 模拟20个客户端访问  
  13.         for (int index = 0; index < 20; index++) {  
  14.             final int NO = index;  
  15.             Runnable run = new Runnable() {  
  16.                 public void run() {  
  17.                     try {  
  18.                         // 获取许可  
  19.                         semp.acquire();  
  20.                         System.out.println("Accessing: " + NO);  
  21.                         Thread.sleep((long) (Math.random() * 10000));  
  22.                         // 访问完后,释放  
  23.                         semp.release();  
  24.                     } catch (InterruptedException e) {  
  25.                     }  
  26.                 }  
  27.             };  
  28.             exec.execute(run);  
  29.         }  
  30.         // 退出线程池  
  31.         exec.shutdown();  
  32.     }  
  33.   
  34. }  


参考:

jdk1.5中的线程池使用简介

http://www.java3z.com/cwbwebhome/article/article2/2875.html

CAS原理

http://www.blogjava.net/syniii/archive/2010/11/18/338387.html?opt=admin

jdk1.5中java.util.concurrent包编写多线程

http://hi.baidu.com/luotoo/blog/item/b895c3c2d650591e0ef47731.html

ExecutorSerive vs CompletionService

http://www.coderanch.com/t/491704/threads/java/ExecutorSerive-vs-CompletionService

 

这篇关于java.util.concurrent包 (备忘)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/993720

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

在cscode中通过maven创建java项目

在cscode中创建java项目 可以通过博客完成maven的导入 建立maven项目 使用快捷键 Ctrl + Shift + P 建立一个 Maven 项目 1 Ctrl + Shift + P 打开输入框2 输入 "> java create"3 选择 maven4 选择 No Archetype5 输入 域名6 输入项目名称7 建立一个文件目录存放项目,文件名一般为项目名8 确定