@@ -78,16 +78,25 @@ public void run() {
7878 }
7979 }
8080 while (availableThreads > 0 ) {
81- if (LOGGER .isDebugEnabled ()){
81+ if (LOGGER .isDebugEnabled ()) {
8282 LOGGER .debug ("taskTrackerNodeGroup:{}, taskTrackerIdentity:{} , availableThreads:{}" , nodeGroup , identity , availableThreads );
8383 }
8484 // 推送任务
8585 PushResult result = sendJob (remotingServer , taskTrackerNode );
86- if (result == PushResult .SUCCESS ) {
87- availableThreads = taskTrackerNode .getAvailableThread ().decrementAndGet ();
88- monitor .incPushJobNum ();
89- } else {
90- break ;
86+ switch (result ) {
87+ case SUCCESS :
88+ availableThreads = taskTrackerNode .getAvailableThread ().decrementAndGet ();
89+ monitor .incPushJobNum ();
90+ break ;
91+ case FAILED :
92+ // 还是要继续发送
93+ break ;
94+ case NO_JOB :
95+ // 没有任务了
96+ return ;
97+ case SENT_ERROR :
98+ // TaskTracker链接失败
99+ return ;
91100 }
92101 }
93102 } catch (Exception e ) {
@@ -100,7 +109,8 @@ public void run() {
100109 private enum PushResult {
101110 NO_JOB , // 没有任务可执行
102111 SUCCESS , //推送成功
103- FAILED //推送失败
112+ FAILED , //推送失败
113+ SENT_ERROR
104114 }
105115
106116 /**
@@ -120,6 +130,17 @@ private PushResult sendJob(RemotingServerDelegate remotingServer, TaskTrackerNod
120130 return PushResult .NO_JOB ;
121131 }
122132
133+ // IMPORTANT: 这里要先切换队列
134+ try {
135+ application .getExecutingJobQueue ().add (jobPo );
136+ } catch (DuplicateJobException e ) {
137+ LOGGER .warn (e .getMessage (), e );
138+ application .getExecutableJobQueue ().resume (jobPo );
139+ return PushResult .FAILED ;
140+ }
141+ application .getExecutableJobQueue ().remove (jobPo .getTaskTrackerNodeGroup (), jobPo .getJobId ());
142+
143+ // 发送给TaskTracker执行
123144 JobPushRequest body = application .getCommandBodyWrapper ().wrapper (new JobPushRequest ());
124145 body .setJobWrapper (JobDomainConverter .convert (jobPo ));
125146 RemotingCommand commandRequest = RemotingCommand .createRequestCommand (JobProtos .RequestCode .PUSH_JOB .code (), body );
@@ -152,6 +173,7 @@ public void operationComplete(ResponseFuture responseFuture) {
152173
153174 } catch (RemotingSendException e ) {
154175 LOGGER .error (e .getMessage (), e );
176+ return PushResult .SENT_ERROR ;
155177 }
156178
157179 try {
@@ -164,18 +186,21 @@ public void operationComplete(ResponseFuture responseFuture) {
164186 if (LOGGER .isDebugEnabled ()) {
165187 LOGGER .debug ("Job push failed! nodeGroup=" + nodeGroup + ", identity=" + identity + ", job=" + jobPo );
166188 }
167- application .getExecutableJobQueue ().resume (jobPo );
168- return PushResult .FAILED ;
169- }
170-
171- try {
172- application .getExecutingJobQueue ().add (jobPo );
173- } catch (DuplicateJobException e ) {
174- LOGGER .warn (e .getMessage (), e );
175- application .getExecutableJobQueue ().resume (jobPo );
176- return PushResult .FAILED ;
189+ // 队列切回来
190+ boolean needResume = true ;
191+ try {
192+ jobPo .setIsRunning (true );
193+ application .getExecutableJobQueue ().add (jobPo );
194+ } catch (DuplicateJobException e ) {
195+ LOGGER .warn (e .getMessage (), e );
196+ needResume = false ;
197+ }
198+ application .getExecutingJobQueue ().remove (jobPo .getJobId ());
199+ if (needResume ) {
200+ application .getExecutableJobQueue ().resume (jobPo );
201+ }
202+ return PushResult .SENT_ERROR ;
177203 }
178- application .getExecutableJobQueue ().remove (jobPo .getTaskTrackerNodeGroup (), jobPo .getJobId ());
179204
180205 // 记录日志
181206 JobLogPo jobLogPo = JobDomainConverter .convertJobLog (jobPo );
0 commit comments