Skip to content

Commit f5369d6

Browse files
author
Ashok Vengala
committed
OTF-4251: Code changes to ingore removed partitions during latest schema validation
1 parent f893149 commit f5369d6

File tree

5 files changed

+58
-6
lines changed

5 files changed

+58
-6
lines changed

api/src/main/java/org/apache/iceberg/PartitionSpec.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,12 @@ public StructType partitionType() {
131131
for (PartitionField field : fields) {
132132
Type sourceType = schema.findType(field.sourceId());
133133
Type resultType = field.transform().getResultType(sourceType);
134+
135+
// When the source field has been dropped we cannot determine the type
136+
if (sourceType == null) {
137+
resultType = Types.UnknownType.get();
138+
}
139+
134140
structFields.add(Types.NestedField.optional(field.fieldId(), field.name(), resultType));
135141
}
136142

@@ -613,8 +619,12 @@ Builder add(int sourceId, int fieldId, String name, Transform<?, ?> transform) {
613619
}
614620

615621
public PartitionSpec build() {
622+
return build(false);
623+
}
624+
625+
public PartitionSpec build(boolean allowMissingFields) {
616626
PartitionSpec spec = buildUnchecked();
617-
checkCompatibility(spec, schema);
627+
checkCompatibility(spec, schema, allowMissingFields);
618628
return spec;
619629
}
620630

@@ -624,9 +634,17 @@ PartitionSpec buildUnchecked() {
624634
}
625635

626636
static void checkCompatibility(PartitionSpec spec, Schema schema) {
637+
checkCompatibility(spec, schema, false);
638+
}
639+
640+
static void checkCompatibility(PartitionSpec spec, Schema schema, boolean allowMissingFields) {
627641
for (PartitionField field : spec.fields) {
628642
Type sourceType = schema.findType(field.sourceId());
629643
Transform<?, ?> transform = field.transform();
644+
// In the case the underlying field is dropped, we cannot check if they are compatible
645+
if (allowMissingFields && sourceType == null) {
646+
continue;
647+
}
630648
// In the case of a Version 1 partition-spec field gets deleted,
631649
// it is replaced with a void transform, see:
632650
// https://iceberg.apache.org/spec/#partition-transforms

api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ public PartitionSpec bind(Schema schema) {
4646
return copyToBuilder(schema).build();
4747
}
4848

49+
public PartitionSpec bind(Schema schema, boolean ignoreMissingFields) {
50+
return copyToBuilder(schema).build(ignoreMissingFields);
51+
}
52+
4953
PartitionSpec bindUnchecked(Schema schema) {
5054
return copyToBuilder(schema).buildUnchecked();
5155
}

core/src/main/java/org/apache/iceberg/PartitionSpecParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public static String toJson(UnboundPartitionSpec spec, boolean pretty) {
6868
}
6969

7070
public static PartitionSpec fromJson(Schema schema, JsonNode json) {
71-
return fromJson(json).bind(schema);
71+
return fromJson(json).bind(schema, true);
7272
}
7373

7474
public static UnboundPartitionSpec fromJson(JsonNode json) {

core/src/main/java/org/apache/iceberg/Partitioning.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,8 @@ public static StructType groupingKeyType(Schema schema, Collection<PartitionSpec
239239
*/
240240
public static StructType partitionType(Table table) {
241241
Collection<PartitionSpec> specs = table.specs().values();
242-
return buildPartitionProjectionType("table partition", specs, allFieldIds(specs));
242+
return buildPartitionProjectionType(
243+
"table partition", specs, allActiveFieldIds(table.schema(), specs));
243244
}
244245

245246
/**
@@ -346,10 +347,11 @@ private static boolean compatibleTransforms(Transform<?, ?> t1, Transform<?, ?>
346347
|| t2.equals(Transforms.alwaysNull());
347348
}
348349

349-
// collects IDs of all partition field used across specs
350-
private static Set<Integer> allFieldIds(Collection<PartitionSpec> specs) {
350+
// collects IDs of all partition field used across specs that are in the current schema
351+
private static Set<Integer> allActiveFieldIds(Schema schema, Collection<PartitionSpec> specs) {
351352
return FluentIterable.from(specs)
352353
.transformAndConcat(PartitionSpec::fields)
354+
.filter(field -> schema.findField(field.sourceId()) != null)
353355
.transform(PartitionField::fieldId)
354356
.toSet();
355357
}

core/src/test/java/org/apache/iceberg/TestPartitioning.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public void testPartitionTypeWithIncompatibleSpecEvolution() {
172172

173173
PartitionSpec newSpec = PartitionSpec.builderFor(table.schema()).identity("category").build();
174174

175-
TableOperations ops = ((HasTableOperations) table).operations();
175+
TableOperations ops = table.operations();
176176
TableMetadata current = ops.current();
177177
ops.commit(current, current.updatePartitionSpec(newSpec));
178178

@@ -183,6 +183,34 @@ public void testPartitionTypeWithIncompatibleSpecEvolution() {
183183
.hasMessageStartingWith("Conflicting partition fields");
184184
}
185185

186+
@Test
187+
public void testPartitionTypeIgnoreInactiveFields() {
188+
TestTables.TestTable table =
189+
TestTables.create(
190+
tableDir, "test", SCHEMA, BY_DATA_CATEGORY_BUCKET_SPEC, V2_FORMAT_VERSION);
191+
192+
StructType actualType = Partitioning.partitionType(table);
193+
assertThat(actualType)
194+
.isEqualTo(
195+
StructType.of(
196+
NestedField.optional(1000, "data", Types.StringType.get()),
197+
NestedField.optional(1001, "category_bucket", Types.IntegerType.get())));
198+
199+
// Create a new spec, and drop the field of the old spec
200+
table.updateSpec().removeField("category_bucket").commit();
201+
table.updateSchema().deleteColumn("category").commit();
202+
203+
actualType = Partitioning.partitionType(table);
204+
assertThat(actualType)
205+
.isEqualTo(StructType.of(NestedField.optional(1000, "data", Types.StringType.get())));
206+
207+
table.updateSpec().removeField("data").commit();
208+
table.updateSchema().deleteColumn("data").commit();
209+
210+
actualType = Partitioning.partitionType(table);
211+
assertThat(actualType).isEqualTo(StructType.of());
212+
}
213+
186214
@Test
187215
public void testGroupingKeyTypeWithSpecEvolutionInV1Tables() {
188216
TestTables.TestTable table =

0 commit comments

Comments
 (0)