Preface
This article looks at powerjob's failedTaskNum.
InstanceStatisticsHolder
powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
    @Data
    protected static class InstanceStatisticsHolder {
        // Waiting to be dispatched (exists only in the TaskTracker database)
        protected long waitingDispatchNum;
        // Dispatched but not yet acknowledged by the ProcessorTracker: the request may not have
        // arrived because of a network error, or the ProcessorTracker thread pool may be full and rejecting work
        protected long workerUnreceivedNum;
        // Acknowledged by the ProcessorTracker and sitting in the thread pool queue, waiting to run
        protected long receivedNum;
        // Being executed by the ProcessorTracker
        protected long runningNum;
        protected long failedNum;
        protected long succeedNum;

        public long getTotalTaskNum() {
            return waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum + failedNum + succeedNum;
        }
    }

InstanceStatisticsHolder stores the number of tasks in each status.
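To make the bookkeeping concrete, here is a minimal sketch of the same six counters and their sum. `StatsHolderSketch` is a hypothetical stand-in written for this illustration (plain fields instead of Lombok's `@Data`), not the powerjob class itself.

```java
// Hypothetical stand-in for InstanceStatisticsHolder (plain fields instead of Lombok's @Data).
class StatsHolderSketch {
    long waitingDispatchNum;
    long workerUnreceivedNum;
    long receivedNum;
    long runningNum;
    long failedNum;
    long succeedNum;

    // Same sum as getTotalTaskNum() in the snippet above: every task sits in exactly one bucket.
    long getTotalTaskNum() {
        return waitingDispatchNum + workerUnreceivedNum + receivedNum
                + runningNum + failedNum + succeedNum;
    }

    public static void main(String[] args) {
        StatsHolderSketch h = new StatsHolderSketch();
        h.waitingDispatchNum = 1;
        h.runningNum = 2;
        h.failedNum = 1;
        h.succeedNum = 6;
        System.out.println(h.getTotalTaskNum()); // prints 10
    }
}
```

failedNum is only one of the six buckets, so a task that is waiting for a retry is counted under waitingDispatchNum rather than failedNum.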
fetchRunningStatus
powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java
    public InstanceDetail fetchRunningStatus() {
        InstanceDetail detail = new InstanceDetail();
        // Fill in the basic info
        detail.setActualTriggerTime(createTime);
        detail.setStatus(InstanceStatus.RUNNING.getV());
        detail.setTaskTrackerAddress(workerRuntime.getWorkerAddress());

        // Fill in the detailed info
        InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
        InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail();
        taskDetail.setSucceedTaskNum(holder.succeedNum);
        taskDetail.setFailedTaskNum(holder.failedNum);
        taskDetail.setTotalTaskNum(holder.getTotalTaskNum());
        detail.setTaskDetail(taskDetail);

        return detail;
    }

CommonTaskTracker's fetchRunningStatus calls getInstanceStatisticsHolder to obtain an InstanceStatisticsHolder, then uses holder.failedNum to populate the failedTaskNum of taskDetail.
getInstanceStatisticsHolder
powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
    protected InstanceStatisticsHolder getInstanceStatisticsHolder(long subInstanceId) {

        Map<TaskStatus, Long> status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId, subInstanceId);
        InstanceStatisticsHolder holder = new InstanceStatisticsHolder();

        holder.waitingDispatchNum = status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L);
        holder.workerUnreceivedNum = status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L);
        holder.receivedNum = status2Num.getOrDefault(TaskStatus.WORKER_RECEIVED, 0L);
        holder.runningNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESSING, 0L);
        holder.failedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_FAILED, 0L);
        holder.succeedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_SUCCESS, 0L);
        return holder;
    }

getInstanceStatisticsHolder obtains status2Num from taskPersistenceService.getTaskStatusStatistics, then takes the count for TaskStatus.WORKER_PROCESS_FAILED as failedNum.
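The getOrDefault calls matter because a status with no rows does not appear in the map at all. A minimal sketch of that lookup follows; the `Status` enum and `StatusMappingSketch` class are hypothetical names that only mirror the TaskStatus constants quoted above.

```java
import java.util.EnumMap;
import java.util.Map;

class StatusMappingSketch {
    // Hypothetical enum mirroring the TaskStatus names used in the article.
    enum Status {
        WAITING_DISPATCH, DISPATCH_SUCCESS_WORKER_UNCHECK, WORKER_RECEIVED,
        WORKER_PROCESSING, WORKER_PROCESS_FAILED, WORKER_PROCESS_SUCCESS
    }

    // A status that is missing from the map simply counts as zero.
    static long failedNum(Map<Status, Long> status2Num) {
        return status2Num.getOrDefault(Status.WORKER_PROCESS_FAILED, 0L);
    }

    public static void main(String[] args) {
        Map<Status, Long> status2Num = new EnumMap<>(Status.class);
        status2Num.put(Status.WORKER_PROCESS_SUCCESS, 3L);
        System.out.println(failedNum(status2Num)); // prints 0

        status2Num.put(Status.WORKER_PROCESS_FAILED, 2L);
        System.out.println(failedNum(status2Num)); // prints 2
    }
}
```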
getTaskStatusStatistics
powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java
    public Map<TaskStatus, Long> getTaskStatusStatistics(Long instanceId, Long subInstanceId) {
        try {

            SimpleTaskQuery query = new SimpleTaskQuery();
            query.setInstanceId(instanceId);
            query.setSubInstanceId(subInstanceId);
            query.setQueryContent("status, count(*) as num");
            query.setOtherCondition("GROUP BY status");

            return execute(() -> {
                List<Map<String, Object>> dbRES = taskDAO.simpleQueryPlus(query);
                Map<TaskStatus, Long> result = Maps.newHashMap();
                dbRES.forEach(row -> {
                    // H2 returns everything in upper case...
                    int status = Integer.parseInt(String.valueOf(row.get("status")));
                    long num = Long.parseLong(String.valueOf(row.get("num")));
                    result.put(TaskStatus.of(status), num);
                });
                return result;
            });
        } catch (Exception e) {
            log.error("[TaskPersistenceService] getTaskStatusStatistics for instance(id={}) failed.", instanceId, e);
        }
        return Maps.newHashMap();
    }

TaskPersistenceService's getTaskStatusStatistics executes:
select status, count(*) as num from task_info where instance_id= ? and sub_instance_id=? GROUP BY status
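For readers who think in Java rather than SQL, the GROUP BY above is equivalent to grouping the status column and counting each group. The sketch below does this in memory with streams; `GroupByStatusSketch` is a hypothetical name, and the integer codes in it are placeholders rather than powerjob's actual TaskStatus values.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class GroupByStatusSketch {
    // In-memory equivalent of "select status, count(*) as num ... GROUP BY status".
    static Map<Integer, Long> countByStatus(List<Integer> statusCodes) {
        return statusCodes.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        // Placeholder status codes, one entry per task row.
        List<Integer> codes = Arrays.asList(6, 6, 5, 6, 4);
        Map<Integer, Long> counts = countByStatus(codes);
        System.out.println(counts.get(6));              // prints 3
        System.out.println(counts.get(5));              // prints 1
        System.out.println(counts.getOrDefault(1, 0L)); // prints 0
    }
}
```

As with the SQL result, a status that no row carries is simply absent from the map.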
TaskTracker will have a retry
LightTaskTracker
powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java
    private ProcessResult processTask() {
        executeThread.set(Thread.currentThread());
        // Record the time at which the task starts executing
        taskStartTime = System.currentTimeMillis();
        status = TaskStatus.WORKER_PROCESSING;
        // When execution starts, submit a task that checks for timeout
        ProcessResult res = null;
        do {
            Thread.currentThread().setContextClassLoader(processorBean.getClassLoader());
            if (res != null && !res.isSuccess()) {
                // Retry
                taskContext.setCurrentRetryTimes(taskContext.getCurrentRetryTimes() + 1);
                log.warn("[TaskTracker-{}] process failed, TaskTracker will have a retry,current retryTimes : {}", instanceId, taskContext.getCurrentRetryTimes());
            }
            try {
                res = processorBean.getProcessor().process(taskContext);
            } catch (InterruptedException e) {
                log.warn("[TaskTracker-{}] task has been interrupted !", instanceId, e);
                Thread.currentThread().interrupt();
                if (timeoutFlag.get()) {
                    res = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_INTERRUPTED);
                } else if (stopFlag.get()) {
                    res = new ProcessResult(false, SystemInstanceResult.USER_STOP_INSTANCE_INTERRUPTED);
                } else {
                    res = new ProcessResult(false, e.toString());
                }
            } catch (Exception e) {
                log.warn("[TaskTracker-{}] process failed !", instanceId, e);
                res = new ProcessResult(false, e.toString());
            }
            if (res == null) {
                log.warn("[TaskTracker-{}] processor return null !", instanceId);
                res = new ProcessResult(false, "Processor return null");
            }
        } while (!res.isSuccess() && taskContext.getCurrentRetryTimes() < taskContext.getMaxRetryTimes() && !timeoutFlag.get() && !stopFlag.get());
        executeThread.set(null);
        taskEndTime = System.currentTimeMillis();
        finished.set(true);
        result = res;
        status = result.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
        // Cancel the timeout check task
        if (timeoutCheckScheduledFuture != null) {
            timeoutCheckScheduledFuture.cancel(true);
        }
        log.info("[TaskTracker-{}] task complete ! create time:{},queue time:{},use time:{},result:{}", instanceId, createTime, taskStartTime - createTime, System.currentTimeMillis() - taskStartTime, result);
        // Report once immediately after execution completes
        checkAndReportStatus();
        return result;
    }

In LightTaskTracker's processTask, when the previous ProcessResult is not a success, the loop increments the retry count before running the processor again and logs:
[TaskTracker-{}] process failed, TaskTracker will have a retry,current retryTimes : {}
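The do-while shape of that loop can be reduced to a few lines. The sketch below keeps only the retry counting and drops the timeout and stop flags; `LightRetrySketch` and `Outcome` are hypothetical names, and a `BooleanSupplier` stands in for the real processor.

```java
import java.util.function.BooleanSupplier;

class LightRetrySketch {
    // Hypothetical result holder for this sketch.
    static final class Outcome {
        final boolean success;
        final int attempts;
        final int retryTimes;

        Outcome(boolean success, int attempts, int retryTimes) {
            this.success = success;
            this.attempts = attempts;
            this.retryTimes = retryTimes;
        }
    }

    // Runs the processor at least once, then retries while it fails and retries remain.
    static Outcome run(BooleanSupplier processor, int maxRetryTimes) {
        int currentRetryTimes = 0;
        int attempts = 0;
        Boolean ok = null;
        do {
            if (ok != null && !ok) {
                // The previous attempt failed, so this pass is a retry.
                currentRetryTimes++;
            }
            ok = processor.getAsBoolean();
            attempts++;
        } while (!ok && currentRetryTimes < maxRetryTimes);
        return new Outcome(ok, attempts, currentRetryTimes);
    }

    public static void main(String[] args) {
        Outcome o = run(() -> false, 2);
        System.out.println(o.attempts + " attempts, " + o.retryTimes + " retries"); // prints 3 attempts, 2 retries
    }
}
```

Under these assumptions a processor that always fails runs maxRetryTimes + 1 times in total.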
updateTaskStatus
powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
    public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {

        if (finished.get()) {
            return;
        }
        TaskStatus nTaskStatus = TaskStatus.of(newStatus);

        int lockId = taskId.hashCode();
        try {

            // Block until the lock is acquired
            segmentLock.lockInterruptible(lockId);
            TaskBriefInfo taskBriefInfo = taskId2BriefInfo.getIfPresent(taskId);

            // Not in the cache, look it up in the database
            if (taskBriefInfo == null) {
                Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
                if (taskOpt.isPresent()) {
                    TaskDO taskDO = taskOpt.get();
                    taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.of(taskDO.getStatus()), taskDO.getLastReportTime());
                } else {
                    // Should not happen in theory unless the database misbehaves
                    log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId);
                    taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.WAITING_DISPATCH, -1L);
                }
                // Write to the cache
                taskId2BriefInfo.put(taskId, taskBriefInfo);
            }

            // Filter out expired requests (implies a cluster clock consistency requirement:
            // when a retry crosses Workers, inconsistent clocks may cause problems)
            if (taskBriefInfo.getLastReportTime() > reportTime) {
                log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.",
                        instanceId, subInstanceId, taskBriefInfo.getLastReportTime(), reportTime, taskId, newStatus);
                return;
            }
            // Check that the status transition is legal, fix issue 404
            if (nTaskStatus.getValue() < taskBriefInfo.getStatus().getValue()) {
                log.warn("[TaskTracker-{}-{}] receive invalid task status report(taskId={},currentStatus={},newStatus={}), TaskTracker will drop this report.",
                        instanceId, subInstanceId, taskId, taskBriefInfo.getStatus().getValue(), newStatus);
                return;
            }

            // The request is valid at this point, so update the related info first
            taskBriefInfo.setLastReportTime(reportTime);
            taskBriefInfo.setStatus(nTaskStatus);

            // Handle the failure case
            int configTaskRetryNum = instanceInfo.getTaskRetryNum();
            if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum >= 1) {

                // Failure is not the common case, so one more database query is fine
                // (and with the cache in front, most calls never hit the DB before this point)
                Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
                // If the DB query fails as well, give up on retrying...
                if (taskOpt.isPresent()) {
                    int failedCnt = taskOpt.get().getFailedCnt();
                    if (failedCnt < configTaskRetryNum) {

                        TaskDO updateEntity = new TaskDO();
                        updateEntity.setFailedCnt(failedCnt + 1);

                        /*
                        Address rules:
                        1. The stored address is the destination the task was dispatched to (the ProcessorTracker address)
                        2. The root task and the final task must run on the machine hosting the TaskTracker (their address should not be changed)
                        3. Broadcast tasks must run on every machine, so the worker should not be reassigned (their address should not be changed)
                         */
                        String taskName = taskOpt.get().getTaskName();
                        ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
                        if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) {
                            updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
                        }

                        updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                        updateEntity.setLastReportTime(reportTime);

                        boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
                        if (retryTask) {
                            log.info("[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, subInstanceId, taskId);
                            return;
                        }
                    }
                }
            }

            // Update the status (if writing the retry to the DB failed, there is no retry either... bad luck)
            result = result == null ? "" : result;
            boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result);

            if (!updateResult) {
                log.warn("[TaskTracker-{}-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, subInstanceId, taskId);
            }

        } catch (InterruptedException ignore) {
            // ignore
        } catch (Exception e) {
            log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e);
        } finally {
            segmentLock.unlock(lockId);
        }
    }

In updateTaskStatus, when HeavyTaskTracker schedules a retry for a task (retryTask is true), it logs:
[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.
The precondition is failedCnt < configTaskRetryNum, where configTaskRetryNum is instanceInfo.getTaskRetryNum().
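The retry condition can be isolated as a small predicate. The sketch below is a simplified model that assumes every database read and update succeeds and that failedCnt is only changed by this path; `HeavyRetrySketch` and its method names are hypothetical.

```java
class HeavyRetrySketch {
    // Mirrors the guard in updateTaskStatus: a failed report is retried only
    // while failedCnt is still below the configured retry count.
    static boolean shouldRetry(boolean failed, int failedCnt, int configTaskRetryNum) {
        return failed && configTaskRetryNum >= 1 && failedCnt < configTaskRetryNum;
    }

    // Counts how often a task that always fails would run before the
    // WORKER_PROCESS_FAILED status is finally written.
    static int attemptsUntilFinalFailure(int configTaskRetryNum) {
        int failedCnt = 0;
        int attempts = 0;
        while (true) {
            attempts++; // the task runs and reports a failure
            if (!shouldRetry(true, failedCnt, configTaskRetryNum)) {
                return attempts;
            }
            failedCnt++; // the task goes back to WAITING_DISPATCH
        }
    }

    public static void main(String[] args) {
        System.out.println(attemptsUntilFinalFailure(0)); // prints 1
        System.out.println(attemptsUntilFinalFailure(1)); // prints 2
    }
}
```

In this model a task only reaches the failed bucket after its retries are used up, which is why failedTaskNum counts tasks that failed for good rather than individual failed attempts.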
taskRetryNum
powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java
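    @Data
    public class InstanceInfo implements Serializable {

        /**
         * Basic info
         */
        private Long jobId;
        private Long instanceId;
        private Long wfInstanceId;

        /**
         * Processor info for task execution
         */
        // Execution type: standalone, broadcast, MR
        private String executeType;
        // Processor type (JavaBean, Jar, script, etc.)
        private String processorType;
        // Processor info
        private String processorInfo;
        // Time expression type
        private int timeExpressionType;

        /**
         * Timeout
         */
        // Overall timeout for the whole task
        private long instanceTimeoutMS;

        /**
         * Task run parameters
         */
        // Job-level parameters, comparable to a class's static variables
        private String jobParams;
        // Instance-level parameters, comparable to a class's instance variables
        private String instanceParams;
        // Upper bound on the number of processing threads per machine
        private int threadConcurrency;
        // Subtask retry count (retrying the job itself is controlled by the server)
        private int taskRetryNum;

        private String logConfig;
    }

InstanceInfo defines taskRetryNum, which specifies how many times a subtask is retried; the default is 1.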
StatusCheckRunnable
powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java
    private class StatusCheckRunnable implements Runnable {

        private static final long DISPATCH_TIME_OUT_MS = 15000;

        @SuppressWarnings("squid:S3776")
        private void innerRun() {

            InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);

            long finishedNum = holder.succeedNum + holder.failedNum;
            long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum;

            log.debug("[TaskTracker-{}] status check result: {}", instanceId, holder);

            TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
            req.setAppId(workerRuntime.getAppId());
            req.setJobId(instanceInfo.getJobId());
            req.setInstanceId(instanceId);
            req.setWfInstanceId(instanceInfo.getWfInstanceId());
            req.setTotalTaskNum(finishedNum + unfinishedNum);
            req.setSucceedTaskNum(holder.succeedNum);
            req.setFailedTaskNum(holder.failedNum);
            req.setReportTime(System.currentTimeMillis());
            req.setStartTime(createTime);
            req.setSourceAddress(workerRuntime.getWorkerAddress());

            boolean success = false;
            String result = null;

            // 2. If no task is unfinished, decide whether the instance has really ended and fetch the result of the task that ends it
            if (unfinishedNum == 0) {

                // No task at all in the database means the root task could not be created, so the instance fails
                if (finishedNum == 0) {
                    finished.set(true);
                    result = SystemInstanceResult.TASK_INIT_FAILED;
                } else {
                    ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());

                    switch (executeType) {

                        // STANDALONE has a single task; once it finishes, the instance is done
                        case STANDALONE:
                            finished.set(true);
                            List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId, instanceId);
                            if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
                                result = SystemInstanceResult.UNKNOWN_BUG;
                                log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId);
                            } else {
                                result = allTask.get(0).getResult();
                                success = allTask.get(0).getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
                            }
                            break;
                        // MAP does not care about the result, the simplest case
                        case MAP:
                            finished.set(true);
                            success = holder.failedNum == 0;
                            result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
                            break;
                        // For MapReduce and Broadcast, completion is decided by how the **LastTask** ran
                        default:

                            Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId, instanceId);
                            if (lastTaskOptional.isPresent()) {

                                // If it exists, judge the status by the reduce task
                                TaskDO resultTask = lastTaskOptional.get();
                                TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus());

                                if (lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) {
                                    finished.set(true);
                                    success = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS;
                                    result = resultTask.getResult();
                                }

                            } else {

                                // Absent means the preceding tasks have just finished, so lastTask must be created; the final task must run on this machine!
                                TaskDO newLastTask = new TaskDO();
                                newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
                                newLastTask.setTaskId(LAST_TASK_ID);
                                newLastTask.setSubInstanceId(instanceId);
                                newLastTask.setAddress(workerRuntime.getWorkerAddress());
                                submitTask(Lists.newArrayList(newLastTask));
                            }
                    }
                }
            }

            // 3. Check whether the instance as a whole has timed out
            if (isTimeout()) {
                finished.set(true);
                success = false;
                result = SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT;
            }

            // 4. Finished, report to the server
            if (finished.get()) {
                req.setResult(result);
                // Report the appended workflow context
                req.setAppendedWfContext(appendedWfContext);
                req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
                reportFinalStatusThenDestroy(workerRuntime, req);
                return;
            }

            // 5. Not finished, report the status
            req.setInstanceStatus(InstanceStatus.RUNNING.getV());
            TransportUtils.ttReportInstanceStatus(req, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter());

            // 6.1 Periodic check -> retry tasks that were dispatched but never acknowledged
            long currentMS = System.currentTimeMillis();
            if (holder.workerUnreceivedNum != 0) {
                taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> {

                    long elapsedTime = currentMS - uncheckTask.getLastModifiedTime();
                    if (elapsedTime > DISPATCH_TIME_OUT_MS) {

                        TaskDO updateEntity = new TaskDO();
                        updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                        // Special tasks can only run on this machine
                        if (!TaskConstant.LAST_TASK_NAME.equals(uncheckTask.getTaskName())) {
                            updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
                        }
                        // Failure count + 1
                        updateEntity.setFailedCnt(uncheckTask.getFailedCnt() + 1);
                        taskPersistenceService.updateTask(instanceId, uncheckTask.getTaskId(), updateEntity);

                        log.warn("[TaskTracker-{}] task(id={},name={}) try to dispatch again due to unreceived the response from ProcessorTracker.",
                                instanceId, uncheckTask.getTaskId(), uncheckTask.getTaskName());
                    }

                });
            }

            // 6.2 Periodic check -> rerun tasks that were dispatched to ProcessorTrackers that have gone down
            List<String> disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers();
            if (!disconnectedPTs.isEmpty()) {
                log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
                if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, true)) {
                    ptStatusHolder.remove(disconnectedPTs);
                    log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs);
                }
            }
        }

        /**
         * Whether the task has timed out
         */
        public boolean isTimeout() {
            if (instanceInfo.getInstanceTimeoutMS() > 0) {
                return System.currentTimeMillis() - createTime > instanceInfo.getInstanceTimeoutMS();
            }
            return false;
        }

        @Override
        public void run() {
            try {
                innerRun();
            } catch (Exception e) {
                log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceId, e);
            }
        }
    }

By default StatusCheckRunnable reports a TaskTrackerReportInstanceStatusReq every 13 seconds. For MAP tasks it uses
holder.failedNum == 0
to decide whether the task instance succeeded: if true, the instance status is set to InstanceStatus.SUCCEED, otherwise to InstanceStatus.FAILED.
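The MAP branch boils down to one comparison and one format string. A minimal sketch, with `MapResultSketch` as a hypothetical wrapper around the two expressions taken from the snippet above:

```java
class MapResultSketch {
    // A MAP instance succeeds only if no subtask ended in the failed state.
    static boolean isSuccess(long failedNum) {
        return failedNum == 0;
    }

    // Same format string as in the MAP branch of StatusCheckRunnable.
    static String resultText(long total, long succeed, long failed) {
        return String.format("total:%d,succeed:%d,failed:%d", total, succeed, failed);
    }

    public static void main(String[] args) {
        System.out.println(isSuccess(1));          // prints false
        System.out.println(resultText(10, 9, 1));  // prints total:10,succeed:9,failed:1
    }
}
```

A single permanently failed subtask is therefore enough to mark the whole MAP instance as failed, regardless of how many subtasks succeeded.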
Summary
The failed count shown in the result of a powerjob map reduce task instance is failedTaskNum. It comes from TaskPersistenceService's getTaskStatusStatistics, which runs select status, count(*) as num from task_info where instance_id= ? and sub_instance_id=? GROUP BY status, and it is the count returned for TaskStatus.WORKER_PROCESS_FAILED. By default a subtask gets one retry. If any subtask still fails, the final status of the task instance is failed. powerjob currently offers no entry point for retrying only the failed subtasks; the only option is to rerun the whole map reduce task.