Druid kafka-index supervisor启动流程分析

2024-04-29 12:58

本文主要是介绍Druid kafka-index supervisor启动流程分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言


    在维护druid服务的过程中,我们的物化视图的supervisor状态总是发生异常,通过日志以及MR程序的情况来看,目前猜测是因为yarn资源问题导致的这一现状。虽然现在通过脚本监控的形式来保证物化视图supervisor异常时重新拉起而不会产生物化视图延迟(尤其是晚上发生异常)。但是不能明确为什么supervisor的执行流程。因此本着探究supervisor的心态,因为kafka-index supervisor比较成熟且经典,于是先研究下kafka-index supervisor先熟悉supervisor在代码中是一个怎样的形式存在的。

上图


    先上一个启动supervisor的代码执行流程图。supervisor有一个管理器,是在启动overlord的时候创建的,即supervisor是被overlord管理的。supervisor管理器维护着druid服务中所有的supervisor服务并会将supervisor信息持久化到元数据库。overlord还会还会创建一个supervisor的资源请求入口,用于接收操作supervisor的RESTful请求。最终通过SupervisorResourcespecPost方法接收创建supervisor的请求。

 上代码


    启动overlord服务, 执行overlord.sh 会携带overlord参数跳转到执行node.sh,  如下图是node.sh的主要执行步骤。

     然后在代码中找到这个类,看一下是如何启动的?

     上图中,在main函数中执行了run方法,而cli的build,获取的其实是clioverlord这个runnable。那么supervisorResourceCliOverlord中是如何实现的呢?通过注入的方式创建supervisorResource对象。

     SupervisorResource是一个资源类,被@path注解,可以接收restful请求。外部发送一个创建supervisor的post请求,会路由的specPost方法上进行接下来的创建操作。

    然后通过supervisorSpec进行创建supervisor并调用start()方法进行启动。其中supervisorSpec是注入的配置文件信息对象。在调用SupervisorResource的specPost请求的时候传入的参数

@Path("/druid/indexer/v1/supervisor") // 表明SupervisorResource是个资源类, 并指定了URI访问路径,供RESTful请求
public class SupervisorResource
{@POST //用于接受post请求@Consumes(MediaType.APPLICATION_JSON) // 它定义资源类或MessageBodyReader的方法可以生成的媒体类型@Produces(MediaType.APPLICATION_JSON) // 它定义资源类或MessageBodyWriter的方法可以生成的媒体类型public Response specPost(final SupervisorSpec spec, @Context final HttpServletRequest req){return asLeaderWithSupervisorManager(manager -> {Preconditions.checkArgument(spec.getDataSources() != null && spec.getDataSources().size() > 0,"No dataSources found to perform authorization checks");Access authResult = AuthorizationUtils.authorizeAllResourceActions(req,Iterables.transform(spec.getDataSources(), AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR),authorizerMapper);if (!authResult.isAllowed()) {throw new ForbiddenException(authResult.toString());}manager.createAndStartSupervisorInternal(spec); // 调用创建supervisor的函数return Response.ok(ImmutableMap.of("id", spec.getId())).build();});}/*** 如果存在已经创建的supervisor则返回false, 如果创建新的supervisor则返回true*/private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean persistSpec){String id = spec.getId();if (supervisors.containsKey(id)) {return false;}// 先进行插入基本信息到元数据库if (persistSpec) { // persistSpec 如果为true表示创建新的supervisor;如果为false表示启动supervisorManager的时候从元数据库中恢复supervisormetadataSupervisorManager.insert(id, spec);}Supervisor supervisor;try {supervisor = spec.createSupervisor();supervisor.start(); // 通过supervisorSpec创建完supervisor后进行启动}catch (Exception e) {// 为了保证事务,如果创建supervisor异常了则将元数据进行更新// Supervisor creation or start failed write tombstone only when trying to start a new supervisorif (persistSpec) {metadataSupervisorManager.insert(id, new NoopSupervisorSpec(null, spec.getDataSources()));}throw new RuntimeException(e);}supervisors.put(id, Pair.of(supervisor, spec));return true;}
}

    KafkaSupervisor的父类SeekableStreamSupervisor中的start()方法来启动supervisor。start()方法调用tryInit()方法来真正的启动一个supervisor。

    1. supervisor线程循环处理一个Notice类型的阻塞队列,Notice大概的内容包括运行task的notice、做checkpoint的notice、resetSuperviser的notice、shutdownSupervisor的notice。具体notice是如何添加到阻塞队列的、notice做了哪些事情在后期详细分享。

    2. 在启动一个线程之前会先创建一个recordSupplier对象,即KafkaRecirdSupplier对象,这个类主要做的工作是关于处理kafka topic、kafka offset以及kakfa数据的类。比如kafka.poll() 进行获取数据,kafka.assign()和kafka.seek()方法进行处理partation和offset。

       在追溯KafkaRecordSupplier这个类的时候,发现有三个地方在创建KafkaRecordSupplier对象。为什么一个kafkaSupervisor任务有三个对象,分别在做什么?后续会单独对KafkaRecordSupplier做分享。

