Skip to content

Commit 18d5a82

Browse files
committed
feat(fallback-policy): 添加始终回退策略及自定义回退策略支持
1 parent 3cd5b9e commit 18d5a82

File tree

2 files changed

+70
-40
lines changed

2 files changed

+70
-40
lines changed

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/sqa/FallbackPolicy.java

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* Created by dongxiao on 2020/4/21.
55
*/
66
public class FallbackPolicy {
7+
78
// fallback when resource not enough exception happened
89
private boolean fallback4ResourceNotEnough = true;
910
// fallback when unsupported feature
@@ -17,17 +18,18 @@ public class FallbackPolicy {
1718
// fallback when attach session failed
1819
private boolean fallback4AttachError = true;
1920

21+
// always fallback
22+
private boolean alwaysFallback = false;
23+
24+
// user defined fallback policy
25+
private UserDefinedFallbackPolicy userDefinedFallbackPolicy;
26+
2027
FallbackPolicy() {
2128
}
2229

2330
public static FallbackPolicy alwaysFallbackPolicy() {
2431
FallbackPolicy policy = new FallbackPolicy();
25-
return policy.fallback4ResourceNotEnough(true)
26-
.fallback4UnsupportedFeature(true)
27-
.fallback4RunningTimeout(true)
28-
.fallback4Upgrading(true)
29-
.fallback4UnknownError(true)
30-
.fallback4AttachError(true);
32+
return policy.alwaysFallback(true);
3133
}
3234

3335
public static FallbackPolicy alwaysFallbackExceptAttachPolicy() {
@@ -37,7 +39,8 @@ public static FallbackPolicy alwaysFallbackExceptAttachPolicy() {
3739
.fallback4RunningTimeout(true)
3840
.fallback4Upgrading(true)
3941
.fallback4UnknownError(true)
40-
.fallback4AttachError(false);
42+
.fallback4AttachError(false)
43+
.alwaysFallback(false);
4144
}
4245

4346
public static FallbackPolicy nonFallbackPolicy() {
@@ -47,7 +50,8 @@ public static FallbackPolicy nonFallbackPolicy() {
4750
.fallback4RunningTimeout(false)
4851
.fallback4Upgrading(false)
4952
.fallback4UnknownError(false)
50-
.fallback4AttachError(false);
53+
.fallback4AttachError(false)
54+
.alwaysFallback(false);
5155
}
5256

5357
public FallbackPolicy fallback4ResourceNotEnough(boolean enable) {
@@ -80,6 +84,16 @@ public FallbackPolicy fallback4AttachError(boolean enable) {
8084
return this;
8185
}
8286

87+
public FallbackPolicy alwaysFallback(boolean enable) {
88+
alwaysFallback = enable;
89+
return this;
90+
}
91+
92+
public FallbackPolicy addUserDefinedFallbackPolicy(UserDefinedFallbackPolicy policy) {
93+
userDefinedFallbackPolicy = policy;
94+
return this;
95+
}
96+
8397
public boolean isFallback4ResourceNotEnough() {
8498
return fallback4ResourceNotEnough;
8599
}
@@ -105,7 +119,42 @@ public boolean isFallback4AttachError() {
105119
}
106120

107121
public boolean isAlwaysFallBack() {
108-
return fallback4ResourceNotEnough && fallback4UnsupportedFeature && fallback4RunningTimeout
109-
&& fallback4Upgrading && fallback4UnknownError && fallback4AttachError;
122+
return alwaysFallback;
123+
}
124+
125+
public boolean shouldFallback(String errorCode, String errorMessage) {
126+
if (alwaysFallback) {
127+
return true;
128+
} else if (userDefinedFallbackPolicy != null
129+
&& userDefinedFallbackPolicy.shouldFallback(errorCode, errorMessage)) {
130+
return true;
131+
} else if (isFallback4UnsupportedFeature()
132+
&& errorMessage.contains(SQLExecutorConstants.sessionUnsupportedFeatureFlag)) {
133+
return true;
134+
} else if (isFallback4Upgrading()
135+
&& errorMessage.contains(SQLExecutorConstants.sessionUnavailableFlag)) {
136+
return true;
137+
} else if (isFallback4Upgrading()
138+
&& errorMessage.contains(SQLExecutorConstants.sessionAccessDenyFlag)) {
139+
return true;
140+
} else if (isFallback4ResourceNotEnough()
141+
&& errorMessage.contains(SQLExecutorConstants.sessionResourceNotEnoughFlag)) {
142+
return true;
143+
} else if (isFallback4RunningTimeout()
144+
&& (errorMessage.contains(SQLExecutorConstants.sessionQueryTimeoutFlag) ||
145+
errorMessage.contains(SQLExecutorConstants.sessionTunnelTimeoutMessage) ||
146+
errorMessage.contains(
147+
SQLExecutorConstants.sessionTunnelGetSelectDescTimeoutMessage))) {
148+
return true;
149+
} else if (isFallback4UnknownError()
150+
&& errorMessage.contains(SQLExecutorConstants.sessionExceptionFlag)) {
151+
return true;
152+
} else {
153+
return false;
154+
}
155+
}
156+
157+
public interface UserDefinedFallbackPolicy {
158+
boolean shouldFallback(String errorCode, String errorMessage);
110159
}
111160
}

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/sqa/SQLExecutorImpl.java

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -830,34 +830,15 @@ private void reattach(String errorMessage) throws OdpsException {
830830
}
831831
}
832832

833-
private ExecuteMode handleSessionException(String errorMessage) throws OdpsException {
834-
if (errorMessage.indexOf(SQLExecutorConstants.sessionReattachFlag) != -1) {
833+
private ExecuteMode handleSessionException(String errorCode, String errorMessage)
834+
throws OdpsException {
835+
if (errorMessage.contains(SQLExecutorConstants.sessionReattachFlag)) {
835836
reattach(errorMessage);
836837
return ExecuteMode.INTERACTIVE;
837-
} else if (errorMessage.indexOf(SQLExecutorConstants.sessionJobCancelledComplierFlag) != -1 ||
838-
errorMessage.indexOf(SQLExecutorConstants.sessionJobCancelledFlag) != -1) {
838+
} else if (errorMessage.contains(SQLExecutorConstants.sessionJobCancelledComplierFlag) ||
839+
errorMessage.contains(SQLExecutorConstants.sessionJobCancelledFlag)) {
839840
throw new OdpsException(errorMessage);
840-
} else if (fallbackPolicy.isFallback4UnsupportedFeature()
841-
&& errorMessage.indexOf(SQLExecutorConstants.sessionUnsupportedFeatureFlag) != -1) {
842-
return ExecuteMode.OFFLINE;
843-
} else if (fallbackPolicy.isFallback4Upgrading()
844-
&& errorMessage.indexOf(SQLExecutorConstants.sessionUnavailableFlag) != -1) {
845-
return ExecuteMode.OFFLINE;
846-
} else if (fallbackPolicy.isFallback4Upgrading()
847-
&& errorMessage.indexOf(SQLExecutorConstants.sessionAccessDenyFlag) != -1) {
848-
return ExecuteMode.OFFLINE;
849-
} else if (fallbackPolicy.isFallback4ResourceNotEnough()
850-
&& errorMessage.indexOf(SQLExecutorConstants.sessionResourceNotEnoughFlag) != -1) {
851-
return ExecuteMode.OFFLINE;
852-
} else if (fallbackPolicy.isFallback4RunningTimeout()
853-
&& (errorMessage.indexOf(SQLExecutorConstants.sessionQueryTimeoutFlag) != -1 ||
854-
errorMessage.indexOf(SQLExecutorConstants.sessionTunnelTimeoutMessage) != -1 ||
855-
errorMessage.indexOf(SQLExecutorConstants.sessionTunnelGetSelectDescTimeoutMessage) != -1)) {
856-
return ExecuteMode.OFFLINE;
857-
} else if (fallbackPolicy.isFallback4UnknownError()
858-
&& errorMessage.indexOf(SQLExecutorConstants.sessionExceptionFlag) != -1) {
859-
return ExecuteMode.OFFLINE;
860-
} else if (fallbackPolicy.isAlwaysFallBack()) {
841+
} else if (fallbackPolicy.shouldFallback(errorCode, errorMessage)) {
861842
return ExecuteMode.OFFLINE;
862843
} else {
863844
throw new OdpsException(errorMessage);
@@ -1063,7 +1044,7 @@ private List<Record> getSessionResult()
10631044
result = session.getRawSubQueryResult(queryInfo.getId());
10641045
}
10651046
} catch (OdpsException e) {
1066-
ExecuteMode executeMode = handleSessionException(e.getMessage());
1047+
ExecuteMode executeMode = handleSessionException(e.getErrorCode(), e.getMessage());
10671048
runQueryInternal(executeMode, e.getMessage(), true);
10681049
return getResultInternal(null, null, null, true);
10691050
}
@@ -1108,7 +1089,7 @@ private List<Record> getSessionResultByInstanceTunnel(Long offset, Long countLim
11081089
runQueryInternal(ExecuteMode.OFFLINE, retryInfo.errMsg, true);
11091090
return getResultInternal(offset, countLimit, sizeLimit, limitEnabled);
11101091
} else {
1111-
ExecuteMode executeMode = handleSessionException(retryInfo.errMsg);
1092+
ExecuteMode executeMode = handleSessionException(retryInfo.errCode, retryInfo.errMsg);
11121093
runQueryInternal(executeMode, retryInfo.errMsg, true);
11131094
return getResultInternal(offset, countLimit, sizeLimit, limitEnabled);
11141095
}
@@ -1176,7 +1157,7 @@ private ResultSet getSessionResultSet()
11761157
result = session.getRawSubQueryResult(queryInfo.getId());
11771158
}
11781159
} catch (OdpsException e) {
1179-
ExecuteMode executeMode = handleSessionException(e.getMessage());
1160+
ExecuteMode executeMode = handleSessionException(e.getErrorCode(), e.getMessage());
11801161
runQueryInternal(executeMode, e.getMessage(), true);
11811162
return getResultSetInternal(null, null, null, true);
11821163
}
@@ -1217,7 +1198,7 @@ private ResultSet getSessionResultSetByInstanceTunnel(Long offset, Long countLim
12171198
runQueryInternal(ExecuteMode.OFFLINE, retryInfo.errMsg, true);
12181199
return getResultSetInternal(offset, countLimit, sizeLimit, limitEnabled);
12191200
} else {
1220-
ExecuteMode executeMode = handleSessionException(retryInfo.errMsg);
1201+
ExecuteMode executeMode = handleSessionException(retryInfo.errCode, retryInfo.errMsg);
12211202
runQueryInternal(executeMode, retryInfo.errMsg, true);
12221203
return getResultSetInternal(offset, countLimit, sizeLimit, limitEnabled);
12231204
}
@@ -1306,7 +1287,7 @@ private void runInSessionWithRetry(String rerunMsg) throws OdpsException {
13061287
session.runSubQuery(queryInfo.getSql(), queryInfo.getHint());
13071288
if (subQueryInfo.status.equals(Session.SubQueryInfo.kOKCode)) {
13081289
if (subQueryInfo.queryId == -1) {
1309-
ExecuteMode executeMode = handleSessionException(subQueryInfo.result);
1290+
ExecuteMode executeMode = handleSessionException(subQueryInfo.result, subQueryInfo.result);
13101291
runQueryInternal(executeMode, subQueryInfo.result, true);
13111292
} else {
13121293
// submit success, do not generate logview now

0 commit comments

Comments
 (0)