Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ydb/core/tablet_flat/flat_boot_bundle.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ namespace NBoot {
LeftReads -= 1;

if (msg.Status == NKikimrProto::OK) {
Y_ENSURE(msg.Cookie == 0);
Loader->Save(std::move(msg.Pages));

TryFinalize();
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ void TExecutor::Boot(TEvTablet::TEvBoot::TPtr &ev, const TActorContext &ctx) {

auto executorCaches = CleanupState();

BootLogic.Reset(new TExecutorBootLogic(this, SelfId(), Owner->Info(), maxBootBytesInFly));
BootLogic.Reset(new TExecutorBootLogic(this, SelfId(), ++BootAttempt, Owner->Info(), maxBootBytesInFly));

ProcessIoStats(
NBlockIO::EDir::Read, NBlockIO::EPriority::Fast,
Expand Down Expand Up @@ -898,7 +898,7 @@ void TExecutor::FollowerBoot(TEvTablet::TEvFBoot::TPtr &ev, const TActorContext

auto executorCaches = CleanupState();

BootLogic.Reset(new TExecutorBootLogic(this, SelfId(), Owner->Info(), maxBootBytesInFly));
BootLogic.Reset(new TExecutorBootLogic(this, SelfId(), ++BootAttempt, Owner->Info(), maxBootBytesInFly));

ProcessIoStats(
NBlockIO::EDir::Read, NBlockIO::EPriority::Fast,
Expand Down Expand Up @@ -3102,6 +3102,10 @@ void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {
}
return;

case ESharedCacheRequestType::BootLogic:
// ignore outdated replies
return;

default:
Y_DEBUG_ABORT_S("Unexpected request " << ev->Cookie);
break;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tablet_flat/flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ class TExecutor

TWaitingSnaps WaitingSnapshots;

ui64 BootAttempt = 0;
THolder<TExecutorBootLogic> BootLogic;
THolder<TPrivatePageCache> PrivatePageCache;
THolder<TExecutorCounters> Counters;
Expand Down
9 changes: 7 additions & 2 deletions ydb/core/tablet_flat/flat_executor_bootlogic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ NBoot::TLoadBlobs::TLoadBlobs(IStep *owner, NPageCollection::TLargeGlobId largeG
Logic->LoadEntry(this);
}

TExecutorBootLogic::TExecutorBootLogic(IOps *ops, const TActorId &self, TTabletStorageInfo *info, ui64 maxBytesInFly)
TExecutorBootLogic::TExecutorBootLogic(IOps *ops, const TActorId &self, ui64 bootAttempt, TTabletStorageInfo *info, ui64 maxBytesInFly)
: Ops(ops)
, SelfId(self)
, BootAttempt(bootAttempt)
, Info(info)
, GroupResolveCachedChannel(Max<ui32>())
, GroupResolveCachedGeneration(Max<ui32>())
Expand Down Expand Up @@ -186,7 +187,8 @@ NBoot::TSpawned TExecutorBootLogic::LoadPages(NBoot::IStep *step, NTable::TLoade
new NSharedCache::TEvRequest(
NBlockIO::EPriority::Fast,
std::move(fetch.PageCollection),
std::move(fetch.Pages)),
std::move(fetch.Pages),
BootAttempt),
0, (ui64)ESharedCacheRequestType::BootLogic);

return NBoot::TSpawned(true);
Expand Down Expand Up @@ -272,6 +274,9 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::Receive(::NActors::IEventHandl
if (ESharedCacheRequestType(ev.Cookie) != ESharedCacheRequestType::BootLogic)
return OpResultUnhandled;

if (msg->Cookie != BootAttempt)
return OpResultUnhandled;

auto it = Loads.find(msg->PageCollection.Get());
if (it == Loads.end()) // could receive outdated results
return OpResultUnhandled;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tablet_flat/flat_executor_bootlogic.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class TExecutorBootLogic
TAutoPtr<NBoot::TRoot> Steps;
TActorId LeaseWaiter;

const ui64 BootAttempt;
TMonotonic BootTimestamp;

const TIntrusiveConstPtr<TTabletStorageInfo> Info;
Expand Down Expand Up @@ -106,7 +107,7 @@ class TExecutorBootLogic
inline NBoot::TBack& State() const noexcept { return *State_; }

public:
TExecutorBootLogic(IOps*, const TActorId&, TTabletStorageInfo *info, ui64 maxBytesInFly);
TExecutorBootLogic(IOps*, const TActorId&, ui64 bootAttempt, TTabletStorageInfo *info, ui64 maxBytesInFly);
~TExecutorBootLogic();

void Describe(IOutputStream&) const;
Expand Down
65 changes: 65 additions & 0 deletions ydb/core/tablet_flat/flat_executor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3811,6 +3811,71 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_Follower) {
UNIT_ASSERT_C(!blockedSnapshot.empty(), "expected tablet to make a log snapshot after part switch");
}

Y_UNIT_TEST(FollowerPromoteToLeaderWhileLoadingPages) {
TMyEnvBase env;
TRowsModel rows;

env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG);

// Start the source tablet
env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
return new TTestFlatTablet(env.Edge, tablet, info);
});
env.WaitForWakeUp();

Cerr << "... initializing schema" << Endl;
env.SendSync(rows.MakeScheme(new TCompactionPolicy()));

Cerr << "... inserting rows" << Endl;
env.SendSync(rows.MakeRows(10, 0, 10));

Cerr << "...compacting table" << Endl;
env.SendSync(new NFake::TEvCompact(TRowsModel::TableId));
env.WaitFor<NFake::TEvCompacted>();

Cerr << "... starting follower" << Endl;
TActorId followerSysActor;
auto followerBootObserver = env->AddObserver<TEvTablet::TEvFBoot>([&](auto& ev) {
followerSysActor = ev->Sender;
Cerr << "... observed TEvTablet::TEvFBoot" << Endl;
});
TBlockEvents<NSharedCache::TEvRequest> blockedFollowerRequests(env.Env);
env.FireFollower(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
return new TTestFlatTablet(env.Edge, tablet, info);
}, /* followerId */ 1);
env->WaitFor("follower shared cache requests", [&]{ return blockedFollowerRequests.size() > 0; });

Cerr << "... stopping leader" << Endl;
env.SendSync(new TEvents::TEvPoison, false, true);
env.WaitForGone();

blockedFollowerRequests.Stop();
TBlockEvents<NSharedCache::TEvRequest> blockedLeaderRequests(env.Env);

TActorId leaderSysActor;
auto leaderBootObserver = env->AddObserver<TEvTablet::TEvBoot>([&](auto& ev) {
leaderSysActor = ev->Sender;
Cerr << "... observed TEvTablet::TEvBoot" << Endl;
});
Cerr << "... promoting follower" << Endl;
{
NFake::TStarter starter;
auto* info = starter.MakeTabletInfo(env.Tablet, env.StorageGroupCount);
auto* promote = new TEvTablet::TEvPromoteToLeader(0, info);
env->Send(new IEventHandle(followerSysActor, followerSysActor, promote), 0, /* viaActorSystem */ true);
}
env->WaitFor("promoted shared cache requests", [&]{ return blockedLeaderRequests.size() > 0; });

Cerr << "... unblocking promoted requests" << Endl;
blockedLeaderRequests.Stop().Unblock();
env->SimulateSleep(TDuration::MilliSeconds(10));

// Simulate reordering replies by reordering requests
Cerr << "... unblocking earlier requests" << Endl;
blockedFollowerRequests.Unblock();
env->SimulateSleep(TDuration::MilliSeconds(10));
}

}

Y_UNIT_TEST_SUITE(TFlatTableExecutor_RejectProbability) {
Expand Down
Loading