@VisibleForTestingpublic void tryInit(){synchronized (stateChangeLock) {if (started) {log.warn("Supervisor was already started, skipping init");return;}if (stopped) {log.warn("Supervisor was already stopped, skipping init.");return;}try {// 这个地方创建了一个kafkarecordSupplier, 该对象是在setupRecordSupplier()方法中new的recordSupplier = setupRecordSupplier();// 向单线程池中提交一个线程, 这个线程运行supervisorexec.submit(() -> {try {// MAX_RUN_FREQUENCY_MILLIS 是任务的运行周期,默认是一秒long pollTimeout = Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS);// 开一个循环执行, 所以supervisor的状态变更是需要加锁的,为了当其他线程stoped的时候是线程安全的while (!Thread.currentThread().isInterrupted() && !stopped) {// notices是一个阻塞的双端队列,存储Noticefinal Notice notice = notices.poll(pollTimeout, TimeUnit.MILLISECONDS);if (notice == null) {continue;}try {notice.handle();}catch (Throwable e) {stateManager.recordThrowableEvent(e);log.makeAlert(e, "SeekableStreamSupervisor[%s] failed to handle notice", dataSource).addData("noticeClass", notice.getClass().getSimpleName()).emit();}}}catch (InterruptedException e) {stateManager.recordThrowableEvent(e);log.info("SeekableStreamSupervisor[%s] interrupted, exiting", dataSource);}});firstRunTime = DateTimes.nowUtc().plus(ioConfig.getStartDelay());// 单独启动一个线程 将RunNotice添加到队列中,表示任务初次运行了scheduledExec.scheduleAtFixedRate(buildRunTask(),ioConfig.getStartDelay().getMillis(),Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),TimeUnit.MILLISECONDS);scheduleReporting(reportingExec);started = true;log.info("Started SeekableStreamSupervisor[%s], first run in [%s], with spec: [%s]",dataSource,ioConfig.getStartDelay(),spec.toString());}catch (Exception e) {stateManager.recordThrowableEvent(e);if (recordSupplier != null) {recordSupplier.close();}initRetryCounter++;log.makeAlert(e, "Exception starting SeekableStreamSupervisor[%s]", dataSource).emit();throw new RuntimeException(e);}}}

    至此,kafka supervisor任务已经启动并持续运转起来了。下一篇会讲述supervisor如何管理task的,task是如何启动并运行的。

END


    本篇只是对kafka supervisor的启动过程做了一个流程式的描述,限于篇幅其中有很多细节没有展开描述。主要目的是能够对supervisor在代码层次有一个直观的认识,揭开它神秘的面纱。

这篇关于Druid kafka-index supervisor启动流程分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/946147

相关文章

Java程序进程起来了但是不打印日志的原因分析

《Java程序进程起来了但是不打印日志的原因分析》:本文主要介绍Java程序进程起来了但是不打印日志的原因分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java程序进程起来了但是不打印日志的原因1、日志配置问题2、日志文件权限问题3、日志文件路径问题4、程序

将Java项目提交到云服务器的流程步骤

《将Java项目提交到云服务器的流程步骤》所谓将项目提交到云服务器即将你的项目打成一个jar包然后提交到云服务器即可,因此我们需要准备服务器环境为:Linux+JDK+MariDB(MySQL)+Gi... 目录1. 安装 jdk1.1 查看 jdk 版本1.2 下载 jdk2. 安装 mariadb(my

Java字符串操作技巧之语法、示例与应用场景分析

《Java字符串操作技巧之语法、示例与应用场景分析》在Java算法题和日常开发中,字符串处理是必备的核心技能,本文全面梳理Java中字符串的常用操作语法,结合代码示例、应用场景和避坑指南,可快速掌握字... 目录引言1. 基础操作1.1 创建字符串1.2 获取长度1.3 访问字符2. 字符串处理2.1 子字

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

Python 迭代器和生成器概念及场景分析

《Python迭代器和生成器概念及场景分析》yield是Python中实现惰性计算和协程的核心工具,结合send()、throw()、close()等方法,能够构建高效、灵活的数据流和控制流模型,这... 目录迭代器的介绍自定义迭代器省略的迭代器生产器的介绍yield的普通用法yield的高级用法yidle

Redis在windows环境下如何启动

《Redis在windows环境下如何启动》:本文主要介绍Redis在windows环境下如何启动的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Redis在Windows环境下启动1.在redis的安装目录下2.输入·redis-server.exe

解决SpringBoot启动报错:Failed to load property source from location 'classpath:/application.yml'

《解决SpringBoot启动报错:Failedtoloadpropertysourcefromlocationclasspath:/application.yml问题》这篇文章主要介绍... 目录在启动SpringBoot项目时报如下错误原因可能是1.yml中语法错误2.yml文件格式是GBK总结在启动S

C++ Sort函数使用场景分析

《C++Sort函数使用场景分析》sort函数是algorithm库下的一个函数,sort函数是不稳定的,即大小相同的元素在排序后相对顺序可能发生改变,如果某些场景需要保持相同元素间的相对顺序,可使... 目录C++ Sort函数详解一、sort函数调用的两种方式二、sort函数使用场景三、sort函数排序

kotlin中const 和val的区别及使用场景分析

《kotlin中const和val的区别及使用场景分析》在Kotlin中,const和val都是用来声明常量的,但它们的使用场景和功能有所不同,下面给大家介绍kotlin中const和val的区别,... 目录kotlin中const 和val的区别1. val:2. const:二 代码示例1 Java

Go标准库常见错误分析和解决办法

《Go标准库常见错误分析和解决办法》Go语言的标准库为开发者提供了丰富且高效的工具,涵盖了从网络编程到文件操作等各个方面,然而,标准库虽好,使用不当却可能适得其反,正所谓工欲善其事,必先利其器,本文将... 目录1. 使用了错误的time.Duration2. time.After导致的内存泄漏3. jsO