Skip to content

Commit 10dd034

Browse files
FlechazoWHiLany
authored andcommitted
[hotfix-1772][rdb] 调整rdb errorLimit初始值,当出现脏数据,任务立即停止.
[hotfix-1772][rdb] 调整rdb errorLimit初始值,当出现脏数据,任务立即停止.
1 parent 5317336 commit 10dd034

File tree

4 files changed

+7
-33
lines changed

4 files changed

+7
-33
lines changed

core/src/main/java/com/dtstack/flink/sql/exception/ExceptionTrace.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,4 @@ public static String traceOriginalCause(Throwable e) {
2020
}
2121
return errorMsg;
2222
}
23-
24-
/**
25-
* 根据异常的种类来判断是否需要强制跳过Flink的重启{@link SuppressRestartsException}
26-
* @param e exception
27-
* @param errorMsg 需要抛出的异常信息
28-
*/
29-
public static void dealExceptionWithSuppressStart(Exception e, String errorMsg) {
30-
if (e instanceof SuppressRestartsException) {
31-
throw new SuppressRestartsException(
32-
new Throwable(
33-
errorMsg
34-
)
35-
);
36-
} else {
37-
throw new RuntimeException(errorMsg);
38-
}
39-
}
4023
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -210,16 +210,13 @@ private void checkConnectionOpen() {
210210
public synchronized void flush() {
211211
try {
212212
jdbcWriter.executeBatch(connection);
213-
batchCount = 0;
214-
} catch (Exception e) {
213+
} catch (SQLException e) {
215214
String errorMsg = String.format(
216215
"Writing records to JDBC failed. Cause: [%s]",
217216
ExceptionTrace.traceOriginalCause(e));
218-
219-
ExceptionTrace.dealExceptionWithSuppressStart(
220-
e,
221-
errorMsg
222-
);
217+
throw new RuntimeException(errorMsg);
218+
} finally {
219+
batchCount = 0;
223220
}
224221
}
225222

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbSinkParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5151
rdbTableInfo.setAllReplace(MathUtil.getBoolean(props.get(RdbTableInfo.ALLREPLACE_KEY.toLowerCase()), false));
5252
rdbTableInfo.setDriverName(MathUtil.getString(props.get(RdbTableInfo.DRIVER_NAME)));
5353
rdbTableInfo.setFastCheck(MathUtil.getBoolean(props.getOrDefault(RdbTableInfo.FAST_CHECK.toLowerCase(), true)));
54-
rdbTableInfo.setErrorLimit(MathUtil.getLongVal(props.get(RdbTableInfo.ERROR_LIMIT.toLowerCase()), 1000L));
54+
rdbTableInfo.setErrorLimit(MathUtil.getLongVal(props.get(RdbTableInfo.ERROR_LIMIT.toLowerCase()), 0L));
5555
rdbTableInfo.setCheckProperties();
5656

5757
rdbTableInfo.check();

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/JDBCWriter.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,18 +93,12 @@ default void dealExecuteError(Connection connection,
9393
row.toString(),
9494
ExceptionTrace.traceOriginalCause(e)));
9595
}
96+
metricOutputFormat.outDirtyRecords.inc();
9697

9798
if (errorLimit > -1) {
9899
if (metricOutputFormat.outDirtyRecords.getCount() > errorLimit) {
99-
throw new SuppressRestartsException(
100-
new Throwable(
101-
String.format("dirty data Count: [%s]. Error cause: [%s]",
102-
metricOutputFormat.outDirtyRecords.getCount(),
103-
ExceptionTrace.traceOriginalCause(e)))
104-
);
100+
throw new SuppressRestartsException(e);
105101
}
106102
}
107-
108-
metricOutputFormat.outDirtyRecords.inc();
109103
}
110104
}

0 commit comments

Comments
 (0)