Skip to content

Commit f028ddc

Browse files
committed
Merge pull request #22 from qq254963746/develop
Develop
2 parents 13d927b + 287a1cb commit f028ddc

File tree

29 files changed

+662
-207
lines changed

29 files changed

+662
-207
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
6363
jobTracker.setClusterName("test_cluster");
6464
jobTracker.addMasterChangeListener(new MasterChangeListenerImpl());
6565
// 设置业务日志记录
66+
// jobTracker.addConfig("job.logger", "console"); // 默认
67+
// jobTracker.addConfig("job.logger", "mysql");
6668
// jobTracker.addConfig("job.logger", "mongo");
6769

6870
// 1. 任务队列用mongo
@@ -74,7 +76,7 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
7476
// 2. 任务队里用mysql
7577
// jobTracker.addConfig("job.queue", "mysql");
7678
// mysql 配置
77-
// jobTracker.addConfig("jdbc.url", "jdbc:mysql://test.superboss.cc:3306/lts");
79+
// jobTracker.addConfig("jdbc.url", "jdbc:mysql://127.0.0.1:3306/lts");
7880
// jobTracker.addConfig("jdbc.username", "root");
7981
// jobTracker.addConfig("jdbc.password", "root");
8082

job-core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
<dependencies>
1515
<dependency>
16-
<groupId>com.google.code.morphia</groupId>
16+
<groupId>org.mongodb.morphia</groupId>
1717
<artifactId>morphia</artifactId>
1818
<scope>provided</scope>
1919
</dependency>

