Skip to content

Commit 8693abd

Browse files
xiaoxmeng and facebook-github-bot
authored and committed
fix: Fix the server crash caused by remote exchange error (facebookincubator#13905)
Summary: Pull Request resolved: facebookincubator#13905 The remote exchange might run into an error. When the merge operator checks whether a remote merge source is blocked, it might trigger the merge source to fetch data from the exchange client, which in turn tries to dequeue from the exchange source. The dequeue operation might throw if there is already an error. The recent merge spill support moved the "merge source is blocked" check inside SCOPE_EXIT, which converts a throw into a server crash because we can't throw inside a destructor. This change moves the "merge source is blocked" check out of SCOPE_EXIT and adds a unit test to reproduce the crash and verify the fix. Reviewed By: tanjialiang Differential Revision: D77423282 fbshipit-source-id: b9fd6a62749a8e344b935a94ed46b74016fd4eb2
1 parent e92705e commit 8693abd

File tree

3 files changed

+60
-6
lines changed

3 files changed

+60
-6
lines changed

velox/exec/ExchangeQueue.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
#include "velox/exec/ExchangeQueue.h"
1717
#include <algorithm>
1818

19+
#include "velox/common/testutil/TestValue.h"
20+
21+
using facebook::velox::common::testutil::TestValue;
22+
1923
namespace facebook::velox::exec {
2024

2125
SerializedPage::SerializedPage(
@@ -136,6 +140,8 @@ std::vector<std::unique_ptr<SerializedPage>> ExchangeQueue::dequeueLocked(
136140
ContinueFuture* future,
137141
ContinuePromise* stalePromise) {
138142
VELOX_CHECK_NOT_NULL(future);
143+
TestValue::adjust(
144+
"facebook::velox::exec::ExchangeQueue::dequeueLocked", this);
139145
if (!error_.empty()) {
140146
*atEnd = true;
141147
VELOX_FAIL(error_);

velox/exec/Merge.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ BlockingReason Merge::isBlocked(ContinueFuture* future) {
9191

9292
maybeStartNextMergeSourceGroup();
9393

94+
if (sourceMerger_ != nullptr) {
95+
sourceMerger_->isBlocked(sourceBlockingFutures_);
96+
}
97+
9498
if (sourceBlockingFutures_.empty()) {
9599
return BlockingReason::kNotBlocked;
96100
}
@@ -176,12 +180,6 @@ void Merge::setupSpillMerger() {
176180
}
177181

178182
void Merge::maybeStartNextMergeSourceGroup() {
179-
SCOPE_EXIT {
180-
if (sourceMerger_ != nullptr) {
181-
sourceMerger_->isBlocked(sourceBlockingFutures_);
182-
}
183-
};
184-
185183
if (sourceMerger_ != nullptr || numStartedSources_ >= sources_.size()) {
186184
return;
187185
}

velox/exec/tests/MultiFragmentTest.cpp

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1609,6 +1609,56 @@ TEST_P(MultiFragmentTest, mergeExchangeOverEmptySources) {
16091609
}
16101610
}
16111611

1612+
DEBUG_ONLY_TEST_P(MultiFragmentTest, mergeExchangeFailureOnStart) {
1613+
std::vector<std::shared_ptr<Task>> tasks;
1614+
std::vector<std::string> leafTaskIds;
1615+
1616+
const auto injectErrorMsg{"injectError"};
1617+
SCOPED_TESTVALUE_SET(
1618+
"facebook::velox::exec::ExchangeQueue::dequeueLocked",
1619+
std::function<void(ExchangeQueue*)>(
1620+
([&](ExchangeQueue* /*unused*/) { VELOX_FAIL(injectErrorMsg); })));
1621+
1622+
auto data = makeRowVector(rowType_, 0);
1623+
1624+
for (int i = 0; i < 2; ++i) {
1625+
auto taskId = makeTaskId("leaf-", i);
1626+
leafTaskIds.push_back(taskId);
1627+
auto plan =
1628+
PlanBuilder()
1629+
.values({data})
1630+
.partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
1631+
.planNode();
1632+
1633+
auto task = makeTask(taskId, plan, tasks.size());
1634+
tasks.push_back(task);
1635+
task->start(4);
1636+
}
1637+
1638+
auto exchangeTaskId = makeTaskId("exchange-", 0);
1639+
auto plan = PlanBuilder()
1640+
.mergeExchange(rowType_, {"c0"}, GetParam().serdeKind)
1641+
.singleAggregation({"c0"}, {"count(1)"})
1642+
.planNode();
1643+
1644+
std::vector<Split> leafTaskSplits;
1645+
for (auto leafTaskId : leafTaskIds) {
1646+
leafTaskSplits.emplace_back(remoteSplit(leafTaskId));
1647+
}
1648+
VELOX_ASSERT_THROW(
1649+
test::AssertQueryBuilder(plan, duckDbQueryRunner_)
1650+
.splits(std::move(leafTaskSplits))
1651+
.config(
1652+
core::QueryConfig::kShuffleCompressionKind,
1653+
common::compressionKindToString(GetParam().compressionKind))
1654+
.assertResults(""),
1655+
injectErrorMsg);
1656+
1657+
for (auto& task : tasks) {
1658+
ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId();
1659+
}
1660+
}
1661+
16121662
namespace {
16131663
core::PlanNodePtr makeJoinOverExchangePlan(
16141664
const RowTypePtr& exchangeType,

0 commit comments

Comments
 (0)