Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions hetu-docs/en/connector/hive.md
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,24 @@ VACUUM TABLE hive_acid_table_partitioned



## Table Properties

Bloom filter indexes are supported for both transactional and non-transactional tables. Support has also been added for the real, date, timestamp and char data types.

| Property Name | Data type | Description | Default |
|:---------------------------------|:---------------|:-------------------------------------------------------------------|:---------|
| `orc_bloom_filter_columns` | array(varchar) | Bloom Filter Index columns for ORC files as a comma separated list | |
| `orc_bloom_filter_fpp` | double | False positive probability of bloom filter | `0.05` |

Example: Creating a table with bloom columns specified:

```sql
CREATE TABLE testbloom
(a bigint, b row(c bigint, d bigint), e row(f double, g date))
WITH (format='orc',transactional=true,
orc_bloom_filter_columns=ARRAY['a','b.c','e'], orc_bloom_filter_fpp=0.001);
```

## Schema Evolution


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ public class HiveMetadata

public static final String STORAGE_FORMAT = "storage_format";

private static final String ORC_BLOOM_FILTER_COLUMNS_KEY = "orc.bloom.filter.columns";
private static final String ORC_BLOOM_FILTER_FPP_KEY = "orc.bloom.filter.fpp";
public static final String ORC_BLOOM_FILTER_COLUMNS_KEY = "orc.bloom.filter.columns";
public static final String ORC_BLOOM_FILTER_FPP_KEY = "orc.bloom.filter.fpp";
public static final Splitter COLUMN_NAMES_SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();

private static final String TEXT_SKIP_HEADER_COUNT_KEY = "skip.header.line.count";
private static final String TEXT_SKIP_FOOTER_COUNT_KEY = "skip.footer.line.count";
Expand Down Expand Up @@ -876,8 +877,10 @@ protected Map<String, String> getEmptyTableProperties(ConnectorTableMetadata tab
List<String> columns = HiveTableProperties.getOrcBloomFilterColumns(tableMetadata.getProperties());
if (columns != null && !columns.isEmpty()) {
checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.ORC, HiveTableProperties.ORC_BLOOM_FILTER_COLUMNS);
Double bloomFilterFpp = HiveTableProperties.getOrcBloomFilterFpp(tableMetadata.getProperties());
checkValueForBloomFilterFpp(bloomFilterFpp);
tableProperties.put(ORC_BLOOM_FILTER_COLUMNS_KEY, Joiner.on(",").join(columns));
tableProperties.put(ORC_BLOOM_FILTER_FPP_KEY, String.valueOf(HiveTableProperties.getOrcBloomFilterFpp(tableMetadata.getProperties())));
tableProperties.put(ORC_BLOOM_FILTER_FPP_KEY, String.valueOf(bloomFilterFpp));
}

// Avro specific properties
Expand Down Expand Up @@ -948,6 +951,13 @@ private static void checkFormatForProperty(HiveStorageFormat actualStorageFormat
}
}

/**
 * Validates that the ORC bloom filter false-positive probability lies in [0.0, 1.0].
 * NOTE(review): NaN fails both comparisons and is therefore not rejected here —
 * this mirrors the original behavior; confirm whether NaN should be an error.
 *
 * @param bloomFilterFpp value of the {@code orc_bloom_filter_fpp} table property
 * @throws PrestoException with {@code INVALID_TABLE_PROPERTY} when out of range
 */
private static void checkValueForBloomFilterFpp(Double bloomFilterFpp)
{
    boolean belowRange = bloomFilterFpp < 0.0;
    boolean aboveRange = bloomFilterFpp > 1.0;
    if (belowRange || aboveRange) {
        String message = String.format("Invalid value for %s property: %s", HiveTableProperties.ORC_BLOOM_FILTER_FPP, bloomFilterFpp);
        throw new PrestoException(INVALID_TABLE_PROPERTY, message);
    }
}

private String validateAndNormalizeAvroSchemaUrl(String url, HdfsContext context)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logger;
import io.prestosql.orc.OrcDataSink;
import io.prestosql.orc.OrcDataSource;
Expand Down Expand Up @@ -51,10 +52,17 @@
import java.util.concurrent.Callable;
import java.util.function.Supplier;

