From c9fac180e6b12885b3572add19a4742832feae0d Mon Sep 17 00:00:00 2001 From: "jieying.li" Date: Thu, 19 Dec 2024 03:44:51 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=89=A7=E8=A1=8C=E8=AE=A1=E5=88=92=201?= =?UTF-8?q?.=E6=B7=BB=E5=8A=A0=E6=89=B9=E6=AC=A1=E4=B8=BA=E7=A9=BA?= =?UTF-8?q?=E7=9A=84=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=202.=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E6=89=B9=E6=AC=A1=E4=B8=8B=E6=80=BB=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E6=95=B0=E9=87=8F=E4=B8=8E=E7=94=9F=E6=88=90=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E6=95=B0=E9=87=8F=E6=AF=94=E8=BE=83=E7=9A=84=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/PlanBatchTaskDataUpdateJob.java | 80 ++++++++++++------- 1 file changed, 53 insertions(+), 27 deletions(-) diff --git a/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/job/PlanBatchTaskDataUpdateJob.java b/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/job/PlanBatchTaskDataUpdateJob.java index dfa1caf..e3ef07f 100644 --- a/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/job/PlanBatchTaskDataUpdateJob.java +++ b/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/job/PlanBatchTaskDataUpdateJob.java @@ -121,6 +121,8 @@ public class PlanBatchTaskDataUpdateJob { private final static String LOCK_KEY_BATCH_SUM = "LOCK:PLAN:BATCH-SUM-DATA-UPDATE-METHOD"; + private final static String BATCH_IS_NULL_COUNT_HASH = "PLAN:BATCH:NULL-COUNT-HASH"; + @Autowired private AtuPlanConfig atuPlanConfig; @@ -165,6 +167,37 @@ public class PlanBatchTaskDataUpdateJob { RedisConstant.CLUSTER_KEY_SUFFIX; // 通过数据库进行统计,减少统计数据出错的原因 + AtuPlanBatch batch = this.planBatchService.findByPrimaryKey(batchId); + if (batch == null) { + logger.info("批次【{}】信息目前不存在,再等待下一次查询...", batchId); + Boolean existFlag = redisTemplate.opsForHash().hasKey(BATCH_IS_NULL_COUNT_HASH, batchId); + if (existFlag) { + Object nullCount = redisTemplate.opsForHash().get(BATCH_IS_NULL_COUNT_HASH, batchId); + if (nullCount != null && NumberUtil.isNumber(nullCount.toString())) { + int i = Integer.parseInt(nullCount.toString()); + if (i < 100) { // 统计100次后依然没有批次信息,批次不存在 + redisTemplate.opsForHash().put(BATCH_IS_NULL_COUNT_HASH, batchId, i + 1); + } else { + // 没有对应批次数据,删除缓存数据 + logger.debug("没有对应的批次信息[{}],删除缓存数据", batchId); + redisTemplate.delete(key); + } + } + } else { + redisTemplate.opsForHash().put(BATCH_IS_NULL_COUNT_HASH, batchId, 1); + } + return; // 退出这个批次统计 + } + // 批次信息存在 + AtuPlanTask totalQ = new AtuPlanTask(); + totalQ.setBatchId(batchId); + long currentTaskTotal = planTaskService.count(totalQ); + // 判断批次任务是否已经全部生成 + if (batch.getTaskTotal() > currentTaskTotal) { + logger.info("批次[{}]正在生成任务中,结束统计...", batchId); + return; + } + // 以脚本维度统计批次下各脚本的执行情况 AtuPlanTask taskQ = new AtuPlanTask(); taskQ.setBatchId(batchId); List resultList = planTaskService.countTaskStatus(taskQ); @@ -192,36 +225,29 @@ public class PlanBatchTaskDataUpdateJob { if (waitCount > 0 || ingCount > 0) { // 批次还没有完成 // 检查批次的开始时间 - AtuPlanBatch batch = this.planBatchService.findByPrimaryKey(batchId); - if (batch != null) { - Date createdTime = batch.getCreatedTime(); - if (createdTime != null) { // 有创建时间,那么比较开始时间与现在的时间差 - LocalDateTime createDateTime = createdTime.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(); - LocalDateTime now = LocalDateTime.now(); - Duration duration = Duration.between(createDateTime, now); - long hours = duration.toHours(); - if (atuPlanConfig.isBatchExecuteTimeoutEnabled() && hours > atuPlanConfig.getBatchExecuteTimeout().longValue()) { // 参数化 - logger.info("批次[{}-{}]已创建{}小时,任务没有执行完成,强制结束批次", batch.getId(), batch.getBatch(), 48); - // 如果批次的创建时间离现在已经过去48小时还没完成,那么判定批次已完成,等待中|执行中 的任务全部算超时处理 - timeoutCount += waitCount; - timeoutCount += ingCount; - waitCount = 0; - ingCount = 0; - // 更新批次下 等待中(0)|执行中(1) 的任务全部算超时处理(6) - this.planTaskService.batchUpdateTaskStatusToTimeout(batch.getId(), PlanConstant.TASK_TIMEOUT_STATUS, PlanConstant.TASK_WAIT_EXECUTE_STATUS, PlanConstant.TASK_START_EXECUTE_STATUS); - } else { - // 批次还没有完成,退出统计 - return; - } + Date createdTime = batch.getCreatedTime(); + if (createdTime != null) { // 有创建时间,那么比较开始时间与现在的时间差 + LocalDateTime createDateTime = createdTime.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(); + LocalDateTime now = LocalDateTime.now(); + Duration duration = Duration.between(createDateTime, now); + long hours = duration.toHours(); + if (atuPlanConfig.isBatchExecuteTimeoutEnabled() && hours > atuPlanConfig.getBatchExecuteTimeout().longValue()) { // 参数化 + logger.info("批次[{}-{}]已创建{}小时,任务没有执行完成,强制结束批次", batch.getId(), batch.getBatch(), 48); + // 如果批次的创建时间离现在已经过去48小时还没完成,那么判定批次已完成,等待中|执行中 的任务全部算超时处理 + timeoutCount += waitCount; + timeoutCount += ingCount; + waitCount = 0; + ingCount = 0; + // 更新批次下 等待中(0)|执行中(1) 的任务全部算超时处理(6) + this.planTaskService.batchUpdateTaskStatusToTimeout(batch.getId(), PlanConstant.TASK_TIMEOUT_STATUS, PlanConstant.TASK_WAIT_EXECUTE_STATUS, PlanConstant.TASK_START_EXECUTE_STATUS); } else { - // 有批次信息,但是没有创建时间,应该是数据有问题 - logger.debug("批次[{}]没有创建时间", batch.getId()); + // 批次还没有完成,退出统计 return; } - } else { // 没有对应批次数据,删除缓存数据 - logger.debug("没有对应的批次信息[{}],删除缓存数据", batchId); - redisTemplate.delete(key); - return; // 退出这个批次统计 + } else { + // 有批次信息,但是没有创建时间,应该是数据有问题 + logger.debug("批次[{}]没有创建时间", batch.getId()); + return; } } // 批次已完成