Skip to content

Commit 4542698

Browse files
committed
Refactor assimilation reverse iterating in VDisk
1 parent 793880a commit 4542698

File tree

6 files changed

+208
-108
lines changed

6 files changed

+208
-108
lines changed

ydb/core/blobstorage/ut_blobstorage/assimilation.cpp

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
2+
#include <ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress.h>
3+
#include <ydb/core/util/lz4_data_generator.h>
24

35
void RunAssimilationTest(bool reverse) {
46
TEnvironmentSetup env{{
@@ -206,11 +208,139 @@ void RunAssimilationTest(bool reverse) {
206208
UNIT_ASSERT(aBlobs.empty());
207209
}
208210

211+
void RunVDiskIterationTest(bool reverse) {
212+
TEnvironmentSetup env{{
213+
.NodeCount = 8,
214+
.Erasure = TBlobStorageGroupType::Erasure4Plus2Block,
215+
}};
216+
auto& runtime = env.Runtime;
217+
runtime->SetLogPriority(NKikimrServices::BS_PROXY_ASSIMILATE, NLog::PRI_DEBUG);
218+
219+
env.CreateBoxAndPool(1, 1);
220+
env.Sim(TDuration::Seconds(30));
221+
auto groups = env.GetGroups();
222+
UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1);
223+
const TIntrusivePtr<TBlobStorageGroupInfo> info = env.GetGroupInfo(groups.front());
224+
225+
auto edge = runtime->AllocateEdgeActor(1, __FILE__, __LINE__);
226+
const TVDiskID vdiskId = info->GetVDiskId(0);
227+
auto putQueueId = env.CreateQueueActor(vdiskId, NKikimrBlobStorage::PutTabletLog, 0, 1);
228+
auto getQueueId = env.CreateQueueActor(vdiskId, NKikimrBlobStorage::GetFastRead, 0, 1);
229+
230+
THashMap<TLogoBlobID, ui64> blobs;
231+
ui64 tabletId = 1;
232+
ui32 gen = 1;
233+
ui32 step = 1;
234+
235+
TString data = FastGenDataForLZ4(32);
236+
237+
for (size_t iter = 0; iter < 10000; ++iter) {
238+
ui32 action = RandomNumber(100u);
239+
if (action < 95) {
240+
ui32 partId = 1 + RandomNumber(6u);
241+
TLogoBlobID id;
242+
if (RandomNumber(4u) || blobs.empty()) {
243+
for (;;) {
244+
id = TLogoBlobID(tabletId, gen, step++, 0, 128, 0);
245+
if (info->GetIdxInSubgroup(vdiskId, id.Hash()) >= 6) {
246+
// this VDisk is a handoff for blob
247+
break;
248+
}
249+
}
250+
} else {
251+
std::vector<TLogoBlobID> ids;
252+
std::ranges::copy(blobs | std::views::keys, std::back_inserter(ids));
253+
id = ids[RandomNumber(ids.size())];
254+
}
255+
Cerr << "putting " << id << " partId# " << partId << Endl;
256+
runtime->Send(new IEventHandle(putQueueId, edge, new TEvBlobStorage::TEvVPut(TLogoBlobID(id, partId),
257+
TRope(data), vdiskId, false, nullptr, TInstant::Max(), NKikimrBlobStorage::TabletLog)), edge.NodeId());
258+
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVPutResult>(edge, false);
259+
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Record.GetStatus(), NKikimrProto::OK);
260+
blobs[id] |= TIngress::CreateIngressWithLocal(&info->GetTopology(), vdiskId, TLogoBlobID(id, partId))->Raw();
261+
} else {
262+
Cerr << "compacting" << Endl;
263+
const TActorId actorId = info->GetDynamicInfo().ServiceIdForOrderNumber.front();
264+
auto ev = std::make_unique<IEventHandle>(actorId, edge, TEvCompactVDisk::Create(EHullDbType::LogoBlobs));
265+
ev->Rewrite(TEvBlobStorage::EvForwardToSkeleton, actorId);
266+
env.Runtime->Send(ev.release(), edge.NodeId());
267+
env.WaitForEdgeActorEvent<TEvCompactVDiskResult>(edge, false);
268+
}
269+
270+
std::optional<TLogoBlobID> from;
271+
THashMap<TLogoBlobID, ui64> m = blobs;
272+
273+
for (;;) {
274+
Cerr << "starting assimilation from# " << (from ? from->ToString() : "<none>") << Endl;
275+
runtime->Send(new IEventHandle(getQueueId, edge, new TEvBlobStorage::TEvVAssimilate(vdiskId, std::nullopt,
276+
std::nullopt, from, true, reverse)), edge.NodeId());
277+
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVAssimilateResult>(edge, false);
278+
if (!res->Get()->Record.BlobsSize()) {
279+
break;
280+
}
281+
282+
ui64 raw[3] = {0, 0, 0};
283+
size_t n = res->Get()->Record.BlobsSize();
284+
for (const auto& item : res->Get()->Record.GetBlobs()) {
285+
if (item.HasRawX1()) {
286+
raw[0] = item.GetRawX1();
287+
} else if (item.HasDiffX1()) {
288+
if (reverse) {
289+
raw[0] -= item.GetDiffX1();
290+
} else {
291+
raw[0] += item.GetDiffX1();
292+
}
293+
}
294+
295+
if (item.HasRawX2()) {
296+
raw[1] = item.GetRawX2();
297+
} else if (item.HasDiffX2()) {
298+
if (reverse) {
299+
raw[1] -= item.GetDiffX2();
300+
} else {
301+
raw[1] += item.GetDiffX2();
302+
}
303+
}
304+
305+
if (item.HasRawX3()) {
306+
raw[2] = item.GetRawX3();
307+
} else if (item.HasDiffX3()) {
308+
if (reverse) {
309+
raw[2] -= item.GetDiffX3();
310+
} else {
311+
raw[2] += item.GetDiffX3();
312+
}
313+
}
314+
315+
TLogoBlobID id(raw);
316+
317+
UNIT_ASSERT(m.contains(id));
318+
UNIT_ASSERT_VALUES_EQUAL(m[id], item.GetIngress());
319+
m.erase(id);
320+
321+
if (RandomNumber(n) == 0) {
322+
from = id;
323+
break;
324+
}
325+
326+
--n;
327+
}
328+
}
329+
UNIT_ASSERT(m.empty());
330+
}
331+
}
332+
209333
Y_UNIT_TEST_SUITE(VDiskAssimilation) {
210334
Y_UNIT_TEST(Test) {
211335
RunAssimilationTest(false);
212336
}
213337
Y_UNIT_TEST(TestReverse) {
214338
RunAssimilationTest(true);
215339
}
340+
Y_UNIT_TEST(Iteration) {
341+
RunVDiskIterationTest(false);
342+
}
343+
Y_UNIT_TEST(IterationReverse) {
344+
RunVDiskIterationTest(true);
345+
}
216346
}

ydb/core/blobstorage/vdisk/hulldb/base/hullds_heap_it.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ namespace NKikimr {
136136
return SeekImpl([](auto *iter) { iter->SeekToFirst(); });
137137
}
138138

139+
void SeekToLast() {
140+
return SeekImpl([](auto *iter) { iter->SeekToLast(); });
141+
}
142+
139143
template<typename TMerger, typename TCallback>
140144
void Walk(std::optional<TKey> key, TMerger merger, TCallback&& callback) {
141145
if (key.has_value()) {

ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_appendix.h

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,13 @@ namespace NKikimr {
115115

116116
TIterator(const THullCtxPtr &hullCtx, TContType *apndx)
117117
: Apndx(apndx)
118-
, It()
118+
, It(Apndx ? Apndx->SortedRecs.end() : TFreshAppendix::TVecIterator())
119119
{
120120
Y_UNUSED(hullCtx);
121121
}
122122

123123
bool Valid() const {
124-
return Apndx && It >= Apndx->SortedRecs.begin() && It < Apndx->SortedRecs.end();
124+
return Apndx && It != Apndx->SortedRecs.end();
125125
}
126126

127127
void Next() {
@@ -130,10 +130,12 @@ namespace NKikimr {
130130
}
131131

132132
void Prev() {
133-
if (It == Apndx->SortedRecs.begin())
134-
It = {};
135-
else
133+
Y_DEBUG_ABORT_UNLESS(Valid());
134+
if (It == Apndx->SortedRecs.begin()) {
135+
It = Apndx->SortedRecs.end();
136+
} else {
136137
--It;
138+
}
137139
}
138140

139141
TKey GetCurKey() const {
@@ -147,11 +149,18 @@ namespace NKikimr {
147149
}
148150

149151
void SeekToFirst() {
152+
Y_DEBUG_ABORT_UNLESS(Apndx);
150153
It = Apndx->SortedRecs.begin();
151154
}
152155

156+
void SeekToLast() {
157+
Y_DEBUG_ABORT_UNLESS(Apndx);
158+
It = Apndx->SortedRecs.empty() ? Apndx->SortedRecs.end() : std::prev(Apndx->SortedRecs.end());
159+
}
160+
153161
void Seek(const TKey &key) {
154-
It = ::LowerBound(Apndx->SortedRecs.begin(), Apndx->SortedRecs.end(), key);
162+
Y_DEBUG_ABORT_UNLESS(Apndx);
163+
It = std::lower_bound(Apndx->SortedRecs.begin(), Apndx->SortedRecs.end(), key);
155164
}
156165

157166
template <class TRecordMerger>

0 commit comments

Comments
 (0)