diff --git a/accumulo-handler/pom.xml b/accumulo-handler/pom.xml
index 6c1c413eb86c..4ce293888eb3 100644
--- a/accumulo-handler/pom.xml
+++ b/accumulo-handler/pom.xml
@@ -173,12 +173,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/beeline/pom.xml b/beeline/pom.xml
index 1239839ba2ac..05917766df73 100644
--- a/beeline/pom.xml
+++ b/beeline/pom.xml
@@ -182,12 +182,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/classification/pom.xml b/classification/pom.xml
index 3a2e65acc923..e52965acf9cf 100644
--- a/classification/pom.xml
+++ b/classification/pom.xml
@@ -75,12 +75,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/cli/pom.xml b/cli/pom.xml
index 762df16d76a6..0b442f3f53fb 100644
--- a/cli/pom.xml
+++ b/cli/pom.xml
@@ -138,12 +138,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/common/pom.xml b/common/pom.xml
index 81f37332b57c..6545cc738ad0 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -388,12 +388,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java b/common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
index a8b7b6d186ba..0193aba0f771 100644
--- a/common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
+++ b/common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.hive.common.type;
 
 import java.time.Instant;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
@@ -166,6 +169,11 @@ public static Timestamp valueOf(String s) {
     return new Timestamp(localDateTime);
   }
 
+  public static Timestamp getTimestampFromTime(String s) {
+    return new Timestamp(LocalDateTime.of(LocalDate.now(),
+        LocalTime.parse(s, DateTimeFormatter.ISO_LOCAL_TIME)));
+  }
+
   public static Timestamp ofEpochSecond(long epochSecond) {
     return ofEpochSecond(epochSecond, 0);
   }
@@ -175,6 +183,10 @@ public static Timestamp ofEpochSecond(long epochSecond, int nanos) {
         LocalDateTime.ofEpochSecond(epochSecond, nanos, ZoneOffset.UTC));
   }
 
+  public static Timestamp ofEpochSecond(long epochSecond, long nanos, ZoneId zone) {
+    return new Timestamp(LocalDateTime.ofInstant(Instant.ofEpochSecond(epochSecond, nanos), zone));
+  }
+
   public static Timestamp ofEpochMilli(long epochMilli) {
     return new Timestamp(LocalDateTime
         .ofInstant(Instant.ofEpochMilli(epochMilli), ZoneOffset.UTC));
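The two `Timestamp` factory methods added above parse a bare time string against the current date, and materialize an epoch instant in an explicit zone rather than the UTC-only `ofEpochSecond(long, int)` overload. A minimal usage sketch under those assumptions (the time string, zone, and printed value are illustrative, not from the patch):

```java
import java.time.ZoneId;
import org.apache.hadoop.hive.common.type.Timestamp;

public class TimestampHelpersDemo {
  public static void main(String[] args) {
    // Parses an ISO local time and anchors it to today's date (LocalDate.now()).
    Timestamp fromTime = Timestamp.getTimestampFromTime("17:30:05");

    // Converts epoch seconds + nanos to wall-clock time in the given zone.
    Timestamp inZone = Timestamp.ofEpochSecond(0L, 500_000_000L, ZoneId.of("Asia/Kolkata"));

    System.out.println(fromTime);
    System.out.println(inZone); // expected 1970-01-01 05:30:00.5 for the +05:30 offset
  }
}
```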
diff --git a/contrib/pom.xml b/contrib/pom.xml
index cecd6691f5b1..37f17f39b640 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -85,12 +85,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml
index 0f1d011b57ea..0ebb921577fc 100644
--- a/druid-handler/pom.xml
+++ b/druid-handler/pom.xml
@@ -406,12 +406,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/hbase-handler/pom.xml b/hbase-handler/pom.xml
index b3873e7bdbe8..808d0efe623e 100644
--- a/hbase-handler/pom.xml
+++ b/hbase-handler/pom.xml
@@ -280,12 +280,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/hcatalog/pom.xml b/hcatalog/pom.xml
index bd5fcdb8c9da..03374327a0ed 100644
--- a/hcatalog/pom.xml
+++ b/hcatalog/pom.xml
@@ -99,12 +99,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/hplsql/pom.xml b/hplsql/pom.xml
index 3bd7c05fe862..e67e372d0387 100644
--- a/hplsql/pom.xml
+++ b/hplsql/pom.xml
@@ -100,12 +100,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/jdbc-handler/pom.xml b/jdbc-handler/pom.xml
index 269d0c951581..f7140c7bd081 100644
--- a/jdbc-handler/pom.xml
+++ b/jdbc-handler/pom.xml
@@ -127,12 +127,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index ca11e553025d..20c7cfb57a8e 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -381,12 +381,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/kryo-registrator/pom.xml b/kryo-registrator/pom.xml
index 04e87eede838..edf968da5118 100644
--- a/kryo-registrator/pom.xml
+++ b/kryo-registrator/pom.xml
@@ -49,12 +49,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/llap-client/pom.xml b/llap-client/pom.xml
index 2f3927880b58..4df7efde2f5c 100644
--- a/llap-client/pom.xml
+++ b/llap-client/pom.xml
@@ -192,12 +192,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/llap-common/pom.xml b/llap-common/pom.xml
index 951761f87e89..a9f769b1cf2d 100644
--- a/llap-common/pom.xml
+++ b/llap-common/pom.xml
@@ -266,12 +266,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/llap-ext-client/pom.xml b/llap-ext-client/pom.xml
index 36ccb8f848a2..ea3f2ec420ef 100644
--- a/llap-ext-client/pom.xml
+++ b/llap-ext-client/pom.xml
@@ -178,12 +178,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/llap-server/pom.xml b/llap-server/pom.xml
index e44565bd75e0..ea648171ca25 100644
--- a/llap-server/pom.xml
+++ b/llap-server/pom.xml
@@ -418,12 +418,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/llap-tez/pom.xml b/llap-tez/pom.xml
index 87cc34045fc8..43485310de27 100644
--- a/llap-tez/pom.xml
+++ b/llap-tez/pom.xml
@@ -242,12 +242,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/metastore/pom.xml b/metastore/pom.xml
index 8ccd442d21d4..dcab1e5ce843 100644
--- a/metastore/pom.xml
+++ b/metastore/pom.xml
@@ -291,12 +291,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/packaging/pom.xml b/packaging/pom.xml
index 64ee0a8bd2b5..220e81376975 100644
--- a/packaging/pom.xml
+++ b/packaging/pom.xml
@@ -307,12 +307,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/pom.xml b/pom.xml
index 13c06c5659f3..6324af1cd066 100644
--- a/pom.xml
+++ b/pom.xml
@@ -188,7 +188,7 @@
     2.0.0-M5
     4.1.17.Final
     3.10.5.Final
-    1.10.0
+    1.13.0
     0.16.0
     1.5.6
     2.5.0
@@ -219,12 +219,12 @@
       fk-art-snapshot
       Flipkart-Artifactory
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       Flipkart-Artifactory
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
@@ -1557,12 +1557,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
diff --git a/ql/pom.xml b/ql/pom.xml
index 4d62bdad4a2b..88ddfe63d008 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -1014,12 +1014,12 @@
       fk-art-snapshot
       libs-snapshot
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local
+      http://jfrog.fkinternal.com/maven_virtual
       fk-art-release
       libs-rel
-      http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
+      http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
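The parquet version bump from 1.10.0 to 1.13.0 in the root pom is what enables the converter changes that follow: the deprecated `OriginalType` enum checks are replaced by the `LogicalTypeAnnotation` visitor API. A minimal before/after sketch of that dispatch pattern, assuming only parquet-mr's documented visitor contract (`accept` returns `Optional.empty()` for annotations the visitor does not override):

```java
import java.util.Optional;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
import org.apache.parquet.schema.PrimitiveType;

final class LogicalTypeDispatchSketch {
  // Old style (parquet <= 1.10): enum comparison on the deprecated OriginalType.
  static boolean isDecimalOld(PrimitiveType type) {
    return org.apache.parquet.schema.OriginalType.DECIMAL == type.getOriginalType();
  }

  // New style (parquet >= 1.11): visit the logical type annotation.
  static boolean isDecimalNew(PrimitiveType type) {
    LogicalTypeAnnotation annotation = type.getLogicalTypeAnnotation();
    if (annotation == null) {
      return false; // plain physical type, no annotation
    }
    Optional<Boolean> result = annotation.accept(new LogicalTypeAnnotationVisitor<Boolean>() {
      @Override
      public Optional<Boolean> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal) {
        return Optional.of(true);
      }
    });
    return result.orElse(false); // any other annotation falls through to empty
  }
}
```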
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
index be101c1ecdce..1b164a419e32 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
@@ -16,12 +16,15 @@
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Map;
+import java.util.Optional;
 
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.ParquetTimestampUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -43,6 +46,12 @@
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
+import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -56,6 +65,89 @@ public enum ETypeConverter {
   EDOUBLE_CONVERTER(Double.TYPE) {
     @Override
     PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
+      if (hiveTypeInfo != null) {
+        String typeName = TypeInfoUtils.getBaseName(hiveTypeInfo.getTypeName());
+        final double minValue = getMinValue(typeName, Double.MIN_VALUE);
+        final double maxValue = getMaxValue(typeName, Double.MAX_VALUE);
+
+        switch (typeName) {
+        case serdeConstants.FLOAT_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addDouble(final double value) {
+              double absValue = (value < 0) ? (value * -1) : value;
+              int exponent = Math.getExponent(value);
+              if ((absValue >= minValue) && (absValue <= maxValue) &&
+                  (exponent <= Float.MAX_EXPONENT) && (exponent >= Float.MIN_EXPONENT)) {
+                parent.set(index, new FloatWritable((float) value));
+              } else {
+                parent.set(index, null);
+              }
+            }
+          };
+        case serdeConstants.DECIMAL_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addDouble(final double value) {
+              HiveDecimalWritable decimalWritable = new HiveDecimalWritable();
+              decimalWritable.setFromDouble(value);
+              parent.set(index, HiveDecimalUtils
+                  .enforcePrecisionScale(decimalWritable, (DecimalTypeInfo) hiveTypeInfo));
+            }
+          };
+        case serdeConstants.BIGINT_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addDouble(final double value) {
+              if ((value >= minValue) && (value <= maxValue) && (value % 1 == 0)) {
+                parent.set(index, new LongWritable((long) value));
+              } else {
+                parent.set(index, null);
+              }
+            }
+          };
+        case serdeConstants.INT_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addDouble(final double value) {
+              if ((value >= minValue) && (value <= maxValue) && (value % 1 == 0)) {
+                parent.set(index, new IntWritable((int) value));
+              } else {
+                parent.set(index, null);
+              }
+            }
+          };
+        case serdeConstants.SMALLINT_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addDouble(final double value) {
+              if ((value >= minValue) && (value <= maxValue) && (value % 1 == 0)) {
+                parent.set(index, new IntWritable((int) value));
+              } else {
+                parent.set(index, null);
+              }
+            }
+          };
+        case serdeConstants.TINYINT_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addDouble(final double value) {
+              if ((value >= minValue) && (value <= maxValue) && (value % 1 == 0)) {
+                parent.set(index, new IntWritable((int) value));
+              } else {
+                parent.set(index, null);
+              }
+            }
+          };
+        default:
+          return new PrimitiveConverter() {
+            @Override
+            public void addDouble(final double value) {
+              parent.set(index, new DoubleWritable(value));
+            }
+          };
+        }
+      }
       return new PrimitiveConverter() {
         @Override
         public void addDouble(final double value) {
@@ -78,21 +170,88 @@ public void addBoolean(final boolean value) {
   EFLOAT_CONVERTER(Float.TYPE) {
     @Override
     PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
-      if (hiveTypeInfo != null && hiveTypeInfo.equals(TypeInfoFactory.doubleTypeInfo)) {
-        return new PrimitiveConverter() {
-          @Override
-          public void addFloat(final float value) {
-            parent.set(index, new DoubleWritable((double) value));
-          }
-        };
-      } else {
-        return new PrimitiveConverter() {
-          @Override
-          public void addFloat(final float value) {
-            parent.set(index, new FloatWritable(value));
-          }
-        };
+      if (hiveTypeInfo != null) {
+        String typeName = TypeInfoUtils.getBaseName(hiveTypeInfo.getTypeName());
+        final double minValue = getMinValue(typeName, Double.MIN_VALUE);
+        final double maxValue = getMaxValue(typeName, Double.MAX_VALUE);
+
+        switch (typeName) {
+        case serdeConstants.DOUBLE_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addFloat(final float value) {
+              parent.set(index, new DoubleWritable(value));
+            }
+          };
+        case serdeConstants.DECIMAL_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addFloat(final float value) {
+              HiveDecimalWritable decimalWritable = new HiveDecimalWritable();
+              decimalWritable.setFromDouble(value);
+              parent.set(index, HiveDecimalUtils
+                  .enforcePrecisionScale(decimalWritable, (DecimalTypeInfo) hiveTypeInfo));
+            }
+          };
+        case serdeConstants.BIGINT_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addFloat(final float value) {
+              if ((value >= minValue) && (value <= maxValue) && (value % 1 == 0)) {
+                parent.set(index, new LongWritable((long) value));
+              } else {
+                parent.set(index, null);
+              }
+            }
+          };
+        case serdeConstants.INT_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addFloat(final float value) {
+              if ((value >= minValue) && (value <= maxValue) && (value % 1 == 0)) {
+                parent.set(index, new IntWritable((int) value));
+              } else {
+                parent.set(index, null);
+              }
+            }
+          };
+        case serdeConstants.SMALLINT_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addFloat(final float value) {
+              if ((value >= minValue) && (value <= maxValue) && (value % 1 == 0)) {
+                parent.set(index, new IntWritable((int) value));
+              } else {
+                parent.set(index, null);
+              }
+            }
+          };
+        case serdeConstants.TINYINT_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addFloat(final float value) {
+              if ((value >= minValue) && (value <= maxValue) && (value % 1 == 0)) {
+                parent.set(index, new IntWritable((int) value));
+              } else {
+                parent.set(index, null);
+              }
+            }
+          };
+        default:
+          return new PrimitiveConverter() {
+            @Override
+            public void addFloat(final float value) {
+              parent.set(index, new FloatWritable(value));
+            }
+          };
+        }
       }
+
+      return new PrimitiveConverter() {
+        @Override
+        public void addFloat(final float value) {
+          parent.set(index, new FloatWritable(value));
+        }
+      };
     }
   },
   EINT32_CONVERTER(Integer.TYPE) {
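Both converters above apply the same narrowing rule when a Parquet double/float column is read as a Hive integral type: the value must fall inside the target's range and be a whole number (`value % 1 == 0`), otherwise the cell is written as NULL. A standalone sketch of that predicate (the helper name is mine, not from the patch):

```java
final class NarrowingRuleSketch {
  // Mirrors the range-plus-integrality check used by the EDOUBLE/EFLOAT
  // converters before narrowing to a Hive integral type.
  static boolean fitsIntegral(double value, double minValue, double maxValue) {
    return value >= minValue && value <= maxValue && value % 1 == 0;
  }

  public static void main(String[] args) {
    System.out.println(fitsIntegral(42.0, Integer.MIN_VALUE, Integer.MAX_VALUE)); // true: kept as 42
    System.out.println(fitsIntegral(42.5, Integer.MIN_VALUE, Integer.MAX_VALUE)); // false: read as NULL
    System.out.println(fitsIntegral(3e10, Integer.MIN_VALUE, Integer.MAX_VALUE)); // false: out of int range
  }
}
```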
@@ -189,10 +348,7 @@ public void addInt(final int value) {
       return new PrimitiveConverter() {
         @Override
         public void addInt(final int value) {
-          if (value >= ((OriginalType.UINT_8 == type.getOriginalType() ||
-              OriginalType.UINT_16 == type.getOriginalType() ||
-              OriginalType.UINT_32 == type.getOriginalType() ||
-              OriginalType.UINT_64 == type.getOriginalType()) ? 0 :
+          if (value >= ((ETypeConverter.isUnsignedInteger(type)) ? 0 :
               Integer.MIN_VALUE)) {
             parent.set(index, new IntWritable(value));
           } else {
@@ -296,10 +452,7 @@ public void addLong(final long value) {
       return new PrimitiveConverter() {
         @Override
         public void addLong(final long value) {
-          if (value >= ((OriginalType.UINT_8 == type.getOriginalType() ||
-              OriginalType.UINT_16 == type.getOriginalType() ||
-              OriginalType.UINT_32 == type.getOriginalType() ||
-              OriginalType.UINT_64 == type.getOriginalType()) ? 0 : Long.MIN_VALUE)) {
+          if (value >= ((ETypeConverter.isUnsignedInteger(type)) ? 0 : Long.MIN_VALUE)) {
             parent.set(index, new LongWritable(value));
           } else {
             parent.set(index, null);
@@ -333,10 +486,179 @@ protected Text convert(Binary binary) {
   EDECIMAL_CONVERTER(BigDecimal.class) {
     @Override
     PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
+      if (hiveTypeInfo != null) {
+        String typeName = TypeInfoUtils.getBaseName(hiveTypeInfo.getTypeName());
+        final double minValue = getMinValue(typeName, Double.MIN_VALUE);
+        final double maxValue = getMaxValue(typeName, Double.MAX_VALUE);
+
+        switch (typeName) {
+        case serdeConstants.FLOAT_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addBinary(Binary value) {
+              HiveDecimalWritable decimalWritable =
+                  new HiveDecimalWritable(value.getBytes(), getScale(type));
+              setValue(decimalWritable.doubleValue(), decimalWritable.floatValue());
+            }
+
+            @Override
+            public void addInt(final int value) {
+              HiveDecimal hiveDecimal = HiveDecimal.create(value, getScale(type));
+              setValue(hiveDecimal.doubleValue(), hiveDecimal.floatValue());
+            }
+
+            @Override
+            public void addLong(final long value) {
+              HiveDecimal hiveDecimal = HiveDecimal.create(value, getScale(type));
+              setValue(hiveDecimal.doubleValue(), hiveDecimal.floatValue());
+            }
+
+            private void setValue(double doubleValue, float floatValue) {
+              double absDoubleValue = (doubleValue < 0) ? (doubleValue * -1) : doubleValue;
+              if (((absDoubleValue >= minValue) && (absDoubleValue <= maxValue)) || absDoubleValue == 0d) {
+                parent.set(index, new FloatWritable(floatValue));
+              } else {
+                parent.set(index, null);
+              }
+            }
+
+            private int getScale(PrimitiveType type) {
+              DecimalLogicalTypeAnnotation logicalType = (DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
+              return logicalType.getScale();
+            }
+          };
+        case serdeConstants.DOUBLE_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addBinary(Binary value) {
+              HiveDecimalWritable decimalWritable =
+                  new HiveDecimalWritable(value.getBytes(), getScale(type));
+              parent.set(index, new DoubleWritable(decimalWritable.doubleValue()));
+            }
+
+            @Override
+            public void addInt(final int value) {
+              HiveDecimal hiveDecimal = HiveDecimal.create(value, getScale(type));
+              parent.set(index, new DoubleWritable(hiveDecimal.doubleValue()));
+            }
+
+            @Override
+            public void addLong(final long value) {
+              HiveDecimal hiveDecimal = HiveDecimal.create(value, getScale(type));
+              parent.set(index, new DoubleWritable(hiveDecimal.doubleValue()));
+            }
+
+            private int getScale(PrimitiveType type) {
+              DecimalLogicalTypeAnnotation logicalType = (DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
+              return logicalType.getScale();
+            }
+          };
+        case serdeConstants.BIGINT_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addBinary(Binary value) {
+              HiveDecimalWritable decimalWritable =
+                  new HiveDecimalWritable(value.getBytes(), getScale(type));
+              setValue(decimalWritable.doubleValue(), decimalWritable.longValue());
+            }
+
+            @Override
+            public void addInt(final int value) {
+              HiveDecimal hiveDecimal = HiveDecimal.create(value, getScale(type));
+              setValue(hiveDecimal.doubleValue(), hiveDecimal.longValue());
+            }
+
+            @Override
+            public void addLong(final long value) {
+              HiveDecimal hiveDecimal = HiveDecimal.create(value, getScale(type));
+              setValue(hiveDecimal.doubleValue(), hiveDecimal.longValue());
+            }
+
+            private void setValue(double doubleValue, long longValue) {
+              if ((doubleValue >= minValue) && (doubleValue <= maxValue) && (doubleValue % 1 == 0)) {
+                parent.set(index, new LongWritable(longValue));
+              } else {
+                parent.set(index, null);
+              }
+            }
+
+            private int getScale(PrimitiveType type) {
+              DecimalLogicalTypeAnnotation logicalType = (DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
+              return logicalType.getScale();
+            }
+          };
+        case serdeConstants.INT_TYPE_NAME:
+        case serdeConstants.SMALLINT_TYPE_NAME:
+        case serdeConstants.TINYINT_TYPE_NAME:
+          return new PrimitiveConverter() {
+            @Override
+            public void addBinary(Binary value) {
+              HiveDecimalWritable decimalWritable =
+                  new HiveDecimalWritable(value.getBytes(), getScale(type));
+              setValue(decimalWritable.doubleValue(), decimalWritable.intValue());
+            }
+
+            @Override
+            public void addInt(final int value) {
+              HiveDecimal hiveDecimal = HiveDecimal.create(value, getScale(type));
+              setValue(hiveDecimal.doubleValue(), hiveDecimal.intValue());
+            }
+
+            @Override
+            public void addLong(final long value) {
+              HiveDecimal hiveDecimal = HiveDecimal.create(value, getScale(type));
+              setValue(hiveDecimal.doubleValue(), hiveDecimal.intValue());
+            }
+
+            private void setValue(double doubleValue, int intValue) {
+              if ((doubleValue >= minValue) && (doubleValue <= maxValue) && (doubleValue % 1 == 0)) {
+                parent.set(index, new IntWritable(intValue));
+              } else {
+                parent.set(index, null);
+              }
+            }
+
+            private int getScale(PrimitiveType type) {
+              DecimalLogicalTypeAnnotation logicalType = (DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
+              return logicalType.getScale();
+            }
+          };
+        default:
+          return new BinaryConverter<HiveDecimalWritable>(type, parent, index, hiveTypeInfo) {
+            @Override
+            protected HiveDecimalWritable convert(Binary binary) {
+              DecimalLogicalTypeAnnotation logicalType = (DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
+              return HiveDecimalUtils.enforcePrecisionScale(
+                  new HiveDecimalWritable(binary.getBytes(), logicalType.getScale()),
+                  (DecimalTypeInfo) hiveTypeInfo);
+            }
+
+            @Override
+            public void addInt(final int value) {
+              addDecimal(value);
+            }
+
+            @Override
+            public void addLong(final long value) {
+              addDecimal(value);
+            }
+
+            private void addDecimal(long value) {
+              DecimalTypeInfo decimalInfo = (DecimalTypeInfo) hiveTypeInfo;
+              HiveDecimal hiveDecimal = HiveDecimal.create(value, decimalInfo.scale());
+              HiveDecimalWritable result = HiveDecimalUtils.enforcePrecisionScale(new HiveDecimalWritable(hiveDecimal),
+                  (DecimalTypeInfo) hiveTypeInfo);
+              parent.set(index, result);
+            }
+          };
+        }
+      }
       return new BinaryConverter<HiveDecimalWritable>(type, parent, index) {
         @Override
         protected HiveDecimalWritable convert(Binary binary) {
-          return new HiveDecimalWritable(binary.getBytes(), type.getDecimalMetadata().getScale());
+          DecimalLogicalTypeAnnotation logicalType =
+              (DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
+          return new HiveDecimalWritable(binary.getBytes(), logicalType.getScale());
         }
       };
     }
@@ -365,6 +687,21 @@ protected TimestampWritableV2 convert(Binary binary) {
       };
     }
   },
+  EINT64_TIMESTAMP_CONVERTER(TimestampWritableV2.class) {
+    @Override
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent,
+        TypeInfo hiveTypeInfo) {
+      return new PrimitiveConverter() {
+        @Override
+        public void addLong(final long value) {
+          TimestampLogicalTypeAnnotation logicalType = (TimestampLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
+          Timestamp timestamp =
+              ParquetTimestampUtils.getTimestamp(value, logicalType.getUnit(), logicalType.isAdjustedToUTC());
+          parent.set(index, new TimestampWritableV2(timestamp));
+        }
+      };
+    }
+  },
   EDATE_CONVERTER(DateWritableV2.class) {
     @Override
     PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
@@ -390,17 +727,38 @@ private Class<?> getType() {
   abstract PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo);
 
   public static PrimitiveConverter getNewConverter(final PrimitiveType type, final int index,
-      final ConverterParent parent, TypeInfo hiveTypeInfo) {
+      final ConverterParent parent, final TypeInfo hiveTypeInfo) {
     if (type.isPrimitive() && (type.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.INT96))) {
       //TODO- cleanup once parquet support Timestamp type annotation.
       return ETypeConverter.ETIMESTAMP_CONVERTER.getConverter(type, index, parent, hiveTypeInfo);
     }
-    if (OriginalType.DECIMAL == type.getOriginalType()) {
-      return EDECIMAL_CONVERTER.getConverter(type, index, parent, hiveTypeInfo);
-    } else if (OriginalType.UTF8 == type.getOriginalType()) {
-      return ESTRING_CONVERTER.getConverter(type, index, parent, hiveTypeInfo);
-    } else if (OriginalType.DATE == type.getOriginalType()) {
-      return EDATE_CONVERTER.getConverter(type, index, parent, hiveTypeInfo);
+    if (type.getLogicalTypeAnnotation() != null) {
+      Optional<PrimitiveConverter> converter = type.getLogicalTypeAnnotation()
+          .accept(new LogicalTypeAnnotationVisitor<PrimitiveConverter>() {
+            @Override
+            public Optional<PrimitiveConverter> visit(DecimalLogicalTypeAnnotation logicalTypeAnnotation) {
+              return Optional.of(EDECIMAL_CONVERTER.getConverter(type, index, parent, hiveTypeInfo));
+            }
+
+            @Override
+            public Optional<PrimitiveConverter> visit(StringLogicalTypeAnnotation logicalTypeAnnotation) {
+              return Optional.of(ESTRING_CONVERTER.getConverter(type, index, parent, hiveTypeInfo));
+            }
+
+            @Override
+            public Optional<PrimitiveConverter> visit(DateLogicalTypeAnnotation logicalTypeAnnotation) {
+              return Optional.of(EDATE_CONVERTER.getConverter(type, index, parent, hiveTypeInfo));
+            }
+
+            @Override
+            public Optional<PrimitiveConverter> visit(TimestampLogicalTypeAnnotation logicalTypeAnnotation) {
+              return Optional.of(EINT64_TIMESTAMP_CONVERTER.getConverter(type, index, parent, hiveTypeInfo));
+            }
+          });
+
+      if (converter.isPresent()) {
+        return converter.get();
+      }
     }
 
     Class<?> javaType = type.getPrimitiveTypeName().javaType;
@@ -413,11 +771,24 @@ public static PrimitiveConverter getNewConverter(final PrimitiveType type, final
     throw new IllegalArgumentException("Converter not found ... for type : " + type);
   }
 
+  public static boolean isUnsignedInteger(final PrimitiveType type) {
+    if (type.getLogicalTypeAnnotation() != null) {
+      Optional<Boolean> isUnsignedInteger = type.getLogicalTypeAnnotation()
+          .accept(new LogicalTypeAnnotationVisitor<Boolean>() {
+            @Override
+            public Optional<Boolean> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+              return Optional.of(!intLogicalType.isSigned());
+            }
+          });
+      if (isUnsignedInteger.isPresent()) {
+        return isUnsignedInteger.get();
+      }
+    }
+    return false;
+  }
+
   private static long getMinValue(final PrimitiveType type, String typeName, long defaultValue) {
-    if (OriginalType.UINT_8 == type.getOriginalType() ||
-        OriginalType.UINT_16 == type.getOriginalType() ||
-        OriginalType.UINT_32 == type.getOriginalType() ||
-        OriginalType.UINT_64 == type.getOriginalType()) {
+    if (isUnsignedInteger(type)) {
       return 0;
     } else {
       switch (typeName) {
@@ -446,16 +817,57 @@ private static long getMaxValue(String typeName, long defaultValue) {
     }
   }
 
+  private static double getMinValue(String typeName, double defaultValue) {
+    switch (typeName) {
+    case serdeConstants.BIGINT_TYPE_NAME:
+      return (double) Long.MIN_VALUE;
+    case serdeConstants.INT_TYPE_NAME:
+      return (double) Integer.MIN_VALUE;
+    case serdeConstants.SMALLINT_TYPE_NAME:
+      return (double) Short.MIN_VALUE;
+    case serdeConstants.TINYINT_TYPE_NAME:
+      return (double) Byte.MIN_VALUE;
+    case serdeConstants.FLOAT_TYPE_NAME:
+      return (double) Float.MIN_VALUE;
+    default:
+      return defaultValue;
+    }
+  }
+
+  private static double getMaxValue(String typeName, double defaultValue) {
+    switch (typeName) {
+    case serdeConstants.BIGINT_TYPE_NAME:
+      return (double) Long.MAX_VALUE;
+    case serdeConstants.INT_TYPE_NAME:
+      return (double) Integer.MAX_VALUE;
+    case serdeConstants.SMALLINT_TYPE_NAME:
+      return (double) Short.MAX_VALUE;
+    case serdeConstants.TINYINT_TYPE_NAME:
+      return (double) Byte.MAX_VALUE;
+    case serdeConstants.FLOAT_TYPE_NAME:
+      return (double) Float.MAX_VALUE;
+    default:
+      return defaultValue;
+    }
+  }
+
   public abstract static class BinaryConverter<T extends Writable> extends PrimitiveConverter {
     protected final PrimitiveType type;
     private final ConverterParent parent;
     private final int index;
+    private final TypeInfo hiveTypeInfo;
     private ArrayList<T> lookupTable;
 
-    public BinaryConverter(PrimitiveType type, ConverterParent parent, int index) {
+    public BinaryConverter(PrimitiveType type, ConverterParent parent, int index,
+        TypeInfo hiveTypeInfo) {
       this.type = type;
       this.parent = parent;
       this.index = index;
+      this.hiveTypeInfo = hiveTypeInfo;
+    }
+
+    public BinaryConverter(PrimitiveType type, ConverterParent parent, int index) {
+      this(type, parent, index, null);
     }
 
     protected abstract T convert(Binary binary);
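`getNewConverter` and `isUnsignedInteger` above share the visitor idiom: any annotation without a matching `visit` override falls through to `Optional.empty()`, so plain INT32/INT64 columns keep the default converter path. A small sketch of how `isUnsignedInteger` would classify schema fields, assuming parquet-mr's `Types` schema builder (the field names are illustrative):

```java
import org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

final class UnsignedCheckSketch {
  public static void main(String[] args) {
    // INT32 annotated as an unsigned 8-bit int: intType(8, false) -> isSigned() == false.
    PrimitiveType uint8 = Types.required(PrimitiveTypeName.INT32)
        .as(LogicalTypeAnnotation.intType(8, false)).named("u8");
    // Plain INT32 with no logical type annotation.
    PrimitiveType plain = Types.required(PrimitiveTypeName.INT32).named("i32");

    System.out.println(ETypeConverter.isUnsignedInteger(uint8)); // true
    System.out.println(ETypeConverter.isUnsignedInteger(plain)); // false (no annotation)
  }
}
```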
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/ParquetTimestampUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/ParquetTimestampUtils.java
new file mode 100644
index 000000000000..9ce07e74f321
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/ParquetTimestampUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.timestamp;
+
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+
+import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+
+public class ParquetTimestampUtils {
+  private static final long MILLI = 1000;
+  private static final long MICRO = 1_000_000;
+  private static final long NANO = 1_000_000_000;
+
+  public static Timestamp getTimestamp(long value, TimeUnit timeUnit, boolean isAdjustedToUTC) {
+
+    ZoneId zone = ZoneOffset.UTC;
+    if (isAdjustedToUTC) {
+      zone = ZoneId.systemDefault();
+    }
+    long seconds = 0L;
+    long nanoseconds = 0L;
+
+    switch (timeUnit) {
+    case MILLIS:
+      seconds = value / MILLI;
+      nanoseconds = (value % MILLI) * MICRO;
+      break;
+
+    case MICROS:
+      seconds = value / MICRO;
+      nanoseconds = (value % MICRO) * MILLI;
+      break;
+
+    case NANOS:
+      seconds = value / NANO;
+      nanoseconds = (value % NANO);
+      break;
+    default:
+      break;
+    }
+    return Timestamp.ofEpochSecond(seconds, nanoseconds, zone);
+  }
+}
\ No newline at end of file
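The split of an INT64 value into (seconds, nanoseconds) above is the whole conversion; for MICROS, for example, 1 microsecond of remainder becomes 1000 nanoseconds. A hedged check of that arithmetic (the input value is illustrative):

```java
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.ParquetTimestampUtils;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;

final class ParquetTimestampDemo {
  public static void main(String[] args) {
    // 1_500_000 micros = 1 s remainder 500_000 micros; 500_000 * 1000 = 500_000_000 ns.
    Timestamp t = ParquetTimestampUtils.getTimestamp(1_500_000L, TimeUnit.MICROS, false);
    // isAdjustedToUTC == false keeps ZoneOffset.UTC, so this should print
    // the UTC wall clock 1970-01-01 00:00:01.5.
    System.out.println(t);
  }
}
```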
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java
index 26a45111870f..2f7d356f0531 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java
@@ -37,6 +37,14 @@
    */
   void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException;
 
+  long readSmallInt();
+
+  long readSmallInt(int id);
+
+  long readTinyInt();
+
+  long readTinyInt(int id);
+
   /**
    * @return the next Dictionary ID from the page
    */
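The four methods added to the interface above follow the existing reader contract: the no-argument form consumes the next value from the current page, while the `(int id)` form resolves a dictionary-encoded id. A minimal caller sketch under that assumption (the helper and its arguments are illustrative, not part of the patch):

```java
import org.apache.hadoop.hive.ql.io.parquet.vector.ParquetDataColumnReader;

final class ReaderContractSketch {
  // Fills `out` with smallint values widened to long: plain pages stream
  // values in order; dictionary-encoded pages resolve each id instead.
  static void fillSmallInts(ParquetDataColumnReader reader, long[] out,
      int[] dictIds, boolean dictionaryMode) {
    for (int i = 0; i < out.length; i++) {
      out[i] = dictionaryMode ? reader.readSmallInt(dictIds[i]) : reader.readSmallInt();
    }
  }
}
```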
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
index 2eb3a9bfafe4..0b862aaf31e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
@@ -19,14 +19,19 @@
 package org.apache.hadoop.hive.ql.io.parquet.vector;
 
 import org.apache.hadoop.hive.common.type.HiveBaseChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.ParquetTimestampUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -35,15 +40,21 @@
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
+import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
 import java.time.ZoneId;
 import java.util.Arrays;
+import java.util.Optional;
 
 /**
  * Parquet file has self-describing schema which may differ from the user required schema (e.g.
@@ -58,11 +69,23 @@ private ParquetDataColumnReaderFactory() {
   /**
    * The default data column reader for existing Parquet page reader which works for both
    * dictionary or non dictionary types, Mirror from dictionary encoding path.
+   *
+   * HMS metadata can have a typename that is different from the type of the parquet data.
+   * If the data is valid as per the definition of the type in HMS, the data will be returned
+   * as part of the Hive query. If the data is invalid, null will be returned.
    */
   public static class DefaultParquetDataColumnReader implements ParquetDataColumnReader {
     protected ValuesReader valuesReader;
     protected Dictionary dict;
 
+    // After the data is read in the parquet type, isValid will be set to true if the data can be
+    // returned in the type defined in HMS. Otherwise isValid is set to false.
+    boolean isValid = true;
+
+    protected int hivePrecision = 0;
+    protected int hiveScale = 0;
+    protected final HiveDecimalWritable hiveDecimalWritable = new HiveDecimalWritable(0L);
+
     // Varchar or char length
     protected int length = -1;
 
@@ -76,6 +99,19 @@ public DefaultParquetDataColumnReader(Dictionary dict, int length) {
       this.length = length;
     }
 
+    public DefaultParquetDataColumnReader(ValuesReader realReader, int length, int precision,
+        int scale) {
+      this(realReader, length);
+      hivePrecision = precision;
+      hiveScale = scale;
+    }
+
+    public DefaultParquetDataColumnReader(Dictionary dict, int length, int precision, int scale) {
+      this(dict, length);
+      this.hivePrecision = precision;
+      this.hiveScale = scale;
+    }
+
     @Override
     public void initFromPage(int i, ByteBufferInputStream in) throws IOException {
       valuesReader.initFromPage(i, in);
@@ -207,6 +243,26 @@ public long readLong() {
       return valuesReader.readLong();
     }
 
+    @Override
+    public long readSmallInt() {
+      return validatedLong(valuesReader.readInteger(), serdeConstants.SMALLINT_TYPE_NAME);
+    }
+
+    @Override
+    public long readSmallInt(int id) {
+      return validatedLong(dict.decodeToInt(id), serdeConstants.SMALLINT_TYPE_NAME);
+    }
+
+    @Override
+    public long readTinyInt() {
+      return validatedLong(valuesReader.readInteger(), serdeConstants.TINYINT_TYPE_NAME);
+    }
+
+    @Override
+    public long readTinyInt(int id) {
+      return validatedLong(dict.decodeToInt(id), serdeConstants.TINYINT_TYPE_NAME);
+    }
+
     @Override
     public int readValueDictionaryId() {
       return valuesReader.readValueDictionaryId();
@@ -239,128 +295,202 @@ protected String getPaddedString(String value) {
      * Method to convert string to UTF-8 bytes.
      */
     protected static byte[] convertToBytes(String value) {
-      try {
-        // convert integer to string
-        return value.getBytes("UTF-8");
-      } catch (UnsupportedEncodingException e) {
-        throw new RuntimeException("Failed to encode string in UTF-8", e);
-      }
+      return value.getBytes(StandardCharsets.UTF_8);
     }
-  }
 
-  /**
-   * The reader who reads from the underlying int32 value value. Implementation is in consist with
-   * ETypeConverter EINT32_CONVERTER
-   */
-  public static class TypesFromInt32PageReader extends DefaultParquetDataColumnReader {
+    /**
+     * Helper function to validate long data. Sets isValid to true if the data is valid
+     * for the type it will be read in, otherwise false.
+     * @param longValue input value of long type to be validated.
+     * @param typeName the Hive type to be used to read the longValue
+     * @param isUnSigned true when longValue is an unsigned parquet type
+     * @return 0 if the data is invalid, otherwise longValue
+     */
+    long validatedLong(long longValue, String typeName, boolean isUnSigned) {
+      switch (typeName) {
+      case serdeConstants.BIGINT_TYPE_NAME:
+        isValid = !isUnSigned || (longValue >= 0);
+        break;
+      case serdeConstants.INT_TYPE_NAME:
+        isValid = ((longValue <= Integer.MAX_VALUE) &&
+            (longValue >= (isUnSigned ? 0 : Integer.MIN_VALUE)));
+        break;
+      case serdeConstants.SMALLINT_TYPE_NAME:
+        isValid = ((longValue <= Short.MAX_VALUE) &&
+            (longValue >= (isUnSigned ? 0 : Short.MIN_VALUE)));
+        break;
+      case serdeConstants.TINYINT_TYPE_NAME:
+        isValid = ((longValue <= Byte.MAX_VALUE) &&
+            (longValue >= (isUnSigned ? 0 : Byte.MIN_VALUE)));
+        break;
+      default:
+        isValid = true;
+      }
 
-    public TypesFromInt32PageReader(ValuesReader realReader, int length) {
-      super(realReader, length);
+      if (isValid) {
+        return longValue;
+      } else {
+        return 0;
+      }
     }
 
-    public TypesFromInt32PageReader(Dictionary dict, int length) {
-      super(dict, length);
+    /**
+     * Helper function to validate long data. Sets isValid to true if the data is valid
+     * for the type it will be read in, otherwise false.
+     * @param longValue input value of long type to be validated.
+     * @param typeName the Hive type to be used to read the longValue
+     * @return 0 if the data is invalid, otherwise longValue
+     */
+    long validatedLong(long longValue, String typeName) {
+      return validatedLong(longValue, typeName, false);
     }
 
-    @Override
-    public long readLong() {
-      return valuesReader.readInteger();
+    /**
+     * Helper function to validate long data when it will be read as a decimal from Hive. Sets
+     * isValid to true if the data can be read as decimal, otherwise false.
+     * @param longValue input value of long type to be validated.
+     * @return null if the data is invalid, otherwise a validated HiveDecimalWritable
+     */
+    byte[] validatedDecimal(long longValue) {
+      hiveDecimalWritable.setFromLong(longValue);
+      return validatedDecimal();
     }
 
-    @Override
-    public long readLong(int id) {
-      return dict.decodeToInt(id);
+    /**
+     * Helper function to validate double data when it will be read as a decimal from Hive. Sets
+     * isValid to true if the data can be read as decimal, otherwise false.
+     * @param doubleValue input value of double type to be validated.
+     * @return null if the data is invalid, otherwise a validated HiveDecimalWritable
+     */
+    byte[] validatedDecimal(double doubleValue) {
+      hiveDecimalWritable.setFromDouble(doubleValue);
+      return validatedDecimal();
     }
 
-    @Override
-    public float readFloat() {
-      return valuesReader.readInteger();
+    /**
+     * Helper function to validate that decimal data in hiveDecimalWritable can be read as the
+     * decimal type defined in HMS. Sets isValid to true if the data can be read as decimal,
+     * otherwise false.
+     * @return null if the data is invalid, otherwise a validated HiveDecimalWritable
+     */
+    byte[] validatedDecimal() {
+      return validatedScaledDecimal(hiveScale);
     }
 
-    @Override
-    public float readFloat(int id) {
-      return dict.decodeToInt(id);
+    byte[] validatedScaledDecimal(int inpScale) {
+      hiveDecimalWritable.mutateEnforcePrecisionScale(hivePrecision, hiveScale);
+      if (hiveDecimalWritable.isSet()) {
+        this.isValid = true;
+        return hiveDecimalWritable.getHiveDecimal().bigIntegerBytesScaled(inpScale);
+      } else {
+        this.isValid = false;
+        return null;
+      }
     }
 
-    @Override
-    public double readDouble() {
-      return valuesReader.readInteger();
-    }
+    /**
+     * Helper function to validate double data. Sets isValid to true if the data is valid
+     * for the type it will be read in, otherwise false.
+     * @param doubleValue input value of double type to be validated.
+     * @param typeName the Hive type to be used to read the doubleValue
+     * @return 0 if the data is invalid, otherwise doubleValue
+     */
+    double validatedDouble(double doubleValue, String typeName) {
+      switch (typeName) {
+      case serdeConstants.FLOAT_TYPE_NAME:
+        double absDoubleValue = (doubleValue < 0) ? (doubleValue * -1) : doubleValue;
+        int exponent = Math.getExponent(doubleValue);
+        isValid = ((absDoubleValue <= Float.MAX_VALUE) && (absDoubleValue >= Float.MIN_VALUE) &&
+            (exponent <= Float.MAX_EXPONENT) && (exponent >= Float.MIN_EXPONENT));
+        break;
+      case serdeConstants.BIGINT_TYPE_NAME:
+        isValid = ((doubleValue <= Long.MAX_VALUE) && (doubleValue >= Long.MIN_VALUE) &&
+            (doubleValue % 1 == 0));
+        break;
+      case serdeConstants.INT_TYPE_NAME:
+        isValid = ((doubleValue <= Integer.MAX_VALUE) && (doubleValue >= Integer.MIN_VALUE) &&
+            (doubleValue % 1 == 0));
+        break;
+      case serdeConstants.SMALLINT_TYPE_NAME:
+        isValid = ((doubleValue <= Short.MAX_VALUE) && (doubleValue >= Short.MIN_VALUE) &&
+            (doubleValue % 1 == 0));
+        break;
+      case serdeConstants.TINYINT_TYPE_NAME:
+        isValid = ((doubleValue <= Byte.MAX_VALUE) && (doubleValue >= Byte.MIN_VALUE) &&
+            (doubleValue % 1 == 0));
+        break;
+      default:
+        isValid = true;
+      }
 
-    @Override
-    public double readDouble(int id) {
-      return dict.decodeToInt(id);
+      if (isValid) {
+        return doubleValue;
+      } else {
+        return 0;
+      }
     }
+  }
 
-    @Override
-    public byte[] readString() {
-      return convertToBytes(valuesReader.readInteger());
-    }
+  /**
+   * The reader who reads from the underlying int64 values. Implementation is consistent with
+   * ETypeConverter EINT64_CONVERTER.
+   *
+   * The data is read as long from the reader, then validated, converted and returned as per the
+   * type defined in HMS.
+   */
+  public static class TypesFromInt64PageReader extends DefaultParquetDataColumnReader {
 
-    @Override
-    public byte[] readString(int id) {
-      return convertToBytes(dict.decodeToInt(id));
-    }
+    private boolean isAdjustedToUTC;
+    private TimeUnit timeUnit;
 
-    @Override
-    public byte[] readVarchar() {
-      String value = enforceMaxLength(
-          convertToString(valuesReader.readInteger()));
-      return convertToBytes(value);
+    public TypesFromInt64PageReader(ValuesReader realReader, int length, int precision, int scale) {
+      super(realReader, length, precision, scale);
     }
 
-    @Override
-    public byte[] readVarchar(int id) {
-      String value = enforceMaxLength(
-          convertToString(dict.decodeToInt(id)));
-      return convertToBytes(value);
+    public TypesFromInt64PageReader(Dictionary dict, int length, int precision, int scale) {
+      super(dict, length, precision, scale);
     }
 
-    @Override
-    public byte[] readChar() {
-      String value = enforceMaxLength(
-          convertToString(valuesReader.readInteger()));
-      return convertToBytes(value);
+    public TypesFromInt64PageReader(ValuesReader realReader, int length, boolean isAdjustedToUTC, TimeUnit timeUnit) {
+      super(realReader, length);
+      this.isAdjustedToUTC = isAdjustedToUTC;
+      this.timeUnit = timeUnit;
     }
 
-    @Override
-    public byte[] readChar(int id) {
-      String value = enforceMaxLength(
-          convertToString(dict.decodeToInt(id)));
-      return convertToBytes(value);
+    public TypesFromInt64PageReader(Dictionary dict, int length, boolean isAdjustedToUTC, TimeUnit timeUnit) {
+      super(dict, length);
+      this.isAdjustedToUTC = isAdjustedToUTC;
+      this.timeUnit = timeUnit;
     }
 
-    private static String convertToString(int value) {
-      return Integer.toString(value);
+    @Override
+    public long readInteger() {
+      return super.validatedLong(valuesReader.readLong(), serdeConstants.INT_TYPE_NAME);
     }
 
-    private static byte[] convertToBytes(int value) {
-      return convertToBytes(convertToString(value));
+    @Override
+    public long readInteger(int id) {
+      return super.validatedLong(dict.decodeToLong(id), serdeConstants.INT_TYPE_NAME);
     }
-  }
-
-  /**
-   * The reader who reads from the underlying int64 value value. Implementation is in consist with
-   * ETypeConverter EINT64_CONVERTER
-   */
-  public static class TypesFromInt64PageReader extends DefaultParquetDataColumnReader {
-    public TypesFromInt64PageReader(ValuesReader realReader, int length) {
-      super(realReader, length);
+    @Override
+    public long readSmallInt() {
+      return super.validatedLong(valuesReader.readLong(), serdeConstants.SMALLINT_TYPE_NAME);
     }
 
-    public TypesFromInt64PageReader(Dictionary dict, int length) {
-      super(dict, length);
+    @Override
+    public long readSmallInt(int id) {
+      return super.validatedLong(dict.decodeToLong(id), serdeConstants.SMALLINT_TYPE_NAME);
     }
 
     @Override
-    public long readInteger() {
-      return valuesReader.readLong();
+    public long readTinyInt() {
+      return super.validatedLong(valuesReader.readLong(), serdeConstants.TINYINT_TYPE_NAME);
     }
 
     @Override
-    public long readInteger(int id) {
-      return dict.decodeToLong(id);
+    public long readTinyInt(int id) {
+      return super.validatedLong(dict.decodeToLong(id), serdeConstants.TINYINT_TYPE_NAME);
     }
 
     @Override
@@ -383,6 +513,16 @@ public double readDouble(int id) {
       return dict.decodeToLong(id);
     }
 
+    @Override
+    public byte[] readDecimal() {
+      return super.validatedDecimal(valuesReader.readLong());
+    }
+
+    @Override
+    public byte[] readDecimal(int id) {
+      return super.validatedDecimal(dict.decodeToLong(id));
+    }
+
     @Override
     public byte[] readString() {
       return convertToBytes(valuesReader.readLong());
@@ -421,6 +561,21 @@ public byte[] readChar(int id) {
       return convertToBytes(value);
     }
 
+    private Timestamp convert(Long value) {
+      Timestamp timestamp = ParquetTimestampUtils.getTimestamp(value, timeUnit, isAdjustedToUTC);
+      return timestamp;
+    }
+
+    @Override
+    public Timestamp readTimestamp(int id) {
+      return convert(dict.decodeToLong(id));
+    }
+
+    @Override
+    public Timestamp readTimestamp() {
+      return convert(valuesReader.readLong());
+    }
+
     private static String convertToString(long value) {
       return Long.toString(value);
     }
@@ -431,372 +586,337 @@ private static byte[] convertToBytes(long value) {
   }
 
   /**
-   * The reader who reads long data using int type.
+   * The reader who reads unsigned long data.
+   *
+   * The data is read as long from the reader, then validated, converted and returned as per the
+   * type defined in HMS. The data is unsigned long, hence it can't be negative when read back.
+   * The call to validate indicates that the data is unsigned so the appropriate validation
+   * is applied.
    */
-  public static class Types64Int2IntPageReader extends TypesFromInt64PageReader {
+  public static class TypesFromUInt64PageReader extends TypesFromInt64PageReader {
 
-    public Types64Int2IntPageReader(ValuesReader realReader, int length) {
-      super(realReader, length);
+    public TypesFromUInt64PageReader(ValuesReader realReader, int length, int precision,
+        int scale) {
+      super(realReader, length, precision, scale);
     }
 
-    public Types64Int2IntPageReader(Dictionary dict, int length) {
-      super(dict, length);
+    public TypesFromUInt64PageReader(Dictionary dict, int length, int precision, int scale) {
+      super(dict, length, precision, scale);
     }
 
     @Override
-    public boolean isValid(long value) {
-      return ((value <= Integer.MAX_VALUE) && (value >= Integer.MIN_VALUE));
-    }
-  }
-
-  /**
-   * The reader who reads long data using smallint type.
-   */
-  public static class Types64Int2SmallintPageReader extends TypesFromInt64PageReader {
-    public Types64Int2SmallintPageReader(ValuesReader realReader, int length) {
-      super(realReader, length);
+    public long readLong() {
+      return super.validatedLong(valuesReader.readLong(), serdeConstants.BIGINT_TYPE_NAME, true);
     }
 
-    public Types64Int2SmallintPageReader(Dictionary dict, int length) {
-      super(dict, length);
+    @Override
+    public long readLong(int id) {
+      return super.validatedLong(dict.decodeToLong(id), serdeConstants.BIGINT_TYPE_NAME, true);
     }
 
     @Override
-    public boolean isValid(long value) {
-      return ((value <= Short.MAX_VALUE) && (value >= Short.MIN_VALUE));
+    public long readInteger() {
+      return super.validatedLong(valuesReader.readLong(), serdeConstants.INT_TYPE_NAME, true);
     }
-  }
 
-  /**
-   * The reader who reads long data using tinyint type.
-   */
-  public static class Types64Int2TinyintPageReader extends TypesFromInt64PageReader {
-    public Types64Int2TinyintPageReader(ValuesReader realReader, int length) {
-      super(realReader, length);
+    @Override
+    public long readInteger(int id) {
+      return super.validatedLong(dict.decodeToLong(id), serdeConstants.INT_TYPE_NAME, true);
     }
 
-    public Types64Int2TinyintPageReader(Dictionary dict, int length) {
-      super(dict, length);
+    @Override
+    public long readSmallInt() {
+      return super.validatedLong(valuesReader.readLong(), serdeConstants.SMALLINT_TYPE_NAME, true);
     }
 
     @Override
-    public boolean isValid(long value) {
-      return ((value <= Byte.MAX_VALUE) && (value >= Byte.MIN_VALUE));
+    public long readSmallInt(int id) {
+      return super.validatedLong(dict.decodeToLong(id), serdeConstants.SMALLINT_TYPE_NAME, true);
     }
-  }
-
-  /**
-   * The reader who reads long data using Decimal type.
-   */
-  public static class Types64Int2DecimalPageReader extends TypesFromInt64PageReader {
-    private int precision = 0;
-    private int scale = 0;
-    private final HiveDecimalWritable hiveDecimalWritable = new HiveDecimalWritable(0L);
 
-    public Types64Int2DecimalPageReader(ValuesReader realReader, int length, int precision,
-        int scale) {
-      super(realReader, length);
-      this.precision = precision;
-      this.scale = scale;
+    @Override
+    public long readTinyInt() {
+      return super.validatedLong(valuesReader.readLong(), serdeConstants.TINYINT_TYPE_NAME, true);
     }
 
-    public Types64Int2DecimalPageReader(Dictionary dict, int length, int precision, int scale) {
-      super(dict, length);
-      this.precision = precision;
-      this.scale = scale;
+    @Override
+    public long readTinyInt(int id) {
+      return super.validatedLong(dict.decodeToLong(id), serdeConstants.TINYINT_TYPE_NAME, true);
     }
 
     @Override
-    public boolean isValid(long value) {
-      hiveDecimalWritable.setFromLong(value);
-      hiveDecimalWritable.mutateEnforcePrecisionScale(precision, scale);
-      return hiveDecimalWritable.isSet();
+    public float readFloat() {
+      return (float) super.validatedLong(valuesReader.readLong(), serdeConstants.BIGINT_TYPE_NAME,
+          true);
     }
- */ - public static class TypesFromUInt64PageReader extends TypesFromInt64PageReader { - public TypesFromUInt64PageReader(ValuesReader realReader, int length) { - super(realReader, length); + @Override + public float readFloat(int id) { + return (float) super.validatedLong(dict.decodeToLong(id), serdeConstants.BIGINT_TYPE_NAME, + true); } - public TypesFromUInt64PageReader(Dictionary dict, int length) { - super(dict, length); + @Override + public double readDouble() { + return (double) super.validatedLong(valuesReader.readLong(), serdeConstants.BIGINT_TYPE_NAME, + true); } @Override - public boolean isValid(long value) { - return (value >= 0); + public double readDouble(int id) { + return (double) super.validatedLong(dict.decodeToLong(id), serdeConstants.BIGINT_TYPE_NAME, + true); } @Override - public boolean isValid(float value) { - return (value >= 0); + public byte[] readDecimal() { + long validatedLongValue = super.validatedLong(valuesReader.readLong(), + serdeConstants.BIGINT_TYPE_NAME, true); + if (super.isValid) { + return super.validatedDecimal(validatedLongValue); + } else { + return null; + } } @Override - public boolean isValid(double value) { - return (value >= 0); + public byte[] readDecimal(int id) { + long validatedLongValue = super.validatedLong(dict.decodeToLong(id), + serdeConstants.BIGINT_TYPE_NAME, true); + if (super.isValid) { + return super.validatedDecimal(validatedLongValue); + } else { + return null; + } } } /** - * The reader who reads unsigned long data using int type. + * The reader who reads from the underlying int32 value value. Implementation is in consist with + * ETypeConverter EINT32_CONVERTER + * + * The data is read as integer from the reader, then validated, converted and returned as per the + * type defined in HMS. */ - public static class Types64UInt2IntPageReader extends TypesFromInt64PageReader { + public static class TypesFromInt32PageReader extends DefaultParquetDataColumnReader { - public Types64UInt2IntPageReader(ValuesReader realReader, int length) { - super(realReader, length); + public TypesFromInt32PageReader(ValuesReader realReader, int length, int precision, + int scale) { + super(realReader, length, precision, scale); } - public Types64UInt2IntPageReader(Dictionary dict, int length) { - super(dict, length); + public TypesFromInt32PageReader(Dictionary dict, int length, int precision, int scale) { + super(dict, length, precision, scale); } @Override - public boolean isValid(long value) { - return ((value <= Integer.MAX_VALUE) && (value >= 0)); + public long readLong() { + return valuesReader.readInteger(); } - } - /** - * The reader who reads unsigned long data using smallint type. - */ - public static class Types64UInt2SmallintPageReader extends TypesFromInt64PageReader { - public Types64UInt2SmallintPageReader(ValuesReader realReader, int length) { - super(realReader, length); + @Override + public long readLong(int id) { + return dict.decodeToInt(id); } - public Types64UInt2SmallintPageReader(Dictionary dict, int length) { - super(dict, length); + @Override + public float readFloat() { + return valuesReader.readInteger(); } @Override - public boolean isValid(long value) { - return ((value <= Short.MAX_VALUE) && (value >= 0)); + public float readFloat(int id) { + return dict.decodeToInt(id); } - } - /** - * The reader who reads unsigned long data using tinyint type. 
- */ - public static class Types64UInt2TinyintPageReader extends TypesFromInt64PageReader { - public Types64UInt2TinyintPageReader(ValuesReader realReader, int length) { - super(realReader, length); + @Override + public double readDouble() { + return valuesReader.readInteger(); } - public Types64UInt2TinyintPageReader(Dictionary dict, int length) { - super(dict, length); + @Override + public double readDouble(int id) { + return dict.decodeToInt(id); } @Override - public boolean isValid(long value) { - return ((value <= Byte.MAX_VALUE) && (value >= 0)); + public byte[] readString() { + return convertToBytes(valuesReader.readInteger()); } - } - /** - * The reader who reads unsigned long data using Decimal type. - */ - public static class Types64UInt2DecimalPageReader extends TypesFromInt64PageReader { - private int precision = 0; - private int scale = 0; - private final HiveDecimalWritable hiveDecimalWritable = new HiveDecimalWritable(0L); - - public Types64UInt2DecimalPageReader(ValuesReader realReader, int length, int precision, - int scale) { - super(realReader, length); - this.precision = precision; - this.scale = scale; + @Override + public byte[] readString(int id) { + return convertToBytes(dict.decodeToInt(id)); } - public Types64UInt2DecimalPageReader(Dictionary dict, int length, int precision, int scale) { - super(dict, length); - this.precision = precision; - this.scale = scale; + @Override + public byte[] readVarchar() { + String value = enforceMaxLength( + convertToString(valuesReader.readInteger())); + return convertToBytes(value); } @Override - public boolean isValid(long value) { - hiveDecimalWritable.setFromLong(value); - hiveDecimalWritable.mutateEnforcePrecisionScale(precision, scale); - return ((value >= 0) && hiveDecimalWritable.isSet()); + public byte[] readVarchar(int id) { + String value = enforceMaxLength( + convertToString(dict.decodeToInt(id))); + return convertToBytes(value); } - } - /** - * The reader who reads int data using smallint type. - */ - public static class Types32Int2SmallintPageReader extends TypesFromInt32PageReader { - public Types32Int2SmallintPageReader(ValuesReader realReader, int length) { - super(realReader, length); + @Override + public byte[] readChar() { + String value = enforceMaxLength( + convertToString(valuesReader.readInteger())); + return convertToBytes(value); } - public Types32Int2SmallintPageReader(Dictionary dict, int length) { - super(dict, length); + @Override + public byte[] readChar(int id) { + String value = enforceMaxLength( + convertToString(dict.decodeToInt(id))); + return convertToBytes(value); } - @Override - public boolean isValid(long value) { - return ((value <= Short.MAX_VALUE) && (value >= Short.MIN_VALUE)); + private static String convertToString(int value) { + return Integer.toString(value); } - } - /** - * The reader who reads int data using tinyint type. 
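For string-family targets (readString/readVarchar/readChar), the int32 reader above renders the number as text, clamps it to the declared varchar/char length via enforceMaxLength, and returns bytes. A JDK-only sketch of that path, assuming enforceMaxLength truncates to the declared length (its real behavior lives in the parent reader, so the truncation semantics here are an assumption):

import java.nio.charset.StandardCharsets;

public class NumericToVarcharSketch {
  static byte[] readVarchar(int value, int maxLength) {
    String s = Integer.toString(value);
    if (maxLength >= 0 && s.length() > maxLength) {
      s = s.substring(0, maxLength); // assumed truncation semantics
    }
    return s.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] out = readVarchar(123456, 4);
    System.out.println(new String(out, StandardCharsets.UTF_8)); // 1234
  }
}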
- */ - public static class Types32Int2TinyintPageReader extends TypesFromInt32PageReader { - public Types32Int2TinyintPageReader(ValuesReader realReader, int length) { - super(realReader, length); + private static byte[] convertToBytes(int value) { + return convertToBytes(convertToString(value)); } - public Types32Int2TinyintPageReader(Dictionary dict, int length) { - super(dict, length); + @Override + public byte[] readDecimal() { + return super.validatedDecimal(valuesReader.readInteger()); } @Override - public boolean isValid(long value) { - return ((value <= Byte.MAX_VALUE) && (value >= Byte.MIN_VALUE)); + public byte[] readDecimal(int id) { + return super.validatedDecimal(dict.decodeToInt(id)); } } /** - * The reader who reads int data using Decimal type. + * The reader who reads unsigned int data. + * + * The data is read as integer from the reader, then validated, converted and returned as per the + * type defined in HMS. The data is an unsigned integer, hence when read back it cannot be + * negative. The call to validate indicates that the data is unsigned so that the appropriate + * validation is performed. */ - public static class Types32Int2DecimalPageReader extends TypesFromInt32PageReader { - private int precision = 0; - private int scale = 0; - private final HiveDecimalWritable hiveDecimalWritable = new HiveDecimalWritable(0L); + public static class TypesFromUInt32PageReader extends TypesFromInt32PageReader { - public Types32Int2DecimalPageReader(ValuesReader realReader, int length, int precision, + public TypesFromUInt32PageReader(ValuesReader realReader, int length, int precision, int scale) { - super(realReader, length); - this.precision = precision; - this.scale = scale; + super(realReader, length, precision, scale); } - public Types32Int2DecimalPageReader(Dictionary dict, int length, int precision, int scale) { - super(dict, length); - this.precision = precision; - this.scale = scale; + public TypesFromUInt32PageReader(Dictionary dict, int length, int precision, int scale) { + super(dict, length, precision, scale); } @Override - public boolean isValid(long value) { - hiveDecimalWritable.setFromLong(value); - hiveDecimalWritable.mutateEnforcePrecisionScale(precision, scale); - return hiveDecimalWritable.isSet(); - } - } - - /** - * The reader who reads unsigned int data. - */ - public static class TypesFromUInt32PageReader extends TypesFromInt32PageReader { - public TypesFromUInt32PageReader(ValuesReader realReader, int length) { - super(realReader, length); + public long readLong() { + return super.validatedLong(valuesReader.readInteger(), serdeConstants.BIGINT_TYPE_NAME, true); } - public TypesFromUInt32PageReader(Dictionary dict, int length) { - super(dict, length); + @Override + public long readLong(int id) { + return super.validatedLong(dict.decodeToLong(id), serdeConstants.BIGINT_TYPE_NAME, true); } @Override - public boolean isValid(long value) { - return (value >= 0); + public long readInteger() { + return super.validatedLong(valuesReader.readInteger(), serdeConstants.INT_TYPE_NAME, true); } @Override - public boolean isValid(float value) { - return (value >= 0); + public long readInteger(int id) { + return super.validatedLong(dict.decodeToInt(id), serdeConstants.INT_TYPE_NAME, true); } @Override - public boolean isValid(double value) { - return (value >= 0); + public long readSmallInt() { + return validatedLong(valuesReader.readInteger(), serdeConstants.SMALLINT_TYPE_NAME, true); } - } - /** - * The reader who reads unsigned int data using smallint type.
- */ - public static class Types32UInt2SmallintPageReader extends TypesFromInt32PageReader { - public Types32UInt2SmallintPageReader(ValuesReader realReader, int length) { - super(realReader, length); + @Override + public long readSmallInt(int id) { + return validatedLong(dict.decodeToInt(id), serdeConstants.SMALLINT_TYPE_NAME, true); } - public Types32UInt2SmallintPageReader(Dictionary dict, int length) { - super(dict, length); + @Override + public long readTinyInt() { + return validatedLong(valuesReader.readInteger(), serdeConstants.TINYINT_TYPE_NAME, true); } @Override - public boolean isValid(long value) { - return ((value <= Short.MAX_VALUE) && (value >= 0)); + public long readTinyInt(int id) { + return validatedLong(dict.decodeToInt(id), serdeConstants.TINYINT_TYPE_NAME, true); } - } - /** - * The reader who reads unsigned int data using tinyint type. - */ - public static class Types32UInt2TinyintPageReader extends TypesFromInt32PageReader { - public Types32UInt2TinyintPageReader(ValuesReader realReader, int length) { - super(realReader, length); + @Override + public float readFloat() { + return (float) super.validatedLong(valuesReader.readInteger(), serdeConstants.BIGINT_TYPE_NAME, + true); } - public Types32UInt2TinyintPageReader(Dictionary dict, int length) { - super(dict, length); + @Override + public float readFloat(int id) { + return (float) super.validatedLong(dict.decodeToLong(id), serdeConstants.BIGINT_TYPE_NAME, + true); } @Override - public boolean isValid(long value) { - return ((value <= Byte.MAX_VALUE) && (value >= 0)); + public double readDouble() { + return (double) super.validatedLong(valuesReader.readInteger(), + serdeConstants.BIGINT_TYPE_NAME, true); } - } - /** - * The reader who reads unsigned int data using Decimal type. - */ - public static class Types32UInt2DecimalPageReader extends TypesFromInt32PageReader { - private int precision = 0; - private int scale = 0; - private final HiveDecimalWritable hiveDecimalWritable = new HiveDecimalWritable(0L); - - public Types32UInt2DecimalPageReader(ValuesReader realReader, int length, int precision, - int scale) { - super(realReader, length); - this.precision = precision; - this.scale = scale; + @Override + public double readDouble(int id) { + return (double) super.validatedLong(dict.decodeToLong(id), serdeConstants.BIGINT_TYPE_NAME, + true); } - public Types32UInt2DecimalPageReader(Dictionary dict, int length, int precision, int scale) { - super(dict, length); - this.precision = precision; - this.scale = scale; + @Override + public byte[] readDecimal() { + long validatedIntValue = super.validatedLong(valuesReader.readInteger(), + serdeConstants.INT_TYPE_NAME, true); + if (super.isValid) { + return super.validatedDecimal(validatedIntValue); + } else { + return null; + } } @Override - public boolean isValid(long value) { - hiveDecimalWritable.setFromLong(value); - hiveDecimalWritable.mutateEnforcePrecisionScale(precision, scale); - return ((value >= 0) && hiveDecimalWritable.isSet()); + public byte[] readDecimal(int id) { + long validatedIntValue = super.validatedLong(dict.decodeToInt(id), + serdeConstants.INT_TYPE_NAME, true); + if (super.isValid) { + return super.validatedDecimal(validatedIntValue); + } else { + return null; + } } } /** * The reader who reads from the underlying float value value. Implementation is in consist with * ETypeConverter EFLOAT_CONVERTER + * + * The data is read as float from the reader, then validated, converted and returned as per the + * type defined in HMS. 
*/ public static class TypesFromFloatPageReader extends DefaultParquetDataColumnReader { - public TypesFromFloatPageReader(ValuesReader realReader, int length) { - super(realReader, length); + public TypesFromFloatPageReader(ValuesReader realReader, int length, int precision, int scale) { + super(realReader, length, precision, scale); } - public TypesFromFloatPageReader(Dictionary realReader, int length) { - super(realReader, length); + public TypesFromFloatPageReader(Dictionary dict, int length, int precision, int scale) { + super(dict, length, precision, scale); } @Override @@ -847,6 +967,49 @@ public byte[] readChar(int id) { return convertToBytes(value); } + @Override + public long readLong() { + return (long)(super.validatedDouble(valuesReader.readFloat(), + serdeConstants.BIGINT_TYPE_NAME)); + } + + @Override + public long readLong(int id) { + return (long)(super.validatedDouble(dict.decodeToFloat(id), serdeConstants.BIGINT_TYPE_NAME)); + } + + @Override + public long readInteger() { + return (long)(super.validatedDouble(valuesReader.readFloat(), serdeConstants.INT_TYPE_NAME)); + } + + @Override + public long readInteger(int id) { + return (long)(super.validatedDouble(dict.decodeToFloat(id), serdeConstants.INT_TYPE_NAME)); + } + + @Override + public long readSmallInt() { + return (long)super.validatedDouble(valuesReader.readFloat(), + serdeConstants.SMALLINT_TYPE_NAME); + } + + @Override + public long readSmallInt(int id) { + return (long)super.validatedDouble(dict.decodeToFloat(id), serdeConstants.SMALLINT_TYPE_NAME); + } + + @Override + public long readTinyInt() { + return (long)super.validatedDouble(valuesReader.readFloat(), + serdeConstants.TINYINT_TYPE_NAME); + } + + @Override + public long readTinyInt(int id) { + return (long)super.validatedDouble(dict.decodeToFloat(id), serdeConstants.TINYINT_TYPE_NAME); + } + private static String convertToString(float value) { return Float.toString(value); } @@ -854,19 +1017,32 @@ private static String convertToString(float value) { private static byte[] convertToBytes(float value) { return convertToBytes(convertToString(value)); } + + @Override + public byte[] readDecimal() { + return super.validatedDecimal(valuesReader.readFloat()); + } + + @Override + public byte[] readDecimal(int id) { + return super.validatedDecimal(dict.decodeToFloat(id)); + } } /** * The reader who reads from the underlying double value value. + * + * The data is read as double from the reader, then validated, converted and returned as per the + * type defined in HMS. 
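The float reader above funnels every integral target through validatedDouble(value, hiveTypeName) and only then applies the narrowing cast, so the cast can never silently wrap. A sketch of the range check such a helper plausibly performs (names are illustrative; like validatedLong, the real helper appears to record validity on the reader rather than throw):

public class ValidatedDoubleSketch {
  boolean isValid;

  double validatedDouble(double value, String hiveTypeName) {
    double min;
    double max;
    switch (hiveTypeName) {
      case "tinyint":  min = Byte.MIN_VALUE;    max = Byte.MAX_VALUE;    break;
      case "smallint": min = Short.MIN_VALUE;   max = Short.MAX_VALUE;   break;
      case "int":      min = Integer.MIN_VALUE; max = Integer.MAX_VALUE; break;
      case "bigint":   min = Long.MIN_VALUE;    max = Long.MAX_VALUE;    break;
      default:         min = -Double.MAX_VALUE; max = Double.MAX_VALUE;  break;
    }
    isValid = !Double.isNaN(value) && value >= min && value <= max;
    return value; // caller casts only after checking isValid
  }

  public static void main(String[] args) {
    ValidatedDoubleSketch r = new ValidatedDoubleSketch();
    r.validatedDouble(3.9e10, "int");
    System.out.println(r.isValid); // false: 3.9e10 exceeds Integer.MAX_VALUE
  }
}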
*/ public static class TypesFromDoublePageReader extends DefaultParquetDataColumnReader { - public TypesFromDoublePageReader(ValuesReader realReader, int length) { - super(realReader, length); + public TypesFromDoublePageReader(ValuesReader realReader, int length, int precision, int scale) { + super(realReader, length, precision, scale); } - public TypesFromDoublePageReader(Dictionary dict, int length) { - super(dict, length); + public TypesFromDoublePageReader(Dictionary dict, int length, int precision, int scale) { + super(dict, length, precision, scale); } @Override @@ -907,6 +1083,61 @@ public byte[] readChar(int id) { return convertToBytes(value); } + @Override + public long readLong() { + return (long)(super.validatedDouble(valuesReader.readDouble(), + serdeConstants.BIGINT_TYPE_NAME)); + } + + @Override + public long readLong(int id) { + return (long)(super.validatedDouble(dict.decodeToDouble(id), + serdeConstants.BIGINT_TYPE_NAME)); + } + + @Override + public long readInteger() { + return (long)(super.validatedDouble(valuesReader.readDouble(), + serdeConstants.INT_TYPE_NAME)); + } + + @Override + public long readInteger(int id) { + return (long)(super.validatedDouble(dict.decodeToDouble(id), serdeConstants.INT_TYPE_NAME)); + } + + @Override + public long readSmallInt() { + return (long)super.validatedDouble(valuesReader.readDouble(), serdeConstants.SMALLINT_TYPE_NAME); + } + + @Override + public long readSmallInt(int id) { + return (long)super.validatedDouble(dict.decodeToDouble(id), serdeConstants.SMALLINT_TYPE_NAME); + } + + @Override + public long readTinyInt() { + return (long)super.validatedDouble(valuesReader.readDouble(), serdeConstants.TINYINT_TYPE_NAME); + } + + @Override + public long readTinyInt(int id) { + return (long)super.validatedDouble(dict.decodeToDouble(id), serdeConstants.TINYINT_TYPE_NAME); + } + + @Override + public float readFloat() { + return (float)(super.validatedDouble(valuesReader.readDouble(), + serdeConstants.FLOAT_TYPE_NAME)); + } + + @Override + public float readFloat(int id) { + return (float)(super.validatedDouble(dict.decodeToDouble(id), + serdeConstants.FLOAT_TYPE_NAME)); + } + private static String convertToString(double value) { return Double.toString(value); } @@ -914,6 +1145,16 @@ private static String convertToString(double value) { private static byte[] convertToBytes(double value) { return convertToBytes(convertToString(value)); } + + @Override + public byte[] readDecimal() { + return super.validatedDecimal(valuesReader.readDouble()); + } + + @Override + public byte[] readDecimal(int id) { + return super.validatedDecimal(dict.decodeToDouble(id)); + } } /** @@ -1065,18 +1306,22 @@ private static byte[] convertToBytes(Timestamp value) { /** * The reader who reads from the underlying decimal value value. + * + * The data is read as binary from the reader treated as a decimal, then validated, converted + * and returned as per the type defined in HMS. 
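For context on the decimal reader that follows: a binary-backed Parquet decimal is the two's-complement big-endian byte encoding of the unscaled value, paired with the scale from the logical type, and HiveDecimalWritable.set(bytes, scale) performs that decode inside Hive. The JDK equivalent, as a standalone sketch:

import java.math.BigDecimal;
import java.math.BigInteger;

public class BinaryDecimalSketch {
  static BigDecimal decode(byte[] unscaledBigEndian, int scale) {
    return new BigDecimal(new BigInteger(unscaledBigEndian), scale);
  }

  public static void main(String[] args) {
    // 0x3039 is 12345; with scale 2 this reads back as 123.45
    System.out.println(decode(new byte[] {0x30, 0x39}, 2)); // 123.45
  }
}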
*/ public static class TypesFromDecimalPageReader extends DefaultParquetDataColumnReader { - private HiveDecimalWritable tempDecimal = new HiveDecimalWritable(); private short scale; - public TypesFromDecimalPageReader(ValuesReader realReader, int length, short scale) { - super(realReader, length); + public TypesFromDecimalPageReader(ValuesReader realReader, int length, short scale, + int hivePrecision, int hiveScale) { + super(realReader, length, hivePrecision, hiveScale); this.scale = scale; } - public TypesFromDecimalPageReader(Dictionary dict, int length, short scale) { - super(dict, length); + public TypesFromDecimalPageReader(Dictionary dict, int length, short scale, int hivePrecision, + int hiveScale) { + super(dict, length, hivePrecision, hiveScale); this.scale = scale; } @@ -1118,14 +1363,434 @@ public byte[] readChar(int id) { return convertToBytes(value); } + @Override + public float readFloat() { + hiveDecimalWritable.set(valuesReader.readBytes().getBytesUnsafe(), scale); + return (float)(super.validatedDouble(hiveDecimalWritable.doubleValue(), + serdeConstants.FLOAT_TYPE_NAME)); + } + + @Override + public float readFloat(int id) { + hiveDecimalWritable.set(dict.decodeToBinary(id).getBytesUnsafe(), scale); + return (float)(super.validatedDouble(hiveDecimalWritable.doubleValue(), + serdeConstants.FLOAT_TYPE_NAME)); + } + + @Override + public double readDouble() { + hiveDecimalWritable.set(valuesReader.readBytes().getBytesUnsafe(), scale); + return (super.validatedDouble(hiveDecimalWritable.doubleValue(), + serdeConstants.DOUBLE_TYPE_NAME)); + } + + @Override + public double readDouble(int id) { + hiveDecimalWritable.set(dict.decodeToBinary(id).getBytesUnsafe(), scale); + return (super.validatedDouble(hiveDecimalWritable.doubleValue(), + serdeConstants.DOUBLE_TYPE_NAME)); + } + + @Override + public long readLong() { + hiveDecimalWritable.set(valuesReader.readBytes().getBytesUnsafe(), scale); + return (long)(super.validatedDouble(hiveDecimalWritable.doubleValue(), + serdeConstants.BIGINT_TYPE_NAME)); + } + + @Override + public long readLong(int id) { + hiveDecimalWritable.set(dict.decodeToBinary(id).getBytesUnsafe(), scale); + return (long)(super.validatedDouble(hiveDecimalWritable.doubleValue(), + serdeConstants.BIGINT_TYPE_NAME)); + } + + @Override + public long readInteger() { + hiveDecimalWritable.set(valuesReader.readBytes().getBytesUnsafe(), scale); + return (long)(super.validatedDouble(hiveDecimalWritable.doubleValue(), + serdeConstants.INT_TYPE_NAME)); + } + + @Override + public long readInteger(int id) { + hiveDecimalWritable.set(dict.decodeToBinary(id).getBytesUnsafe(), scale); + return (long)(super.validatedDouble(hiveDecimalWritable.doubleValue(), + serdeConstants.INT_TYPE_NAME)); + } + + @Override + public long readSmallInt() { + hiveDecimalWritable.set(valuesReader.readBytes().getBytesUnsafe(), scale); + return (long)(super.validatedDouble(hiveDecimalWritable.doubleValue(), + serdeConstants.SMALLINT_TYPE_NAME)); + } + + @Override + public long readSmallInt(int id) { + hiveDecimalWritable.set(dict.decodeToBinary(id).getBytesUnsafe(), scale); + return (long)(super.validatedDouble(hiveDecimalWritable.doubleValue(), + serdeConstants.SMALLINT_TYPE_NAME)); + } + + @Override + public long readTinyInt() { + hiveDecimalWritable.set(valuesReader.readBytes().getBytesUnsafe(), scale); + return (long)(super.validatedDouble(hiveDecimalWritable.doubleValue(), + serdeConstants.TINYINT_TYPE_NAME)); + } + + @Override + public long readTinyInt(int id) { + 
hiveDecimalWritable.set(dict.decodeToBinary(id).getBytesUnsafe(), scale); + return (long)(super.validatedDouble(hiveDecimalWritable.doubleValue(), + serdeConstants.TINYINT_TYPE_NAME)); + } + private String convertToString(Binary value) { - tempDecimal.set(value.getBytesUnsafe(), scale); - return tempDecimal.toString(); + hiveDecimalWritable.set(value.getBytesUnsafe(), scale); + return hiveDecimalWritable.toString(); } private byte[] convertToBytes(Binary value) { return convertToBytes(convertToString(value)); } + + @Override + public byte[] readDecimal() { + hiveDecimalWritable.set(valuesReader.readBytes().getBytesUnsafe(), scale); + return super.validatedScaledDecimal(scale); + } + + @Override + public byte[] readDecimal(int id) { + hiveDecimalWritable.set(dict.decodeToBinary(id).getBytesUnsafe(), scale); + return super.validatedScaledDecimal(scale); + } + } + + /** + * The reader who reads from the underlying decimal value which is stored in an INT32 physical type. + * + * The data is read as INT32 from the reader treated as a decimal, then validated, converted + * and returned as per the type defined in HMS. + */ + public static class TypesFromInt32DecimalPageReader extends DefaultParquetDataColumnReader { + private short scale; + + public TypesFromInt32DecimalPageReader(ValuesReader realReader, int length, short scale, int hivePrecision, + int hiveScale) { + super(realReader, length, hivePrecision, hiveScale); + this.scale = scale; + } + + public TypesFromInt32DecimalPageReader(Dictionary dict, int length, short scale, int hivePrecision, int hiveScale) { + super(dict, length, hivePrecision, hiveScale); + this.scale = scale; + } + + @Override + public byte[] readString() { + return convertToBytes(valuesReader.readInteger()); + } + + @Override + public byte[] readString(int id) { + return convertToBytes(dict.decodeToInt(id)); + } + + @Override + public byte[] readVarchar() { + String value = enforceMaxLength(convertToString(valuesReader.readInteger())); + return convertToBytes(value); + } + + @Override + public byte[] readVarchar(int id) { + String value = enforceMaxLength(convertToString(dict.decodeToInt(id))); + return convertToBytes(value); + } + + @Override + public byte[] readChar() { + String value = enforceMaxLength(convertToString(valuesReader.readInteger())); + return convertToBytes(value); + } + + @Override + public byte[] readChar(int id) { + String value = enforceMaxLength(convertToString(dict.decodeToInt(id))); + return convertToBytes(value); + } + + @Override + public float readFloat() { + HiveDecimal hiveDecimal = HiveDecimal.create(valuesReader.readInteger(), scale); + hiveDecimalWritable.set(hiveDecimal); + return (float) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.FLOAT_TYPE_NAME)); + } + + @Override + public float readFloat(int id) { + HiveDecimal hiveDecimal = HiveDecimal.create(dict.decodeToInt(id), scale); + hiveDecimalWritable.set(hiveDecimal); + return (float) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.FLOAT_TYPE_NAME)); + } + + @Override + public double readDouble() { + HiveDecimal hiveDecimal = HiveDecimal.create(valuesReader.readInteger(), scale); + hiveDecimalWritable.set(hiveDecimal); + return (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.DOUBLE_TYPE_NAME)); + } + + @Override + public double readDouble(int id) { + HiveDecimal hiveDecimal = HiveDecimal.create(dict.decodeToInt(id), scale); + hiveDecimalWritable.set(hiveDecimal); + return 
(super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.DOUBLE_TYPE_NAME)); + } + + @Override + public long readLong() { + HiveDecimal hiveDecimal = HiveDecimal.create(valuesReader.readInteger(), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.BIGINT_TYPE_NAME)); + } + + @Override + public long readLong(int id) { + HiveDecimal hiveDecimal = HiveDecimal.create(dict.decodeToInt(id), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.BIGINT_TYPE_NAME)); + } + + @Override + public long readInteger() { + HiveDecimal hiveDecimal = HiveDecimal.create(valuesReader.readInteger(), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.INT_TYPE_NAME)); + } + + @Override + public long readInteger(int id) { + HiveDecimal hiveDecimal = HiveDecimal.create(dict.decodeToInt(id), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.INT_TYPE_NAME)); + } + + @Override + public long readSmallInt() { + HiveDecimal hiveDecimal = HiveDecimal.create(valuesReader.readInteger(), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.SMALLINT_TYPE_NAME)); + } + + @Override + public long readSmallInt(int id) { + HiveDecimal hiveDecimal = HiveDecimal.create(dict.decodeToInt(id), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.SMALLINT_TYPE_NAME)); + } + + @Override + public long readTinyInt() { + HiveDecimal hiveDecimal = HiveDecimal.create(valuesReader.readInteger(), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.TINYINT_TYPE_NAME)); + } + + @Override + public long readTinyInt(int id) { + HiveDecimal hiveDecimal = HiveDecimal.create(dict.decodeToInt(id), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.TINYINT_TYPE_NAME)); + } + + private String convertToString(int value) { + HiveDecimal hiveDecimal = HiveDecimal.create(value, scale); + return hiveDecimal.toString(); + } + + private byte[] convertToBytes(int value) { + return convertToBytes(convertToString(value)); + } + + @Override + public byte[] readDecimal() { + HiveDecimal hiveDecimal = HiveDecimal.create(valuesReader.readInteger(), scale); + hiveDecimalWritable.set(hiveDecimal); + return super.validatedScaledDecimal(scale); + } + + @Override + public byte[] readDecimal(int id) { + HiveDecimal hiveDecimal = HiveDecimal.create(dict.decodeToInt(id), scale); + hiveDecimalWritable.set(hiveDecimal); + return super.validatedScaledDecimal(scale); + } + } + + /** + * The reader who reads from the underlying decimal value which is stored in an INT64 physical type. + * + * The data is read as INT64 from the reader treated as a decimal, then validated, converted + * and returned as per the type defined in HMS. 
+ */ + public static class TypesFromInt64DecimalPageReader extends DefaultParquetDataColumnReader { + private short scale; + + public TypesFromInt64DecimalPageReader(ValuesReader realReader, int length, short scale, int hivePrecision, + int hiveScale) { + super(realReader, length, hivePrecision, hiveScale); + this.scale = scale; + } + + public TypesFromInt64DecimalPageReader(Dictionary dict, int length, short scale, int hivePrecision, int hiveScale) { + super(dict, length, hivePrecision, hiveScale); + this.scale = scale; + } + + @Override + public byte[] readString() { + return convertToBytes(valuesReader.readLong()); + } + + @Override + public byte[] readString(int id) { + return convertToBytes(dict.decodeToLong(id)); + } + + @Override + public byte[] readVarchar() { + String value = enforceMaxLength(convertToString(valuesReader.readLong())); + return convertToBytes(value); + } + + @Override + public byte[] readVarchar(int id) { + String value = enforceMaxLength(convertToString(dict.decodeToLong(id))); + return convertToBytes(value); + } + + @Override + public byte[] readChar() { + String value = enforceMaxLength(convertToString(valuesReader.readLong())); + return convertToBytes(value); + } + + @Override + public byte[] readChar(int id) { + String value = enforceMaxLength(convertToString(dict.decodeToLong(id))); + return convertToBytes(value); + } + + @Override + public float readFloat() { + HiveDecimal hiveDecimal = HiveDecimal.create(valuesReader.readLong(), scale); + hiveDecimalWritable.set(hiveDecimal); + return (float) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.FLOAT_TYPE_NAME)); + } + + @Override + public float readFloat(int id) { + HiveDecimal hiveDecimal = HiveDecimal.create(dict.decodeToLong(id), scale); + hiveDecimalWritable.set(hiveDecimal); + return (float) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.FLOAT_TYPE_NAME)); + } + + @Override + public double readDouble() { + HiveDecimal hiveDecimal = HiveDecimal.create(valuesReader.readLong(), scale); + hiveDecimalWritable.set(hiveDecimal); + return (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.DOUBLE_TYPE_NAME)); + } + + @Override + public double readDouble(int id) { + HiveDecimal hiveDecimal = HiveDecimal.create(dict.decodeToLong(id), scale); + hiveDecimalWritable.set(hiveDecimal); + return (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.DOUBLE_TYPE_NAME)); + } + + @Override + public long readLong() { + HiveDecimal hiveDecimal = HiveDecimal.create(valuesReader.readLong(), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.BIGINT_TYPE_NAME)); + } + + @Override + public long readLong(int id) { + HiveDecimal hiveDecimal = HiveDecimal.create(dict.decodeToLong(id), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.BIGINT_TYPE_NAME)); + } + + @Override + public long readInteger() { + HiveDecimal hiveDecimal = HiveDecimal.create(valuesReader.readLong(), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.INT_TYPE_NAME)); + } + + @Override + public long readInteger(int id) { + HiveDecimal hiveDecimal = HiveDecimal.create(dict.decodeToLong(id), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), 
serdeConstants.INT_TYPE_NAME)); + } + + @Override + public long readSmallInt() { + HiveDecimal hiveDecimal = HiveDecimal.create(valuesReader.readLong(), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.SMALLINT_TYPE_NAME)); + } + + @Override + public long readSmallInt(int id) { + HiveDecimal hiveDecimal = HiveDecimal.create(dict.decodeToLong(id), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.SMALLINT_TYPE_NAME)); + } + + @Override + public long readTinyInt() { + HiveDecimal hiveDecimal = HiveDecimal.create(valuesReader.readLong(), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.TINYINT_TYPE_NAME)); + } + + @Override + public long readTinyInt(int id) { + HiveDecimal hiveDecimal = HiveDecimal.create(dict.decodeToLong(id), scale); + hiveDecimalWritable.set(hiveDecimal); + return (long) (super.validatedDouble(hiveDecimalWritable.doubleValue(), serdeConstants.TINYINT_TYPE_NAME)); + } + + private String convertToString(long value) { + HiveDecimal hiveDecimal = HiveDecimal.create(value, scale); + return hiveDecimal.toString(); + } + + private byte[] convertToBytes(long value) { + return convertToBytes(convertToString(value)); + } + + @Override + public byte[] readDecimal() { + HiveDecimal hiveDecimal = HiveDecimal.create(valuesReader.readLong(), scale); + hiveDecimalWritable.set(hiveDecimal); + return super.validatedScaledDecimal(scale); + } + + @Override + public byte[] readDecimal(int id) { + HiveDecimal hiveDecimal = HiveDecimal.create(dict.decodeToLong(id), scale); + hiveDecimalWritable.set(hiveDecimal); + return super.validatedScaledDecimal(scale); + } } /** @@ -1195,108 +1860,59 @@ private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(boolean i throws IOException { // max length for varchar and char cases int length = getVarcharLength(hiveType); - String typeName = TypeInfoUtils.getBaseName(hiveType.getTypeName()); + TypeInfo realHiveType = (hiveType instanceof ListTypeInfo) ? ((ListTypeInfo) hiveType) + .getListElementTypeInfo() : hiveType; + + String typeName = TypeInfoUtils.getBaseName(realHiveType.getTypeName()); + + int hivePrecision = (typeName.equalsIgnoreCase(serdeConstants.DECIMAL_TYPE_NAME)) ? + ((DecimalTypeInfo) realHiveType).getPrecision() : 0; + int hiveScale = (typeName.equalsIgnoreCase(serdeConstants.DECIMAL_TYPE_NAME)) ? + ((DecimalTypeInfo) realHiveType).getScale() : 0; switch (parquetType.getPrimitiveTypeName()) { case INT32: - if (OriginalType.UINT_8 == parquetType.getOriginalType() || - OriginalType.UINT_16 == parquetType.getOriginalType() || - OriginalType.UINT_32 == parquetType.getOriginalType() || - OriginalType.UINT_64 == parquetType.getOriginalType()) { - switch (typeName) { - case serdeConstants.SMALLINT_TYPE_NAME: - return isDictionary ? new Types32UInt2SmallintPageReader(dictionary, - length) : new Types32UInt2SmallintPageReader(valuesReader, length); - case serdeConstants.TINYINT_TYPE_NAME: - return isDictionary ? new Types32UInt2TinyintPageReader(dictionary, - length) : new Types32UInt2TinyintPageReader(valuesReader, length); - case serdeConstants.DECIMAL_TYPE_NAME: - return isDictionary ? 
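The INT32- and INT64-backed decimal readers above all share one move: HiveDecimal.create(unscaledValue, scale) reinterprets the raw integer as an unscaled decimal, and every target type is then derived from that decimal. A JDK-only sketch of the interpretation:

import java.math.BigDecimal;

public class IntBackedDecimalSketch {
  public static void main(String[] args) {
    long unscaled = 12345L; // as returned by valuesReader.readLong()/readInteger()
    int scale = 2;          // from the DecimalLogicalTypeAnnotation
    BigDecimal decimal = BigDecimal.valueOf(unscaled, scale);
    System.out.println(decimal);               // 123.45
    System.out.println(decimal.doubleValue()); // 123.45, what validatedDouble sees
  }
}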
- new Types32UInt2DecimalPageReader(dictionary, length, - ((DecimalTypeInfo) hiveType).getPrecision(), - ((DecimalTypeInfo) hiveType).getScale()) : - new Types32UInt2DecimalPageReader(valuesReader, length, - ((DecimalTypeInfo) hiveType).getPrecision(), - ((DecimalTypeInfo) hiveType).getScale()); - default: - return isDictionary ? new TypesFromUInt32PageReader(dictionary, - length) : new TypesFromUInt32PageReader(valuesReader, length); - } + if (ETypeConverter.isUnsignedInteger(parquetType)) { + return isDictionary ? new TypesFromUInt32PageReader(dictionary, length, hivePrecision, + hiveScale) : new TypesFromUInt32PageReader(valuesReader, length, hivePrecision, + hiveScale); + } else if (parquetType.getLogicalTypeAnnotation() instanceof DecimalLogicalTypeAnnotation) { + DecimalLogicalTypeAnnotation logicalType = (DecimalLogicalTypeAnnotation) parquetType.getLogicalTypeAnnotation(); + final short scale = (short) logicalType.getScale(); + return isDictionary ? new TypesFromInt32DecimalPageReader(dictionary, length, scale, hivePrecision, hiveScale) + : new TypesFromInt32DecimalPageReader(valuesReader, length, scale, hivePrecision, hiveScale); } else { - switch (typeName) { - case serdeConstants.SMALLINT_TYPE_NAME: - return isDictionary ? new Types32Int2SmallintPageReader(dictionary, - length) : new Types32Int2SmallintPageReader(valuesReader, length); - case serdeConstants.TINYINT_TYPE_NAME: - return isDictionary ? new Types32Int2TinyintPageReader(dictionary, - length) : new Types32Int2TinyintPageReader(valuesReader, length); - case serdeConstants.DECIMAL_TYPE_NAME: - return isDictionary ? - new Types32Int2DecimalPageReader(dictionary, length, - ((DecimalTypeInfo) hiveType).getPrecision(), - ((DecimalTypeInfo) hiveType).getScale()) : - new Types32Int2DecimalPageReader(valuesReader, length, - ((DecimalTypeInfo) hiveType).getPrecision(), - ((DecimalTypeInfo) hiveType).getScale()); - default: - return isDictionary ? new TypesFromInt32PageReader(dictionary, - length) : new TypesFromInt32PageReader(valuesReader, length); - } + return isDictionary ? new TypesFromInt32PageReader(dictionary, length, hivePrecision, + hiveScale) : new TypesFromInt32PageReader(valuesReader, length, hivePrecision, + hiveScale); } case INT64: - if (OriginalType.UINT_8 == parquetType.getOriginalType() || - OriginalType.UINT_16 == parquetType.getOriginalType() || - OriginalType.UINT_32 == parquetType.getOriginalType() || - OriginalType.UINT_64 == parquetType.getOriginalType()) { - switch (typeName) { - case serdeConstants.INT_TYPE_NAME: - return isDictionary ? new Types64UInt2IntPageReader(dictionary, - length) : new Types64UInt2IntPageReader(valuesReader, length); - case serdeConstants.SMALLINT_TYPE_NAME: - return isDictionary ? new Types64UInt2SmallintPageReader(dictionary, - length) : new Types64UInt2SmallintPageReader(valuesReader, length); - case serdeConstants.TINYINT_TYPE_NAME: - return isDictionary ? new Types64UInt2TinyintPageReader(dictionary, - length) : new Types64UInt2TinyintPageReader(valuesReader, length); - case serdeConstants.DECIMAL_TYPE_NAME: - return isDictionary ? - new Types64UInt2DecimalPageReader(dictionary, length, - ((DecimalTypeInfo) hiveType).getPrecision(), - ((DecimalTypeInfo) hiveType).getScale()) : - new Types64UInt2DecimalPageReader(valuesReader, length, - ((DecimalTypeInfo) hiveType).getPrecision(), - ((DecimalTypeInfo) hiveType).getScale()); - default: - return isDictionary ? 
new TypesFromUInt64PageReader(dictionary, - length) : new TypesFromUInt64PageReader(valuesReader, length); - } - } else { - switch (typeName) { - case serdeConstants.INT_TYPE_NAME: - return isDictionary ? new Types64Int2IntPageReader(dictionary, - length) : new Types64Int2IntPageReader(valuesReader, length); - case serdeConstants.SMALLINT_TYPE_NAME: - return isDictionary ? new Types64Int2SmallintPageReader(dictionary, - length) : new Types64Int2SmallintPageReader(valuesReader, length); - case serdeConstants.TINYINT_TYPE_NAME: - return isDictionary ? new Types64Int2TinyintPageReader(dictionary, - length) : new Types64Int2TinyintPageReader(valuesReader, length); - case serdeConstants.DECIMAL_TYPE_NAME: - return isDictionary ? - new Types64Int2DecimalPageReader(dictionary, length, - ((DecimalTypeInfo) hiveType).getPrecision(), - ((DecimalTypeInfo) hiveType).getScale()) : - new Types64Int2DecimalPageReader(valuesReader, length, - ((DecimalTypeInfo) hiveType).getPrecision(), - ((DecimalTypeInfo) hiveType).getScale()); - default: - return isDictionary ? new TypesFromInt64PageReader(dictionary, - length) : new TypesFromInt64PageReader(valuesReader, length); - } + LogicalTypeAnnotation logicalType = parquetType.getLogicalTypeAnnotation(); + if (logicalType instanceof TimestampLogicalTypeAnnotation) { + TimestampLogicalTypeAnnotation timestampLogicalType = (TimestampLogicalTypeAnnotation) logicalType; + boolean isAdjustedToUTC = timestampLogicalType.isAdjustedToUTC(); + TimeUnit timeUnit = timestampLogicalType.getUnit(); + return isDictionary ? new TypesFromInt64PageReader(dictionary, length, isAdjustedToUTC, timeUnit) + : new TypesFromInt64PageReader(valuesReader, length, isAdjustedToUTC, timeUnit); } + + if (ETypeConverter.isUnsignedInteger(parquetType)) { + return isDictionary ? new TypesFromUInt64PageReader(dictionary, length, hivePrecision, hiveScale) + : new TypesFromUInt64PageReader(valuesReader, length, hivePrecision, hiveScale); + } + + if (logicalType instanceof DecimalLogicalTypeAnnotation) { + DecimalLogicalTypeAnnotation decimalLogicalType = (DecimalLogicalTypeAnnotation) logicalType; + final short scale = (short) decimalLogicalType.getScale(); + return isDictionary ? new TypesFromInt64DecimalPageReader(dictionary, length, scale, hivePrecision, hiveScale) + : new TypesFromInt64DecimalPageReader(valuesReader, length, scale, hivePrecision, hiveScale); + } + + return isDictionary ? new TypesFromInt64PageReader(dictionary, length, hivePrecision, hiveScale) + : new TypesFromInt64PageReader(valuesReader, length, hivePrecision, hiveScale); case FLOAT: - return isDictionary ? new TypesFromFloatPageReader(dictionary, length) : new - TypesFromFloatPageReader(valuesReader, length); + return isDictionary ? new TypesFromFloatPageReader(dictionary, length, hivePrecision, + hiveScale) : new TypesFromFloatPageReader(valuesReader, length, hivePrecision, hiveScale); case INT96: return isDictionary ? new TypesFromInt96PageReader(dictionary, length, skipTimestampConversion, writerTimezone) : new @@ -1308,11 +1924,13 @@ private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(boolean i case FIXED_LEN_BYTE_ARRAY: return getConvertorFromBinary(isDictionary, parquetType, hiveType, valuesReader, dictionary); case DOUBLE: - return isDictionary ? new TypesFromDoublePageReader(dictionary, length) : new - TypesFromDoublePageReader(valuesReader, length); + return isDictionary ? 
new TypesFromDoublePageReader(dictionary, length, hivePrecision, + hiveScale) : new TypesFromDoublePageReader(valuesReader, length, hivePrecision, + hiveScale); default: - return isDictionary ? new DefaultParquetDataColumnReader(dictionary, length) : new - DefaultParquetDataColumnReader(valuesReader, length); + return isDictionary ? new DefaultParquetDataColumnReader(dictionary, length, hivePrecision, + hiveScale) : new DefaultParquetDataColumnReader(valuesReader, length, hivePrecision, + hiveScale); } } @@ -1321,27 +1939,53 @@ private static ParquetDataColumnReader getConvertorFromBinary(boolean isDict, TypeInfo hiveType, ValuesReader valuesReader, Dictionary dictionary) { - OriginalType originalType = parquetType.getOriginalType(); + LogicalTypeAnnotation logicalType = parquetType.getLogicalTypeAnnotation(); // max length for varchar and char cases int length = getVarcharLength(hiveType); + TypeInfo realHiveType = (hiveType instanceof ListTypeInfo) ? + ((ListTypeInfo) hiveType).getListElementTypeInfo() : + (hiveType instanceof MapTypeInfo) ? + ((MapTypeInfo) hiveType).getMapValueTypeInfo() : hiveType; - if (originalType == null) { - return isDict ? new DefaultParquetDataColumnReader(dictionary, length) : new - DefaultParquetDataColumnReader(valuesReader, length); - } - switch (originalType) { - case DECIMAL: - final short scale = (short) parquetType.asPrimitiveType().getDecimalMetadata().getScale(); - return isDict ? new TypesFromDecimalPageReader(dictionary, length, scale) : new - TypesFromDecimalPageReader(valuesReader, length, scale); - case UTF8: - return isDict ? new TypesFromStringPageReader(dictionary, length) : new - TypesFromStringPageReader(valuesReader, length); - default: + String typeName = TypeInfoUtils.getBaseName(realHiveType.getTypeName()); + + int hivePrecision = (typeName.equalsIgnoreCase(serdeConstants.DECIMAL_TYPE_NAME)) ? + ((DecimalTypeInfo) realHiveType).getPrecision() : 0; + int hiveScale = (typeName.equalsIgnoreCase(serdeConstants.DECIMAL_TYPE_NAME)) ? + ((DecimalTypeInfo) realHiveType).getScale() : 0; + + if (logicalType == null) { return isDict ? new DefaultParquetDataColumnReader(dictionary, length) : new DefaultParquetDataColumnReader(valuesReader, length); } + + Optional reader = parquetType.getLogicalTypeAnnotation() + .accept(new LogicalTypeAnnotationVisitor() { + @Override public Optional visit( + DecimalLogicalTypeAnnotation logicalTypeAnnotation) { + final short scale = (short) logicalTypeAnnotation.getScale(); + return isDict ? Optional + .of(new TypesFromDecimalPageReader(dictionary, length, scale, hivePrecision, + hiveScale)) : Optional + .of(new TypesFromDecimalPageReader(valuesReader, length, scale, hivePrecision, + hiveScale)); + } + + @Override public Optional visit( + StringLogicalTypeAnnotation logicalTypeAnnotation) { + return isDict ? Optional + .of(new TypesFromStringPageReader(dictionary, length)) : Optional + .of(new TypesFromStringPageReader(valuesReader, length)); + } + }); + + if (reader.isPresent()) { + return reader.get(); + } + + return isDict ? 
new DefaultParquetDataColumnReader(dictionary, length) : new + DefaultParquetDataColumnReader(valuesReader, length); } public static ParquetDataColumnReader getDataColumnReaderByTypeOnDictionary( @@ -1356,10 +2000,8 @@ public static ParquetDataColumnReader getDataColumnReaderByTypeOnDictionary( } public static ParquetDataColumnReader getDataColumnReaderByType(PrimitiveType parquetType, - TypeInfo hiveType, - ValuesReader realReader, - boolean skipTimestampConversion, - ZoneId writerTimezone) + TypeInfo hiveType, ValuesReader realReader, boolean skipTimestampConversion, + ZoneId writerTimezone) throws IOException { return getDataColumnReaderByTypeHelper(false, parquetType, hiveType, null, realReader, skipTimestampConversion, writerTimezone); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java index 999419016465..48cc33dcc472 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java @@ -394,6 +394,9 @@ private void readTimestamp(int total, TimestampColumnVector c, int rowId) throws case INT96: c.set(rowId, dataColumn.readTimestamp().toSqlTimestamp()); break; + case INT64: + c.set(rowId, dataColumn.readTimestamp().toSqlTimestamp()); + break; default: throw new IOException( "Unsupported parquet logical type: " + type.getOriginalType() + " for timestamp"); diff --git a/serde/pom.xml b/serde/pom.xml index b1281fac2307..aa42898eb473 100644 --- a/serde/pom.xml +++ b/serde/pom.xml @@ -242,12 +242,12 @@ fk-art-snapshot libs-snapshot - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local + http://jfrog.fkinternal.com/maven_virtual fk-art-release libs-rel - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local + http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local diff --git a/service-rpc/pom.xml b/service-rpc/pom.xml index 8bdf0dde6fd3..89249c467ef8 100644 --- a/service-rpc/pom.xml +++ b/service-rpc/pom.xml @@ -153,12 +153,12 @@ fk-art-snapshot libs-snapshot - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local + http://jfrog.fkinternal.com/maven_virtual fk-art-release libs-rel - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local + http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local diff --git a/service/pom.xml b/service/pom.xml index 6c89a791f20e..dc2c66fa9b02 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -439,12 +439,12 @@ fk-art-snapshot libs-snapshot - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local + http://jfrog.fkinternal.com/maven_virtual fk-art-release libs-rel - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local + http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local diff --git a/shims/pom.xml b/shims/pom.xml index 6817ef8823f5..c77742046c61 100644 --- a/shims/pom.xml +++ b/shims/pom.xml @@ -41,12 +41,12 @@ fk-art-snapshot libs-snapshot - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local + http://jfrog.fkinternal.com/maven_virtual fk-art-release libs-rel - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local + http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local diff --git 
a/spark-client/pom.xml b/spark-client/pom.xml index 9d6a93ebd2e9..5cba111e1037 100644 --- a/spark-client/pom.xml +++ b/spark-client/pom.xml @@ -205,12 +205,12 @@ fk-art-snapshot libs-snapshot - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local + http://jfrog.fkinternal.com/maven_virtual fk-art-release libs-rel - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local + http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local diff --git a/standalone-metastore/pom.xml b/standalone-metastore/pom.xml index 923884ec505b..5cfd9bc69aa4 100644 --- a/standalone-metastore/pom.xml +++ b/standalone-metastore/pom.xml @@ -811,12 +811,12 @@ fk-art-snapshot libs-snapshot - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local + http://jfrog.fkinternal.com/maven_virtual fk-art-release libs-rel - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local + http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local diff --git a/streaming/pom.xml b/streaming/pom.xml index 52919fc5617b..5f715f060ee9 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -117,12 +117,12 @@ fk-art-snapshot libs-snapshot - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local + http://jfrog.fkinternal.com/maven_virtual fk-art-release libs-rel - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local + http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local diff --git a/testutils/pom.xml b/testutils/pom.xml index 8b8e76f87cde..5e6a629fc39b 100644 --- a/testutils/pom.xml +++ b/testutils/pom.xml @@ -53,12 +53,12 @@ fk-art-snapshot libs-snapshot - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local + http://jfrog.fkinternal.com/maven_virtual fk-art-release libs-rel - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local + http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local diff --git a/upgrade-acid/pom.xml b/upgrade-acid/pom.xml index 313cd2a1a3c2..dc3336749bad 100644 --- a/upgrade-acid/pom.xml +++ b/upgrade-acid/pom.xml @@ -297,12 +297,12 @@ java.io.IOException: Cannot initialize Cluster. Please check your configuration fk-art-snapshot libs-snapshot - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local + http://jfrog.fkinternal.com/maven_virtual fk-art-release libs-rel - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local + http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local diff --git a/vector-code-gen/pom.xml b/vector-code-gen/pom.xml index be68ae1126d8..6021b1e470ae 100644 --- a/vector-code-gen/pom.xml +++ b/vector-code-gen/pom.xml @@ -69,12 +69,12 @@ fk-art-snapshot libs-snapshot - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-snapshots-local + http://jfrog.fkinternal.com/maven_virtual fk-art-release libs-rel - http://artifactory.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local + http://jfrog.fkinternal.com/artifactory/v1.0/artifacts/libs-release-local
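Returning to the INT64 timestamp support added in the factory and in VectorizedPrimitiveColumnReader above: the TimestampLogicalTypeAnnotation contributes isAdjustedToUTC and a TimeUnit (MILLIS, MICROS or NANOS), and the reader has to scale the raw long accordingly. A JDK-only sketch of that unit handling (method and enum names here are illustrative, not Hive's; floor division for pre-1970 values is omitted for brevity):

import java.time.Instant;

public class Int64TimestampSketch {
  enum Unit { MILLIS, MICROS, NANOS }

  static Instant toInstant(long value, Unit unit) {
    switch (unit) {
      case MILLIS:
        return Instant.ofEpochMilli(value);
      case MICROS:
        return Instant.ofEpochSecond(value / 1_000_000L, (value % 1_000_000L) * 1_000L);
      default: // NANOS
        return Instant.ofEpochSecond(value / 1_000_000_000L, value % 1_000_000_000L);
    }
  }

  public static void main(String[] args) {
    long micros = 1_600_000_000_123_456L;
    System.out.println(toInstant(micros, Unit.MICROS)); // 2020-09-13T12:26:40.123456Z
  }
}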