Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 38 additions & 16 deletions ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ TVector<TKafkaMetadataActor::TNodeInfo*> TKafkaMetadataActor::CheckTopicNodes(TE
TVector<TNodeInfo*> partitionNodes;
for (const auto& part : response->Partitions) {
auto iter = Nodes.find(part.NodeId);
if (iter.IsEnd()) {
if (iter == Nodes.end()) {
return {};
}
partitionNodes.push_back(&iter->second);
Expand Down Expand Up @@ -215,21 +215,36 @@ void TKafkaMetadataActor::AddTopicResponse(
responsePartition.ErrorCode = NONE_ERROR;
responsePartition.LeaderId = nodeId;
responsePartition.LeaderEpoch = part.Generation;
responsePartition.ReplicaNodes.push_back(nodeId);
responsePartition.IsrNodes.push_back(nodeId);

topic.Partitions.emplace_back(std::move(responsePartition));

// adding replica nodes in a roundrobin manner based on sorted NodeId
std::vector<ui64> nodesToAdd = {nodeId};
if (!WithProxy && !NeedAllNodes) {
auto ins = AllClusterNodes.insert(part.NodeId);
if (ins.second) {
auto hostname = (*nodeIter)->Host;
if (hostname.StartsWith(UnderlayPrefix)) {
hostname = hostname.substr(sizeof(UnderlayPrefix) - 1);
AddBroker(nodeId, (*nodeIter)->Host, (*nodeIter)->Port);
}
if (!WithProxy) {
auto nodeToAddIter = Nodes.find(part.NodeId);
nodeToAddIter++;
for (size_t i = 0; i < 2; ++i) {
if (nodeToAddIter == Nodes.end()) {
nodeToAddIter = Nodes.begin();
}
if (nodeToAddIter->first == nodeId) {
break;
}
nodesToAdd.push_back(nodeToAddIter->first);
if (!NeedAllNodes) {
AddBroker(nodeToAddIter->first, nodeToAddIter->second.Host, nodeToAddIter->second.Port);
}
AddBroker(part.NodeId, hostname, (*nodeIter)->Port);
nodeToAddIter++;
}
std::sort(nodesToAdd.begin(), nodesToAdd.end());
}

for (size_t i = 0; i < nodesToAdd.size(); i++) {
responsePartition.ReplicaNodes.push_back(nodesToAdd[i]);
responsePartition.IsrNodes.push_back(nodesToAdd[i]);
}
topic.Partitions.emplace_back(std::move(responsePartition));
++nodeIter;
}
}
Expand Down Expand Up @@ -312,11 +327,18 @@ void TKafkaMetadataActor::SendCreateTopicsRequest(const TString& topicName, ui32
}

void TKafkaMetadataActor::AddBroker(ui64 nodeId, const TString& host, ui64 port) {
auto broker = TMetadataResponseData::TMetadataResponseBroker{};
broker.NodeId = nodeId;
broker.Host = host;
broker.Port = port;
Response->Brokers.emplace_back(std::move(broker));
auto ins = AddedNodes.insert(nodeId);
if (ins.second) {
auto hostname = host;
if (hostname.StartsWith(UnderlayPrefix)) {
hostname = hostname.substr(sizeof(UnderlayPrefix) - 1);
};
auto broker = TMetadataResponseData::TMetadataResponseBroker{};
broker.NodeId = nodeId;
broker.Host = hostname;
broker.Port = port;
Response->Brokers.emplace_back(std::move(broker));
}
}

void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo

TMetadataResponseData::TPtr Response;
THashMap<TActorId, TVector<ui64>> TopicIndexes;
THashSet<ui64> AllClusterNodes;
THashSet<ui64> AddedNodes;
EKafkaErrors ErrorCode = EKafkaErrors::NONE_ERROR;

TActorId DiscoveryCacheActor;
Expand All @@ -102,7 +102,7 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
void Handle(const TEvKafka::TEvResponse::TPtr& ev, const TActorContext& ctx);
void SendCreateTopicsRequest(const TString& topicName, ui32 index, const TActorContext& ctx);

THashMap<ui64, TNodeInfo> Nodes;
TMap<ui64, TNodeInfo> Nodes;
THashMap<TString, TActorId> PartitionActors;
THashSet<ui64> HaveBrokers;

Expand Down
182 changes: 175 additions & 7 deletions ydb/core/kafka_proxy/ut/actors_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

using namespace NKikimr;

static constexpr const ui64 FAKE_SERVERLESS_KAFKA_PROXY_PORT = 19092;

auto UnpackDiscoveryData(const TString& data) {
Ydb::Discovery::ListEndpointsResponse leResponse;
Ydb::Discovery::ListEndpointsResult leResult;
Expand Down Expand Up @@ -71,7 +73,7 @@ struct TMetarequestTestParams {
TString FullTopicName;
};

TMetarequestTestParams SetupServer(const TString shortTopicName) {
TMetarequestTestParams SetupServer(const TString shortTopicName, bool serverless = false) {
TStringBuilder fullTopicName;
fullTopicName << "rt3.dc1--" << shortTopicName;
auto pm = MakeSimpleShared<TPortManager>();
Expand All @@ -80,6 +82,10 @@ TMetarequestTestParams SetupServer(const TString shortTopicName) {
serverSettings.AppConfig->MutableKafkaProxyConfig()->SetEnableKafkaProxy(true);

serverSettings.AppConfig->MutableKafkaProxyConfig()->SetListeningPort(kafkaPort);
if (serverless) {
serverSettings.AppConfig->MutableKafkaProxyConfig()->MutableProxy()->SetHostname("localhost");
serverSettings.AppConfig->MutableKafkaProxyConfig()->MutableProxy()->SetPort(FAKE_SERVERLESS_KAFKA_PROXY_PORT);
}
NPersQueue::TTestServer server(serverSettings, true, {}, NActors::NLog::PRI_INFO, pm);

server.AnnoyingClient->CreateTopic(fullTopicName, 1);
Expand Down Expand Up @@ -219,12 +225,14 @@ namespace NKafka::NTests {
runtime->EnableScheduleForActor(actorId);
}

void CheckKafkaMetaResponse(TTestActorRuntime* runtime, ui64 kafkaPort, bool error = false, ui64 expectedCount = 1) {
void CheckKafkaMetaResponse(TTestActorRuntime* runtime, ui64 kafkaPort, bool error = false,
ui64 expectedTopicCount = 1, ui64 expectedBrokersCount = 1,
const std::vector<ui32>& expectedNodeIds = {}) {
TAutoPtr<IEventHandle> handle;
auto* ev = runtime->GrabEdgeEvent<TEvKafka::TEvResponse>(handle);
UNIT_ASSERT(ev);
auto response = dynamic_cast<TMetadataResponseData*>(ev->Response.get());
UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), expectedCount);
UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), expectedTopicCount);
if (!error) {
for (const auto& topic : response->Topics) {
UNIT_ASSERT(topic.ErrorCode == EKafkaErrors::NONE_ERROR);
Expand All @@ -234,9 +242,26 @@ namespace NKafka::NTests {
UNIT_ASSERT(ev->ErrorCode == EKafkaErrors::LISTENER_NOT_FOUND);
return;
}
UNIT_ASSERT_VALUES_EQUAL(response->Brokers.size(), 1);
Cerr << "Broker " << response->Brokers[0].NodeId << " - " << response->Brokers[0].Host << ":" << response->Brokers[0].Port << Endl;
UNIT_ASSERT_VALUES_EQUAL(response->Brokers[0].Port, kafkaPort);
UNIT_ASSERT_VALUES_EQUAL(response->Brokers.size(), expectedBrokersCount);
for (size_t i = 0; i < response->Topics[0].Partitions.size(); i++) {
i64 lastNodeId = -1;
for (const auto& nodeId : response->Topics[0].Partitions[i].ReplicaNodes) {
if (lastNodeId != -1) {
UNIT_ASSERT(lastNodeId < nodeId);
}
lastNodeId = nodeId;
}
}
if (!expectedNodeIds.empty()) {
UNIT_ASSERT_VALUES_EQUAL(response->Topics[0].Partitions[0].ReplicaNodes.size(), expectedNodeIds.size());
for (size_t i = 0; i < response->Topics[0].Partitions[0].ReplicaNodes.size(); ++i) {
UNIT_ASSERT_VALUES_EQUAL(response->Topics[0].Partitions[0].ReplicaNodes[i], expectedNodeIds[i]);
}
}
if (expectedBrokersCount == 1) {
Cerr << "Broker " << response->Brokers[0].NodeId << " - " << response->Brokers[0].Host << ":" << response->Brokers[0].Port << Endl;
UNIT_ASSERT_VALUES_EQUAL(response->Brokers[0].Port, kafkaPort);
}
}

Y_UNIT_TEST(MetadataActorGetsEndpoint) {
Expand Down Expand Up @@ -271,7 +296,7 @@ namespace NKafka::NTests {
CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime,
config, fakeCache);

CheckKafkaMetaResponse(runtime, kafkaPort);
CheckKafkaMetaResponse(runtime, kafkaPort, false, 1, 3);
}

Y_UNIT_TEST(DiscoveryResponsesWithError) {
Expand Down Expand Up @@ -308,6 +333,149 @@ namespace NKafka::NTests {
CheckKafkaMetaResponse(runtime, 12345);
}

Y_UNIT_TEST(TopicMetadataOnlyThreeReplicaNodesReturnedFromMany) {
auto [server, kafkaPort, config, topicName] = SetupServer("topic1");

auto* runtime = server.GetRuntime();
auto edge = runtime->AllocateEdgeActor();
TVector<ui32> allNodesIds = {0, runtime->GetNodeId(0), 10000, 10001, 10002};
Ydb::Discovery::ListEndpointsResult leResult;
auto* ep = leResult.add_endpoints();
for (size_t i = 0; i < allNodesIds.size(); i++) {
ep->set_address("localhost");
ep->set_port(i + 1);
ep->set_node_id(allNodesIds[i]);
if (i != allNodesIds.size() - 1) {
ep = leResult.add_endpoints();
}
}

auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false));
runtime->EnableScheduleForActor(fakeCache);
CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime,
config, fakeCache);

std::vector<ui32> expectedNodeIds = {runtime->GetNodeId(0), 10000, 10001};
CheckKafkaMetaResponse(runtime, 12345, false, 1, 3, expectedNodeIds);
}

Y_UNIT_TEST(TopicMetadataOnlyTwoReplicaNodeReturned) {
auto [server, kafkaPort, config, topicName] = SetupServer("topic1");

auto* runtime = server.GetRuntime();
auto edge = runtime->AllocateEdgeActor();

Ydb::Discovery::ListEndpointsResult leResult;
auto* ep = leResult.add_endpoints();
ep->set_address("localhost");
ep->set_port(12345);
ep->set_node_id(10000);

auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false));
runtime->EnableScheduleForActor(fakeCache);
CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime,
config, fakeCache);

std::vector<ui32> expectedNodeIds = {runtime->GetNodeId(0), 10000};
CheckKafkaMetaResponse(runtime, 12345, false, 1, 2, expectedNodeIds);
}

Y_UNIT_TEST(TopicMetadataOnlyOneReplicaNodeReturned) {
auto [server, kafkaPort, config, topicName] = SetupServer("topic1");

auto* runtime = server.GetRuntime();
auto edge = runtime->AllocateEdgeActor();

Ydb::Discovery::ListEndpointsResult leResult;

auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false));
runtime->EnableScheduleForActor(fakeCache);
CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime,
config, fakeCache);

