Skip to content

Commit 485e08a

Browse files
authored
fix(transaction): DCHECK fail in non-atomic transactions (#5217)
Also fixes occasional DCHECK fail with ACL message being sent into a closing connection. (There is not test as its hard to reproduce with pytests). The TX bug was that we did not reset references into kv_fp_ array when switching commands within non-atomic transactions. This resulted in dcheck fails. Fixes #5212. The fix: 1. added test that reproduces the issue 2. Reset the fp_start and fp_count fields in each shard data. Signed-off-by: Roman Gershman <[email protected]>
1 parent e92e843 commit 485e08a

File tree

7 files changed

+38
-7
lines changed

7 files changed

+38
-7
lines changed

.clang-tidy

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ Checks: >
88
-bugprone-easily-swappable-parameters,
99
-bugprone-branch-clone,
1010
-bugprone-implicit-widening-of-multiplication-result,
11+
-bugprone-too-small-loop-variable,
12+
-bugprone-reserved-identifier,
1113
boost-use-to-string,
1214
performance*,
13-
cert*,
1415
-cert-err58-cpp,
1516
-cert-dcl58-cpp, # Ignore std changes
17+
-cert-dcl51-cpp, # bugprone-reserved-identifier
1618
# Doesn't work with abseil flags
1719
clang-analyzer*,
1820
google-*,

src/facade/dragonfly_connection.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1809,8 +1809,9 @@ void Connection::SendAsync(MessageHandle msg) {
18091809
DCHECK_EQ(ProactorBase::me(), socket_->proactor());
18101810

18111811
// "Closing" connections might be still processing commands, as we don't interrupt them.
1812-
// So we still want to deliver control messages to them (like checkpoints).
1813-
if (cc_->conn_closing && !msg.IsControl())
1812+
// So we still want to deliver control messages to them (like checkpoints) if
1813+
// async_fb_ is running (joinable).
1814+
if (cc_->conn_closing && (!msg.IsControl() || !async_fb_.IsJoinable()))
18141815
return;
18151816

18161817
// If we launch while closing, it won't be awaited. Control messages will be processed on cleanup.

src/server/multi_test.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -998,6 +998,13 @@ TEST_F(MultiTest, NoKeyTransactional) {
998998
Run({"exec"});
999999
}
10001000

1001+
TEST_F(MultiTest, NoKeyTransactionalMany) {
1002+
vector<vector<string>> cmds;
1003+
cmds.push_back({"rename", "x", "z"});
1004+
cmds.push_back({"ft._list"});
1005+
RunMany(cmds);
1006+
}
1007+
10011008
class MultiEvalTest : public BaseFamilyTest {
10021009
protected:
10031010
MultiEvalTest() : BaseFamilyTest() {

src/server/test_utils.cc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,25 @@ RespExpr BaseFamilyTest::Run(std::string_view id, ArgSlice slice) {
467467
return e;
468468
}
469469

470+
void BaseFamilyTest::RunMany(const std::vector<std::vector<std::string>>& cmds) {
471+
if (!ProactorBase::IsProactorThread()) {
472+
return pp_->at(0)->Await([&] { return this->RunMany(cmds); });
473+
}
474+
TestConnWrapper* conn_wrapper = AddFindConn(Protocol::REDIS, GetId());
475+
auto* context = conn_wrapper->cmd_cntx();
476+
context->ns = &namespaces->GetDefaultNamespace();
477+
vector<ArgSlice> args_vec(cmds.size());
478+
vector<vector<string_view>> cmd_views(cmds.size());
479+
for (size_t i = 0; i < cmds.size(); ++i) {
480+
for (const auto& arg : cmds[i]) {
481+
cmd_views[i].emplace_back(arg);
482+
}
483+
args_vec[i] = absl::MakeSpan(cmd_views[i]);
484+
}
485+
service_->DispatchManyCommands(absl::MakeSpan(args_vec), conn_wrapper->builder(), context);
486+
DCHECK(context->transaction == nullptr);
487+
}
488+
470489
auto BaseFamilyTest::RunMC(MP::CmdType cmd_type, string_view key, string_view value, uint32_t flags,
471490
chrono::seconds ttl) -> MCResponse {
472491
if (!ProactorBase::IsProactorThread()) {

src/server/test_utils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class BaseFamilyTest : public ::testing::Test {
8585
RespExpr Run(absl::Span<std::string> list);
8686

8787
RespExpr Run(std::string_view id, ArgSlice list);
88+
void RunMany(const std::vector<std::vector<std::string>>& cmds);
8889

8990
using MCResponse = std::vector<std::string>;
9091
MCResponse RunMC(MemcacheParser::CmdType cmd_type, std::string_view key, std::string_view value,

src/server/transaction.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
512512

513513
for (auto& sd : shard_data_) {
514514
sd.slice_count = sd.slice_start = 0;
515+
sd.fp_start = sd.fp_count = 0; // Reset fingerprints span as kv_fp_ was cleared above.
515516

516517
if (multi_->mode == NON_ATOMIC) {
517518
sd.local_mask = 0; // Non atomic transactions schedule each time, so remove all flags
@@ -1325,7 +1326,7 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeys wkeys, KeyReady
13251326

13261327
// If we don't follow up with an "action" hop, we must clean up manually on all shards.
13271328
if (result != OpStatus::OK)
1328-
ExpireBlocking(std::move(wkeys));
1329+
ExpireBlocking(wkeys);
13291330

13301331
return result;
13311332
}
@@ -1515,7 +1516,7 @@ void Transaction::ReviveAutoJournal() {
15151516
re_enabled_auto_journal_ = true;
15161517
}
15171518

1518-
void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
1519+
void Transaction::CancelBlocking(const std::function<OpStatus(ArgSlice)>& status_cb) {
15191520
// We're on the owning thread of this transaction, so we can safely access it's data below.
15201521
// First, check if it makes sense to proceed.
15211522
if (blocking_barrier_.IsClaimed() || cid_ == nullptr || (cid_->opt_mask() & CO::BLOCKING) == 0)
@@ -1610,7 +1611,7 @@ OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
16101611

16111612
if (cid->first_key_pos() > 0) {
16121613
start = cid->first_key_pos() - 1;
1613-
int last = cid->last_key_pos();
1614+
int8_t last = cid->last_key_pos();
16141615

16151616
if (num_custom_keys >= 0) {
16161617
end = start + num_custom_keys;

src/server/transaction.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ class Transaction {
225225

226226
// Cancel all blocking watches. Set COORD_CANCELLED.
227227
// Must be called from coordinator thread.
228-
void CancelBlocking(std::function<OpStatus(ArgSlice)>);
228+
void CancelBlocking(const std::function<OpStatus(ArgSlice)>&);
229229

230230
// Prepare a squashed hop on given shards.
231231
// Only compatible with multi modes that acquire all locks ahead - global and lock_ahead.

0 commit comments

Comments
 (0)