Druid Supervisor启动task流程分析

2024-04-29 12:58

本文主要是介绍Druid Supervisor启动task流程分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言


        继前一篇文章关于supervisor启动流程分析的启动过程,然后来分析一下supervisor启动后是如何启动并管理task的运行的。又是如何将KafkaTask的对象创建的。

上图


        创建完持续执行的supervisor线程后,紧接着通过一个定时的单线程池来创建RunNotice()对象并放入notice队列中供supervisor进行poll并运行handle()方法。定时的时间则是配置的task的运行周期,默认是1秒

supervisor获取到RunNotice的时候,开始执行RunNotice的hadle(), 然后开始执行创建task并将创建的kafkaTask添加到TaskMaster管理的TaskQueue中供taskRunner执行。整个task的创建和被执行的过程是消费者模式启动的。

        taskRunner调用task的start方法后开始具体的数据传输。此处的taskRunner包含:ForkingTaskRunner、HttpRemoteTaskRunner、RemoteTaskRunner、SingleTaskBackgroundRunner 四种实现。具体每中taskRunner的创建过程以及操作原理在后面的文章中做描述,本篇文章不做描述。

 上代码


        在类SeekableStreamSupervisor中执行tryInit() 来启动supervisor, 然后紧接着创建定时创建task的线程池。