std::vector<ui32> expectedNodeIds = {runtime->GetNodeId(0)};
CheckKafkaMetaResponse(runtime, kafkaPort, false, 1, 1, expectedNodeIds);
}

Y_UNIT_TEST(TopicMetadataNodesCorrectOrderReturned) {
auto [server, kafkaPort, config, topicName] = SetupServer("topic1");

auto* runtime = server.GetRuntime();
auto edge = runtime->AllocateEdgeActor();

Ydb::Discovery::ListEndpointsResult leResult;
auto* ep = leResult.add_endpoints();
ep->set_address("localhost");
ep->set_port(12345);
ep->set_node_id(0);

ep = leResult.add_endpoints();
ep->set_address("localhost");
ep->set_port(1999);
ep->set_node_id(10000);

auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false));
runtime->EnableScheduleForActor(fakeCache);
CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime,
config, fakeCache);

std::vector<ui32> expectedNodeIds = {0, runtime->GetNodeId(0), 10000};
std::sort(expectedNodeIds.begin(), expectedNodeIds.end());
CheckKafkaMetaResponse(runtime, 12345, false, 1, 3, expectedNodeIds);
}

Y_UNIT_TEST(TopicMetadataTwoNodesCheckReverseOrder) {
auto [server, kafkaPort, config, topicName] = SetupServer("topic1");

auto* runtime = server.GetRuntime();
auto edge = runtime->AllocateEdgeActor();

Ydb::Discovery::ListEndpointsResult leResult;
auto* ep = leResult.add_endpoints();
ep->set_address("localhost");
ep->set_port(12345);
ep->set_node_id(0);

auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false));
runtime->EnableScheduleForActor(fakeCache);
CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime,
config, fakeCache);

