Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getPlai
return new FixedSizeBinaryAccessor<>(
(FixedSizeBinaryVector) vector, stringFactorySupplier.get());
}
throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
String vectorName = (vector == null) ? "null" : vector.getClass().toString();
throw new UnsupportedOperationException("Unsupported vector: " + vectorName);
}

private static boolean isDecimal(PrimitiveType primitive) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.iceberg.Files.localInput;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -272,6 +273,51 @@ public void testReadColumnFilter2() throws Exception {
scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ImmutableList.of("timestamp"));
}

@Test
public void testThrowsUOEWhenNewColumnHasNoValue() throws Exception {
  // Reset shared test state: no rows tracked yet, fresh HadoopTables handle.
  rowsWritten = Lists.newArrayList();
  tables = new HadoopTables();

  // Initial schema: required int "a", optional string "b", required decimal(12,3) "c".
  Schema schema =
      new Schema(
          Types.NestedField.required(1, "a", Types.IntegerType.get()),
          Types.NestedField.optional(2, "b", Types.StringType.get()),
          Types.NestedField.required(3, "c", Types.DecimalType.of(12, 3)));

  // Unpartitioned table created at the shared test location.
  PartitionSpec spec = PartitionSpec.builderFor(schema).build();
  Table table1 = tables.create(schema, spec, tableLocation); // NOTE(review): table1 is never used below.

  // Add one record to the table
  GenericRecord rec = GenericRecord.create(schema);
  rec.setField("a", 1);
  rec.setField("b", "san diego");
  rec.setField("c", new BigDecimal("1024.025"));
  List<GenericRecord> genericRecords = Lists.newArrayList();
  genericRecords.add(rec);
  // NOTE(review): genericRecords is built but never actually written to the table — confirm
  // whether a write/append step is missing here, or whether this test intentionally scans a
  // table with no data files.

  // Alter the table schema by adding a new, optional column.
  // Do not add any data for this new column in the one existing row in the table
  // and do not insert any new rows into the table.
  Table table = tables.load(tableLocation);
  table.updateSchema().addColumn("a1", Types.IntegerType.get()).commit();

  // Select all columns, all rows from the table
  TableScan scan = table.newScan().select("*");

  // The vectorized read is expected to fail with the exact message produced when the
  // accessor factory receives a null vector for the value-less new column.
  assertThatThrownBy(
          () -> {
            // Read the data.
            try (VectorizedTableScanIterable itr =
                new VectorizedTableScanIterable(scan, 1000, false)) {
              for (ColumnarBatch batch : itr) {
                // no-op
              }
            }
          })
      .isInstanceOf(UnsupportedOperationException.class)
      .hasMessage("Unsupported vector: null");
}

