Skip to content

Commit 027103b

Browse files
committed
Merge pull request #195 from qq254963746/develop
增加有中断接口的 InterruptibleJobRunner
2 parents 2e89c50 + 6beb9a8 commit 027103b

File tree

1 file changed

+19
-26
lines changed

1 file changed

+19
-26
lines changed

lts-tasktracker/src/main/java/com/lts/tasktracker/runner/JobRunnerDelegate.java

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -35,38 +35,33 @@ public class JobRunnerDelegate implements Runnable {
3535
private TaskTrackerMonitor monitor;
3636
private Interruptible interruptor;
3737
private JobRunner curJobRunner;
38-
private boolean isInterruptibleJobRunner = false;
38+
private boolean interrupted = false;
3939

4040
public JobRunnerDelegate(TaskTrackerAppContext appContext,
4141
JobWrapper jobWrapper, RunnerCallback callback) {
4242
this.appContext = appContext;
4343
this.callback = callback;
4444
this.jobWrapper = jobWrapper;
4545

46-
this.isInterruptibleJobRunner = isInterruptibleJobRunner(this.appContext);
4746
this.logger = (BizLoggerAdapter) BizLoggerFactory.getLogger(
4847
appContext.getBizLogLevel(),
4948
appContext.getRemotingClient(), appContext);
5049
monitor = (TaskTrackerMonitor) appContext.getMonitor();
5150

52-
if (isInterruptibleJobRunner()) {
53-
this.interruptor = new Interruptible() {
54-
@Override
55-
public void interrupt() {
56-
JobRunnerDelegate.this.interrupt();
57-
}
58-
};
59-
}
51+
this.interruptor = new Interruptible() {
52+
@Override
53+
public void interrupt() {
54+
JobRunnerDelegate.this.interrupt();
55+
}
56+
};
6057
}
6158

6259
@Override
6360
public void run() {
6461
try {
65-
if (isInterruptibleJobRunner()) {
66-
blockedOn(interruptor);
67-
if (Thread.currentThread().isInterrupted()) {
68-
interruptor.interrupt();
69-
}
62+
blockedOn(interruptor);
63+
if (Thread.currentThread().isInterrupted()) {
64+
interruptor.interrupt();
7065
}
7166

7267
LtsLoggerFactory.setLogger(logger);
@@ -116,32 +111,30 @@ public void run() {
116111
} catch (Throwable t) {
117112
LOGGER.warn("monitor error:" + t.getMessage(), t);
118113
}
119-
114+
if (isInterrupted()) {
115+
// 如果当前线程被阻断了,那么也就不接受新任务了
116+
response.setReceiveNewJob(false);
117+
}
120118
this.jobWrapper = callback.runComplete(response);
121119

122120
}
123121
} finally {
124122
LtsLoggerFactory.remove();
125123

126-
if (isInterruptibleJobRunner()) {
127-
blockedOn(null);
128-
}
124+
blockedOn(null);
129125
}
130126
}
131127

132128
private void interrupt() {
129+
interrupted = true;
130+
133131
if (this.curJobRunner != null && this.curJobRunner instanceof InterruptibleJobRunner) {
134132
((InterruptibleJobRunner) this.curJobRunner).interrupt();
135133
}
136134
}
137135

138-
private static boolean isInterruptibleJobRunner(TaskTrackerAppContext appContext) {
139-
Class<?> jobRunnerClass = appContext.getJobRunnerClass();
140-
return jobRunnerClass != null && InterruptibleJobRunner.class.isAssignableFrom(appContext.getJobRunnerClass());
141-
}
142-
143-
private boolean isInterruptibleJobRunner() {
144-
return this.isInterruptibleJobRunner;
136+
private boolean isInterrupted() {
137+
return this.interrupted;
145138
}
146139

147140
private void monitor(Action action) {

0 commit comments

Comments
 (0)