Skip to content

Commit 787bbcf

Browse files
committed
update
1 parent 5187eb1 commit 787bbcf

File tree

5 files changed

+708
-2
lines changed

5 files changed

+708
-2
lines changed

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public static Builder builder(OutputFile file) {
9797

9898
public static class Builder extends ParquetWriter.Builder<Group, Builder> {
9999
private MessageType type = null;
100+
private boolean strictUnsignedIntegerValidation = false;
100101

101102
private Builder(Path file) {
102103
super(file);
@@ -111,6 +112,18 @@ public Builder withType(MessageType type) {
111112
return this;
112113
}
113114

115+
/**
116+
* Enable strict validation for unsigned integer values (UINT_8, UINT_16, UINT_32, UINT_64).
117+
* When enabled, negative values or out-of-range values will cause an exception.
118+
*
119+
* @param enabled whether to enable strict validation
120+
* @return this builder for method chaining
121+
*/
122+
public Builder withStrictUnsignedIntegerValidation(boolean enabled) {
123+
this.strictUnsignedIntegerValidation = enabled;
124+
return this;
125+
}
126+
114127
@Override
115128
protected Builder self() {
116129
return this;
@@ -123,7 +136,7 @@ protected WriteSupport<Group> getWriteSupport(Configuration conf) {
123136

124137
@Override
125138
protected WriteSupport<Group> getWriteSupport(ParquetConfiguration conf) {
126-
return new GroupWriteSupport(type);
139+
return new GroupWriteSupport(type, strictUnsignedIntegerValidation);
127140
}
128141

129142
@Override

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public static MessageType getSchema(ParquetConfiguration configuration) {
5252
private MessageType schema;
5353
private GroupWriter groupWriter;
5454
private Map<String, String> extraMetaData;
55+
private boolean strictUnsignedIntegerValidation;
5556

5657
public GroupWriteSupport() {
5758
this(null, new HashMap<String, String>());
@@ -62,8 +63,17 @@ public GroupWriteSupport() {
6263
}
6364

6465
/**
 * Creates a write support for the given schema and extra metadata with strict
 * unsigned integer validation switched off.
 *
 * @param schema the message schema
 * @param extraMetaData extra key/value metadata
 */
GroupWriteSupport(MessageType schema, Map<String, String> extraMetaData) {
  // Strict validation is opt-in, so this overload passes false.
  this(schema, extraMetaData, false);
}
68+
69+
/**
 * Creates a write support for the given schema with an empty, mutable extra-metadata map.
 *
 * @param schema the message schema
 * @param strictUnsignedIntegerValidation whether values written to unsigned integer
 *     fields are range-checked
 */
GroupWriteSupport(MessageType schema, boolean strictUnsignedIntegerValidation) {
  this(schema, new HashMap<>(), strictUnsignedIntegerValidation);
}
72+
73+
/**
 * Stores the schema, extra metadata and validation flag; the other constructors delegate here.
 *
 * @param schema the message schema
 * @param extraMetaData extra key/value metadata, stored by reference (not copied)
 * @param strictUnsignedIntegerValidation whether values written to unsigned integer
 *     fields are range-checked
 */
GroupWriteSupport(
    MessageType schema, Map<String, String> extraMetaData, boolean strictUnsignedIntegerValidation) {
  this.strictUnsignedIntegerValidation = strictUnsignedIntegerValidation;
  this.extraMetaData = extraMetaData;
  this.schema = schema;
}
6878

6979
@Override
@@ -87,7 +97,10 @@ public org.apache.parquet.hadoop.api.WriteSupport.WriteContext init(ParquetConfi
8797

8898
@Override
8999
public void prepareForWrite(RecordConsumer recordConsumer) {
90-
groupWriter = new GroupWriter(recordConsumer, schema);
100+
RecordConsumer consumer = strictUnsignedIntegerValidation
101+
? new ValidatingUnsignedIntegerRecordConsumer(recordConsumer, schema)
102+
: recordConsumer;
103+
groupWriter = new GroupWriter(consumer, schema);
91104
}
92105

93106
@Override
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.hadoop.example;
20+
21+
import java.util.Stack;
22+
import org.apache.parquet.io.InvalidRecordException;
23+
import org.apache.parquet.io.api.RecordConsumer;
24+
import org.apache.parquet.schema.LogicalTypeAnnotation;
25+
import org.apache.parquet.schema.MessageType;
26+
import org.apache.parquet.schema.Type;
27+
28+
/**
29+
* A RecordConsumer that validates unsigned integer values to ensure they comply
30+
* with the Parquet specification.
31+
*
32+
* This consumer wraps another RecordConsumer and validates that values written
33+
* to unsigned integer fields (UINT_8, UINT_16, UINT_32, UINT_64) are within
34+
* the valid unsigned range before delegating to the wrapped consumer.
35+
*/
36+
public class ValidatingUnsignedIntegerRecordConsumer extends RecordConsumer {
37+
38+
private final RecordConsumer delegate;
39+
private final MessageType schema;
40+
private final Stack<Type> types = new Stack<>();
41+
private final Stack<Integer> fields = new Stack<>();
42+
43+
public ValidatingUnsignedIntegerRecordConsumer(RecordConsumer delegate, MessageType schema) {
44+
this.delegate = delegate;
45+
this.schema = schema;
46+
}
47+
48+
@Override
49+
public void startMessage() {
50+
types.push(schema);
51+
delegate.startMessage();
52+
}
53+
54+
@Override
55+
public void endMessage() {
56+
types.pop();
57+
delegate.endMessage();
58+
}
59+
60+
@Override
61+
public void startField(String field, int index) {
62+
fields.push(index);
63+
64+
if (!types.isEmpty()) {
65+
Type parentType = types.peek();
66+
if (parentType.asGroupType() != null && index < parentType.asGroupType().getFieldCount()) {
67+
Type fieldType = parentType.asGroupType().getType(index);
68+
types.push(fieldType);
69+
}
70+
}
71+
72+
delegate.startField(field, index);
73+
}
74+
75+
@Override
76+
public void endField(String field, int index) {
77+
if (!types.isEmpty() && !fields.isEmpty()) {
78+
types.pop();
79+
fields.pop();
80+
}
81+
delegate.endField(field, index);
82+
}
83+
84+
@Override
85+
public void startGroup() {
86+
delegate.startGroup();
87+
}
88+
89+
@Override
90+
public void endGroup() {
91+
delegate.endGroup();
92+
}
93+
94+
@Override
95+
public void addInteger(int value) {
96+
validateUnsignedInteger(value);
97+
delegate.addInteger(value);
98+
}
99+
100+
@Override
101+
public void addLong(long value) {
102+
validateUnsignedLong(value);
103+
delegate.addLong(value);
104+
}
105+
106+
@Override
107+
public void addBoolean(boolean value) {
108+
delegate.addBoolean(value);
109+
}
110+
111+
@Override
112+
public void addBinary(org.apache.parquet.io.api.Binary value) {
113+
delegate.addBinary(value);
114+
}
115+
116+
@Override
117+
public void addFloat(float value) {
118+
delegate.addFloat(value);
119+
}
120+
121+
@Override
122+
public void addDouble(double value) {
123+
delegate.addDouble(value);
124+
}
125+
126+
private void validateUnsignedInteger(int value) {
127+
Type currentType = getCurrentFieldType();
128+
if (currentType != null && currentType.isPrimitive()) {
129+
LogicalTypeAnnotation logicalType = currentType.asPrimitiveType().getLogicalTypeAnnotation();
130+
if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
131+
LogicalTypeAnnotation.IntLogicalTypeAnnotation intType =
132+
(LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType;
133+
if (!intType.isSigned()) {
134+
switch (intType.getBitWidth()) {
135+
case 8:
136+
if (value < 0 || value > 255) {
137+
throw new InvalidRecordException(
138+
"Value " + value + " is out of range for UINT_8 (0-255) in field " + currentType.getName());
139+
}
140+
break;
141+
case 16:
142+
if (value < 0 || value > 65535) {
143+
throw new InvalidRecordException(
144+
"Value " + value + " is out of range for UINT_16 (0-65535) in field " + currentType.getName());
145+
}
146+
break;
147+
case 32:
148+
case 64:
149+
if (value < 0) {
150+
throw new InvalidRecordException(
151+
"Negative value " + value + " is not allowed for unsigned integer type "
152+
+ currentType.getName());
153+
}
154+
break;
155+
}
156+
}
157+
}
158+
}
159+
}
160+
161+
private void validateUnsignedLong(long value) {
162+
Type currentType = getCurrentFieldType();
163+
if (currentType != null && currentType.isPrimitive()) {
164+
LogicalTypeAnnotation logicalType = currentType.asPrimitiveType().getLogicalTypeAnnotation();
165+
if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
166+
LogicalTypeAnnotation.IntLogicalTypeAnnotation intType =
167+
(LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType;
168+
if (!intType.isSigned()) {
169+
if (value < 0) {
170+
throw new InvalidRecordException(
171+
"Negative value " + value + " is not allowed for unsigned integer type "
172+
+ currentType.getName());
173+
}
174+
}
175+
}
176+
}
177+
}
178+
179+
private Type getCurrentFieldType() {
180+
if (fields.isEmpty() || types.isEmpty()) {
181+
return null;
182+
}
183+
184+
Type parentType = types.size() > 1 ? types.get(types.size() - 2) : schema;
185+
if (parentType.isPrimitive()) {
186+
return parentType;
187+
}
188+
189+
if (parentType.asGroupType() != null) {
190+
int fieldIndex = fields.peek();
191+
if (fieldIndex >= 0 && fieldIndex < parentType.asGroupType().getFieldCount()) {
192+
return parentType.asGroupType().getType(fieldIndex);
193+
}
194+
}
195+
return null;
196+
}
197+
}

0 commit comments

Comments
 (0)