Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions ydb/core/blobstorage/bridge/proxy/bridge_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -673,10 +673,12 @@ namespace NKikimr {
ev->GetTypeRewrite(), std::move(ev->TraceId));

STLOG(PRI_DEBUG, BS_PROXY_BRIDGE, BPB00, "new request", (RequestId, request->RequestId),
(GroupId, GroupId), (Request, originalRequest.ToString()));
(GroupId, GroupId), (GroupGeneration, request->Info->GroupGeneration),
(BridgeGroupState, request->Info->Group->GetBridgeGroupState()),
(Request, originalRequest.ToString()));

Y_ABORT_UNLESS(Info->Group);
const auto& state = Info->Group->GetBridgeGroupState();
Y_ABORT_UNLESS(request->Info->Group);
const auto& state = request->Info->Group->GetBridgeGroupState();
for (size_t i = 0; i < state.PileSize(); ++i) {
const auto bridgePileId = TBridgePileId::FromPileIndex(i);
const TBridgeInfo::TPile *pile = BridgeInfo->GetPile(bridgePileId);
Expand All @@ -696,7 +698,7 @@ namespace NKikimr {

void SendQuery(std::shared_ptr<TRequest> request, TBridgePileId bridgePileId, std::unique_ptr<IEventBase> ev,
TRequestPayload&& payload = {}) {
const auto& state = Info->Group->GetBridgeGroupState();
const auto& state = request->Info->Group->GetBridgeGroupState();
const auto& groupPileInfo = state.GetPile(bridgePileId.GetPileIndex());
const auto groupId = TGroupId::FromProto(&groupPileInfo, &NKikimrBridge::TGroupState::TPile::GetGroupId);

Expand Down
130 changes: 130 additions & 0 deletions ydb/core/blobstorage/ut_blobstorage/assimilation.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
#include <ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress.h>
#include <ydb/core/util/lz4_data_generator.h>

void RunAssimilationTest(bool reverse) {
TEnvironmentSetup env{{
Expand Down Expand Up @@ -206,11 +208,139 @@ void RunAssimilationTest(bool reverse) {
UNIT_ASSERT(aBlobs.empty());
}

void RunVDiskIterationTest(bool reverse) {
TEnvironmentSetup env{{
.NodeCount = 8,
.Erasure = TBlobStorageGroupType::Erasure4Plus2Block,
}};
auto& runtime = env.Runtime;
runtime->SetLogPriority(NKikimrServices::BS_PROXY_ASSIMILATE, NLog::PRI_DEBUG);

env.CreateBoxAndPool(1, 1);
env.Sim(TDuration::Seconds(30));
auto groups = env.GetGroups();
UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1);
const TIntrusivePtr<TBlobStorageGroupInfo> info = env.GetGroupInfo(groups.front());

auto edge = runtime->AllocateEdgeActor(1, __FILE__, __LINE__);
const TVDiskID vdiskId = info->GetVDiskId(0);
auto putQueueId = env.CreateQueueActor(vdiskId, NKikimrBlobStorage::PutTabletLog, 0, 1);
auto getQueueId = env.CreateQueueActor(vdiskId, NKikimrBlobStorage::GetFastRead, 0, 1);

THashMap<TLogoBlobID, ui64> blobs;
ui64 tabletId = 1;
ui32 gen = 1;
ui32 step = 1;

TString data = FastGenDataForLZ4(32);

for (size_t iter = 0; iter < 10000; ++iter) {
ui32 action = RandomNumber(100u);
if (action < 95) {
ui32 partId = 1 + RandomNumber(6u);
TLogoBlobID id;
if (RandomNumber(4u) || blobs.empty()) {
for (;;) {
id = TLogoBlobID(tabletId, gen, step++, 0, 128, 0);
if (info->GetIdxInSubgroup(vdiskId, id.Hash()) >= 6) {
// this VDisk is a handoff for blob
break;
}
}
} else {
std::vector<TLogoBlobID> ids;
std::ranges::copy(blobs | std::views::keys, std::back_inserter(ids));
id = ids[RandomNumber(ids.size())];
}
Cerr << "putting " << id << " partId# " << partId << Endl;
runtime->Send(new IEventHandle(putQueueId, edge, new TEvBlobStorage::TEvVPut(TLogoBlobID(id, partId),
TRope(data), vdiskId, false, nullptr, TInstant::Max(), NKikimrBlobStorage::TabletLog)), edge.NodeId());
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVPutResult>(edge, false);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Record.GetStatus(), NKikimrProto::OK);
blobs[id] |= TIngress::CreateIngressWithLocal(&info->GetTopology(), vdiskId, TLogoBlobID(id, partId))->Raw();
} else {
Cerr << "compacting" << Endl;
const TActorId actorId = info->GetDynamicInfo().ServiceIdForOrderNumber.front();
auto ev = std::make_unique<IEventHandle>(actorId, edge, TEvCompactVDisk::Create(EHullDbType::LogoBlobs));
ev->Rewrite(TEvBlobStorage::EvForwardToSkeleton, actorId);
env.Runtime->Send(ev.release(), edge.NodeId());
env.WaitForEdgeActorEvent<TEvCompactVDiskResult>(edge, false);
}

std::optional<TLogoBlobID> from;
THashMap<TLogoBlobID, ui64> m = blobs;

for (;;) {
Cerr << "starting assimilation from# " << (from ? from->ToString() : "<none>") << Endl;
runtime->Send(new IEventHandle(getQueueId, edge, new TEvBlobStorage::TEvVAssimilate(vdiskId, std::nullopt,
std::nullopt, from, true, reverse)), edge.NodeId());
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVAssimilateResult>(edge, false);
if (!res->Get()->Record.BlobsSize()) {
break;
}

ui64 raw[3] = {0, 0, 0};
size_t n = res->Get()->Record.BlobsSize();
for (const auto& item : res->Get()->Record.GetBlobs()) {
if (item.HasRawX1()) {
raw[0] = item.GetRawX1();
} else if (item.HasDiffX1()) {
if (reverse) {
raw[0] -= item.GetDiffX1();
} else {
raw[0] += item.GetDiffX1();
}
}

if (item.HasRawX2()) {
raw[1] = item.GetRawX2();
} else if (item.HasDiffX2()) {
if (reverse) {
raw[1] -= item.GetDiffX2();
} else {
raw[1] += item.GetDiffX2();
}
}

if (item.HasRawX3()) {
raw[2] = item.GetRawX3();
} else if (item.HasDiffX3()) {
if (reverse) {
raw[2] -= item.GetDiffX3();
} else {
raw[2] += item.GetDiffX3();
}
}

TLogoBlobID id(raw);

UNIT_ASSERT(m.contains(id));
UNIT_ASSERT_VALUES_EQUAL(m[id], item.GetIngress());
m.erase(id);

if (RandomNumber(n) == 0) {
from = id;
break;
}

--n;
}
}
UNIT_ASSERT(m.empty());
}
}

Y_UNIT_TEST_SUITE(VDiskAssimilation) {
Y_UNIT_TEST(Test) {
RunAssimilationTest(false);
}
Y_UNIT_TEST(TestReverse) {
RunAssimilationTest(true);
}
Y_UNIT_TEST(Iteration) {
RunVDiskIterationTest(false);
}
Y_UNIT_TEST(IterationReverse) {
RunVDiskIterationTest(true);
}
}
4 changes: 4 additions & 0 deletions ydb/core/blobstorage/vdisk/hulldb/base/hullds_heap_it.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ namespace NKikimr {
return SeekImpl([](auto *iter) { iter->SeekToFirst(); });
}

void SeekToLast() {
return SeekImpl([](auto *iter) { iter->SeekToLast(); });
}

template<typename TMerger, typename TCallback>
void Walk(std::optional<TKey> key, TMerger merger, TCallback&& callback) {
if (key.has_value()) {
Expand Down
20 changes: 14 additions & 6 deletions ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_appendix.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,13 @@ namespace NKikimr {

TIterator(const THullCtxPtr &hullCtx, TContType *apndx)
: Apndx(apndx)
, It()
, It(Apndx ? Apndx->SortedRecs.end() : TFreshAppendix::TVecIterator())
{
Y_UNUSED(hullCtx);
}

bool Valid() const {
return Apndx && It >= Apndx->SortedRecs.begin() && It < Apndx->SortedRecs.end();
return Apndx && It != Apndx->SortedRecs.end();
}

void Next() {
Expand All @@ -130,10 +130,11 @@ namespace NKikimr {
}

void Prev() {
if (It == Apndx->SortedRecs.begin())
It = {};
else
if (It == Apndx->SortedRecs.begin()) {
It = Apndx->SortedRecs.end();
} else {
--It;
}
}

TKey GetCurKey() const {
Expand All @@ -147,11 +148,18 @@ namespace NKikimr {
}

void SeekToFirst() {
Y_DEBUG_ABORT_UNLESS(Apndx);
It = Apndx->SortedRecs.begin();
}

void SeekToLast() {
Y_DEBUG_ABORT_UNLESS(Apndx);
It = Apndx->SortedRecs.empty() ? Apndx->SortedRecs.end() : std::prev(Apndx->SortedRecs.end());
}

void Seek(const TKey &key) {
It = ::LowerBound(Apndx->SortedRecs.begin(), Apndx->SortedRecs.end(), key);
Y_DEBUG_ABORT_UNLESS(Apndx);
It = std::lower_bound(Apndx->SortedRecs.begin(), Apndx->SortedRecs.end(), key);
}

template <class TRecordMerger>
Expand Down
Loading
Loading