[DRAFT] Bump Hive to Version 4.0 #2998

Draft · wants to merge 3 commits into master
20 changes: 20 additions & 0 deletions contrib/storage-hive/core/pom.xml
@@ -54,6 +54,10 @@
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -93,6 +97,13 @@
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-hbase-handler</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>

</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
@@ -147,6 +158,11 @@
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apiguardian</groupId>
<artifactId>apiguardian-api</artifactId>
<version>1.1.2</version> <!-- Use the latest compatible version -->
</dependency>
<dependency>
<groupId>org.apache.drill</groupId>
<artifactId>drill-common</artifactId>
@@ -249,6 +265,10 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
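
The pom changes above exclude log4j-core from the Hive artifacts while keeping log4j-api, presumably to stop Hive 4 from dragging a second logging backend onto Drill's classpath. A quick sanity check of which SLF4J backend actually wins at runtime might look like the sketch below; the class and the check are illustrative, not part of this PR. Running `mvn dependency:tree -Dincludes=org.apache.logging.log4j` on the module answers the same question at build time.

```java
import org.slf4j.LoggerFactory;

public class Slf4jBindingCheck {
  public static void main(String[] args) {
    // Prints the concrete ILoggerFactory implementation SLF4J bound to at runtime.
    // If log4j-core leaked back onto the classpath, a Log4j 2 factory would show up
    // here instead of Drill's configured logging backend.
    System.out.println(LoggerFactory.getILoggerFactory().getClass().getName());
  }
}
```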
@@ -17,21 +17,10 @@
*/
package org.apache.drill.exec.store.hive;

import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.type.TypeReference;

import com.google.common.collect.ImmutableSet;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.commons.lang3.StringEscapeUtils;

import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -49,16 +38,26 @@
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory;
import com.google.common.collect.ImmutableSet;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class HiveStoragePlugin extends AbstractStoragePlugin {

private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveStoragePlugin.class);
private static final Logger logger = LoggerFactory.getLogger(HiveStoragePlugin.class);

private final HiveStoragePluginConfig config;
private HiveSchemaFactory schemaFactory;
@@ -17,14 +17,11 @@
*/
package org.apache.drill.exec.store.hive;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.drill.exec.physical.impl.scan.v3.schema.SchemaUtils;
import org.apache.drill.exec.planner.types.HiveToRelDataTypeConverter;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.netty.buffer.DrillBuf;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
@@ -36,8 +33,11 @@
import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
import org.apache.drill.exec.expr.holders.Decimal9Holder;
import org.apache.drill.exec.physical.impl.scan.v3.schema.SchemaUtils;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.types.HiveToRelDataTypeConverter;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.util.DecimalUtility;
import org.apache.drill.exec.vector.NullableBigIntVector;
@@ -56,15 +56,14 @@
import org.apache.drill.exec.vector.NullableVarDecimalVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.work.ExecErrorConstants;

import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
@@ -572,6 +571,14 @@ public static boolean hasHeaderOrFooter(HiveTableWithColumnCache table) {
return skipHeader > 0 || skipFooter > 0;
}

public static boolean isTablePropertyTransactional(JobConf parameters) {
String resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
if (resultStr == null) {
resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
}
return Boolean.parseBoolean(resultStr);
}

/**
* This method checks whether the table is transactional and sets the necessary properties in {@link JobConf}.<br>
* If schema evolution properties aren't set in the job conf for the input format, the method sets the column names
@@ -581,8 +588,7 @@ public static boolean hasHeaderOrFooter(HiveTableWithColumnCache table) {
* @param sd storage descriptor
*/
public static void verifyAndAddTransactionalProperties(JobConf job, StorageDescriptor sd) {

if (AcidUtils.isTablePropertyTransactional(job)) {
if (isTablePropertyTransactional(job)) {
HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);

// No work is needed, if schema evolution is used
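
The new isTablePropertyTransactional(JobConf) helper takes over from the AcidUtils call that verifyAndAddTransactionalProperties used before, presumably because the Configuration-based overload is no longer available in Hive 4. A minimal, self-contained sketch of the same lookup is below; the literal "transactional" key stands in for hive_metastoreConstants.TABLE_IS_TRANSACTIONAL and is used only to keep the example standalone.

```java
import org.apache.hadoop.mapred.JobConf;

public class TransactionalPropertySketch {

  // Mirrors the helper added above: read the "transactional" table property,
  // fall back to the upper-cased key, and parse the value as a boolean.
  static boolean isTablePropertyTransactional(JobConf job) {
    String value = job.get("transactional"); // hive_metastoreConstants.TABLE_IS_TRANSACTIONAL
    if (value == null) {
      value = job.get("TRANSACTIONAL");
    }
    return Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    JobConf job = new JobConf();
    job.set("transactional", "true");
    System.out.println(isTablePropertyTransactional(job)); // prints: true
  }
}
```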
@@ -17,19 +17,7 @@
*/
package org.apache.drill.exec.store.hive.readers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.google.common.util.concurrent.ListenableFuture;
import io.netty.buffer.DrillBuf;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -51,10 +39,11 @@
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -74,14 +63,27 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* Reader which uses complex writer underneath to fill in value vectors with data read from Hive.
* At first glance the initialization code in the writer looks cumbersome, but in the end its main aim is to prepare the list of key
fields used in the next() and readHiveRecordAndInsertIntoRecordBatch(Object rowValue) methods.
* <p>
* In a nutshell, the reader is used in two stages:
* 1) Setup stage configures mapredReader, partitionObjInspector, partitionDeserializer, list of {@link HiveValueWriter}s for each column in record
* batch, partition vectors and values
* batch, partition vectors, and values
* 2) Reading stage uses objects configured previously to get rows from InputSplits, represent each row as Struct of columns values,
* and write each row value of column into Drill's value vectors using HiveValueWriter for each specific column
*/
@@ -326,7 +328,7 @@ private Callable<Void> getInitTask(OutputMutator output) {
List<String> nestedColumnPaths = getColumns().stream()
.map(SchemaPath::getRootSegmentPath)
.collect(Collectors.toList());
ColumnProjectionUtils.appendReadColumns(job, idsOfProjectedColumns, selectedColumnNames, nestedColumnPaths);
ColumnProjectionUtils.appendReadColumns(job, idsOfProjectedColumns);

// Initialize selectedStructFieldRefs and columnValueWriters, which are two key collections of
// objects used to read and save columns row data into Drill's value vectors
@@ -345,7 +347,7 @@ private Callable<Void> getInitTask(OutputMutator output) {
if (partition != null && selectedPartitionColumnNames.size() > 0) {
List<ValueVector> partitionVectorList = new ArrayList<>(selectedPartitionColumnNames.size());
List<Object> partitionValueList = new ArrayList<>(selectedPartitionColumnNames.size());
String defaultPartitionValue = hiveConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname);
String defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname);
OptionManager options = fragmentContext.getOptions();
for (int i = 0; i < partitionKeyFields.size(); i++) {
FieldSchema field = partitionKeyFields.get(i);
@@ -450,7 +452,7 @@ public void close() {
private static Deserializer createDeserializer(JobConf job, StorageDescriptor sd, Properties properties) throws Exception {
final Class<? extends Deserializer> c = Class.forName(sd.getSerdeInfo().getSerializationLib()).asSubclass(Deserializer.class);
final Deserializer deserializer = c.getConstructor().newInstance();
deserializer.initialize(job, properties);
((AbstractSerDe)deserializer).initialize(job, properties, null);

return deserializer;
}
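
The deserializer is now initialized through the Hive 4 AbstractSerDe API, which takes table and partition properties separately; the change above passes null for the partition properties. A condensed sketch of that path follows, assuming every SerDe named in the storage descriptor actually extends AbstractSerDe (the cast fails otherwise).

```java
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.Deserializer;

public class SerDeInitSketch {

  // Loads the SerDe class named in the table metadata, instantiates it reflectively,
  // and initializes it with table properties only (null partition properties),
  // mirroring the updated createDeserializer above.
  static Deserializer createDeserializer(Configuration conf, String serdeClassName,
                                         Properties tableProperties) throws Exception {
    Class<? extends Deserializer> serdeClass =
        Class.forName(serdeClassName).asSubclass(Deserializer.class);
    Deserializer deserializer = serdeClass.getConstructor().newInstance();
    ((AbstractSerDe) deserializer).initialize(conf, tableProperties, null);
    return deserializer;
  }
}
```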
@@ -17,15 +17,6 @@
*/
package org.apache.drill.exec.hive;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
@@ -38,6 +29,15 @@
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

import static java.util.Objects.nonNull;
import static java.util.Objects.requireNonNull;
import static org.apache.drill.exec.hive.HiveTestUtilities.createDirWithPosixPermissions;
@@ -17,14 +17,6 @@
*/
package org.apache.drill.exec.hive;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.util.EnumSet;
import java.util.Set;

import org.apache.drill.test.QueryBuilder;
import org.apache.drill.test.TestTools;
import org.apache.hadoop.hive.ql.Driver;
@@ -33,6 +25,14 @@
import org.apache.hive.common.util.HiveVersionInfo;
import org.junit.AssumptionViolatedException;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.util.EnumSet;
import java.util.Set;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -53,13 +53,10 @@ public static void executeQuery(Driver hiveDriver, String query) {
CommandProcessorResponse response;
try {
response = hiveDriver.run(query);
} catch (Exception e) {
throw new RuntimeException(e);
}

if (response.getResponseCode() != 0 ) {
catch (Exception e) {
throw new RuntimeException(String.format("Failed to execute command '%s', errorMsg = '%s'",
query, response.getErrorMessage()));
query, e.getMessage()));
}
}
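
The executeQuery hunk above interleaves removed and added lines (the +/- markers were lost when the diff was copied as text), so as a reading aid, the updated helper appears to reduce to the version below: the explicit getResponseCode() check is gone and failures are reported from the exception alone, which lines up with Hive 4's Driver.run signalling errors by throwing rather than returning a non-zero response code. The response variable is still assigned but no longer inspected.

```java
public static void executeQuery(Driver hiveDriver, String query) {
  CommandProcessorResponse response;
  try {
    response = hiveDriver.run(query);
  }
  catch (Exception e) {
    throw new RuntimeException(String.format("Failed to execute command '%s', errorMsg = '%s'",
        query, e.getMessage()));
  }
}
```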

@@ -125,13 +122,14 @@ public static void assertNativeScanUsed(QueryBuilder queryBuilder, String table)
}

/**
* Current Hive version doesn't support JDK 9+.
* Current Hive version doesn't support JDK 9+.
* Checks if current version is supported by Hive.
*
* @return {@code true} if current version is supported by Hive, {@code false} otherwise
*/
public static boolean supportedJavaVersion() {
return System.getProperty("java.version").startsWith("1.8");
return true;
//return System.getProperty("java.version").startsWith("1.8");
}

/**