
Commit 49d88c9

Merge branch 'feat_1.8_internalRetractStreamDeal' into 'v1.8.0_dev'
Re-register the table for the internal retract stream. See merge request !212
2 parents e37e0bb + 56ef522 · commit 49d88c9

File tree

19 files changed, +206 -198 lines

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

8 additions & 7 deletions

@@ -38,6 +38,7 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
@@ -129,14 +130,14 @@ protected void reloadCache() {


     @Override
-    public void flatMap(Row value, Collector<Row> out) throws Exception {
+    public void flatMap(CRow input, Collector<CRow> out) throws Exception {
         List<Object> inputParams = Lists.newArrayList();
         for (Integer conValIndex : sideInfo.getEqualValIndex()) {
-            Object equalObj = value.getField(conValIndex);
+            Object equalObj = input.row().getField(conValIndex);
             if (equalObj == null) {
                 if(sideInfo.getJoinType() == JoinType.LEFT){
-                    Row data = fillData(value, null);
-                    out.collect(data);
+                    Row data = fillData(input.row(), null);
+                    out.collect(new CRow(data, input.change()));
                 }
                 return;
             }
@@ -148,8 +149,8 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
         List<Map<String, Object>> cacheList = cacheRef.get().get(key);
         if (CollectionUtils.isEmpty(cacheList)) {
             if (sideInfo.getJoinType() == JoinType.LEFT) {
-                Row row = fillData(value, null);
-                out.collect(row);
+                Row row = fillData(input.row(), null);
+                out.collect(new CRow(row, input.change()));
             } else {
                 return;
             }
@@ -158,7 +159,7 @@
         }

         for (Map<String, Object> one : cacheList) {
-            out.collect(fillData(value, one));
+            out.collect(new CRow(fillData(input.row(), one), input.change()));
         }

     }

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

13 additions & 12 deletions

@@ -47,6 +47,7 @@
 import org.apache.flink.configuration.Configuration;
 import com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
@@ -160,17 +161,17 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
     }

     @Override
-    public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
-        Row inputRow = Row.copy(input);
+    public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
+        CRow inputCopy = new CRow(input.row(), input.change());
         JsonArray inputParams = new JsonArray();
         StringBuffer stringBuffer = new StringBuffer();
         String sqlWhere = " where ";

         for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) {
             Integer conValIndex = sideInfo.getEqualValIndex().get(i);
-            Object equalObj = inputRow.getField(conValIndex);
+            Object equalObj = inputCopy.row().getField(conValIndex);
             if (equalObj == null) {
-                dealMissKey(inputRow, resultFuture);
+                dealMissKey(inputCopy, resultFuture);
                 return;
             }
             inputParams.add(equalObj);
@@ -194,13 +195,13 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
         if (val != null) {

             if (ECacheContentType.MissVal == val.getType()) {
-                dealMissKey(inputRow, resultFuture);
+                dealMissKey(inputCopy, resultFuture);
                 return;
             } else if (ECacheContentType.MultiLine == val.getType()) {
-                List<Row> rowList = Lists.newArrayList();
+                List<CRow> rowList = Lists.newArrayList();
                 for (Object jsonArray : (List) val.getContent()) {
-                    Row row = fillData(inputRow, jsonArray);
-                    rowList.add(row);
+                    Row row = fillData(inputCopy.row(), jsonArray);
+                    rowList.add(new CRow(row, inputCopy.change()));
                 }
                 resultFuture.complete(rowList);
             } else {
@@ -238,20 +239,20 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
                 cluster.closeAsync();
                 if (rows.size() > 0) {
                     List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
-                    List<Row> rowList = Lists.newArrayList();
+                    List<CRow> rowList = Lists.newArrayList();
                     for (com.datastax.driver.core.Row line : rows) {
-                        Row row = fillData(inputRow, line);
+                        Row row = fillData(inputCopy.row(), line);
                         if (openCache()) {
                             cacheContent.add(line);
                         }
-                        rowList.add(row);
+                        rowList.add(new CRow(row, inputCopy.change()));
                     }
                     resultFuture.complete(rowList);
                     if (openCache()) {
                         putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
                     }
                 } else {
-                    dealMissKey(inputRow, resultFuture);
+                    dealMissKey(inputCopy, resultFuture);
                     if (openCache()) {
                         putCache(key, CacheMissVal.getMissKeyObj());
                     }
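
One behavioral nuance worth noting: the old code defensively copied the input with Row.copy(input) before the async callbacks ran, while the new code wraps the original Row in a fresh CRow, so the wrapper is new but the underlying fields are shared rather than copied. A small sketch of the difference, assuming Flink 1.8; the demo class is hypothetical:

// Hypothetical demo of the copy semantics discussed above (Flink 1.8).
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;

public class CopySemanticsDemo {
    public static void main(String[] args) {
        Row row = Row.of("k1", 42);
        CRow wrapped = new CRow(row, true);  // new wrapper, same Row instance
        Row copied = Row.copy(row);          // new Row holding the same field values

        System.out.println(wrapped.row() == row);  // true: shared payload
        System.out.println(copied == row);         // false: separate Row object
    }
}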

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java

1 addition & 0 deletions

@@ -56,6 +56,7 @@ public void open(int taskNumber, int numTasks) throws IOException {

     @Override
     public void writeRecord(Tuple2 tuple2) throws IOException {
+        System.out.println("received original data:" + tuple2);
         Tuple2<Boolean, Row> tupleTrans = tuple2;
         Boolean retract = tupleTrans.getField(0);
         if (!retract) {

core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java

2 additions & 2 deletions

@@ -23,7 +23,7 @@
 import com.dtstack.flink.sql.factory.DTThreadFactory;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.runtime.types.CRow;

 import java.sql.SQLException;
 import java.util.concurrent.Executors;
@@ -37,7 +37,7 @@
  * @author xuchao
  */

-public abstract class AllReqRow extends RichFlatMapFunction<Row, Row> implements ISideReqRow {
+public abstract class AllReqRow extends RichFlatMapFunction<CRow, CRow> implements ISideReqRow {

     protected SideInfo sideInfo;
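
With this base-class change, every all-cache side-join plugin (Cassandra and HBase in this merge) must implement flatMap over CRow. A hypothetical pass-through subclass showing just the new contract; the SideInfo wiring and real cache lookup are omitted:

// Hypothetical skeleton: demonstrates the CRow-in/CRow-out contract only.
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class PassThroughAllReqRow extends RichFlatMapFunction<CRow, CRow> {
    @Override
    public void flatMap(CRow input, Collector<CRow> out) {
        // A real subclass would join side-table columns here via fillData();
        // the key point is re-emitting the input's change flag unchanged.
        out.collect(new CRow(Row.copy(input.row()), input.change()));
    }
}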

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

8 additions & 8 deletions

@@ -31,11 +31,11 @@
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
 import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
+import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.TimeoutException;

@@ -47,7 +47,7 @@
  * @author xuchao
 */

-public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> implements ISideReqRow {
+public abstract class AsyncReqRow extends RichAsyncFunction<CRow, CRow> implements ISideReqRow {
     private static final Logger LOG = LoggerFactory.getLogger(AsyncReqRow.class);
     private static final long serialVersionUID = 2098635244857937717L;

@@ -97,12 +97,12 @@ protected boolean openCache(){
         return sideInfo.getSideCache() != null;
     }

-    protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
+    protected void dealMissKey(CRow input, ResultFuture<CRow> resultFuture){
         if(sideInfo.getJoinType() == JoinType.LEFT){
             //Reserved left table data
             try {
-                Row row = fillData(input, null);
-                resultFuture.complete(Collections.singleton(row));
+                Row row = fillData(input.row(), null);
+                resultFuture.complete(Collections.singleton(new CRow(row, input.change())));
             } catch (Exception e) {
                 dealFillDataError(resultFuture, e, input);
             }
@@ -118,8 +118,8 @@ protected void dealCacheData(String key, CacheObj missKeyObj) {
     }

     @Override
-    public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
-        StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
+    public void timeout(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
+        StreamRecordQueueEntry<CRow> future = (StreamRecordQueueEntry<CRow>)resultFuture;
         try {
             if (null == future.get()) {
                 resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."));
@@ -130,7 +130,7 @@ public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception
     }


-    protected void dealFillDataError(ResultFuture<Row> resultFuture, Exception e, Object sourceData) {
+    protected void dealFillDataError(ResultFuture<CRow> resultFuture, Exception e, Object sourceData) {
         LOG.debug("source data {} join side table error ", sourceData);
         LOG.debug("async build row error..{}", e);
         parseErrorRecords.inc();
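
Since the base class now fixes CRow as both the input and output of RichAsyncFunction, every completion path (cache hit, miss, timeout) has to rewrap its result rows with the input's change flag. A self-contained toy sketch of that contract, with an in-memory map standing in for the real Cassandra/HBase client; every name below is invented for the example:

// Toy sketch of the CRow async contract; not the project's actual lookup code.
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;

public class ToySideLookup extends RichAsyncFunction<CRow, CRow> {
    private final Map<Object, Object> sideCache;  // stands in for the side table

    public ToySideLookup(Map<Object, Object> sideCache) {
        this.sideCache = sideCache;
    }

    @Override
    public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) {
        Object key = input.row().getField(0);
        CompletableFuture.supplyAsync(() -> sideCache.get(key))
                .thenAccept(val -> {
                    Row joined = Row.of(key, val);  // fillData() analogue
                    // Preserve the retract flag instead of dropping it.
                    resultFuture.complete(
                            Collections.singleton(new CRow(joined, input.change())));
                });
    }
}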

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

27 additions & 32 deletions

@@ -54,12 +54,17 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.runtime.CRowKeySelector;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.runtime.types.CRowTypeInfo;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.sql.Timestamp;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -249,27 +254,6 @@ public RowTypeInfo buildOutRowTypeInfo(List<FieldInfo> sideJoinFieldInfo, HashBa
         return new RowTypeInfo(sideOutTypes, sideOutNames);
     }

-    /**
-     * Convert time attribute types
-     * @param leftTypeInfo
-     * @return
-     */
-    private RowTypeInfo buildLeftTableOutType(RowTypeInfo leftTypeInfo) {
-        TypeInformation[] sideOutTypes = new TypeInformation[leftTypeInfo.getFieldNames().length];
-        TypeInformation<?>[] fieldTypes = leftTypeInfo.getFieldTypes();
-        for (int i = 0; i < sideOutTypes.length; i++) {
-            sideOutTypes[i] = convertTimeAttributeType(fieldTypes[i]);
-        }
-        RowTypeInfo rowTypeInfo = new RowTypeInfo(sideOutTypes, leftTypeInfo.getFieldNames());
-        return rowTypeInfo;
-    }
-
-    private TypeInformation convertTimeAttributeType(TypeInformation typeInformation) {
-        if (typeInformation instanceof TimeIndicatorTypeInfo) {
-            return TypeInformation.of(Timestamp.class);
-        }
-        return typeInformation;
-    }

     // More cases need to be considered
     private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, String> mappingTable, String targetTableName, String tableAlias) {
@@ -773,21 +757,21 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,

         RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getTypes(), targetTable.getSchema().getColumnNames());

-        DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
-                .map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
-                .returns(Row.class);
+        DataStream<CRow> adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
+                .map((Tuple2<Boolean, Row> tp2) -> {
+                    return new CRow(tp2.f1, tp2.f0);
+                }).returns(CRow.class);


         //join side table before keyby ===> Reducing the size of each dimension table cache of async
-        if(sideTableInfo.isPartitionedJoin()){
-            RowTypeInfo leftTableOutType = buildLeftTableOutType(leftTypeInfo);
-            adaptStream.getTransformation().setOutputType(leftTableOutType);
+        if (sideTableInfo.isPartitionedJoin()) {
             List<String> leftJoinColList = getConditionFields(joinInfo.getCondition(), joinInfo.getLeftTableAlias(), sideTableInfo);
-            String[] leftJoinColArr = leftJoinColList.toArray(new String[leftJoinColList.size()]);
-            adaptStream = adaptStream.keyBy(leftJoinColArr);
+            List<String> fieldNames = Arrays.asList(targetTable.getSchema().getFieldNames());
+            int[] keyIndex = leftJoinColList.stream().mapToInt(fieldNames::indexOf).toArray();
+            adaptStream = adaptStream.keyBy(new CRowKeySelector(keyIndex, projectedTypeInfo(keyIndex, targetTable.getSchema())));
         }

-        DataStream dsOut = null;
+        DataStream<CRow> dsOut = null;
         if(ECacheType.ALL.name().equalsIgnoreCase(sideTableInfo.getCacheType())){
             dsOut = SideWithAllCacheOperator.getSideJoinDataStream(adaptStream, sideTableInfo.getType(), localSqlPluginPath, typeInfo, joinInfo, sideJoinFieldInfo, sideTableInfo);
         }else{
@@ -796,7 +780,10 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,

         HashBasedTable<String, String, String> mappingTable = HashBasedTable.create();
         RowTypeInfo sideOutTypeInfo = buildOutRowTypeInfo(sideJoinFieldInfo, mappingTable);
-        dsOut.getTransformation().setOutputType(sideOutTypeInfo);
+
+        CRowTypeInfo cRowTypeInfo = new CRowTypeInfo(sideOutTypeInfo);
+        dsOut.getTransformation().setOutputType(cRowTypeInfo);
+
         String targetTableName = joinInfo.getNewTableName();
         String targetTableAlias = joinInfo.getNewTableAlias();

@@ -808,10 +795,18 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
         replaceInfoList.add(replaceInfo);

         if (!tableEnv.isRegistered(joinInfo.getNewTableName())){
-            tableEnv.registerDataStream(joinInfo.getNewTableName(), dsOut, String.join(",", sideOutTypeInfo.getFieldNames()));
+            tableEnv.registerDataStream(joinInfo.getNewTableName(), dsOut);
         }
     }

+    private TypeInformation<Row> projectedTypeInfo(int[] fields, TableSchema schema) {
+        String[] fieldNames = schema.getFieldNames();
+        TypeInformation<?>[] fieldTypes = schema.getFieldTypes();
+
+        String[] projectedNames = Arrays.stream(fields).mapToObj(i -> fieldNames[i]).toArray(String[]::new);
+        TypeInformation[] projectedTypes = Arrays.stream(fields).mapToObj(i -> fieldTypes[i]).toArray(TypeInformation[]::new);
+        return new RowTypeInfo(projectedTypes, projectedNames);
+    }


     private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Table table) {
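
This file carries the core of the merge. The old joinFun dropped the Boolean retract flag when adapting the table to a stream (its map kept only f0.f1), so downstream dimension joins could never see retractions; now the flag is folded into a CRow, keyBy switches from field names to a CRowKeySelector over projected key indexes, and the joined stream is registered with a CRowTypeInfo. A compilable sketch of just the adaptation step, assuming the Flink 1.8 Java Table API; the class and method names are illustrative:

// Illustrative sketch of the retract-stream adaptation (Flink 1.8 Table API).
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;

public class RetractAdapter {
    /** Folds the Tuple2 retract flag into a CRow instead of discarding it. */
    public static DataStream<CRow> toCRowStream(StreamTableEnvironment tableEnv, Table targetTable) {
        return tableEnv.toRetractStream(targetTable, Row.class)
                .map((Tuple2<Boolean, Row> tp2) -> new CRow(tp2.f1, tp2.f0))
                .returns(CRow.class);
    }
}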

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

15 additions & 16 deletions

@@ -26,6 +26,7 @@
 import org.apache.commons.collections.map.HashedMap;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Maps;
+import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
@@ -114,41 +115,39 @@ protected void reloadCache() {
     }

     @Override
-    public void flatMap(Row value, Collector<Row> out) throws Exception {
+    public void flatMap(CRow input, Collector<CRow> out) throws Exception {
         Map<String, Object> refData = Maps.newHashMap();
         for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
             Integer conValIndex = sideInfo.getEqualValIndex().get(i);
-            Object equalObj = value.getField(conValIndex);
-            if(equalObj == null){
-                if(sideInfo.getJoinType() == JoinType.LEFT){
-                    Row data = fillData(value, null);
-                    out.collect(data);
+            Object equalObj = input.row().getField(conValIndex);
+            if (equalObj == null) {
+                if (sideInfo.getJoinType() == JoinType.LEFT) {
+                    Row data = fillData(input.row(), null);
+                    out.collect(new CRow(data, input.change()));
                 }
                 return;
             }
             refData.put(sideInfo.getEqualFieldList().get(i), equalObj);
         }

-        String rowKeyStr = ((HbaseAllSideInfo)sideInfo).getRowKeyBuilder().getRowKey(refData);
+        String rowKeyStr = ((HbaseAllSideInfo) sideInfo).getRowKeyBuilder().getRowKey(refData);

         Map<String, Object> cacheList = null;

         SideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
         HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
-        if (hbaseSideTableInfo.isPreRowKey())
-        {
-            for (Map.Entry<String, Map<String, Object>> entry : cacheRef.get().entrySet()){
-                if (entry.getKey().startsWith(rowKeyStr))
-                {
+        if (hbaseSideTableInfo.isPreRowKey()) {
+            for (Map.Entry<String, Map<String, Object>> entry : cacheRef.get().entrySet()) {
+                if (entry.getKey().startsWith(rowKeyStr)) {
                     cacheList = cacheRef.get().get(entry.getKey());
-                    Row row = fillData(value, cacheList);
-                    out.collect(row);
+                    Row row = fillData(input.row(), cacheList);
+                    out.collect(new CRow(row, input.change()));
                 }
             }
         } else {
             cacheList = cacheRef.get().get(rowKeyStr);
-            Row row = fillData(value, cacheList);
-            out.collect(row);
+            Row row = fillData(input.row(), cacheList);
+            out.collect(new CRow(row, input.change()));
         }

     }
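
Beyond the CRow rewrap, note the two lookup modes this flatMap preserves: an exact rowkey hit, and, when isPreRowKey() is enabled, a prefix scan over the cached rowkeys that may emit several joined rows per input. A small self-contained sketch of the prefix-matching behavior; the class name and data are invented for the demo:

// Invented demo of the prefix-rowkey matching used in the isPreRowKey() branch.
import java.util.HashMap;
import java.util.Map;

public class PrefixRowKeyDemo {
    public static void main(String[] args) {
        Map<String, Map<String, Object>> cache = new HashMap<>();
        cache.put("user_1001", new HashMap<>());
        cache.put("user_1002", new HashMap<>());
        cache.put("order_7", new HashMap<>());

        String rowKeyStr = "user_";  // prefix built from the join keys
        cache.keySet().stream()
                .filter(k -> k.startsWith(rowKeyStr))
                // a real flatMap would fillData() and emit one CRow per match
                .forEach(k -> System.out.println("match: " + k));
    }
}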