import static io.prestosql.plugin.hive.HiveMetadata.COLUMN_NAMES_SPLITTER;
import static io.prestosql.plugin.hive.HiveMetadata.ORC_BLOOM_FILTER_COLUMNS_KEY;
import static io.prestosql.plugin.hive.HiveMetadata.ORC_BLOOM_FILTER_FPP_KEY;
import static io.prestosql.plugin.hive.HiveTableProperties.ORC_BLOOM_FILTER_FPP;
import static io.prestosql.plugin.hive.HiveTableProperties.TRANSACTIONAL;
import static io.prestosql.plugin.hive.HiveUtil.getColumnNames;
import static io.prestosql.plugin.hive.HiveUtil.getColumnTypes;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static java.lang.Double.parseDouble;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
Expand All @@ -63,7 +71,8 @@ public class OrcFileWriterFactory
implements HiveFileWriterFactory
{
private static final Logger log = Logger.get(OrcFileWriterFactory.class);

private static final Integer BASE_OFFSET_FOR_TRANSACTIONAL_TABLE = 6;
private static final Integer BASE_OFFSET_FOR_NON_TRANSACTIONAL_TABLE = 0;
private final HdfsEnvironment hdfsEnvironment;
private final TypeManager typeManager;
private final NodeVersion nodeVersion;
Expand Down Expand Up @@ -210,12 +219,7 @@ public Optional<HiveFileWriter> createFileWriter(
fileColumnTypes,
dataFileColumnTypes,
compression,
orcWriterOptions
.withStripeMinSize(HiveSessionProperties.getOrcOptimizedWriterMinStripeSize(session))
.withStripeMaxSize(HiveSessionProperties.getOrcOptimizedWriterMaxStripeSize(session))
.withStripeMaxRowCount(HiveSessionProperties.getOrcOptimizedWriterMaxStripeRows(session))
.withDictionaryMaxMemory(HiveSessionProperties.getOrcOptimizedWriterMaxDictionaryMemory(session))
.withMaxStringStatisticsLimit(HiveSessionProperties.getOrcStringStatisticsLimit(session)),
getOrcWriterBloomOptions(schema, getOrcWriterOptions(schema, session)),
writeLegacyVersion,
fileInputColumnIndexes,
ImmutableMap.<String, String>builder()
Expand All @@ -236,6 +240,35 @@ public Optional<HiveFileWriter> createFileWriter(
}
}

/**
 * Augments the given writer options with bloom filter settings read from the table schema.
 * If no bloom filter columns are declared the options are returned unchanged; if columns
 * are declared, an FPP value must accompany them.
 *
 * @param schema table properties (uses {@code orc.bloom.filter.columns} / {@code orc.bloom.filter.fpp})
 * @param orcWriterOptions base options to extend
 * @return options with bloom filter columns and FPP applied when configured
 * @throws PrestoException when the FPP is missing or not parseable as a double
 */
public static OrcWriterOptions getOrcWriterBloomOptions(Properties schema, OrcWriterOptions orcWriterOptions)
{
    if (!schema.containsKey(ORC_BLOOM_FILTER_COLUMNS_KEY)) {
        return orcWriterOptions;
    }
    if (!schema.containsKey(ORC_BLOOM_FILTER_FPP_KEY)) {
        throw new PrestoException(HiveErrorCode.HIVE_INVALID_METADATA, "FPP for bloom filter is missing");
    }
    String fppValue = schema.getProperty(ORC_BLOOM_FILTER_FPP_KEY);
    String columnsValue = schema.getProperty(ORC_BLOOM_FILTER_COLUMNS_KEY);
    try {
        return orcWriterOptions
                .withBloomFilterFpp(parseDouble(fppValue))
                .withBloomFilterColumns(ImmutableSet.copyOf(COLUMN_NAMES_SPLITTER.splitToList(columnsValue)));
    }
    catch (NumberFormatException e) {
        throw new PrestoException(HiveErrorCode.HIVE_UNSUPPORTED_FORMAT, format("Invalid value for %s property: %s", ORC_BLOOM_FILTER_FPP, fppValue));
    }
}

/**
 * Builds ORC writer options from session tuning properties plus the row base offset.
 * Transactional (ACID) tables use a base index of 6, non-transactional tables 0
 * (see BASE_OFFSET_FOR_TRANSACTIONAL_TABLE / BASE_OFFSET_FOR_NON_TRANSACTIONAL_TABLE).
 *
 * @param schema table properties; only the {@code transactional} flag is read here
 * @param session session supplying stripe/dictionary/statistics tuning values
 * @return the shared {@code orcWriterOptions} with session values and base index applied
 */
private OrcWriterOptions getOrcWriterOptions(Properties schema, ConnectorSession session)
{
    // parseBoolean avoids the boxed Boolean that Boolean.valueOf would allocate
    // just to be auto-unboxed by the ternary; a missing property still yields false.
    boolean transactional = Boolean.parseBoolean(schema.getProperty(TRANSACTIONAL));
    return orcWriterOptions
            .withStripeMinSize(HiveSessionProperties.getOrcOptimizedWriterMinStripeSize(session))
            .withStripeMaxSize(HiveSessionProperties.getOrcOptimizedWriterMaxStripeSize(session))
            .withStripeMaxRowCount(HiveSessionProperties.getOrcOptimizedWriterMaxStripeRows(session))
            .withDictionaryMaxMemory(HiveSessionProperties.getOrcOptimizedWriterMaxDictionaryMemory(session))
            .withMaxStringStatisticsLimit(HiveSessionProperties.getOrcStringStatisticsLimit(session))
            .withBaseIndex(transactional ? BASE_OFFSET_FOR_TRANSACTIONAL_TABLE : BASE_OFFSET_FOR_NON_TRANSACTIONAL_TABLE);
}

/**
* Allow subclass to replace data sink implementation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.prestosql.plugin.hive.metastore.thrift;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import io.prestosql.plugin.hive.HivePartition;
import io.prestosql.plugin.hive.HiveType;
Expand Down Expand Up @@ -47,9 +48,13 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.prestosql.plugin.hive.HiveMetadata.ORC_BLOOM_FILTER_COLUMNS_KEY;
import static io.prestosql.plugin.hive.HiveMetadata.ORC_BLOOM_FILTER_FPP_KEY;
import static io.prestosql.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.prestosql.plugin.hive.metastore.thrift.ThriftMetastoreUtil.toMetastoreApiTable;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
Expand Down Expand Up @@ -249,6 +254,7 @@ public void renameColumn(HiveIdentity identity, String databaseName, String tabl
fieldSchema.setName(newColumnName);
}
}
alterBloomColumns(oldColumnName, table, newColumnName);
alterTable(identity, databaseName, tableName, table);
}

Expand All @@ -258,10 +264,46 @@ public void dropColumn(HiveIdentity identity, String databaseName, String tableN
MetastoreUtil.verifyCanDropColumn(this, identity, databaseName, tableName, columnName);
org.apache.hadoop.hive.metastore.api.Table table = delegate.getTable(identity, databaseName, tableName)
.orElseThrow(() -> new TableNotFoundException(new SchemaTableName(databaseName, tableName)));

removeBloomColumns(columnName, table);
table.getSd().getCols().removeIf(fieldSchema -> fieldSchema.getName().equals(columnName));
alterTable(identity, databaseName, tableName, table);
}

/**
 * Rewrites the {@code orc.bloom.filter.columns} table parameter after a column rename,
 * replacing both exact matches and dotted sub-column references (e.g. {@code col.field}).
 * No-op when the table declares no bloom filter columns.
 *
 * @param oldColumnName column name before the rename
 * @param table         metastore table whose parameters are updated in place
 * @param newColumnName column name after the rename
 */
private void alterBloomColumns(String oldColumnName, org.apache.hadoop.hive.metastore.api.Table table, String newColumnName)
{
    String bloomColumns = table.getParameters().get(ORC_BLOOM_FILTER_COLUMNS_KEY);
    if (bloomColumns == null) {
        return;
    }
    String subColumnPrefix = oldColumnName + ".";
    // Plain prefix substitution instead of String.replaceFirst: replaceFirst treats the
    // old name as a regex and the new name as a replacement template, so names containing
    // regex metacharacters or '$' would corrupt the parameter. Trimming matches the
    // behavior of removeBloomColumns and the trimResults() splitter used on write.
    List<String> bloomColumnNames = Stream.of(bloomColumns.split(","))
            .map(String::trim)
            .map(name -> name.equals(oldColumnName)
                    ? newColumnName
                    : name.startsWith(subColumnPrefix)
                            ? newColumnName + name.substring(oldColumnName.length())
                            : name)
            .collect(Collectors.toList());

    table.getParameters().put(ORC_BLOOM_FILTER_COLUMNS_KEY, Joiner.on(",").join(bloomColumnNames));
}

/**
 * Removes a dropped column (and any of its dotted sub-columns) from the
 * {@code orc.bloom.filter.columns} table parameter. When no bloom columns remain,
 * both the columns and FPP parameters are deleted entirely.
 *
 * @param oldColumnName name of the column being dropped
 * @param table         metastore table whose parameters are updated in place
 */
private void removeBloomColumns(String oldColumnName, org.apache.hadoop.hive.metastore.api.Table table)
{
    String bloomColumns = table.getParameters().get(ORC_BLOOM_FILTER_COLUMNS_KEY);
    if (bloomColumns == null) {
        return;
    }
    String droppedPrefix = oldColumnName + ".";
    List<String> remaining = Stream.of(bloomColumns.split(","))
            .map(String::trim)
            .filter(name -> !name.startsWith(droppedPrefix) && !name.equals(oldColumnName))
            .collect(Collectors.toList());

    if (remaining.isEmpty()) {
        // Last bloom column gone: the FPP parameter is meaningless without columns.
        table.getParameters().remove(ORC_BLOOM_FILTER_COLUMNS_KEY);
        table.getParameters().remove(ORC_BLOOM_FILTER_FPP_KEY);
    }
    else {
        table.getParameters().put(ORC_BLOOM_FILTER_COLUMNS_KEY, Joiner.on(",").join(remaining));
    }
}

private void alterTable(HiveIdentity identity, String databaseName, String tableName, org.apache.hadoop.hive.metastore.api.Table table)
{
delegate.alterTable(identity, databaseName, tableName, table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.prestosql.plugin.hive;

import com.google.common.base.Joiner;
Expand Down Expand Up @@ -185,6 +186,7 @@
import static io.prestosql.testing.TestingSnapshotUtils.NOOP_SNAPSHOT_UTILS;
import static io.prestosql.testing.assertions.Assert.assertEquals;
import static io.prestosql.tests.QueryAssertions.assertEqualsIgnoreOrder;
import static io.prestosql.tests.sql.TestTable.randomTableSuffix;
import static io.prestosql.transaction.TransactionBuilder.transaction;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -6940,4 +6942,109 @@ public void testReadFromTablesWithPrimitiveAndStructDataTypeColumns()
assertUpdate("DROP TABLE testReadSchema4.testReadStruct9");
assertUpdate("DROP SCHEMA testReadSchema4");
}

@Test
public void testInvalidOrcBloomFilterProperty()
{
    // An FPP outside [0.0, 1.0] must be rejected when the table is created.
    assertThatThrownBy(() -> assertUpdate("CREATE TABLE invalid_bloom_fpp (col1 bigint) WITH (format = 'ORC', orc_bloom_filter_columns = ARRAY['col1'], orc_bloom_filter_fpp=2)"))
            .hasMessageMatching("Invalid value for orc_bloom_filter_fpp property: 2.0");
    // A non-numeric FPP fails earlier, during table-property parsing.
    assertThatThrownBy(() -> assertUpdate("CREATE TABLE invalid_bloom_fpp (col1 bigint) WITH (format = 'ORC', orc_bloom_filter_columns = ARRAY['col1'], orc_bloom_filter_fpp=a)"))
            .hasMessageStartingWith("Invalid value for table property 'orc_bloom_filter_fpp'");
}

@Test
public void testOrcBloomFilterWrittenForTransactionalTables()
{
    // Bloom filter indexes must be written for ACID tables on both the CTAS
    // and the INSERT write paths; each sub-test uses its own random table name.
    test_bloom_written_during_create("bloom_for_txn_table_" + randomTableSuffix(), true);
    test_bloom_written_during_insert("bloom_for_txn_table_" + randomTableSuffix(), true);
}

@Test
public void testOrcBloomFilterWrittenForNonTransactionalTables()
{
    // Same coverage as the transactional variant, for plain (non-ACID) tables.
    test_bloom_written_during_create("bloom_for_non_txn_table_" + randomTableSuffix(), false);
    test_bloom_written_during_insert("bloom_for_non_txn_table_" + randomTableSuffix(), false);
}

/**
 * Verifies that bloom filter indexes written during CTAS prune row groups,
 * and that pruning still works after the indexed column is renamed.
 */
private void test_bloom_written_during_create(String tableName, boolean transactional)
{
    assertUpdate(format("drop table if exists %s", tableName));
    String createSql = format(
            "create table %s WITH (%s) as select * from tpch.tiny.lineitem",
            tableName,
            addTableProperties("ORC", transactional, "orderkey", 0.001));
    assertUpdate(createSql, 60175);
    assertBloomFilterBasedRowGroupPruning(format("select * from %s where orderkey = 29989", tableName));

    // The bloom index must follow the column through a rename.
    assertUpdate(format("alter table %s rename column orderkey to alt_orderkey", tableName));
    assertBloomFilterBasedRowGroupPruning(format("select * from %s where alt_orderkey = 29989", tableName));
    assertUpdate(format("DROP TABLE %s", tableName));
}

/**
 * Verifies that bloom filter indexes written during INSERT prune row groups.
 */
private void test_bloom_written_during_insert(String tableName, boolean transactional)
{
    assertUpdate(format("drop table if exists %s", tableName));
    String createSql = format(
            "create table %s (orderkey bigint, partkey bigint, shipmode VARCHAR(10)) WITH (%s)",
            tableName,
            addTableProperties("ORC", transactional, "orderkey", 0.001));
    assertUpdate(createSql);
    String insertSql = format(
            "INSERT INTO %s SELECT orderkey, partkey, shipmode from tpch.tiny.lineitem",
            tableName);
    assertUpdate(insertSql, 60175);
    assertBloomFilterBasedRowGroupPruning(format("select * from %s where orderkey = 29989", tableName));
    assertUpdate(format("DROP TABLE %s", tableName));
}

/**
 * Builds the WITH (...) property list used by the bloom filter tests.
 *
 * @param fileFormat           storage format name, e.g. "ORC"
 * @param transactional        whether the table is ACID
 * @param bloomFilterColumnName single column to index with a bloom filter
 * @param fpp                  false-positive probability for the filter
 */
private String addTableProperties(String fileFormat, boolean transactional, String bloomFilterColumnName, double fpp)
{
    String template = "format = '%s', transactional = %s, orc_bloom_filter_columns = ARRAY['%s'], orc_bloom_filter_fpp = %s";
    return format(template, fileFormat, transactional, bloomFilterColumnName, fpp);
}

/**
 * Asserts that the given point-lookup query benefits from bloom-filter based
 * row-group pruning without changing its results (expected: exactly 2 rows).
 */
private void assertBloomFilterBasedRowGroupPruning(@Language("SQL") String sql)
{
    // Bloom filters off: no pruning, so every physically read position is processed.
    Session bloomDisabled = enableBloomFilters(getSession(), false);
    assertQueryStats(
            bloomDisabled,
            sql,
            stats -> {
                assertThat(stats.getPhysicalInputPositions()).isGreaterThan(0);
                assertThat(stats.getProcessedInputPositions()).isEqualTo(stats.getPhysicalInputPositions());
            },
            results -> assertThat(results.getRowCount()).isEqualTo(2));

    // Bloom filters on: data is still read and processed, and results are unchanged.
    Session bloomEnabled = enableBloomFilters(getSession(), true);
    assertQueryStats(
            bloomEnabled,
            sql,
            stats -> {
                assertThat(stats.getPhysicalInputPositions()).isGreaterThan(0);
                assertThat(stats.getProcessedInputPositions()).isGreaterThan(0);
            },
            results -> assertThat(results.getRowCount()).isEqualTo(2));
}

/**
 * Returns a copy of the session with the ORC bloom filter reader toggled
 * on or off for the session's current catalog.
 */
private Session enableBloomFilters(Session session, boolean value)
{
    String catalog = session.getCatalog().get();
    return Session.builder(session)
            .setCatalogSessionProperty(catalog, "orc_bloom_filters_enabled", String.valueOf(value))
            .build();
}
}
Loading