Skip to content

Commit adb6b76

Browse files
rui-moLakehouse Engine Bot
authored andcommitted
Support struct schema evolution matching by name
Alchemy-item: [Support struct schema evolution matching by name](#29 (comment)) commit 1/1 - 8450c1e
1 parent 6fb8746 commit adb6b76

16 files changed

+318
-33
lines changed

velox/connectors/hive/SplitReader.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -413,11 +413,17 @@ std::vector<TypePtr> SplitReader::adaptColumns(
413413
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
414414
if (!fileTypeIdx.has_value()) {
415415
// Column is missing. Most likely due to schema evolution.
416-
VELOX_CHECK(tableSchema, "Unable to resolve column '{}'", fieldName);
416+
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
417+
TypePtr fieldType;
418+
if (outputTypeIdx.has_value()) {
419+
// Field name exists in the user-specified output type.
420+
fieldType = readerOutputType_->childAt(outputTypeIdx.value());
421+
} else {
422+
VELOX_CHECK(tableSchema, "Unable to resolve column '{}'", fieldName);
423+
fieldType = tableSchema->findChild(fieldName);
424+
}
417425
childSpec->setConstantValue(BaseVector::createNullConstant(
418-
tableSchema->findChild(fieldName),
419-
1,
420-
connectorQueryCtx_->memoryPool()));
426+
fieldType, 1, connectorQueryCtx_->memoryPool()));
421427
} else {
422428
// Column no longer missing, reset constant value set on the spec.
423429
childSpec->setConstantValue(nullptr);

velox/dwio/common/ScanSpec.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ bool ScanSpec::hasFilter() const {
142142
if (hasFilter_.has_value()) {
143143
return hasFilter_.value();
144144
}
145-
if (!isConstant() && filter()) {
145+
if (filter()) {
146146
hasFilter_ = true;
147147
return true;
148148
}

velox/dwio/common/SelectiveStructColumnReader.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,6 @@ void SelectiveStructColumnReaderBase::read(
358358
}
359359

360360
const auto& childSpecs = scanSpec_->children();
361-
VELOX_CHECK(!childSpecs.empty());
362361
for (size_t i = 0; i < childSpecs.size(); ++i) {
363362
const auto& childSpec = childSpecs[i];
364363
VELOX_TRACE_HISTORY_PUSH("read %s", childSpec->fieldName().c_str());
@@ -458,15 +457,17 @@ bool SelectiveStructColumnReaderBase::isChildMissing(
458457
// row type that doesn't exist
459458
// in the output.
460459
fileType_->type()->kind() !=
461-
TypeKind::MAP && // If this is the case it means this is a flat map,
462-
// so it can't have "missing" fields.
463-
childSpec.channel() >= fileType_->size());
460+
TypeKind::MAP // If this is the case it means this is a flat map,
461+
// so it can't have "missing" fields.
462+
) &&
463+
(columnReaderOptions_.useColumnNamesForColumnMapping_
464+
? !asRowType(fileType_->type())->containsChild(childSpec.fieldName())
465+
: childSpec.channel() >= fileType_->size());
464466
}
465467

466468
void SelectiveStructColumnReaderBase::getValues(
467469
const RowSet& rows,
468470
VectorPtr* result) {
469-
VELOX_CHECK(!scanSpec_->children().empty());
470471
VELOX_CHECK_NOT_NULL(
471472
*result, "SelectiveStructColumnReaderBase expects a non-null result");
472473
VELOX_CHECK(

velox/dwio/common/SelectiveStructColumnReader.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#pragma once
1818

19+
#include "velox/dwio/common/Options.h"
1920
#include "velox/dwio/common/SelectiveColumnReaderInternal.h"
2021

2122
namespace facebook::velox::dwio::common {
@@ -111,13 +112,15 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
111112
static constexpr int32_t kConstantChildSpecSubscript = -1;
112113

113114
SelectiveStructColumnReaderBase(
115+
const dwio::common::ColumnReaderOptions& columnReaderOptions,
114116
const TypePtr& requestedType,
115117
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
116118
FormatParams& params,
117119
velox::common::ScanSpec& scanSpec,
118120
bool isRoot = false,
119121
bool generateLazyChildren = true)
120122
: SelectiveColumnReader(requestedType, fileType, params, scanSpec),
123+
columnReaderOptions_(columnReaderOptions),
121124
debugString_(
122125
getExceptionContext().message(VeloxException::Type::kSystem)),
123126
isRoot_(isRoot),
@@ -155,6 +158,8 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
155158
}
156159
}
157160

161+
const dwio::common::ColumnReaderOptions& columnReaderOptions_;
162+
158163
// Context information obtained from ExceptionContext. Stored here
159164
// so that LazyVector readers under this can add this to their
160165
// ExceptionContext. Allows contextualizing reader errors to split

velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ class SelectiveFlatMapAsStructReader : public SelectiveStructColumnReaderBase {
177177
DwrfParams& params,
178178
common::ScanSpec& scanSpec)
179179
: SelectiveStructColumnReaderBase(
180+
columnReaderOptions,
180181
requestedType,
181182
fileType,
182183
params,
@@ -215,6 +216,7 @@ class SelectiveFlatMapReader : public SelectiveStructColumnReaderBase {
215216
DwrfParams& params,
216217
common::ScanSpec& scanSpec)
217218
: SelectiveStructColumnReaderBase(
219+
columnReaderOptions,
218220
requestedType,
219221
fileType,
220222
params,

velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ SelectiveStructColumnReader::SelectiveStructColumnReader(
3131
common::ScanSpec& scanSpec,
3232
bool isRoot)
3333
: SelectiveStructColumnReaderBase(
34+
columnReaderOptions,
3435
requestedType,
3536
fileType,
3637
params,

velox/dwio/dwrf/reader/SelectiveStructColumnReader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ class SelectiveStructColumnReaderBase
2525
: public dwio::common::SelectiveStructColumnReaderBase {
2626
public:
2727
SelectiveStructColumnReaderBase(
28+
const dwio::common::ColumnReaderOptions& columnReaderOptions,
2829
const TypePtr& requestedType,
2930
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
3031
DwrfParams& params,
3132
common::ScanSpec& scanSpec,
3233
bool isRoot = false)
3334
: dwio::common::SelectiveStructColumnReaderBase(
35+
columnReaderOptions,
3436
requestedType,
3537
fileType,
3638
params,

velox/dwio/parquet/reader/ParquetColumnReader.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
3838
const TypePtr& requestedType,
3939
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
4040
ParquetParams& params,
41-
common::ScanSpec& scanSpec) {
41+
common::ScanSpec& scanSpec,
42+
memory::MemoryPool& pool) {
4243
auto colName = scanSpec.fieldName();
4344

4445
switch (fileType->type()->kind()) {
@@ -59,19 +60,19 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
5960

6061
case TypeKind::ROW:
6162
return std::make_unique<StructColumnReader>(
62-
columnReaderOptions, requestedType, fileType, params, scanSpec);
63+
columnReaderOptions, requestedType, fileType, params, scanSpec, pool);
6364

6465
case TypeKind::VARBINARY:
6566
case TypeKind::VARCHAR:
6667
return std::make_unique<StringColumnReader>(fileType, params, scanSpec);
6768

6869
case TypeKind::ARRAY:
6970
return std::make_unique<ListColumnReader>(
70-
columnReaderOptions, requestedType, fileType, params, scanSpec);
71+
columnReaderOptions, requestedType, fileType, params, scanSpec, pool);
7172

7273
case TypeKind::MAP:
7374
return std::make_unique<MapColumnReader>(
74-
columnReaderOptions, requestedType, fileType, params, scanSpec);
75+
columnReaderOptions, requestedType, fileType, params, scanSpec, pool);
7576

7677
case TypeKind::BOOLEAN:
7778
return std::make_unique<BooleanColumnReader>(

velox/dwio/parquet/reader/ParquetColumnReader.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class ParquetColumnReader {
4747
const TypePtr& requestedType,
4848
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
4949
ParquetParams& params,
50-
common::ScanSpec& scanSpec);
50+
common::ScanSpec& scanSpec,
51+
memory::MemoryPool& pool);
5152
};
5253
} // namespace facebook::velox::parquet

velox/dwio/parquet/reader/ParquetReader.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,12 +1066,15 @@ class ParquetRowReader::Impl {
10661066
options_.timestampPrecision());
10671067
requestedType_ = options_.requestedType() ? options_.requestedType()
10681068
: readerBase_->schema();
1069+
columnReaderOptions_ =
1070+
dwio::common::makeColumnReaderOptions(readerBase_->options());
10691071
columnReader_ = ParquetColumnReader::build(
10701072
columnReaderOptions_,
10711073
requestedType_,
10721074
readerBase_->schemaWithId(), // Id is schema id
10731075
params,
1074-
*options_.scanSpec());
1076+
*options_.scanSpec(),
1077+
pool_);
10751078
columnReader_->setIsTopLevel();
10761079

10771080
filterRowGroups();
@@ -1081,9 +1084,6 @@ class ParquetRowReader::Impl {
10811084
// table scan.
10821085
advanceToNextRowGroup();
10831086
}
1084-
1085-
columnReaderOptions_ =
1086-
dwio::common::makeColumnReaderOptions(readerBase_->options());
10871087
}
10881088

10891089
void filterRowGroups() {

0 commit comments

Comments
 (0)