/**
* The test asserts that {@link CloseableIterator#hasNext()} returned by the {@link ArrowReader}
* is idempotent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ protected String buildTableArn() {
"arn:%s:glue:%s:%s:table/%s/%s", partitionName, region(), glueAccountId, dbName, tableName);
}

private LakeFormationClient lakeFormation() {
protected LakeFormationClient lakeFormation() {
return LakeFormationClient.builder()
.applyMutation(this::applyAssumeRoleConfigurations)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rg185064 I mistakenly pushed the Lake formation updates when I created the feature branch. Please verify in feature/teradata-apache-iceberg-1.10.1. ( I have updated my wiki instructions to make sure I don't miss this step again )

.applyMutation(httpClientProperties()::applyHttpClientConfigurations)
.build();
.applyMutation(this::applyAssumeRoleConfigurations)
.applyMutation(httpClientProperties()::applyHttpClientConfigurations)
.build();
}

protected AwsCredentialsProvider lakeFormationCredentialsProvider() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.CatalogProperties;
Expand Down Expand Up @@ -132,7 +131,7 @@ public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
static Key extractKey(String cacheKeys, Configuration conf) {
// generate key elements in a certain order, so that the Key instances are comparable
List<Object> elements = Lists.newArrayList();
elements.add(conf.get(HiveConf.ConfVars.METASTOREURIS.varname, ""));
elements.add(conf.get(HiveCatalog.HIVE_METASTORE_URIS, ""));
elements.add(conf.get(HiveCatalog.HIVE_CONF_CATALOG, "hive"));
if (cacheKeys == null || cacheKeys.isEmpty()) {
return Key.of(elements);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
Expand Down Expand Up @@ -82,6 +81,8 @@ public class HiveCatalog extends BaseMetastoreViewCatalog

// MetastoreConf is not available with current Hive version
static final String HIVE_CONF_CATALOG = "metastore.catalog.default";
static final String HIVE_METASTORE_WAREHOUSE_DIR = "hive.metastore.warehouse.dir";
static final String HIVE_METASTORE_URIS = "hive.metastore.uris";

private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);

Expand All @@ -104,12 +105,12 @@ public void initialize(String inputName, Map<String, String> properties) {
}

if (properties.containsKey(CatalogProperties.URI)) {
this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, properties.get(CatalogProperties.URI));
this.conf.set(HIVE_METASTORE_URIS, properties.get(CatalogProperties.URI));
}

if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
this.conf.set(
HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
HIVE_METASTORE_WAREHOUSE_DIR,
LocationUtil.stripTrailingSlash(properties.get(CatalogProperties.WAREHOUSE_LOCATION)));
}

Expand Down Expand Up @@ -728,7 +729,7 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
}

private String databaseLocation(String databaseName) {
String warehouseLocation = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
String warehouseLocation = conf.get(HIVE_METASTORE_WAREHOUSE_DIR);
Preconditions.checkNotNull(
warehouseLocation, "Warehouse location is not set: hive.metastore.warehouse.dir=null");
warehouseLocation = LocationUtil.stripTrailingSlash(warehouseLocation);
Expand Down Expand Up @@ -796,7 +797,7 @@ Database convertToDatabase(Namespace namespace, Map<String, String> meta) {
public String toString() {
return MoreObjects.toStringHelper(this)
.add("name", name)
.add("uri", this.conf == null ? "" : this.conf.get(HiveConf.ConfVars.METASTOREURIS.varname))
.add("uri", this.conf == null ? "" : this.conf.get(HIVE_METASTORE_URIS))
.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,10 @@ public ParquetValueReader<?> struct(
if (fieldReader != null) {
Type fieldType = fields.get(i);
int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
int id = fieldType.getId().intValue();
int id =
fieldType.getId() != null
? fieldType.getId().intValue()
: i < expected.fields().size() ? expected.fields().get(i).fieldId() : -1;
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,15 @@ private static <T> T visitField(
private static <T> List<T> visitFields(
Types.StructType struct, GroupType group, TypeWithSchemaVisitor<T> visitor) {
List<T> results = Lists.newArrayListWithExpectedSize(group.getFieldCount());
int fieldIdFromStruct = 0;
for (Type field : group.getFields()) {
int id = -1;
if (field.getId() != null) {
id = field.getId().intValue();
}
int id =
field.getId() != null
? field.getId().intValue()
: (struct != null && fieldIdFromStruct < struct.fields().size())
? struct.fields().get(fieldIdFromStruct).fieldId()
: -1;
fieldIdFromStruct++;
Types.NestedField iField = (struct != null && id >= 0) ? struct.field(id) : null;
results.add(visitField(iField, field, visitor));
}
Expand Down
143 changes: 143 additions & 0 deletions parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
Expand Down Expand Up @@ -353,4 +356,144 @@ private Pair<File, Long> generateFile(
records.toArray(new GenericData.Record[] {}));
return Pair.of(file, size);
}

@Test
public void testReadNestedStructWithoutId() throws IOException {
  // Expected Iceberg projection: outer_struct.middle_struct.inner_struct.value_field,
  // built bottom-up through named intermediate struct types for readability.
  Types.StructType innerType =
      Types.StructType.of(Types.NestedField.optional(4, "value_field", Types.StringType.get()));
  Types.StructType middleType =
      Types.StructType.of(Types.NestedField.optional(3, "inner_struct", innerType));
  Types.StructType outerType =
      Types.StructType.of(Types.NestedField.optional(2, "middle_struct", middleType));
  Schema icebergSchema =
      new Schema(Types.NestedField.required(1, "outer_struct", outerType));

  // Create Avro schema without IDs.
  org.apache.avro.Schema avroSchema = createAvroSchemaWithoutIds();

  // Write test data to Parquet file.
  File parquetFile = createTempFile(temp);
  writeParquetFile(parquetFile, avroSchema);

  // Read and verify the data.
  try (CloseableIterable<Record> rows =
      Parquet.read(Files.localInput(parquetFile))
          .project(icebergSchema)
          .createReaderFunc(
              fileSchema -> GenericParquetReaders.buildReader(icebergSchema, fileSchema))
          .build()) {

    org.apache.iceberg.data.Record onlyRow = Iterables.getOnlyElement(rows);
    verifyNestedStructData(onlyRow);
  }
}

/**
 * Builds the Avro schema {@code test_record { outer_struct { middle_struct? { inner_struct? {
 * value_field? : string } } } }} with NO field IDs attached, so readers must resolve fields
 * without the usual Iceberg id metadata.
 *
 * <p>The original implementation repeated the createRecord/setFields/union boilerplate three
 * times; the nullable-union and record construction are extracted into private helpers.
 */
