Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,11 +421,17 @@ std::vector<TypePtr> SplitReader::adaptColumns(
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
if (!fileTypeIdx.has_value()) {
// Column is missing. Most likely due to schema evolution.
VELOX_CHECK(tableSchema, "Unable to resolve column '{}'", fieldName);
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
TypePtr fieldType;
if (outputTypeIdx.has_value()) {
// Field name exists in the user-specified output type.
fieldType = readerOutputType_->childAt(outputTypeIdx.value());
} else {
VELOX_CHECK(tableSchema, "Unable to resolve column '{}'", fieldName);
fieldType = tableSchema->findChild(fieldName);
}
childSpec->setConstantValue(BaseVector::createNullConstant(
tableSchema->findChild(fieldName),
1,
connectorQueryCtx_->memoryPool()));
fieldType, 1, connectorQueryCtx_->memoryPool()));
} else {
// Column no longer missing, reset constant value set on the spec.
childSpec->setConstantValue(nullptr);
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/ScanSpec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ bool ScanSpec::hasFilter() const {
if (hasFilter_.has_value()) {
return hasFilter_.value();
}
if (!isConstant() && filter()) {
if (filter()) {
hasFilter_ = true;
return true;
}
Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/common/SelectiveFlatMapColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ namespace facebook::velox::dwio::common {
class SelectiveFlatMapColumnReader : public SelectiveStructColumnReaderBase {
protected:
SelectiveFlatMapColumnReader(
const dwio::common::ColumnReaderOptions& columnReaderOptions,
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
FormatParams& params,
velox::common::ScanSpec& scanSpec)
: SelectiveStructColumnReaderBase(
columnReaderOptions,
requestedType,
fileType,
params,
Expand Down
11 changes: 6 additions & 5 deletions velox/dwio/common/SelectiveStructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,6 @@ void SelectiveStructColumnReaderBase::read(
}

const auto& childSpecs = scanSpec_->children();
VELOX_CHECK(!childSpecs.empty());
for (size_t i = 0; i < childSpecs.size(); ++i) {
const auto& childSpec = childSpecs[i];
VELOX_TRACE_HISTORY_PUSH("read %s", childSpec->fieldName().c_str());
Expand Down Expand Up @@ -526,9 +525,12 @@ bool SelectiveStructColumnReaderBase::isChildMissing(
// row type that doesn't exist
// in the output.
fileType_->type()->kind() !=
TypeKind::MAP && // If this is the case it means this is a flat map,
// so it can't have "missing" fields.
childSpec.channel() >= fileType_->size());
TypeKind::MAP // If this is the case it means this is a flat map,
// so it can't have "missing" fields.
) &&
(columnReaderOptions_.useColumnNamesForColumnMapping_
? !asRowType(fileType_->type())->containsChild(childSpec.fieldName())
: childSpec.channel() >= fileType_->size());
}

std::unique_ptr<velox::dwio::common::ColumnLoader>
Expand All @@ -540,7 +542,6 @@ SelectiveStructColumnReaderBase::makeColumnLoader(vector_size_t index) {
void SelectiveStructColumnReaderBase::getValues(
const RowSet& rows,
VectorPtr* result) {
VELOX_CHECK(!scanSpec_->children().empty());
VELOX_CHECK_NOT_NULL(
*result, "SelectiveStructColumnReaderBase expects a non-null result");
VELOX_CHECK(
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/common/SelectiveStructColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include "velox/dwio/common/Options.h"
#include "velox/dwio/common/SelectiveColumnReaderInternal.h"

namespace facebook::velox::dwio::common {
Expand Down Expand Up @@ -113,13 +114,15 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
static constexpr int32_t kConstantChildSpecSubscript = -1;

SelectiveStructColumnReaderBase(
const dwio::common::ColumnReaderOptions& columnReaderOptions,
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
FormatParams& params,
velox::common::ScanSpec& scanSpec,
bool isRoot = false,
bool generateLazyChildren = true)
: SelectiveColumnReader(requestedType, fileType, params, scanSpec),
columnReaderOptions_(columnReaderOptions),
debugString_(
getExceptionContext().message(VeloxException::Type::kSystem)),
isRoot_(isRoot),
Expand Down Expand Up @@ -180,6 +183,8 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
}
}

const dwio::common::ColumnReaderOptions& columnReaderOptions_;

// Context information obtained from ExceptionContext. Stored here
// so that LazyVector readers under this can add this to their
// ExceptionContext. Allows contextualizing reader errors to split
Expand Down
3 changes: 3 additions & 0 deletions velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class SelectiveFlatMapAsStructReader : public SelectiveStructColumnReaderBase {
DwrfParams& params,
common::ScanSpec& scanSpec)
: SelectiveStructColumnReaderBase(
columnReaderOptions,
requestedType,
fileType,
params,
Expand Down Expand Up @@ -241,6 +242,7 @@ class SelectiveFlatMapAsMapReader : public SelectiveStructColumnReaderBase {
DwrfParams& params,
common::ScanSpec& scanSpec)
: SelectiveStructColumnReaderBase(
columnReaderOptions,
requestedType,
fileType,
params,
Expand Down Expand Up @@ -285,6 +287,7 @@ class SelectiveFlatMapReader
DwrfParams& params,
common::ScanSpec& scanSpec)
: dwio::common::SelectiveFlatMapColumnReader(
columnReaderOptions,
requestedType,
fileType,
params,
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ SelectiveStructColumnReader::SelectiveStructColumnReader(
common::ScanSpec& scanSpec,
bool isRoot)
: SelectiveStructColumnReaderBase(
columnReaderOptions,
requestedType,
fileType,
params,
Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/dwrf/reader/SelectiveStructColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ class SelectiveStructColumnReaderBase
: public dwio::common::SelectiveStructColumnReaderBase {
public:
SelectiveStructColumnReaderBase(
const dwio::common::ColumnReaderOptions& columnReaderOptions,
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
DwrfParams& params,
common::ScanSpec& scanSpec,
bool isRoot = false,
bool generateLazyChildren = true)
: dwio::common::SelectiveStructColumnReaderBase(
columnReaderOptions,
requestedType,
fileType,
params,
Expand Down
9 changes: 5 additions & 4 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec) {
common::ScanSpec& scanSpec,
memory::MemoryPool& pool) {
auto colName = scanSpec.fieldName();

switch (fileType->type()->kind()) {
Expand All @@ -59,7 +60,7 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(

case TypeKind::ROW:
return std::make_unique<StructColumnReader>(
columnReaderOptions, requestedType, fileType, params, scanSpec);
columnReaderOptions, requestedType, fileType, params, scanSpec, pool);

case TypeKind::VARBINARY:
case TypeKind::VARCHAR:
Expand All @@ -68,12 +69,12 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
case TypeKind::ARRAY: {
VELOX_CHECK(requestedType->isArray(), "Requested type must be array");
return std::make_unique<ListColumnReader>(
columnReaderOptions, requestedType, fileType, params, scanSpec);
columnReaderOptions, requestedType, fileType, params, scanSpec, pool);
}

case TypeKind::MAP:
return std::make_unique<MapColumnReader>(
columnReaderOptions, requestedType, fileType, params, scanSpec);
columnReaderOptions, requestedType, fileType, params, scanSpec, pool);

case TypeKind::BOOLEAN:
return std::make_unique<BooleanColumnReader>(
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/parquet/reader/ParquetColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ParquetColumnReader {
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec);
common::ScanSpec& scanSpec,
memory::MemoryPool& pool);
};
} // namespace facebook::velox::parquet
8 changes: 4 additions & 4 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1207,12 +1207,15 @@ class ParquetRowReader::Impl {
options_.timestampPrecision());
requestedType_ = options_.requestedType() ? options_.requestedType()
: readerBase_->schema();
columnReaderOptions_ =
dwio::common::makeColumnReaderOptions(readerBase_->options());
columnReader_ = ParquetColumnReader::build(
columnReaderOptions_,
requestedType_,
readerBase_->schemaWithId(), // Id is schema id
params,
*options_.scanSpec());
*options_.scanSpec(),
pool_);
columnReader_->setIsTopLevel();

filterRowGroups();
Expand All @@ -1222,9 +1225,6 @@ class ParquetRowReader::Impl {
// table scan.
advanceToNextRowGroup();
}

columnReaderOptions_ =
dwio::common::makeColumnReaderOptions(readerBase_->options());
}

void filterRowGroups() {
Expand Down
18 changes: 13 additions & 5 deletions velox/dwio/parquet/reader/RepeatedColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ PageReader* readLeafRepDefs(
return nullptr;
}
auto pageReader = reader->formatData().as<ParquetData>().reader();
if (pageReader == nullptr) {
return nullptr;
}
pageReader->decodeRepDefs(numTop);
return pageReader;
}
Expand Down Expand Up @@ -114,7 +117,8 @@ MapColumnReader::MapColumnReader(
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec)
common::ScanSpec& scanSpec,
memory::MemoryPool& pool)
: dwio::common::SelectiveMapColumnReader(
requestedType,
fileType,
Expand All @@ -128,13 +132,15 @@ MapColumnReader::MapColumnReader(
keyChildType,
fileType_->childAt(0),
params,
*scanSpec.children()[0]);
*scanSpec.children()[0],
pool);
elementReader_ = ParquetColumnReader::build(
columnReaderOptions,
elementChildType,
fileType_->childAt(1),
params,
*scanSpec.children()[1]);
*scanSpec.children()[1],
pool);
reinterpret_cast<const ParquetTypeWithId*>(fileType.get())
->makeLevelInfo(levelInfo_);
children_ = {keyReader_.get(), elementReader_.get()};
Expand Down Expand Up @@ -232,7 +238,8 @@ ListColumnReader::ListColumnReader(
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec)
common::ScanSpec& scanSpec,
memory::MemoryPool& pool)
: dwio::common::SelectiveListColumnReader(
requestedType,
fileType,
Expand All @@ -244,7 +251,8 @@ ListColumnReader::ListColumnReader(
childType,
fileType_->childAt(0),
params,
*scanSpec.children()[0]);
*scanSpec.children()[0],
pool);
reinterpret_cast<const ParquetTypeWithId*>(fileType.get())
->makeLevelInfo(levelInfo_);
children_ = {child_.get()};
Expand Down
6 changes: 4 additions & 2 deletions velox/dwio/parquet/reader/RepeatedColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader {
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec);
common::ScanSpec& scanSpec,
memory::MemoryPool& pool);

void prepareRead(
vector_size_t offset,
Expand Down Expand Up @@ -118,7 +119,8 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader {
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec);
common::ScanSpec& scanSpec,
memory::MemoryPool& pool);

void prepareRead(
vector_size_t offset,
Expand Down
49 changes: 43 additions & 6 deletions velox/dwio/parquet/reader/StructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,33 @@ StructColumnReader::StructColumnReader(
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec)
: SelectiveStructColumnReader(requestedType, fileType, params, scanSpec) {
common::ScanSpec& scanSpec,
memory::MemoryPool& pool)
: SelectiveStructColumnReader(
columnReaderOptions,
requestedType,
fileType,
params,
scanSpec,
/*isRoot=*/false) {
auto& childSpecs = scanSpec_->stableChildren();
const bool useColumnNames =
columnReaderOptions.useColumnNamesForColumnMapping_;
std::vector<column_index_t> missingFields;
for (auto i = 0; i < childSpecs.size(); ++i) {
auto childSpec = childSpecs[i];
if (childSpec->isConstant() || isChildMissing(*childSpec)) {
if (childSpec->isConstant() &&
(!useColumnNames && isChildMissing(*childSpec))) {
childSpec->setSubscript(kConstantChildSpecSubscript);
continue;
}
if (!childSpecs[i]->readFromFile()) {
continue;
}
if (useColumnNames && isChildMissing(*childSpec)) {
missingFields.emplace_back(i);
continue;
}
auto childFileType = fileType_->childByName(childSpec->fieldName());
auto childRequestedType =
requestedType_->asRow().findChild(childSpec->fieldName());
Expand All @@ -51,10 +66,30 @@ StructColumnReader::StructColumnReader(
childRequestedType,
childFileType,
params,
*childSpec));
*childSpec,
pool));

childSpecs[i]->setSubscript(children_.size() - 1);
}

// 'missingFields' is not empty only when using column names for column
// mapping.
if (missingFields.size() > 0) {
// Set the struct as null if all the subfields in the requested type are
// missing and the number of subfields is more than one.
if (childSpecs.size() > 1 && missingFields.size() == childSpecs.size()) {
scanSpec_->setConstantValue(
BaseVector::createNullConstant(requestedType_, 1, &pool));
} else {
// Set null constant for the missing subfield of requested type.
auto rowTypePtr = asRowType(requestedType_);
for (int channel : missingFields) {
childSpecs[channel]->setConstantValue(BaseVector::createNullConstant(
rowTypePtr->findChild(childSpecs[channel]->fieldName()), 1, &pool));
}
}
}

auto type = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get());
if (type->parent()) {
levelMode_ = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get())
Expand All @@ -64,7 +99,10 @@ StructColumnReader::StructColumnReader(
// this and the child.
auto child = childForRepDefs_;
for (;;) {
assert(child);
if (child == nullptr) {
levelMode_ = LevelMode::kNulls;
break;
}
if (child->fileType().type()->kind() == TypeKind::ARRAY ||
child->fileType().type()->kind() == TypeKind::MAP) {
levelMode_ = LevelMode::kStructOverLists;
Expand Down Expand Up @@ -101,7 +139,6 @@ StructColumnReader::findBestLeaf() {
best = child;
}
}
assert(best);
return best;
}

Expand Down
Loading