Druid Task被Overload分配到zk上的流程分析

2024-04-29 12:58

本文主要是介绍Druid Task被Overload分配到zk上的流程分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言


         继前一篇文章关于tasksupervisor创建的过程分析,那么task被创建后是怎样分配给zk的呢?task选择middlemanager的策略又是什么?

上图


       supervisor创建完task后,会将task扔到一个由TaskMaster创建的TaskQueue中。此后的事情就是如果将TaskQueue中的task进行分配运行了。 

      TaskQueue会启动一个线程一直运行,用来读取taskqueue队列中的tasks,对于已经准备好的task通过TaskRunner进行run()。而TaskRunner是 在TaskMaster中创建的。

       TaskRunner中会判断task的状态是否为pending状态,如果是就会对该task进行分配work, 而分配策略默认是根据middlemanager的slot最大空闲数分配的。让后会将work信息、task信息通过jsonMapper进行序列化为byte写到zk的一个能被work识别的路径下面。

 上代码


        首先从TaskMaster入手,因为TaskMaster是被注入的对象,它管理着TaskQueue和TaskRunner两大对象。

       其中TaskRunner是被taskRunnerFactory创建的,而TaskRunner包括:ForkingTaskRunner,RemoteTaskRunner, HttpRemoteTaskRunner 。至于使用哪一个是通过在overload的配置文件中配置的,配置项为:druid.indexer.runner.type=remote/loacl/httpRemote

  • local表示从本地运行任务
  • remote表示分配到分布式系统中
  • httpRemote是在试用期的功能(目前是根据0.16版本分析的),和remote功能一样,只是httpRemote不通过zk而是直接和middlemanager交互

 

      然后此时创建完TaskQueue之后,taskQueue会进行启动并创建一个持续运行的线程。该线程的作用就是不断的轮询判断taskqueue中的task进行处理,调用taskRunner的run()方法做分配task的处理。

