From d18c2ee9c84df8adbd44c7521451629b90c21486 Mon Sep 17 00:00:00 2001 From: arnavb Date: Fri, 29 Aug 2025 04:14:19 +0000 Subject: [PATCH 1/3] Add unsigned integer range validation to ValidatingRecordConsumer --- .../parquet/io/ValidatingRecordConsumer.java | 66 ++++ .../TestStrictUnsignedIntegerValidation.java | 311 ++++++++++++++++++ 2 files changed, 377 insertions(+) create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestStrictUnsignedIntegerValidation.java diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java index 5cc984182e..8335e3b80f 100644 --- a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java +++ b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java @@ -30,8 +30,10 @@ import java.util.ArrayDeque; import java.util.Arrays; import java.util.Deque; +import java.util.Optional; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; @@ -202,6 +204,7 @@ private void validate(PrimitiveTypeName... 
ptypes) { @Override public void addInteger(int value) { validate(INT32); + validateUnsignedInteger(value); delegate.addInteger(value); } @@ -211,6 +214,7 @@ public void addInteger(int value) { @Override public void addLong(long value) { validate(INT64); + validateUnsignedLong(value); delegate.addLong(value); } @@ -249,4 +253,66 @@ public void addDouble(double value) { validate(DOUBLE); delegate.addDouble(value); } + + private void validateUnsignedInteger(int value) { + Type currentType = types.peek().asGroupType().getType(fields.peek()); + if (currentType != null && currentType.isPrimitive()) { + LogicalTypeAnnotation logicalType = currentType.asPrimitiveType().getLogicalTypeAnnotation(); + if (logicalType != null) { + logicalType.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor() { + @Override + public Optional visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { + if (!intType.isSigned()) { + switch (intType.getBitWidth()) { + case 8: + if (value < 0 || value > 255) { + throw new InvalidRecordException("Value " + value + + " is out of range for UINT_8 (0-255) in field " + + currentType.getName()); + } + break; + case 16: + if (value < 0 || value > 65535) { + throw new InvalidRecordException("Value " + value + + " is out of range for UINT_16 (0-65535) in field " + + currentType.getName()); + } + break; + case 32: + case 64: + if (value < 0) { + throw new InvalidRecordException("Negative value " + value + + " is not allowed for unsigned integer type " + + currentType.getName()); + } + break; + } + } + return Optional.empty(); + } + }); + } + } + } + + private void validateUnsignedLong(long value) { + Type currentType = types.peek().asGroupType().getType(fields.peek()); + if (currentType != null && currentType.isPrimitive()) { + LogicalTypeAnnotation logicalType = currentType.asPrimitiveType().getLogicalTypeAnnotation(); + if (logicalType != null) { + logicalType.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor() { + @Override + 
public Optional visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { + if (!intType.isSigned()) { + if (value < 0) { + throw new InvalidRecordException("Negative value " + value + + " is not allowed for unsigned integer type " + currentType.getName()); + } + } + return Optional.empty(); + } + }); + } + } + } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestStrictUnsignedIntegerValidation.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestStrictUnsignedIntegerValidation.java new file mode 100644 index 0000000000..fd477e741f --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestStrictUnsignedIntegerValidation.java @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop.example; + +import static org.apache.parquet.schema.LogicalTypeAnnotation.intType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.junit.Assert.assertThrows; +import org.apache.parquet.io.api.Binary; + +import java.io.File; +import java.io.IOException; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.io.InvalidRecordException; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Test for unsigned integer validation in ExampleParquetWriter. 
+ */ +public class TestStrictUnsignedIntegerValidation { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testValidUnsignedIntegerValues() throws IOException { + MessageType schema = Types.buildMessage() + .required(INT32) + .as(intType(8, false)) + .named("uint8_field") + .required(INT32) + .as(intType(16, false)) + .named("uint16_field") + .required(INT32) + .as(intType(32, false)) + .named("uint32_field") + .required(INT64) + .as(intType(64, false)) + .named("uint64_field") + .named("test_schema"); + + File tempFile = new File(tempFolder.getRoot(), "valid_unsigned.parquet"); + Path outputPath = new Path(tempFile.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(outputPath) + .withType(schema) + .withValidation(true) + .build()) { + + SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); + + Group validGroup = groupFactory + .newGroup() + .append("uint8_field", 255) + .append("uint16_field", 65535) + .append("uint32_field", Integer.MAX_VALUE) + .append("uint64_field", Long.MAX_VALUE); + + writer.write(validGroup); + + Group zeroGroup = groupFactory + .newGroup() + .append("uint8_field", 0) + .append("uint16_field", 0) + .append("uint32_field", 0) + .append("uint64_field", 0L); + + writer.write(zeroGroup); + } + } + + @Test + public void testMaximumUnsignedIntegerValues() throws IOException { + MessageType schema = Types.buildMessage() + .required(INT32) + .as(intType(8, false)) + .named("uint8_field") + .required(INT32) + .as(intType(16, false)) + .named("uint16_field") + .required(INT32) + .as(intType(32, false)) + .named("uint32_field") + .required(INT64) + .as(intType(64, false)) + .named("uint64_field") + .named("test_schema"); + + File tempFile = new File(tempFolder.getRoot(), "max_unsigned.parquet"); + Path outputPath = new Path(tempFile.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(outputPath) + .withType(schema) + 
.withValidation(true) + .build()) { + + SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); + + Group maxGroup = groupFactory + .newGroup() + .append("uint8_field", 255) + .append("uint16_field", 65535) + .append("uint32_field", Integer.MAX_VALUE) + .append("uint64_field", Long.MAX_VALUE); + + writer.write(maxGroup); + } + } + + @Test + public void testInvalidUint8Values() throws IOException { + MessageType schema = Types.buildMessage() + .required(INT32) + .as(intType(8, false)) + .named("uint8_field") + .named("test_schema"); + + File tempFile = new File(tempFolder.getRoot(), "invalid_uint8.parquet"); + Path outputPath = new Path(tempFile.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(outputPath) + .withType(schema) + .withValidation(true) + .build()) { + + SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); + + Group invalidGroup = groupFactory.newGroup().append("uint8_field", -1); + assertThrows(InvalidRecordException.class, () -> { + writer.write(invalidGroup); + }); + } + } + + @Test + public void testInvalidUint16Values() throws IOException { + MessageType schema = Types.buildMessage() + .required(INT32) + .as(intType(16, false)) + .named("uint16_field") + .named("test_schema"); + + File tempFile = new File(tempFolder.getRoot(), "invalid_uint16.parquet"); + Path outputPath = new Path(tempFile.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(outputPath) + .withType(schema) + .withValidation(true) + .build()) { + + SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); + + Group invalidGroup = groupFactory.newGroup().append("uint16_field", -20); + assertThrows(InvalidRecordException.class, () -> { + writer.write(invalidGroup); + }); + } + } + + @Test + public void testInvalidUint32Values() throws IOException { + MessageType schema = Types.buildMessage() + .required(INT32) + .as(intType(32, false)) + .named("uint32_field") + .named("test_schema"); + + File 
tempFile = new File(tempFolder.getRoot(), "invalid_uint32.parquet"); + Path outputPath = new Path(tempFile.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(outputPath) + .withType(schema) + .withValidation(true) + .build()) { + + SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); + + Group invalidGroup = groupFactory.newGroup().append("uint32_field", -100); + assertThrows(InvalidRecordException.class, () -> { + writer.write(invalidGroup); + }); + } + } + + @Test + public void testInvalidUint64Values() throws IOException { + MessageType schema = Types.buildMessage() + .required(INT64) + .as(intType(64, false)) + .named("uint64_field") + .named("test_schema"); + + File tempFile = new File(tempFolder.getRoot(), "invalid_uint64.parquet"); + Path outputPath = new Path(tempFile.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(outputPath) + .withType(schema) + .withValidation(true) + .build()) { + + SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); + + Group invalidGroup = groupFactory.newGroup().append("uint64_field", -1000L); + assertThrows(InvalidRecordException.class, () -> { + writer.write(invalidGroup); + }); + } + } + + @Test + public void testValidationDisabledByDefault() throws IOException { + MessageType schema = Types.buildMessage() + .required(INT32) + .as(intType(8, false)) + .named("uint8_field") + .named("test_schema"); + + File tempFile = new File(tempFolder.getRoot(), "validation_disabled.parquet"); + Path outputPath = new Path(tempFile.getAbsolutePath()); + + try (ParquetWriter writer = + ExampleParquetWriter.builder(outputPath).withType(schema).build()) { + + SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); + + Group invalidGroup = groupFactory.newGroup().append("uint8_field", -5); + writer.write(invalidGroup); + } + } + + @Test + public void testValidationCanBeExplicitlyDisabled() throws IOException { + MessageType schema = 
Types.buildMessage() + .required(INT32) + .as(intType(8, false)) + .named("uint8_field") + .named("test_schema"); + + File tempFile = new File(tempFolder.getRoot(), "validation_explicit_disabled.parquet"); + Path outputPath = new Path(tempFile.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(outputPath) + .withType(schema) + .withValidation(false) + .build()) { + + SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); + + Group invalidGroup = groupFactory.newGroup().append("uint8_field", -10); + writer.write(invalidGroup); + } + } + + @Test + public void testBasicValidation() throws IOException { + MessageType schema = Types.buildMessage() + .required(INT32) + .named("int32_field") + .named("test_schema"); + + File tempFile = new File(tempFolder.getRoot(), "basic_validation.parquet"); + Path outputPath = new Path(tempFile.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(outputPath) + .withType(schema) + .withValidation(true) + .build()) { + + SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); + + Group validGroup = groupFactory.newGroup().append("int32_field", 42); + writer.write(validGroup); + + MessageType stringSchema = Types.buildMessage() + .required(BINARY) + .named("int32_field") + .named("test_schema"); + + SimpleGroupFactory stringGroupFactory = new SimpleGroupFactory(stringSchema); + Group invalidGroup = stringGroupFactory.newGroup().append("int32_field", Binary.fromString("not_an_int")); + + assertThrows(InvalidRecordException.class, () -> { + writer.write(invalidGroup); + }); + } + } +} From b774c190195943c48f3c420d35cbef847fdc5b53 Mon Sep 17 00:00:00 2001 From: arnavb Date: Fri, 29 Aug 2025 04:16:51 +0000 Subject: [PATCH 2/3] lint --- .../parquet/io/ValidatingRecordConsumer.java | 11 +++++++---- .../TestStrictUnsignedIntegerValidation.java | 16 ++++++---------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git 
a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java index 8335e3b80f..0d8a8c85ae 100644 --- a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java +++ b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java @@ -48,6 +48,9 @@ public class ValidatingRecordConsumer extends RecordConsumer { private static final Logger LOG = LoggerFactory.getLogger(ValidatingRecordConsumer.class); + private static final int UINT_8_MAX_VALUE = 255; + private static final int UINT_16_MAX_VALUE = 65535; + private final RecordConsumer delegate; private Deque types = new ArrayDeque<>(); @@ -265,16 +268,16 @@ public Optional visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intTy if (!intType.isSigned()) { switch (intType.getBitWidth()) { case 8: - if (value < 0 || value > 255) { + if (value < 0 || value > UINT_8_MAX_VALUE) { throw new InvalidRecordException("Value " + value - + " is out of range for UINT_8 (0-255) in field " + + " is out of range for UINT_8 (0-" + UINT_8_MAX_VALUE + ") in field " + currentType.getName()); } break; case 16: - if (value < 0 || value > 65535) { + if (value < 0 || value > UINT_16_MAX_VALUE) { throw new InvalidRecordException("Value " + value - + " is out of range for UINT_16 (0-65535) in field " + + " is out of range for UINT_16 (0-" + UINT_16_MAX_VALUE + ") in field " + currentType.getName()); } break; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestStrictUnsignedIntegerValidation.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestStrictUnsignedIntegerValidation.java index fd477e741f..3fbba9c89c 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestStrictUnsignedIntegerValidation.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestStrictUnsignedIntegerValidation.java @@ -19,11 +19,10 @@ 
package org.apache.parquet.hadoop.example; import static org.apache.parquet.schema.LogicalTypeAnnotation.intType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.junit.Assert.assertThrows; -import org.apache.parquet.io.api.Binary; import java.io.File; import java.io.IOException; @@ -32,6 +31,7 @@ import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.io.InvalidRecordException; +import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Types; import org.junit.Rule; @@ -277,10 +277,8 @@ public void testValidationCanBeExplicitlyDisabled() throws IOException { @Test public void testBasicValidation() throws IOException { - MessageType schema = Types.buildMessage() - .required(INT32) - .named("int32_field") - .named("test_schema"); + MessageType schema = + Types.buildMessage().required(INT32).named("int32_field").named("test_schema"); File tempFile = new File(tempFolder.getRoot(), "basic_validation.parquet"); Path outputPath = new Path(tempFile.getAbsolutePath()); @@ -295,10 +293,8 @@ public void testBasicValidation() throws IOException { Group validGroup = groupFactory.newGroup().append("int32_field", 42); writer.write(validGroup); - MessageType stringSchema = Types.buildMessage() - .required(BINARY) - .named("int32_field") - .named("test_schema"); + MessageType stringSchema = + Types.buildMessage().required(BINARY).named("int32_field").named("test_schema"); SimpleGroupFactory stringGroupFactory = new SimpleGroupFactory(stringSchema); Group invalidGroup = stringGroupFactory.newGroup().append("int32_field", Binary.fromString("not_an_int")); 
From f3cfe411f5f3996e63b4cfde99cc501cc4bceec0 Mon Sep 17 00:00:00 2001 From: arnavb Date: Fri, 29 Aug 2025 07:08:27 +0000 Subject: [PATCH 3/3] Gate unsigned integer validation behind opt-in strictUnsignedIntegerValidation flag --- .../apache/parquet/io/ColumnIOFactory.java | 38 +++++++++-- .../apache/parquet/io/MessageColumnIO.java | 11 +++- .../parquet/io/ValidatingRecordConsumer.java | 20 +++++- .../hadoop/InternalParquetRecordWriter.java | 26 +++++++- .../apache/parquet/hadoop/ParquetWriter.java | 35 +++++++++- .../TestStrictUnsignedIntegerValidation.java | 65 +++++++++++++++++++ 6 files changed, 184 insertions(+), 11 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java index 24868acaf1..86d5d9a583 100644 --- a/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java +++ b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java @@ -39,21 +39,27 @@ private static class ColumnIOCreatorVisitor implements TypeVisitor { private final boolean validating; private final MessageType requestedSchema; private final String createdBy; + private final boolean strictUnsignedIntegerValidation; private int currentRequestedIndex; private Type currentRequestedType; private boolean strictTypeChecking; private ColumnIOCreatorVisitor( - boolean validating, MessageType requestedSchema, String createdBy, boolean strictTypeChecking) { + boolean validating, + MessageType requestedSchema, + String createdBy, + boolean strictTypeChecking, + boolean strictUnsignedIntegerValidation) { this.validating = validating; this.requestedSchema = requestedSchema; this.createdBy = createdBy; this.strictTypeChecking = strictTypeChecking; + this.strictUnsignedIntegerValidation = strictUnsignedIntegerValidation; } @Override public void visit(MessageType messageType) { - columnIO = new MessageColumnIO(requestedSchema, validating, createdBy); + columnIO = new MessageColumnIO(requestedSchema, validating, strictUnsignedIntegerValidation, 
createdBy); visitChildren(columnIO, messageType, requestedSchema); columnIO.setLevels(); columnIO.setLeaves(leaves); @@ -113,12 +119,13 @@ public MessageColumnIO getColumnIO() { private final String createdBy; private final boolean validating; + private final boolean strictUnsignedIntegerValidation; /** * validation is off by default */ public ColumnIOFactory() { - this(null, false); + this(null, false, false); } /** @@ -127,14 +134,22 @@ public ColumnIOFactory() { * @param createdBy createdBy string for readers */ public ColumnIOFactory(String createdBy) { - this(createdBy, false); + this(createdBy, false, false); } /** * @param validating to turn validation on */ public ColumnIOFactory(boolean validating) { - this(null, validating); + this(null, validating, false); + } + + /** + * @param validating to turn validation on + * @param strictUnsignedIntegerValidation to turn strict unsigned integer validation on + */ + public ColumnIOFactory(boolean validating, boolean strictUnsignedIntegerValidation) { + this(null, validating, strictUnsignedIntegerValidation); } /** @@ -142,9 +157,19 @@ public ColumnIOFactory(boolean validating) { * @param validating to turn validation on */ public ColumnIOFactory(String createdBy, boolean validating) { + this(createdBy, validating, false); + } + + /** + * @param createdBy createdBy string for readers + * @param validating to turn validation on + * @param strictUnsignedIntegerValidation to turn strict unsigned integer validation on + */ + public ColumnIOFactory(String createdBy, boolean validating, boolean strictUnsignedIntegerValidation) { super(); this.createdBy = createdBy; this.validating = validating; + this.strictUnsignedIntegerValidation = strictUnsignedIntegerValidation; } /** @@ -163,7 +188,8 @@ public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType file * @return the corresponding serializing/deserializing structure */ public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType 
fileSchema, boolean strict) { - ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor(validating, requestedSchema, createdBy, strict); + ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor( + validating, requestedSchema, createdBy, strict, strictUnsignedIntegerValidation); fileSchema.accept(visitor); return visitor.getColumnIO(); } diff --git a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java index 0ab2db39f6..2ea9215313 100644 --- a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java +++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java @@ -61,11 +61,18 @@ public class MessageColumnIO extends GroupColumnIO { private List leaves; private final boolean validating; + private final boolean strictUnsignedIntegerValidation; private final String createdBy; MessageColumnIO(MessageType messageType, boolean validating, String createdBy) { + this(messageType, validating, false, createdBy); + } + + MessageColumnIO( + MessageType messageType, boolean validating, boolean strictUnsignedIntegerValidation, String createdBy) { super(messageType, null, 0); this.validating = validating; + this.strictUnsignedIntegerValidation = strictUnsignedIntegerValidation; this.createdBy = createdBy; } @@ -508,7 +515,9 @@ public void flush() { public RecordConsumer getRecordWriter(ColumnWriteStore columns) { RecordConsumer recordWriter = new MessageColumnIORecordConsumer(columns); if (DEBUG) recordWriter = new RecordConsumerLoggingWrapper(recordWriter); - return validating ? new ValidatingRecordConsumer(recordWriter, getType()) : recordWriter; + return validating + ? 
new ValidatingRecordConsumer(recordWriter, getType(), strictUnsignedIntegerValidation) + : recordWriter; } void setLevels() { diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java index 0d8a8c85ae..f309e34b97 100644 --- a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java +++ b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java @@ -52,6 +52,7 @@ public class ValidatingRecordConsumer extends RecordConsumer { private static final int UINT_16_MAX_VALUE = 65535; private final RecordConsumer delegate; + private final boolean strictUnsignedIntegerValidation; private Deque types = new ArrayDeque<>(); private Deque fields = new ArrayDeque<>(); @@ -63,7 +64,18 @@ public class ValidatingRecordConsumer extends RecordConsumer { * @param schema the schema to validate against */ public ValidatingRecordConsumer(RecordConsumer delegate, MessageType schema) { + this(delegate, schema, false); + } + + /** + * @param delegate the consumer to pass down the event to + * @param schema the schema to validate against + * @param strictUnsignedIntegerValidation whether to enable strict unsigned integer validation + */ + public ValidatingRecordConsumer( + RecordConsumer delegate, MessageType schema, boolean strictUnsignedIntegerValidation) { this.delegate = delegate; + this.strictUnsignedIntegerValidation = strictUnsignedIntegerValidation; this.types.push(schema); } @@ -207,7 +219,9 @@ private void validate(PrimitiveTypeName... 
ptypes) { @Override public void addInteger(int value) { validate(INT32); - validateUnsignedInteger(value); + if (strictUnsignedIntegerValidation) { + validateUnsignedInteger(value); + } delegate.addInteger(value); } @@ -217,7 +231,9 @@ public void addInteger(int value) { @Override public void addLong(long value) { validate(INT64); - validateUnsignedLong(value); + if (strictUnsignedIntegerValidation) { + validateUnsignedLong(value); + } delegate.addLong(value); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index f296286800..fde53df50d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -53,6 +53,7 @@ class InternalParquetRecordWriter { private long nextRowGroupSize; private final BytesInputCompressor compressor; private final boolean validating; + private final boolean strictUnsignedIntegerValidation; private final ParquetProperties props; private boolean closed; @@ -87,6 +88,28 @@ public InternalParquetRecordWriter( BytesInputCompressor compressor, boolean validating, ParquetProperties props) { + this( + parquetFileWriter, + writeSupport, + schema, + extraMetaData, + rowGroupSize, + compressor, + validating, + false, + props); + } + + public InternalParquetRecordWriter( + ParquetFileWriter parquetFileWriter, + WriteSupport writeSupport, + MessageType schema, + Map extraMetaData, + long rowGroupSize, + BytesInputCompressor compressor, + boolean validating, + boolean strictUnsignedIntegerValidation, + ParquetProperties props) { this.parquetFileWriter = parquetFileWriter; this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport cannot be null"); this.schema = schema; @@ -96,6 +119,7 @@ public InternalParquetRecordWriter( this.nextRowGroupSize = rowGroupSizeThreshold; 
this.compressor = compressor; this.validating = validating; + this.strictUnsignedIntegerValidation = strictUnsignedIntegerValidation; this.props = props; this.fileEncryptor = parquetFileWriter.getEncryptor(); this.rowGroupOrdinal = 0; @@ -120,7 +144,7 @@ private void initStore() { bloomFilterWriteStore = columnChunkPageWriteStore; columnStore = props.newColumnWriteStore(schema, pageStore, bloomFilterWriteStore); - MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); + MessageColumnIO columnIO = new ColumnIOFactory(validating, strictUnsignedIntegerValidation).getColumnIO(schema); this.recordConsumer = columnIO.getRecordWriter(columnStore); writeSupport.prepareForWrite(recordConsumer); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 7789cad5c0..cac33ee2dd 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -361,6 +361,7 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport new CodecFactory(conf, encodingProps.getPageSizeThreshold()), rowGroupSize, validating, + false, conf, maxPaddingSize, encodingProps, @@ -375,6 +376,7 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport CompressionCodecFactory codecFactory, long rowGroupSize, boolean validating, + boolean strictUnsignedIntegerValidation, ParquetConfiguration conf, int maxPaddingSize, ParquetProperties encodingProps, @@ -417,7 +419,15 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport } this.writer = new InternalParquetRecordWriter( - fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, compressor, validating, encodingProps); + fileWriter, + writeSupport, + schema, + extraMetadata, + rowGroupSize, + compressor, + validating, + strictUnsignedIntegerValidation, 
+ encodingProps); } public void write(T object) throws IOException { @@ -474,6 +484,7 @@ public abstract static class Builder> { private long rowGroupSize = DEFAULT_BLOCK_SIZE; private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT; private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED; + private boolean strictUnsignedIntegerValidation = false; private ParquetProperties.Builder encodingPropsBuilder = ParquetProperties.builder(); protected Builder(Path path) { @@ -715,6 +726,27 @@ public SELF withValidation(boolean enableValidation) { return self(); } + /** + * Enable strict unsigned integer validation for the constructed writer. + * + * @return this builder for method chaining. + */ + public SELF enableStrictUnsignedIntegerValidation() { + this.strictUnsignedIntegerValidation = true; + return self(); + } + + /** + * Enable or disable strict unsigned integer validation for the constructed writer. + * + * @param strictUnsignedIntegerValidation whether strict unsigned integer validation should be enabled + * @return this builder for method chaining. + */ + public SELF withStrictUnsignedIntegerValidation(boolean strictUnsignedIntegerValidation) { + this.strictUnsignedIntegerValidation = strictUnsignedIntegerValidation; + return self(); + } + /** * Set the {@link WriterVersion format version} used by the constructed * writer. 
@@ -978,6 +1010,7 @@ public ParquetWriter build() throws IOException { codecFactory, rowGroupSize, enableValidation, + strictUnsignedIntegerValidation, conf, maxPaddingSize, encodingProps, diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestStrictUnsignedIntegerValidation.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestStrictUnsignedIntegerValidation.java index 3fbba9c89c..edc8da6345 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestStrictUnsignedIntegerValidation.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestStrictUnsignedIntegerValidation.java @@ -69,6 +69,7 @@ public void testValidUnsignedIntegerValues() throws IOException { try (ParquetWriter writer = ExampleParquetWriter.builder(outputPath) .withType(schema) .withValidation(true) + .withStrictUnsignedIntegerValidation(true) .build()) { SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); @@ -145,6 +146,7 @@ public void testInvalidUint8Values() throws IOException { try (ParquetWriter writer = ExampleParquetWriter.builder(outputPath) .withType(schema) .withValidation(true) + .withStrictUnsignedIntegerValidation(true) .build()) { SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); @@ -170,6 +172,7 @@ public void testInvalidUint16Values() throws IOException { try (ParquetWriter writer = ExampleParquetWriter.builder(outputPath) .withType(schema) .withValidation(true) + .withStrictUnsignedIntegerValidation(true) .build()) { SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); @@ -195,6 +198,7 @@ public void testInvalidUint32Values() throws IOException { try (ParquetWriter writer = ExampleParquetWriter.builder(outputPath) .withType(schema) .withValidation(true) + .withStrictUnsignedIntegerValidation(true) .build()) { SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); @@ -220,6 +224,7 @@ public void testInvalidUint64Values() throws IOException { try 
(ParquetWriter writer = ExampleParquetWriter.builder(outputPath) .withType(schema) .withValidation(true) + .withStrictUnsignedIntegerValidation(true) .build()) { SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); @@ -304,4 +309,64 @@ public void testBasicValidation() throws IOException { }); } } + + @Test + public void testStrictUnsignedIntegerValidationEnabled() throws IOException { + MessageType schema = Types.buildMessage() + .required(INT32) + .as(intType(8, false)) + .named("uint8_field") + .named("test_schema"); + + File tempFile = new File(tempFolder.getRoot(), "strict_validation_enabled.parquet"); + Path outputPath = new Path(tempFile.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(outputPath) + .withType(schema) + .withValidation(true) + .withStrictUnsignedIntegerValidation(true) + .build()) { + + SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); + + // This should work - valid value + Group validGroup = groupFactory.newGroup().append("uint8_field", 255); + writer.write(validGroup); + + // This should throw an exception - invalid value + Group invalidGroup = groupFactory.newGroup().append("uint8_field", -1); + assertThrows(InvalidRecordException.class, () -> { + writer.write(invalidGroup); + }); + } + } + + @Test + public void testStrictUnsignedIntegerValidationDisabled() throws IOException { + MessageType schema = Types.buildMessage() + .required(INT32) + .as(intType(8, false)) + .named("uint8_field") + .named("test_schema"); + + File tempFile = new File(tempFolder.getRoot(), "strict_validation_disabled.parquet"); + Path outputPath = new Path(tempFile.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(outputPath) + .withType(schema) + .withValidation(true) + .withStrictUnsignedIntegerValidation(false) + .build()) { + + SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); + + // This should work - valid value + Group validGroup = 
groupFactory.newGroup().append("uint8_field", 255); + writer.write(validGroup); + + // This should also work - invalid value but strict validation is disabled + Group invalidGroup = groupFactory.newGroup().append("uint8_field", -1); + writer.write(invalidGroup); + } + } }