Skip to content

Commit ad41aef

Browse files
committed
Merge branch 'feat_upsertMode_mergedDev' into 'v1.8.0_dev'
结果表upsert See merge request !208
2 parents 49d88c9 + 32281cc commit ad41aef

File tree

59 files changed

+2633
-1752
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+2633
-1752
lines changed

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
* @see Tuple
7070
* @see DriverManager
7171
*/
72-
public class CassandraOutputFormat extends DtRichOutputFormat {
72+
public class CassandraOutputFormat extends DtRichOutputFormat<Tuple2> {
7373
private static final long serialVersionUID = -7994311331389155692L;
7474

7575
private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.clickhouse;
20+
21+
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
22+
23+
import java.util.Optional;
24+
25+
/**
26+
* Date: 2020/1/15
27+
* Company: www.dtstack.com
28+
* @author maqi
29+
*/
30+
public class ClickhouseDialect implements JDBCDialect {
31+
32+
@Override
33+
public boolean canHandle(String url) {
34+
return url.startsWith("jdbc:clickhouse:");
35+
}
36+
37+
@Override
38+
public Optional<String> defaultDriverName() {
39+
return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
40+
}
41+
42+
@Override
43+
public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
44+
throw new RuntimeException("Clickhouse does not support update sql, please remove primary key or use append mode");
45+
}
46+
}

clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseSink.java

Lines changed: 22 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -21,58 +21,39 @@
2121

2222

2323
import com.dtstack.flink.sql.sink.IStreamSinkGener;
24+
import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
2425
import com.dtstack.flink.sql.sink.rdb.RdbSink;
25-
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
26+
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
2627

2728
import java.util.List;
2829
import java.util.Map;
2930

3031

3132
public class ClickhouseSink extends RdbSink implements IStreamSinkGener<RdbSink> {
32-
33-
private static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
34-
3533
public ClickhouseSink() {
34+
super(new ClickhouseDialect());
3635
}
3736

3837
@Override
39-
public RetractJDBCOutputFormat getOutputFormat() {
40-
return new RetractJDBCOutputFormat();
41-
}
42-
43-
@Override
44-
public void buildSql(String scheam, String tableName, List<String> fields) {
45-
buildInsertSql(tableName, fields);
46-
}
47-
48-
@Override
49-
public String buildUpdateSql(String schema, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
50-
return null;
51-
}
52-
53-
private void buildInsertSql(String tableName, List<String> fields) {
54-
String sqlTmp = "insert into " + tableName + " (${fields}) values (${placeholder})";
55-
String fieldsStr = "";
56-
String placeholder = "";
57-
58-
for (String fieldName : fields) {
59-
fieldsStr += ",`" + fieldName + "`";
60-
placeholder += ",?";
61-
}
62-
63-
fieldsStr = fieldsStr.replaceFirst(",", "");
64-
placeholder = placeholder.replaceFirst(",", "");
65-
66-
sqlTmp = sqlTmp.replace("${fields}", fieldsStr).replace("${placeholder}", placeholder);
67-
this.sql = sqlTmp;
68-
System.out.println("---insert sql----");
69-
System.out.println(sql);
70-
}
71-
72-
73-
@Override
74-
public String getDriverName() {
75-
return CLICKHOUSE_DRIVER;
38+
public JDBCUpsertOutputFormat getOutputFormat() {
39+
JDBCOptions jdbcOptions = JDBCOptions.builder()
40+
.setDBUrl(dbURL)
41+
.setDialect(jdbcDialect)
42+
.setUsername(userName)
43+
.setPassword(password)
44+
.setTableName(tableName)
45+
.build();
46+
47+
return JDBCUpsertOutputFormat.builder()
48+
.setOptions(jdbcOptions)
49+
.setFieldNames(fieldNames)
50+
.setFlushMaxSize(batchNum)
51+
.setFlushIntervalMills(batchWaitInterval)
52+
.setFieldTypes(sqlTypes)
53+
.setKeyFields(primaryKeys)
54+
.setAllReplace(allReplace)
55+
.setUpdateMode(updateMode)
56+
.build();
7657
}
7758

7859

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
*
3838
* @author xuqianjin
3939
*/
40-
public class ConsoleOutputFormat extends DtRichOutputFormat {
40+
public class ConsoleOutputFormat extends DtRichOutputFormat<Tuple2> {
4141

4242
private static final Logger LOG = LoggerFactory.getLogger(ConsoleOutputFormat.class);
4343

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.enums;
20+
21+
/**
22+
* restract stream数据处理模式
23+
*
24+
* Reason:
25+
* Date: 2019/1/2
26+
* Company: www.dtstack.com
27+
* @author maqi
28+
*/
29+
public enum EUpdateMode {
30+
// 不回撤数据,只下发增量数据
31+
APPEND(0),
32+
// 先删除回撤数据,然后更新
33+
UPSERT(1);
34+
35+
private int type;
36+
37+
EUpdateMode(int type) {
38+
this.type = type;
39+
}
40+
41+
public int getType() {
42+
return this.type;
43+
}
44+
}

core/src/main/java/com/dtstack/flink/sql/outputformat/DtRichOutputFormat.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
package com.dtstack.flink.sql.outputformat;
1919

2020
import com.dtstack.flink.sql.metric.MetricConstant;
21-
import org.apache.flink.api.java.tuple.Tuple2;
22-
2321
import org.apache.flink.api.common.io.RichOutputFormat;
2422
import org.apache.flink.metrics.Counter;
2523
import org.apache.flink.metrics.Meter;
@@ -29,16 +27,13 @@
2927
* extend RichOutputFormat with metric 'dtNumRecordsOut', 'dtNumDirtyRecordsOut', 'dtNumRecordsOutRate'
3028
* Created by sishu.yss on 2018/11/28.
3129
*/
32-
public abstract class DtRichOutputFormat extends RichOutputFormat<Tuple2>{
33-
34-
protected transient Counter outRecords;
30+
public abstract class DtRichOutputFormat<T> extends RichOutputFormat<T>{
3531

36-
protected transient Counter outDirtyRecords;
37-
38-
protected transient Meter outRecordsRate;
32+
public transient Counter outRecords;
33+
public transient Counter outDirtyRecords;
34+
public transient Meter outRecordsRate;
3935

4036
protected static int ROW_PRINT_FREQUENCY = 1000;
41-
4237
protected static int DIRTY_PRINT_FREQUENCY = 1000;
4338

4439
public void initMetric() {
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.db;
20+
21+
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
22+
23+
import java.util.Optional;
24+
25+
/**
26+
* Date: 2020/1/19
27+
* Company: www.dtstack.com
28+
* @author maqi
29+
*/
30+
public class DbDialect implements JDBCDialect {
31+
@Override
32+
public boolean canHandle(String url) {
33+
return url.startsWith("jdbc:db2:");
34+
}
35+
36+
@Override
37+
public Optional<String> defaultDriverName() {
38+
return Optional.of("com.ibm.db2.jcc.DB2Driver");
39+
}
40+
41+
@Override
42+
public String quoteIdentifier(String identifier) {
43+
return identifier;
44+
}
45+
46+
}
Lines changed: 21 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,35 @@
11
package com.dtstack.flink.sql.sink.db;
22

3+
import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
34
import com.dtstack.flink.sql.sink.rdb.RdbSink;
4-
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
5+
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
56

67
import java.util.List;
78
import java.util.Map;
89

910
public class DbSink extends RdbSink {
1011

11-
private static final String DB2_DRIVER = "com.ibm.db2.jcc.DB2Driver";
12-
1312
public DbSink() {
13+
super(new DbDialect());
1414
}
15-
16-
@Override
17-
public void buildSql(String schema, String tableName, List<String> fields) {
18-
buildInsertSql(tableName, fields);
19-
}
20-
21-
private void buildInsertSql(String tableName, List<String> fields) {
22-
String sqlTmp = "insert into " + tableName + " (${fields}) values (${placeholder})";
23-
String fieldsStr = "";
24-
String placeholder = "";
25-
26-
for (String fieldName : fields) {
27-
fieldsStr += "," + fieldName;
28-
placeholder += ",?";
29-
}
30-
31-
fieldsStr = fieldsStr.replaceFirst(",", "");
32-
placeholder = placeholder.replaceFirst(",", "");
33-
34-
sqlTmp = sqlTmp.replace("${fields}", fieldsStr).replace("${placeholder}", placeholder);
35-
this.sql = sqlTmp;
36-
}
37-
38-
@Override
39-
public String buildUpdateSql(String schema, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
40-
return null;
41-
}
42-
43-
@Override
44-
public String getDriverName() {
45-
return DB2_DRIVER;
46-
}
47-
4815
@Override
49-
public RetractJDBCOutputFormat getOutputFormat() {
50-
return new RetractJDBCOutputFormat();
16+
public JDBCUpsertOutputFormat getOutputFormat() {
17+
JDBCOptions jdbcOptions = JDBCOptions.builder()
18+
.setDBUrl(dbURL)
19+
.setDialect(jdbcDialect)
20+
.setUsername(userName)
21+
.setPassword(password)
22+
.setTableName(tableName)
23+
.build();
24+
25+
return JDBCUpsertOutputFormat.builder()
26+
.setOptions(jdbcOptions)
27+
.setFieldNames(fieldNames)
28+
.setFlushMaxSize(batchNum)
29+
.setFlushIntervalMills(batchWaitInterval)
30+
.setFieldTypes(sqlTypes)
31+
.setKeyFields(primaryKeys)
32+
.setAllReplace(allReplace)
33+
.setUpdateMode(updateMode).build();
5134
}
5235
}

docs/postgresqlSink.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ CREATE TABLE tableName(
1616
```
1717

1818
## 2.支持版本
19-
postgresql-8.2+
19+
postgresql-9.5+
2020

2121
## 3.表结构定义
2222

@@ -36,8 +36,6 @@ CREATE TABLE tableName(
3636
| password | postgresql连接密码|||
3737
| tableName | postgresqll表名称|||
3838
| parallelism | 并行度设置||1|
39-
| isUpsert | 使用upsert模式插入数据(版本9.5之后才支持upsert) |否|false
40-
| keyField | 设置更新主键字段名(isupsert为true时为必填项)||
4139

4240
## 5.样例:
4341
```

0 commit comments

Comments
 (0)