Skip to content

Commit aa3eeea

Browse files
author
gituser
committed
Merge branch '1.8_release_3.9.x' into 1.8_release_3.10.x
2 parents 7f82c33 + 3ca35af commit aa3eeea

File tree

2 files changed

+5
-4
lines changed

2 files changed

+5
-4
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,8 @@ private static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironmen
240240

241241
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
242242
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
243-
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
243+
.filter((Tuple2<Boolean, Row> f0) -> f0.f0)
244+
.map((Tuple2<Boolean, Row> f0) -> f0.f1)
244245
.returns(typeInfo);
245246

246247
String fields = String.join(",", typeInfo.getFieldNames());

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -774,9 +774,9 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
774774
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getTypes(), targetTable.getSchema().getColumnNames());
775775

776776
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
777-
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
778-
.returns(Row.class);
779-
777+
.filter((Tuple2<Boolean, Row> f0) -> f0.f0)
778+
.map((Tuple2<Boolean, Row> f0) -> f0.f1)
779+
.returns(Row.class);
780780

781781
//join side table before keyby ===> Reducing the size of each dimension table cache of async
782782
if(sideTableInfo.isPartitionedJoin()){

0 commit comments

Comments
 (0)