2828import java .nio .file .Path ;
2929import java .util .Base64 ;
3030import java .util .List ;
31+ import java .util .Map ;
3132import org .apache .flink .configuration .Configuration ;
3233import org .apache .flink .configuration .CoreOptions ;
3334import org .apache .flink .table .api .TableEnvironment ;
@@ -137,7 +138,7 @@ private Table createPrimitiveTable() throws IOException {
137138 return table ;
138139 }
139140
140- private void createNestedTable () throws IOException {
141+ private Table createNestedTable () throws IOException {
141142 Table table =
142143 validationCatalog .createTable (
143144 TableIdentifier .of (DATABASE , TABLE_NAME ),
@@ -154,6 +155,7 @@ private void createNestedTable() throws IOException {
154155 File testFile = File .createTempFile ("junit" , null , temp .toFile ());
155156 DataFile dataFile = FileHelpers .writeDataFile (table , Files .localOutput (testFile ), records );
156157 table .newAppend ().appendFile (dataFile ).commit ();
158+ return table ;
157159 }
158160
159161 @ BeforeEach
@@ -212,32 +214,88 @@ protected Object[] row(Object... values) {
212214
213215 @ TestTemplate
214216 public void testPrimitiveColumns () throws Exception {
215- createPrimitiveTable ();
217+ Table table = createPrimitiveTable ();
216218 List <Row > result = sql ("SELECT readable_metrics FROM %s$files" , TABLE_NAME );
217219
220+ // With new releases of Parquet, new features might be added which cause the
221+ // size of the column to increase. For example, with Parquet 1.14.x the
222+ // uncompressed size has been added to allow for better allocation of memory upfront.
223+ // Therefore, we look the sizes up from the written data file's column-size metrics rather than hardcoding them.
224+ DataFile dataFile = table .currentSnapshot ().addedDataFiles (table .io ()).iterator ().next ();
225+ Map <Integer , Long > columnSizeStats = dataFile .columnSizes ();
226+
218227 Row binaryCol =
219228 Row .of (
220- 52L ,
229+ columnSizeStats . get ( PRIMITIVE_SCHEMA . findField ( "binaryCol" ). fieldId ()) ,
221230 4L ,
222231 2L ,
223232 null ,
224233 Base64 .getDecoder ().decode ("1111" ),
225234 Base64 .getDecoder ().decode ("2222" ));
226- Row booleanCol = Row .of (32L , 4L , 0L , null , false , true );
227- Row decimalCol = Row .of (85L , 4L , 1L , null , new BigDecimal ("1.00" ), new BigDecimal ("2.00" ));
228- Row doubleCol = Row .of (85L , 4L , 0L , 1L , 1.0D , 2.0D );
235+ Row booleanCol =
236+ Row .of (
237+ columnSizeStats .get (PRIMITIVE_SCHEMA .findField ("booleanCol" ).fieldId ()),
238+ 4L ,
239+ 0L ,
240+ null ,
241+ false ,
242+ true );
243+ Row decimalCol =
244+ Row .of (
245+ columnSizeStats .get (PRIMITIVE_SCHEMA .findField ("decimalCol" ).fieldId ()),
246+ 4L ,
247+ 1L ,
248+ null ,
249+ new BigDecimal ("1.00" ),
250+ new BigDecimal ("2.00" ));
251+ Row doubleCol =
252+ Row .of (
253+ columnSizeStats .get (PRIMITIVE_SCHEMA .findField ("doubleCol" ).fieldId ()),
254+ 4L ,
255+ 0L ,
256+ 1L ,
257+ 1.0D ,
258+ 2.0D );
229259 Row fixedCol =
230260 Row .of (
231- 44L ,
261+ columnSizeStats . get ( PRIMITIVE_SCHEMA . findField ( "fixedCol" ). fieldId ()) ,
232262 4L ,
233263 2L ,
234264 null ,
235265 Base64 .getDecoder ().decode ("1111" ),
236266 Base64 .getDecoder ().decode ("2222" ));
237- Row floatCol = Row .of (71L , 4L , 0L , 2L , 0f , 0f );
238- Row intCol = Row .of (71L , 4L , 0L , null , 1 , 2 );
239- Row longCol = Row .of (79L , 4L , 0L , null , 1L , 2L );
240- Row stringCol = Row .of (79L , 4L , 0L , null , "1" , "2" );
267+ Row floatCol =
268+ Row .of (
269+ columnSizeStats .get (PRIMITIVE_SCHEMA .findField ("floatCol" ).fieldId ()),
270+ 4L ,
271+ 0L ,
272+ 2L ,
273+ 0f ,
274+ 0f );
275+ Row intCol =
276+ Row .of (
277+ columnSizeStats .get (PRIMITIVE_SCHEMA .findField ("intCol" ).fieldId ()),
278+ 4L ,
279+ 0L ,
280+ null ,
281+ 1 ,
282+ 2 );
283+ Row longCol =
284+ Row .of (
285+ columnSizeStats .get (PRIMITIVE_SCHEMA .findField ("longCol" ).fieldId ()),
286+ 4L ,
287+ 0L ,
288+ null ,
289+ 1L ,
290+ 2L );
291+ Row stringCol =
292+ Row .of (
293+ columnSizeStats .get (PRIMITIVE_SCHEMA .findField ("stringCol" ).fieldId ()),
294+ 4L ,
295+ 0L ,
296+ null ,
297+ "1" ,
298+ "2" );
241299
242300 List <Row > expected =
243301 Lists .newArrayList (
@@ -288,12 +346,18 @@ public void testSelectNestedValues() throws Exception {
288346 @ TestTemplate
289347 public void testNestedValues () throws Exception {
290348 createNestedTable ();
349+ List <Row > result = sql ("SELECT readable_metrics FROM %s$files" , TABLE_NAME );
350+
351+ // Column sizes for nested fields are not stored, so (unlike testPrimitiveColumns) the expected
352+ // size of each leaf is read back from field 0 of its metrics row in the query result itself.
353+ long leafDoubleColSize =
354+ (long ) ((Row ) ((Row ) result .get (0 ).getField (0 )).getField (0 )).getField (0 );
355+ long leafLongColSize = (long ) ((Row ) ((Row ) result .get (0 ).getField (0 )).getField (1 )).getField (0 );
291356
292- Row leafDoubleCol = Row .of (46L , 3L , 1L , 1L , 0.0D , 0.0D );
293- Row leafLongCol = Row .of (54L , 3L , 1L , null , 0L , 1L );
357+ Row leafDoubleCol = Row .of (leafDoubleColSize , 3L , 1L , 1L , 0.0D , 0.0D );
358+ Row leafLongCol = Row .of (leafLongColSize , 3L , 1L , null , 0L , 1L );
294359 Row metrics = Row .of (Row .of (leafDoubleCol , leafLongCol ));
295360
296- TestHelpers .assertRows (
297- sql ("SELECT readable_metrics FROM %s$files" , TABLE_NAME ), ImmutableList .of (metrics ));
361+ TestHelpers .assertRows (result , ImmutableList .of (metrics ));
298362 }
299363}
0 commit comments