scheduledExec.scheduleAtFixedRate(buildRunTask(), // 创建RunNotice()对象ioConfig.getStartDelay().getMillis(),Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS), // 配置的task执行周期TimeUnit.MILLISECONDS);

        具体创建task入口则是RunNotice的handle()方法:

private class RunNotice implements Notice{@Overridepublic void handle(){long nowTime = System.currentTimeMillis();// // MAX_RUN_FREQUENCY_MILLIS 是任务的运行周期,默认是一秒, 如果配置的是2个小时,即2个小时会运行一次runInternal()if (nowTime - lastRunTime < MAX_RUN_FREQUENCY_MILLIS) {return;}lastRunTime = nowTime;// 即2个小时会运行一次runInternal()runInternal();}}

        如果符合执行的时间要求则执行runInternal()方法:

public void runInternal(){try {/*** 此处省略了很多状态检查以及状态判断变成的操作* 主要是针对现有的task进行状态变更检查,为启动新的task做准备* 这里关于task状态变更的代码可作为细节详解来仔细研究下,具体就是描述了task在切换的时候需要做哪些事情*/if (!spec.isSuspended()) {log.info("[%s] supervisor is running.", dataSource);stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS);// 前置操作判断完后开始创建新的taskcreateNewTasks();} else {log.info("[%s] supervisor is suspended.", dataSource);gracefulShutdownInternal();}if (log.isDebugEnabled()) {log.debug(generateReport(true).toString());} else {log.info(generateReport(false).toString());}}catch (Exception e) {stateManager.recordThrowableEvent(e);log.warn(e, "Exception in supervisor run loop for dataSource [%s]", dataSource);}finally {stateManager.markRunFinished();}}

        调用创建task的方法后,会经过一些列的判断,最终组合封装出task的基本信息,io配置信息,封装分配TaskGroup,  创建出具体的task对象。

        createNewTasks() -> createTasksForGroup(groupId, ioConfig.getReplicas() - taskGroup.tasks.size()) -> createIndexTasks() -> new KafkaIndexTask() 创建完task对象后会调用taskMaster得到TaskQueue, 将创建的task对象添加到TaskQueue中供taskQueue进行处理。至此supervisor创建task的工作就做完了。

private void createTasksForGroup(int groupId, int replicas)throws JsonProcessingException{TaskGroup group = activelyReadingTaskGroups.get(groupId);Map<PartitionIdType, SequenceOffsetType> startPartitions = group.startingSequences;Map<PartitionIdType, SequenceOffsetType> endPartitions = new HashMap<>();for (PartitionIdType partition : startPartitions.keySet()) {endPartitions.put(partition, getEndOfPartitionMarker());}Set<PartitionIdType> exclusiveStartSequenceNumberPartitions = activelyReadingTaskGroups.get(groupId).exclusiveStartSequenceNumberPartitions;DateTime minimumMessageTime = group.minimumMessageTime.orNull();DateTime maximumMessageTime = group.maximumMessageTime.orNull();// 根据taskGroupId信息 创建task的IOConfigSeekableStreamIndexTaskIOConfig newIoConfig = createTaskIoConfig(groupId,startPartitions,endPartitions,group.baseSequenceName,minimumMessageTime,maximumMessageTime,exclusiveStartSequenceNumberPartitions,ioConfig);// 根据task的基本信息,创建kafkaTask, 因为可能有副本所以使用ListList<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>> taskList = createIndexTasks(replicas,group.baseSequenceName,sortingMapper,group.checkpointSequences,newIoConfig,taskTuningConfig,rowIngestionMetersFactory);// 创建完task后 将task放到taskMaster的队列中,等待被启for (SeekableStreamIndexTask indexTask : taskList) {Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();if (taskQueue.isPresent()) {try {taskQueue.get().add(indexTask);}catch (EntryExistsException e) {stateManager.recordThrowableEvent(e);log.error("Tried to add task [%s] but it already exists", indexTask.getId());}} else {log.error("Failed to get task queue because I'm not the leader!");}}
}

        接着来简单看看把task扔给TaskQueue后,TaskQueue是做的什么操作:首先TaskQueue这个对象也是注入的,且是有生命周期的。注入时会调用TaskQueue的start方法。然后启动一个线程来循环处理任务。

@LifecycleStartpublic void start(){giant.lock();try {Preconditions.checkState(!active, "queue must be stopped");active = true;syncFromStorage();managerExec.submit(new Runnable(){@Overridepublic void run(){while (true) {try {manage(); // 开启线程来不断的调用该方案break;}catch (InterruptedException e) {log.info("Interrupted, exiting!");break;}catch (Exception e) {final long restartDelay = config.getRestartDelay().getMillis();log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();try {Thread.sleep(restartDelay);}catch (InterruptedException e2) {log.info("Interrupted, exiting!");break;}}}}});
}

        TaskQueue启动一个线程在调用manage()方法,那我们来看看manage()方法在干什么?从代码可以看出manage()方法执行结束后该线程也会结束了。之所以加一个循环是为了重试机制。manage()方法中才是真正循环执行task的地方。 对于每个task会被TaskRunner调用并将其run()起来。此时task才是真正的被启动了。关于TaskRunner的内容后续文章在做详细描述。

/*** Main task runner management loop. Meant to run forever, or, at least until we're stopped.*/private void manage() throws InterruptedException{// ....while (active) {giant.lock();try {// ....for (final Task task : ImmutableList.copyOf(tasks)) {if (!taskFutures.containsKey(task.getId())) {final ListenableFuture<TaskStatus> runnerTaskFuture;if (runnerTaskFutures.containsKey(task.getId())) {runnerTaskFuture = runnerTaskFutures.get(task.getId());} else {// Task should be running, so run it.final boolean taskIsReady;try {taskIsReady = task.isReady(taskActionClientFactory.create(task));}catch (Exception e) {log.warn(e, "Exception thrown during isReady for task: %s", task.getId());notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass());continue;}if (taskIsReady) {log.info("Asking taskRunner to run: %s", task.getId());runnerTaskFuture = taskRunner.run(task);} else {continue;}}taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));} else if (isTaskPending(task)) {// if the taskFutures contain this task and this task is pending, also let the taskRunner// to run it to guarantee it will be assigned to run// see https://github.com/apache/incubator-druid/pull/6991taskRunner.run(task);}}// ....}finally {giant.unlock();}}}

END


        本篇文章涉及的很多细节性问题都没有展开详细描述,比如task的状态变更taskGroup的实现与应用taskRunner注册监听器问题task的chackpoint的问题task的消费序列记录问题task的副本问题KafkaIndexTask的执行流程(下一篇描述)、TaskQueue的实现TaskRunner的实现(重点)等等,每一个细节都非常值得研究~~ 。不过本篇重在task创建的大体流程,对task的存在形式整体有个认识。

这篇关于Druid Supervisor启动task流程分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/946148

相关文章

Linux流媒体服务器部署流程

《Linux流媒体服务器部署流程》文章详细介绍了流媒体服务器的部署步骤,包括更新系统、安装依赖组件、编译安装Nginx和RTMP模块、配置Nginx和FFmpeg,以及测试流媒体服务器的搭建... 目录流媒体服务器部署部署安装1.更新系统2.安装依赖组件3.解压4.编译安装(添加RTMP和openssl模块

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型的操作流程

《0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeekR1模型的操作流程》DeepSeekR1模型凭借其强大的自然语言处理能力,在未来具有广阔的应用前景,有望在多个领域发... 目录0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型,3步搞定一个应

Android里面的Service种类以及启动方式

《Android里面的Service种类以及启动方式》Android中的Service分为前台服务和后台服务,前台服务需要亮身份牌并显示通知,后台服务则有启动方式选择,包括startService和b... 目录一句话总结:一、Service 的两种类型:1. 前台服务(必须亮身份牌)2. 后台服务(偷偷干

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

Windows设置nginx启动端口的方法

《Windows设置nginx启动端口的方法》在服务器配置与开发过程中,nginx作为一款高效的HTTP和反向代理服务器,被广泛应用,而在Windows系统中,合理设置nginx的启动端口,是确保其正... 目录一、为什么要设置 nginx 启动端口二、设置步骤三、常见问题及解决一、为什么要设置 nginx

springboot启动流程过程

《springboot启动流程过程》SpringBoot简化了Spring框架的使用,通过创建`SpringApplication`对象,判断应用类型并设置初始化器和监听器,在`run`方法中,读取配... 目录springboot启动流程springboot程序启动入口1.创建SpringApplicat

树莓派启动python的实现方法

《树莓派启动python的实现方法》本文主要介绍了树莓派启动python的实现方法,文中通过图文介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录一、RASPBerry系统设置二、使用sandroidsh连接上开发板Raspberry Pi三、运

通过prometheus监控Tomcat运行状态的操作流程

《通过prometheus监控Tomcat运行状态的操作流程》文章介绍了如何安装和配置Tomcat,并使用Prometheus和TomcatExporter来监控Tomcat的运行状态,文章详细讲解了... 目录Tomcat安装配置以及prometheus监控Tomcat一. 安装并配置tomcat1、安装

MySQL的cpu使用率100%的问题排查流程

《MySQL的cpu使用率100%的问题排查流程》线上mysql服务器经常性出现cpu使用率100%的告警,因此本文整理一下排查该问题的常规流程,文中通过代码示例讲解的非常详细,对大家的学习或工作有一... 目录1. 确认CPU占用来源2. 实时分析mysql活动3. 分析慢查询与执行计划4. 检查索引与表