Skip to content

Commit 260ddcd

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_3.9.x_23442' into 1.8_release_3.9.x
2 parents 82e182d + 7c3f0a2 commit 260ddcd

File tree

8 files changed

+111
-21
lines changed

8 files changed

+111
-21
lines changed

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,11 @@ public abstract class AbsTableParser {
4242

4343
private static final String PRIMARY_KEY = "primaryKey";
4444
private static final String NEST_JSON_FIELD_KEY = "nestFieldKey";
45+
private static final String CHAR_TYPE_NO_LENGTH = "CHAR";
4546

4647
private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
4748
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
49+
private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");
4850

4951
private Map<String, Pattern> patternMap = Maps.newHashMap();
5052

@@ -105,13 +107,25 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
105107
System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
106108
String fieldName = String.join(" ", filedNameArr);
107109
String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
108-
Class fieldClass = dbTypeConvertToJavaType(fieldType);
110+
111+
112+
Class fieldClass = null;
113+
TableInfo.FieldExtraInfo fieldExtraInfo = null;
114+
115+
Matcher matcher = charTypePattern.matcher(fieldType);
116+
if (matcher.find()) {
117+
fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH);
118+
fieldExtraInfo = new TableInfo.FieldExtraInfo();
119+
fieldExtraInfo.setLength(Integer.valueOf(matcher.group(1)));
120+
} else {
121+
fieldClass = dbTypeConvertToJavaType(fieldType);
122+
}
109123

110124
tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]);
111125
tableInfo.addField(fieldName);
112126
tableInfo.addFieldClass(fieldClass);
113127
tableInfo.addFieldType(fieldType);
114-
tableInfo.addFieldExtraInfo(null);
128+
tableInfo.addFieldExtraInfo(fieldExtraInfo);
115129
}
116130

117131
tableInfo.finish();

core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,18 @@ public static class FieldExtraInfo implements Serializable {
194194
* default false:allow field is null
195195
*/
196196
boolean notNull = false;
197+
/**
198+
* field length,eg.char(4)
199+
*/
200+
int length;
201+
202+
public int getLength() {
203+
return length;
204+
}
205+
206+
public void setLength(int length) {
207+
this.length = length;
208+
}
197209

198210
public boolean getNotNull() {
199211
return notNull;
@@ -202,5 +214,13 @@ public boolean getNotNull() {
202214
public void setNotNull(boolean notNull) {
203215
this.notNull = notNull;
204216
}
217+
218+
@Override
219+
public String toString() {
220+
return "FieldExtraInfo{" +
221+
"notNull=" + notNull +
222+
", length=" + length +
223+
'}';
224+
}
205225
}
206226
}

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,10 @@
2323
import com.dtstack.flink.sql.side.SideTableInfo;
2424
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
2525
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26+
import com.dtstack.flink.sql.table.TableInfo;
2627
import com.dtstack.flink.sql.util.DtStringUtil;
27-
import com.dtstack.flink.sql.util.ParseUtils;
28-
import org.apache.calcite.sql.SqlNode;
28+
import org.apache.commons.lang3.StringUtils;
2929
import org.apache.flink.api.java.typeutils.RowTypeInfo;
30-
import com.google.common.collect.Lists;
31-
32-
import java.util.Arrays;
3330
import java.util.List;
3431

3532

@@ -49,4 +46,21 @@ public String quoteIdentifier(String identifier) {
4946
return "\"" + identifier + "\"";
5047
}
5148

49+
@Override
50+
public String wrapperPlaceholder(String fieldName) {
51+
int pos = sideTableInfo.getFieldList().indexOf(fieldName);
52+
String type = sideTableInfo.getFieldTypeList().get(pos);
53+
54+
String sqlDefaultPlaceholder = " ? ";
55+
String rpadFormat = "rpad(?, %d, ' ')";
56+
57+
if (StringUtils.contains(type.toLowerCase(), "char")) {
58+
TableInfo.FieldExtraInfo fieldExtraInfo = sideTableInfo.getFieldExtraInfoList().get(pos);
59+
int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength();
60+
if (charLength > 0) {
61+
return String.format(rpadFormat, charLength);
62+
}
63+
}
64+
return sqlDefaultPlaceholder;
65+
}
5266
}

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,17 @@
2121
import com.dtstack.flink.sql.sink.rdb.RdbSink;
2222
import com.dtstack.flink.sql.sink.rdb.format.ExtendOutputFormat;
2323
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
24+
import com.dtstack.flink.sql.table.TableInfo;
2425
import com.dtstack.flink.sql.util.DtStringUtil;
2526
import org.apache.commons.lang3.StringUtils;
2627
import com.google.common.collect.Lists;
2728

2829
import java.util.ArrayList;
30+
import java.util.Arrays;
2931
import java.util.Iterator;
3032
import java.util.List;
3133
import java.util.Map;
34+
import java.util.stream.Collectors;
3235

