Skip to content

Commit c3adf1d

Browse files
committed
Merge pull request #8 from qq254963746/develop
Develop
2 parents 688d1ce + a290f98 commit c3adf1d

File tree

13 files changed

+358
-73
lines changed

13 files changed

+358
-73
lines changed

job-core/src/main/java/com/lts/job/core/cluster/Config.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,13 @@ private Map<String, Number> getNumbers() {
146146
}
147147
return numbers;
148148
}
149+
public boolean getParameter(String key, boolean defaultValue) {
150+
String value = getParameter(key);
151+
if (value == null || value.length() == 0) {
152+
return defaultValue;
153+
}
154+
return Boolean.parseBoolean(value);
155+
}
149156
public int getParameter(String key, int defaultValue) {
150157
Number n = getNumbers().get(key);
151158
if (n != null) {

job-core/src/main/java/com/lts/job/core/cluster/Node.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.lts.job.core.cluster;
22

3+
import com.lts.job.core.registry.NodeRegistryUtils;
4+
35
import java.util.ArrayList;
46
import java.util.List;
57

@@ -11,7 +13,7 @@ public class Node {
1113

1214
// 是否可用
1315
private boolean isAvailable = true;
14-
16+
private String clusterName;
1517
private NodeType nodeType;
1618
private String ip;
1719
private Integer port;
@@ -25,6 +27,8 @@ public class Node {
2527
// 自己关注的节点类型
2628
private List<NodeType> listenNodeTypes;
2729

30+
private String fullString;
31+
2832
public boolean isAvailable() {
2933
return isAvailable;
3034
}
@@ -104,6 +108,14 @@ public void addListenNodeType(NodeType nodeType) {
104108
this.listenNodeTypes.add(nodeType);
105109
}
106110

111+
public String getClusterName() {
112+
return clusterName;
113+
}
114+
115+
public void setClusterName(String clusterName) {
116+
this.clusterName = clusterName;
117+
}
118+
107119
@Override
108120
public boolean equals(Object o) {
109121
if (this == o) return true;
@@ -124,10 +136,18 @@ public String getAddress() {
124136
return ip + ":" + port;
125137
}
126138

139+
public String toFullString() {
140+
if (fullString == null) {
141+
fullString = NodeRegistryUtils.getFullPath(this);
142+
}
143+
return fullString;
144+
}
145+
127146
@Override
128147
public String toString() {
129148
return "Node{" +
130149
"identity='" + identity + '\'' +
150+
", clusterName='" + clusterName + '\'' +
131151
", nodeType=" + nodeType +
132152
", ip='" + ip + '\'' +
133153
", port=" + port +

job-core/src/main/java/com/lts/job/core/constant/Constants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ public interface Constants {
4040

4141
public static final String UNREGISTER = "unregister";
4242

43+
public static final String SUBSCRIBE = "subscribe";
44+
45+
public static final String UNSUBSCRIBE = "unsubscribe";
4346
/**
4447
* 注册中心失败事件重试事件
4548
*/

job-core/src/main/java/com/lts/job/core/extension/ExtensionLoader.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,7 @@ private String createAdaptiveExtensionClassCode() {
569569
break;
570570
}
571571
}
572-
// 有类型为URL的参数
572+
// 有类型为Config的参数
573573
if (configTypeIndex != -1) {
574574
// Null Point check
575575
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"config == null\");",
@@ -579,11 +579,11 @@ private String createAdaptiveExtensionClassCode() {
579579
s = String.format("\n%s config = arg%d;", Config.class.getName(), configTypeIndex);
580580
code.append(s);
581581
}
582-
// 参数没有URL类型
582+
// 参数没有Config类型
583583
else {
584584
String attribMethod = null;
585585

586-
// 找到参数的URL属性
586+
// 找到参数的Config属性
587587
LBL_PTS:
588588
for (int i = 0; i < pts.length; ++i) {
589589
Method[] ms = pts[i].getMethods();

job-core/src/main/java/com/lts/job/core/factory/NodeFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public static <T extends Node> T create(Class<T> clazz, Config config) {
1818
node.setThreads(config.getWorkThreads());
1919
node.setPort(config.getListenPort());
2020
node.setIdentity(config.getIdentity());
21+
node.setClusterName(config.getClusterName());
2122
return node;
2223
} catch (InstantiationException e) {
2324
throw new RuntimeException(e);

job-core/src/main/java/com/lts/job/core/registry/NodeRegistryUtils.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,10 @@ public static Node parse(String fullPath) {
6565
return node;
6666
}
6767

68-
public static String getFullPath(String clusterName, Node node) {
68+
public static String getFullPath(Node node) {
6969
StringBuilder path = new StringBuilder();
7070

71-
path.append(getRootPath(clusterName))
71+
path.append(getRootPath(node.getClusterName()))
7272
.append("/")
7373
.append(node.getNodeType())
7474
.append("/")
@@ -106,13 +106,14 @@ public static void main(String[] args) {
106106
node.setNodeType(NodeType.JOB_TRACKER);
107107
node.setCreateTime(new Date().getTime());
108108
node.setPort(2313);
109+
node.setClusterName("lts");
109110
node.setIp(NetUtils.getLocalHost());
110-
String fullPath = NodeRegistryUtils.getFullPath("lts", node);
111+
String fullPath = NodeRegistryUtils.getFullPath(node);
111112
System.out.println(fullPath);
112113

113114
node = NodeRegistryUtils.parse(fullPath);
114115
node.setNodeType(NodeType.JOB_CLIENT);
115-
fullPath = NodeRegistryUtils.getFullPath("lts", node);
116+
fullPath = NodeRegistryUtils.getFullPath(node);
116117
System.out.println(fullPath);
117118

118119
node = NodeRegistryUtils.parse(fullPath);

job-core/src/main/java/com/lts/job/core/registry/RegistryFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ public static Registry getRegistry(Config config) {
2626
address.replace("redis://", "")
2727
);
2828
return new RedisRegistry(config);
29+
} else if(address.startsWith("multicast://")){
30+
// config.setRegistryAddress(
31+
// address.replace("multicast://", "")
32+
// );
33+
// return new MulticastRegistry(config);
2934
}
3035
throw new IllegalArgumentException("illegal address protocol");
3136
}

0 commit comments

Comments
 (0)