
Commit 210b9c3

Author: gituser
Merge branch '1.8_test_3.10.x' into 1.8_release_3.10.x
2 parents: 3df78d7 + 25439e2

File tree

158 files changed: +7,501 / -2,554 lines


README.md

Lines changed: 2 additions & 0 deletions

@@ -149,6 +149,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * taskmanager.slots: number of slots per taskmanager in per_job mode (default 1)
 * savePointPath: path of the savepoint used for task recovery (default: none)
 * allowNonRestoredState: flag indicating whether the savepoint allows non-restored state (default false)
+* logLevel: dynamic log-level configuration (default info)
 * [prometheus-related parameters](docs/prometheus.md): in per_job mode, metrics can be written to an external monitoring component, e.g. prometheus pushgateway
 
 
@@ -181,6 +182,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * Description: runtime parameters for yarn session mode ([reference](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html)); currently only yid is supported
 * Required: no
 * Default: false
+
 
 ## 2 Structure
 ### 2.1 Source table plugins
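Assuming submit.sh accepts the new option as a -logLevel flag like the other parameters above (the diff only documents the parameter), an invocation might look like: sh submit.sh -sql D:\sideSql.txt -name xctest -logLevel debug.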

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 20 additions & 17 deletions

@@ -18,6 +18,12 @@
 
 package com.dtstack.flink.sql.side.cassandra;
 
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.HostDistance;
@@ -33,14 +39,11 @@
 import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.SideTableInfo;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
-import org.apache.calcite.sql.JoinType;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
+import org.apache.calcite.sql.JoinType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,14 +132,14 @@ protected void reloadCache() {
 
 
     @Override
-    public void flatMap(Row value, Collector<Row> out) throws Exception {
+    public void flatMap(CRow input, Collector<CRow> out) throws Exception {
         List<Object> inputParams = Lists.newArrayList();
         for (Integer conValIndex : sideInfo.getEqualValIndex()) {
-            Object equalObj = value.getField(conValIndex);
+            Object equalObj = input.row().getField(conValIndex);
             if (equalObj == null) {
                 if(sideInfo.getJoinType() == JoinType.LEFT){
-                    Row data = fillData(value, null);
-                    out.collect(data);
+                    Row data = fillData(input.row(), null);
+                    out.collect(new CRow(data, input.change()));
                 }
                 return;
             }
@@ -148,8 +151,8 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
         List<Map<String, Object>> cacheList = cacheRef.get().get(key);
         if (CollectionUtils.isEmpty(cacheList)) {
             if (sideInfo.getJoinType() == JoinType.LEFT) {
-                Row row = fillData(value, null);
-                out.collect(row);
+                Row row = fillData(input.row(), null);
+                out.collect(new CRow(row, input.change()));
             } else {
                 return;
             }
@@ -158,7 +161,7 @@
         }
 
         for (Map<String, Object> one : cacheList) {
-            out.collect(fillData(value, one));
+            out.collect(new CRow(fillData(input.row(), one), input.change()));
         }
 
     }
@@ -221,9 +224,9 @@ private Session getConn(CassandraSideTableInfo tableInfo) {
         // retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
 
-        for (String server : address.split(",")) {
-            cassandraPort = Integer.parseInt(server.split(":")[1]);
-            serversList.add(InetAddress.getByName(server.split(":")[0]));
+        for (String server : StringUtils.split(address, ",")) {
+            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
+            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
         }
 
         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
@@ -277,7 +280,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
         //load data from table
         String sql = sideInfo.getSqlCondition() + " limit " + FETCH_SIZE;
         ResultSet resultSet = session.execute(sql);
-        String[] sideFieldNames = sideInfo.getSideSelectFields().split(",");
+        String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ",");
         for (com.datastax.driver.core.Row row : resultSet) {
             Map<String, Object> oneRow = Maps.newHashMap();
             for (String fieldName : sideFieldNames) {
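
The change above threads Flink's CRow type through the side-join path. A minimal sketch of the pattern, not part of this commit: a CRow pairs a Row with a change flag (true for insert/accumulate, false for retract), so each joined Row is rewrapped with the incoming record's flag before being emitted, which keeps retraction semantics intact through the join. The helper class and method name below are hypothetical.

import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Illustrative helper (hypothetical name), not code from this commit.
public class CRowEmitSketch {
    public static void emitJoined(CRow input, Row joined, Collector<CRow> out) {
        // Reuse the upstream change flag so a retracted input row
        // produces a retracted joined row downstream.
        out.collect(new CRow(joined, input.change()));
    }
}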

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 24 additions & 22 deletions

@@ -19,6 +19,13 @@
 
 package com.dtstack.flink.sql.side.cassandra;
 
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.types.Row;
+
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.HostDistance;
@@ -38,24 +45,19 @@
 import com.dtstack.flink.sql.side.cache.CacheObj;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
 import com.google.common.base.Function;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.vertx.core.json.JsonArray;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.functions.async.ResultFuture;
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
-import org.apache.flink.types.Row;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -133,9 +135,9 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
         // retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
 
-        for (String server : address.split(",")) {
-            cassandraPort = Integer.parseInt(server.split(":")[1]);
-            serversList.add(InetAddress.getByName(server.split(":")[0]));
+        for (String server : StringUtils.split(address, ",")) {
+            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
+            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
         }
 
         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
@@ -160,17 +162,17 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
     }
 
     @Override
-    public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
-        Row inputRow = Row.copy(input);
+    public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
+        CRow inputCopy = new CRow(input.row(), input.change());
         JsonArray inputParams = new JsonArray();
         StringBuffer stringBuffer = new StringBuffer();
         String sqlWhere = " where ";
 
         for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) {
             Integer conValIndex = sideInfo.getEqualValIndex().get(i);
-            Object equalObj = inputRow.getField(conValIndex);
+            Object equalObj = inputCopy.row().getField(conValIndex);
             if (equalObj == null) {
-                dealMissKey(inputRow, resultFuture);
+                dealMissKey(inputCopy, resultFuture);
                 return;
             }
             inputParams.add(equalObj);
@@ -194,13 +196,13 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
         if (val != null) {
 
             if (ECacheContentType.MissVal == val.getType()) {
-                dealMissKey(inputRow, resultFuture);
+                dealMissKey(inputCopy, resultFuture);
                 return;
             } else if (ECacheContentType.MultiLine == val.getType()) {
-                List<Row> rowList = Lists.newArrayList();
+                List<CRow> rowList = Lists.newArrayList();
                 for (Object jsonArray : (List) val.getContent()) {
-                    Row row = fillData(inputRow, jsonArray);
-                    rowList.add(row);
+                    Row row = fillData(inputCopy.row(), jsonArray);
+                    rowList.add(new CRow(row, inputCopy.change()));
                 }
                 resultFuture.complete(rowList);
             } else {
@@ -238,20 +240,20 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
                 cluster.closeAsync();
                 if (rows.size() > 0) {
                     List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
-                    List<Row> rowList = Lists.newArrayList();
+                    List<CRow> rowList = Lists.newArrayList();
                     for (com.datastax.driver.core.Row line : rows) {
-                        Row row = fillData(inputRow, line);
+                        Row row = fillData(inputCopy.row(), line);
                         if (openCache()) {
                             cacheContent.add(line);
                         }
-                        rowList.add(row);
+                        rowList.add(new CRow(row, inputCopy.change()));
                     }
                     resultFuture.complete(rowList);
                     if (openCache()) {
                         putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
                     }
                 } else {
-                    dealMissKey(inputRow, resultFuture);
+                    dealMissKey(inputCopy, resultFuture);
                     if (openCache()) {
                         putCache(key, CacheMissVal.getMissKeyObj());
                     }
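
Both side implementations also replace String.split with commons-lang3 StringUtils.split when parsing "host:port" addresses and field lists. A small comparison sketch, not from this commit, showing the behavioral differences that likely motivated the swap: String.split treats its argument as a regex and can yield empty tokens, while StringUtils.split uses literal separator characters, skips empty tokens, and tolerates null input.

import org.apache.commons.lang3.StringUtils;

// Illustrative comparison, not code from this commit.
public class SplitSketch {
    public static void main(String[] args) {
        // String.split: regex-based, keeps interior empty tokens.
        System.out.println("a,,b".split(",").length);              // 3
        // StringUtils.split: literal separators, empty tokens dropped.
        System.out.println(StringUtils.split("a,,b", ",").length); // 2
        // StringUtils.split is also null-safe: returns null, no NPE.
        System.out.println(StringUtils.split(null, ","));          // null
    }
}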

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 12 additions & 9 deletions

@@ -38,6 +38,12 @@
 
 package com.dtstack.flink.sql.sink.cassandra;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.HostDistance;
@@ -49,13 +55,10 @@
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
 import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Row;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.sql.DriverManager;
@@ -69,7 +72,7 @@
  * @see Tuple
  * @see DriverManager
 */
-public class CassandraOutputFormat extends DtRichOutputFormat {
+public class CassandraOutputFormat extends DtRichOutputFormat<Tuple2> {
    private static final long serialVersionUID = -7994311331389155692L;
 
    private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);
@@ -145,9 +148,9 @@ public void open(int taskNumber, int numTasks) {
        // retry policy
        RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
 
-        for (String server : address.split(",")) {
-            cassandraPort = Integer.parseInt(server.split(":")[1]);
-            serversList.add(InetAddress.getByName(server.split(":")[0]));
+        for (String server : StringUtils.split(address, ",")) {
+            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
+            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
        }
 
        if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
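
The sink's class declaration now parameterizes DtRichOutputFormat with Tuple2. A hedged sketch of how such a record is commonly unpacked, assuming (as in Flink retract streams) that the Tuple2 is a <Boolean, Row> pair whose flag distinguishes insert from retract; the actual writeRecord body is not shown in this diff and the class below is hypothetical.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

// Hypothetical sketch, not code from this commit.
public class RetractRecordSketch {
    public static void handle(Tuple2<Boolean, Row> record) {
        boolean isInsert = record.f0; // false would signal a retraction
        Row row = record.f1;
        if (isInsert) {
            System.out.println("write: " + row); // a real sink would bind an INSERT here
        }
        // Append-only sinks commonly ignore retract messages.
    }
}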
ClickhouseDialect.java (new file)

Lines changed: 46 additions & 0 deletions

@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.sink.clickhouse;
+
+import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
+
+import java.util.Optional;
+
+/**
+ * Date: 2020/1/15
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public class ClickhouseDialect implements JDBCDialect {
+
+    @Override
+    public boolean canHandle(String url) {
+        return url.startsWith("jdbc:clickhouse:");
+    }
+
+    @Override
+    public Optional<String> defaultDriverName() {
+        return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
+    }
+
+    @Override
+    public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
+        throw new RuntimeException("Clickhouse does not support update sql, please remove primary key or use append mode");
+    }
+}
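
A short usage sketch, not part of this commit, assuming the rdb sink core selects a dialect by probing canHandle with the configured JDBC URL; only methods defined above are used, and the URL is a made-up example.

import java.util.Optional;

// Illustrative usage of ClickhouseDialect, not code from this commit.
public class DialectProbeSketch {
    public static void main(String[] args) {
        ClickhouseDialect dialect = new ClickhouseDialect();
        String url = "jdbc:clickhouse://localhost:8123/demo";
        if (dialect.canHandle(url)) {
            Optional<String> driver = dialect.defaultDriverName();
            System.out.println(driver.orElse("none")); // ru.yandex.clickhouse.ClickHouseDriver
        }
        // Note: getUpdateStatement deliberately throws, so tables with a
        // primary key (which would require UPDATE statements) are rejected.
    }
}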
