Skip to content

Commit b47b9ce

Browse files
authored
Merge pull request #71 from dingxin-tech/release/0.50.x
feat(fallback-policy): 添加始终回退策略及自定义回退策略支持
2 parents 7ddebd7 + 18d5a82 commit b47b9ce

File tree

2 files changed

+70
-40
lines changed

2 files changed

+70
-40
lines changed

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/sqa/FallbackPolicy.java

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* Created by dongxiao on 2020/4/21.
55
*/
66
public class FallbackPolicy {
7+
78
// fallback when resource not enough exception happened
89
private boolean fallback4ResourceNotEnough = true;
910
// fallback when unsupported feature
@@ -17,17 +18,18 @@ public class FallbackPolicy {
1718
// fallback when attach session failed
1819
private boolean fallback4AttachError = true;
1920

21+
// always fallback
22+
private boolean alwaysFallback = false;
23+
24+
// user defined fallback policy
25+
private UserDefinedFallbackPolicy userDefinedFallbackPolicy;
26+
2027
FallbackPolicy() {
2128
}
2229

2330
public static FallbackPolicy alwaysFallbackPolicy() {
2431
FallbackPolicy policy = new FallbackPolicy();
25-
return policy.fallback4ResourceNotEnough(true)
26-
.fallback4UnsupportedFeature(true)
27-
.fallback4RunningTimeout(true)
28-
.fallback4Upgrading(true)
29-
.fallback4UnknownError(true)
30-
.fallback4AttachError(true);
32+
return policy.alwaysFallback(true);
3133
}
3234

3335
public static FallbackPolicy alwaysFallbackExceptAttachPolicy() {
@@ -37,7 +39,8 @@ public static FallbackPolicy alwaysFallbackExceptAttachPolicy() {
3739
.fallback4RunningTimeout(true)
3840
.fallback4Upgrading(true)
3941
.fallback4UnknownError(true)
40-
.fallback4AttachError(false);
42+
.fallback4AttachError(false)
43+
.alwaysFallback(false);
4144
}
4245

4346
public static FallbackPolicy nonFallbackPolicy() {
@@ -47,7 +50,8 @@ public static FallbackPolicy nonFallbackPolicy() {
4750
.fallback4RunningTimeout(false)
4851
.fallback4Upgrading(false)
4952
.fallback4UnknownError(false)
50-
.fallback4AttachError(false);
53+
.fallback4AttachError(false)
54+
.alwaysFallback(false);
5155
}
5256

5357
public FallbackPolicy fallback4ResourceNotEnough(boolean enable) {
@@ -80,6 +84,16 @@ public FallbackPolicy fallback4AttachError(boolean enable) {
8084
return this;
8185
}
8286

87+
public FallbackPolicy alwaysFallback(boolean enable) {
88+
alwaysFallback = enable;
89+
return this;
90+
}
91+
92+
public FallbackPolicy addUserDefinedFallbackPolicy(UserDefinedFallbackPolicy policy) {
93+
userDefinedFallbackPolicy = policy;
94+
return this;
95+
}
96+
8397
public boolean isFallback4ResourceNotEnough() {
8498
return fallback4ResourceNotEnough;
8599
}
@@ -105,7 +119,42 @@ public boolean isFallback4AttachError() {
105119
}
106120

107121
public boolean isAlwaysFallBack() {
108-
return fallback4ResourceNotEnough && fallback4UnsupportedFeature && fallback4RunningTimeout
109-
&& fallback4Upgrading && fallback4UnknownError && fallback4AttachError;
122+
return alwaysFallback;
123+
}
124+
125+
public boolean shouldFallback(String errorCode, String errorMessage) {
126+
if (alwaysFallback) {
127+
return true;
128+
} else if (userDefinedFallbackPolicy != null
129+
&& userDefinedFallbackPolicy.shouldFallback(errorCode, errorMessage)) {
130+
return true;
131+
} else if (isFallback4UnsupportedFeature()
132+
&& errorMessage.contains(SQLExecutorConstants.sessionUnsupportedFeatureFlag)) {
133+
return true;
134+
} else if (isFallback4Upgrading()
135+
&& errorMessage.contains(SQLExecutorConstants.sessionUnavailableFlag)) {
136+
return true;
137+
} else if (isFallback4Upgrading()
138+
&& errorMessage.contains(SQLExecutorConstants.sessionAccessDenyFlag)) {
139+
return true;
140+
} else if (isFallback4ResourceNotEnough()
141+
&& errorMessage.contains(SQLExecutorConstants.sessionResourceNotEnoughFlag)) {
142+
return true;
143+
} else if (isFallback4RunningTimeout()
144+
&& (errorMessage.contains(SQLExecutorConstants.sessionQueryTimeoutFlag) ||
145+
errorMessage.contains(SQLExecutorConstants.sessionTunnelTimeoutMessage) ||
146+
errorMessage.contains(
147+
SQLExecutorConstants.sessionTunnelGetSelectDescTimeoutMessage))) {
148+
return true;
149+
} else if (isFallback4UnknownError()
150+
&& errorMessage.contains(SQLExecutorConstants.sessionExceptionFlag)) {
151+
return true;
152+
} else {
153+
return false;
154+
}
155+
}
156+
157+
public interface UserDefinedFallbackPolicy {
158+
boolean shouldFallback(String errorCode, String errorMessage);
110159
}
111160
}

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/sqa/SQLExecutorImpl.java

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -832,34 +832,15 @@ private void reattach(String errorMessage) throws OdpsException {
832832
}
833833
}
834834

835-
private ExecuteMode handleSessionException(String errorMessage) throws OdpsException {
836-
if (errorMessage.indexOf(SQLExecutorConstants.sessionReattachFlag) != -1) {
835+
private ExecuteMode handleSessionException(String errorCode, String errorMessage)
836+
throws OdpsException {
837+
if (errorMessage.contains(SQLExecutorConstants.sessionReattachFlag)) {
837838
reattach(errorMessage);
838839
return ExecuteMode.INTERACTIVE;
839-
} else if (errorMessage.indexOf(SQLExecutorConstants.sessionJobCancelledComplierFlag) != -1 ||
840-
errorMessage.indexOf(SQLExecutorConstants.sessionJobCancelledFlag) != -1) {
840+
} else if (errorMessage.contains(SQLExecutorConstants.sessionJobCancelledComplierFlag) ||
841+
errorMessage.contains(SQLExecutorConstants.sessionJobCancelledFlag)) {
841842
throw new OdpsException(errorMessage);
842-
} else if (fallbackPolicy.isFallback4UnsupportedFeature()
843-
&& errorMessage.indexOf(SQLExecutorConstants.sessionUnsupportedFeatureFlag) != -1) {
844-
return ExecuteMode.OFFLINE;
845-
} else if (fallbackPolicy.isFallback4Upgrading()
846-
&& errorMessage.indexOf(SQLExecutorConstants.sessionUnavailableFlag) != -1) {
847-
return ExecuteMode.OFFLINE;
848-
} else if (fallbackPolicy.isFallback4Upgrading()
849-
&& errorMessage.indexOf(SQLExecutorConstants.sessionAccessDenyFlag) != -1) {
850-
return ExecuteMode.OFFLINE;
851-
} else if (fallbackPolicy.isFallback4ResourceNotEnough()
852-
&& errorMessage.indexOf(SQLExecutorConstants.sessionResourceNotEnoughFlag) != -1) {
853-
return ExecuteMode.OFFLINE;
854-
} else if (fallbackPolicy.isFallback4RunningTimeout()
855-
&& (errorMessage.indexOf(SQLExecutorConstants.sessionQueryTimeoutFlag) != -1 ||
856-
errorMessage.indexOf(SQLExecutorConstants.sessionTunnelTimeoutMessage) != -1 ||
857-
errorMessage.indexOf(SQLExecutorConstants.sessionTunnelGetSelectDescTimeoutMessage) != -1)) {
858-
return ExecuteMode.OFFLINE;
859-
} else if (fallbackPolicy.isFallback4UnknownError()
860-
&& errorMessage.indexOf(SQLExecutorConstants.sessionExceptionFlag) != -1) {
861-
return ExecuteMode.OFFLINE;
862-
} else if (fallbackPolicy.isAlwaysFallBack()) {
843+
} else if (fallbackPolicy.shouldFallback(errorCode, errorMessage)) {
863844
return ExecuteMode.OFFLINE;
864845
} else {
865846
throw new OdpsException(errorMessage);
@@ -1065,7 +1046,7 @@ private List<Record> getSessionResult()
10651046
result = session.getRawSubQueryResult(queryInfo.getId());
10661047
}
10671048
} catch (OdpsException e) {
1068-
ExecuteMode executeMode = handleSessionException(e.getMessage());
1049+
ExecuteMode executeMode = handleSessionException(e.getErrorCode(), e.getMessage());
10691050
runQueryInternal(executeMode, e.getMessage(), true);
10701051
return getResultInternal(null, null, null, true);
10711052
}
@@ -1110,7 +1091,7 @@ private List<Record> getSessionResultByInstanceTunnel(Long offset, Long countLim
11101091
runQueryInternal(ExecuteMode.OFFLINE, retryInfo.errMsg, true);
11111092
return getResultInternal(offset, countLimit, sizeLimit, limitEnabled);
11121093
} else {
1113-
ExecuteMode executeMode = handleSessionException(retryInfo.errMsg);
1094+
ExecuteMode executeMode = handleSessionException(retryInfo.errCode, retryInfo.errMsg);
11141095
runQueryInternal(executeMode, retryInfo.errMsg, true);
11151096
return getResultInternal(offset, countLimit, sizeLimit, limitEnabled);
11161097
}
@@ -1178,7 +1159,7 @@ private ResultSet getSessionResultSet()
11781159
result = session.getRawSubQueryResult(queryInfo.getId());
11791160
}
11801161
} catch (OdpsException e) {
1181-
ExecuteMode executeMode = handleSessionException(e.getMessage());
1162+
ExecuteMode executeMode = handleSessionException(e.getErrorCode(), e.getMessage());
11821163
runQueryInternal(executeMode, e.getMessage(), true);
11831164
return getResultSetInternal(null, null, null, true);
11841165
}
@@ -1219,7 +1200,7 @@ private ResultSet getSessionResultSetByInstanceTunnel(Long offset, Long countLim
12191200
runQueryInternal(ExecuteMode.OFFLINE, retryInfo.errMsg, true);
12201201
return getResultSetInternal(offset, countLimit, sizeLimit, limitEnabled);
12211202
} else {
1222-
ExecuteMode executeMode = handleSessionException(retryInfo.errMsg);
1203+
ExecuteMode executeMode = handleSessionException(retryInfo.errCode, retryInfo.errMsg);
12231204
runQueryInternal(executeMode, retryInfo.errMsg, true);
12241205
return getResultSetInternal(offset, countLimit, sizeLimit, limitEnabled);
12251206
}
@@ -1308,7 +1289,7 @@ private void runInSessionWithRetry(String rerunMsg) throws OdpsException {
13081289
session.runSubQuery(queryInfo.getSql(), queryInfo.getHint());
13091290
if (subQueryInfo.status.equals(Session.SubQueryInfo.kOKCode)) {
13101291
if (subQueryInfo.queryId == -1) {
1311-
ExecuteMode executeMode = handleSessionException(subQueryInfo.result);
1292+
ExecuteMode executeMode = handleSessionException(subQueryInfo.result, subQueryInfo.result);
13121293
runQueryInternal(executeMode, subQueryInfo.result, true);
13131294
} else {
13141295
// submit success, do not generate logview now

0 commit comments

Comments
 (0)