/*** Main task runner management loop. Meant to run forever, or, at least until we're stopped.*/private void manage() throws InterruptedException{log.info("Beginning management in %s.", config.getStartDelay());Thread.sleep(config.getStartDelay().getMillis());// Ignore return value- we'll get the IDs and futures from getKnownTasks later.taskRunner.restore();while (active) {giant.lock();try {// Task futures available from the taskRunnerfinal Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());}// Attain futures for all active tasks (assuming they are ready to run).// Copy tasks list, as notifyStatus may modify it.for (final Task task : ImmutableList.copyOf(tasks)) {if (!taskFutures.containsKey(task.getId())) {final ListenableFuture<TaskStatus> runnerTaskFuture;if (runnerTaskFutures.containsKey(task.getId())) {runnerTaskFuture = runnerTaskFutures.get(task.getId());} else {// Task should be running, so run it.final boolean taskIsReady;try {taskIsReady = task.isReady(taskActionClientFactory.create(task));}catch (Exception e) {log.warn(e, "Exception thrown during isReady for task: %s", task.getId());notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass());continue;}if (taskIsReady) {log.info("Asking taskRunner to run: %s", task.getId());// 该部分是已分配给taskrunner管理且没有被运行的runnerTaskFuture = taskRunner.run(task);} else {continue;}}taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));} else if (isTaskPending(task)) {// if the taskFutures contain this task and this task is pending, also let the taskRunner// to run it to guarantee it will be assigned to run// see https://github.com/apache/incubator-druid/pull/6991// 判断task是否处于pending状态,如果是就进行分配运行taskRunner.run(task);}}// Kill tasks that shouldn't be runningfinal Set<String> tasksToKill = Sets.difference(runnerTaskFutures.keySet(),ImmutableSet.copyOf(Lists.transform(tasks,new Function<Task, Object>(){@Overridepublic String apply(Task task){return task.getId();}})));if (!tasksToKill.isEmpty()) {log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());for (final String taskId : tasksToKill) {try {taskRunner.shutdown(taskId,"task is not in runnerTaskFutures[%s]",runnerTaskFutures.keySet());}catch (Exception e) {log.warn(e, "TaskRunner failed to clean up task: %s", taskId);}}}// awaitNanos because management may become necessary without this condition signalling,// due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);}finally {giant.unlock();}}}

     此时决定运行的task时候,让taskRunnner根据策略去寻找work进行分配。判断出处于pending状态的task进行分配。

/*** This method uses a multi-threaded executor to extract all pending tasks and attempt to run them. Any tasks that* are successfully assigned to a worker will be moved from pendingTasks to runningTasks. This method is thread-safe.* This method should be run each time there is new worker capacity or if new tasks are assigned.*/private void runPendingTasks(){runPendingTasksExec.submit(new Callable<Void>(){@Overridepublic Void call(){try {// make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them// into running statusList<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());sortByInsertionTime(copy);for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {String taskId = taskRunnerWorkItem.getTaskId();if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {try {//this can still be null due to race from explicit task shutdown request//or if another thread steals and completes this task right after this thread makes copy//of pending tasks. See https://github.com/apache/incubator-druid/issues/2842 .Task task = pendingTaskPayloads.get(taskId);// 试图去分配taskif (task != null && tryAssignTask(task, taskRunnerWorkItem)) {pendingTaskPayloads.remove(taskId);}}catch (Exception e) {log.makeAlert(e, "Exception while trying to assign task").addData("taskId", taskRunnerWorkItem.getTaskId()).emit();RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);if (workItem != null) {taskComplete(workItem, null, TaskStatus.failure(taskId));}}finally {tryAssignTasks.remove(taskId);}}}}catch (Exception e) {log.makeAlert(e, "Exception in running pending tasks").emit();}return null;}});}

     根据策略以及过滤条件得到task要分配的work的信息:

/*** Ensures no workers are already running a task before assigning the task to a worker.* It is possible that a worker is running a task that the RTR has no knowledge of. This occurs when the RTR* needs to bootstrap after a restart.** @param taskRunnerWorkItem - the task to assign** @return true iff the task is now assigned*/private boolean tryAssignTask(final Task task, final RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception{Preconditions.checkNotNull(task, "task");Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");if (runningTasks.containsKey(task.getId()) || findWorkerRunningTask(task.getId()) != null) {log.info("Task[%s] already running.", task.getId());return true;} else {// Nothing running this task, announce it in ZK for a worker to run itWorkerBehaviorConfig workerConfig = workerConfigRef.get();//  确定分配的策略WorkerSelectStrategy strategy;if (workerConfig == null || workerConfig.getSelectStrategy() == null) {strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;log.debug("No worker selection strategy set. Using default of [%s]", strategy.getClass().getSimpleName());} else {strategy = workerConfig.getSelectStrategy();}ZkWorker assignedWorker = null;final ImmutableWorkerInfo immutableZkWorker;try {synchronized (workersWithUnacknowledgedTask) {immutableZkWorker = strategy.findWorkerForTask(config,ImmutableMap.copyOf(Maps.transformEntries(Maps.filterEntries(zkWorkers,new Predicate<Map.Entry<String, ZkWorker>>(){@Overridepublic boolean apply(Map.Entry<String, ZkWorker> input){return !lazyWorkers.containsKey(input.getKey()) &&!workersWithUnacknowledgedTask.containsKey(input.getKey()) &&!blackListedWorkers.contains(input.getValue());}}),(String key, ZkWorker value) -> value.toImmutable())),task);if (immutableZkWorker != null &&workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.getWorker().getHost(), task.getId())== null) {assignedWorker = zkWorkers.get(immutableZkWorker.getWorker().getHost());}}if (assignedWorker != null) {// 得到work信息后进行发布到zk上return announceTask(task, assignedWorker, taskRunnerWorkItem);} else {log.debug("Unsuccessful task-assign attempt for task [%s] on workers [%s]. Workers to ack tasks are [%s].",task.getId(),zkWorkers.values(),workersWithUnacknowledgedTask);}return false;}finally {if (assignedWorker != null) {workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());//if this attempt won the race to run the task then other task might be able to use this worker now after task ack.runPendingTasks();}}}}

      根据task信息和work信息写入zk的相应路径下面:

/*** Creates a ZK entry under a specific path associated with a worker. The worker is responsible for* removing the task ZK entry and creating a task status ZK entry.** @param theZkWorker        The worker the task is assigned to* @param taskRunnerWorkItem The task to be assigned** @return boolean indicating whether the task was successfully assigned or not*/private boolean announceTask(final Task task,final ZkWorker theZkWorker,final RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception{Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");final String worker = theZkWorker.getWorker().getHost();synchronized (statusLock) {if (!zkWorkers.containsKey(worker) || lazyWorkers.containsKey(worker)) {// the worker might have been killed or marked as lazylog.info("Not assigning task to already removed worker[%s]", worker);return false;}log.info("Coordinator asking Worker[%s] to add task[%s]", worker, task.getId());// 将task的信息写到相应的的zk路径下面CuratorUtils.createIfNotExists(cf,JOINER.join(indexerZkConfig.getTasksPath(), worker, task.getId()),CreateMode.EPHEMERAL,jsonMapper.writeValueAsBytes(task),config.getMaxZnodeBytes());// ....}

END


    至此,要运行的task得到分配的work写到zk路径的核心代码以及过程就如上文所述。为了保证task能够准确无误的运行起来会很多细节性的逻辑判断,状态存储

这篇关于Druid Task被Overload分配到zk上的流程分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/946149

相关文章

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

Python实现NLP的完整流程介绍

《Python实现NLP的完整流程介绍》这篇文章主要为大家详细介绍了Python实现NLP的完整流程,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 编程安装和导入必要的库2. 文本数据准备3. 文本预处理3.1 小写化3.2 分词(Tokenizatio

Redis主从复制的原理分析

《Redis主从复制的原理分析》Redis主从复制通过将数据镜像到多个从节点,实现高可用性和扩展性,主从复制包括初次全量同步和增量同步两个阶段,为优化复制性能,可以采用AOF持久化、调整复制超时时间、... 目录Redis主从复制的原理主从复制概述配置主从复制数据同步过程复制一致性与延迟故障转移机制监控与维

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re

Redis主从复制实现原理分析

《Redis主从复制实现原理分析》Redis主从复制通过Sync和CommandPropagate阶段实现数据同步,2.8版本后引入Psync指令,根据复制偏移量进行全量或部分同步,优化了数据传输效率... 目录Redis主DodMIK从复制实现原理实现原理Psync: 2.8版本后总结Redis主从复制实

锐捷和腾达哪个好? 两个品牌路由器对比分析

《锐捷和腾达哪个好?两个品牌路由器对比分析》在选择路由器时,Tenda和锐捷都是备受关注的品牌,各自有独特的产品特点和市场定位,选择哪个品牌的路由器更合适,实际上取决于你的具体需求和使用场景,我们从... 在选购路由器时,锐捷和腾达都是市场上备受关注的品牌,但它们的定位和特点却有所不同。锐捷更偏向企业级和专

SpringBoot使用minio进行文件管理的流程步骤

《SpringBoot使用minio进行文件管理的流程步骤》MinIO是一个高性能的对象存储系统,兼容AmazonS3API,该软件设计用于处理非结构化数据,如图片、视频、日志文件以及备份数据等,本文... 目录一、拉取minio镜像二、创建配置文件和上传文件的目录三、启动容器四、浏览器登录 minio五、

Spring中Bean有关NullPointerException异常的原因分析

《Spring中Bean有关NullPointerException异常的原因分析》在Spring中使用@Autowired注解注入的bean不能在静态上下文中访问,否则会导致NullPointerE... 目录Spring中Bean有关NullPointerException异常的原因问题描述解决方案总结

python中的与时间相关的模块应用场景分析

《python中的与时间相关的模块应用场景分析》本文介绍了Python中与时间相关的几个重要模块:`time`、`datetime`、`calendar`、`timeit`、`pytz`和`dateu... 目录1. time 模块2. datetime 模块3. calendar 模块4. timeit