Skip to content

Commit 411e97a

Browse files
committed
Ignore shared cache replies from earlier boot attempts
1 parent 3756d66 commit 411e97a

File tree

6 files changed

+81
-6
lines changed

6 files changed

+81
-6
lines changed

ydb/core/tablet_flat/flat_boot_bundle.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,6 @@ namespace NBoot {
5858
LeftReads -= 1;
5959

6060
if (msg.Status == NKikimrProto::OK) {
61-
Y_ENSURE(msg.Cookie == 0);
6261
Loader->Save(std::move(msg.Pages));
6362

6463
TryFinalize();

ydb/core/tablet_flat/flat_executor.cpp

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -857,7 +857,7 @@ void TExecutor::Boot(TEvTablet::TEvBoot::TPtr &ev, const TActorContext &ctx) {
857857

858858
auto executorCaches = CleanupState();
859859

860-
BootLogic.Reset(new TExecutorBootLogic(this, SelfId(), Owner->Info(), maxBootBytesInFly));
860+
BootLogic.Reset(new TExecutorBootLogic(this, SelfId(), ++BootAttempt, Owner->Info(), maxBootBytesInFly));
861861

862862
ProcessIoStats(
863863
NBlockIO::EDir::Read, NBlockIO::EPriority::Fast,
@@ -898,7 +898,7 @@ void TExecutor::FollowerBoot(TEvTablet::TEvFBoot::TPtr &ev, const TActorContext
898898

899899
auto executorCaches = CleanupState();
900900

901-
BootLogic.Reset(new TExecutorBootLogic(this, SelfId(), Owner->Info(), maxBootBytesInFly));
901+
BootLogic.Reset(new TExecutorBootLogic(this, SelfId(), ++BootAttempt, Owner->Info(), maxBootBytesInFly));
902902

903903
ProcessIoStats(
904904
NBlockIO::EDir::Read, NBlockIO::EPriority::Fast,
@@ -3102,6 +3102,10 @@ void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {
31023102
}
31033103
return;
31043104

3105+
case ESharedCacheRequestType::BootLogic:
3106+
// ignore outdated replies
3107+
return;
3108+
31053109
default:
31063110
Y_DEBUG_ABORT_S("Unexpected request " << ev->Cookie);
31073111
break;

ydb/core/tablet_flat/flat_executor.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -447,6 +447,7 @@ class TExecutor
447447

448448
TWaitingSnaps WaitingSnapshots;
449449

450+
ui64 BootAttempt = 0;
450451
THolder<TExecutorBootLogic> BootLogic;
451452
THolder<TPrivatePageCache> PrivatePageCache;
452453
THolder<TExecutorCounters> Counters;

ydb/core/tablet_flat/flat_executor_bootlogic.cpp

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -30,9 +30,10 @@ NBoot::TLoadBlobs::TLoadBlobs(IStep *owner, NPageCollection::TLargeGlobId largeG
3030
Logic->LoadEntry(this);
3131
}
3232

33-
TExecutorBootLogic::TExecutorBootLogic(IOps *ops, const TActorId &self, TTabletStorageInfo *info, ui64 maxBytesInFly)
33+
TExecutorBootLogic::TExecutorBootLogic(IOps *ops, const TActorId &self, ui64 bootAttempt, TTabletStorageInfo *info, ui64 maxBytesInFly)
3434
: Ops(ops)
3535
, SelfId(self)
36+
, BootAttempt(bootAttempt)
3637
, Info(info)
3738
, GroupResolveCachedChannel(Max<ui32>())
3839
, GroupResolveCachedGeneration(Max<ui32>())
@@ -186,7 +187,8 @@ NBoot::TSpawned TExecutorBootLogic::LoadPages(NBoot::IStep *step, NTable::TLoade
186187
new NSharedCache::TEvRequest(
187188
NBlockIO::EPriority::Fast,
188189
std::move(fetch.PageCollection),
189-
std::move(fetch.Pages)),
190+
std::move(fetch.Pages),
191+
BootAttempt),
190192
0, (ui64)ESharedCacheRequestType::BootLogic);
191193

192194
return NBoot::TSpawned(true);
@@ -272,6 +274,9 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::Receive(::NActors::IEventHandl
272274
if (ESharedCacheRequestType(ev.Cookie) != ESharedCacheRequestType::BootLogic)
273275
return OpResultUnhandled;
274276

277+
if (msg->Cookie != BootAttempt)
278+
return OpResultUnhandled;
279+
275280
auto it = Loads.find(msg->PageCollection.Get());
276281
if (it == Loads.end()) // could receive outdated results
277282
return OpResultUnhandled;

ydb/core/tablet_flat/flat_executor_bootlogic.h

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -78,6 +78,7 @@ class TExecutorBootLogic
7878
TAutoPtr<NBoot::TRoot> Steps;
7979
TActorId LeaseWaiter;
8080

81+
const ui64 BootAttempt;
8182
TMonotonic BootTimestamp;
8283

8384
const TIntrusiveConstPtr<TTabletStorageInfo> Info;
@@ -106,7 +107,7 @@ class TExecutorBootLogic
106107
inline NBoot::TBack& State() const noexcept { return *State_; }
107108

108109
public:
109-
TExecutorBootLogic(IOps*, const TActorId&, TTabletStorageInfo *info, ui64 maxBytesInFly);
110+
TExecutorBootLogic(IOps*, const TActorId&, ui64 bootAttempt, TTabletStorageInfo *info, ui64 maxBytesInFly);
110111
~TExecutorBootLogic();
111112

112113
void Describe(IOutputStream&) const;

ydb/core/tablet_flat/flat_executor_ut.cpp

Lines changed: 65 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3811,6 +3811,71 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_Follower) {
38113811
UNIT_ASSERT_C(!blockedSnapshot.empty(), "expected tablet to make a log snapshot after part switch");
38123812
}
38133813

// Scenario test: a follower is promoted to leader while the shared cache
// requests issued during its follower boot are still being held back, and
// those earlier requests are released only after the requests of the new
// (leader) boot. The test body contains no explicit UNIT_ASSERT after setup;
// NOTE(review): it presumably passes as long as the late replies do not trip
// an abort/assert inside the executor -- confirm against the test runtime.
3814+
Y_UNIT_TEST(FollowerPromoteToLeaderWhileLoadingPages) {
3815+
TMyEnvBase env;
3816+
TRowsModel rows;
3817+
3818+
env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG);
3819+
3820+
// Start the source tablet
3821+
env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
3822+
return new TTestFlatTablet(env.Edge, tablet, info);
3823+
});
3824+
env.WaitForWakeUp();
3825+
// Create the schema, write rows and compact, then wait for TEvCompacted.
// NOTE(review): presumably this gives the follower on-disk pages to load
// at boot -- confirm.
3826+
Cerr << "... initializing schema" << Endl;
3827+
env.SendSync(rows.MakeScheme(new TCompactionPolicy()));
3828+
3829+
Cerr << "... inserting rows" << Endl;
3830+
env.SendSync(rows.MakeRows(10, 0, 10));
3831+
3832+
Cerr << "...compacting table" << Endl;
3833+
env.SendSync(new NFake::TEvCompact(TRowsModel::TableId));
3834+
env.WaitFor<NFake::TEvCompacted>();
3835+
3836+
Cerr << "... starting follower" << Endl;
// Remember the sender of TEvFBoot; it is the target of the promote event below.
3837+
TActorId followerSysActor;
3838+
auto followerBootObserver = env->AddObserver<TEvTablet::TEvFBoot>([&](auto& ev) {
3839+
followerSysActor = ev->Sender;
3840+
Cerr << "... observed TEvTablet::TEvFBoot" << Endl;
3841+
});
// Installed before the follower is fired so its shared cache requests are
// captured. NOTE(review): TBlockEvents presumably holds captured events
// undelivered until Unblock() -- confirm against the test runtime helpers.
3842+
TBlockEvents<NSharedCache::TEvRequest> blockedFollowerRequests(env.Env);
3843+
env.FireFollower(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
3844+
return new TTestFlatTablet(env.Edge, tablet, info);
3845+
}, /* followerId */ 1);
// Proceed only once at least one follower request has been captured.
3846+
env->WaitFor("follower shared cache requests", [&]{ return blockedFollowerRequests.size() > 0; });
3847+
3848+
Cerr << "... stopping leader" << Endl;
3849+
env.SendSync(new TEvents::TEvPoison, false, true);
3850+
env.WaitForGone();
3851+
// Stop() is called without Unblock(): the follower-boot requests stay held
// (they are released at the very end). A second blocker is installed so the
// requests issued after promotion are collected separately.
3852+
blockedFollowerRequests.Stop();
3853+
TBlockEvents<NSharedCache::TEvRequest> blockedLeaderRequests(env.Env);
3854+
// leaderSysActor is assigned by the observer but not read anywhere in this
// test; the observer's visible effect is the log line.
3855+
TActorId leaderSysActor;
3856+
auto leaderBootObserver = env->AddObserver<TEvTablet::TEvBoot>([&](auto& ev) {
3857+
leaderSysActor = ev->Sender;
3858+
Cerr << "... observed TEvTablet::TEvBoot" << Endl;
3859+
});
3860+
Cerr << "... promoting follower" << Endl;
// Send TEvPromoteToLeader to the follower's sys actor (sender == recipient)
// with freshly built tablet info.
3861+
{
3862+
NFake::TStarter starter;
3863+
auto* info = starter.MakeTabletInfo(env.Tablet, env.StorageGroupCount);
3864+
auto* promote = new TEvTablet::TEvPromoteToLeader(0, info);
3865+
env->Send(new IEventHandle(followerSysActor, followerSysActor, promote), 0, /* viaActorSystem */ true);
3866+
}
// Wait until the promoted tablet has issued at least one shared cache request.
3867+
env->WaitFor("promoted shared cache requests", [&]{ return blockedLeaderRequests.size() > 0; });
3868+
// Release the newer (post-promotion) requests first...
3869+
Cerr << "... unblocking promoted requests" << Endl;
3870+
blockedLeaderRequests.Stop().Unblock();
3871+
env->SimulateSleep(TDuration::MilliSeconds(10));
3872+
// ...and only then the requests from the earlier follower boot.
3873+
// Simulate reordering replies by reordering requests
3874+
Cerr << "... unblocking earlier requests" << Endl;
3875+
blockedFollowerRequests.Unblock();
3876+
env->SimulateSleep(TDuration::MilliSeconds(10));
3877+
}
3878+
38143879
}
38153880

38163881
Y_UNIT_TEST_SUITE(TFlatTableExecutor_RejectProbability) {

0 commit comments

Comments
 (0)