Skip to content

Commit 64133da

Browse files
committed
for exp
1 parent 6ae0aef commit 64133da

File tree

3 files changed

+36
-35
lines changed

3 files changed

+36
-35
lines changed

lts-jobtracker/src/main/java/com/lts/jobtracker/processor/JobFinishedProcessor.java

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,7 @@ private JobPushRequest getNewJob(String taskTrackerNodeGroup, String taskTracker
276276
try {
277277
application.getExecutingJobQueue().add(jobPo);
278278
} catch (DuplicateJobException e) {
279-
// ignore
280-
LOGGER.error(e.getMessage(), e);
279+
throw e;
281280
}
282281
application.getExecutableJobQueue().remove(jobPo.getTaskTrackerNodeGroup(), jobPo.getJobId());
283282

@@ -305,34 +304,31 @@ private void finishProcess(List<TaskTrackerJobResult> results) {
305304
for (TaskTrackerJobResult result : results) {
306305

307306
JobWrapper jobWrapper = result.getJobWrapper();
307+
// 从正在执行的队列中移除 TODO 如果在这个时候down机了,数据丢失了
308+
application.getExecutingJobQueue().remove(jobWrapper.getJobId());
308309

309-
try {
310-
if (jobWrapper.getJob().isSchedule()) {
310+
if (jobWrapper.getJob().isSchedule()) {
311311

312-
JobPo cronJobPo = application.getCronJobQueue().finish(jobWrapper.getJobId());
313-
if (cronJobPo == null) {
314-
// 可能任务队列中改条记录被删除了
315-
return;
316-
}
317-
Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression());
318-
if (nextTriggerTime == null) {
319-
application.getCronJobQueue().remove(jobWrapper.getJobId());
320-
return;
321-
}
322-
// 表示下次还要执行
323-
try {
324-
cronJobPo.setTaskTrackerIdentity(null);
325-
cronJobPo.setIsRunning(false);
326-
cronJobPo.setTriggerTime(nextTriggerTime.getTime());
327-
cronJobPo.setGmtModified(SystemClock.now());
328-
application.getExecutableJobQueue().add(cronJobPo);
329-
} catch (DuplicateJobException e) {
330-
LOGGER.error(e.getMessage(), e);
331-
}
312+
JobPo cronJobPo = application.getCronJobQueue().finish(jobWrapper.getJobId());
313+
if (cronJobPo == null) {
314+
// 可能任务队列中改条记录被删除了
315+
return;
316+
}
317+
Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression());
318+
if (nextTriggerTime == null) {
319+
application.getCronJobQueue().remove(jobWrapper.getJobId());
320+
return;
321+
}
322+
// 表示下次还要执行
323+
try {
324+
cronJobPo.setTaskTrackerIdentity(null);
325+
cronJobPo.setIsRunning(false);
326+
cronJobPo.setTriggerTime(nextTriggerTime.getTime());
327+
cronJobPo.setGmtModified(SystemClock.now());
328+
application.getExecutableJobQueue().add(cronJobPo);
329+
} catch (DuplicateJobException e) {
330+
LOGGER.error(e.getMessage(), e);
332331
}
333-
} finally {
334-
// 移除
335-
application.getExecutingJobQueue().remove(jobWrapper.getJobId());
336332
}
337333
}
338334
}
@@ -350,6 +346,10 @@ private void retryProcess(List<TaskTrackerJobResult> results) {
350346
// 1. 加入到重试队列
351347
JobPo jobPo = application.getExecutingJobQueue().get(jobWrapper.getJobId());
352348
if (jobPo != null) {
349+
350+
// 从正在执行的队列中移除 TODO 如果在这个时候down机了,数据丢失了
351+
application.getExecutingJobQueue().remove(jobPo.getJobId());
352+
353353
// 重试次数+1
354354
jobPo.setRetryTimes((jobPo.getRetryTimes() == null ? 0 : jobPo.getRetryTimes()) + 1);
355355
Long nextRetryTriggerTime = DateUtils.addMinute(new Date(), jobPo.getRetryTimes()).getTime();
@@ -389,8 +389,6 @@ private void retryProcess(List<TaskTrackerJobResult> results) {
389389
LOGGER.error(e.getMessage(), e);
390390
}
391391
}
392-
// 从正在执行的队列中移除
393-
application.getExecutingJobQueue().remove(jobPo.getJobId());
394392
}
395393
}
396394
}

lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobPusher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,15 +167,15 @@ public void operationComplete(ResponseFuture responseFuture) {
167167
application.getExecutableJobQueue().resume(jobPo);
168168
return PushResult.FAILED;
169169
}
170+
170171
try {
171172
application.getExecutingJobQueue().add(jobPo);
172173
} catch (DuplicateJobException e) {
173-
// ignore
174-
LOGGER.error(e.getMessage(), e);
174+
throw e;
175175
}
176176
application.getExecutableJobQueue().remove(jobPo.getTaskTrackerNodeGroup(), jobPo.getJobId());
177-
// 记录日志
178177

178+
// 记录日志
179179
JobLogPo jobLogPo = JobDomainConverter.convertJobLog(jobPo);
180180
jobLogPo.setSuccess(true);
181181
jobLogPo.setLogType(LogType.SENT);

lts-jobtracker/src/main/java/com/lts/jobtracker/support/checker/ExecutingDeadJobChecker.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,18 +175,21 @@ private void fixDeadJob(List<JobPo> jobPos) {
175175

176176
private void fixDeadJob(JobPo jobPo) {
177177
try {
178+
179+
// 1. remove from executing queue TODO 如果在这个时候down机了,数据丢失了
180+
application.getExecutingJobQueue().remove(jobPo.getJobId());
181+
178182
jobPo.setGmtModified(SystemClock.now());
179183
jobPo.setTaskTrackerIdentity(null);
180184
jobPo.setIsRunning(false);
181-
// 1. add to executable queue
185+
// 2. add to executable queue
182186
try {
183187
application.getExecutableJobQueue().add(jobPo);
184188
} catch (DuplicateJobException e) {
185189
// ignore
186190
LOGGER.error(e.getMessage(), e);
187191
}
188-
// 2. remove from executing queue
189-
application.getExecutingJobQueue().remove(jobPo.getJobId());
192+
190193
JobLogPo jobLogPo = JobDomainConverter.convertJobLog(jobPo);
191194
jobLogPo.setSuccess(true);
192195
jobLogPo.setLevel(Level.WARN);

0 commit comments

Comments
 (0)