From 7f3a6685924abce7443876d914f48e615a78e0e5 Mon Sep 17 00:00:00 2001 From: Alexander Rutkovsky Date: Tue, 26 Aug 2025 09:02:57 +0000 Subject: [PATCH 1/2] Protect BridgeProxy request against races with configuration update --- ydb/core/blobstorage/bridge/proxy/bridge_proxy.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/ydb/core/blobstorage/bridge/proxy/bridge_proxy.cpp b/ydb/core/blobstorage/bridge/proxy/bridge_proxy.cpp index 6e7f9e48c4b3..89b6e69cdb15 100644 --- a/ydb/core/blobstorage/bridge/proxy/bridge_proxy.cpp +++ b/ydb/core/blobstorage/bridge/proxy/bridge_proxy.cpp @@ -673,10 +673,12 @@ namespace NKikimr { ev->GetTypeRewrite(), std::move(ev->TraceId)); STLOG(PRI_DEBUG, BS_PROXY_BRIDGE, BPB00, "new request", (RequestId, request->RequestId), - (GroupId, GroupId), (Request, originalRequest.ToString())); + (GroupId, GroupId), (GroupGeneration, request->Info->GroupGeneration), + (BridgeGroupState, request->Info->Group->GetBridgeGroupState()), + (Request, originalRequest.ToString())); - Y_ABORT_UNLESS(Info->Group); - const auto& state = Info->Group->GetBridgeGroupState(); + Y_ABORT_UNLESS(request->Info->Group); + const auto& state = request->Info->Group->GetBridgeGroupState(); for (size_t i = 0; i < state.PileSize(); ++i) { const auto bridgePileId = TBridgePileId::FromPileIndex(i); const TBridgeInfo::TPile *pile = BridgeInfo->GetPile(bridgePileId); @@ -696,7 +698,7 @@ namespace NKikimr { void SendQuery(std::shared_ptr request, TBridgePileId bridgePileId, std::unique_ptr ev, TRequestPayload&& payload = {}) { - const auto& state = Info->Group->GetBridgeGroupState(); + const auto& state = request->Info->Group->GetBridgeGroupState(); const auto& groupPileInfo = state.GetPile(bridgePileId.GetPileIndex()); const auto groupId = TGroupId::FromProto(&groupPileInfo, &NKikimrBridge::TGroupState::TPile::GetGroupId); From 16d2515aac3bf0bca2b30c9ca839dd7501a036a3 Mon Sep 17 00:00:00 2001 From: Alexander Rutkovsky Date: Wed, 27 Aug 2025 14:47:40 +0000 Subject: [PATCH 2/2] Refactor assimilation reverse iterating in VDisk --- .../ut_blobstorage/assimilation.cpp | 130 +++++++++++++++++ .../vdisk/hulldb/base/hullds_heap_it.h | 4 + .../vdisk/hulldb/fresh/fresh_appendix.h | 20 ++- .../vdisk/hulldb/fresh/fresh_segment_impl.h | 137 ++++++------------ .../vdisk/hulldb/generic/hullds_idxsnap_it.h | 5 + .../blobstorage/vdisk/query/assimilation.cpp | 19 +-- 6 files changed, 207 insertions(+), 108 deletions(-) diff --git a/ydb/core/blobstorage/ut_blobstorage/assimilation.cpp b/ydb/core/blobstorage/ut_blobstorage/assimilation.cpp index 26d8967852db..9c61033e0001 100644 --- a/ydb/core/blobstorage/ut_blobstorage/assimilation.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/assimilation.cpp @@ -1,4 +1,6 @@ #include +#include +#include void RunAssimilationTest(bool reverse) { TEnvironmentSetup env{{ @@ -206,6 +208,128 @@ void RunAssimilationTest(bool reverse) { UNIT_ASSERT(aBlobs.empty()); } +void RunVDiskIterationTest(bool reverse) { + TEnvironmentSetup env{{ + .NodeCount = 8, + .Erasure = TBlobStorageGroupType::Erasure4Plus2Block, + }}; + auto& runtime = env.Runtime; + runtime->SetLogPriority(NKikimrServices::BS_PROXY_ASSIMILATE, NLog::PRI_DEBUG); + + env.CreateBoxAndPool(1, 1); + env.Sim(TDuration::Seconds(30)); + auto groups = env.GetGroups(); + UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1); + const TIntrusivePtr info = env.GetGroupInfo(groups.front()); + + auto edge = runtime->AllocateEdgeActor(1, __FILE__, __LINE__); + const TVDiskID vdiskId = info->GetVDiskId(0); + auto putQueueId = env.CreateQueueActor(vdiskId, NKikimrBlobStorage::PutTabletLog, 0, 1); + auto getQueueId = env.CreateQueueActor(vdiskId, NKikimrBlobStorage::GetFastRead, 0, 1); + + THashMap blobs; + ui64 tabletId = 1; + ui32 gen = 1; + ui32 step = 1; + + TString data = FastGenDataForLZ4(32); + + for (size_t iter = 0; iter < 10000; ++iter) { + ui32 action = RandomNumber(100u); + if (action < 95) { + ui32 partId = 1 + RandomNumber(6u); + TLogoBlobID id; + if (RandomNumber(4u) || blobs.empty()) { + for (;;) { + id = TLogoBlobID(tabletId, gen, step++, 0, 128, 0); + if (info->GetIdxInSubgroup(vdiskId, id.Hash()) >= 6) { + // this VDisk is a handoff for blob + break; + } + } + } else { + std::vector ids; + std::ranges::copy(blobs | std::views::keys, std::back_inserter(ids)); + id = ids[RandomNumber(ids.size())]; + } + Cerr << "putting " << id << " partId# " << partId << Endl; + runtime->Send(new IEventHandle(putQueueId, edge, new TEvBlobStorage::TEvVPut(TLogoBlobID(id, partId), + TRope(data), vdiskId, false, nullptr, TInstant::Max(), NKikimrBlobStorage::TabletLog)), edge.NodeId()); + auto res = env.WaitForEdgeActorEvent(edge, false); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Record.GetStatus(), NKikimrProto::OK); + blobs[id] |= TIngress::CreateIngressWithLocal(&info->GetTopology(), vdiskId, TLogoBlobID(id, partId))->Raw(); + } else { + Cerr << "compacting" << Endl; + const TActorId actorId = info->GetDynamicInfo().ServiceIdForOrderNumber.front(); + auto ev = std::make_unique(actorId, edge, TEvCompactVDisk::Create(EHullDbType::LogoBlobs)); + ev->Rewrite(TEvBlobStorage::EvForwardToSkeleton, actorId); + env.Runtime->Send(ev.release(), edge.NodeId()); + env.WaitForEdgeActorEvent(edge, false); + } + + std::optional from; + THashMap m = blobs; + + for (;;) { + Cerr << "starting assimilation from# " << (from ? from->ToString() : "") << Endl; + runtime->Send(new IEventHandle(getQueueId, edge, new TEvBlobStorage::TEvVAssimilate(vdiskId, std::nullopt, + std::nullopt, from, true, reverse)), edge.NodeId()); + auto res = env.WaitForEdgeActorEvent(edge, false); + if (!res->Get()->Record.BlobsSize()) { + break; + } + + ui64 raw[3] = {0, 0, 0}; + size_t n = res->Get()->Record.BlobsSize(); + for (const auto& item : res->Get()->Record.GetBlobs()) { + if (item.HasRawX1()) { + raw[0] = item.GetRawX1(); + } else if (item.HasDiffX1()) { + if (reverse) { + raw[0] -= item.GetDiffX1(); + } else { + raw[0] += item.GetDiffX1(); + } + } + + if (item.HasRawX2()) { + raw[1] = item.GetRawX2(); + } else if (item.HasDiffX2()) { + if (reverse) { + raw[1] -= item.GetDiffX2(); + } else { + raw[1] += item.GetDiffX2(); + } + } + + if (item.HasRawX3()) { + raw[2] = item.GetRawX3(); + } else if (item.HasDiffX3()) { + if (reverse) { + raw[2] -= item.GetDiffX3(); + } else { + raw[2] += item.GetDiffX3(); + } + } + + TLogoBlobID id(raw); + + UNIT_ASSERT(m.contains(id)); + UNIT_ASSERT_VALUES_EQUAL(m[id], item.GetIngress()); + m.erase(id); + + if (RandomNumber(n) == 0) { + from = id; + break; + } + + --n; + } + } + UNIT_ASSERT(m.empty()); + } +} + Y_UNIT_TEST_SUITE(VDiskAssimilation) { Y_UNIT_TEST(Test) { RunAssimilationTest(false); @@ -213,4 +337,10 @@ Y_UNIT_TEST_SUITE(VDiskAssimilation) { Y_UNIT_TEST(TestReverse) { RunAssimilationTest(true); } + Y_UNIT_TEST(Iteration) { + RunVDiskIterationTest(false); + } + Y_UNIT_TEST(IterationReverse) { + RunVDiskIterationTest(true); + } } diff --git a/ydb/core/blobstorage/vdisk/hulldb/base/hullds_heap_it.h b/ydb/core/blobstorage/vdisk/hulldb/base/hullds_heap_it.h index 4ed2a8c9d761..e1b6ccc091ad 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/base/hullds_heap_it.h +++ b/ydb/core/blobstorage/vdisk/hulldb/base/hullds_heap_it.h @@ -136,6 +136,10 @@ namespace NKikimr { return SeekImpl([](auto *iter) { iter->SeekToFirst(); }); } + void SeekToLast() { + return SeekImpl([](auto *iter) { iter->SeekToLast(); }); + } + template void Walk(std::optional key, TMerger merger, TCallback&& callback) { if (key.has_value()) { diff --git a/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_appendix.h b/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_appendix.h index 9829d21a19b9..b7b771635d20 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_appendix.h +++ b/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_appendix.h @@ -115,13 +115,13 @@ namespace NKikimr { TIterator(const THullCtxPtr &hullCtx, TContType *apndx) : Apndx(apndx) - , It() + , It(Apndx ? Apndx->SortedRecs.end() : TFreshAppendix::TVecIterator()) { Y_UNUSED(hullCtx); } bool Valid() const { - return Apndx && It >= Apndx->SortedRecs.begin() && It < Apndx->SortedRecs.end(); + return Apndx && It != Apndx->SortedRecs.end(); } void Next() { @@ -130,10 +130,11 @@ namespace NKikimr { } void Prev() { - if (It == Apndx->SortedRecs.begin()) - It = {}; - else + if (It == Apndx->SortedRecs.begin()) { + It = Apndx->SortedRecs.end(); + } else { --It; + } } TKey GetCurKey() const { @@ -147,11 +148,18 @@ namespace NKikimr { } void SeekToFirst() { + Y_DEBUG_ABORT_UNLESS(Apndx); It = Apndx->SortedRecs.begin(); } + void SeekToLast() { + Y_DEBUG_ABORT_UNLESS(Apndx); + It = Apndx->SortedRecs.empty() ? Apndx->SortedRecs.end() : std::prev(Apndx->SortedRecs.end()); + } + void Seek(const TKey &key) { - It = ::LowerBound(Apndx->SortedRecs.begin(), Apndx->SortedRecs.end(), key); + Y_DEBUG_ABORT_UNLESS(Apndx); + It = std::lower_bound(Apndx->SortedRecs.begin(), Apndx->SortedRecs.end(), key); } template diff --git a/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_segment_impl.h b/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_segment_impl.h index 8dc9f0160c13..f0f4e716aa8f 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_segment_impl.h +++ b/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_segment_impl.h @@ -125,7 +125,6 @@ namespace NKikimr { TIterator(const TSkipListIndex *skipListIndex) : SkipListIndex(skipListIndex) - , It() {} bool Valid() const { @@ -322,15 +321,14 @@ namespace NKikimr { template void PutToMerger(TRecordMerger *merger) { - TIterator cursor = It; - Y_DEBUG_ABORT_UNLESS(cursor.Valid()); - TKey key = It.GetValue().Key; - while (cursor.Valid() && key == cursor.GetValue().Key) { - ui64 cursorLsn = cursor.GetValue().Lsn; - if (cursorLsn <= Lsn) - PutToMerger(cursor.GetValue().MemRec, cursorLsn, merger); - cursor.Next(); + Y_DEBUG_ABORT_UNLESS(It.Valid()); + const auto& key = It.GetValue().Key; + bool putSomething = false; + for (TIterator cursor = It; cursor.Valid() && cursor.GetValue().Key == key && cursor.GetValue().Lsn <= Lsn; cursor.Next()) { + PutToMerger(cursor.GetValue().MemRec, cursor.GetValue().Lsn, merger); + putSomething = true; } + Y_DEBUG_ABORT_UNLESS(putSomething); } TKey GetCurKey() const { @@ -364,18 +362,6 @@ namespace NKikimr { const ui64 Lsn; TIterator It; - bool HasSatisfyingValues() const { - Y_DEBUG_ABORT_UNLESS(It.Valid()); - TIterator cursor = It; - TKey key = cursor.GetValue().Key; - while (cursor.Valid() && key == cursor.GetValue().Key) { - if (cursor.GetValue().Lsn <= Lsn) - return true; - cursor.Next(); - } - return false; - } - template void PutToMerger(const TMemRec &memRec, ui64 lsn, TRecordMerger *merger) { TKey key = It.GetValue().Key; @@ -431,7 +417,6 @@ namespace NKikimr { public: TForwardIterator(const THullCtxPtr &hullCtx, const TContType *freshSegment, ui64 lsn) : TBase(hullCtx, freshSegment, lsn) - , SeekCache() {} using TBase::PutToMerger; @@ -443,52 +428,30 @@ namespace NKikimr { void Next() { Y_DEBUG_ABORT_UNLESS(It.Valid()); - - // switch to the next - TKey key = It.GetValue().Key; - It.Next(); - while (It.Valid() && key == It.GetValue().Key) - It.Next(); - - // check that It has values - while (It.Valid()) { - TIterator cursor = It; - key = cursor.GetValue().Key; - while (cursor.Valid() && key == cursor.GetValue().Key) { - if (cursor.GetValue().Lsn <= Lsn) - return; // has values, that's perfect, return - cursor.Next(); - } - // no values, continue searching - It = cursor; - } + const TKey& key = It.GetValue().Key; + for (It.Next(); It.Valid() && It.GetValue().Key == key; It.Next()) {} + for (; It.Valid() && Lsn < It.GetValue().Lsn; It.Next()) {} } void SeekToFirst() { if (Seg) { It = TIterator(Seg->Index.get()); - It.SeekToFirst(); - if (!It.Valid()) - return; - if (!HasSatisfyingValues()) - Next(); - } else + // just find ANY value when Lsn less or equal than required, that'll do + for (It.SeekToFirst(); It.Valid() && Lsn < It.GetValue().Lsn; It.Next()) {} + } else { It = TIterator(); + } } void Seek(const TKey &key) { - if (Seg) { - bool fromCache = SeekCache.Search(key, It); - if (!fromCache) { - typename TFreshIndex::TIdxKey idxKey(0, key, TMemRec()); - It = TIterator(Seg->Index.get()); - It.SeekTo(idxKey); - if (It.Valid() && !HasSatisfyingValues()) - Next(); - SeekCache.Set(key, It); - } - } else + if (!Seg) { It = TIterator(); + } else if (!SeekCache.Search(key, It)) { + typename TFreshIndex::TIdxKey idxKey(0, key, TMemRec()); + It = TIterator(Seg->Index.get()); + for (It.SeekTo(idxKey); It.Valid() && Lsn < It.GetValue().Lsn; It.Next()) {} + SeekCache.Set(key, It); + } } protected: @@ -496,7 +459,6 @@ namespace NKikimr { using TBase::It; using TBase::Seg; using TBase::Lsn; - using TBase::HasSatisfyingValues; }; @@ -524,14 +486,17 @@ namespace NKikimr { void Prev() { Y_DEBUG_ABORT_UNLESS(It.Valid()); + It.Prev(); + Adjust(); + } - while (true) { - It.Prev(); - if (!It.Valid()) - return; - ToTheChainStart(); - if (HasSatisfyingValues()) - return; + void SeekToLast() { + if (Seg) { + It = TIterator(Seg->Index.get()); + It.SeekToLast(); + Adjust(); + } else { + It = TIterator(); } } @@ -541,18 +506,15 @@ namespace NKikimr { It = TIterator(Seg->Index.get()); It.SeekTo(idxKey); if (!It.Valid()) { - // i.e. end - It = TIterator(Seg->Index.get()); - It.SeekToLast(); - if (It.Valid()) { - ToTheChainStart(); - if (!HasSatisfyingValues()) - Prev(); - } + SeekToLast(); + } else if (It.GetValue().Key != key) { + Y_DEBUG_ABORT_UNLESS(key < It.GetValue().Key); + Prev(); } else { - if (!(It.GetValue().Key == key)) - Prev(); + Adjust(); } + } else { + It = TIterator(); } } @@ -560,22 +522,17 @@ namespace NKikimr { using TBase::It; using TBase::Seg; using TBase::Lsn; - using TBase::HasSatisfyingValues; - - void ToTheChainStart() { - Y_DEBUG_ABORT_UNLESS(It.Valid()); - TKey key = It.GetValue().Key; - - TIterator cursor = It; - while (true) { - cursor.Prev(); - if (!cursor.Valid()) - return; - if (cursor.GetValue().Key == key) + void Adjust() { + // move back and stop when iterator gets exhausted, or we have acceptable LSN + for (; It.Valid() && Lsn < It.GetValue().Lsn; It.Prev()) {} + // position to the first item of this key, if we got one + if (It.Valid()) { + Y_DEBUG_ABORT_UNLESS(It.GetValue().Lsn <= Lsn); + TIterator cursor = It; + for (cursor.Prev(); cursor.Valid() && cursor.GetValue().Key == It.GetValue().Key; cursor.Prev()) { It = cursor; - else - return; + } } } }; diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idxsnap_it.h b/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idxsnap_it.h index 9554d27d4966..921685172dd0 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idxsnap_it.h +++ b/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idxsnap_it.h @@ -86,6 +86,11 @@ namespace NKikimr { MergeAndAdvance(); } + void SeekToLast() { + HeapIt.SeekToLast(); + MergeAndAdvance(); + } + void Seek(const TKey& key) { HeapIt.Seek(key); MergeAndAdvance(); diff --git a/ydb/core/blobstorage/vdisk/query/assimilation.cpp b/ydb/core/blobstorage/vdisk/query/assimilation.cpp index c2672e32fa8b..1c6e8251be88 100644 --- a/ydb/core/blobstorage/vdisk/query/assimilation.cpp +++ b/ydb/core/blobstorage/vdisk/query/assimilation.cpp @@ -86,20 +86,15 @@ namespace NKikimr { : Iter(hullCtx, snap) , SkipUpTo(skipUpTo) { - if constexpr (IsForward) { - if (SkipUpTo) { - Iter.Seek(*SkipUpTo); - if (Iter.Valid() && Iter.GetCurKey() == *SkipUpTo) { - Iter.MergeAndAdvance(); - } - } else { - Iter.SeekToFirst(); - } - } else { - Iter.Seek(SkipUpTo.value_or(TKey::Inf())); // seek to the end when SkipUpTo is empty - if (SkipUpTo && Iter.Valid() && Iter.GetCurKey() == *SkipUpTo) { + if (SkipUpTo) { + Iter.Seek(*SkipUpTo); + if (Iter.Valid() && Iter.GetCurKey() == *SkipUpTo) { Iter.MergeAndAdvance(); } + } else if constexpr (IsForward) { + Iter.SeekToFirst(); + } else { + Iter.SeekToLast(); } }