Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 209 additions & 35 deletions ydb/core/config/init/init.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "init_impl.h"
#include "mock.h"
#include <ydb/library/yaml_json/yaml_to_json.h>
#include <ydb/core/util/backoff.h>

namespace NKikimr::NConfig {

Expand Down Expand Up @@ -202,22 +203,7 @@ class TDefaultNodeBrokerClient
const TString& nodeRegistrationToken,
const IEnv& env)
{
TCommandConfig::TServerEndpoint endpoint = TCommandConfig::ParseServerAddress(addr);
NYdb::TDriverConfig config;
if (endpoint.EnableSsl.Defined() && endpoint.EnableSsl.GetRef()) {
if (grpcSettings.PathToGrpcCaFile) {
config.UseSecureConnection(env.ReadFromFile(grpcSettings.PathToGrpcCaFile, "CA certificates").c_str());
}
if (grpcSettings.PathToGrpcCertFile && grpcSettings.PathToGrpcPrivateKeyFile) {
auto certificate = env.ReadFromFile(grpcSettings.PathToGrpcCertFile, "Client certificates");
auto privateKey = env.ReadFromFile(grpcSettings.PathToGrpcPrivateKeyFile, "Client certificates key");
config.UseClientCertificate(certificate.c_str(), privateKey.c_str());
}
}
if (nodeRegistrationToken) {
config.SetAuthToken(nodeRegistrationToken);
}
config.SetEndpoint(endpoint.Address);
NYdb::TDriverConfig config = CreateDriverConfig(grpcSettings, addr, env, nodeRegistrationToken);
auto connection = NYdb::TDriver(config);

auto client = NYdb::NDiscovery::TDiscoveryClient(connection);
Expand Down Expand Up @@ -339,6 +325,73 @@ class TDynConfigResultWrapper
}
};

namespace {

struct RetryResult {
bool Success;
int TotalAttempts;
int Rounds;
};

template <typename TAttemptFn>
RetryResult RetryWithJitter(
const TVector<TString>& addrs,
const IEnv& env,
TAttemptFn attempt)
{
const int maxRounds = 10;
const TDuration baseRoundDelay = TDuration::MilliSeconds(500);
const TDuration maxIntraAddrDelay = TDuration::Minutes(3);
const TDuration maxDelay = TDuration::Minutes(5);
const TDuration baseAddressDelay = TDuration::MilliSeconds(250);

auto sleepWithJitteredExponentialDelay = [&env](TDuration baseDelay, TDuration maxDelay, int exponent) {
ui64 multiplier = 1ULL << exponent;
TDuration delay = baseDelay * multiplier;
delay = Min(delay, maxDelay);

ui64 maxMs = delay.MilliSeconds();
ui64 jitteredMs = RandomNumber<ui64>(maxMs + 1);
TDuration jitteredDelay = TDuration::MilliSeconds(jitteredMs);

env.Sleep(jitteredDelay);
};

int round = 0;
int totalAttempts = 0;
bool success = false;

while (!success && round < maxRounds) {
int addressIndex = 0;
for (const auto& addr : addrs) {
success = attempt(addr);
++totalAttempts;

if (success) {
break;
}

// Exponential delay between individual addresses - delay grows with each address in the round
if (addrs.size() > 1) {
sleepWithJitteredExponentialDelay(baseAddressDelay, maxIntraAddrDelay, Max(addressIndex, round));
}

++addressIndex;
}

if (!success) {
++round;

if (round < maxRounds) {
sleepWithJitteredExponentialDelay(baseRoundDelay, maxDelay, round - 1);
}
}
}
return {success, totalAttempts, round};
}

} // namespace

