From 48471c2cef2842b20b5a202af713c7b641fa3654 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Wed, 27 Aug 2025 12:58:33 +0300 Subject: [PATCH] Add FlagSubscribeOnSession in resource__allocator --- .../yql/providers/dq/actors/resource_allocator.cpp | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/ydb/library/yql/providers/dq/actors/resource_allocator.cpp b/ydb/library/yql/providers/dq/actors/resource_allocator.cpp index 8c143ec3dadc..21111d621419 100644 --- a/ydb/library/yql/providers/dq/actors/resource_allocator.cpp +++ b/ydb/library/yql/providers/dq/actors/resource_allocator.cpp @@ -77,10 +77,7 @@ class TResourceAllocator: public TRichActor STRICT_STFUNC(Handle, { HFunc(TEvAllocateWorkersResponse, OnAllocateWorkersResponse) cFunc(TEvents::TEvPoison::EventType, PassAway) - hFunc(TEvInterconnect::TEvNodeConnected, [this](TEvInterconnect::TEvNodeConnected::TPtr& ev) { - // Store GWM NodeId. Auto-unsubscribe on actor-death - Subscribe(ev->Get()->NodeId); - }) + IgnoreFunc(TEvInterconnect::TEvNodeConnected) cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, [this]() { Fail((ui64)-1, "GWM Disconnected"); }) @@ -260,15 +257,16 @@ class TResourceAllocator: public TRichActor if (backoff) { TActivationContext::Schedule(backoff, new IEventHandle( MakeWorkerManagerActorID(nodeId), SelfId(), request.Release(), - IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered, + IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered | IEventHandle::FlagSubscribeOnSession, node.ResourceId)); } else { Send( MakeWorkerManagerActorID(nodeId), request.Release(), - IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered, + IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered | IEventHandle::FlagSubscribeOnSession, node.ResourceId); } + Subscribe(nodeId); } void Fail(const ui64 cookie, const TString& reason) {