diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 77db44fe35d7..087739f7cc51 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6860,6 +6860,9 @@ Default number of tasks for parallel reading in distributed query. Tasks are spr DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"( Removes unnecessary exchanges in distributed query plan. Disable it for debugging. )", 0) \ + DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"( +In object storage distribution queries do not distibute tasks on non-prefetched nodes until prefetched node is active. +)", EXPERIMENTAL) \ DECLARE(String, distributed_plan_force_exchange_kind, "", R"( Force specified kind of Exchange operators between distributed query stages. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 6e6bfe580117..b3c5dce8751b 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -67,6 +67,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() /// controls new feature and it's 'true' by default, use 'false' as previous_value). /// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972) /// Note: please check if the key already exists to prevent duplicate entries. + addSettingsChanges(settings_changes_history, "25.6.5.20000", + { + // Altinity Antalya modifications atop of 25.6 + {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."}, + }); addSettingsChanges(settings_changes_history, "25.6", { {"output_format_native_use_flattened_dynamic_and_json_serialization", false, false, "Add flattened Dynamic/JSON serializations to Native format"}, diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp index da10528bbedf..bdaa391dd729 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.cpp +++ b/src/Disks/ObjectStorages/IObjectStorage.cpp @@ -8,6 +8,10 @@ #include #include +#include +#include +#include + namespace DB { @@ -97,4 +101,36 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings return write_settings; } +RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task) +{ + Poco::JSON::Parser parser; + try + { + auto json = parser.parse(task).extract(); + if (!json) + return; + + successfully_parsed = true; + + if (json->has("retry_after_us")) + retry_after_us = json->getValue("retry_after_us"); + } + catch (const Poco::JSON::JSONException &) + { /// Not a JSON + return; + } +} + +std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const +{ + Poco::JSON::Object json; + if (retry_after_us.has_value()) + json.set("retry_after_us", retry_after_us.value()); + + std::ostringstream oss; + oss.exceptions(std::ios::failbit); + Poco::JSON::Stringifier::stringify(json, oss); + return oss.str(); +} + } diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 0d9464b1ad7e..abe3f5210953 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -83,15 +83,37 @@ struct ObjectMetadata struct RelativePathWithMetadata { + class CommandInTaskResponse + { + public: + CommandInTaskResponse() = default; + explicit CommandInTaskResponse(const std::string & task); + + bool is_parsed() const { return successfully_parsed; } + void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; } + + std::string to_string() const; + + std::optional get_retry_after_us() const { return retry_after_us; } + + private: + bool successfully_parsed = false; + std::optional retry_after_us; + }; + String relative_path; std::optional metadata; + CommandInTaskResponse command; RelativePathWithMetadata() = default; - explicit RelativePathWithMetadata(String relative_path_, std::optional metadata_ = std::nullopt) - : relative_path(std::move(relative_path_)) - , metadata(std::move(metadata_)) - {} + explicit RelativePathWithMetadata(const String & task_string, std::optional metadata_ = std::nullopt) + : metadata(std::move(metadata_)) + , command(task_string) + { + if (!command.is_parsed()) + relative_path = task_string; + } virtual ~RelativePathWithMetadata() = default; @@ -100,6 +122,8 @@ struct RelativePathWithMetadata virtual bool isArchive() const { return false; } virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } + + const CommandInTaskResponse & getCommand() const { return command; } }; struct ObjectKeyWithMetadata diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 7c8d410cee53..b7f1c9760322 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -93,7 +93,7 @@ class ReadFromCluster : public SourceStepWithFilter std::optional extension; - void createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas); + void createExtension(const ActionsDAG::Node * predicate); ContextPtr updateSettings(const Settings & settings); }; @@ -105,19 +105,15 @@ void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes) if (filter_actions_dag) predicate = filter_actions_dag->getOutputs().at(0); - auto max_replicas_to_use = static_cast(cluster->getShardsInfo().size()); - if (context->getSettingsRef()[Setting::max_parallel_replicas] > 1) - max_replicas_to_use = std::min(max_replicas_to_use, context->getSettingsRef()[Setting::max_parallel_replicas].value); - - createExtension(predicate, max_replicas_to_use); + createExtension(predicate); } -void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas) +void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) { if (extension) return; - extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas); + extension = storage->getTaskIteratorExtension(predicate, context, cluster); } /// The code executes on initiator @@ -196,7 +192,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const if (current_settings[Setting::max_parallel_replicas] > 1) max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value); - createExtension(nullptr, max_replicas_to_use); + createExtension(nullptr); for (const auto & shard_info : cluster->getShardsInfo()) { diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 6017613c7bea..e4ff87f9b5e4 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -35,7 +35,10 @@ class IStorageCluster : public IStorage ClusterPtr getCluster(ContextPtr context) const; /// Query is needed for pruning by virtual columns (_file, _path) - virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const = 0; + virtual RemoteQueryExecutor::Extension getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr cluster) const = 0; QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 424b0d5bfb52..51b40b55c72d 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -18,18 +18,19 @@ #include #include - namespace DB { namespace Setting { extern const SettingsBool use_hive_partitioning; + extern const SettingsUInt64 lock_object_storage_task_distribution_ms; } namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int INCORRECT_DATA; + extern const int INVALID_SETTING_VALUE; } String StorageObjectStorageCluster::getPathSample(ContextPtr context) @@ -214,13 +215,43 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( } RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( - const ActionsDAG::Node * predicate, const ContextPtr & local_context, const size_t number_of_replicas) const + const ActionsDAG::Node * predicate, + const ContextPtr & local_context, + ClusterPtr cluster) const { auto iterator = StorageObjectStorageSource::createFileIterator( configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false, local_context, predicate, {}, virtual_columns, hive_partition_columns_to_read_from_file_path, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true, /*skip_object_metadata=*/true); - auto task_distributor = std::make_shared(iterator, number_of_replicas); + std::vector ids_of_hosts; + for (const auto & shard : cluster->getShardsInfo()) + { + if (shard.per_replica_pools.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num); + for (const auto & replica : shard.per_replica_pools) + { + if (!replica) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num); + ids_of_hosts.push_back(replica->getAddress()); + } + } + + uint64_t lock_object_storage_task_distribution_ms = local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms]; + + /// Check value to avoid negative result after conversion in microseconds. + /// Poco::Timestamp::TimeDiff is signed int 64. + static const uint64_t lock_object_storage_task_distribution_ms_max = 0x0020000000000000ULL; + if (lock_object_storage_task_distribution_ms > lock_object_storage_task_distribution_ms_max) + throw Exception(ErrorCodes::INVALID_SETTING_VALUE, + "Value lock_object_storage_task_distribution_ms is too big: {}, allowed maximum is {}", + lock_object_storage_task_distribution_ms, + lock_object_storage_task_distribution_ms_max + ); + + auto task_distributor = std::make_shared( + iterator, + ids_of_hosts, + lock_object_storage_task_distribution_ms); auto callback = std::make_shared( [task_distributor](size_t number_of_current_replica) mutable -> String diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 1a557143076a..85a584c5e0d6 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -25,7 +25,9 @@ class StorageObjectStorageCluster : public IStorageCluster std::string getName() const override; RemoteQueryExecutor::Extension getTaskIteratorExtension( - const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override; + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr cluster) const override; String getPathSample(ContextPtr context); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index b4ae724abd03..28f61c2e2fa6 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -432,11 +433,31 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade ObjectInfoPtr object_info; auto query_settings = configuration->getQuerySettings(context_); + bool not_a_path = false; + do { + not_a_path = false; object_info = file_iterator->next(processor); - if (!object_info || object_info->getPath().empty()) + if (!object_info) + return {}; + + if (object_info->getCommand().is_parsed()) + { + auto retry_after_us = object_info->getCommand().get_retry_after_us(); + if (retry_after_us.has_value()) + { + not_a_path = true; + /// TODO: Make asyncronous waiting without sleep in thread + /// Now this sleep is on executor node in worker thread + /// Does not block query initiator + sleepForMicroseconds(std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value())); + continue; + } + } + + if (object_info->getPath().empty()) return {}; if (!object_info->metadata) @@ -455,7 +476,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade object_info->metadata = object_storage->getObjectMetadata(path); } } - while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0); + while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0)); QueryPipelineBuilder builder; std::shared_ptr source; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index c5c86a47babb..5f709f2ec1d9 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -13,9 +13,12 @@ namespace ErrorCodes StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor( std::shared_ptr iterator_, - size_t number_of_replicas_) + std::vector ids_of_nodes_, + uint64_t lock_object_storage_task_distribution_ms_) : iterator(std::move(iterator_)) - , connection_to_files(number_of_replicas_) + , connection_to_files(ids_of_nodes_.size()) + , ids_of_nodes(ids_of_nodes_) + , lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000) , iterator_exhausted(false) { } @@ -31,6 +34,8 @@ std::optional StorageObjectStorageStableTaskDistributor::getNextTask(siz number_of_current_replica, connection_to_files.size() - 1); + saveLastNodeActivity(number_of_current_replica); + // 1. Check pre-queued files first if (auto file = getPreQueuedFile(number_of_current_replica)) return file; @@ -45,13 +50,39 @@ std::optional StorageObjectStorageStableTaskDistributor::getNextTask(siz size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path) { - return ConsistentHashing(sipHash64(file_path), connection_to_files.size()); + size_t nodes_count = ids_of_nodes.size(); + + /// Trivial case + if (nodes_count < 2) + return 0; + + /// Rendezvous hashing + size_t best_id = 0; + UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path); + for (size_t id = 1; id < nodes_count; ++id) + { + UInt64 weight = sipHash64(ids_of_nodes[id] + file_path); + if (weight > best_weight) + { + best_weight = weight; + best_id = id; + } + } + return best_id; } std::optional StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica) { std::lock_guard lock(mutex); + if (connection_to_files.size() <= number_of_current_replica) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} is out of range. Expected range: [0, {})", + number_of_current_replica, + connection_to_files.size() + ); + auto & files = connection_to_files[number_of_current_replica]; while (!files.empty()) @@ -129,7 +160,7 @@ std::optional StorageObjectStorageStableTaskDistributor::getMatchingFile // Queue file for its assigned replica { std::lock_guard lock(mutex); - unprocessed_files.insert(file_path); + unprocessed_files[file_path] = number_of_current_replica; connection_to_files[file_replica_idx].push_back(file_path); } } @@ -139,25 +170,64 @@ std::optional StorageObjectStorageStableTaskDistributor::getMatchingFile std::optional StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica) { + /// Limit time of node activity to keep task in queue + Poco::Timestamp activity_limit; + Poco::Timestamp oldest_activity; + if (lock_object_storage_task_distribution_us > 0) + activity_limit -= lock_object_storage_task_distribution_us; + std::lock_guard lock(mutex); if (!unprocessed_files.empty()) { auto it = unprocessed_files.begin(); - String next_file = *it; - unprocessed_files.erase(it); + + while (it != unprocessed_files.end()) + { + auto last_activity = last_node_activity.find(it->second); + if (lock_object_storage_task_distribution_us <= 0 + || last_activity == last_node_activity.end() + || activity_limit > last_activity->second) + { + String next_file = it->first; + unprocessed_files.erase(it); + + LOG_TRACE( + log, + "Iterator exhausted. Assigning unprocessed file {} to replica {}", + next_file, + number_of_current_replica + ); + + return next_file; + } + + oldest_activity = std::min(oldest_activity, last_activity->second); + ++it; + } LOG_TRACE( log, - "Iterator exhausted. Assigning unprocessed file {} to replica {}", - next_file, - number_of_current_replica + "No unprocessed file for replica {}, need to retry after {} us", + number_of_current_replica, + oldest_activity - activity_limit ); - return next_file; + /// All unprocessed files owned by alive replicas with recenlty activity + /// Need to retry after (oldest_activity - activity_limit) microseconds + RelativePathWithMetadata::CommandInTaskResponse response; + response.set_retry_after_us(oldest_activity - activity_limit); + return response.to_string(); } return std::nullopt; } +void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t number_of_current_replica) +{ + Poco::Timestamp now; + std::lock_guard lock(mutex); + last_node_activity[number_of_current_replica] = now; +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index 46e805a59603..2132ba95a752 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -4,7 +4,12 @@ #include #include #include +#include + +#include + #include +#include #include #include #include @@ -17,7 +22,8 @@ class StorageObjectStorageStableTaskDistributor public: StorageObjectStorageStableTaskDistributor( std::shared_ptr iterator_, - size_t number_of_replicas_); + std::vector ids_of_nodes_, + uint64_t lock_object_storage_task_distribution_ms_); std::optional getNextTask(size_t number_of_current_replica); @@ -27,10 +33,17 @@ class StorageObjectStorageStableTaskDistributor std::optional getMatchingFileFromIterator(size_t number_of_current_replica); std::optional getAnyUnprocessedFile(size_t number_of_current_replica); + void saveLastNodeActivity(size_t number_of_current_replica); + std::shared_ptr iterator; std::vector> connection_to_files; - std::unordered_set unprocessed_files; + /// Map of unprocessed files in format filename => number of prefetched replica + std::unordered_map unprocessed_files; + + std::vector ids_of_nodes; + std::unordered_map last_node_activity; + Poco::Timestamp::TimeDiff lock_object_storage_task_distribution_us; std::mutex mutex; bool iterator_exhausted = false; diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index d3f6407c88b1..0ae57212024d 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1317,8 +1317,7 @@ std::optional StorageDistributed::distributedWriteFromClusterStor const auto cluster = getCluster(); /// Select query is needed for pruining on virtual columns - auto number_of_replicas = static_cast(cluster->getShardsInfo().size()); - auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, number_of_replicas); + auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster); /// Here we take addresses from destination cluster and assume source table exists on these nodes size_t replica_index = 0; diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp index 1946fdc8c77b..0155c3a08cec 100644 --- a/src/Storages/StorageFileCluster.cpp +++ b/src/Storages/StorageFileCluster.cpp @@ -100,7 +100,10 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto ); } -RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, const size_t) const +RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr) const { auto iterator = std::make_shared(paths, std::nullopt, predicate, getVirtualsList(), hive_partition_columns_to_read_from_file_path, context); auto callback = std::make_shared([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); }); diff --git a/src/Storages/StorageFileCluster.h b/src/Storages/StorageFileCluster.h index 2cbd82ba4000..5fb08a48eec6 100644 --- a/src/Storages/StorageFileCluster.h +++ b/src/Storages/StorageFileCluster.h @@ -27,7 +27,10 @@ class StorageFileCluster : public IStorageCluster const ConstraintsDescription & constraints_); std::string getName() const override { return "FileCluster"; } - RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override; + RemoteQueryExecutor::Extension getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr) const override; private: void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index f349e9317db0..9ec0231a8c71 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6086,8 +6086,7 @@ std::optional StorageReplicatedMergeTree::distributedWriteFromClu ContextMutablePtr query_context = Context::createCopy(local_context); query_context->increaseDistributedDepth(); - auto number_of_replicas = static_cast(src_cluster->getShardsAddresses().size()); - auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, number_of_replicas); + auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster); size_t replica_index = 0; for (const auto & replicas : src_cluster->getShardsAddresses()) diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index fff85117b37a..12ae6b32e018 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -129,7 +129,10 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS ); } -RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t) const +RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr) const { auto iterator = std::make_shared( uri, context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), hive_partition_columns_to_read_from_file_path, context); diff --git a/src/Storages/StorageURLCluster.h b/src/Storages/StorageURLCluster.h index 8349f7594294..e360eb22d701 100644 --- a/src/Storages/StorageURLCluster.h +++ b/src/Storages/StorageURLCluster.h @@ -30,7 +30,10 @@ class StorageURLCluster : public IStorageCluster const StorageURL::Configuration & configuration_); std::string getName() const override { return "URLCluster"; } - RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override; + RemoteQueryExecutor::Extension getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr) const override; private: void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override; diff --git a/tests/integration/test_s3_cache_locality/__init__.py b/tests/integration/test_s3_cache_locality/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/test_s3_cache_locality/configs/cluster.xml b/tests/integration/test_s3_cache_locality/configs/cluster.xml new file mode 100644 index 000000000000..db54c35374b9 --- /dev/null +++ b/tests/integration/test_s3_cache_locality/configs/cluster.xml @@ -0,0 +1,126 @@ + + + + + + + + clickhouse1 + 9000 + + + clickhouse2 + 9000 + + + clickhouse3 + 9000 + + + clickhouse4 + 9000 + + + clickhouse5 + 9000 + + + + + + + + clickhouse1 + 9000 + + + clickhouse2 + 9000 + + + clickhouse3 + 9000 + + + clickhouse4 + 9000 + + + + + + + + clickhouse2 + 9000 + + + clickhouse3 + 9000 + + + clickhouse4 + 9000 + + + clickhouse5 + 9000 + + + + + + + + clickhouse3 + 9000 + + + clickhouse4 + 9000 + + + clickhouse5 + 9000 + + + clickhouse1 + 9000 + + + clickhouse2 + 9000 + + + + + + + + clickhouse4 + 9000 + + + clickhouse5 + 9000 + + + clickhouse2 + 9000 + + + clickhouse3 + 9000 + + + + + + + + + /var/lib/clickhouse/raw_s3_cache + 10Gi + + + diff --git a/tests/integration/test_s3_cache_locality/configs/named_collections.xml b/tests/integration/test_s3_cache_locality/configs/named_collections.xml new file mode 100644 index 000000000000..6994aa3f5e77 --- /dev/null +++ b/tests/integration/test_s3_cache_locality/configs/named_collections.xml @@ -0,0 +1,10 @@ + + + + http://minio1:9001/root/data/* + minio + ClickHouse_Minio_P@ssw0rd + CSV> + + + diff --git a/tests/integration/test_s3_cache_locality/configs/users.xml b/tests/integration/test_s3_cache_locality/configs/users.xml new file mode 100644 index 000000000000..4b6ba057ecb1 --- /dev/null +++ b/tests/integration/test_s3_cache_locality/configs/users.xml @@ -0,0 +1,9 @@ + + + + + default + 1 + + + diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py new file mode 100644 index 000000000000..da85e78a5643 --- /dev/null +++ b/tests/integration/test_s3_cache_locality/test.py @@ -0,0 +1,196 @@ +import csv +import logging +import os +import shutil +import uuid + +import pytest + +from helpers.cluster import ClickHouseCluster +from helpers.config_cluster import minio_secret_key + + +logging.getLogger().setLevel(logging.INFO) +logging.getLogger().addHandler(logging.StreamHandler()) + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) + + +def create_buckets_s3(cluster, files=1000): + minio = cluster.minio_client + + s3_data = [] + + for file_number in range(files): + file_name = f"data/generated/file_{file_number}.csv" + os.makedirs(os.path.join(SCRIPT_DIR, "data/generated/"), exist_ok=True) + s3_data.append(file_name) + with open(os.path.join(SCRIPT_DIR, file_name), "w+", encoding="utf-8") as f: + # a String, b UInt64 + data = [] + + # Make all files a bit different + data.append( + ["str_" + str(file_number), file_number] + ) + + writer = csv.writer(f) + writer.writerows(data) + + for file in s3_data: + minio.fput_object( + bucket_name=cluster.minio_bucket, + object_name=file, + file_path=os.path.join(SCRIPT_DIR, file), + ) + + for obj in minio.list_objects(cluster.minio_bucket, recursive=True): + print(obj.object_name) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster = ClickHouseCluster(__file__) + # clickhouse0 not a member of cluster_XXX + for i in range(6): + cluster.add_instance( + f"clickhouse{i}", + main_configs=["configs/cluster.xml", "configs/named_collections.xml"], + user_configs=["configs/users.xml"], + macros={"replica": f"clickhouse{i}"}, + with_minio=True, + with_zookeeper=True, + stay_alive=True, + ) + + logging.info("Starting cluster...") + cluster.start() + logging.info("Cluster started") + + create_buckets_s3(cluster) + + yield cluster + finally: + shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"), ignore_errors=True) + cluster.shutdown() + + +def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, + lock_object_storage_task_distribution_ms): + for host in list(cluster.instances.values()): + host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'", ignore_error=True) + + settings = { + "enable_filesystem_cache": enable_filesystem_cache, + "filesystem_cache_name": "'raw_s3_cache'", + } + + if lock_object_storage_task_distribution_ms > 0: + settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms + + query_id_first = str(uuid.uuid4()) + result_first = node.query( + f""" + SELECT count(*) + FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64') + WHERE b=42 + SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())} + """, + query_id=query_id_first, + ) + assert result_first == expected_result + query_id_second = str(uuid.uuid4()) + result_second = node.query( + f""" + SELECT count(*) + FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64') + WHERE b=42 + SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())} + """, + query_id=query_id_second, + ) + assert result_second == expected_result + + node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_first}") + node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_second}") + + s3_get_first = node.query( + f""" + SELECT sum(ProfileEvents['S3GetObject']) + FROM clusterAllReplicas('{cluster_first}', system.query_log) + WHERE type='QueryFinish' + AND initial_query_id='{query_id_first}' + """, + ) + s3_get_second = node.query( + f""" + SELECT sum(ProfileEvents['S3GetObject']) + FROM clusterAllReplicas('{cluster_second}', system.query_log) + WHERE type='QueryFinish' + AND initial_query_id='{query_id_second}' + """, + ) + + return int(s3_get_first), int(s3_get_second) + + +def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, + lock_object_storage_task_distribution_ms): + # Repeat test several times to get average result + iterations = 1 if lock_object_storage_task_distribution_ms > 0 else 10 + s3_get_first_sum = 0 + s3_get_second_sum = 0 + for _ in range(iterations): + (s3_get_first, s3_get_second) = check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock_object_storage_task_distribution_ms) + s3_get_first_sum += s3_get_first + s3_get_second_sum += s3_get_second + return s3_get_first_sum, s3_get_second_sum + + +@pytest.mark.parametrize("lock_object_storage_task_distribution_ms ", [0, 30000]) +def test_cache_locality(started_cluster, lock_object_storage_task_distribution_ms): + node = started_cluster.instances["clickhouse0"] + + expected_result = node.query( + f""" + SELECT count(*) + FROM s3('http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64') + WHERE b=42 + """ + ) + + # Algorithm does not give 100% guarantee, so add 10% on dispersion + dispersion = 0.0 if lock_object_storage_task_distribution_ms > 0 else 0.1 + + # No cache + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 0, lock_object_storage_task_distribution_ms) + assert s3_get_second == s3_get_first + + # With cache + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1, lock_object_storage_task_distribution_ms) + assert s3_get_second <= s3_get_first * dispersion + + # Different nodes order + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1, lock_object_storage_task_distribution_ms) + assert s3_get_second <= s3_get_first * dispersion + + # No last node + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1, lock_object_storage_task_distribution_ms) + assert s3_get_second <= s3_get_first * (0.211 + dispersion) # actual value - 24 for 100 files, 211 for 1000 + + # No first node + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1, lock_object_storage_task_distribution_ms) + assert s3_get_second <= s3_get_first * (0.189 + dispersion) # actual value - 12 for 100 files, 189 for 1000 + + # No first node, different nodes order + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1, lock_object_storage_task_distribution_ms) + assert s3_get_second <= s3_get_first * (0.189 + dispersion) + + # Add new node, different nodes order + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1, lock_object_storage_task_distribution_ms) + assert s3_get_second <= s3_get_first * (0.189 + dispersion) + + # New node and old node, different nodes order + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1, lock_object_storage_task_distribution_ms) + assert s3_get_second <= s3_get_first * (0.400 + dispersion) # actual value - 36 for 100 files, 400 for 1000 diff --git a/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml b/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml index 892665d3934d..77f9e7e4b17b 100644 --- a/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml +++ b/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml @@ -14,7 +14,7 @@ http://minio1:9001/root/ minio - minio123 + ClickHouse_Minio_P@ssw0rd s3