Skip to content

Commit 6f6bd11

Browse files
committed
for exp
1 parent 64133da commit 6f6bd11

File tree

3 files changed

+26
-19
lines changed

3 files changed

+26
-19
lines changed

lts-jobtracker/src/main/java/com/lts/jobtracker/processor/JobFinishedProcessor.java

Lines changed: 18 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -168,13 +168,15 @@ private RemotingCommand process(boolean receiveNewJob,
168168

169169
// 判断是否接受新任务
170170
if (receiveNewJob) {
171-
// 查看有没有其他可以执行的任务
172-
JobPushRequest jobPushRequest = getNewJob(taskTrackerNodeGroup, taskTrackerIdentity);
173-
// 返回 新的任务
174-
return RemotingCommand.createResponseCommand(RemotingProtos
175-
.ResponseCode.SUCCESS.code(), jobPushRequest);
171+
try {
172+
// 查看有没有其他可以执行的任务
173+
JobPushRequest jobPushRequest = getNewJob(taskTrackerNodeGroup, taskTrackerIdentity);
174+
// 返回 新的任务
175+
return RemotingCommand.createResponseCommand(RemotingProtos
176+
.ResponseCode.SUCCESS.code(), jobPushRequest);
177+
} catch (Exception ignored) {
178+
}
176179
}
177-
178180
// 返回给 任务执行端
179181
return RemotingCommand.createResponseCommand(RemotingProtos
180182
.ResponseCode.SUCCESS.code());
@@ -276,7 +278,9 @@ private JobPushRequest getNewJob(String taskTrackerNodeGroup, String taskTracker
276278
try {
277279
application.getExecutingJobQueue().add(jobPo);
278280
} catch (DuplicateJobException e) {
279-
throw e;
281+
LOGGER.warn(e.getMessage(), e);
282+
application.getExecutableJobQueue().resume(jobPo);
283+
return null;
280284
}
281285
application.getExecutableJobQueue().remove(jobPo.getTaskTrackerNodeGroup(), jobPo.getJobId());
282286

@@ -304,9 +308,6 @@ private void finishProcess(List<TaskTrackerJobResult> results) {
304308
for (TaskTrackerJobResult result : results) {
305309

306310
JobWrapper jobWrapper = result.getJobWrapper();
307-
// 从正在执行的队列中移除 TODO 如果在这个时候down机了,数据丢失了
308-
application.getExecutingJobQueue().remove(jobWrapper.getJobId());
309-
310311
if (jobWrapper.getJob().isSchedule()) {
311312

312313
JobPo cronJobPo = application.getCronJobQueue().finish(jobWrapper.getJobId());
@@ -330,6 +331,9 @@ private void finishProcess(List<TaskTrackerJobResult> results) {
330331
LOGGER.error(e.getMessage(), e);
331332
}
332333
}
334+
// 从正在执行的队列中移除
335+
application.getExecutingJobQueue().remove(jobWrapper.getJobId());
336+
333337
}
334338
}
335339

@@ -347,9 +351,6 @@ private void retryProcess(List<TaskTrackerJobResult> results) {
347351
JobPo jobPo = application.getExecutingJobQueue().get(jobWrapper.getJobId());
348352
if (jobPo != null) {
349353

350-
// 从正在执行的队列中移除 TODO 如果在这个时候down机了,数据丢失了
351-
application.getExecutingJobQueue().remove(jobPo.getJobId());
352-
353354
// 重试次数+1
354355
jobPo.setRetryTimes((jobPo.getRetryTimes() == null ? 0 : jobPo.getRetryTimes()) + 1);
355356
Long nextRetryTriggerTime = DateUtils.addMinute(new Date(), jobPo.getRetryTimes()).getTime();
@@ -389,6 +390,10 @@ private void retryProcess(List<TaskTrackerJobResult> results) {
389390
LOGGER.error(e.getMessage(), e);
390391
}
391392
}
393+
394+
// 从正在执行的队列中移除
395+
application.getExecutingJobQueue().remove(jobPo.getJobId());
396+
392397
}
393398
}
394399
}

lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobPusher.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ public void operationComplete(ResponseFuture responseFuture) {
171171
try {
172172
application.getExecutingJobQueue().add(jobPo);
173173
} catch (DuplicateJobException e) {
174-
throw e;
174+
LOGGER.warn(e.getMessage(), e);
175+
application.getExecutableJobQueue().resume(jobPo);
176+
return PushResult.FAILED;
175177
}
176178
application.getExecutableJobQueue().remove(jobPo.getTaskTrackerNodeGroup(), jobPo.getJobId());
177179

lts-jobtracker/src/main/java/com/lts/jobtracker/support/checker/ExecutingDeadJobChecker.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -176,20 +176,20 @@ private void fixDeadJob(List<JobPo> jobPos) {
176176
private void fixDeadJob(JobPo jobPo) {
177177
try {
178178

179-
// 1. remove from executing queue TODO 如果在这个时候down机了,数据丢失了
180-
application.getExecutingJobQueue().remove(jobPo.getJobId());
181-
182179
jobPo.setGmtModified(SystemClock.now());
183180
jobPo.setTaskTrackerIdentity(null);
184181
jobPo.setIsRunning(false);
185-
// 2. add to executable queue
182+
// 1. add to executable queue
186183
try {
187184
application.getExecutableJobQueue().add(jobPo);
188185
} catch (DuplicateJobException e) {
189186
// ignore
190-
LOGGER.error(e.getMessage(), e);
187+
LOGGER.warn(e.getMessage(), e);
191188
}
192189

190+
// 2. remove from executing queue
191+
application.getExecutingJobQueue().remove(jobPo.getJobId());
192+
193193
JobLogPo jobLogPo = JobDomainConverter.convertJobLog(jobPo);
194194
jobLogPo.setSuccess(true);
195195
jobLogPo.setLevel(Level.WARN);

0 commit comments

Comments
 (0)