std::vector<ui32> expectedNodeIds = {0, runtime->GetNodeId(0)};
CheckKafkaMetaResponse(runtime, 12345, false, 1, 2, expectedNodeIds);
}

Y_UNIT_TEST(TopicMetadataServerlessReturnsSingleNode) {
auto [server, kafkaPort, config, topicName] = SetupServer("topic1", true);

auto* runtime = server.GetRuntime();
auto edge = runtime->AllocateEdgeActor();

Ydb::Discovery::ListEndpointsResult leResult;
auto* ep = leResult.add_endpoints();
ep->set_address("localhost");
ep->set_port(12345);
ep->set_node_id(0);

ep = leResult.add_endpoints();
ep->set_address("localhost");
ep->set_port(111);
ep->set_node_id(10000);

ep = leResult.add_endpoints();
ep->set_address("localhost");
ep->set_port(222);
ep->set_node_id(100);

auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false));
runtime->EnableScheduleForActor(fakeCache);
CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime,
config, fakeCache);

std::vector<ui32> expectedNodeIds = {NKafka::ProxyNodeId};
CheckKafkaMetaResponse(runtime, FAKE_SERVERLESS_KAFKA_PROXY_PORT, false, 1, 1, expectedNodeIds);
}


Y_UNIT_TEST(MetadataActorDoubleTopic) {
auto [server, kafkaPort, config, topicName] = SetupServer("topic1");
Expand Down
7 changes: 2 additions & 5 deletions ydb/core/kafka_proxy/ut/kafka_test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,13 @@ TMessagePtr<TApiVersionsResponseData> TKafkaTestClient::ApiVersions(bool silent)
// YDB ignores AllowAutoTopicCreation, i.e. it never creates a new topic implicitly.
// But in Apache Kafka the default behavior is to create a new topic, if there is no one at the moment of the request.
// With this flag, allowAutoTopicCreation, you can stop this behavior in Apache Kafka.
TMessagePtr<TMetadataResponseData> TKafkaTestClient::Metadata(const TVector<TString>& topics, std::optional<bool> allowAutoTopicCreation) {
TMessagePtr<TMetadataResponseData> TKafkaTestClient::Metadata(const TVector<TString>& topics, bool allowAutoTopicCreation) {
Cerr << ">>>>> MetadataRequest\n";

TRequestHeaderData header = Header(NKafka::EApiKey::METADATA, 12);

TMetadataRequestData request;
if (allowAutoTopicCreation.has_value()) {
// If allowAutoTopicCreation does not have a value, use the default value (= true).
request.AllowAutoTopicCreation = allowAutoTopicCreation.value() ? 1 : 0;
}
request.AllowAutoTopicCreation = allowAutoTopicCreation;
request.Topics.reserve(topics.size());
for (auto topicName : topics) {
NKafka::TMetadataRequestData::TMetadataRequestTopic topic;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kafka_proxy/ut/kafka_test_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class TKafkaTestClient {

TMessagePtr<TApiVersionsResponseData> ApiVersions(bool silent = false);

TMessagePtr<TMetadataResponseData> Metadata(const TVector<TString>& topics = {}, std::optional<bool> allowAutoTopicCreation = std::nullopt);
TMessagePtr<TMetadataResponseData> Metadata(const TVector<TString>& topics = {}, bool allowAutoTopicCreation = true);

TMessagePtr<TSaslHandshakeResponseData> SaslHandshake(const TString& mechanism = "PLAIN");

Expand Down
Loading