Skip to content

Commit 2e89c50

Browse files
committed
Merge pull request #194 from qq254963746/develop
增加有中断接口的 InterruptibleJobRunner
2 parents f85b196 + 2853333 commit 2e89c50

File tree

5 files changed

+61
-42
lines changed

5 files changed

+61
-42
lines changed

lts-example/src/main/java/com/lts/example/api/JobTrackerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public static void testMysqlQueue() {
6262
// 节点信息配置
6363
jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
6464
// jobTracker.setRegistryAddress("redis://127.0.0.1:6379");
65-
jobTracker.setListenPort(35002); // 默认 35001
65+
jobTracker.setListenPort(35001); // 默认 35001
6666
jobTracker.setClusterName("test_cluster");
6767

6868
jobTracker.addMasterChangeListener(new MasterChangeListenerImpl());

lts-tasktracker/src/main/java/com/lts/tasktracker/runner/JobRunnerDelegate.java

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@
99
import com.lts.tasktracker.Result;
1010
import com.lts.tasktracker.domain.Response;
1111
import com.lts.tasktracker.domain.TaskTrackerAppContext;
12-
import com.lts.tasktracker.logger.BizLogger;
1312
import com.lts.tasktracker.logger.BizLoggerAdapter;
1413
import com.lts.tasktracker.logger.BizLoggerFactory;
15-
import com.lts.tasktracker.logger.BizLoggerImpl;
1614
import com.lts.tasktracker.monitor.TaskTrackerMonitor;
1715
import sun.nio.ch.Interruptible;
1816

@@ -37,31 +35,38 @@ public class JobRunnerDelegate implements Runnable {
3735
private TaskTrackerMonitor monitor;
3836
private Interruptible interruptor;
3937
private JobRunner curJobRunner;
38+
private boolean isInterruptibleJobRunner = false;
4039

4140
public JobRunnerDelegate(TaskTrackerAppContext appContext,
4241
JobWrapper jobWrapper, RunnerCallback callback) {
43-
this.jobWrapper = jobWrapper;
44-
this.callback = callback;
4542
this.appContext = appContext;
46-
this.logger = (BizLoggerAdapter)BizLoggerFactory.getLogger(
43+
this.callback = callback;
44+
this.jobWrapper = jobWrapper;
45+
46+
this.isInterruptibleJobRunner = isInterruptibleJobRunner(this.appContext);
47+
this.logger = (BizLoggerAdapter) BizLoggerFactory.getLogger(
4748
appContext.getBizLogLevel(),
4849
appContext.getRemotingClient(), appContext);
4950
monitor = (TaskTrackerMonitor) appContext.getMonitor();
50-
this.interruptor = new Interruptible() {
51-
@Override
52-
public void interrupt() {
53-
JobRunnerDelegate.this.interrupt();
54-
}
55-
};
51+
52+
if (isInterruptibleJobRunner()) {
53+
this.interruptor = new Interruptible() {
54+
@Override
55+
public void interrupt() {
56+
JobRunnerDelegate.this.interrupt();
57+
}
58+
};
59+
}
5660
}
5761

5862
@Override
5963
public void run() {
6064
try {
61-
62-
blockedOn(interruptor);
63-
if (Thread.currentThread().isInterrupted()) {
64-
interruptor.interrupt();
65+
if (isInterruptibleJobRunner()) {
66+
blockedOn(interruptor);
67+
if (Thread.currentThread().isInterrupted()) {
68+
interruptor.interrupt();
69+
}
6570
}
6671

6772
LtsLoggerFactory.setLogger(logger);
@@ -75,9 +80,8 @@ public void run() {
7580
try {
7681
appContext.getRunnerPool().getRunningJobManager()
7782
.in(jobWrapper.getJobId());
78-
7983
this.curJobRunner = appContext.getRunnerPool().getRunnerFactory().newRunner();
80-
Result result = curJobRunner.run(jobWrapper.getJob());
84+
Result result = this.curJobRunner.run(jobWrapper.getJob());
8185

8286
if (result == null) {
8387
response.setAction(Action.EXECUTE_SUCCESS);
@@ -113,11 +117,15 @@ public void run() {
113117
LOGGER.warn("monitor error:" + t.getMessage(), t);
114118
}
115119

116-
jobWrapper = callback.runComplete(response);
120+
this.jobWrapper = callback.runComplete(response);
121+
117122
}
118123
} finally {
119124
LtsLoggerFactory.remove();
120-
blockedOn(null);
125+
126+
if (isInterruptibleJobRunner()) {
127+
blockedOn(null);
128+
}
121129
}
122130
}
123131

@@ -127,6 +135,15 @@ private void interrupt() {
127135
}
128136
}
129137

138+
private static boolean isInterruptibleJobRunner(TaskTrackerAppContext appContext) {
139+
Class<?> jobRunnerClass = appContext.getJobRunnerClass();
140+
return jobRunnerClass != null && InterruptibleJobRunner.class.isAssignableFrom(appContext.getJobRunnerClass());
141+
}
142+
143+
private boolean isInterruptibleJobRunner() {
144+
return this.isInterruptibleJobRunner;
145+
}
146+
130147
private void monitor(Action action) {
131148
if (action == null) {
132149
return;

lts-tasktracker/src/test/java/com/lts/tasktracker/runner/NormalJobRunner.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,22 @@
44
import com.lts.core.json.JSON;
55
import com.lts.tasktracker.Result;
66

7+
import java.util.concurrent.atomic.AtomicLong;
8+
79
/**
810
* @author Robert HG ([email protected]) on 2/21/16.
911
*/
1012
public class NormalJobRunner implements JobRunner {
11-
boolean stop = false;
13+
14+
protected boolean stop = false;
15+
16+
public static AtomicLong l = new AtomicLong(0);
17+
1218
@Override
1319
public Result run(Job job) throws Throwable {
1420
System.out.println("我开始执行:" + JSON.toJSONString(job));
1521
while (!stop) {
16-
int i = 1;
22+
l.incrementAndGet();
1723
}
1824
System.out.println("我退出了");
1925
return null;

lts-tasktracker/src/test/java/com/lts/tasktracker/runner/RunnerPoolTest.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ public void testInterruptor() throws NoAvailableJobRunnerException {
3030
TaskTrackerAppContext appContext = new TaskTrackerAppContext();
3131
appContext.setConfig(config);
3232
appContext.setEventCenter(new InjvmEventCenter());
33-
// appContext.setJobRunnerClass(TestInterruptorJobRunner.class);
34-
appContext.setJobRunnerClass(NormalJobRunner.class);
33+
appContext.setJobRunnerClass(TestInterruptorJobRunner.class);
34+
// appContext.setJobRunnerClass(NormalJobRunner.class);
3535

3636
RunnerPool runnerPool = new RunnerPool(appContext);
3737

@@ -57,7 +57,7 @@ public JobWrapper runComplete(Response response) {
5757
jobWrapper.setJob(job);
5858

5959
runnerPool.execute(jobWrapper, callback);
60-
60+
System.out.println(runnerPool.getAvailablePoolSize());
6161

6262
try {
6363
Thread.sleep(5000L);
@@ -66,6 +66,18 @@ public JobWrapper runComplete(Response response) {
6666
}
6767
// 5s之后停止
6868
runnerPool.stopWorking();
69+
70+
while(true){
71+
try {
72+
// 如果这个数字还在增长,表示线程还在执行,测试发现 NormalJobRunner 确实还在执行 TestInterruptorJobRunner 会停止
73+
System.out.println(NormalJobRunner.l);
74+
Thread.sleep(1000L);
75+
} catch (InterruptedException e) {
76+
e.printStackTrace();
77+
}
78+
System.out.println(runnerPool.getAvailablePoolSize());
79+
}
80+
6981
}
7082

7183
}
Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,13 @@
11
package com.lts.tasktracker.runner;
22

3-
import com.lts.core.domain.Job;
4-
import com.lts.core.json.JSON;
5-
import com.lts.tasktracker.Result;
6-
73
/**
84
* @author Robert HG ([email protected]) on 2/21/16.
95
*/
10-
public class TestInterruptorJobRunner implements InterruptibleJobRunner {
11-
12-
private boolean stop = false;
6+
public class TestInterruptorJobRunner extends NormalJobRunner implements InterruptibleJobRunner {
137

148
@Override
159
public void interrupt() {
1610
System.out.println("我设置停止标识");
1711
stop = true;
1812
}
19-
20-
@Override
21-
public Result run(Job job) throws Throwable {
22-
System.out.println("我开始执行:" + JSON.toJSONString(job));
23-
while (!stop) {
24-
int i = 1;
25-
}
26-
System.out.println("我退出了");
27-
return null;
28-
}
2913
}

0 commit comments

Comments
 (0)