diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp index d6f3295750a3..0ef948295980 100644 --- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp @@ -183,7 +183,7 @@ TVector TKafkaMetadataActor::CheckTopicNodes(TE TVector partitionNodes; for (const auto& part : response->Partitions) { auto iter = Nodes.find(part.NodeId); - if (iter.IsEnd()) { + if (iter == Nodes.end()) { return {}; } partitionNodes.push_back(&iter->second); @@ -215,21 +215,36 @@ void TKafkaMetadataActor::AddTopicResponse( responsePartition.ErrorCode = NONE_ERROR; responsePartition.LeaderId = nodeId; responsePartition.LeaderEpoch = part.Generation; - responsePartition.ReplicaNodes.push_back(nodeId); - responsePartition.IsrNodes.push_back(nodeId); - - topic.Partitions.emplace_back(std::move(responsePartition)); + // adding replica nodes in a roundrobin manner based on sorted NodeId + std::vector nodesToAdd = {nodeId}; if (!WithProxy && !NeedAllNodes) { - auto ins = AllClusterNodes.insert(part.NodeId); - if (ins.second) { - auto hostname = (*nodeIter)->Host; - if (hostname.StartsWith(UnderlayPrefix)) { - hostname = hostname.substr(sizeof(UnderlayPrefix) - 1); + AddBroker(nodeId, (*nodeIter)->Host, (*nodeIter)->Port); + } + if (!WithProxy) { + auto nodeToAddIter = Nodes.find(part.NodeId); + nodeToAddIter++; + for (size_t i = 0; i < 2; ++i) { + if (nodeToAddIter == Nodes.end()) { + nodeToAddIter = Nodes.begin(); + } + if (nodeToAddIter->first == nodeId) { + break; + } + nodesToAdd.push_back(nodeToAddIter->first); + if (!NeedAllNodes) { + AddBroker(nodeToAddIter->first, nodeToAddIter->second.Host, nodeToAddIter->second.Port); } - AddBroker(part.NodeId, hostname, (*nodeIter)->Port); + nodeToAddIter++; } + std::sort(nodesToAdd.begin(), nodesToAdd.end()); } + + for (size_t i = 0; i < nodesToAdd.size(); i++) { + responsePartition.ReplicaNodes.push_back(nodesToAdd[i]); + responsePartition.IsrNodes.push_back(nodesToAdd[i]); + } + topic.Partitions.emplace_back(std::move(responsePartition)); ++nodeIter; } } @@ -312,11 +327,18 @@ void TKafkaMetadataActor::SendCreateTopicsRequest(const TString& topicName, ui32 } void TKafkaMetadataActor::AddBroker(ui64 nodeId, const TString& host, ui64 port) { - auto broker = TMetadataResponseData::TMetadataResponseBroker{}; - broker.NodeId = nodeId; - broker.Host = host; - broker.Port = port; - Response->Brokers.emplace_back(std::move(broker)); + auto ins = AddedNodes.insert(nodeId); + if (ins.second) { + auto hostname = host; + if (hostname.StartsWith(UnderlayPrefix)) { + hostname = hostname.substr(sizeof(UnderlayPrefix) - 1); + }; + auto broker = TMetadataResponseData::TMetadataResponseBroker{}; + broker.NodeId = nodeId; + broker.Host = hostname; + broker.Port = port; + Response->Brokers.emplace_back(std::move(broker)); + } } void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) { diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h index 5d328736059b..a3d0e6ccd115 100644 --- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h @@ -88,7 +88,7 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped> TopicIndexes; - THashSet AllClusterNodes; + THashSet AddedNodes; EKafkaErrors ErrorCode = EKafkaErrors::NONE_ERROR; TActorId DiscoveryCacheActor; @@ -102,7 +102,7 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped Nodes; + TMap Nodes; THashMap PartitionActors; THashSet HaveBrokers; diff --git a/ydb/core/kafka_proxy/ut/actors_ut.cpp b/ydb/core/kafka_proxy/ut/actors_ut.cpp index 01808338d74a..5886bfe94bd2 100644 --- a/ydb/core/kafka_proxy/ut/actors_ut.cpp +++ b/ydb/core/kafka_proxy/ut/actors_ut.cpp @@ -14,6 +14,8 @@ using namespace NKikimr; +static constexpr const ui64 FAKE_SERVERLESS_KAFKA_PROXY_PORT = 19092; + auto UnpackDiscoveryData(const TString& data) { Ydb::Discovery::ListEndpointsResponse leResponse; Ydb::Discovery::ListEndpointsResult leResult; @@ -71,7 +73,7 @@ struct TMetarequestTestParams { TString FullTopicName; }; -TMetarequestTestParams SetupServer(const TString shortTopicName) { +TMetarequestTestParams SetupServer(const TString shortTopicName, bool serverless = false) { TStringBuilder fullTopicName; fullTopicName << "rt3.dc1--" << shortTopicName; auto pm = MakeSimpleShared(); @@ -80,6 +82,10 @@ TMetarequestTestParams SetupServer(const TString shortTopicName) { serverSettings.AppConfig->MutableKafkaProxyConfig()->SetEnableKafkaProxy(true); serverSettings.AppConfig->MutableKafkaProxyConfig()->SetListeningPort(kafkaPort); + if (serverless) { + serverSettings.AppConfig->MutableKafkaProxyConfig()->MutableProxy()->SetHostname("localhost"); + serverSettings.AppConfig->MutableKafkaProxyConfig()->MutableProxy()->SetPort(FAKE_SERVERLESS_KAFKA_PROXY_PORT); + } NPersQueue::TTestServer server(serverSettings, true, {}, NActors::NLog::PRI_INFO, pm); server.AnnoyingClient->CreateTopic(fullTopicName, 1); @@ -219,12 +225,14 @@ namespace NKafka::NTests { runtime->EnableScheduleForActor(actorId); } - void CheckKafkaMetaResponse(TTestActorRuntime* runtime, ui64 kafkaPort, bool error = false, ui64 expectedCount = 1) { + void CheckKafkaMetaResponse(TTestActorRuntime* runtime, ui64 kafkaPort, bool error = false, + ui64 expectedTopicCount = 1, ui64 expectedBrokersCount = 1, + const std::vector& expectedNodeIds = {}) { TAutoPtr handle; auto* ev = runtime->GrabEdgeEvent(handle); UNIT_ASSERT(ev); auto response = dynamic_cast(ev->Response.get()); - UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), expectedCount); + UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), expectedTopicCount); if (!error) { for (const auto& topic : response->Topics) { UNIT_ASSERT(topic.ErrorCode == EKafkaErrors::NONE_ERROR); @@ -234,9 +242,26 @@ namespace NKafka::NTests { UNIT_ASSERT(ev->ErrorCode == EKafkaErrors::LISTENER_NOT_FOUND); return; } - UNIT_ASSERT_VALUES_EQUAL(response->Brokers.size(), 1); - Cerr << "Broker " << response->Brokers[0].NodeId << " - " << response->Brokers[0].Host << ":" << response->Brokers[0].Port << Endl; - UNIT_ASSERT_VALUES_EQUAL(response->Brokers[0].Port, kafkaPort); + UNIT_ASSERT_VALUES_EQUAL(response->Brokers.size(), expectedBrokersCount); + for (size_t i = 0; i < response->Topics[0].Partitions.size(); i++) { + i64 lastNodeId = -1; + for (const auto& nodeId : response->Topics[0].Partitions[i].ReplicaNodes) { + if (lastNodeId != -1) { + UNIT_ASSERT(lastNodeId < nodeId); + } + lastNodeId = nodeId; + } + } + if (!expectedNodeIds.empty()) { + UNIT_ASSERT_VALUES_EQUAL(response->Topics[0].Partitions[0].ReplicaNodes.size(), expectedNodeIds.size()); + for (size_t i = 0; i < response->Topics[0].Partitions[0].ReplicaNodes.size(); ++i) { + UNIT_ASSERT_VALUES_EQUAL(response->Topics[0].Partitions[0].ReplicaNodes[i], expectedNodeIds[i]); + } + } + if (expectedBrokersCount == 1) { + Cerr << "Broker " << response->Brokers[0].NodeId << " - " << response->Brokers[0].Host << ":" << response->Brokers[0].Port << Endl; + UNIT_ASSERT_VALUES_EQUAL(response->Brokers[0].Port, kafkaPort); + } } Y_UNIT_TEST(MetadataActorGetsEndpoint) { @@ -271,7 +296,7 @@ namespace NKafka::NTests { CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime, config, fakeCache); - CheckKafkaMetaResponse(runtime, kafkaPort); + CheckKafkaMetaResponse(runtime, kafkaPort, false, 1, 3); } Y_UNIT_TEST(DiscoveryResponsesWithError) { @@ -308,6 +333,149 @@ namespace NKafka::NTests { CheckKafkaMetaResponse(runtime, 12345); } + Y_UNIT_TEST(TopicMetadataOnlyThreeReplicaNodesReturnedFromMany) { + auto [server, kafkaPort, config, topicName] = SetupServer("topic1"); + + auto* runtime = server.GetRuntime(); + auto edge = runtime->AllocateEdgeActor(); + TVector allNodesIds = {0, runtime->GetNodeId(0), 10000, 10001, 10002}; + Ydb::Discovery::ListEndpointsResult leResult; + auto* ep = leResult.add_endpoints(); + for (size_t i = 0; i < allNodesIds.size(); i++) { + ep->set_address("localhost"); + ep->set_port(i + 1); + ep->set_node_id(allNodesIds[i]); + if (i != allNodesIds.size() - 1) { + ep = leResult.add_endpoints(); + } + } + + auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false)); + runtime->EnableScheduleForActor(fakeCache); + CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime, + config, fakeCache); + + std::vector expectedNodeIds = {runtime->GetNodeId(0), 10000, 10001}; + CheckKafkaMetaResponse(runtime, 12345, false, 1, 3, expectedNodeIds); + } + + Y_UNIT_TEST(TopicMetadataOnlyTwoReplicaNodeReturned) { + auto [server, kafkaPort, config, topicName] = SetupServer("topic1"); + + auto* runtime = server.GetRuntime(); + auto edge = runtime->AllocateEdgeActor(); + + Ydb::Discovery::ListEndpointsResult leResult; + auto* ep = leResult.add_endpoints(); + ep->set_address("localhost"); + ep->set_port(12345); + ep->set_node_id(10000); + + auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false)); + runtime->EnableScheduleForActor(fakeCache); + CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime, + config, fakeCache); + + std::vector expectedNodeIds = {runtime->GetNodeId(0), 10000}; + CheckKafkaMetaResponse(runtime, 12345, false, 1, 2, expectedNodeIds); + } + + Y_UNIT_TEST(TopicMetadataOnlyOneReplicaNodeReturned) { + auto [server, kafkaPort, config, topicName] = SetupServer("topic1"); + + auto* runtime = server.GetRuntime(); + auto edge = runtime->AllocateEdgeActor(); + + Ydb::Discovery::ListEndpointsResult leResult; + + auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false)); + runtime->EnableScheduleForActor(fakeCache); + CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime, + config, fakeCache); + + std::vector expectedNodeIds = {runtime->GetNodeId(0)}; + CheckKafkaMetaResponse(runtime, kafkaPort, false, 1, 1, expectedNodeIds); + } + + Y_UNIT_TEST(TopicMetadataNodesCorrectOrderReturned) { + auto [server, kafkaPort, config, topicName] = SetupServer("topic1"); + + auto* runtime = server.GetRuntime(); + auto edge = runtime->AllocateEdgeActor(); + + Ydb::Discovery::ListEndpointsResult leResult; + auto* ep = leResult.add_endpoints(); + ep->set_address("localhost"); + ep->set_port(12345); + ep->set_node_id(0); + + ep = leResult.add_endpoints(); + ep->set_address("localhost"); + ep->set_port(1999); + ep->set_node_id(10000); + + auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false)); + runtime->EnableScheduleForActor(fakeCache); + CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime, + config, fakeCache); + + std::vector expectedNodeIds = {0, runtime->GetNodeId(0), 10000}; + std::sort(expectedNodeIds.begin(), expectedNodeIds.end()); + CheckKafkaMetaResponse(runtime, 12345, false, 1, 3, expectedNodeIds); + } + + Y_UNIT_TEST(TopicMetadataTwoNodesCheckReverseOrder) { + auto [server, kafkaPort, config, topicName] = SetupServer("topic1"); + + auto* runtime = server.GetRuntime(); + auto edge = runtime->AllocateEdgeActor(); + + Ydb::Discovery::ListEndpointsResult leResult; + auto* ep = leResult.add_endpoints(); + ep->set_address("localhost"); + ep->set_port(12345); + ep->set_node_id(0); + + auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false)); + runtime->EnableScheduleForActor(fakeCache); + CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime, + config, fakeCache); + + std::vector expectedNodeIds = {0, runtime->GetNodeId(0)}; + CheckKafkaMetaResponse(runtime, 12345, false, 1, 2, expectedNodeIds); + } + + Y_UNIT_TEST(TopicMetadataServerlessReturnsSingleNode) { + auto [server, kafkaPort, config, topicName] = SetupServer("topic1", true); + + auto* runtime = server.GetRuntime(); + auto edge = runtime->AllocateEdgeActor(); + + Ydb::Discovery::ListEndpointsResult leResult; + auto* ep = leResult.add_endpoints(); + ep->set_address("localhost"); + ep->set_port(12345); + ep->set_node_id(0); + + ep = leResult.add_endpoints(); + ep->set_address("localhost"); + ep->set_port(111); + ep->set_node_id(10000); + + ep = leResult.add_endpoints(); + ep->set_address("localhost"); + ep->set_port(222); + ep->set_node_id(100); + + auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false)); + runtime->EnableScheduleForActor(fakeCache); + CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime, + config, fakeCache); + + std::vector expectedNodeIds = {NKafka::ProxyNodeId}; + CheckKafkaMetaResponse(runtime, FAKE_SERVERLESS_KAFKA_PROXY_PORT, false, 1, 1, expectedNodeIds); + } + Y_UNIT_TEST(MetadataActorDoubleTopic) { auto [server, kafkaPort, config, topicName] = SetupServer("topic1"); diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.cpp b/ydb/core/kafka_proxy/ut/kafka_test_client.cpp index e7be14118343..fdc57d3ef3d6 100644 --- a/ydb/core/kafka_proxy/ut/kafka_test_client.cpp +++ b/ydb/core/kafka_proxy/ut/kafka_test_client.cpp @@ -34,16 +34,13 @@ TMessagePtr TKafkaTestClient::ApiVersions(bool silent) // YDB ignores AllowAutoTopicCreation, i.e. it never creates a new topic implicitly. // But in Apache Kafka the default behavior is to create a new topic, if there is no one at the moment of the request. // With this flag, allowAutoTopicCreation, you can stop this behavior in Apache Kafka. -TMessagePtr TKafkaTestClient::Metadata(const TVector& topics, std::optional allowAutoTopicCreation) { +TMessagePtr TKafkaTestClient::Metadata(const TVector& topics, bool allowAutoTopicCreation) { Cerr << ">>>>> MetadataRequest\n"; TRequestHeaderData header = Header(NKafka::EApiKey::METADATA, 12); TMetadataRequestData request; - if (allowAutoTopicCreation.has_value()) { - // If allowAutoTopicCreation does not have a value, use the default value (= true). - request.AllowAutoTopicCreation = allowAutoTopicCreation.value() ? 1 : 0; - } + request.AllowAutoTopicCreation = allowAutoTopicCreation; request.Topics.reserve(topics.size()); for (auto topicName : topics) { NKafka::TMetadataRequestData::TMetadataRequestTopic topic; diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.h b/ydb/core/kafka_proxy/ut/kafka_test_client.h index c9422127daf5..ac4999581172 100644 --- a/ydb/core/kafka_proxy/ut/kafka_test_client.h +++ b/ydb/core/kafka_proxy/ut/kafka_test_client.h @@ -59,7 +59,7 @@ class TKafkaTestClient { TMessagePtr ApiVersions(bool silent = false); - TMessagePtr Metadata(const TVector& topics = {}, std::optional allowAutoTopicCreation = std::nullopt); + TMessagePtr Metadata(const TVector& topics = {}, bool allowAutoTopicCreation = true); TMessagePtr SaslHandshake(const TString& mechanism = "PLAIN");