本文主要是介绍异步线程traceId如何实现传递,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《异步线程traceId如何实现传递》文章介绍了如何在异步请求中传递traceId,通过重写ThreadPoolTaskExecutor的方法和实现TaskDecorator接口来增强线程池,确保异步...
前言
在日常问题排查中,我们经常在ELK中根据traceId来查询请求的日志链路,在同步请求中,根据traceId一站到底,很爽,那如果是异步请求该如何处理呢?
项目中的异步请求都是结合线程池来开启异步线程,下面结合slf4j中的MDC和线程池来实现异步线程的traceId传递。
重写ThreadPoolTaskExecutor中方法
下面的工具类,分别在Callable和Runnable异步任务执行前通过MDC.setContextMap(context)设置请求映射上下文
import org.slf4j.MDC; import org.springframework.util.CollectionUtils; import Java.util.Map; import java.util.concurrent.Callable; /** * @desc: 定义MDC工具类,支持Runnable和Callable两种,目的就是为了把父线程的traceId设置给子线程 */ public class MdcUtil { public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) { return () -> { if (CollectionUtils.isEmpty(context)) { MDC.clear(); } else { MDC.setContextMap(context); } try { return callable.call(); } finally { // 清除子线程的,避免内存溢出,就和ThreadLocal.remove()一个原因 MDC.clear(); } }; } public static Runnable wrap(final Runnable runnable, final Map<String, String> context) { return () -> { if (CollectionUtils.isEmpty(context)) { MDC.clear(); } else { MDC.setContextMap(context); } try { runnable.run(); } finally { MDC.clear(); } }; } }
下面定义一个ThreadPoolMdcExecutor 类来继承ThreadPoolTaskExecutor 类,重写execute和submit方法
import java.util.concurrent.Callable; import java.util.concurrent.Future; /** * @desc: 把当前的traceId透传到子线程特意加的实现。 * 重点就是 MDC.getCopyOfContextMap(),此方法获取当js前线程(父线程)的traceId */ public class ThreadPoolMdcExecutor extends ThreadPoolTaskExecutor { @Override publpythonic void execute(Runnable task) { super.execute(MdcUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public Future<?> submit(Runnable task) { return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public <T> Future<T> submit(Callable<T> task) { return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap())); } }
下面定义线程池,就可以使用ThreadPoolMdcExecutor
@Bean(name = "callBackExecutorConfig") public Executor callBackExecutorConfig() { ThreadPoolTaskExecutor executor = new ThreadPoolMdcExecutor(); // 配置核心线程数 executor.setCorePoolSize(10); // 配置最大线程数 executor.setMaxPoolSize(20); // 配置队列大小 executor.setQueueCapacity(200); // 配置线程池中的线程的名称前缀 executor.setThreadNamePrefix("async-Thread-"); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // abort:在调用executor执行的方法中抛出异常 RejectedExecutionException executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 执行初始化 executor.initialize(); return executor; }
定义好线程池之后,我们就可以使用callBackExecutorConfig线程池进行异步任务,避免异步线程中的traceId丢失。
线程池增强
上面是通过继承ThreadPoolTaskExecutor来,重写execute和submit方法,设置MDC.setContextMap(context)设置上下文,我们也可以通过实现TaskDecorator 接口来增强线程池
public class ContextTransferTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { Map<String, String> context = MDC.getCopyOfContextMap(); RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes(); return () -> { try { MDC.setContextMap(context); RequestContextHolder.setRequestAttributes(requestAttributes); runnable.run(); } finally { MDC.clear(); RequestContextHolder.resetRequestAttributes(); } }; } }
接下来,定义线程池,对线程池进行增强
@Bean(name = "callBackExecutorConfig") public Executor callBackExecutorConfig() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor (); // 配置核心线程数 executor.setCorePoolSize(10); // 配置最大线程数 executor.setMaxPoolSize(20); // 配置队列大小 executor.setQueueCapacity(200); // 配置线程池中的线程的名称前缀 executor.setThreadNamePpythonrefix("async-Thread-"); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // abort:在调用executor执行的方法中抛出异常 RejectedExecutionException executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); //线程池增强 threadPoolTaskExecutor.setTaskDecorator(new ContextTransferTaskDecorator()); // 执行初始化 executor.initialize(); return executphpor; python}
总结
上面两种方式其实本质都是通过Mdc来进行异步线程间的traceId同步,可以看下Mdc的源码,最终还是通过InheritableThreadLocal来实现子线程获取父线程信息
public class BasicMDCAdapter implements MDCAdapter { private InheritableThreadLocal<Map<String, String>> inheritableThreadLocal = new InheritableThreadLocal<Map<String, String>>() { protected Map<String, String> childValue(Map<String, String> parentValue) { return parentValue == null ? null : new HashMap(parentValue); } }; //省略若干 ...... }
这篇关于异步线程traceId如何实现传递的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!