@@ -277,6 +277,7 @@ private JobPushRequest getNewJob(String taskTrackerNodeGroup, String taskTracker
277277 application .getExecutingJobQueue ().add (jobPo );
278278 } catch (DuplicateJobException e ) {
279279 // ignore
280+ LOGGER .error (e .getMessage (), e );
280281 }
281282 application .getExecutableJobQueue ().remove (jobPo .getTaskTrackerNodeGroup (), jobPo .getJobId ());
282283
@@ -304,37 +305,42 @@ private void finishProcess(List<TaskTrackerJobResult> results) {
304305 for (TaskTrackerJobResult result : results ) {
305306
306307 JobWrapper jobWrapper = result .getJobWrapper ();
307- // 移除
308- application .getExecutingJobQueue ().remove (jobWrapper .getJobId ());
309308
310- if (jobWrapper .getJob ().isSchedule ()) {
309+ try {
310+ if (jobWrapper .getJob ().isSchedule ()) {
311311
312- JobPo cronJobPo = application .getCronJobQueue ().finish (jobWrapper .getJobId ());
313- if (cronJobPo == null ) {
314- // 可能任务队列中改条记录被删除了
315- return ;
316- }
317- Date nextTriggerTime = CronExpressionUtils .getNextTriggerTime (cronJobPo .getCronExpression ());
318- if (nextTriggerTime == null ) {
319- application .getCronJobQueue ().remove (jobWrapper .getJobId ());
320- } else {
312+ JobPo cronJobPo = application .getCronJobQueue ().finish (jobWrapper .getJobId ());
313+ if (cronJobPo == null ) {
314+ // 可能任务队列中改条记录被删除了
315+ return ;
316+ }
317+ Date nextTriggerTime = CronExpressionUtils .getNextTriggerTime (cronJobPo .getCronExpression ());
318+ if (nextTriggerTime == null ) {
319+ application .getCronJobQueue ().remove (jobWrapper .getJobId ());
320+ return ;
321+ }
321322 // 表示下次还要执行
322323 try {
323324 cronJobPo .setTaskTrackerIdentity (null );
324325 cronJobPo .setIsRunning (false );
325326 cronJobPo .setTriggerTime (nextTriggerTime .getTime ());
326327 cronJobPo .setGmtModified (SystemClock .now ());
327328 application .getExecutableJobQueue ().add (cronJobPo );
328- } catch (DuplicateJobException ignore ) {
329+ } catch (DuplicateJobException e ) {
330+ LOGGER .error (e .getMessage (), e );
329331 }
330332 }
333+ } finally {
334+ // 移除
335+ application .getExecutingJobQueue ().remove (jobWrapper .getJobId ());
331336 }
332337 }
333338 }
334339
335340 /**
336341 * 将任务加入重试队列
337342 */
343+
338344 private void retryProcess (List <TaskTrackerJobResult > results ) {
339345 if (CollectionUtils .isEmpty (results )) {
340346 return ;
@@ -363,7 +369,8 @@ private void retryProcess(List<TaskTrackerJobResult> results) {
363369 cronJobPo .setTriggerTime (nextTriggerTime .getTime ());
364370 cronJobPo .setGmtModified (SystemClock .now ());
365371 application .getExecutableJobQueue ().add (cronJobPo );
366- } catch (DuplicateJobException ignore ) {
372+ } catch (DuplicateJobException e ) {
373+ LOGGER .error (e .getMessage (), e );
367374 }
368375 needAdd = false ;
369376 }
@@ -378,7 +385,8 @@ private void retryProcess(List<TaskTrackerJobResult> results) {
378385 jobPo .setTriggerTime (nextRetryTriggerTime );
379386 try {
380387 application .getExecutableJobQueue ().add (jobPo );
381- } catch (DuplicateJobException ignore ) {
388+ } catch (DuplicateJobException e ) {
389+ LOGGER .error (e .getMessage (), e );
382390 }
383391 }
384392 // 从正在执行的队列中移除
0 commit comments