Skip to content

Commit 8536d16

Browse files
committed
Add TaskTracker FailStore close config
1 parent 3d749e2 commit 8536d16

File tree

3 files changed

+73
-43
lines changed

3 files changed

+73
-43
lines changed

lts-core/src/main/java/com/github/ltsopensource/core/constant/ExtConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,4 +226,7 @@ public interface ExtConfig {
226226
String JOB_TRACKER_PUSH_BATCH_SIZE = "lts.job.tracker.push.batch.size";
227227

228228
String TASK_TRACKER_BIZ_LOGGER_FAIL_STORE_CLOSE = "lts.task.tracker.biz.logger.failstore.close";
229+
230+
String TASK_TRACKER_JOB_RESULT_FAIL_STORE_CLOSE = "lts.task.tracker.job.result.failstore.close";
231+
229232
}

lts-tasktracker/src/main/java/com/github/ltsopensource/tasktracker/logger/BizLoggerImpl.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -49,27 +49,31 @@ public BizLoggerImpl(Level level, final RemotingClientDelegate remotingClient, T
4949
}
5050
this.appContext = appContext;
5151
this.remotingClient = remotingClient;
52-
this.retryScheduler = new RetryScheduler<BizLog>(BizLogger.class.getSimpleName(), appContext, FailStorePathBuilder.getBizLoggerPath(appContext)) {
53-
@Override
54-
protected boolean isRemotingEnable() {
55-
return remotingClient.isServerEnable();
56-
}
5752

58-
@Override
59-
protected boolean retry(List<BizLog> list) {
60-
return sendBizLog(list);
61-
}
62-
};
6353
if (isEnableBizLoggerFailStore()) {
54+
55+
this.retryScheduler = new RetryScheduler<BizLog>(BizLogger.class.getSimpleName(), appContext, FailStorePathBuilder.getBizLoggerPath(appContext)) {
56+
@Override
57+
protected boolean isRemotingEnable() {
58+
return remotingClient.isServerEnable();
59+
}
60+
61+
@Override
62+
protected boolean retry(List<BizLog> list) {
63+
return sendBizLog(list);
64+
}
65+
};
66+
6467
this.retryScheduler.start();
68+
69+
NodeShutdownHook.registerHook(appContext, this.getClass().getName(), new Callable() {
70+
@Override
71+
public void call() throws Exception {
72+
retryScheduler.stop();
73+
}
74+
});
6575
}
6676

67-
NodeShutdownHook.registerHook(appContext, this.getClass().getName(), new Callable() {
68-
@Override
69-
public void call() throws Exception {
70-
retryScheduler.stop();
71-
}
72-
});
7377
}
7478

