Skip to content

Commit bff5325

Browse files
committed
Merge pull request #9 from qq254963746/develop
针对 客户端 提交任务过快, 进行过载保护
2 parents c3adf1d + 4df0580 commit bff5325

File tree

14 files changed

+212
-30
lines changed

14 files changed

+212
-30
lines changed

job-client/src/main/java/com/lts/job/client/JobClient.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import com.lts.job.client.domain.ResponseCode;
77
import com.lts.job.client.processor.RemotingDispatcher;
88
import com.lts.job.client.support.JobFinishedHandler;
9+
import com.lts.job.client.support.JobSubmitExecutor;
10+
import com.lts.job.client.support.JobSubmitProtector;
911
import com.lts.job.core.Application;
1012
import com.lts.job.core.cluster.AbstractClientNode;
1113
import com.lts.job.core.constant.Constants;
@@ -35,15 +37,21 @@
3537
*/
3638
public class JobClient<T extends JobClientNode, App extends Application> extends AbstractClientNode<JobClientNode, JobClientApplication> {
3739

38-
private static final Logger LOGGER = LoggerFactory.getLogger("JobClient");
40+
protected static final Logger LOGGER = LoggerFactory.getLogger("JobClient");
3941

4042
private static final int BATCH_SIZE = 50;
4143

44+
// 过载保护的提交者
45+
private JobSubmitProtector protector;
46+
4247
private JobFinishedHandler jobFinishedHandler;
4348

4449
public JobClient() {
4550
// 设置默认节点组
4651
config.setNodeGroup(Constants.DEFAULT_NODE_JOB_CLIENT_GROUP);
52+
//
53+
int concurrentSize = config.getParameter(Constants.JOB_SUBMIT_CONCURRENCY_SIZE, Constants.DEFAULT_JOB_SUBMIT_CONCURRENCY_SIZE);
54+
protector = new JobSubmitProtector(concurrentSize);
4755
}
4856

4957
@Override
@@ -60,11 +68,20 @@ protected void nodeDisable() {
6068
* @param job
6169
* @return
6270
*/
63-
public Response submitJob(Job job) {
64-
return submitJob(Arrays.asList(job));
71+
public Response submitJob(Job job) throws JobSubmitException {
72+
return protectSubmit(Arrays.asList(job));
73+
}
74+
75+
private Response protectSubmit(List<Job> jobs) throws JobSubmitException {
76+
return protector.execute(jobs, new JobSubmitExecutor<Response>() {
77+
@Override
78+
public Response execute(List<Job> jobs) throws JobSubmitException {
79+
return _submitJob(jobs);
80+
}
81+
});
6582
}
6683

67-
protected Response _submitJob(final List<Job> jobs) {
84+
private Response _submitJob(final List<Job> jobs) throws JobSubmitException {
6885
// 参数验证
6986
if (CollectionUtils.isEmpty(jobs)) {
7087
throw new JobSubmitException("提交任务不能为空!");
@@ -76,17 +93,14 @@ protected Response _submitJob(final List<Job> jobs) {
7693
job.checkField();
7794
}
7895
}
79-
8096
final Response response = new Response();
81-
8297
try {
8398
JobSubmitRequest jobSubmitRequest = application.getCommandBodyWrapper().wrapper(new JobSubmitRequest());
8499
jobSubmitRequest.setJobs(jobs);
85100

86101
RemotingCommand requestCommand = RemotingCommand.createRequestCommand(JobProtos.RequestCode.SUBMIT_JOB.code(), jobSubmitRequest);
87102

88103
final CountDownLatch latch = new CountDownLatch(1);
89-
90104
remotingClient.invokeAsync(requestCommand, new InvokeCallback() {
91105
@Override
92106
public void operationComplete(ResponseFuture responseFuture) {
@@ -144,16 +158,16 @@ public void operationComplete(ResponseFuture responseFuture) {
144158
* @param jobs
145159
* @return
146160
*/
147-
public Response submitJob(List<Job> jobs) {
161+
public Response submitJob(List<Job> jobs) throws JobSubmitException {
148162

149163
Response response = new Response();
150164
response.setSuccess(true);
151165

152166
for (int i = 0; i <= jobs.size() / BATCH_SIZE; i++) {
153167
List<Job> subJobs = BatchUtils.getBatchList(i, BATCH_SIZE, jobs);
154168

155-
if (subJobs != null && subJobs.size() > 0) {
156-
Response subResponse = _submitJob(subJobs);
169+
if (CollectionUtils.isNotEmpty(subJobs)) {
170+
Response subResponse = protectSubmit(subJobs);
157171
if (!subResponse.isSuccess()) {
158172
response.setSuccess(false);
159173
response.addFailedJobs(subJobs);

job-client/src/main/java/com/lts/job/client/RetryJobClient.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.lts.job.client.domain.Response;
66
import com.lts.job.client.domain.ResponseCode;
77
import com.lts.job.core.domain.Job;
8+
import com.lts.job.core.exception.JobSubmitException;
89
import com.lts.job.core.support.RetryScheduler;
910

1011
import java.util.Arrays;
@@ -29,7 +30,12 @@ protected boolean isRemotingEnable() {
2930

3031
@Override
3132
protected boolean retry(List<Job> jobs) {
32-
return superSubmitJob(jobs).isSuccess();
33+
try {
34+
return superSubmitJob(jobs).isSuccess();
35+
} catch (Throwable t) {
36+
LOGGER.error(t.getMessage(), t);
37+
}
38+
return false;
3339
}
3440
};
3541
super.innerStart();
@@ -43,12 +49,12 @@ protected void innerStop() {
4349
}
4450

4551
@Override
46-
public Response submitJob(Job job) {
52+
public Response submitJob(Job job) throws JobSubmitException {
4753
return submitJob(Arrays.asList(job));
4854
}
4955

5056
@Override
51-
public Response submitJob(List<Job> jobs) {
57+
public Response submitJob(List<Job> jobs) throws JobSubmitException {
5258
Response response = superSubmitJob(jobs);
5359

5460
if (!response.isSuccess()) {
@@ -67,7 +73,7 @@ public Response submitJob(List<Job> jobs) {
6773
return response;
6874
}
6975

70-
private Response superSubmitJob(List<Job> jobs) {
76+
private Response superSubmitJob(List<Job> jobs) throws JobSubmitException {
7177
return super.submitJob(jobs);
7278
}
7379
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.lts.job.client.support;
2+
3+
import com.lts.job.core.domain.Job;
4+
import com.lts.job.core.exception.JobSubmitException;
5+
6+
import java.util.List;
7+
8+
/**
9+
* @author Robert HG ([email protected]) on 5/21/15.
10+
*/
11+
public interface JobSubmitExecutor<T> {
12+
13+
T execute(List<Job> jobs) throws JobSubmitException;
14+
15+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.lts.job.client.support;
2+
3+
import com.lts.job.core.exception.JobSubmitException;
4+
5+
/**
6+
* @author Robert HG ([email protected]) on 5/21/15.
7+
*/
8+
public class JobSubmitProtectException extends JobSubmitException {
9+
10+
int concurrentSize;
11+
12+
public JobSubmitProtectException(int concurrentSize) {
13+
super();
14+
this.concurrentSize = concurrentSize;
15+
}
16+
17+
public JobSubmitProtectException(int concurrentSize, String message) {
18+
super(message);
19+
this.concurrentSize = concurrentSize;
20+
}
21+
22+
public JobSubmitProtectException(int concurrentSize, String message, Throwable cause) {
23+
super(message, cause);
24+
this.concurrentSize = concurrentSize;
25+
}
26+
27+
public JobSubmitProtectException(int concurrentSize, Throwable cause) {
28+
super(cause);
29+
this.concurrentSize = concurrentSize;
30+
}
31+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package com.lts.job.client.support;
2+
3+
import com.lts.job.client.domain.Response;
4+
import com.lts.job.core.constant.Constants;
5+
import com.lts.job.core.domain.Job;
6+
import com.lts.job.core.exception.JobSubmitException;
7+
8+
import java.util.List;
9+
import java.util.concurrent.Semaphore;
10+
import java.util.concurrent.TimeUnit;
11+
12+
/**
13+
* 用来处理客户端请求过载问题
14+
*
15+
* @author Robert HG ([email protected]) on 5/21/15.
16+
*/
17+
public class JobSubmitProtector {
18+
19+
private int concurrentSize = Constants.AVAILABLE_PROCESSOR * 4;
20+
// 用信号量进行过载保护
21+
private Semaphore semaphore;
22+
private int timeout = 500;
23+
private String errorMsg;
24+
;
25+
26+
public JobSubmitProtector() {
27+
errorMsg = "the concurrent size is " + concurrentSize + " , submit too fast ! use " + Constants.JOB_SUBMIT_CONCURRENCY_SIZE + " can change the concurrent size .";
28+
}
29+
30+
public JobSubmitProtector(int concurrentSize) {
31+
this();
32+
if (concurrentSize > 0) {
33+
this.concurrentSize = concurrentSize;
34+
}
35+
semaphore = new Semaphore(this.concurrentSize);
36+
}
37+
38+
public Response execute(final List<Job> jobs, final JobSubmitExecutor<Response> jobSubmitExecutor) throws JobSubmitException {
39+
try {
40+
try {
41+
boolean acquire = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
42+
if (!acquire) {
43+
throw new JobSubmitProtectException(concurrentSize, errorMsg);
44+
}
45+
} catch (InterruptedException e) {
46+
throw new JobSubmitProtectException(concurrentSize, errorMsg);
47+
}
48+
return jobSubmitExecutor.execute(jobs);
49+
} finally {
50+
semaphore.release();
51+
}
52+
}
53+
54+
public void getConcurrentSize(int concurrentSize) {
55+
this.concurrentSize = concurrentSize;
56+
}
57+
}

job-core/src/main/java/com/lts/job/core/constant/Constants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,7 @@ public interface Constants {
6767
public static final String JOB_LOGGER_KEY = "job.logger";
6868

6969
public static final String JOB_QUEUE_KEY = "job.queue";
70+
// 客户端提交并发请求size
71+
public static final String JOB_SUBMIT_CONCURRENCY_SIZE = "job.submit.concurrency.size";
72+
public static final int DEFAULT_JOB_SUBMIT_CONCURRENCY_SIZE = Constants.AVAILABLE_PROCESSOR * 4;
7073
}

job-core/src/main/java/com/lts/job/core/exception/JobSubmitException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
/**
44
* @author Robert HG ([email protected]) on 5/12/15.
55
*/
6-
public class JobSubmitException extends RuntimeException {
6+
public class JobSubmitException extends Exception {
77

88
public JobSubmitException() {
99
super();

job-core/src/main/java/com/lts/job/core/util/Holder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ public class Holder<T> {
77

88
private volatile T value;
99

10+
public Holder(T value) {
11+
this.value = value;
12+
}
13+
14+
public Holder(){
15+
}
16+
1017
public void set(T value) {
1118
this.value = value;
1219
}

job-core/src/main/java/com/lts/job/store/mongo/MongoFactoryBean.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ public MongoFactoryBean(String[] serverAddresses) {
2424
replSeeds(serverAddresses);
2525
}
2626

27-
public MongoFactoryBean(String serverAddresse) {
28-
replSeeds(new String[]{serverAddresse});
27+
public MongoFactoryBean(String serverAddress) {
28+
replSeeds(new String[]{serverAddress});
2929
}
3030

3131
public MongoFactoryBean(MongoClientOptions mongoClientOptions) {

job-example/src/main/java/com/lts/job/example/api/JobClientTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
package com.lts.job.example.api;
22

3+
import com.alibaba.fastjson.JSONObject;
34
import com.lts.job.client.JobClient;
45
import com.lts.job.client.RetryJobClient;
6+
import com.lts.job.client.domain.Response;
7+
import com.lts.job.core.domain.Job;
8+
import com.lts.job.core.exception.JobSubmitException;
9+
import com.lts.job.core.util.StringUtils;
510
import com.lts.job.example.support.BaseJobClientTest;
611
import com.lts.job.example.support.JobFinishedHandlerImpl;
712
import com.lts.job.example.support.MasterChangeListenerImpl;
@@ -14,7 +19,11 @@
1419
public class JobClientTest extends BaseJobClientTest {
1520

1621
public static void main(String[] args) throws IOException {
22+
console();
23+
// testProtector();
24+
}
1725

26+
public static void console() throws IOException {
1827
JobClient jobClient = new RetryJobClient();
1928
// final JobClient jobClient = new JobClient();
2029
jobClient.setNodeGroup("test_jobClient");
@@ -33,4 +42,40 @@ public static void main(String[] args) throws IOException {
3342
jobClientTest.startConsole();
3443
}
3544

45+
public static void testProtector() {
46+
47+
final JobClient jobClient = new RetryJobClient();
48+
// final JobClient jobClient = new JobClient();
49+
jobClient.setNodeGroup("test_jobClient");
50+
jobClient.setClusterName("test_cluster");
51+
jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
52+
// jobClient.setRegistryAddress("redis://127.0.0.1:6379");
53+
// 任务重试保存地址,默认用户目录下
54+
// jobClient.setJobInfoSavePath(Constants.USER_HOME);
55+
jobClient.setJobFinishedHandler(new JobFinishedHandlerImpl());
56+
jobClient.addMasterChangeListener(new MasterChangeListenerImpl());
57+
jobClient.addConfig("job.submit.concurrency.size", "3");
58+
jobClient.start();
59+
60+
for (int i = 0; i < 50; i++) {
61+
62+
new Thread(new Runnable() {
63+
@Override
64+
public void run() {
65+
Job job = new Job();
66+
job.setTaskId(StringUtils.generateUUID());
67+
job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
68+
job.setParam("shopId", "111");
69+
try {
70+
Response response = jobClient.submitJob(job);
71+
System.out.println(JSONObject.toJSONString(response));
72+
} catch (JobSubmitException e) {
73+
e.printStackTrace();
74+
}
75+
}
76+
}).start();
77+
}
78+
79+
}
80+
3681
}

0 commit comments

Comments
 (0)