Skip to content

Commit aa4bc63

Browse files
committed
Merge branch 'revert-ecbb3949' into '1.8_release_3.10.x'
Revert "Merge branch 'feat_1.8release3.10.x_mergeTest' into '1.8_release_3.10.x'" See merge request dt-insight-engine/flinkStreamSQL!19
2 parents ecbb394 + 5f30a74 commit aa4bc63

File tree

258 files changed

+2672
-3589
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

258 files changed

+2672
-3589
lines changed

.gitlab-ci.yml

Lines changed: 0 additions & 10 deletions
This file was deleted.

README.md

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,34 @@
77
> > * 支持原生FLinkSQL所有的语法
88
> > * 扩展了输入和输出的性能指标到promethus
99
10+
## 新特性:
11+
* 1.kafka源表支持not null语法,支持字符串类型的时间转换。
12+
* 2.rdb维表与DB建立连接时,周期进行连接,防止连接断开。rdbsink写入时,对连接进行检查。
13+
* 3.异步维表支持非等值连接,比如:<>,<,>。
14+
* 4.增加kafka数组解析
15+
* 5.增加kafka1.0以上版本的支持
16+
* 6.增加postgresql、kudu、clickhouse维表、结果表的支持
17+
* 7.支持插件的依赖方式,参考pluginLoadMode参数
18+
* 8.支持cep处理
19+
* 9.支持udaf
20+
* 10.支持谓词下移
21+
* 11.支持状态的ttl
22+
23+
## BUG修复:
24+
* 1.修复不能解析sql中orderby,union语法。
25+
* 2.修复yarnPer模式提交失败的异常。
26+
* 3.一些bug的修复
27+
1028
# 已支持
1129
* 源表:kafka 0.9、0.10、0.11、1.x版本
1230
* 维表:mysql, SQlServer,oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse, impala, db2, sqlserver
1331
* 结果表:mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse, impala, db2, sqlserver
1432

33+
# 后续开发计划
34+
* 维表快照
35+
* kafka avro格式
36+
* topN
37+
1538
## 1 快速起步
1639
### 1.1 运行模式
1740

