Preface
This article takes a look at the dispatchTask method of PowerJob's HeavyTaskTracker.
dispatchTask
tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
protected void dispatchTask(TaskDO task, String processorTrackerAddress) {

    // 1. Persist: update the database (a failed update may cause duplicate execution; not handled for now)
    TaskDO updateEntity = new TaskDO();
    updateEntity.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue());
    // Record the ProcessorTracker that handles this task
    updateEntity.setAddress(processorTrackerAddress);
    boolean success = taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity);
    if (!success) {
        log.warn("[TaskTracker-{}] dispatch task(taskId={},taskName={}) failed due to update task status failed.", instanceId, task.getTaskId(), task.getTaskName());
        return;
    }

    // 2. Update the ProcessorTrackerStatus
    ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true);
    // 3. Initialize the cache
    taskId2BriefInfo.put(task.getTaskId(), new TaskBriefInfo(task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, -1L));

    // 4. Dispatch the task
    TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, workerRuntime.getWorkerAddress());
    TransportUtils.ttStartPtTask(startTaskReq, processorTrackerAddress, workerRuntime.getTransporter());

    log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName());
}
HeavyTaskTracker's dispatchTask method first persists the task status as DISPATCH_SUCCESS_WORKER_UNCHECK together with the target address, returning early if that update fails. It then marks the ProcessorTrackerStatus as dispatched, initializes the taskId2BriefInfo cache, and finally dispatches the task to the ProcessorTracker through TransportUtils.ttStartPtTask.
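The ordering in dispatchTask (persist first, stop if persistence fails, only then cache and send) can be sketched in isolation. The class below is a hypothetical, simplified model and not PowerJob code: DispatchSketch, its BiPredicate stand-in for taskPersistenceService.updateTask, the in-memory sent list, and the boolean return value are all assumptions made for illustration (the real method returns void and sends over a Transporter).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiPredicate;

/** Hypothetical model of "persist, then cache, then send"; not PowerJob code. */
final class DispatchSketch {
    // Stand-in for taskId2BriefInfo: taskId -> status name.
    final Map<String, String> briefInfo = new ConcurrentHashMap<>();
    // Stand-in for the transporter: records what would have been sent.
    final List<String> sent = new ArrayList<>();
    // Stand-in for taskPersistenceService.updateTask; returns false on failure.
    private final BiPredicate<String, String> persistence;

    DispatchSketch(BiPredicate<String, String> persistence) {
        this.persistence = persistence;
    }

    /** Returns true only if the status was persisted and the task was handed to the transport. */
    boolean dispatch(String taskId, String processorTrackerAddress) {
        // 1. Persist first; on failure nothing is cached or sent.
        if (!persistence.test(taskId, processorTrackerAddress)) {
            return false;
        }
        // 2. Cache the brief status of the task.
        briefInfo.put(taskId, "DISPATCH_SUCCESS_WORKER_UNCHECK");
        // 3. Fire-and-forget send to the ProcessorTracker address.
        sent.add(processorTrackerAddress + "/" + taskId);
        return true;
    }

    public static void main(String[] args) {
        DispatchSketch failing = new DispatchSketch((taskId, address) -> false);
        System.out.println(failing.dispatch("t1", "10.0.0.1:27777"));
        System.out.println(failing.sent.isEmpty());
    }
}
```

Persisting before sending means a failed database update leaves no trace in the cache and sends nothing, which matches the early return in the original method.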
ttStartPtTask
tech/powerjob/worker/common/utils/TransportUtils.java
public static void ttStartPtTask(TaskTrackerStartTaskReq req, String address, Transporter transporter) {
    final URL url = easyBuildUrl(ServerType.WORKER, WPT_PATH, WPT_HANDLER_START_TASK, address);
    transporter.tell(url, req);
}
The ttStartPtTask method sends the TaskTrackerStartTaskReq to the endpoint whose rootPath is processorTracker and whose handlerPath is startTask. It uses transporter.tell, so it does not wait for a response.
TaskTrackerStartTaskReq
tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java
@Getter
@Setter
@NoArgsConstructor
public class TaskTrackerStartTaskReq implements PowerSerializable {

    // TaskTracker address
    private String taskTrackerAddress;
    private InstanceInfo instanceInfo;

    private String taskId;
    private String taskName;
    private byte[] taskContent;
    // Current retry count of the subtask
    private int taskCurrentRetryNums;

    // Used only by second-level (frequent) tasks
    private long subInstanceId;

    private String logConfig;

    /**
     * Creates a TaskTrackerStartTaskReq; this constructor must be called on the TaskTracker node
     */
    public TaskTrackerStartTaskReq(InstanceInfo instanceInfo, TaskDO task, String taskTrackerAddress) {

        this.taskTrackerAddress = taskTrackerAddress;
        this.instanceInfo = instanceInfo;

        this.taskId = task.getTaskId();
        this.taskName = task.getTaskName();
        this.taskContent = task.getTaskContent();

        this.taskCurrentRetryNums = task.getFailedCnt();
        this.subInstanceId = task.getSubInstanceId();

        this.logConfig = instanceInfo.getLogConfig();
    }
}
TaskTrackerStartTaskReq carries taskTrackerAddress, instanceInfo, taskId, taskName, taskContent, taskCurrentRetryNums, subInstanceId, and logConfig.
onReceiveTaskTrackerStartTaskReq
tech/powerjob/worker/actors/ProcessorTrackerActor.java
@Slf4j
@Actor(path = RemoteConstant.WPT_PATH)
public class ProcessorTrackerActor {

    private final WorkerRuntime workerRuntime;

    public ProcessorTrackerActor(WorkerRuntime workerRuntime) {
        this.workerRuntime = workerRuntime;
    }

    /**
     * Handles task execution requests from the TaskTracker
     * @param req the request
     */
    @Handler(path = RemoteConstant.WPT_HANDLER_START_TASK, processType = ProcessType.NO_BLOCKING)
    public void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {

        Long instanceId = req.getInstanceInfo().getInstanceId();

        // Creating the ProcessorTracker always succeeds
        ProcessorTracker processorTracker = ProcessorTrackerManager.getProcessorTracker(
                instanceId,
                req.getTaskTrackerAddress(),
                () -> new ProcessorTracker(req, workerRuntime));

        TaskDO task = new TaskDO();

        task.setTaskId(req.getTaskId());
        task.setTaskName(req.getTaskName());
        task.setTaskContent(req.getTaskContent());
        task.setFailedCnt(req.getTaskCurrentRetryNums());
        task.setSubInstanceId(req.getSubInstanceId());

        processorTracker.submitTask(task);
    }

    //......
}
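ProcessorTrackerActor's onReceiveTaskTrackerStartTaskReq method handles TaskTrackerStartTaskReq. It obtains the ProcessorTracker from ProcessorTrackerManager (passing a supplier that creates one), rebuilds a TaskDO from the request, and calls submitTask on the tracker.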
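The source of ProcessorTrackerManager.getProcessorTracker is not shown here, so the following is only a guess at the general "look up or create via supplier" pattern that its signature suggests. TrackerRegistrySketch, its string key format, and the use of ConcurrentHashMap.computeIfAbsent are assumptions for illustration, not the library's implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical registry; not PowerJob's ProcessorTrackerManager. */
final class TrackerRegistrySketch<T> {
    private final Map<String, T> trackers = new ConcurrentHashMap<>();

    /** Returns the tracker registered for the key, invoking the creator at most once per key. */
    T getOrCreate(long instanceId, String taskTrackerAddress, Supplier<T> creator) {
        // Assumed key format: one tracker per (instanceId, TaskTracker address) pair.
        String key = instanceId + "@" + taskTrackerAddress;
        return trackers.computeIfAbsent(key, k -> creator.get());
    }

    public static void main(String[] args) {
        TrackerRegistrySketch<Object> registry = new TrackerRegistrySketch<>();
        Object first = registry.getOrCreate(1L, "10.0.0.1:27777", Object::new);
        Object second = registry.getOrCreate(1L, "10.0.0.1:27777", Object::new);
        System.out.println(first == second);
    }
}
```

With a lazily invoked supplier, the first request for an instance pays the cost of building the tracker and later requests for the same instance reuse it.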
ProcessorTracker
tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java
public ProcessorTracker(TaskTrackerStartTaskReq request, WorkerRuntime workerRuntime) {
    try {
        // Assign fields
        this.startTime = System.currentTimeMillis();
        this.workerRuntime = workerRuntime;
        this.instanceInfo = request.getInstanceInfo();
        this.instanceId = request.getInstanceInfo().getInstanceId();
        this.taskTrackerAddress = request.getTaskTrackerAddress();

        this.omsLogger = OmsLoggerFactory.build(instanceId, request.getLogConfig(), workerRuntime);
        this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
        this.lastIdleTime = -1L;
        this.lastCompletedTaskCount = 0L;

        // Initialize the thread pool; jobs started by the TimingPool inspect the ThreadPool, so it must be initialized first to avoid an NPE
        initThreadPool();
        // Initialize the scheduled jobs
        initTimingJob();
        // Initialize the Processor
        processorBean = workerRuntime.getProcessorLoader().load(new ProcessorDefinition().setProcessorType(instanceInfo.getProcessorType()).setProcessorInfo(instanceInfo.getProcessorInfo()));
        log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", instanceId);
    } catch (Throwable t) {
        log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, t);
        lethal = true;
        lethalReason = ExceptionUtils.getMessage(t);
    }
}
The ProcessorTracker constructor calls initThreadPool and initTimingJob, then loads processorBean. If any step throws, it does not propagate the error: it sets lethal to true and records lethalReason, so every task submitted afterwards fails immediately.
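The constructor's "never throw, remember the failure" behaviour can be illustrated with a small standalone sketch. LethalTrackerSketch, its Supplier-based loader, and the string return values are hypothetical stand-ins; only the idea of a lethal flag plus a stored reason comes from the code above.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of a tracker whose constructor never throws. */
final class LethalTrackerSketch {
    private boolean lethal = false;
    private String lethalReason;
    private Object processor;

    LethalTrackerSketch(Supplier<Object> processorLoader) {
        try {
            // Stand-in for loading the processor bean.
            this.processor = processorLoader.get();
        } catch (Throwable t) {
            // Remember the failure instead of propagating it.
            lethal = true;
            lethalReason = String.valueOf(t.getMessage());
        }
    }

    /** Returns the status that would be reported back for this task. */
    String submit(String taskId) {
        if (lethal) {
            // Fail fast so the caller hears about the problem right away.
            return "WORKER_PROCESS_FAILED: " + lethalReason;
        }
        return "WORKER_RECEIVED";
    }

    public static void main(String[] args) {
        LethalTrackerSketch broken = new LethalTrackerSketch(() -> {
            throw new IllegalStateException("no such processor");
        });
        System.out.println(broken.submit("t1"));
    }
}
```

Keeping a broken tracker alive lets it answer each submission with an explicit failure, rather than leaving the sender waiting for a heartbeat that never arrives.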
initThreadPool
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128;

private void initThreadPool() {

    int poolSize = calThreadPoolSize();
    // Queue of pending tasks; kept small to avoid putting pressure on memory
    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE);
    // Custom thread names for the pool (PowerJob Processor Pool -> PPP)
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPP-%d").build();
    // Rejection policy: throw an exception directly
    RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.AbortPolicy();

    threadPool = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler);

    // Allow core threads to be destroyed when idle (the pool may end up with zero live threads)
    threadPool.allowCoreThreadTimeOut(true);
}

private int calThreadPoolSize() {
    ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
    ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType());

    // Script processors bring their own thread pool, but one token thread is still allocated to keep the logic simple
    if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) {
        return 1;
    }

    if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) {
        return instanceInfo.getThreadConcurrency();
    }
    if (TimeExpressionType.FREQUENT_TYPES.contains(instanceInfo.getTimeExpressionType())) {
        return instanceInfo.getThreadConcurrency();
    }
    return 2;
}
initThreadPool first computes the pool size through calThreadPoolSize: 1 for PYTHON or SHELL processors, instanceInfo.getThreadConcurrency() for MAP and MAP_REDUCE execution or for frequent time-expression types, and 2 otherwise. It then creates an ArrayBlockingQueue with a capacity of 128, sets the thread name format to "PPP-%d", uses AbortPolicy as the rejectionHandler, and finally creates the ThreadPoolExecutor and sets allowCoreThreadTimeOut to true.
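To see what a bounded queue combined with AbortPolicy means in practice, here is a minimal sketch using only JDK classes. BoundedPoolSketch and countRejected are hypothetical names, and the tiny pool and queue sizes are chosen for the demonstration rather than taken from PowerJob. Each submitted task blocks on a latch, so the pool and queue fill up deterministically.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of a fixed-size pool with a bounded queue and AbortPolicy. */
final class BoundedPoolSketch {

    /** Submits the given number of blocking tasks and returns how many were rejected. */
    static int countRejected(int poolSize, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy());
        pool.allowCoreThreadTimeOut(true);
        // Tasks wait on this latch so that none of them finishes while we are still submitting.
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.submit(() -> {
                        release.await();
                        return null;
                    });
                } catch (RejectedExecutionException e) {
                    // AbortPolicy throws once all threads are busy and the queue is full.
                    rejected++;
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // One worker thread plus one queue slot: the third task has nowhere to go.
        System.out.println(countRejected(1, 1, 3)); // prints 1
    }
}
```

With poolSize threads and a queue of capacity N, at most poolSize + N tasks can be pending at once; anything beyond that surfaces as a RejectedExecutionException at the call site.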
initTimingJob
private void initTimingJob() {

    // PowerJob Processor TimingPool
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPT-%d").build();
    timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory);

    timingPool.scheduleAtFixedRate(new CheckerAndReporter(), 0, 10, TimeUnit.SECONDS);
}
The initTimingJob method sets the thread name format to "PPT-%d", creates timingPool through Executors.newSingleThreadScheduledExecutor, and schedules CheckerAndReporter at a fixed rate of once every 10 seconds with no initial delay.
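The scheduling call can be tried out on its own with a shorter period. TimingPoolSketch and runsRepeatedly are hypothetical names, and the counting task merely stands in for CheckerAndReporter, whose body is not shown in this article.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of a single-thread scheduler running a periodic job. */
final class TimingPoolSketch {

    /** Returns true if the periodic job ran at least expectedRuns times within five seconds. */
    static boolean runsRepeatedly(int expectedRuns, long periodMillis) {
        ScheduledExecutorService timingPool = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "PPT-sketch");
            t.setDaemon(true);
            return t;
        });
        CountDownLatch runs = new CountDownLatch(expectedRuns);
        try {
            // Initial delay 0: the first run starts immediately, later runs follow at the fixed period.
            timingPool.scheduleAtFixedRate(runs::countDown, 0, periodMillis, TimeUnit.MILLISECONDS);
            return runs.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timingPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runsRepeatedly(3, 10));
    }
}
```

One detail worth knowing about scheduleAtFixedRate: if a run throws an exception, subsequent executions are suppressed, so periodic jobs usually catch their own exceptions.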
submitTask
public void submitTask(TaskDO newTask) {

    // Once the ProcessorTracker is broken, every task submitted here fails immediately, to prevent a deadlock
    // Deadlock analysis: the TT creates the PT; PT creation fails, so it cannot report heartbeats periodically; the TT receives no heartbeat for a long time and considers the PT down (which it is); it cannot pick an available PT to dispatch the task again, and the deadlock is complete, game over T_T
    if (lethal) {
        ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq()
                .setInstanceId(instanceId)
                .setSubInstanceId(newTask.getSubInstanceId())
                .setTaskId(newTask.getTaskId())
                .setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue())
                .setResult(lethalReason)
                .setReportTime(System.currentTimeMillis());

        TransportUtils.ptReportTask(report, taskTrackerAddress, workerRuntime);
        return;
    }

    boolean success = false;
    // 1. Set values and submit for execution
    newTask.setInstanceId(instanceInfo.getInstanceId());
    newTask.setAddress(taskTrackerAddress);

    HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(instanceInfo, taskTrackerAddress, newTask, processorBean, omsLogger, statusReportRetryQueue, workerRuntime);
    try {
        threadPool.submit(heavyProcessorRunnable);
        success = true;
    } catch (RejectedExecutionException ignore) {
        log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.",
                instanceId, newTask.getTaskId(), newTask.getTaskName());
    } catch (Exception e) {
        log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e);
    }

    // 2. Reply that the task was received
    if (success) {
        ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
        reportReq.setInstanceId(instanceId);
        reportReq.setSubInstanceId(newTask.getSubInstanceId());
        reportReq.setTaskId(newTask.getTaskId());
        reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
        reportReq.setReportTime(System.currentTimeMillis());

        TransportUtils.ptReportTask(reportReq, taskTrackerAddress, workerRuntime);

        log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.",
                instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size());
    }
}
The submitTask method mainly creates a HeavyProcessorRunnable and submits it to threadPool. If the tracker is lethal, it reports WORKER_PROCESS_FAILED with lethalReason and returns. If the pool rejects the task, it only logs a warning and sends no status report. On a successful submission it reports WORKER_RECEIVED to the TaskTracker.
HeavyProcessorRunnable
tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java
@Slf4j
@AllArgsConstructor
@SuppressWarnings("squid:S1181")
public class HeavyProcessorRunnable implements Runnable {

    private final InstanceInfo instanceInfo;
    private final String taskTrackerAddress;
    private final TaskDO task;
    private final ProcessorBean processorBean;
    private final OmsLogger omsLogger;
    /**
     * Retry queue; the ProcessorTracker periodically re-reports processing results
     */
    private final Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
    private final WorkerRuntime workerRuntime;

    //......

    public void run() {
        // Switch the thread context class loader (otherwise the Worker class loader is used, which has no container classes, and serialization/deserialization throws ClassNotFoundException)
        Thread.currentThread().setContextClassLoader(processorBean.getClassLoader());
        try {
            innerRun();
        } catch (InterruptedException ignore) {
            // ignore
        } catch (Throwable e) {
            reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString(), null, null);
            log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", task.getInstanceId(), e);
        } finally {
            ThreadLocalStore.clear();
        }
    }

    //......
}
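HeavyProcessorRunnable's run method switches the thread context class loader to the processor's class loader and calls innerRun. It ignores InterruptedException, reports WORKER_PROCESS_FAILED for any other Throwable, and always clears ThreadLocalStore in the finally block.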
innerRun
public void innerRun() throws InterruptedException {

    final BasicProcessor processor = processorBean.getProcessor();

    String taskId = task.getTaskId();
    Long instanceId = task.getInstanceId();

    log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName());
    ThreadLocalStore.setTask(task);
    ThreadLocalStore.setRuntimeMeta(workerRuntime);

    // 0. Build the task context
    WorkflowContext workflowContext = constructWorkflowContext();
    TaskContext taskContext = constructTaskContext();
    taskContext.setWorkflowContext(workflowContext);
    // 1. Report that execution has started
    reportStatus(TaskStatus.WORKER_PROCESSING, null, null, null);

    ProcessResult processResult;
    ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());

    // 2. Special handling for the root task in broadcast execution
    if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName()) && executeType == ExecuteType.BROADCAST) {
        // Broadcast execution: run preProcess on the local machine first; once it completes, the TaskTracker generates subtasks for all Workers
        handleBroadcastRootTask(instanceId, taskContext);
        return;
    }

    // 3. Special handling for the last task (always on the same machine as the TaskTracker)
    if (TaskConstant.LAST_TASK_NAME.equals(task.getTaskName())) {
        handleLastTask(taskId, instanceId, taskContext, executeType);
        return;
    }

    // 4. Actually run the task
    try {
        processResult = processor.process(taskContext);
        if (processResult == null) {
            processResult = new ProcessResult(false, "ProcessResult can't be null");
        }
    } catch (Throwable e) {
        log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e);
        processResult = new ProcessResult(false, e.toString());
    }
    reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null, workflowContext.getAppendedContextData());
}
innerRun first builds the taskContext and reports the status WORKER_PROCESSING. The root task of a broadcast execution is handed to handleBroadcastRootTask, and the last task is handed to handleLastTask; both return early. Every other task runs through processor.process(taskContext). Throwable is caught here, so an exception becomes ProcessResult(false, e.toString()), and a null return value is also converted into a failed result. Finally, depending on the result, reportStatus sends WORKER_PROCESS_SUCCESS or WORKER_PROCESS_FAILED.
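The way step 4 turns every possible outcome of the user's processor into a status can be sketched without any PowerJob types. ProcessSketch, its nested Result class, and the use of java.util.function.Function as the processor are hypothetical stand-ins; only the null check, the catch of Throwable, and the two status names mirror the code above.

```java
import java.util.function.Function;

/** Hypothetical sketch of mapping a processor outcome to a reported status. */
final class ProcessSketch {

    /** Stand-in for ProcessResult. */
    static final class Result {
        final boolean success;
        final String msg;

        Result(boolean success, String msg) {
            this.success = success;
            this.msg = msg;
        }
    }

    /** Runs the processor and returns the status name that would be reported. */
    static String runAndReport(Function<String, Result> processor, String taskContext) {
        Result result;
        try {
            result = processor.apply(taskContext);
            if (result == null) {
                // A null result is treated as a failure rather than causing an NPE later.
                result = new Result(false, "ProcessResult can't be null");
            }
        } catch (Throwable e) {
            // Any exception or error thrown by user code becomes a failed result.
            result = new Result(false, e.toString());
        }
        return result.success ? "WORKER_PROCESS_SUCCESS" : "WORKER_PROCESS_FAILED";
    }

    public static void main(String[] args) {
        System.out.println(runAndReport(ctx -> new Result(true, "ok"), "ctx")); // prints WORKER_PROCESS_SUCCESS
        System.out.println(runAndReport(ctx -> null, "ctx"));                   // prints WORKER_PROCESS_FAILED
    }
}
```

Because both the null case and the exception case collapse into a failed result, the final status report is always sent, whatever the user code does.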
Summary
HeavyTaskTracker's dispatchTask method dispatches the task to the ProcessorTracker through TransportUtils.ttStartPtTask. ProcessorTrackerActor's onReceiveTaskTrackerStartTaskReq method handles the TaskTrackerStartTaskReq: it obtains or creates the ProcessorTracker and then calls submitTask. The ProcessorTracker constructor calls initThreadPool and initTimingJob and loads processorBean. submitTask creates a HeavyProcessorRunnable and submits it to threadPool. HeavyProcessorRunnable's run method calls innerRun, which mainly executes processor.process(taskContext); Throwable is caught there, so an exception results in ProcessResult(false, e.toString()).