class TDefaultDynConfigClient
: public IDynConfigClient
{
Expand Down Expand Up @@ -390,36 +443,128 @@ class TDefaultDynConfigClient
IInitLogger& logger) const override
{
std::shared_ptr<IConfigurationResult> res;
bool success = false;
TString error;

SetRandomSeed(TInstant::Now().MicroSeconds());
int minAttempts = 10;
int attempts = 0;
while (!success && attempts < minAttempts) {
for (auto addr : addrs) {
success = TryToLoadConfigForDynamicNodeFromCMS(grpcSettings, addr, settings, env, logger, res, error);
++attempts;
if (success) {
break;
}
}
// Randomized backoff
if (!success) {
env.Sleep(TDuration::MilliSeconds(500 + RandomNumber<ui64>(1000)));
} else {
break;

auto attempt = [&](const TString& addr) {
logger.Out() << "Trying to get dynamic config from " << addr << Endl;
bool success = TryToLoadConfigForDynamicNodeFromCMS(grpcSettings, addr, settings, env, logger, res, error);
if (success) {
logger.Out() << "Success. Got dynamic config from " << addr << Endl;
}
}
return success;
};

if (!success) {
logger.Err() << "WARNING: couldn't load config from CMS: " << error << Endl;
const auto result = RetryWithJitter(addrs, env, attempt);

if (!result.Success) {
logger.Err() << "WARNING: couldn't load config from Console after "
<< result.TotalAttempts << " attempts across " << result.Rounds
<< " rounds: " << error << Endl;
}

return res;
}
};

class TConfigResultWrapper
: public IStorageConfigResult
{
public:
TConfigResultWrapper(const NYdb::NConfig::TFetchConfigResult& result) {
TString clusterConfig;
TString storageConfig;
for (const auto& entry : result.GetConfigs()) {
std::visit([&](auto&& arg) {
using T = std::decay_t<decltype(arg)>;
if constexpr (std::is_same_v<T, NYdb::NConfig::TMainConfigIdentity>) {
MainYamlConfig = entry.Config;
} else if constexpr (std::is_same_v<T, NYdb::NConfig::TStorageConfigIdentity>) {
StorageYamlConfig = entry.Config;
}
}, entry.Identity);
}
}

const TString& GetMainYamlConfig() const override {
return MainYamlConfig;
}

const TString& GetStorageYamlConfig() const override {
return StorageYamlConfig;
}

private:
TString MainYamlConfig;
TString StorageYamlConfig;
};

class TDefaultConfigClient
: public IConfigClient
{
private:
static NYdb::NConfig::TFetchConfigResult TryToFetchConfig(
const TGrpcSslSettings& grpcSettings,
const TString& addrs,
const IEnv& env)
{

NYdb::TDriverConfig config = CreateDriverConfig(grpcSettings, addrs, env);

auto connection = NYdb::TDriver(config);

auto client = NYdb::NConfig::TConfigClient(connection);
NYdb::NConfig::TFetchConfigResult result = client.FetchAllConfigs().GetValueSync();
connection.Stop(true);
return result;
}

static NYdb::NConfig::TFetchConfigResult FetchConfigImpl(
const TGrpcSslSettings& grpcSettings,
const TVector<TString>& addrs,
const IEnv& env,
IInitLogger& logger)
{
std::optional<NYdb::NConfig::TFetchConfigResult> result;
SetRandomSeed(TInstant::Now().MicroSeconds());

auto attempt = [&](const TString& addr) {
logger.Out() << "Trying to fetch config from " << addr << Endl;
result = TryToFetchConfig(grpcSettings, addr, env);
if (result->IsSuccess()) {
logger.Out() << "Success. Fetched config from " << addr << Endl;
return true;
}
logger.Err() << "Fetch config error: " << static_cast<NYdb::TStatus>(*result) << Endl;
return false;
};

const auto retryResult = RetryWithJitter(addrs, env, attempt);

if (!retryResult.Success) {
logger.Err() << "WARNING: couldn't fetch config from Console after "
<< retryResult.TotalAttempts << " attempts across " << retryResult.Rounds
<< " rounds. Last error: " << static_cast<NYdb::TStatus>(*result) << Endl;
}
return *result;
}

public:
std::shared_ptr<IStorageConfigResult> FetchConfig(
const TGrpcSslSettings& grpcSettings,
const TVector<TString>& addrs,
const IEnv& env,
IInitLogger& logger) const override
{
auto result = FetchConfigImpl(grpcSettings, addrs, env, logger);
if (!result.IsSuccess()) {
return nullptr;
}
return std::make_shared<TConfigResultWrapper>(std::move(result));
}
};

class TDefaultInitLogger
: public IInitLogger
{
Expand Down Expand Up @@ -461,6 +606,10 @@ std::unique_ptr<IDynConfigClient> MakeDefaultDynConfigClient() {
return std::make_unique<TDefaultDynConfigClient>();
}

std::unique_ptr<IConfigClient> MakeDefaultConfigClient() {
return std::make_unique<TDefaultConfigClient>();
}

std::unique_ptr<IInitLogger> MakeDefaultInitLogger() {
return std::make_unique<TDefaultInitLogger>();
}
Expand Down Expand Up @@ -770,6 +919,27 @@ NKikimrConfig::TAppConfig GetActualDynConfig(
return regularConfig;
}

NYdb::TDriverConfig CreateDriverConfig(const TGrpcSslSettings& grpcSettings, const TString& addr, const IEnv& env, const std::optional<TString>& authToken) {
TCommandConfig::TServerEndpoint endpoint = TCommandConfig::ParseServerAddress(addr);
NYdb::TDriverConfig config;
if (endpoint.EnableSsl.Defined() && endpoint.EnableSsl.GetRef()) {
if (grpcSettings.PathToGrpcCaFile) {
config.UseSecureConnection(env.ReadFromFile(grpcSettings.PathToGrpcCaFile, "CA certificates").c_str());
}
if (grpcSettings.PathToGrpcCertFile && grpcSettings.PathToGrpcPrivateKeyFile) {
auto certificate = env.ReadFromFile(grpcSettings.PathToGrpcCertFile, "Client certificates");
auto privateKey = env.ReadFromFile(grpcSettings.PathToGrpcPrivateKeyFile, "Client certificates key");
config.UseClientCertificate(certificate.c_str(), privateKey.c_str());
}
}
if (authToken) {
config.SetAuthToken(authToken.value());
}
config.SetEndpoint(endpoint.Address);

return config;
}

std::unique_ptr<IInitialConfigurator> MakeDefaultInitialConfigurator(TInitialConfiguratorDependencies deps) {
return std::make_unique<TInitialConfiguratorImpl>(deps);
}
Expand All @@ -781,13 +951,15 @@ class TInitialConfiguratorDepsRecorder
TProtoConfigFileProviderRecorder ProtoConfigFileProvider;
TNodeBrokerClientRecorder NodeBrokerClient;
TDynConfigClientRecorder DynConfigClient;
TConfigClientRecorder ConfigClient;
TEnvRecorder Env;
public:
TInitialConfiguratorDepsRecorder(TInitialConfiguratorDependencies deps)
: Impls(deps)
, ProtoConfigFileProvider(deps.ProtoConfigFileProvider)
, NodeBrokerClient(deps.NodeBrokerClient)
, DynConfigClient(deps.DynConfigClient)
, ConfigClient(deps.ConfigClient)
, Env(deps.Env)
{}

Expand All @@ -799,6 +971,7 @@ class TInitialConfiguratorDepsRecorder
.MemLogInit = Impls.MemLogInit,
.NodeBrokerClient = NodeBrokerClient,
.DynConfigClient = DynConfigClient,
.ConfigClient = ConfigClient,
.Env = Env,
.Logger = Impls.Logger,
};
Expand All @@ -812,6 +985,7 @@ class TInitialConfiguratorDepsRecorder
.MemLogInit = MakeNoopMemLogInitializer(),
.NodeBrokerClient = std::make_unique<TNodeBrokerClientMock>(NodeBrokerClient.GetMock()),
.DynConfigClient = std::make_unique<TDynConfigClientMock>(DynConfigClient.GetMock()),
.ConfigClient = std::make_unique<TConfigClientMock>(ConfigClient.GetMock()),
.Env = std::make_unique<TEnvMock>(Env.GetMock()),
.Logger = MakeNoopInitLogger(),
};
Expand Down
24 changes: 24 additions & 0 deletions ydb/core/config/init/init.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,33 @@ class IDynConfigClient {

// ===

class IStorageConfigResult {
public:
virtual ~IStorageConfigResult() {}
virtual const TString& GetMainYamlConfig() const = 0;
virtual const TString& GetStorageYamlConfig() const = 0;
};

class IConfigClient {
public:
virtual ~IConfigClient() {}
virtual std::shared_ptr<IStorageConfigResult> FetchConfig(
const TGrpcSslSettings& grpcSettings,
const TVector<TString>& addrs,
const IEnv& env,
IInitLogger& logger) const = 0;
};

// ===

struct TInitialConfiguratorDependencies {
NConfig::IErrorCollector& ErrorCollector;
NConfig::IProtoConfigFileProvider& ProtoConfigFileProvider;
NConfig::IConfigUpdateTracer& ConfigUpdateTracer;
NConfig::IMemLogInitializer& MemLogInit;
NConfig::INodeBrokerClient& NodeBrokerClient;
NConfig::IDynConfigClient& DynConfigClient;
NConfig::IConfigClient& ConfigClient;
NConfig::IEnv& Env;
NConfig::IInitLogger& Logger;
};
Expand All @@ -197,6 +217,7 @@ struct TRecordedInitialConfiguratorDeps {
*MemLogInit,
*NodeBrokerClient,
*DynConfigClient,
*ConfigClient,
*Env,
*Logger
};
Expand All @@ -208,6 +229,7 @@ struct TRecordedInitialConfiguratorDeps {
std::unique_ptr<NConfig::IMemLogInitializer> MemLogInit;
std::unique_ptr<NConfig::INodeBrokerClient> NodeBrokerClient;
std::unique_ptr<NConfig::IDynConfigClient> DynConfigClient;
std::unique_ptr<NConfig::IConfigClient> ConfigClient;
std::unique_ptr<NConfig::IEnv> Env;
std::unique_ptr<NConfig::IInitLogger> Logger;
};
Expand Down Expand Up @@ -260,11 +282,13 @@ std::unique_ptr<IErrorCollector> MakeDefaultErrorCollector();
std::unique_ptr<IMemLogInitializer> MakeDefaultMemLogInitializer();
std::unique_ptr<INodeBrokerClient> MakeDefaultNodeBrokerClient();
std::unique_ptr<IDynConfigClient> MakeDefaultDynConfigClient();
std::unique_ptr<IConfigClient> MakeDefaultConfigClient();
std::unique_ptr<IInitLogger> MakeDefaultInitLogger();

std::unique_ptr<IMemLogInitializer> MakeNoopMemLogInitializer();
std::unique_ptr<INodeBrokerClient> MakeNoopNodeBrokerClient();
std::unique_ptr<IDynConfigClient> MakeNoopDynConfigClient();
std::unique_ptr<IConfigClient> MakeNoopConfigClient();
std::unique_ptr<IInitLogger> MakeNoopInitLogger();

void AddProtoConfigOptions(IProtoConfigFileProvider& out);
Expand Down
Loading
Loading