private org.apache.avro.Schema createAvroSchemaWithoutIds() {
  org.apache.avro.Schema innerStructSchema =
      recordOf(
          "inner_struct_type",
          optionalField(
              "value_field", org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)));

  org.apache.avro.Schema middleStructSchema =
      recordOf("middle_struct_type", optionalField("inner_struct", innerStructSchema));

  org.apache.avro.Schema outerStructSchema =
      recordOf("outer_struct_type", optionalField("middle_struct", middleStructSchema));

  // outer_struct itself is a required (non-union) field with no default.
  org.apache.avro.Schema recordSchema =
      org.apache.avro.Schema.createRecord("test_record", null, null, false);
  recordSchema.setFields(
      List.of(new org.apache.avro.Schema.Field("outer_struct", outerStructSchema, null, null)));
  return recordSchema;
}

// Creates a non-error Avro record schema with the given fields (createRecord + setFields).
private static org.apache.avro.Schema recordOf(
    String name, org.apache.avro.Schema.Field... fields) {
  org.apache.avro.Schema schema = org.apache.avro.Schema.createRecord(name, null, null, false);
  schema.setFields(List.of(fields));
  return schema;
}

// Wraps the value schema in a [null, value] union with a null default, i.e. an optional field.
private static org.apache.avro.Schema.Field optionalField(
    String name, org.apache.avro.Schema valueSchema) {
  return new org.apache.avro.Schema.Field(
      name,
      org.apache.avro.Schema.createUnion(
          List.of(
              org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL), valueSchema)),
      null,
      org.apache.avro.JsonProperties.NULL_VALUE);
}

/** Writes the single nested test record to {@code file} using the given Avro schema. */
private void writeParquetFile(File file, org.apache.avro.Schema avroSchema) throws IOException {
  // Build the one nested record the read-side test expects to find.
  GenericData.Record row = createNestedRecord(avroSchema);

  org.apache.hadoop.fs.Path outputPath = new org.apache.hadoop.fs.Path(file.toURI());
  ParquetWriter<GenericRecord> writer =
      AvroParquetWriter.<GenericRecord>builder(outputPath)
          .withSchema(avroSchema)
          .withDataModel(GenericData.get())
          .build();
  try {
    writer.write(row);
  } finally {
    // Closing the writer flushes the row group and finalizes the Parquet footer.
    writer.close();
  }
}

/**
 * Builds the nested record outer_struct -> middle_struct -> inner_struct -> value_field
 * ("test_value") for the given schema.
 */
private GenericData.Record createNestedRecord(org.apache.avro.Schema avroSchema) {
  // The optional struct fields are [null, struct] unions; index 1 selects the struct branch.
  org.apache.avro.Schema outerSchema = avroSchema.getField("outer_struct").schema();
  org.apache.avro.Schema middleSchema =
      outerSchema.getField("middle_struct").schema().getTypes().get(1);
  org.apache.avro.Schema innerSchema =
      middleSchema.getField("inner_struct").schema().getTypes().get(1);

  // Assemble bottom-up, one builder chain per level.
  GenericData.Record inner =
      new GenericRecordBuilder(innerSchema).set("value_field", "test_value").build();
  GenericData.Record middle =
      new GenericRecordBuilder(middleSchema).set("inner_struct", inner).build();
  GenericData.Record outer =
      new GenericRecordBuilder(outerSchema).set("middle_struct", middle).build();
  return new GenericRecordBuilder(avroSchema).set("outer_struct", outer).build();
}

/**
 * Asserts that the nested structs are all present and the innermost value is "test_value".
 *
 * <p>Fix: AssertJ's {@code withFailMessage} must be configured BEFORE the assertion method
 * (e.g. {@code isNotNull()}); the original called it afterwards, where it has no effect on an
 * assertion that has already run.
 */
private void verifyNestedStructData(org.apache.iceberg.data.Record record) {
  org.apache.iceberg.data.Record outerStruct = (org.apache.iceberg.data.Record) record.get(0);
  assertThat(outerStruct).withFailMessage("Outer struct should not be null").isNotNull();

  org.apache.iceberg.data.Record middleStruct =
      (org.apache.iceberg.data.Record) outerStruct.get(0);
  assertThat(middleStruct).withFailMessage("Middle struct should not be null").isNotNull();

  org.apache.iceberg.data.Record innerStruct =
      (org.apache.iceberg.data.Record) middleStruct.get(0);
  assertThat(innerStruct).withFailMessage("Inner struct should not be null").isNotNull();

  assertThat(innerStruct.get(0).toString())
      .withFailMessage("Inner value field should match expected value")
      .isEqualTo("test_value");
}
}
Loading