25.6.5 Antalya port - Alternative syntax for cluster functions #949

Open · wants to merge 19 commits into antalya-25.6.5
10 changes: 8 additions & 2 deletions src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h
@@ -71,8 +71,14 @@ class FunctionTreeNodeImpl : public AbstractFunction
{
public:
explicit ArgumentsTreeNode(const QueryTreeNodes * arguments_) : arguments(arguments_) {}
size_t size() const override { return arguments ? arguments->size() : 0; }
std::unique_ptr<Argument> at(size_t n) const override { return std::make_unique<ArgumentTreeNode>(arguments->at(n).get()); }
size_t size() const override
{ /// size without skipped indexes
return arguments ? arguments->size() - skippedSize() : 0;
}
std::unique_ptr<Argument> at(size_t n) const override
{ /// n is a relative index; some indexes may be skipped
return std::make_unique<ArgumentTreeNode>(arguments->at(getRealIndex(n)).get());
}
private:
const QueryTreeNodes * arguments = nullptr;
};
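
size() and at() now expose the argument list as a view that hides skipped positions; skippedSize() and getRealIndex() come from the AbstractFunction side and are not shown in this hunk. A self-contained sketch of the same relative-to-real index mapping, with illustrative names (SkippingArgumentsView is not a real ClickHouse class):

#include <cassert>
#include <cstddef>
#include <set>
#include <stdexcept>
#include <vector>

class SkippingArgumentsView
{
public:
    SkippingArgumentsView(const std::vector<int> & arguments_, std::set<size_t> skipped_)
        : arguments(arguments_), skipped(std::move(skipped_)) {}

    size_t skippedSize() const { return skipped.size(); }

    /// Size without skipped indexes.
    size_t size() const { return arguments.size() - skippedSize(); }

    /// Map the relative index n to a real index, stepping over skipped slots.
    size_t getRealIndex(size_t n) const
    {
        for (size_t real = 0; real < arguments.size(); ++real)
        {
            if (skipped.contains(real))
                continue;
            if (n-- == 0)
                return real;
        }
        throw std::out_of_range("relative argument index is out of range");
    }

    int at(size_t n) const { return arguments.at(getRealIndex(n)); }

private:
    const std::vector<int> & arguments;
    std::set<size_t> skipped;
};

int main()
{
    std::vector<int> args{10, 20, 30, 40};
    SkippingArgumentsView view(args, /*skipped=*/{1});
    assert(view.size() == 3);
    assert(view.at(0) == 10);
    assert(view.at(1) == 30); /// real index 1 is skipped, so relative 1 maps to real 2
    assert(view.at(2) == 40);
}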
9 changes: 9 additions & 0 deletions src/Core/Settings.cpp
@@ -6841,6 +6841,15 @@ Enable PRQL - an alternative to SQL.
)", EXPERIMENTAL) \
DECLARE(Bool, enable_adaptive_memory_spill_scheduler, false, R"(
Trigger processor to spill data into external storage adaptively. Grace join is supported at present.
)", EXPERIMENTAL) \
DECLARE(String, object_storage_cluster, "", R"(
Cluster for making distributed requests to object storages with the alternative syntax.
)", EXPERIMENTAL) \
DECLARE(UInt64, object_storage_max_nodes, 0, R"(
Limit on the number of hosts used for a request in object storage cluster table functions (azureBlobStorageCluster, s3Cluster, hdfsCluster, etc.).
Possible values:
- Positive integer.
- 0 — All hosts in cluster.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_delta_kernel_rs, true, R"(
Allow experimental delta-kernel-rs implementation.
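
Together, the two new settings act as a cluster selector plus a fan-out cap. A minimal sketch of the documented semantics, with illustrative helper names that are not part of the patch:

#include <algorithm>
#include <cstddef>
#include <string>

struct ObjectStorageClusterSettings
{
    std::string object_storage_cluster;  /// "" -> keep execution local
    size_t object_storage_max_nodes = 0; /// 0 -> use all hosts in the cluster
};

bool shouldRunDistributed(const ObjectStorageClusterSettings & s)
{
    return !s.object_storage_cluster.empty();
}

size_t hostLimit(const ObjectStorageClusterSettings & s, size_t cluster_size)
{
    return s.object_storage_max_nodes == 0
        ? cluster_size
        : std::min(s.object_storage_max_nodes, cluster_size);
}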
6 changes: 6 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -67,6 +67,12 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
/// Note: please check if the key already exists to prevent duplicate entries.
addSettingsChanges(settings_changes_history, "25.6.2.20000",
{
// Altinity Antalya modifications on top of 25.6
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
});
addSettingsChanges(settings_changes_history, "25.6",
{
{"output_format_native_use_flattened_dynamic_and_json_serialization", false, false, "Add flattened Dynamic/JSON serializations to Native format"},
17 changes: 10 additions & 7 deletions src/Databases/DataLake/DatabaseDataLake.cpp
@@ -18,6 +18,7 @@
#include <Storages/ConstraintsDescription.h>
#include <Storages/StorageNull.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>

#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
@@ -43,6 +44,7 @@ namespace DatabaseDataLakeSetting
extern const DatabaseDataLakeSettingsString storage_endpoint;
extern const DatabaseDataLakeSettingsString oauth_server_uri;
extern const DatabaseDataLakeSettingsBool vended_credentials;
extern const DatabaseDataLakeSettingsString object_storage_cluster;


extern const DatabaseDataLakeSettingsString aws_access_key_id;
@@ -428,21 +430,22 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con

/// with_table_structure = false: because there will be
/// no table structure in table definition AST.
StorageObjectStorage::Configuration::initialize(*configuration, args, context_copy, /* with_table_structure */false);
configuration->initialize(args, context_copy, /* with_table_structure */false);

return std::make_shared<StorageObjectStorage>(
auto cluster_name = settings[DatabaseDataLakeSetting::object_storage_cluster].value;

return std::make_shared<StorageObjectStorageCluster>(
cluster_name,
configuration,
configuration->createObjectStorage(context_copy, /* is_readonly */ false),
context_copy,
StorageID(getDatabaseName(), name),
/* columns */columns,
/* constraints */ConstraintsDescription{},
/* partition_by */nullptr,
context_copy,
/* comment */"",
getFormatSettings(context_copy),
LoadingStrictnessLevel::CREATE,
/* distributed_processing */false,
/* partition_by */nullptr,
/* is_table_function */false,
/* lazy_init */true);
}

@@ -477,7 +480,7 @@ DatabaseTablesIteratorPtr DatabaseDataLake::getLightweightTablesIterator(
const FilterByNameFunction & filter_by_table_name,
bool skip_not_loaded) const
{
Tables tables;
Tables tables;
auto catalog = getCatalog();
const auto iceberg_tables = catalog->getTables();
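
The catalog database now constructs a StorageObjectStorageCluster unconditionally, forwarding the cluster name from its object_storage_cluster database setting. A condensed sketch of that pattern, assuming (as the unconditional construction suggests, though not verified here) that an empty cluster name behaves like plain non-distributed storage:

#include <memory>
#include <string>

/// Stand-in for StorageObjectStorageCluster; not the real class.
struct ClusterStorageStub
{
    std::string cluster_name; /// "" -> behaves like plain StorageObjectStorage
    bool lazy_init = false;   /// catalog tables can defer initialization
};

std::shared_ptr<ClusterStorageStub> makeCatalogTable(const std::string & object_storage_cluster)
{
    return std::make_shared<ClusterStorageStub>(
        ClusterStorageStub{.cluster_name = object_storage_cluster, .lazy_init = true});
}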

60 changes: 42 additions & 18 deletions src/Disks/DiskType.cpp
@@ -9,7 +9,7 @@ namespace ErrorCodes
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}

MetadataStorageType metadataTypeFromString(const String & type)
MetadataStorageType metadataTypeFromString(const std::string & type)
{
auto check_type = Poco::toLower(type);
if (check_type == "local")
@@ -53,23 +53,47 @@ std::string DataSourceDescription::toString() const
case DataSourceType::RAM:
return "memory";
case DataSourceType::ObjectStorage:
{
switch (object_storage_type)
{
case ObjectStorageType::S3:
return "s3";
case ObjectStorageType::HDFS:
return "hdfs";
case ObjectStorageType::Azure:
return "azure_blob_storage";
case ObjectStorageType::Local:
return "local_blob_storage";
case ObjectStorageType::Web:
return "web";
case ObjectStorageType::None:
return "none";
}
}
return DB::toString(object_storage_type);
}
}

ObjectStorageType objectStorageTypeFromString(const std::string & type)
{
auto check_type = Poco::toLower(type);
if (check_type == "s3")
return ObjectStorageType::S3;
if (check_type == "hdfs")
return ObjectStorageType::HDFS;
if (check_type == "azure_blob_storage" || check_type == "azure")
return ObjectStorageType::Azure;
if (check_type == "local_blob_storage" || check_type == "local")
return ObjectStorageType::Local;
if (check_type == "web")
return ObjectStorageType::Web;
if (check_type == "none")
return ObjectStorageType::None;

throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
"Unknown object storage type: {}", type);
}

std::string toString(ObjectStorageType type)
{
switch (type)
{
case ObjectStorageType::S3:
return "s3";
case ObjectStorageType::HDFS:
return "hdfs";
case ObjectStorageType::Azure:
return "azure_blob_storage";
case ObjectStorageType::Local:
return "local_blob_storage";
case ObjectStorageType::Web:
return "web";
case ObjectStorageType::None:
return "none";
}
}

}
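
A usage sketch for the new helper pair: conversion should round-trip, and parsing additionally accepts the short aliases "azure" and "local". The condensed copies below omit the Poco::toLower call that makes the real parser case-insensitive:

#include <cassert>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

enum class ObjectStorageType { S3, HDFS, Azure, Local, Web, None };

static const std::vector<std::pair<ObjectStorageType, std::string>> canonical = {
    {ObjectStorageType::S3, "s3"},
    {ObjectStorageType::HDFS, "hdfs"},
    {ObjectStorageType::Azure, "azure_blob_storage"},
    {ObjectStorageType::Local, "local_blob_storage"},
    {ObjectStorageType::Web, "web"},
    {ObjectStorageType::None, "none"},
};

std::string toString(ObjectStorageType type)
{
    for (const auto & [t, name] : canonical)
        if (t == type)
            return name;
    throw std::logic_error("unreachable");
}

ObjectStorageType objectStorageTypeFromString(const std::string & type)
{
    if (type == "azure")
        return ObjectStorageType::Azure; /// short alias
    if (type == "local")
        return ObjectStorageType::Local; /// short alias
    for (const auto & [t, name] : canonical)
        if (name == type)
            return t;
    throw std::invalid_argument("Unknown object storage type: " + type);
}

int main()
{
    for (const auto & [t, name] : canonical)
        assert(objectStorageTypeFromString(toString(t)) == t); /// round-trip holds

    /// Aliases parse, but serialization always yields the canonical name.
    assert(objectStorageTypeFromString("azure") == ObjectStorageType::Azure);
    assert(toString(ObjectStorageType::Azure) == "azure_blob_storage");
}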
6 changes: 4 additions & 2 deletions src/Disks/DiskType.h
@@ -34,8 +34,10 @@ enum class MetadataStorageType : uint8_t
Memory,
};

MetadataStorageType metadataTypeFromString(const String & type);
String toString(DataSourceType data_source_type);
MetadataStorageType metadataTypeFromString(const std::string & type);

ObjectStorageType objectStorageTypeFromString(const std::string & type);
std::string toString(ObjectStorageType type);

struct DataSourceDescription
{
37 changes: 31 additions & 6 deletions src/Interpreters/Cluster.cpp
@@ -732,9 +732,9 @@ void Cluster::initMisc()
}
}

std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard) const
std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts) const
{
return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard)};
return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard, max_hosts)};
}

std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const
@@ -783,7 +783,7 @@ void shuffleReplicas(std::vector<Cluster::Address> & replicas, const Settings &

}

Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard)
Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts)
{
if (from.addresses_with_failover.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster is empty");
@@ -805,6 +805,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti

if (address.is_local)
info.local_addresses.push_back(address);
addresses_with_failover.emplace_back(Addresses({address}));

auto pool = ConnectionPoolFactory::instance().get(
static_cast<unsigned>(settings[Setting::distributed_connections_pool_size]),
@@ -828,9 +829,6 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
info.per_replica_pools = {std::move(pool)};
info.default_database = address.default_database;

addresses_with_failover.emplace_back(Addresses{address});

slot_to_shard.insert(std::end(slot_to_shard), info.weight, shards_info.size());
shards_info.emplace_back(std::move(info));
}
};
@@ -852,10 +850,37 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
secret = from.secret;
name = from.name;

constrainShardInfoAndAddressesToMaxHosts(max_hosts);

for (size_t i = 0; i < shards_info.size(); ++i)
slot_to_shard.insert(std::end(slot_to_shard), shards_info[i].weight, i);

initMisc();
}


void Cluster::constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts)
{
if (max_hosts == 0 || shards_info.size() <= max_hosts)
return;

pcg64_fast gen{randomSeed()};
std::shuffle(shards_info.begin(), shards_info.end(), gen);
shards_info.resize(max_hosts);

AddressesWithFailover addresses_with_failover_;

UInt32 shard_num = 0;
for (auto & shard_info : shards_info)
{
addresses_with_failover_.push_back(addresses_with_failover[shard_info.shard_num - 1]);
shard_info.shard_num = ++shard_num;
}

addresses_with_failover.swap(addresses_with_failover_);
}
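
constrainShardInfoAndAddressesToMaxHosts keeps a uniformly random subset of hosts and renumbers the survivors so shard_num stays dense and 1-based. The same behavior in isolation, using simplified stand-in types and std::mt19937_64 in place of pcg64_fast:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <random>
#include <string>
#include <vector>

/// Simplified stand-in for Cluster::ShardInfo.
struct ShardStub { unsigned shard_num; std::string host; };

void constrainToMaxHosts(std::vector<ShardStub> & shards, size_t max_hosts)
{
    if (max_hosts == 0 || shards.size() <= max_hosts)
        return; /// 0 means "no limit": keep every host

    std::mt19937_64 gen{std::random_device{}()};
    std::shuffle(shards.begin(), shards.end(), gen); /// unbiased random subset
    shards.resize(max_hosts);

    unsigned shard_num = 0;
    for (auto & shard : shards)
        shard.shard_num = ++shard_num; /// renumber so shard_num stays 1-based and dense
}

int main()
{
    std::vector<ShardStub> shards{{1, "a"}, {2, "b"}, {3, "c"}, {4, "d"}};
    constrainToMaxHosts(shards, 2);
    for (const auto & s : shards)
        std::cout << s.shard_num << " -> " << s.host << '\n'; /// two random hosts, renumbered 1 and 2
}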


Cluster::Cluster(Cluster::SubclusterTag, const Cluster & from, const std::vector<size_t> & indices)
{
for (size_t index : indices)
7 changes: 5 additions & 2 deletions src/Interpreters/Cluster.h
@@ -270,7 +270,7 @@ class Cluster
std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const;

/// Get a new Cluster that contains all servers (all shards with all replicas) from existing cluster as independent shards.
std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0) const;
std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0, size_t max_hosts = 0) const;

/// Returns false if the cluster configuration doesn't allow using it for cross-replication.
/// NOTE: true does not mean, that it's actually a cross-replication cluster.
@@ -296,7 +296,7 @@ class Cluster

/// For getClusterWithReplicasAsShards implementation
struct ReplicasAsShardsTag {};
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard);
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts);

void addShard(
const Settings & settings,
@@ -308,6 +308,9 @@ class Cluster
ShardInfoInsertPathForInternalReplication insert_paths = {},
bool internal_replication = false);

/// Reduce the size of the cluster to max_hosts
void constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts);

/// Inter-server secret
String secret;

3 changes: 1 addition & 2 deletions src/Interpreters/InterpreterCreateQuery.cpp
@@ -1949,8 +1949,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
auto table_function_ast = create.as_table_function->ptr();
auto table_function = TableFunctionFactory::instance().get(table_function_ast, getContext());

if (!table_function->canBeUsedToCreateTable())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}' cannot be used to create a table", table_function->getName());
table_function->validateUseToCreateTable();

/// In case of CREATE AS table_function() query we should use global context
/// in storage creation because there will be no query context on server startup
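
validateUseToCreateTable() moves the throw into the table function itself, which can then report a function-specific reason instead of the generic message the removed check produced. A sketch of the pattern with stand-in names (the real interface is ITableFunction, and DB::Exception is replaced here by a standard exception):

#include <stdexcept>
#include <string>

class TableFunctionStub
{
public:
    std::string getName() const { return "s3Cluster"; }

    /// Before: callers asked canBeUsedToCreateTable() and threw a generic error.
    bool canBeUsedToCreateTable() const { return false; }

    /// After: the function throws for itself, so it can explain why it refuses.
    void validateUseToCreateTable() const
    {
        if (!canBeUsedToCreateTable())
            throw std::invalid_argument(
                "Table function '" + getName() + "' cannot be used to create a table");
    }
};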