Skip to content

Commit d3eba0d

Browse files
committed
async use crow
1 parent 6e5aebc commit d3eba0d

File tree

12 files changed: 152 additions and 123 deletions.

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 13 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,7 @@
4747
import org.apache.flink.configuration.Configuration;
4848
import com.google.common.collect.Lists;
4949
import org.apache.flink.streaming.api.functions.async.ResultFuture;
50+
import org.apache.flink.table.runtime.types.CRow;
5051
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
5152
import org.apache.flink.types.Row;
5253
import org.slf4j.Logger;
@@ -160,17 +161,17 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
160161
}
161162

162163
@Override
163-
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
164-
Row inputRow = Row.copy(input);
164+
public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
165+
CRow inputCopy = new CRow(input.row(), input.change());
165166
JsonArray inputParams = new JsonArray();
166167
StringBuffer stringBuffer = new StringBuffer();
167168
String sqlWhere = " where ";
168169

169170
for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) {
170171
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
171-
Object equalObj = inputRow.getField(conValIndex);
172+
Object equalObj = inputCopy.row().getField(conValIndex);
172173
if (equalObj == null) {
173-
dealMissKey(inputRow, resultFuture);
174+
dealMissKey(inputCopy, resultFuture);
174175
return;
175176
}
176177
inputParams.add(equalObj);
@@ -194,13 +195,13 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
194195
if (val != null) {
195196

196197
if (ECacheContentType.MissVal == val.getType()) {
197-
dealMissKey(inputRow, resultFuture);
198+
dealMissKey(inputCopy, resultFuture);
198199
return;
199200
} else if (ECacheContentType.MultiLine == val.getType()) {
200-
List<Row> rowList = Lists.newArrayList();
201+
List<CRow> rowList = Lists.newArrayList();
201202
for (Object jsonArray : (List) val.getContent()) {
202-
Row row = fillData(inputRow, jsonArray);
203-
rowList.add(row);
203+
Row row = fillData(inputCopy.row(), jsonArray);
204+
rowList.add(new CRow(row, inputCopy.change()));
204205
}
205206
resultFuture.complete(rowList);
206207
} else {
@@ -238,20 +239,20 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
238239
cluster.closeAsync();
239240
if (rows.size() > 0) {
240241
List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
241-
List<Row> rowList = Lists.newArrayList();
242+
List<CRow> rowList = Lists.newArrayList();
242243
for (com.datastax.driver.core.Row line : rows) {
243-
Row row = fillData(inputRow, line);
244+
Row row = fillData(inputCopy.row(), line);
244245
if (openCache()) {
245246
cacheContent.add(line);
246247
}
247-
rowList.add(row);
248+
rowList.add(new CRow(row,inputCopy.change()));
248249
}
249250
resultFuture.complete(rowList);
250251
if (openCache()) {
251252
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
252253
}
253254
} else {
254-
dealMissKey(inputRow, resultFuture);
255+
dealMissKey(inputCopy, resultFuture);
255256
if (openCache()) {
256257
putCache(key, CacheMissVal.getMissKeyObj());
257258
}

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -56,6 +56,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
5656

5757
@Override
5858
public void writeRecord(Tuple2 tuple2) throws IOException {
59+
System.out.println("received oriainal data:" + tuple2);
5960
Tuple2<Boolean, Row> tupleTrans = tuple2;
6061
Boolean retract = tupleTrans.getField(0);
6162
if (!retract) {

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -31,11 +31,11 @@
3131
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3232
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
3333
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
34+
import org.apache.flink.table.runtime.types.CRow;
3435
import org.apache.flink.types.Row;
3536
import org.slf4j.Logger;
3637
import org.slf4j.LoggerFactory;
3738

38-
import java.util.Collection;
3939
import java.util.Collections;
4040
import java.util.concurrent.TimeoutException;
4141

@@ -47,7 +47,7 @@
4747
* @author xuchao
4848
*/
4949

50-
public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> implements ISideReqRow {
50+
public abstract class AsyncReqRow extends RichAsyncFunction<CRow, CRow> implements ISideReqRow {
5151
private static final Logger LOG = LoggerFactory.getLogger(AsyncReqRow.class);
5252
private static final long serialVersionUID = 2098635244857937717L;
5353

@@ -97,12 +97,12 @@ protected boolean openCache(){
9797
return sideInfo.getSideCache() != null;
9898
}
9999

100-
protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
100+
protected void dealMissKey(CRow input, ResultFuture<CRow> resultFuture){
101101
if(sideInfo.getJoinType() == JoinType.LEFT){
102102
//Reserved left table data
103103
try {
104-
Row row = fillData(input, null);
105-
resultFuture.complete(Collections.singleton(row));
104+
Row row = fillData(input.row(), null);
105+
resultFuture.complete(Collections.singleton(new CRow(row, input.change())));
106106
} catch (Exception e) {
107107
dealFillDataError(resultFuture, e, input);
108108
}
@@ -118,8 +118,8 @@ protected void dealCacheData(String key, CacheObj missKeyObj) {
118118
}
119119

120120
@Override
121-
public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
122-
StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
121+
public void timeout(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
122+
StreamRecordQueueEntry<CRow> future = (StreamRecordQueueEntry<CRow>)resultFuture;
123123
try {
124124
if (null == future.get()) {
125125
resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."));
@@ -130,7 +130,7 @@ public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception
130130
}
131131

132132

133-
protected void dealFillDataError(ResultFuture<Row> resultFuture, Exception e, Object sourceData) {
133+
protected void dealFillDataError(ResultFuture<CRow> resultFuture, Exception e, Object sourceData) {
134134
LOG.debug("source data {} join side table error ", sourceData);
135135
LOG.debug("async buid row error..{}", e);
136136
parseErrorRecords.inc();

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 30 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -54,12 +54,17 @@
5454
import org.apache.flink.streaming.api.datastream.DataStream;
5555
import org.apache.flink.table.api.StreamQueryConfig;
5656
import org.apache.flink.table.api.Table;
57+
import org.apache.flink.table.api.TableSchema;
5758
import org.apache.flink.table.api.java.StreamTableEnvironment;
59+
import org.apache.flink.table.runtime.CRowKeySelector;
60+
import org.apache.flink.table.runtime.types.CRow;
61+
import org.apache.flink.table.runtime.types.CRowTypeInfo;
5862
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
5963
import org.apache.flink.types.Row;
6064
import org.slf4j.Logger;
6165
import org.slf4j.LoggerFactory;
6266
import java.sql.Timestamp;
67+
import java.util.Arrays;
6368
import java.util.Collection;
6469
import java.util.LinkedList;
6570
import java.util.List;
@@ -773,21 +778,24 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
773778

774779
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getTypes(), targetTable.getSchema().getColumnNames());
775780

776-
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
777-
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
778-
.returns(Row.class);
781+
DataStream<CRow> adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
782+
.map((Tuple2<Boolean, Row> tp2) -> {
783+
return new CRow(tp2.f1, tp2.f0);
784+
}).returns(CRow.class);
779785

780786

781787
//join side table before keyby ===> Reducing the size of each dimension table cache of async
782-
if(sideTableInfo.isPartitionedJoin()){
783-
RowTypeInfo leftTableOutType = buildLeftTableOutType(leftTypeInfo);
784-
adaptStream.getTransformation().setOutputType(leftTableOutType);
788+
if (sideTableInfo.isPartitionedJoin()) {
789+
// RowTypeInfo leftTableOutType = buildLeftTableOutType(leftTypeInfo);
790+
// adaptStream.getTransformation().setOutputType(leftTableOutType);
791+
785792
List<String> leftJoinColList = getConditionFields(joinInfo.getCondition(), joinInfo.getLeftTableAlias(), sideTableInfo);
786-
String[] leftJoinColArr = leftJoinColList.toArray(new String[leftJoinColList.size()]);
787-
adaptStream = adaptStream.keyBy(leftJoinColArr);
793+
List<String> fieldNames = Arrays.asList(targetTable.getSchema().getFieldNames());
794+
int[] keyIndex = leftJoinColList.stream().mapToInt(fieldNames::indexOf).toArray();
795+
adaptStream = adaptStream.keyBy(new CRowKeySelector(keyIndex, projectedTypeInfo(keyIndex, targetTable.getSchema())));
788796
}
789797

790-
DataStream dsOut = null;
798+
DataStream<CRow> dsOut = null;
791799
if(ECacheType.ALL.name().equalsIgnoreCase(sideTableInfo.getCacheType())){
792800
dsOut = SideWithAllCacheOperator.getSideJoinDataStream(adaptStream, sideTableInfo.getType(), localSqlPluginPath, typeInfo, joinInfo, sideJoinFieldInfo, sideTableInfo);
793801
}else{
@@ -796,7 +804,10 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
796804

797805
HashBasedTable<String, String, String> mappingTable = HashBasedTable.create();
798806
RowTypeInfo sideOutTypeInfo = buildOutRowTypeInfo(sideJoinFieldInfo, mappingTable);
799-
dsOut.getTransformation().setOutputType(sideOutTypeInfo);
807+
808+
CRowTypeInfo cRowTypeInfo = new CRowTypeInfo(sideOutTypeInfo);
809+
dsOut.getTransformation().setOutputType(cRowTypeInfo);
810+
800811
String targetTableName = joinInfo.getNewTableName();
801812
String targetTableAlias = joinInfo.getNewTableAlias();
802813

@@ -808,10 +819,18 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
808819
replaceInfoList.add(replaceInfo);
809820

810821
if (!tableEnv.isRegistered(joinInfo.getNewTableName())){
811-
tableEnv.registerDataStream(joinInfo.getNewTableName(), dsOut, String.join(",", sideOutTypeInfo.getFieldNames()));
822+
tableEnv.registerDataStream(joinInfo.getNewTableName(), dsOut);
812823
}
813824
}
814825

826+
private TypeInformation<Row> projectedTypeInfo(int[] fields, TableSchema schema) {
827+
String[] fieldNames = schema.getFieldNames();
828+
TypeInformation<?>[] fieldTypes = schema.getFieldTypes();
829+
830+
String[] projectedNames = Arrays.stream(fields).mapToObj(i -> fieldNames[i]).toArray(String[]::new);
831+
TypeInformation[] projectedTypes = Arrays.stream(fields).mapToObj(i -> fieldTypes[i]).toArray(TypeInformation[]::new);
832+
return new RowTypeInfo(projectedTypes, projectedNames);
833+
}
815834

816835

817836
private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Table table) {

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 19 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@
3636
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3737
import org.apache.flink.configuration.Configuration;
3838
import org.apache.flink.streaming.api.functions.async.ResultFuture;
39+
import org.apache.flink.table.runtime.types.CRow;
3940
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
4041
import org.apache.flink.types.Row;
4142
import org.hbase.async.HBaseClient;
@@ -122,14 +123,14 @@ public void open(Configuration parameters) throws Exception {
122123
}
123124

124125
@Override
125-
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
126-
Row inputRow = Row.copy(input);
126+
public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
127+
CRow inputCopy = new CRow(input.row(), input.change());
127128
Map<String, Object> refData = Maps.newHashMap();
128129
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
129130
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
130-
Object equalObj = inputRow.getField(conValIndex);
131+
Object equalObj = inputCopy.row().getField(conValIndex);
131132
if(equalObj == null){
132-
dealMissKey(inputRow, resultFuture);
133+
dealMissKey(inputCopy, resultFuture);
133134
return;
134135
}
135136
refData.put(sideInfo.getEqualFieldList().get(i), equalObj);
@@ -138,34 +139,34 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
138139
String rowKeyStr = ((HbaseAsyncSideInfo)sideInfo).getRowKeyBuilder().getRowKey(refData);
139140

140141
//get from cache
141-
if(openCache()){
142+
if (openCache()) {
142143
CacheObj val = getFromCache(rowKeyStr);
143-
if(val != null){
144-
if(ECacheContentType.MissVal == val.getType()){
145-
dealMissKey(inputRow, resultFuture);
144+
if (val != null) {
145+
if (ECacheContentType.MissVal == val.getType()) {
146+
dealMissKey(inputCopy, resultFuture);
146147
return;
147-
}else if(ECacheContentType.SingleLine == val.getType()){
148+
} else if (ECacheContentType.SingleLine == val.getType()) {
148149
try {
149-
Row row = fillData(inputRow, val);
150-
resultFuture.complete(Collections.singleton(row));
150+
Row row = fillData(inputCopy.row(), val);
151+
resultFuture.complete(Collections.singleton(new CRow(row, inputCopy.change())));
151152
} catch (Exception e) {
152-
dealFillDataError(resultFuture, e, inputRow);
153+
dealFillDataError(resultFuture, e, inputCopy);
153154
}
154-
}else if(ECacheContentType.MultiLine == val.getType()){
155+
} else if (ECacheContentType.MultiLine == val.getType()) {
155156
try {
156-
for(Object one : (List)val.getContent()){
157-
Row row = fillData(inputRow, one);
158-
resultFuture.complete(Collections.singleton(row));
157+
for (Object one : (List) val.getContent()) {
158+
Row row = fillData(inputCopy.row(), one);
159+
resultFuture.complete(Collections.singleton(new CRow(row, inputCopy.change())));
159160
}
160161
} catch (Exception e) {
161-
dealFillDataError(resultFuture, e, inputRow);
162+
dealFillDataError(resultFuture, e, inputCopy);
162163
}
163164
}
164165
return;
165166
}
166167
}
167168

168-
rowKeyMode.asyncGetData(tableName, rowKeyStr, inputRow, resultFuture, sideInfo.getSideCache());
169+
rowKeyMode.asyncGetData(tableName, rowKeyStr, inputCopy, resultFuture, sideInfo.getSideCache());
169170
}
170171

171172
@Override

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/AbsRowKeyModeDealer.java

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525
import org.apache.calcite.sql.JoinType;
2626
import com.google.common.collect.Maps;
2727
import org.apache.flink.streaming.api.functions.async.ResultFuture;
28+
import org.apache.flink.table.runtime.types.CRow;
2829
import org.apache.flink.types.Row;
2930
import org.hbase.async.HBaseClient;
3031

@@ -72,12 +73,12 @@ public AbsRowKeyModeDealer(Map<String, String> colRefType, String[] colNames, HB
7273
this.sideFieldIndex = sideFieldIndex;
7374
}
7475

75-
protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
76+
protected void dealMissKey(CRow input, ResultFuture<CRow> resultFuture){
7677
if(joinType == JoinType.LEFT){
7778
try {
7879
//保留left 表数据
79-
Row row = fillData(input, null);
80-
resultFuture.complete(Collections.singleton(row));
80+
Row row = fillData(input.row(), null);
81+
resultFuture.complete(Collections.singleton(new CRow(row, input.change())));
8182
} catch (Exception e) {
8283
resultFuture.completeExceptionally(e);
8384
}
@@ -109,6 +110,6 @@ protected Row fillData(Row input, Object sideInput){
109110
return row;
110111
}
111112

112-
public abstract void asyncGetData(String tableName, String rowKeyStr, Row input, ResultFuture<Row> resultFuture,
113-
AbsSideCache sideCache);
113+
public abstract void asyncGetData(String tableName, String rowKeyStr, CRow input, ResultFuture<CRow> resultFuture,
114+
AbsSideCache sideCache);
114115
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/PreRowKeyModeDealerDealer.java

Lines changed: 7 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@
3030
import org.apache.calcite.sql.JoinType;
3131
import com.google.common.collect.Lists;
3232
import org.apache.flink.streaming.api.functions.async.ResultFuture;
33+
import org.apache.flink.table.runtime.types.CRow;
3334
import org.apache.flink.types.Row;
3435
import org.hbase.async.BinaryPrefixComparator;
3536
import org.hbase.async.Bytes;
@@ -65,7 +66,7 @@ public PreRowKeyModeDealerDealer(Map<String, String> colRefType, String[] colNam
6566
}
6667

6768
@Override
68-
public void asyncGetData(String tableName, String rowKeyStr, Row input, ResultFuture<Row> resultFuture,
69+
public void asyncGetData(String tableName, String rowKeyStr, CRow input, ResultFuture<CRow> resultFuture,
6970
AbsSideCache sideCache) {
7071
Scanner prefixScanner = hBaseClient.newScanner(tableName);
7172
ScanFilter scanFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.UTF8(rowKeyStr)));
@@ -79,7 +80,7 @@ public void asyncGetData(String tableName, String rowKeyStr, Row input, ResultFu
7980
}
8081

8182

82-
private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr, Row input, ResultFuture<Row> resultFuture, AbsSideCache sideCache) {
83+
private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr, CRow input, ResultFuture<CRow> resultFuture, AbsSideCache sideCache) {
8384
if(args == null || args.size() == 0){
8485
dealMissKey(input, resultFuture);
8586
if (openCache) {
@@ -88,7 +89,7 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
8889
}
8990

9091
List<Object> cacheContent = Lists.newArrayList();
91-
List<Row> rowList = Lists.newArrayList();
92+
List<CRow> rowList = Lists.newArrayList();
9293

9394
for(List<KeyValue> oneRow : args){
9495
try {
@@ -117,11 +118,11 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
117118
sideVal.add(val);
118119
}
119120

120-
Row row = fillData(input, sideVal);
121+
Row row = fillData(input.row(), sideVal);
121122
if (openCache) {
122123
cacheContent.add(sideVal);
123124
}
124-
rowList.add(row);
125+
rowList.add(new CRow(row, input.change()));
125126
}
126127
}catch (Exception e) {
127128
resultFuture.completeExceptionally(e);
@@ -144,7 +145,7 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
144145
return "";
145146
}
146147

147-
private String dealFail(Object arg2, Row input, ResultFuture<Row> resultFuture){
148+
private String dealFail(Object arg2, CRow input, ResultFuture<CRow> resultFuture){
148149
LOG.error("record:" + input);
149150
LOG.error("get side record exception:" + arg2);
150151
resultFuture.complete(null);

0 commit comments

Comments
 (0)