异步线程traceId如何实现传递

2025-02-08 04:50

本文主要是介绍异步线程traceId如何实现传递,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《异步线程traceId如何实现传递》文章介绍了如何在异步请求中传递traceId,通过重写ThreadPoolTaskExecutor的方法和实现TaskDecorator接口来增强线程池,确保异步...

前言

在日常问题排查中,我们经常在ELK中根据traceId来查询请求的日志链路,在同步请求中,根据traceId一站到底,很爽,那如果是异步请求该如何处理呢?

项目中的异步请求都是结合线程池来开启异步线程,下面结合slf4j中的MDC和线程池来实现异步线程的traceId传递。

重写ThreadPoolTaskExecutor中方法

下面的工具类,分别在Callable和Runnable异步任务执行前通过MDC.setContextMap(context)设置请求映射上下文

import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

import Java.util.Map;
import java.util.concurrent.Callable;

/**
 * @desc: 定义MDC工具类,支持Runnable和Callable两种,目的就是为了把父线程的traceId设置给子线程
 */
public class MdcUtil {

    public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            if (CollectionUtils.isEmpty(context)) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                // 清除子线程的,避免内存溢出,就和ThreadLocal.remove()一个原因
                MDC.clear();
            }
        };
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            if (CollectionUtils.isEmpty(context)) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }
}

下面定义一个ThreadPoolMdcExecutor 类来继承ThreadPoolTaskExecutor 类,重写execute和submit方法

import java.util.concurrent.Callable;
import java.util.concurrent.Future;

/**
 * @desc: 把当前的traceId透传到子线程特意加的实现。
 *   重点就是 MDC.getCopyOfContextMap(),此方法获取当js前线程(父线程)的traceId
 */
public class ThreadPoolMdcExecutor extends ThreadPoolTaskExecutor {
    @Override
    publpythonic void execute(Runnable task) {
        super.execute(MdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }
}

下面定义线程池,就可以使用ThreadPoolMdcExecutor

    @Bean(name = "callBackExecutorConfig")
    public Executor callBackExecutorConfig() {
        ThreadPoolTaskExecutor executor = new ThreadPoolMdcExecutor();
        // 配置核心线程数
        executor.setCorePoolSize(10);
        // 配置最大线程数
        executor.setMaxPoolSize(20);
        // 配置队列大小
        executor.setQueueCapacity(200);
        // 配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-Thread-");
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // abort:在调用executor执行的方法中抛出异常 RejectedExecutionException
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // 执行初始化
        executor.initialize();
        return executor;
    }

定义好线程池之后,我们就可以使用callBackExecutorConfig线程池进行异步任务,避免异步线程中的traceId丢失。

线程池增强

上面是通过继承ThreadPoolTaskExecutor来,重写execute和submit方法,设置MDC.setContextMap(context)设置上下文,我们也可以通过实现TaskDecorator 接口来增强线程池

public class ContextTransferTaskDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        Map<String, String> context = MDC.getCopyOfContextMap();
        RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes();
        return () -> {
            try {
                MDC.setContextMap(context);
                RequestContextHolder.setRequestAttributes(requestAttributes);
                runnable.run();
            } finally {
                MDC.clear();
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }
}

接下来,定义线程池,对线程池进行增强

    @Bean(name = "callBackExecutorConfig")
    public Executor callBackExecutorConfig() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();
        // 配置核心线程数
        executor.setCorePoolSize(10);
        // 配置最大线程数
        executor.setMaxPoolSize(20);
        // 配置队列大小
        executor.setQueueCapacity(200);
        // 配置线程池中的线程的名称前缀
        executor.setThreadNamePpythonrefix("async-Thread-");
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // abort:在调用executor执行的方法中抛出异常 RejectedExecutionException
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        //线程池增强
        threadPoolTaskExecutor.setTaskDecorator(new ContextTransferTaskDecorator());
        // 执行初始化
        executor.initialize();
        return executphpor;
    python}

总结

