Skip to content

Commit fa12f9e

Browse files
Merge pull request #27 from catcherwong/saga
feat: support saga
2 parents 34665cc + b93913c commit fa12f9e

File tree

6 files changed

+176
-30
lines changed

6 files changed

+176
-30
lines changed

pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>pub.dtm</groupId>
88
<artifactId>dtmcli-java</artifactId>
9-
<version>1.5.4-SNAPSHOT</version>
9+
<version>1.5.5-SNAPSHOT</version>
1010

1111
<properties>
1212
<maven.compiler.source>8</maven.compiler.source>
@@ -17,6 +17,7 @@
1717
<commons-lang3.version>3.12.0</commons-lang3.version>
1818
<slf4j-api.version>1.7.30</slf4j-api.version>
1919
<mysql-connector-java.version>8.0.27</mysql-connector-java.version>
20+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
2021
</properties>
2122

2223
<dependencies>

src/main/java/client/DtmClient.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import common.model.DtmServerInfo;
3131
import common.utils.HttpUtil;
3232
import okhttp3.Response;
33+
import saga.Saga;
3334
import tcc.Tcc;
3435

3536
import java.util.Objects;
@@ -38,13 +39,13 @@
3839
* @author lixiaoshuang
3940
*/
4041
public class DtmClient {
41-
42+
4243
private String ipPort;
43-
44+
4445
public DtmClient(String ipPort) {
4546
this.ipPort = ipPort;
4647
}
47-
48+
4849
/**
4950
* 生成全局事务id
5051
*
@@ -73,7 +74,7 @@ public String genGid() throws Exception {
7374
}
7475
return jsonObject.get("gid").toString();
7576
}
76-
77+
7778
/**
7879
* tcc事务
7980
*
@@ -86,6 +87,13 @@ public void tccGlobalTransaction(String gid, DtmConsumer<Tcc> function) throws E
8687
Tcc tcc = new Tcc(ipPort, gid);
8788
tcc.tccGlobalTransaction(function);
8889
}
89-
90-
90+
91+
/**
92+
* create a new saga
93+
* @param gid
94+
*/
95+
public Saga newSaga(String gid) {
96+
Saga saga = new Saga(ipPort, gid);
97+
return saga;
98+
}
9199
}

src/main/java/common/constant/ParamFieldConstant.java

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,29 +28,49 @@
2828
* @author lixiaoshuang
2929
*/
3030
public class ParamFieldConstant {
31-
31+
3232
public static final String GID = "gid";
33-
33+
3434
public static final String TRANS_TYPE = "trans_type";
35-
35+
3636
public static final String BRANCH_ID = "branch_id";
37-
37+
3838
public static final String STATUS = "status";
39-
39+
4040
public static final String DATA = "data";
41-
41+
4242
public static final String TRY = "try";
43-
43+
4444
public static final String CONFIRM = "confirm";
45-
45+
4646
public static final String CANCEL = "cancel";
47-
47+
4848
public static final String OP = "op";
49-
49+
5050
public static final String CODE = "code";
51-
51+
5252
public static final String MESSAGE = "message";
53-
53+
5454
public static final String DTM_RESULT = "dtm_result";
55-
55+
56+
public static final String ACTION = "action";
57+
58+
public static final String COMPENSATE = "compensate";
59+
60+
public static final String STEPS = "steps";
61+
62+
public static final String PAYLOADS = "payloads";
63+
64+
public static final String CUSTOM_DATA = "custom_data";
65+
66+
public static final String WAIT_RESULT = "wait_result";
67+
68+
public static final String TIMEOUT_TO_FAIL = "timeout_to_fail";
69+
70+
public static final String RETRY_INTERVAL = "retry_interval";
71+
72+
public static final String PASSTHROGH_HEADERS = "passthrough_headers";
73+
74+
public static final String BRANCH_HEADERS = "branch_headers";
75+
5676
}

src/main/java/common/enums/TransTypeEnum.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,16 @@
2626

2727
public enum TransTypeEnum {
2828
// 事务类型
29-
TCC("tcc");
30-
29+
TCC("tcc"), SAGA("saga");
30+
3131
TransTypeEnum(String value) {
3232
this.value = value;
3333
}
34-
34+
3535
private String value;
36-
36+
3737
public String getValue() {
3838
return this.value;
3939
}
40-
40+
4141
}

src/main/java/common/model/TransBase.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,23 +28,40 @@
2828
import lombok.Data;
2929
import lombok.NoArgsConstructor;
3030

