
Commit 56ef522

retract stream all mode
1 parent d3eba0d commit 56ef522

File tree

8 files changed: +57 −78 lines

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java
core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java
core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java
kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java
mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java
rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllReqRow.java
redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java
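This commit switches every ALL-cache dimension-table join from Row to CRow so that the accumulate/retract change flag survives the side-table lookup. The sketch below is a minimal illustration of that pattern, not code from this repository: lookupCache, the simplified fillData signature, and isLeftJoin are hypothetical stand-ins for the SideInfo/cacheRef machinery shown in the per-file diffs that follow.

// Illustrative sketch only: shows the Row -> CRow flatMap pattern this commit applies
// to each *AllReqRow class, assuming Flink's CRow wrapper (row + change flag).
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

public abstract class RetractAwareAllReqRowSketch extends RichFlatMapFunction<CRow, CRow> {

    // Hypothetical: resolve cached dimension rows for the join key of this input row.
    protected abstract List<Map<String, Object>> lookupCache(Row inputRow);

    // Hypothetical: merge the stream row with one cached dimension row (null on a LEFT-join miss).
    protected abstract Row fillData(Row inputRow, Map<String, Object> sideRow);

    protected abstract boolean isLeftJoin();

    @Override
    public void flatMap(CRow input, Collector<CRow> out) throws Exception {
        List<Map<String, Object>> cached = lookupCache(input.row());
        if (cached == null || cached.isEmpty()) {
            if (isLeftJoin()) {
                // Emit the unmatched row, carrying the original accumulate/retract flag forward.
                out.collect(new CRow(fillData(input.row(), null), input.change()));
            }
            return;
        }
        for (Map<String, Object> one : cached) {
            // Every joined result reuses input.change(), so a retraction fans out as retractions.
            out.collect(new CRow(fillData(input.row(), one), input.change()));
        }
    }
}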

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 8 additions & 7 deletions
@@ -38,6 +38,7 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
@@ -129,14 +130,14 @@ protected void reloadCache() {


     @Override
-    public void flatMap(Row value, Collector<Row> out) throws Exception {
+    public void flatMap(CRow input, Collector<CRow> out) throws Exception {
         List<Object> inputParams = Lists.newArrayList();
         for (Integer conValIndex : sideInfo.getEqualValIndex()) {
-            Object equalObj = value.getField(conValIndex);
+            Object equalObj = input.row().getField(conValIndex);
             if (equalObj == null) {
                 if(sideInfo.getJoinType() == JoinType.LEFT){
-                    Row data = fillData(value, null);
-                    out.collect(data);
+                    Row data = fillData(input.row(), null);
+                    out.collect(new CRow(data, input.change()));
                 }
                 return;
             }
@@ -148,8 +149,8 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
         List<Map<String, Object>> cacheList = cacheRef.get().get(key);
         if (CollectionUtils.isEmpty(cacheList)) {
             if (sideInfo.getJoinType() == JoinType.LEFT) {
-                Row row = fillData(value, null);
-                out.collect(row);
+                Row row = fillData(input.row(), null);
+                out.collect(new CRow(row, input.change()));
             } else {
                 return;
             }
@@ -158,7 +159,7 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
         }

         for (Map<String, Object> one : cacheList) {
-            out.collect(fillData(value, one));
+            out.collect(new CRow(fillData(input.row(), one), input.change()));
         }

     }

core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@
 import com.dtstack.flink.sql.factory.DTThreadFactory;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.runtime.types.CRow;

 import java.sql.SQLException;
 import java.util.concurrent.Executors;
@@ -37,7 +37,7 @@
  * @author xuchao
  */

-public abstract class AllReqRow extends RichFlatMapFunction<Row, Row> implements ISideReqRow {
+public abstract class AllReqRow extends RichFlatMapFunction<CRow, CRow> implements ISideReqRow {

     protected SideInfo sideInfo;

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 0 additions & 24 deletions
@@ -254,27 +254,6 @@ public RowTypeInfo buildOutRowTypeInfo(List<FieldInfo> sideJoinFieldInfo, HashBa
         return new RowTypeInfo(sideOutTypes, sideOutNames);
     }

-    /**
-     * Convert time attribute types
-     * @param leftTypeInfo
-     * @return
-     */
-    private RowTypeInfo buildLeftTableOutType(RowTypeInfo leftTypeInfo) {
-        TypeInformation[] sideOutTypes = new TypeInformation[leftTypeInfo.getFieldNames().length];
-        TypeInformation<?>[] fieldTypes = leftTypeInfo.getFieldTypes();
-        for (int i = 0; i < sideOutTypes.length; i++) {
-            sideOutTypes[i] = convertTimeAttributeType(fieldTypes[i]);
-        }
-        RowTypeInfo rowTypeInfo = new RowTypeInfo(sideOutTypes, leftTypeInfo.getFieldNames());
-        return rowTypeInfo;
-    }
-
-    private TypeInformation convertTimeAttributeType(TypeInformation typeInformation) {
-        if (typeInformation instanceof TimeIndicatorTypeInfo) {
-            return TypeInformation.of(Timestamp.class);
-        }
-        return typeInformation;
-    }

     // more cases need to be considered
     private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, String> mappingTable, String targetTableName, String tableAlias) {
@@ -786,9 +765,6 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,

         //join side table before keyby ===> Reducing the size of each dimension table cache of async
         if (sideTableInfo.isPartitionedJoin()) {
-            // RowTypeInfo leftTableOutType = buildLeftTableOutType(leftTypeInfo);
-            // adaptStream.getTransformation().setOutputType(leftTableOutType);
-
             List<String> leftJoinColList = getConditionFields(joinInfo.getCondition(), joinInfo.getLeftTableAlias(), sideTableInfo);
             List<String> fieldNames = Arrays.asList(targetTable.getSchema().getFieldNames());
             int[] keyIndex = leftJoinColList.stream().mapToInt(fieldNames::indexOf).toArray();

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 15 additions & 16 deletions
@@ -26,6 +26,7 @@
 import org.apache.commons.collections.map.HashedMap;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Maps;
+import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
@@ -114,41 +115,39 @@ protected void reloadCache() {
     }

     @Override
-    public void flatMap(Row value, Collector<Row> out) throws Exception {
+    public void flatMap(CRow input, Collector<CRow> out) throws Exception {
         Map<String, Object> refData = Maps.newHashMap();
         for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
             Integer conValIndex = sideInfo.getEqualValIndex().get(i);
-            Object equalObj = value.getField(conValIndex);
-            if(equalObj == null){
-                if(sideInfo.getJoinType() == JoinType.LEFT){
-                    Row data = fillData(value, null);
-                    out.collect(data);
+            Object equalObj = input.row().getField(conValIndex);
+            if (equalObj == null) {
+                if (sideInfo.getJoinType() == JoinType.LEFT) {
+                    Row data = fillData(input.row(), null);
+                    out.collect(new CRow(data, input.change()));
                 }
                 return;
             }
             refData.put(sideInfo.getEqualFieldList().get(i), equalObj);
         }

-        String rowKeyStr = ((HbaseAllSideInfo)sideInfo).getRowKeyBuilder().getRowKey(refData);
+        String rowKeyStr = ((HbaseAllSideInfo) sideInfo).getRowKeyBuilder().getRowKey(refData);

         Map<String, Object> cacheList = null;

         SideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
         HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
-        if (hbaseSideTableInfo.isPreRowKey())
-        {
-            for (Map.Entry<String, Map<String, Object>> entry : cacheRef.get().entrySet()){
-                if (entry.getKey().startsWith(rowKeyStr))
-                {
+        if (hbaseSideTableInfo.isPreRowKey()) {
+            for (Map.Entry<String, Map<String, Object>> entry : cacheRef.get().entrySet()) {
+                if (entry.getKey().startsWith(rowKeyStr)) {
                     cacheList = cacheRef.get().get(entry.getKey());
-                    Row row = fillData(value, cacheList);
-                    out.collect(row);
+                    Row row = fillData(input.row(), cacheList);
+                    out.collect(new CRow(row, input.change()));
                 }
             }
         } else {
             cacheList = cacheRef.get().get(rowKeyStr);
-            Row row = fillData(value, cacheList);
-            out.collect(row);
+            Row row = fillData(input.row(), cacheList);
+            out.collect(new CRow(row, input.change()));
         }

     }

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java

Lines changed: 6 additions & 5 deletions
@@ -12,6 +12,7 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
@@ -110,10 +111,10 @@ protected void reloadCache() {


     @Override
-    public void flatMap(Row value, Collector<Row> out) throws Exception {
+    public void flatMap(CRow input, Collector<CRow> out) throws Exception {
         List<Object> inputParams = Lists.newArrayList();
         for (Integer conValIndex : sideInfo.getEqualValIndex()) {
-            Object equalObj = value.getField(conValIndex);
+            Object equalObj = input.row().getField(conValIndex);
             if (equalObj == null) {
                 out.collect(null);
             }
@@ -124,14 +125,14 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
         List<Map<String, Object>> cacheList = cacheRef.get().get(key);
         if (CollectionUtils.isEmpty(cacheList)) {
             if (sideInfo.getJoinType() == JoinType.LEFT) {
-                Row row = fillData(value, null);
-                out.collect(row);
+                Row row = fillData(input.row(), null);
+                out.collect(new CRow(row, input.change()));
             }
             return;
         }

         for (Map<String, Object> one : cacheList) {
-            out.collect(fillData(value, one));
+            out.collect(new CRow(fillData(input.row(), one), input.change()));
         }
     }

mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java

Lines changed: 8 additions & 7 deletions
@@ -39,6 +39,7 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
@@ -128,14 +129,14 @@ protected void reloadCache() {
     }

     @Override
-    public void flatMap(Row value, Collector<Row> out) throws Exception {
+    public void flatMap(CRow input, Collector<CRow> out) throws Exception {
         List<Object> inputParams = Lists.newArrayList();
         for (Integer conValIndex : sideInfo.getEqualValIndex()) {
-            Object equalObj = value.getField(conValIndex);
+            Object equalObj = input.row().getField(conValIndex);
             if (equalObj == null) {
                 if(sideInfo.getJoinType() == JoinType.LEFT){
-                    Row data = fillData(value, null);
-                    out.collect(data);
+                    Row data = fillData(input.row(), null);
+                    out.collect(new CRow(data, input.change()));
                 }
                 return;
             }
@@ -147,8 +148,8 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
         List<Map<String, Object>> cacheList = cacheRef.get().get(key);
         if (CollectionUtils.isEmpty(cacheList)) {
             if (sideInfo.getJoinType() == JoinType.LEFT) {
-                Row row = fillData(value, null);
-                out.collect(row);
+                Row row = fillData(input.row(), null);
+                out.collect(new CRow(row, input.change()));
             } else {
                 return;
             }
@@ -157,7 +158,7 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
         }

         for (Map<String, Object> one : cacheList) {
-            out.collect(fillData(value, one));
+            out.collect(new CRow(fillData(input.row(), one), input.change()));
         }
     }

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllReqRow.java

Lines changed: 8 additions & 8 deletions
@@ -26,6 +26,7 @@
 import org.apache.commons.collections.CollectionUtils;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
@@ -114,14 +115,14 @@ protected void reloadCache() {


     @Override
-    public void flatMap(Row value, Collector<Row> out) throws Exception {
+    public void flatMap(CRow value, Collector<CRow> out) throws Exception {
         List<Object> inputParams = Lists.newArrayList();
         for (Integer conValIndex : sideInfo.getEqualValIndex()) {
-            Object equalObj = value.getField(conValIndex);
+            Object equalObj = value.row().getField(conValIndex);
             if (equalObj == null) {
                 if (sideInfo.getJoinType() == JoinType.LEFT) {
-                    Row row = fillData(value, null);
-                    out.collect(row);
+                    Row row = fillData(value.row(), null);
+                    out.collect(new CRow(row, value.change()));
                 }
                 return;
             }
@@ -132,8 +133,8 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
         List<Map<String, Object>> cacheList = cacheRef.get().get(key);
         if (CollectionUtils.isEmpty(cacheList)) {
             if (sideInfo.getJoinType() == JoinType.LEFT) {
-                Row row = fillData(value, null);
-                out.collect(row);
+                Row row = fillData(value.row(), null);
+                out.collect(new CRow(row, value.change()));
             } else {
                 return;
             }
@@ -142,9 +143,8 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
         }

         for (Map<String, Object> one : cacheList) {
-            out.collect(fillData(value, one));
+            out.collect(new CRow(fillData(value.row(), one), value.change()));
         }
-
     }

     private String buildKey(List<Object> equalValList) {

redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java

Lines changed: 10 additions & 9 deletions
@@ -26,6 +26,7 @@
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Maps;
+import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
@@ -89,14 +90,14 @@ protected void reloadCache() {
     }

     @Override
-    public void flatMap(Row row, Collector<Row> out) throws Exception {
+    public void flatMap(CRow input, Collector<CRow> out) throws Exception {
         Map<String, String> inputParams = Maps.newHashMap();
         for(Integer conValIndex : sideInfo.getEqualValIndex()){
-            Object equalObj = row.getField(conValIndex);
+            Object equalObj = input.row().getField(conValIndex);
             if(equalObj == null){
-                if(sideInfo.getJoinType() == JoinType.LEFT){
-                    Row data = fillData(row, null);
-                    out.collect(data);
+                if (sideInfo.getJoinType() == JoinType.LEFT) {
+                    Row data = fillData(input.row(), null);
+                    out.collect(new CRow(data, input.change()));
                 }
                 return;
             }
@@ -109,17 +110,17 @@ public void flatMap(Row row, Collector<Row> out) throws Exception {

         if (cacheMap == null){
             if(sideInfo.getJoinType() == JoinType.LEFT){
-                Row data = fillData(row, null);
-                out.collect(data);
+                Row data = fillData(input.row(), null);
+                out.collect(new CRow(data, input.change()));
             }else{
                 return;
             }

             return;
         }

-        Row newRow = fillData(row, cacheMap);
-        out.collect(newRow);
+        Row newRow = fillData(input.row(), cacheMap);
+        out.collect(new CRow(newRow, input.change()));
     }

     private String buildKey(Map<String, String> inputParams) {
