Skip to content

Commit 80f6aa2

Browse files
committed
Merge pull request #110 from qq254963746/develop
FIX
2 parents 4716346 + 2b207d5 commit 80f6aa2

File tree

1 file changed

+58
-66
lines changed

1 file changed

+58
-66
lines changed

lts-jobtracker/src/main/java/com/lts/jobtracker/processor/JobFinishedProcessor.java

Lines changed: 58 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ public void handleFailed(List<TaskTrackerJobResult> results) {
6666
new ArrayList<JobFeedbackPo>(results.size());
6767

6868
for (TaskTrackerJobResult result : results) {
69-
JobFeedbackPo jobFeedbackPo =
70-
JobDomainConverter.convert(result);
69+
JobFeedbackPo jobFeedbackPo = JobDomainConverter.convert(result);
7170
jobFeedbackPos.add(jobFeedbackPo);
7271
}
7372
// 2. 失败的存储在反馈队列
@@ -308,92 +307,85 @@ private void finishProcess(List<TaskTrackerJobResult> results) {
308307
for (TaskTrackerJobResult result : results) {
309308

310309
JobWrapper jobWrapper = result.getJobWrapper();
311-
if (jobWrapper.getJob().isSchedule()) {
312310

313-
JobPo cronJobPo = application.getCronJobQueue().finish(jobWrapper.getJobId());
314-
if (cronJobPo == null) {
315-
// 可能任务队列中改条记录被删除了
316-
return;
317-
}
318-
Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression());
319-
if (nextTriggerTime == null) {
320-
application.getCronJobQueue().remove(jobWrapper.getJobId());
321-
return;
322-
}
323-
// 表示下次还要执行
324-
try {
325-
cronJobPo.setTaskTrackerIdentity(null);
326-
cronJobPo.setIsRunning(false);
327-
cronJobPo.setTriggerTime(nextTriggerTime.getTime());
328-
cronJobPo.setGmtModified(SystemClock.now());
329-
application.getExecutableJobQueue().add(cronJobPo);
330-
} catch (DuplicateJobException e) {
331-
LOGGER.warn(e.getMessage(), e);
332-
}
311+
if (jobWrapper.getJob().isSchedule()) {
312+
finishScheduleJob(jobWrapper.getJobId());
333313
}
334314
// 从正在执行的队列中移除
335315
application.getExecutingJobQueue().remove(jobWrapper.getJobId());
336316
}
337317
}
338318

319+
private void finishScheduleJob(String jobId) {
320+
JobPo cronJobPo = application.getCronJobQueue().finish(jobId);
321+
if (cronJobPo == null) {
322+
// 可能任务队列中改条记录被删除了
323+
return;
324+
}
325+
Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression());
326+
if (nextTriggerTime == null) {
327+
// 从CronJob队列中移除
328+
application.getCronJobQueue().remove(jobId);
329+
return;
330+
}
331+
// 表示下次还要执行
332+
try {
333+
cronJobPo.setTaskTrackerIdentity(null);
334+
cronJobPo.setIsRunning(false);
335+
cronJobPo.setTriggerTime(nextTriggerTime.getTime());
336+
cronJobPo.setGmtModified(SystemClock.now());
337+
application.getExecutableJobQueue().add(cronJobPo);
338+
} catch (DuplicateJobException e) {
339+
LOGGER.warn(e.getMessage(), e);
340+
}
341+
}
342+
339343
/**
340344
* 将任务加入重试队列
341345
*/
342-
343346
private void retryProcess(List<TaskTrackerJobResult> results) {
344347
if (CollectionUtils.isEmpty(results)) {
345348
return;
346349
}
347350
for (TaskTrackerJobResult result : results) {
351+
348352
JobWrapper jobWrapper = result.getJobWrapper();
349353
// 1. 加入到重试队列
350354
JobPo jobPo = application.getExecutingJobQueue().get(jobWrapper.getJobId());
351-
if (jobPo != null) {
352-
353-
// 重试次数+1
354-
jobPo.setRetryTimes((jobPo.getRetryTimes() == null ? 0 : jobPo.getRetryTimes()) + 1);
355-
Long nextRetryTriggerTime = DateUtils.addMinute(new Date(), jobPo.getRetryTimes()).getTime();
356-
357-
boolean needAdd = true;
358-
359-
if (jobPo.isSchedule()) {
360-
// 如果是 cron Job, 判断任务下一次执行时间和重试时间的比较
361-
JobPo cronJobPo = application.getCronJobQueue().finish(jobWrapper.getJobId());
362-
if (cronJobPo != null) {
363-
Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression());
364-
if (nextTriggerTime != null && nextTriggerTime.getTime() < nextRetryTriggerTime) {
365-
// 表示下次还要执行, 并且下次执行时间比下次重试时间要早, 那么不重试,直接使用下次的执行时间
366-
try {
367-
cronJobPo.setTaskTrackerIdentity(null);
368-
cronJobPo.setIsRunning(false);
369-
cronJobPo.setTriggerTime(nextTriggerTime.getTime());
370-
cronJobPo.setGmtModified(SystemClock.now());
371-
application.getExecutableJobQueue().add(cronJobPo);
372-
} catch (DuplicateJobException e) {
373-
LOGGER.error(e.getMessage(), e);
374-
}
375-
needAdd = false;
376-
}
377-
}
378-
}
379-
if (needAdd) {
380-
// 加入到队列, 重试
381-
jobPo.setIsRunning(false);
382-
jobPo.setTaskTrackerIdentity(null);
383-
jobPo.setGmtModified(SystemClock.now());
384-
// 延迟重试时间就等于重试次数(分钟)
385-
jobPo.setTriggerTime(nextRetryTriggerTime);
386-
try {
387-
application.getExecutableJobQueue().add(jobPo);
388-
} catch (DuplicateJobException e) {
389-
LOGGER.error(e.getMessage(), e);
355+
if (jobPo == null) { // 表示已经被删除了
356+
continue;
357+
}
358+
359+
// 重试次数+1
360+
jobPo.setRetryTimes((jobPo.getRetryTimes() == null ? 0 : jobPo.getRetryTimes()) + 1);
361+
Long nextRetryTriggerTime = DateUtils.addMinute(new Date(), jobPo.getRetryTimes()).getTime();
362+
363+
if (jobPo.isSchedule()) {
364+
// 如果是 cron Job, 判断任务下一次执行时间和重试时间的比较
365+
JobPo cronJobPo = application.getCronJobQueue().finish(jobWrapper.getJobId());
366+
if (cronJobPo != null) {
367+
Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression());
368+
if (nextTriggerTime != null && nextTriggerTime.getTime() < nextRetryTriggerTime) {
369+
// 表示下次还要执行, 并且下次执行时间比下次重试时间要早, 那么不重试,直接使用下次的执行时间
370+
nextRetryTriggerTime = nextTriggerTime.getTime();
371+
jobPo = cronJobPo;
390372
}
391373
}
374+
}
392375

393-
// 从正在执行的队列中移除
394-
application.getExecutingJobQueue().remove(jobPo.getJobId());
395-
376+
// 加入到队列, 重试
377+
jobPo.setTaskTrackerIdentity(null);
378+
jobPo.setIsRunning(false);
379+
jobPo.setGmtModified(SystemClock.now());
380+
// 延迟重试时间就等于重试次数(分钟)
381+
jobPo.setTriggerTime(nextRetryTriggerTime);
382+
try {
383+
application.getExecutableJobQueue().add(jobPo);
384+
} catch (DuplicateJobException e) {
385+
LOGGER.error(e.getMessage(), e);
396386
}
387+
// 从正在执行的队列中移除
388+
application.getExecutingJobQueue().remove(jobPo.getJobId());
397389
}
398390
}
399391
}

0 commit comments

Comments
 (0)