31+
import java.util.ArrayList;
32+
import java.util.HashMap;
33+
import java.util.Map;
34+
3135
@Data
3236
@NoArgsConstructor
3337
public class TransBase {
34-
38+
3539
/**
3640
* 全局事务id
3741
*/
3842
private String gid;
39-
43+
4044
/**
4145
* 事务类型
4246
*/
4347
private TransTypeEnum transTypeEnum;
44-
45-
48+
4649
private boolean waitResult;
47-
50+
51+
private long timeoutToFail;
52+
53+
private long retryInterval;
54+
55+
private Map<String, String> branchHeaders = new HashMap<>();
56+
57+
private ArrayList<String> passthroughHeaders = new ArrayList<>();
58+
59+
private String customData;
60+
61+
private ArrayList<Map<String, String>> steps = new ArrayList<>();
62+
63+
private ArrayList<String> payloads = new ArrayList<>();
64+
4865
public TransBase(TransTypeEnum transTypeEnum, String gid, boolean waitResult) {
4966
this.gid = gid;
5067
this.transTypeEnum = transTypeEnum;

src/main/java/saga/Saga.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,106 @@
2424

2525
package saga;
2626

27+
import com.alibaba.fastjson.JSONObject;
28+
import common.constant.Constant;
29+
import common.constant.ParamFieldConstant;
30+
import common.model.DtmServerInfo;
31+
import common.utils.HttpUtil;
32+
import common.enums.TransTypeEnum;
33+
import common.model.TransBase;
34+
35+
import java.util.ArrayList;
36+
import java.util.HashMap;
37+
import java.util.Map;
38+
2739
public class Saga {
2840

41+
private static final String ORDERS = "orders";
42+
43+
private static final String CONCURRENT = "concurrent";
44+
45+
private TransBase transBase;
46+
47+
private DtmServerInfo dtmServerInfo;
48+
49+
private boolean concurrent;
50+
51+
private Map<String, ArrayList<Integer>> orders;
52+
53+
public Saga(String ipPort, String gid) {
54+
this.dtmServerInfo = new DtmServerInfo(ipPort);
55+
this.transBase = new TransBase(TransTypeEnum.SAGA, gid, false);
56+
this.concurrent = false;
57+
this.orders = new HashMap<>();
58+
}
59+
60+
public Saga add(String action, String compensate, Object postData) {
61+
HashMap<String, String> step = new HashMap<>();
62+
step.put(ParamFieldConstant.ACTION, action);
63+
step.put(ParamFieldConstant.COMPENSATE, compensate);
64+
transBase.getSteps().add(step);
65+
transBase.getPayloads().add(JSONObject.toJSONString(postData));
66+
return this;
67+
}
68+
69+
public Saga addBranchOrder(Integer branch, ArrayList<Integer> preBranches) {
70+
orders.put(branch.toString(), preBranches);
71+
return this;
72+
}
73+
74+
public Saga enableConcurrent() {
75+
concurrent = true;
76+
return this;
77+
}
78+
79+
public void submit() throws Exception {
80+
addConcurrentContext();
81+
HashMap<String, Object> mapParam = new HashMap<>(Constant.DEFAULT_INITIAL_CAPACITY);
82+
mapParam.put(ParamFieldConstant.GID, transBase.getGid());
83+
mapParam.put(ParamFieldConstant.TRANS_TYPE, TransTypeEnum.SAGA.getValue());
84+
mapParam.put(ParamFieldConstant.STEPS, transBase.getSteps());
85+
mapParam.put(ParamFieldConstant.PAYLOADS, transBase.getPayloads());
86+
mapParam.put(ParamFieldConstant.CUSTOM_DATA, transBase.getCustomData());
87+
mapParam.put(ParamFieldConstant.WAIT_RESULT, transBase.isWaitResult());
88+
mapParam.put(ParamFieldConstant.TIMEOUT_TO_FAIL, transBase.getTimeoutToFail());
89+
mapParam.put(ParamFieldConstant.RETRY_INTERVAL, transBase.getRetryInterval());
90+
mapParam.put(ParamFieldConstant.PASSTHROGH_HEADERS, transBase.getPassthroughHeaders());
91+
mapParam.put(ParamFieldConstant.BRANCH_HEADERS, transBase.getBranchHeaders());
92+
93+
HttpUtil.post(dtmServerInfo.submit(), JSONObject.toJSONString(mapParam));
94+
}
95+
96+
public Saga enableWaitResult() {
97+
transBase.setWaitResult(true);
98+
return this;
99+
}
100+
101+
public Saga setTimeoutToFail(long timeoutToFail) {
102+
transBase.setTimeoutToFail(timeoutToFail);
103+
return this;
104+
}
105+
106+
public Saga setRetryInterval(long retryInterval) {
107+
transBase.setRetryInterval(retryInterval);
108+
return this;
109+
}
110+
111+
public Saga setBranchHeaders(Map<String, String> headers) {
112+
transBase.setBranchHeaders(headers);
113+
return this;
114+
}
115+
116+
public Saga setPassthroughHeaders(ArrayList<String> passthroughHeaders) {
117+
transBase.setPassthroughHeaders(passthroughHeaders);
118+
return this;
119+
}
120+
121+
private void addConcurrentContext() {
122+
if (concurrent) {
123+
HashMap<String, Object> data = new HashMap<>();
124+
data.put(ORDERS, orders);
125+
data.put(CONCURRENT, true);
126+
transBase.setCustomData(JSONObject.toJSONString(data));
127+
}
128+
}
29129
}

0 commit comments

Comments
 (0)