Skip to content

Commit 54aef0b

Browse files
Yuhtafacebook-github-bot
authored andcommitted
fix: Do not use hook in DeltaUpdateColumnLoader (facebookincubator#13996)
Summary: Pull Request resolved: facebookincubator#13996 In `DeltaUpdateColumnLoader`, hooks cannot be used efficiently and only used as a fallback. Since hooks are only used in aggregation today, we can add a new method `VectorLoader::supportsHook` to indicate if hooks should be used, and avoid using of hook in aggregation if the loader does not support it. This allows us to remove some inefficient and error-prone code (some indices mismatch is causing out of boundary access and crash). bypass-github-export-checks Reviewed By: oerling Differential Revision: D77618589 fbshipit-source-id: 71bef3cc8bef1988b236d336bc9f92d09a020c91
1 parent 67baa89 commit 54aef0b

File tree

8 files changed

+45
-83
lines changed

8 files changed

+45
-83
lines changed

velox/dwio/common/ColumnLoader.cpp

Lines changed: 2 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -87,64 +87,6 @@ void scatter(RowSet rows, vector_size_t resultSize, VectorPtr* result) {
8787
*result = BaseVector::wrapInDictionary(nullptr, indices, resultSize, *result);
8888
}
8989

90-
template <typename T>
91-
void addToHookImpl(
92-
const DecodedVector& decoded,
93-
const RowSet& rows,
94-
ValueHook& hook) {
95-
if (decoded.isIdentityMapping() && rows.size() == rows.back() + 1) {
96-
auto* values = decoded.data<T>();
97-
hook.addValues(rows.data(), values, rows.size());
98-
return;
99-
}
100-
// Masked aggregate is not pushed down for now, so the input row set is
101-
// guaranteed to be dense. Just make sure it works with sparse input for
102-
// future proof.
103-
for (vector_size_t i = 0; i < rows.size(); ++i) {
104-
if (!decoded.isNullAt(i)) {
105-
hook.addValueTyped(rows[i], decoded.valueAt<T>(i));
106-
} else if (hook.acceptsNulls()) {
107-
hook.addNull(rows[i]);
108-
}
109-
}
110-
}
111-
112-
void addToHook(
113-
const DecodedVector& decoded,
114-
const RowSet& rows,
115-
ValueHook& hook) {
116-
switch (decoded.base()->typeKind()) {
117-
case TypeKind::BOOLEAN:
118-
addToHookImpl<bool>(decoded, rows, hook);
119-
break;
120-
case TypeKind::TINYINT:
121-
addToHookImpl<int8_t>(decoded, rows, hook);
122-
break;
123-
case TypeKind::SMALLINT:
124-
addToHookImpl<int16_t>(decoded, rows, hook);
125-
break;
126-
case TypeKind::INTEGER:
127-
addToHookImpl<int32_t>(decoded, rows, hook);
128-
break;
129-
case TypeKind::BIGINT:
130-
addToHookImpl<int64_t>(decoded, rows, hook);
131-
break;
132-
case TypeKind::REAL:
133-
addToHookImpl<float>(decoded, rows, hook);
134-
break;
135-
case TypeKind::DOUBLE:
136-
addToHookImpl<double>(decoded, rows, hook);
137-
break;
138-
case TypeKind::VARCHAR:
139-
case TypeKind::VARBINARY:
140-
addToHookImpl<StringView>(decoded, rows, hook);
141-
break;
142-
default:
143-
VELOX_FAIL(
144-
"Unsupported type kind for hook: {}", decoded.base()->typeKind());
145-
}
146-
}
147-
14890
} // namespace
14991