7579
@Override
@@ -112,7 +116,11 @@ private void sendMsg(String msg, Level level) {
112116
requestBody.setBizLogs(Collections.singletonList(bizLog));
113117

114118
if (!remotingClient.isServerEnable()) {
115-
retryScheduler.inSchedule(StringUtils.generateUUID(), bizLog);
119+
if(isEnableBizLoggerFailStore()){
120+
retryScheduler.inSchedule(StringUtils.generateUUID(), bizLog);
121+
}else{
122+
logger.error("Send Biz Logger to JobTracker Error, server is down, bizLog={}", JSON.toJSONString(bizLog));
123+
}
116124
return;
117125
}
118126

lts-tasktracker/src/main/java/com/github/ltsopensource/tasktracker/processor/JobPushProcessor.java

Lines changed: 45 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.github.ltsopensource.core.commons.utils.Callable;
44
import com.github.ltsopensource.core.commons.utils.CollectionUtils;
55
import com.github.ltsopensource.core.constant.Constants;
6+
import com.github.ltsopensource.core.constant.ExtConfig;
67
import com.github.ltsopensource.core.domain.JobMeta;
78
import com.github.ltsopensource.core.domain.JobRunResult;
89
import com.github.ltsopensource.core.exception.JobTrackerNotFoundException;
@@ -50,29 +51,32 @@ public class JobPushProcessor extends AbstractProcessor {
5051
protected JobPushProcessor(TaskTrackerAppContext appContext) {
5152
super(appContext);
5253
this.remotingClient = appContext.getRemotingClient();
53-
retryScheduler = new RetryScheduler<JobRunResult>(JobPushProcessor.class.getSimpleName(), appContext,
54-
FailStorePathBuilder.getJobFeedbackPath(appContext), 3) {
55-
@Override
56-
protected boolean isRemotingEnable() {
57-
return remotingClient.isServerEnable();
58-
}
59-
60-
@Override
61-
protected boolean retry(List<JobRunResult> results) {
62-
return retrySendJobResults(results);
63-
}
64-
};
65-
retryScheduler.start();
66-
6754
// 线程安全的
6855
jobRunnerCallback = new JobRunnerCallback();
6956

70-
NodeShutdownHook.registerHook(appContext, this.getClass().getName(), new Callable() {
71-
@Override
72-
public void call() throws Exception {
73-
retryScheduler.stop();
74-
}
75-
});
57+
58+
if (isEnableFailStore()) {
59+
retryScheduler = new RetryScheduler<JobRunResult>(JobPushProcessor.class.getSimpleName(), appContext,
60+
FailStorePathBuilder.getJobFeedbackPath(appContext), 3) {
61+
@Override
62+
protected boolean isRemotingEnable() {
63+
return remotingClient.isServerEnable();
64+
}
65+
66+
@Override
67+
protected boolean retry(List<JobRunResult> results) {
68+
return retrySendJobResults(results);
69+
}
70+
};
71+
retryScheduler.start();
72+
73+
NodeShutdownHook.registerHook(appContext, this.getClass().getName(), new Callable() {
74+
@Override
75+
public void call() throws Exception {
76+
retryScheduler.stop();
77+
}
78+
});
79+
}
7680
}
7781

7882
@Override
@@ -152,9 +156,15 @@ public void operationComplete(ResponseFuture responseFuture) {
152156
LOGGER.info("Job feedback failed, save local files。{}", jobRunResult);
153157
}
154158
try {
155-
retryScheduler.inSchedule(
156-
jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(),
157-
jobRunResult);
159+
if (isEnableFailStore()) {
160+
retryScheduler.inSchedule(
161+
jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(),
162+
jobRunResult);
163+
} else {
164+
LOGGER.error("Send Job Result to JobTracker Error, code={}, jobRunResult={}",
165+
commandResponse != null ? commandResponse.getCode() : null, JSON.toJSONString(jobRunResult));
166+
}
167+
158168
} catch (Exception e) {
159169
LOGGER.error("Job feedback failed", e);
160170
}
@@ -173,9 +183,14 @@ public void operationComplete(ResponseFuture responseFuture) {
173183
} catch (JobTrackerNotFoundException e) {
174184
try {
175185
LOGGER.warn("No job tracker available! save local files.");
176-
retryScheduler.inSchedule(
177-
jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(),
178-
jobRunResult);
186+
187+
if (isEnableFailStore()) {
188+
retryScheduler.inSchedule(
189+
jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(),
190+
jobRunResult);
191+
} else {
192+
LOGGER.error("Send Job Result to JobTracker Error, server is down, jobRunResult={}", JSON.toJSONString(jobRunResult));
193+
}
179194
} catch (Exception e1) {
180195
LOGGER.error("Save files failed, {}", jobRunResult.getJobMeta(), e1);
181196
}
@@ -185,6 +200,10 @@ public void operationComplete(ResponseFuture responseFuture) {
185200
}
186201
}
187202

203+
private boolean isEnableFailStore() {
204+
return !appContext.getConfig().getParameter(ExtConfig.TASK_TRACKER_JOB_RESULT_FAIL_STORE_CLOSE, false);
205+
}
206+
188207
/**
189208
* 发送JobResults
190209
*/

0 commit comments

Comments
 (0)