1010import com .dtstack .flink .sql .side .cache .CacheObj ;
1111import com .dtstack .flink .sql .side .kudu .table .KuduSideTableInfo ;
1212import com .dtstack .flink .sql .side .kudu .utils .KuduUtil ;
13+ import com .dtstack .flink .sql .util .DateUtil ;
1314import com .dtstack .flink .sql .util .KrbUtils ;
1415import com .dtstack .flink .sql .util .RowDataComplete ;
1516import com .google .common .collect .Lists ;
3839import org .slf4j .LoggerFactory ;
3940
4041import java .io .IOException ;
42+ import java .math .BigDecimal ;
4143import java .security .PrivilegedAction ;
44+ import java .sql .Timestamp ;
45+ import java .time .Instant ;
4246import java .util .Arrays ;
4347import java .util .Collections ;
4448import java .util .List ;
@@ -91,7 +95,7 @@ private void connKuDu() throws IOException {
9195 throw new IllegalArgumentException ("Table Open Failed , please check table exists" );
9296 }
9397 table = asyncClient .syncClient ().openTable (tableName );
94- LOG .info ("connect kudu is successed !" );
98+ LOG .info ("connect kudu is succeed !" );
9599 }
96100 scannerBuilder = asyncClient .newScannerBuilder (table );
97101 Integer batchSizeBytes = kuduSideTableInfo .getBatchSizeBytes ();
@@ -139,12 +143,7 @@ private AsyncKuduClient getClient() throws IOException {
139143 kuduSideTableInfo .getKrb5conf ()
140144 );
141145 return ugi .doAs (
142- new PrivilegedAction <AsyncKuduClient >() {
143- @ Override
144- public AsyncKuduClient run () {
145- return asyncKuduClientBuilder .build ();
146- }
147- });
146+ (PrivilegedAction <AsyncKuduClient >) asyncKuduClientBuilder ::build );
148147 } else {
149148 return asyncKuduClientBuilder .build ();
150149 }
@@ -160,19 +159,24 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, BaseRow input, Re
160159 connKuDu ();
161160 Schema schema = table .getSchema ();
162161 // @wenbaoup fix bug
163- inputParams .entrySet ().forEach (e ->{
164- scannerBuilder .addPredicate (KuduPredicate .newInListPredicate (schema .getColumn (e .getKey ()), Collections .singletonList (e .getValue ())));
162+ inputParams .forEach ((key , value ) -> {
163+ Object transformValue = transformValue (value );
164+ scannerBuilder .addPredicate (
165+ KuduPredicate .newInListPredicate (
166+ schema .getColumn (key ),
167+ Collections .singletonList (transformValue )
168+ )
169+ );
165170 });
166171
167172 // 填充谓词信息
168173 List <PredicateInfo > predicateInfoes = sideInfo .getSideTableInfo ().getPredicateInfoes ();
169174 if (predicateInfoes .size () > 0 ) {
170- predicateInfoes .stream ().map (info -> {
175+ predicateInfoes .stream ().peek (info -> {
171176 KuduPredicate kuduPredicate = KuduUtil .buildKuduPredicate (schema , info );
172177 if (null != kuduPredicate ) {
173178 scannerBuilder .addPredicate (kuduPredicate );
174179 }
175- return info ;
176180 }).count ();
177181 }
178182
@@ -184,6 +188,42 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, BaseRow input, Re
184188 data .addCallbackDeferring (new GetListRowCB (inputCopy , cacheContent , rowList , asyncKuduScanner , resultFuture , buildCacheKey (inputParams )));
185189 }
186190
191+ /**
192+ * 将value转化为Java 通用类型
193+ * @param value value
194+ * @return 类型转化的value
195+ */
196+ private Object transformValue (Object value ) {
197+ if (value == null ) {
198+ return null ;
199+ } else if (value instanceof Number && !(value instanceof BigDecimal )) {
200+ return value ;
201+ } else if (value instanceof Boolean ) {
202+ return value ;
203+ } else if (value instanceof String ) {
204+ return value ;
205+ } else if (value instanceof Character ) {
206+ return value ;
207+ } else if (value instanceof CharSequence ) {
208+ return value ;
209+ } else if (value instanceof Map ) {
210+ return value ;
211+ } else if (value instanceof List ) {
212+ return value ;
213+ } else if (value instanceof byte []) {
214+ return value ;
215+ } else if (value instanceof Instant ) {
216+ return value ;
217+ } else if (value instanceof Timestamp ) {
218+ value = DateUtil .timestampToString ((Timestamp ) value );
219+ } else if (value instanceof java .util .Date ) {
220+ value = DateUtil .dateToString ((java .sql .Date ) value );
221+ } else {
222+ value = value .toString ();
223+ }
224+ return value ;
225+ }
226+
187227 @ Override
188228 public String buildCacheKey (Map <String , Object > inputParams ) {
189229 StringBuilder sb = new StringBuilder ();
0 commit comments