Skip to content

Commit 54d55b3

Browse files
author
Jennifer Baldwin
committed
fix tests
1 parent 9aff0a1 commit 54d55b3

File tree

1 file changed

+79
-15
lines changed

1 file changed

+79
-15
lines changed

flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java

Lines changed: 79 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.nio.file.Path;
2929
import java.util.Base64;
3030
import java.util.List;
31+
import java.util.Map;
3132
import org.apache.flink.configuration.Configuration;
3233
import org.apache.flink.configuration.CoreOptions;
3334
import org.apache.flink.table.api.TableEnvironment;
@@ -137,7 +138,7 @@ private Table createPrimitiveTable() throws IOException {
137138
return table;
138139
}
139140

140-
private void createNestedTable() throws IOException {
141+
private Table createNestedTable() throws IOException {
141142
Table table =
142143
validationCatalog.createTable(
143144
TableIdentifier.of(DATABASE, TABLE_NAME),
@@ -154,6 +155,7 @@ private void createNestedTable() throws IOException {
154155
File testFile = File.createTempFile("junit", null, temp.toFile());
155156
DataFile dataFile = FileHelpers.writeDataFile(table, Files.localOutput(testFile), records);
156157
table.newAppend().appendFile(dataFile).commit();
158+
return table;
157159
}
158160

159161
@BeforeEach
@@ -212,32 +214,88 @@ protected Object[] row(Object... values) {
212214

213215
@TestTemplate
214216
public void testPrimitiveColumns() throws Exception {
215-
createPrimitiveTable();
217+
Table table = createPrimitiveTable();
216218
List<Row> result = sql("SELECT readable_metrics FROM %s$files", TABLE_NAME);
217219

220+
// With new releases of Parquet, new features might be added which cause the
221+
// size of the column to increase. For example, with Parquet 1.14.x the
222+
// uncompressed size has been added to allow for better allocation of memory upfront.
223+
// Therefore, we look the sizes up, rather than hardcoding them
224+
DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
225+
Map<Integer, Long> columnSizeStats = dataFile.columnSizes();
226+
218227
Row binaryCol =
219228
Row.of(
220-
52L,
229+
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("binaryCol").fieldId()),
221230
4L,
222231
2L,
223232
null,
224233
Base64.getDecoder().decode("1111"),
225234
Base64.getDecoder().decode("2222"));
226-
Row booleanCol = Row.of(32L, 4L, 0L, null, false, true);
227-
Row decimalCol = Row.of(85L, 4L, 1L, null, new BigDecimal("1.00"), new BigDecimal("2.00"));
228-
Row doubleCol = Row.of(85L, 4L, 0L, 1L, 1.0D, 2.0D);
235+
Row booleanCol =
236+
Row.of(
237+
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("booleanCol").fieldId()),
238+
4L,
239+
0L,
240+
null,
241+
false,
242+
true);
243+
Row decimalCol =
244+
Row.of(
245+
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("decimalCol").fieldId()),
246+
4L,
247+
1L,
248+
null,
249+
new BigDecimal("1.00"),
250+
new BigDecimal("2.00"));
251+
Row doubleCol =
252+
Row.of(
253+
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("doubleCol").fieldId()),
254+
4L,
255+
0L,
256+
1L,
257+
1.0D,
258+
2.0D);
229259
Row fixedCol =
230260
Row.of(
231-
44L,
261+
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("fixedCol").fieldId()),
232262
4L,
233263
2L,
234264
null,
235265
Base64.getDecoder().decode("1111"),
236266
Base64.getDecoder().decode("2222"));
237-
Row floatCol = Row.of(71L, 4L, 0L, 2L, 0f, 0f);
238-
Row intCol = Row.of(71L, 4L, 0L, null, 1, 2);
239-
Row longCol = Row.of(79L, 4L, 0L, null, 1L, 2L);
240-
Row stringCol = Row.of(79L, 4L, 0L, null, "1", "2");
267+
Row floatCol =
268+
Row.of(
269+
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("floatCol").fieldId()),
270+
4L,
271+
0L,
272+
2L,
273+
0f,
274+
0f);
275+
Row intCol =
276+
Row.of(
277+
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("intCol").fieldId()),
278+
4L,
279+
0L,
280+
null,
281+
1,
282+
2);
283+
Row longCol =
284+
Row.of(
285+
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("longCol").fieldId()),
286+
4L,
287+
0L,
288+
null,
289+
1L,
290+
2L);
291+
Row stringCol =
292+
Row.of(
293+
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("stringCol").fieldId()),
294+
4L,
295+
0L,
296+
null,
297+
"1",
298+
"2");
241299

242300
List<Row> expected =
243301
Lists.newArrayList(
@@ -288,12 +346,18 @@ public void testSelectNestedValues() throws Exception {
288346
@TestTemplate
289347
public void testNestedValues() throws Exception {
290348
createNestedTable();
349+
List<Row> result = sql("SELECT readable_metrics FROM %s$files", TABLE_NAME);
350+
351+
// We have to take a slightly different approach, since we don't store
352+
// the column sizes for nested fields.
353+
long leafDoubleColSize =
354+
(long) ((Row) ((Row) result.get(0).getField(0)).getField(0)).getField(0);
355+
long leafLongColSize = (long) ((Row) ((Row) result.get(0).getField(0)).getField(1)).getField(0);
291356

292-
Row leafDoubleCol = Row.of(46L, 3L, 1L, 1L, 0.0D, 0.0D);
293-
Row leafLongCol = Row.of(54L, 3L, 1L, null, 0L, 1L);
357+
Row leafDoubleCol = Row.of(leafDoubleColSize, 3L, 1L, 1L, 0.0D, 0.0D);
358+
Row leafLongCol = Row.of(leafLongColSize, 3L, 1L, null, 0L, 1L);
294359
Row metrics = Row.of(Row.of(leafDoubleCol, leafLongCol));
295360

296-
TestHelpers.assertRows(
297-
sql("SELECT readable_metrics FROM %s$files", TABLE_NAME), ImmutableList.of(metrics));
361+
TestHelpers.assertRows(result, ImmutableList.of(metrics));
298362
}
299363
}

0 commit comments

Comments
 (0)