3336
/**
3437
* Reason:
@@ -40,6 +43,10 @@
4043
public class OracleSink extends RdbSink implements IStreamSinkGener<RdbSink> {
4144
private static final String ORACLE_DRIVER = "oracle.jdbc.driver.OracleDriver";
4245

46+
private final String SQL_DEFAULT_PLACEHOLDER = " ? ";
47+
private final String DEAL_CHAR_KEY = "char";
48+
private String RPAD_FORMAT = " rpad(?, %d, ' ') ";
49+
4350
@Override
4451
public String getDriverName() {
4552
return ORACLE_DRIVER;
@@ -193,16 +200,35 @@ public String updateKeySql(Map<String, List<String>> updateKey) {
193200
*/
194201
public String makeValues(List<String> column) {
195202
StringBuilder sb = new StringBuilder("SELECT ");
196-
for (int i = 0; i < column.size(); ++i) {
197-
if (i != 0) {
198-
sb.append(",");
203+
String collect = column.stream()
204+
.map(col -> wrapperPlaceholder(col) + DtStringUtil.addQuoteForStr(col))
205+
.collect(Collectors.joining(", "));
206+
207+
sb.append(collect).append(" FROM DUAL");
208+
return sb.toString();
209+
}
210+
211+
/**
212+
* char type is wrapped with rpad
213+
* @param fieldName
214+
* @return
215+
*/
216+
public String wrapperPlaceholder(String fieldName) {
217+
int pos = rdbTableInfo.getFieldList().indexOf(fieldName);
218+
String type = rdbTableInfo.getFieldTypeList().get(pos);
219+
220+
if (StringUtils.contains(type.toLowerCase(), DEAL_CHAR_KEY)) {
221+
TableInfo.FieldExtraInfo fieldExtraInfo = rdbTableInfo.getFieldExtraInfoList().get(pos);
222+
int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength();
223+
if (charLength > 0) {
224+
return String.format(RPAD_FORMAT, charLength);
199225
}
200-
sb.append("? " + DtStringUtil.addQuoteForStr(column.get(i)));
201226
}
202-
sb.append(" FROM DUAL");
203-
return sb.toString();
227+
return SQL_DEFAULT_PLACEHOLDER;
204228
}
205229

230+
231+
206232
public boolean containsIgnoreCase(List<String> l, String s) {
207233
Iterator<String> it = l.iterator();
208234
while (it.hasNext()) {

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,14 +136,26 @@ public String getAdditionalWhereClause() {
136136

137137
public String getSelectFromStatement(String tableName, List<String> selectFields, List<String> conditionFields, List<String> sqlJoinCompareOperate,
138138
List<PredicateInfo> predicateInfoes) {
139-
String fromClause = selectFields.stream().map(this::quoteIdentifier).collect(Collectors.joining(", "));
140-
String whereClause = conditionFields.stream().map(f -> quoteIdentifier(f) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + " ? ")
139+
String fromClause = selectFields.stream()
140+
.map(this::quoteIdentifier)
141+
.collect(Collectors.joining(", "));
142+
143+
String whereClause = conditionFields.stream()
144+
.map(f -> quoteIdentifier(f) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + wrapperPlaceholder(f))
145+
.collect(Collectors.joining(" AND "));
146+
147+
String predicateClause = predicateInfoes.stream()
148+
.map(this::buildFilterCondition)
141149
.collect(Collectors.joining(" AND "));
142-
String predicateClause = predicateInfoes.stream().map(this::buildFilterCondition).collect(Collectors.joining(" AND "));
143150

144-
String sql = "SELECT " + fromClause + " FROM " + tableName + (conditionFields.size() > 0 ? " WHERE " + whereClause : "")
151+
String dimQuerySql = "SELECT " + fromClause + " FROM " + tableName + (conditionFields.size() > 0 ? " WHERE " + whereClause : "")
145152
+ (predicateInfoes.size() > 0 ? " AND " + predicateClause : "") + getAdditionalWhereClause();
146-
return sql;
153+
154+
return dimQuerySql;
155+
}
156+
157+
public String wrapperPlaceholder(String fieldName) {
158+
return " ? ";
147159
}
148160

149161
public String buildFilterCondition(PredicateInfo info) {

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public boolean check() {
4747
Preconditions.checkNotNull(tableName, "rdb of tableName is required");
4848
Preconditions.checkNotNull(userName, "rdb of userName is required");
4949
Preconditions.checkNotNull(password, "rdb of password is required");
50+
Preconditions.checkArgument(getFieldList().size() == getFieldExtraInfoList().size(),
51+
"fields and fieldExtraInfoList attributes must be the same length");
5052
return true;
5153
}
5254

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ public abstract class RdbSink implements RetractStreamTableSink<Row>, Serializab
8383

8484
private String schema;
8585

86+
protected RdbTableInfo rdbTableInfo;
87+
8688
public RichSinkFunction createJdbcSinkFunc() {
8789
if (driverName == null || dbURL == null || userName == null
8890
|| password == null || sqlTypes == null || tableName == null) {
@@ -111,8 +113,7 @@ public RichSinkFunction createJdbcSinkFunc() {
111113

112114
@Override
113115
public RdbSink genStreamSink(TargetTableInfo targetTableInfo) {
114-
RdbTableInfo rdbTableInfo = (RdbTableInfo) targetTableInfo;
115-
116+
this.rdbTableInfo = (RdbTableInfo) targetTableInfo;
116117
String tmpDbURL = rdbTableInfo.getUrl();
117118
String tmpUserName = rdbTableInfo.getUserName();
118119
String tmpPassword = rdbTableInfo.getPassword();
@@ -263,7 +264,6 @@ public void setDbType(String dbType) {
263264
/**
264265
* sqlserver and oracle maybe implement
265266
*
266-
* @param tableName
267267
* @param fieldNames
268268
* @param realIndexes
269269
* @return

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ public boolean check() {
144144
Preconditions.checkNotNull(tableName, "rdb field of tableName is required");
145145
Preconditions.checkNotNull(userName, "rdb field of userName is required");
146146
Preconditions.checkNotNull(password, "rdb field of password is required");
147+
Preconditions.checkArgument(getFieldList().size() == getFieldExtraInfoList().size(),
148+
"fields and fieldExtraInfoList attributes must be the same length");
147149
return true;
148150
}
149151

0 commit comments

Comments
 (0)