@@ -126,10 +149,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
126149
* taskmanager.slots:per_job模式下指定每个taskmanager对应的slot数量(默认1)
127150
* savePointPath:任务恢复点的路径(默认无)
128151
* allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false)
129-
* restore.enable:是否失败重启(默认是true)
130-
* failure.interval:衡量失败率的时间段,单位分钟(默认6m)
131-
* delay.interval:连续两次重启尝试间的间隔,单位是秒(默认10s)
132-
* logLevel: 日志级别动态配置(默认info)
152+
* logLevel: 日志级别动态配置(默认info)
133153
* [prometheus 相关参数](docs/prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例
134154

135155

@@ -182,7 +202,6 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
182202
* [impala 结果表插件](docs/impalaSink.md)
183203
* [db2 结果表插件](docs/db2Sink.md)
184204
* [sqlserver 结果表插件](docs/sqlserverSink.md)
185-
* [kafka 结果表插件](docs/kafkaSink.md)
186205

187206
### 2.3 维表插件
188207
* [hbase 维表插件](docs/hbaseSide.md)

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@
3434
import com.datastax.driver.core.SocketOptions;
3535
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
3636
import com.datastax.driver.core.policies.RetryPolicy;
37-
import com.dtstack.flink.sql.side.BaseAllReqRow;
37+
import com.dtstack.flink.sql.side.AllReqRow;
3838
import com.dtstack.flink.sql.side.FieldInfo;
3939
import com.dtstack.flink.sql.side.JoinInfo;
40-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
40+
import com.dtstack.flink.sql.side.SideTableInfo;
4141
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
4242
import com.google.common.collect.Lists;
4343
import com.google.common.collect.Maps;
@@ -62,12 +62,14 @@
6262
*
6363
* @author xuqianjin
6464
*/
65-
public class CassandraAllReqRow extends BaseAllReqRow {
65+
public class CassandraAllReqRow extends AllReqRow {
6666

6767
private static final long serialVersionUID = 54015343561288219L;
6868

6969
private static final Logger LOG = LoggerFactory.getLogger(CassandraAllReqRow.class);
7070

71+
private static final String cassandra_DRIVER = "com.cassandra.jdbc.Driver";
72+
7173
private static final int CONN_RETRY_NUM = 3;
7274

7375
private static final int FETCH_SIZE = 1000;
@@ -77,7 +79,7 @@ public class CassandraAllReqRow extends BaseAllReqRow {
7779

7880
private AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef = new AtomicReference<>();
7981

80-
public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
82+
public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
8183
super(new com.dtstack.flink.sql.side.cassandra.CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
8284
}
8385

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllSideInfo.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
import com.dtstack.flink.sql.side.FieldInfo;
2222
import com.dtstack.flink.sql.side.JoinInfo;
23-
import com.dtstack.flink.sql.side.BaseSideInfo;
24-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
23+
import com.dtstack.flink.sql.side.SideInfo;
24+
import com.dtstack.flink.sql.side.SideTableInfo;
2525
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
2626
import com.dtstack.flink.sql.util.ParseUtils;
2727
import org.apache.calcite.sql.SqlNode;
@@ -37,16 +37,16 @@
3737
*
3838
* @author xuqianjin
3939
*/
40-
public class CassandraAllSideInfo extends BaseSideInfo {
40+
public class CassandraAllSideInfo extends SideInfo {
4141

4242
private static final long serialVersionUID = -8690814317653033557L;
4343

44-
public CassandraAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
44+
public CassandraAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
4545
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4646
}
4747

4848
@Override
49-
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
49+
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5050
CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo;
5151

5252
sqlCondition = "select ${selectField} from ${tableName} ";

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,11 @@
3737
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
3838
import com.datastax.driver.core.policies.RetryPolicy;
3939
import com.dtstack.flink.sql.enums.ECacheContentType;
40-
import com.dtstack.flink.sql.side.BaseAsyncReqRow;
40+
import com.dtstack.flink.sql.side.AsyncReqRow;
4141
import com.dtstack.flink.sql.side.CacheMissVal;
4242
import com.dtstack.flink.sql.side.FieldInfo;
4343
import com.dtstack.flink.sql.side.JoinInfo;
44-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
44+
import com.dtstack.flink.sql.side.SideTableInfo;
4545
import com.dtstack.flink.sql.side.cache.CacheObj;
4646
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
4747
import com.google.common.base.Function;
@@ -67,7 +67,7 @@
6767
*
6868
* @author xuqianjin
6969
*/
70-
public class CassandraAsyncReqRow extends BaseAsyncReqRow {
70+
public class CassandraAsyncReqRow extends AsyncReqRow {
7171

7272
private static final long serialVersionUID = 6631584128079864735L;
7373

@@ -83,7 +83,7 @@ public class CassandraAsyncReqRow extends BaseAsyncReqRow {
8383
private transient ListenableFuture session;
8484
private transient CassandraSideTableInfo cassandraSideTableInfo;
8585

86-
public CassandraAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
86+
public CassandraAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
8787
super(new com.dtstack.flink.sql.side.cassandra.CassandraAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
8888
}
8989

@@ -216,7 +216,7 @@ public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exce
216216
connCassandraDB(cassandraSideTableInfo);
217217

218218
String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere + " ALLOW FILTERING ";
219-
LOG.info("sqlCondition:{}" + sqlCondition);
219+
System.out.println("sqlCondition:" + sqlCondition);
220220

221221
ListenableFuture<ResultSet> resultSet = Futures.transformAsync(session,
222222
new AsyncFunction<Session, ResultSet>() {
@@ -265,6 +265,7 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
265265
public void onFailure(Throwable t) {
266266
LOG.error("Failed to retrieve the data: %s%n",
267267
t.getMessage());
268+
System.out.println("Failed to retrieve the data: " + t.getMessage());
268269
cluster.closeAsync();
269270
resultFuture.completeExceptionally(t);
270271
}

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
import com.dtstack.flink.sql.side.FieldInfo;
2222
import com.dtstack.flink.sql.side.JoinInfo;
23-
import com.dtstack.flink.sql.side.BaseSideInfo;
24-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
23+
import com.dtstack.flink.sql.side.SideInfo;
24+
import com.dtstack.flink.sql.side.SideTableInfo;
2525
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
2626
import com.dtstack.flink.sql.util.ParseUtils;
2727
import org.apache.calcite.sql.SqlBasicCall;
@@ -30,8 +30,6 @@
3030
import org.apache.calcite.sql.SqlNode;
3131
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3232
import com.google.common.collect.Lists;
33-
import org.slf4j.Logger;
34-
import org.slf4j.LoggerFactory;
3533

3634
import java.util.List;
3735

@@ -41,18 +39,16 @@
4139
*
4240
* @author xuqianjin
4341
*/
44-
public class CassandraAsyncSideInfo extends BaseSideInfo {
42+
public class CassandraAsyncSideInfo extends SideInfo {
4543

4644
private static final long serialVersionUID = -4403313049809013362L;
47-
private static final Logger LOG = LoggerFactory.getLogger(CassandraAsyncSideInfo.class.getSimpleName());
4845

49-
50-
public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
46+
public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
5147
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
5248
}
5349

5450
@Override
55-
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
51+
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5652
CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo;
5753

5854
String sideTableName = joinInfo.getSideTableName();
@@ -67,9 +63,9 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInf
6763
}
6864

6965
sqlCondition = "select ${selectField} from ${tableName}";
70-
sqlCondition = sqlCondition.replace("${tableName}", cassandraSideTableInfo.getDatabase()+"."+cassandraSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
7166

72-
LOG.info("---------side_exe_sql-----\n{}" + sqlCondition);
67+
sqlCondition = sqlCondition.replace("${tableName}", cassandraSideTableInfo.getDatabase()+"."+cassandraSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
68+
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
7369
}
7470

7571

cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideParser.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,26 @@
1919

2020
package com.dtstack.flink.sql.side.cassandra.table;
2121

22-
import com.dtstack.flink.sql.table.AbstractSideTableParser;
23-
import com.dtstack.flink.sql.table.AbstractTableInfo;
22+
import com.dtstack.flink.sql.table.AbsSideTableParser;
23+
import com.dtstack.flink.sql.table.TableInfo;
2424
import com.dtstack.flink.sql.util.MathUtil;
2525

26+
import java.math.BigDecimal;
27+
import java.sql.Date;
2628
import java.sql.Timestamp;
2729
import java.util.Map;
2830
import java.util.regex.Matcher;
2931
import java.util.regex.Pattern;
3032

31-
import static com.dtstack.flink.sql.table.AbstractTableInfo.PARALLELISM_KEY;
33+
import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
3234

3335
/**
3436
* Reason:
3537
* Date: 2018/11/22
3638
*
3739
* @author xuqianjin
3840
*/
39-
public class CassandraSideParser extends AbstractSideTableParser {
41+
public class CassandraSideParser extends AbsSideTableParser {
4042

4143
private final static String SIDE_SIGN_KEY = "sideSignKey";
4244

@@ -71,7 +73,7 @@ public CassandraSideParser() {
7173
}
7274

7375
@Override
74-
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
76+
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
7577
com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo cassandraSideTableInfo = new com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo();
7678
cassandraSideTableInfo.setName(tableName);
7779
parseFieldsInfo(fieldsInfo, cassandraSideTableInfo);
@@ -94,10 +96,9 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
9496
return cassandraSideTableInfo;
9597
}
9698

97-
private void dealSideSign(Matcher matcher, AbstractTableInfo tableInfo) {
99+
private void dealSideSign(Matcher matcher, TableInfo tableInfo) {
98100
}
99101

100-
@Override
101102
public Class dbTypeConvertToJavaType(String fieldType) {
102103
switch (fieldType.toLowerCase()) {
103104
case "bigint":
@@ -120,8 +121,6 @@ public Class dbTypeConvertToJavaType(String fieldType) {
120121
return Double.class;
121122
case "timestamp":
122123
return Timestamp.class;
123-
default:
124-
break;
125124
}
126125

127126
throw new RuntimeException("不支持 " + fieldType + " 类型");

cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideTableInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.cassandra.table;
2121

22-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.SideTableInfo;
2323
import com.google.common.base.Preconditions;
2424

2525
/**
@@ -28,7 +28,7 @@
2828
*
2929
* @author xuqianjin
3030
*/
31-
public class CassandraSideTableInfo extends AbstractSideTableInfo {
31+
public class CassandraSideTableInfo extends SideTableInfo {
3232

3333
private static final long serialVersionUID = -5556431094535478915L;
3434

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
import com.datastax.driver.core.SocketOptions;
5555
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
5656
import com.datastax.driver.core.policies.RetryPolicy;
57-
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
57+
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
5858
import org.apache.commons.lang3.StringUtils;
5959
import org.slf4j.Logger;
6060
import org.slf4j.LoggerFactory;
@@ -72,7 +72,7 @@
7272
* @see Tuple
7373
* @see DriverManager
7474
*/
75-
public class CassandraOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
75+
public class CassandraOutputFormat extends DtRichOutputFormat<Tuple2> {
7676
private static final long serialVersionUID = -7994311331389155692L;
7777

7878
private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import com.dtstack.flink.sql.sink.IStreamSinkGener;
2424
import com.dtstack.flink.sql.sink.cassandra.table.CassandraTableInfo;
25-
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
25+
import com.dtstack.flink.sql.table.TargetTableInfo;
2626
import org.apache.flink.api.common.typeinfo.TypeInformation;
2727
import org.apache.flink.api.java.tuple.Tuple2;
2828
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -63,7 +63,7 @@ public CassandraSink() {
6363
}
6464

6565
@Override
66-
public CassandraSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
66+
public CassandraSink genStreamSink(TargetTableInfo targetTableInfo) {
6767
CassandraTableInfo cassandraTableInfo = (CassandraTableInfo) targetTableInfo;
6868
this.address = cassandraTableInfo.getAddress();
6969
this.tableName = cassandraTableInfo.getTableName();

0 commit comments

Comments
 (0)