本文主要是介绍Java多线程父线程向子线程传值问题及解决,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《Java多线程父线程向子线程传值问题及解决》文章总结了5种解决父子之间数据传递困扰的解决方案,包括ThreadLocal+TaskDecorator、UserUtils、CustomTaskDeco...
1 背景
在实际开发过程中我们需要父子之间传递一些数据,比如用户信息,日志异步生成数据传递等,该文章从5种解决方案解决父子之间数据传递困扰
2 ThreadLocal+TaskDecorator
用户工具类 UserUtils
/** *使用ThreadLocal存储共享的数据变量,如登录的用户信息 */ public class UserUtils { private static final ThreadLocal<String> userLocal=new ThreadLocal<>(); public static String getUserId(){ return userLocal.get(); } public static void setUserId(String userId){ userLocal.set(userId); } public static void clear(){ userLocal.remove(); } }
自定义CustomTaskDecorator
/** * 线程池修饰类 */ public class CustomTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runandroidnable) { // 获取主线程中的请求信息(我们的用户信息也放在里面) String robotId = UserUtils.getUserId(); System.out.println(robotId); return () -> { try { // 将主线程的请求信息,设置到子线程中 UserUtils.setUserId(robotId); // 执行子线程,这一步不要忘了 runnable.run(); } finally { // 线程结束,清空这些信息,否则可能造成内存泄漏 UserUtils.clear(); } }; } }
ExecutorConfig
在原来的基础上增加 executor.setTaskDecorator(new CustomTaskDecorator());
@Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor----------------"); //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //使用可视化运行状态的线程池 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePoolSize); //配置最大线程数 executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueCapacity); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix(namePrefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //增加线程池修饰类 executor.setTaskDecorator(new CustomTaskDecorator()); //增加MDC的线程池修饰类 //executor.setTaskDecorator(new MDCTaskDecorator()); //执行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; }
AsyncServiceImpl
/** * 使用ThreadLocal方式传递 * 带有返回值 * @throws InterruptedException */ @Async("asyncServiceExecutor") public CompletableFuture<String&gChina编程t; executeValueAsync2() throws InterruptedException { log.info("start executeValueAsync"); System.out.println("异步线程执行返回结果......+"); log.info("end executeValueAsync"); return CompletableFuture.completedFuture(UserUtils.getUserId()); }
Test2Controller
/** * 使用ThreadLocal+TaskDecorator的方式 * @return * @throws InterruptedException * @throws ExecutionException */ @GetMapping("/test2") public String test2() throws InterruptedException, ExecutionException { UserUtils.setUserId("123456"); CompletableFuture<String> completableFuture = asyncService.executeValueAsync2(); String s = completableFuture.get(); return s; }
3 RequestContextHolder+TaskDecorator
自定义CustomTaskDecorator
/** * 线程池修饰类 */ public class CustomTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { // 获取主线程中的请求信息(我们的用户信息也放在里面) RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); return () -> { try { // 将主线程的请求信息,设置到子线程中 RequestContextHolder.setRequestAttributes(attributes); // 执行子线程,这一步不要忘了 runnable.run(); } finally { // 线程结束,清空这些信息,否则可能造成内存泄漏 RequestContextHolder.resetRequestAttributes(); } }; } }
ExecutorConfig
在原来的基础上增加 executor.setTaskDecorator(new CustomTaskDecorator());
@Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor----------------"); //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //使用可视化运行状态的线程池 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePoolSize); //配置最大线程数 executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueCapacity); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix(namePrefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //增加线程池修饰类 executor.setTaskDecorator(new CustomTaskDecorator()); //增加MDC的线程池修饰类 //executor.setTaskDecorator(new MDCTaskDecorator()); //执行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; }
AsyncServiceImpl
/** * 使用RequestAttributes获取主线程传递的数据 * @return * @throws InterruptedException */ @Async("asyncServiceExecutor") public CompletableFuture<String> executeValueAsync3() throws InterruptedException { log.info("start executeValueAsync"); System.out.println("异步线程执行返回结果......+"); RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); Object userId = attributes.getAttribute("userId", 0); log.info("end executeValueAsync"); return CompletableFuture.completedFuture(userId.toString()); }
Test2Controller
/** * RequestContextHolder+TaskDecorator的方式 * @return * @throws InterruptedException * @throws ExecutionException */ @GetMapping("/test3") public String test3() throws InterruptedException, ExecutionException { RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); attributes.setAttribute("userId","123456",0); CompletableFuture<String> completableFuture = asyncService.executeValueAsync3(); String s = completableFuture.get(); return s; }
4 MDC+TaskDecorator
自定义MDCTaskDecorator
/** * 线程池修饰类 */ public class MDCTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnablChina编程e) { // 获取主线程中的请求信息(我们的用户信息也放在里面) String userId = MDC.get("userId"); Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap(); System.out.println(copyOfContextMap); return () -> { try { // 将主线程的请求信息,设置到子线程中 MDC.put("userId",userId); // 执行子线程,这一步不要忘了 runnable.run(); } finally { // 线程结束,清空这些信息,否则可能造成内存泄漏 MDC.clear(); } }; } }
ExecutorConfig
在原来的基础上增加 executor.setTaskDecorator(new MDCTaskDecorator());
@Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor----------------"); //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //使用可视化运行状态的线程池 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePoolSize); //配置最大线程数 executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueCapacity); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix(namePrefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //增加MDC的线程池修饰类 executor.setTaskDecorator(new MDCTaskDecorator()); //执行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; }
AsyncServiceImpl
/** * 使用MDC获取主线程传递的数据 * @return * @throws InterruptedException */ @Async("asyncServiceExecutor") public CompletableFuture<String> executeValueAsync5() throws InterruptedException { log.info("start executeValueAsync"); System.out.println("异步线程执行返回结果......+"); log.info("end executeValueAsync"); return CompletableFuture.completedFuture(MDC.get("userId")); }
Test2Controller
/** * 使用MDC+TaskDecorator方式 * 本质也是ThreadLocal+TaskDecorator方式 * @return * @throws InterruptedException * @throws ExecutionException */ @GetMapping("/test5") public String test5() throws InterruptedException, ExecutionException { MDC.put("userId","123456"); CompletableFuture<String> completableFuture = asyncService.executeValueAsync5(); String s = completableFuture.get(); return s; }
5 InheritableThreadLocal
测试代码
public class TestThreadLocal { public static ThreadLocal<String> threadLocal = new ThreadLocal<>(); public static void main(String[] args) { //设置线程变量 threadLocal.set("hello world"); Thread thread = new Thread(new Runnable() { @Override public void run( ) { //子线程输出线程变量的值 System.out.androidprintln("thread:"+threadLocal.get()); } }); thread.start(); // 主线程输出线程变量的值 System.out.println("main:"+threadLocal.get()); } }
输出结果:
main:hello world
thread:null
从上面结果可以看出:同一个ThreadLocal变量在父线程中被设置后,在子线程中是获取不到的;
原因在子线程thread里面调用get方法时当前线程为thread线程,而这里调用set方法设置线程变量的是main线程,两者是不同的线程,自然子线程访问时返回null
为了解决上面的问题,InheritableThreadLocal应运而生,InheritableThreadLocal继承ThreadLocal,其提供一个特性,就是让子线程可以访问在父线程中设置的本地变量
将上面测试代码用InheritableThreadLocal修改
public class TestInheritableThreadLocal { public static InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<>(); public static void main(String[] args) { //设置线程变量 threadLocal.set("hello world"); Thread thread = new Thread(new Runnable() { @Override public void run( ) { //子线程输出线程变量的值 System.out.println("thread:"+threadLocal.get()); } }); thread.start(); // 主线程输出线程变量的值 System.out.println("main:"+threadLocal.get()); } }
输出结果:
main:hello world
thread:hello world
5.1 源码分析
public class InheritableThreadLocal<T> extends ThreadLocal<T> { protected T childValue(T parentValue) { return parentValue; } ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); } }
InheritableThreadLocal 重写了childValue,getMap,createMap三个方法
在InheritableThreadLocal中,变量inheritableThreadLocals 替代了threadLocals;
那么如何让子线程可以访问父线程的本地变量。这要从创建Thread的代码说起,打开Thread类的默认构造方法,代码如下:
public Thread(Runnable target) { init(null, target, "Thread-" + nextThreadNum(), 0); } private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) { if (name == null) { throw new NullPointerException("name cannot be null"); } this.name = name; //获取当前线程 Thread parent = currentThread(); //如果父线程的 inheritableThreadLocals变量不为null if (inheritThreadLocals && parent.inheritableThreadLocals != null) //设置子线程inheritThreadLocals变量 this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); /* Stash the specified stack size in case the VM cares */ this.stackSize = stackSize; /* Set thread ID */ tid = nextThreadID(); }
我们看下createInheritedMap代码:
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
在createInheritedMap内部使用父线程的inheritableThreadLocals变量作为构造方法创建了一个新的ThreadLocalMap变量,然后赋值给子线程的inheritableThreadLocals变量。
下面看看ThreadLocalMap的构造函数内部做了什么事情;
private ThreadLocalMap(ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table; int len = parentTable.length; setThreshold(len); table = new Entry[len]; for (int j = 0; j < len; j++) { Entry e = parentTable[j]; if (e != null) { @SuppressWarnings("unchecked") ThreadLocal<Object> key = (ThreadLocal<Object>) e.get(); if (key != null) { Object value = key.childValue(e.value); Entry c = new Entry(key, value); int h = key.threadLocalHashCode & (len - 1); while (table[h] != null) h = nextIndex(h, len); table[h] = c; size++; } } } }
InheritableThreadLocal 类通过重写下面代码
ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } /** * Create the map associated with a ThreadLocal. * * @param t the current thread * @param firstValue value for the initial entry of the table. */ void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); }
让本地变量保存到了具体的线程的inheritableThreadLocals变量里面,那么线程在通过InheritableThreadLocal类实例的set或者get方法设置变量时,就会创建当前线程的inheritableThreadLocals变量。
当父线程创建子线程时,构造方法会把父线程中的inheritableThreadLocals变量里面的本地变量赋值一份保存到子线程的inheritableThreadLocals变量里面
5.2 InheritableThreadLocal存在的问题
虽然InheritableThreadLocal可以解决在子线程中获取父线程的值的问题,但是在使用线程池的情况下,由于不同的任务有可能是同一个线程处理,因此这些任务取到的值有可能并不是父线程设置的值
测试目标:任务1和任务2 获取父线程值一样,为测试代码中的hello world
测试代码:
public class TestInheritableThreadLocaIssue {
public static InheritableThreadLocal<String> threadLocal = new Inheri编程China编程tableThreadLocal<>();
public static ExecutorService executorService = Executors.newSingleThreadExecutor();
public static void main(String[] args) throws Exception {
//设置线程变量
threadLocal.set("hello world");
Thread thread1 = new Thread(new Runnable() {
@Override
public void run( ) {
//子线程输出线程变量的值
System.out.println("thread:"+threadLocal.get());
threadLocal.set("hello world 2");
}
},"task1");
Thread thread2 = new Thread(new Runnable() {
@Override
public void run( ) {
//子线程输出线程变量的值
System.out.println("thread:"+threadLocal.get());
threadLocal.set("hello world 2");
}
},"task2");
executorService.submit(thread1).get();
executorService.submit(thread2).get();
// 主线程输出线程变量的值
System.out.println("main:"+threadLocal.get());
}
}
输出结果:
thread:hello world
thread:hello world 2
main:hello world
结果分析:
很明显,任务2获取的不是父线程设置的hello world ,而是线程1修改后的值。如果在线程池中使用,需要注意这种情况(可以备份备份父线程的值)
6 TransmittableThreadLocal
解决线程池化值传递
阿里封装了一个工具,实现了在使用线程池等会池化复用线程的组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题
JDK的InheritableThreadLocal类可以完成父线程到子线程的值传递。但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;
这时父子线程关系的ThreadLocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的ThreadLocal值传递到 任务执行时
https://github.com/alibaba/transmittable-thread-local
引入:
<dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.11.5</version> </dependency>
需求场景:
- 1.分布式跟踪系统 或 全链路压测(即链路打标)
- 2.日志收集记录系统上下文
- 3.Session级Cache
- 4.应用容器或上层框架跨应用代码给下层SDK传递信息
测试代码:
1)父子线程信息传递
public static TransmittableThreadLocal<String> threadLocal = new TransmittableThreadLocal<>(); public static void main(String[] args) { //设置线程变量 threadLocal.set("hello world"); Thread thread = new Thread(new Runnable() { @Override public void run( ) { //子线程输出线程变量的值 System.out.println("thread:"+threadLocal.get()); } }); thread.start(); // 主线程输出线程变量的值 System.out.println("main:"+threadLocal.get()); } }
输出结果:
main:hello world
thread:hello world
2)线程池中传递值,参考github:修饰线程池
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持China编程(www.chinasem.cn)。
这篇关于Java多线程父线程向子线程传值问题及解决的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!