job-core/src/main/java/com/lts/job/core/cluster/Config.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public class Config {
2020
private String nodeGroup;
2121
// 唯一标识
2222
private String identity;
23-
// 工作线程
23+
// 工作线程, 目前只对 TaskTracker 有效
2424
private int workThreads;
2525
// 节点类型
2626
private NodeType nodeType;

job-core/src/main/java/com/lts/job/core/constant/Constants.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,9 @@ public interface Constants {
7575
public static final int DEFAULT_PROCESSOR_THREAD = 32 + AVAILABLE_PROCESSOR * 5;
7676

7777
public static final int LATCH_TIMEOUT_MILLIS = 10 * 60 * 1000; // 10分钟
78+
79+
// 取任务的时候的并发数控制
80+
public static final String JOB_TAKE_PARALLEL_SIZE = "job.take.parallel.size";
81+
public static final String JOB_TAKE_ACQUIRE_TIMEOUT = "job.take.acquire.timeout";
82+
public static final int DEFAULT_JOB_TAKE_PARALLEL_SIZE = 20;
7883
}

job-core/src/main/java/com/lts/job/core/factory/JobNodeConfigFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public static Config getDefaultConfig() {
1616
config.setNodeGroup("lts");
1717
config.setRegistryAddress("zookeeper://127.0.0.1:2181");
1818
config.setInvokeTimeoutMillis(1000 * 60);
19-
config.setListenPort(0);
19+
config.setListenPort(Constants.JOB_TRACKER_DEFAULT_LISTEN_PORT);
2020
config.setFailStorePath(Constants.USER_HOME);
2121
config.setClusterName(Constants.DEFAULT_CLUSTER_NAME);
2222
return config;

job-core/src/main/java/com/lts/job/core/registry/FailbackRegistry.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ protected void retry() {
195195
try {
196196
for (Node node : failed) {
197197
doRegister(node);
198+
failedRegistered.remove(node);
198199
}
199200
} catch (Throwable t) { // 忽略所有异常,等待下次重试
200201
LOGGER.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
@@ -210,6 +211,7 @@ protected void retry() {
210211
try {
211212
for (Node node : failed) {
212213
doUnRegister(node);
214+
failedUnRegistered.remove(node);
213215
}
214216
} catch (Throwable t) { // 忽略所有异常,等待下次重试
215217
LOGGER.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t);
@@ -235,6 +237,7 @@ protected void retry() {
235237
try {
236238
doSubscribe(node, listener);
237239
listeners.remove(listener);
240+
failedSubscribed.remove(entry.getKey());
238241
} catch (Throwable t) { // 忽略所有异常,等待下次重试
239242
LOGGER.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
240243
}

job-core/src/main/java/com/lts/job/core/util/StringUtils.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,14 @@ public static String toString(String msg, Throwable e) {
118118
}
119119
}
120120

121-
public static String concat(Object... objects) {
122-
if (objects == null) {
121+
public static String concat(String ... strings) {
122+
if (strings == null) {
123123
return null;
124124
}
125125
StringBuilder sb = new StringBuilder();
126-
for (Object object : objects) {
127-
if (object != null) {
128-
sb.append(object.toString());
126+
for (String str : strings) {
127+
if (str != null) {
128+
sb.append(str);
129129
}
130130
}
131131
return sb.toString();

job-core/src/main/java/com/lts/job/ec/injvm/InjvmEventCenter.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
import com.lts.job.ec.EventInfo;
1010
import com.lts.job.ec.EventSubscriber;
1111

12-
import java.util.List;
13-
import java.util.Map;
1412
import java.util.Set;
1513
import java.util.concurrent.ConcurrentHashMap;
1614
import java.util.concurrent.ExecutorService;
@@ -25,7 +23,7 @@ public class InjvmEventCenter implements EventCenter {
2523

2624
private static final Logger LOGGER = LoggerFactory.getLogger(EventCenter.class.getName());
2725

28-
private final Map<String, Set<EventSubscriber>> ecMap =
26+
private final ConcurrentHashMap<String, Set<EventSubscriber>> ecMap =
2927
new ConcurrentHashMap<String, Set<EventSubscriber>>();
3028

3129
private final ExecutorService executor = Executors.newFixedThreadPool(Constants.AVAILABLE_PROCESSOR * 2);

job-core/src/main/java/com/lts/job/store/mongo/DataStoreProvider.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package com.lts.job.store.mongo;
22

3-
import com.google.code.morphia.Datastore;
4-
import com.google.code.morphia.Morphia;
53
import com.lts.job.core.cluster.Config;
64
import com.lts.job.core.util.StringUtils;
7-
import com.mongodb.Mongo;
5+
import com.mongodb.MongoClient;
6+
import org.mongodb.morphia.Datastore;
7+
import org.mongodb.morphia.Morphia;
88

99
import java.util.concurrent.ConcurrentHashMap;
1010

@@ -22,8 +22,10 @@ public static Datastore getDataStore(Config config) {
2222

2323
String[] addresses = config.getParameter(ADDRESSES_KEY, new String[]{DEFAULT_ADDRESSES});
2424
String database = config.getParameter(DATABASE_KEY, DEFAULT_DATABASE);
25+
String username = config.getParameter(USERNAME);
26+
String pwd = config.getParameter(PASSWORD);
2527

26-
String cachedKey = StringUtils.concat(addresses, database);
28+
String cachedKey = StringUtils.concat(StringUtils.concat(addresses), database, username, pwd);
2729

2830
Datastore datastore = DATA_STORE_MAP.get(cachedKey);
2931
if (datastore == null) {
@@ -34,8 +36,8 @@ public static Datastore getDataStore(Config config) {
3436
return datastore;
3537
}
3638
Morphia morphia = new Morphia();
37-
MongoFactoryBean mongoFactoryBean = new MongoFactoryBean(addresses);
38-
Mongo mongo = mongoFactoryBean.createInstance();
39+
MongoFactoryBean mongoFactoryBean = new MongoFactoryBean(addresses, username, database, pwd);
40+
MongoClient mongo = mongoFactoryBean.createInstance();
3941
datastore = morphia.createDatastore(mongo, database);
4042
DATA_STORE_MAP.put(cachedKey, datastore);
4143
}
@@ -52,4 +54,8 @@ public static Datastore getDataStore(Config config) {
5254
private static final String DEFAULT_ADDRESSES = "127.0.0.1:27017";
5355
private static final String DATABASE_KEY = "mongo.database";
5456
private static final String DEFAULT_DATABASE = "lts";
57+
private static final String USERNAME = "mongo.username";
58+
private static final String PASSWORD = "mongo.password";
59+
60+
5561
}

job-core/src/main/java/com/lts/job/store/mongo/MongoFactoryBean.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,42 +2,62 @@
22

33
import com.lts.job.core.logger.Logger;
44
import com.lts.job.core.logger.LoggerFactory;
5-
import com.mongodb.Mongo;
6-
import com.mongodb.MongoClient;
7-
import com.mongodb.MongoClientOptions;
8-
import com.mongodb.ServerAddress;
5+
import com.lts.job.core.util.StringUtils;
6+
import com.mongodb.*;
97

108
import java.util.ArrayList;
119
import java.util.List;
1210

1311
/**
1412
* @author Robert HG ([email protected]) on 8/8/14.
15-
* Mongo 工厂类
13+
* Mongo 工厂类
1614
*/
1715
public class MongoFactoryBean {
1816

1917
private static final Logger LOGGER = LoggerFactory.getLogger(MongoFactoryBean.class);
2018
private List<ServerAddress> replicaSetSeeds = new ArrayList<ServerAddress>();
2119
private MongoClientOptions mongoClientOptions;
20+
private List<MongoCredential> mongoCredentials;
21+
2222

2323
public MongoFactoryBean(String[] serverAddresses) {
2424
replSeeds(serverAddresses);
2525
}
2626

27-
public MongoFactoryBean(String serverAddress) {
28-
replSeeds(new String[]{serverAddress});
27+
public MongoFactoryBean(String[] serverAddresses, String username, String database, String pwd) {
28+
this(serverAddresses, MongoCredential.MONGODB_CR_MECHANISM, username, database, pwd);
2929
}
3030

31-
public MongoFactoryBean(MongoClientOptions mongoClientOptions) {
31+
public MongoFactoryBean(String[] serverAddresses, String mechanism, String username, String database, String pwd) {
32+
replSeeds(serverAddresses);
33+
if (StringUtils.isNotEmpty(username)) {
34+
if (MongoCredential.GSSAPI_MECHANISM.equals(mechanism)) {
35+
mongoCredentials.add(MongoCredential.createGSSAPICredential(username));
36+
} else {
37+
mongoCredentials.add(MongoCredential.createMongoCRCredential(username, database, pwd.toCharArray()));
38+
}
39+
}
40+
}
41+
42+
public MongoFactoryBean(String[] serverAddresses, MongoClientOptions mongoClientOptions) {
43+
this(serverAddresses);
3244
this.mongoClientOptions = mongoClientOptions;
3345
}
3446

35-
public Mongo createInstance() throws Exception {
47+
public MongoClient createInstance() throws Exception {
3648
if (replicaSetSeeds.size() > 0) {
3749
if (mongoClientOptions != null) {
38-
return new MongoClient(replicaSetSeeds, mongoClientOptions);
50+
if (mongoCredentials != null) {
51+
return new MongoClient(replicaSetSeeds, mongoCredentials, mongoClientOptions);
52+
} else {
53+
return new MongoClient(replicaSetSeeds, mongoClientOptions);
54+
}
55+
}
56+
if (mongoCredentials != null) {
57+
return new MongoClient(replicaSetSeeds, mongoCredentials);
58+
} else {
59+
return new MongoClient(replicaSetSeeds);
3960
}
40-
return new MongoClient(replicaSetSeeds);
4161
}
4262
return new MongoClient();
4363
}

0 commit comments

Comments
 (0)