上面两种方式其实本质都是通过Mdc来进行异步线程间的traceId同步,可以看下Mdc的源码,最终还是通过InheritableThreadLocal来实现子线程获取父线程信息

public class BasicMDCAdapter implements MDCAdapter {
    private InheritableThreadLocal<Map<String, String>> inheritableThreadLocal = 
        new InheritableThreadLocal<Map<String, String>>() {
        protected Map<String, String> childValue(Map<String, String> parentValue) {
            return parentValue == null ? null : new HashMap(parentValue);
        }
    };
    
    //省略若干
    ......
}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持China编程(www.chinasem.cn)。

这篇关于异步线程traceId如何实现传递的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1153341

相关文章

Docker镜像修改hosts及dockerfile修改hosts文件的实现方式

《Docker镜像修改hosts及dockerfile修改hosts文件的实现方式》:本文主要介绍Docker镜像修改hosts及dockerfile修改hosts文件的实现方式,具有很好的参考价... 目录docker镜像修改hosts及dockerfile修改hosts文件准备 dockerfile 文

基于SpringBoot+Mybatis实现Mysql分表

《基于SpringBoot+Mybatis实现Mysql分表》这篇文章主要为大家详细介绍了基于SpringBoot+Mybatis实现Mysql分表的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录基本思路定义注解创建ThreadLocal创建拦截器业务处理基本思路1.根据创建时间字段按年进

SpringBoot3实现Gzip压缩优化的技术指南

《SpringBoot3实现Gzip压缩优化的技术指南》随着Web应用的用户量和数据量增加,网络带宽和页面加载速度逐渐成为瓶颈,为了减少数据传输量,提高用户体验,我们可以使用Gzip压缩HTTP响应,... 目录1、简述2、配置2.1 添加依赖2.2 配置 Gzip 压缩3、服务端应用4、前端应用4.1 N

SpringBoot实现数据库读写分离的3种方法小结

《SpringBoot实现数据库读写分离的3种方法小结》为了提高系统的读写性能和可用性,读写分离是一种经典的数据库架构模式,在SpringBoot应用中,有多种方式可以实现数据库读写分离,本文将介绍三... 目录一、数据库读写分离概述二、方案一:基于AbstractRoutingDataSource实现动态

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

Java枚举类实现Key-Value映射的多种实现方式

《Java枚举类实现Key-Value映射的多种实现方式》在Java开发中,枚举(Enum)是一种特殊的类,本文将详细介绍Java枚举类实现key-value映射的多种方式,有需要的小伙伴可以根据需要... 目录前言一、基础实现方式1.1 为枚举添加属性和构造方法二、http://www.cppcns.co

使用Python实现快速搭建本地HTTP服务器

《使用Python实现快速搭建本地HTTP服务器》:本文主要介绍如何使用Python快速搭建本地HTTP服务器,轻松实现一键HTTP文件共享,同时结合二维码技术,让访问更简单,感兴趣的小伙伴可以了... 目录1. 概述2. 快速搭建 HTTP 文件共享服务2.1 核心思路2.2 代码实现2.3 代码解读3.

MySQL双主搭建+keepalived高可用的实现

《MySQL双主搭建+keepalived高可用的实现》本文主要介绍了MySQL双主搭建+keepalived高可用的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、测试环境准备二、主从搭建1.创建复制用户2.创建复制关系3.开启复制,确认复制是否成功4.同

Java实现文件图片的预览和下载功能

《Java实现文件图片的预览和下载功能》这篇文章主要为大家详细介绍了如何使用Java实现文件图片的预览和下载功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... Java实现文件(图片)的预览和下载 @ApiOperation("访问文件") @GetMapping("

使用Sentinel自定义返回和实现区分来源方式

《使用Sentinel自定义返回和实现区分来源方式》:本文主要介绍使用Sentinel自定义返回和实现区分来源方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Sentinel自定义返回和实现区分来源1. 自定义错误返回2. 实现区分来源总结Sentinel自定