聊聊PowerJob的HeavyTaskTracker的dispatchTask

2023-12-31 00:12

本文主要是介绍聊聊PowerJob的HeavyTaskTracker的dispatchTask,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文主要研究一下PowerJob的HeavyTaskTracker的dispatchTask

dispatchTask

tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

    protected void dispatchTask(TaskDO task, String processorTrackerAddress) {// 1. 持久化,更新数据库(如果更新数据库失败,可能导致重复执行,先不处理)TaskDO updateEntity = new TaskDO();updateEntity.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue());// 写入处理该任务的 ProcessorTrackerupdateEntity.setAddress(processorTrackerAddress);boolean success = taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity);if (!success) {log.warn("[TaskTracker-{}] dispatch task(taskId={},taskName={}) failed due to update task status failed.", instanceId, task.getTaskId(), task.getTaskName());return;}// 2. 更新 ProcessorTrackerStatus 状态ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true);// 3. 初始化缓存taskId2BriefInfo.put(task.getTaskId(), new TaskBriefInfo(task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, -1L));// 4. 任务派发TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, workerRuntime.getWorkerAddress());TransportUtils.ttStartPtTask(startTaskReq, processorTrackerAddress, workerRuntime.getTransporter());log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName());}

HeavyTaskTracker的dispatchTask方法通过TransportUtils.ttStartPtTask将任务派发到ProcessorTracker

ttStartPtTask

tech/powerjob/worker/common/utils/TransportUtils.java

    public static void ttStartPtTask(TaskTrackerStartTaskReq req, String address, Transporter transporter) {final URL url = easyBuildUrl(ServerType.WORKER, WPT_PATH, WPT_HANDLER_START_TASK, address);transporter.tell(url, req);}

ttStartPtTask方法往rootPath为processorTracker,handlerPath为startTask的接口发送TaskTrackerStartTaskReq

TaskTrackerStartTaskReq

tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java

@Getter
@Setter
@NoArgsConstructor
public class TaskTrackerStartTaskReq implements PowerSerializable {// TaskTracker 地址private String taskTrackerAddress;private InstanceInfo instanceInfo;private String taskId;private String taskName;private byte[] taskContent;// 子任务当前重试次数private int taskCurrentRetryNums;// 秒级任务专用private long subInstanceId;private String logConfig;/*** 创建 TaskTrackerStartTaskReq,该构造方法必须在 TaskTracker 节点调用*/public TaskTrackerStartTaskReq(InstanceInfo instanceInfo, TaskDO task, String taskTrackerAddress) {this.taskTrackerAddress = taskTrackerAddress;this.instanceInfo = instanceInfo;this.taskId = task.getTaskId();this.taskName = task.getTaskName();this.taskContent = task.getTaskContent();this.taskCurrentRetryNums = task.getFailedCnt();this.subInstanceId = task.getSubInstanceId();this.logConfig = instanceInfo.getLogConfig();}
}

TaskTrackerStartTaskReq包含了taskTrackerAddress、instanceInfo、taskId、taskName、taskContent等信息

onReceiveTaskTrackerStartTaskReq

tech/powerjob/worker/actors/ProcessorTrackerActor.java

@Slf4j
@Actor(path = RemoteConstant.WPT_PATH)
public class ProcessorTrackerActor {private final WorkerRuntime workerRuntime;public ProcessorTrackerActor(WorkerRuntime workerRuntime) {this.workerRuntime = workerRuntime;}/*** 处理来自TaskTracker的task执行请求* @param req 请求*/@Handler(path = RemoteConstant.WPT_HANDLER_START_TASK, processType = ProcessType.NO_BLOCKING)public void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {Long instanceId = req.getInstanceInfo().getInstanceId();// 创建 ProcessorTracker 一定能成功ProcessorTracker processorTracker = ProcessorTrackerManager.getProcessorTracker(instanceId,req.getTaskTrackerAddress(),() -> new ProcessorTracker(req, workerRuntime));TaskDO task = new TaskDO();task.setTaskId(req.getTaskId());task.setTaskName(req.getTaskName());task.setTaskContent(req.getTaskContent());task.setFailedCnt(req.getTaskCurrentRetryNums());task.setSubInstanceId(req.getSubInstanceId());processorTracker.submitTask(task);}//......
}    

ProcessorTrackerActor的onReceiveTaskTrackerStartTaskReq方法用于处理TaskTrackerStartTaskReq,它主要是创建ProcessorTracker,然后执行性submit方法

ProcessorTracker

tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java

    public ProcessorTracker(TaskTrackerStartTaskReq request, WorkerRuntime workerRuntime) {try {// 赋值this.startTime = System.currentTimeMillis();this.workerRuntime = workerRuntime;this.instanceInfo = request.getInstanceInfo();this.instanceId = request.getInstanceInfo().getInstanceId();this.taskTrackerAddress = request.getTaskTrackerAddress();this.omsLogger = OmsLoggerFactory.build(instanceId, request.getLogConfig(), workerRuntime);this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();this.lastIdleTime = -1L;this.lastCompletedTaskCount = 0L;// 初始化 线程池,TimingPool 启动的任务会检查 ThreadPool,所以必须先初始化线程池,否则NPEinitThreadPool();// 初始化定时任务initTimingJob();// 初始化 ProcessorprocessorBean = workerRuntime.getProcessorLoader().load(new ProcessorDefinition().setProcessorType(instanceInfo.getProcessorType()).setProcessorInfo(instanceInfo.getProcessorInfo()));log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", instanceId);} catch (Throwable t) {log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, t);lethal = true;lethalReason = ExceptionUtils.getMessage(t);}}

ProcessorTracker的构造器会initThreadPool、initTimingJob、加载processorBean

initThreadPool

private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128;private void initThreadPool() {int poolSize = calThreadPoolSize();// 待执行队列,为了防止对内存造成较大压力,内存队列不能太大BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE);// 自定义线程池中线程名称 (PowerJob Processor Pool -> PPP)ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPP-%d").build();// 拒绝策略:直接抛出异常RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.AbortPolicy();threadPool = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler);// 当没有任务执行时,允许销毁核心线程(即线程池最终存活线程个数可能为0)threadPool.allowCoreThreadTimeOut(true);}private int calThreadPoolSize() {ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType());// 脚本类自带线程池,不过为了少一点逻辑判断,还是象征性分配一个线程if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) {return 1;}if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) {return instanceInfo.getThreadConcurrency();}if (TimeExpressionType.FREQUENT_TYPES.contains(instanceInfo.getTimeExpressionType())) {return instanceInfo.getThreadConcurrency();}return 2;}    

initThreadPool先根据不同的processorType获取线程池大小,然后创建大小为128的ArrayBlockingQueue,然后设定线程名称前缀PPP-%d",设定rejectionHandler为AbortPolicy,最后创建ThreadPoolExecutor,设置allowCoreThreadTimeOut为true

initTimingJob

    private void initTimingJob() {// PowerJob Processor TimingPoolThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPT-%d").build();timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory);timingPool.scheduleAtFixedRate(new CheckerAndReporter(), 0, 10, TimeUnit.SECONDS);}

initTimingJob方法设定线程名称前缀为PPT-%d",然后通过Executors.newSingleThreadScheduledExecutor创建timingPool,调度CheckerAndReporter

submitTask

    public void submitTask(TaskDO newTask) {// 一旦 ProcessorTracker 出现异常,所有提交到此处的任务直接返回失败,防止形成死锁// 死锁分析:TT创建PT,PT创建失败,无法定期汇报心跳,TT长时间未收到PT心跳,认为PT宕机(确实宕机了),无法选择可用的PT再次派发任务,死锁形成,GG斯密达 T_Tif (lethal) {ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq().setInstanceId(instanceId).setSubInstanceId(newTask.getSubInstanceId()).setTaskId(newTask.getTaskId()).setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue()).setResult(lethalReason).setReportTime(System.currentTimeMillis());TransportUtils.ptReportTask(report, taskTrackerAddress, workerRuntime);return;}boolean success = false;// 1. 设置值并提交执行newTask.setInstanceId(instanceInfo.getInstanceId());newTask.setAddress(taskTrackerAddress);HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(instanceInfo, taskTrackerAddress, newTask, processorBean, omsLogger, statusReportRetryQueue, workerRuntime);try {threadPool.submit(heavyProcessorRunnable);success = true;} catch (RejectedExecutionException ignore) {log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.",instanceId, newTask.getTaskId(), newTask.getTaskName());} catch (Exception e) {log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e);}// 2. 回复接收成功if (success) {ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();reportReq.setInstanceId(instanceId);reportReq.setSubInstanceId(newTask.getSubInstanceId());reportReq.setTaskId(newTask.getTaskId());reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());reportReq.setReportTime(System.currentTimeMillis());TransportUtils.ptReportTask(reportReq, taskTrackerAddress, workerRuntime);log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.",instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size());}}

submitTask方法主要是创建HeavyProcessorRunnable,然后通过threadPool提交

HeavyProcessorRunnable

tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java

@Slf4j
@AllArgsConstructor
@SuppressWarnings("squid:S1181")
public class HeavyProcessorRunnable implements Runnable {private final InstanceInfo instanceInfo;private final String taskTrackerAddress;private final TaskDO task;private final ProcessorBean processorBean;private final OmsLogger omsLogger;/*** 重试队列,ProcessorTracker 将会定期重新上报处理结果*/private final Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;private final WorkerRuntime workerRuntime;//......public void run() {// 切换线程上下文类加载器(否则用的是 Worker 类加载器,不存在容器类,在序列化/反序列化时会报 ClassNotFoundException)Thread.currentThread().setContextClassLoader(processorBean.getClassLoader());try {innerRun();} catch (InterruptedException ignore) {// ignore} catch (Throwable e) {reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString(), null, null);log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", task.getInstanceId(), e);} finally {ThreadLocalStore.clear();}}//......
}    

HeavyProcessorRunnable的run方法主要是执行innerRun方法

innerRun

    public void innerRun() throws InterruptedException {final BasicProcessor processor = processorBean.getProcessor();String taskId = task.getTaskId();Long instanceId = task.getInstanceId();log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName());ThreadLocalStore.setTask(task);ThreadLocalStore.setRuntimeMeta(workerRuntime);// 0. 构造任务上下文WorkflowContext workflowContext = constructWorkflowContext();TaskContext taskContext = constructTaskContext();taskContext.setWorkflowContext(workflowContext);// 1. 上报执行信息reportStatus(TaskStatus.WORKER_PROCESSING, null, null, null);ProcessResult processResult;ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());// 2. 根任务 & 广播执行 特殊处理if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName()) && executeType == ExecuteType.BROADCAST) {// 广播执行:先选本机执行 preProcess,完成后 TaskTracker 再为所有 Worker 生成子 TaskhandleBroadcastRootTask(instanceId, taskContext);return;}// 3. 最终任务特殊处理(一定和 TaskTracker 处于相同的机器)if (TaskConstant.LAST_TASK_NAME.equals(task.getTaskName())) {handleLastTask(taskId, instanceId, taskContext, executeType);return;}// 4. 正式提交运行try {processResult = processor.process(taskContext);if (processResult == null) {processResult = new ProcessResult(false, "ProcessResult can't be null");}} catch (Throwable e) {log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e);processResult = new ProcessResult(false, e.toString());}reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null, workflowContext.getAppendedContextData());}

innerRun先构建taskContext,然后reportStatus为WORKER_PROCESSING,之后针对广播的root任务执行handleBroadcastRootTask,最后通过processor.process(taskContext)执行任务,这里catch了Throwable,出现异常则返回ProcessResult(false, e.toString()),最后根据处理结果reportStatus为WORKER_PROCESS_SUCCESS或者WORKER_PROCESS_FAILED

小结

HeavyTaskTracker的dispatchTask方法通过TransportUtils.ttStartPtTask将任务派发到ProcessorTracker;ProcessorTrackerActor的onReceiveTaskTrackerStartTaskReq方法用于处理TaskTrackerStartTaskReq,它主要是创建ProcessorTracker,然后执行性submit方法;ProcessorTracker的构造器会initThreadPool、initTimingJob、加载processorBean;submitTask方法主要是创建HeavyProcessorRunnable,然后通过threadPool提交;HeavyProcessorRunnable的run方法主要是执行innerRun方法,innerRun主要是执行processor.process(taskContext),这里catch了Throwable,出现异常则返回ProcessResult(false, e.toString())。

这篇关于聊聊PowerJob的HeavyTaskTracker的dispatchTask的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/554425

相关文章

聊聊说话的习惯

1 在日常生活中,每个人都有固定的说话习惯。心理学研究表明,通过一个人的说话习惯,也可以分析出他的性格特点。对于每一个人来讲,说话习惯已经融为他们生活中的一部分。在社交活动中,一些不良的说话习惯很可能会给他们带来麻烦。因此,了解说话习惯对心理活动的影响是十分有必要的。 2 具有顺畅的说话习惯的人,大多思路清晰、语速适中、用词准确并且声声人耳,是典型的顺畅型说话方式这种类型的人要么不说话,要么

聊聊分布式,再讨论分布式解决方案

前言 最近很久没有写博客了,一方面是因为公司事情最近比较忙,另外一方面是因为在进行 CAP 的下一阶段的开发工作,不过目前已经告一段落了。 接下来还是开始我们今天的话题,说说分布式事务,或者说是我眼中的分布式事务,因为每个人可能对其的理解都不一样。 分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免,本文就分布式事

聊聊资源调度

资源调度 般分为两个阶段: 是实现物理资源的虚拟化(即资源的抽象)于当前机器的性能越来越好,硬件配置越来越高,直接用物理机跑业务比较浪费,所以将物理机分割成更小单位的虚拟机,这样可以显著提升机器的利用效率,在公司内部一般采用容器技术来隔离资源 是将资源虚拟化后进 步在时间和空间上实现更细粒度的编排 ,优化资源的使用。 1 .一些数据 如果公司的几万台机器都是物理机,那么资源的使用率稍低: CP

聊聊Spark中的宽依赖和窄依赖

开门见山,本文就针对一个点,谈谈Spark中的宽依赖和窄依赖,这是Spark计算引擎划分Stage的根源所在,遇到宽依赖,则划分为多个stage,针对每个Stage,提交一个TaskSet: 上图:一张网上的图: 基于此图,分析下这里为什么前面的流程都是窄依赖,而后面的却是宽依赖: 我们仔细看看,map和filter算子中,对于父RDD来说,一个分区内的数据,有且仅有一个子RDD的分区来

聊聊灰度发布

有没有在北京面试java的小伙伴,每家公司面试问的问题都不一样,昨天面试官问到了灰度发布,一脸懵,好像在哪儿听说过,毕竟我都没发布过,之前都是项目组长在干这些事儿,所以聊聊,了解一下 什么是灰度发布 全量发布:把旧服务kill掉,把新服务启动,这个过程就可以理解为全量发布 回滚周期长 如果我们更新完应用之后,我们做线上回归测试的时候发现有BUG,这个时候就要做回滚,过程就是把新服

聊聊随机测试和猴子测试

目录 随机测试的特点 1.不可预测性 2.缺乏针对性 3.自动化 4.资源密集型 猴子测试 随机测试 (Random Testing) 猴子测试 (Monkey Testing) 特点: 区别 1.控制程度 2.目标差异 3.实现方式 在我们测试的过程中,通常会使用到随机测试和猴子测试,其中随机测试侧重于人工测试,猴子测试侧重于借助工具执行命令进行测试。 随机测试

【聊聊经济社会】论阶级跨越

为什么要在市场中寻求自由,在市场中寻求洒脱,原因不胜其数,其中便有一条,现实生活中多是xx,可能社会属性本身就具备党同伐异,像是一股意志,平庸一切不平庸,中和一切特立独行,最终以达到一种变态的稳定. 消其意志,断其未来,耗其钱财 ,而我称之为阶级壁垒 阶级之所以难以跨越,主要也在于这三点 一:没有这样的志向,像那种羡慕有钱,或者羡慕有权,权当做梦。这样的志向,正常人只停留于羡慕的层次,而一旦受到丁

聊聊PC端页面适配

聊聊PC端页面适配  目也pc端有适配的需求:目前我们pc项目的设计稿尺寸是宽度1920,高度最小是1080。 适配目标: 1.在不同分辨率的电脑上,网页可以正常显示 2.放大或者缩小屏幕,网页可以正常显示 对于宽度的适配   对于宽度适配: 首先设置html,body{width:100%;overflow-x:hidden;} 然后我们可以把页面分解为背景层(

来聊聊我用go手写redis这件事

写在文章开头 网上有看过一些实现redis的项目,要么完全脱离go语言的理念,要么又完全去迎合c的实现理念,也不是说这些项目写的不好,只能说不符合笔者所认为的那种"平衡",于是整理了一段时间的设计稿,自己尝试着用go语言写了一版"有redis味道"的mini-redis。 截至目前,笔者已经完成了redis服务端和客户端交互的基本通信架构和实现基调,如下所示,可以看到笔者已经实现了ping

供应链劫持?聊聊什么是RepoJacking

介绍        近几个月来,对开源存储库的主要威胁就包括存储仓库劫持,通常称为RepoJacking。RepoJacking 是指恶意攻击者通过一定手段接管托管存储库的所有权或维护者的账户。通过获取对账户的访问权限,攻击者可以将恶意代码注入到使用对应仓库作为依赖项的项目中。 RepoJacking 如何攻击?        存储库攻击,也称为供应链攻击,通常利用 GitH