38
38
import com .google .common .collect .Maps ;
39
39
import org .apache .flink .configuration .Configuration ;
40
40
import org .apache .flink .streaming .api .functions .async .ResultFuture ;
41
+ import org .apache .flink .table .runtime .types .CRow ;
41
42
import org .apache .flink .types .Row ;
42
43
43
44
import java .util .Collections ;
@@ -120,14 +121,14 @@ public Row fillData(Row input, Object sideInput) {
120
121
}
121
122
122
123
@ Override
123
- public void asyncInvoke (Row input , ResultFuture <Row > resultFuture ) throws Exception {
124
- Row inputRow = Row . copy ( input );
124
+ public void asyncInvoke (CRow input , ResultFuture <CRow > resultFuture ) throws Exception {
125
+ CRow inputCopy = new CRow ( input . row (), input . change () );
125
126
Map <String , Object > refData = Maps .newHashMap ();
126
127
for (int i = 0 ; i < sideInfo .getEqualValIndex ().size (); i ++) {
127
128
Integer conValIndex = sideInfo .getEqualValIndex ().get (i );
128
- Object equalObj = inputRow .getField (conValIndex );
129
+ Object equalObj = input . row () .getField (conValIndex );
129
130
if (equalObj == null ){
130
- dealMissKey (inputRow , resultFuture );
131
+ dealMissKey (inputCopy , resultFuture );
131
132
return ;
132
133
}
133
134
refData .put (sideInfo .getEqualFieldList ().get (i ), equalObj );
@@ -141,14 +142,14 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
141
142
CacheObj val = getFromCache (key );
142
143
if (val != null ){
143
144
if (ECacheContentType .MissVal == val .getType ()){
144
- dealMissKey (inputRow , resultFuture );
145
+ dealMissKey (inputCopy , resultFuture );
145
146
return ;
146
147
}else if (ECacheContentType .MultiLine == val .getType ()){
147
148
try {
148
- Row row = fillData (inputRow , val .getContent ());
149
- resultFuture .complete (Collections .singleton (row ));
149
+ Row row = fillData (input . row () , val .getContent ());
150
+ resultFuture .complete (Collections .singleton (new CRow ( row , inputCopy . change ()) ));
150
151
} catch (Exception e ) {
151
- dealFillDataError (resultFuture , e , inputRow );
152
+ dealFillDataError (resultFuture , e , inputCopy );
152
153
}
153
154
}else {
154
155
RuntimeException exception = new RuntimeException ("not support cache obj type " + val .getType ());
@@ -164,14 +165,14 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
164
165
public void accept (Map <String , String > values ) {
165
166
if (MapUtils .isNotEmpty (values )) {
166
167
try {
167
- Row row = fillData (inputRow , values );
168
+ Row row = fillData (input . row () , values );
168
169
dealCacheData (key ,CacheObj .buildCacheObj (ECacheContentType .MultiLine , values ));
169
- resultFuture .complete (Collections .singleton (row ));
170
+ resultFuture .complete (Collections .singleton (new CRow ( row , inputCopy . change ()) ));
170
171
} catch (Exception e ) {
171
- dealFillDataError (resultFuture , e , inputRow );
172
+ dealFillDataError (resultFuture , e , inputCopy );
172
173
}
173
174
} else {
174
- dealMissKey (inputRow , resultFuture );
175
+ dealMissKey (inputCopy , resultFuture );
175
176
dealCacheData (key ,CacheMissVal .getMissKeyObj ());
176
177
}
177
178
}
0 commit comments