15092
void ColumnLoader::loadInternal(
@@ -198,11 +140,8 @@ void DeltaUpdateColumnLoader::loadInternal(
198140
read(structReader_, fieldReader_, version_, rows, selectedRows, nullptr);
199141
fieldReader_->getValues(effectiveRows, result);
200142
scanSpec->deltaUpdate()->update(effectiveRows, *result);
201-
if (hook) {
202-
DecodedVector decoded(**result);
203-
addToHook(decoded, rows, *hook);
204-
} else if (
205-
rows.back() + 1 < resultSize ||
143+
VELOX_CHECK_NULL(hook);
144+
if (rows.back() + 1 < resultSize ||
206145
rows.size() != structReader_->outputRows().size()) {
207146
scatter(rows, resultSize, result);
208147
}

velox/dwio/common/ColumnLoader.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ class ColumnLoader : public VectorLoader {
3030
fieldReader_(fieldReader),
3131
version_(version) {}
3232

33+
bool supportsHook() const override {
34+
return true;
35+
}
36+
3337
private:
3438
void loadInternal(
3539
RowSet rows,

velox/functions/lib/aggregates/MinMaxAggregateBase.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ class SimpleNumericMaxAggregate : public SimpleNumericMinMaxAggregate<T> {
114114
bool mayPushdown) override {
115115
if constexpr (BaseAggregate::template kMayPushdown<T>) {
116116
if (!args[0]->type()->isDecimal()) {
117-
if (mayPushdown && args[0]->isLazy()) {
117+
if (mayPushdown && args[0]->isLazy() &&
118+
args[0]->asChecked<const LazyVector>()->supportsHook()) {
118119
BaseAggregate::template pushdown<
119120
velox::aggregate::MinMaxHook<T, false>>(groups, rows, args[0]);
120121
return;
@@ -215,7 +216,8 @@ class SimpleNumericMinAggregate : public SimpleNumericMinMaxAggregate<T> {
215216
bool mayPushdown) override {
216217
if constexpr (BaseAggregate::template kMayPushdown<T>) {
217218
if (!args[0]->type()->isDecimal()) {
218-
if (mayPushdown && args[0]->isLazy()) {
219+
if (mayPushdown && args[0]->isLazy() &&
220+
args[0]->asChecked<const LazyVector>()->supportsHook()) {
219221
BaseAggregate::template pushdown<
220222
velox::aggregate::MinMaxHook<T, true>>(groups, rows, args[0]);
221223
return;

velox/functions/lib/aggregates/SimpleNumericAggregate.h

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -98,22 +98,24 @@ class SimpleNumericAggregate : public exec::Aggregate {
9898
UpdateSingleValue updateSingleValue,
9999
bool mayPushdown) {
100100
DecodedVector decoded(*arg, rows, !mayPushdown);
101-
auto encoding = decoded.base()->encoding();
102101
if constexpr (kMayPushdown<TData>) {
103-
if (encoding == VectorEncoding::Simple::LAZY &&
102+
if (mayPushdown &&
103+
decoded.base()->encoding() == VectorEncoding::Simple::LAZY &&
104104
!arg->type()->isDecimal()) {
105-
velox::aggregate::SimpleCallableHook<TData, UpdateSingleValue> hook(
106-
exec::Aggregate::offset_,
107-
exec::Aggregate::nullByte_,
108-
exec::Aggregate::nullMask_,
109-
groups,
110-
&this->exec::Aggregate::numNulls_,
111-
updateSingleValue);
112-
113-
auto indices = decoded.indices();
114-
decoded.base()->as<const LazyVector>()->load(
115-
RowSet(indices, arg->size()), &hook);
116-
return;
105+
auto* lazy = decoded.base()->asChecked<const LazyVector>();
106+
if (lazy->supportsHook()) {
107+
velox::aggregate::SimpleCallableHook<TData, UpdateSingleValue> hook(
108+
exec::Aggregate::offset_,
109+
exec::Aggregate::nullByte_,
110+
exec::Aggregate::nullMask_,
111+
groups,
112+
&this->exec::Aggregate::numNulls_,
113+
updateSingleValue);
114+
auto indices = decoded.indices();
115+
lazy->load(RowSet(indices, arg->size()), &hook);
116+
return;
117+
}
118+
decoded.decode(*arg, rows);
117119
}
118120
}
119121

@@ -226,9 +228,9 @@ class SimpleNumericAggregate : public exec::Aggregate {
226228
numIndices = numSelected;
227229
}
228230
}
229-
230-
decoded.base()->as<const LazyVector>()->load(
231-
RowSet(indices, numIndices), &hook);
231+
auto* lazy = decoded.base()->asChecked<const LazyVector>();
232+
VELOX_CHECK(lazy->supportsHook());
233+
lazy->load(RowSet(indices, numIndices), &hook);
232234
}
233235

234236
private:

velox/functions/lib/aggregates/SumAggregateBase.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ class SumAggregateBase
124124
bool mayPushdown) {
125125
const auto& arg = args[0];
126126

127-
if (mayPushdown && arg->isLazy()) {
127+
if (mayPushdown && arg->isLazy() &&
128+
arg->asChecked<const LazyVector>()->supportsHook()) {
128129
BaseAggregate::template pushdown<
129130
facebook::velox::aggregate::SumHook<TData, Overflow>>(
130131
groups, rows, arg);

velox/vector/LazyVector.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ void LazyVector::validate(const VectorValidateOptions& options) const {
254254

255255
void LazyVector::load(RowSet rows, ValueHook* hook) const {
256256
VELOX_CHECK(!allLoaded_, "A LazyVector can be loaded at most once");
257+
VELOX_CHECK(!hook || supportsHook());
257258

258259
allLoaded_ = true;
259260
if (rows.empty()) {

velox/vector/LazyVector.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ class VectorLoader {
201201
vector_size_t resultSize,
202202
VectorPtr* result);
203203

204+
virtual bool supportsHook() const {
205+
return false;
206+
}
207+
204208
protected:
205209
virtual void loadInternal(
206210
RowSet rows,
@@ -346,6 +350,10 @@ class LazyVector : public BaseVector {
346350

347351
VectorPtr slice(vector_size_t offset, vector_size_t length) const override;
348352

353+
bool supportsHook() const {
354+
return loader_->supportsHook();
355+
}
356+
349357
// Loads 'rows' of 'vector'. 'vector' may be an arbitrary wrapping
350358
// of a LazyVector. 'rows' are translated through the wrappers. If
351359
// there is no LazyVector inside 'vector', this has no

velox/vector/tests/VectorTest.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ class TestingLoader : public VectorLoader {
6565
return rowCounter_;
6666
}
6767

68+
bool supportsHook() const override {
69+
return true;
70+
}
71+
6872
private:
6973
template <TypeKind Kind>
7074
void applyHook(RowSet rows, ValueHook* hook) {
@@ -2053,6 +2057,7 @@ class TestingHook : public ValueHook {
20532057
}
20542058
}
20552059
}
2060+
20562061
int32_t errors() const {
20572062
return errors_;
20582063
}

0 commit comments

Comments
 (0)