diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index 961a90283160..d73c2f4fd1ad 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -74,6 +74,7 @@ enum class AccessType : uint8_t enabled implicitly by the grant ALTER_TABLE */\ M(ALTER_SETTINGS, "ALTER SETTING, ALTER MODIFY SETTING, MODIFY SETTING, RESET SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\ M(ALTER_MOVE_PARTITION, "ALTER MOVE PART, MOVE PARTITION, MOVE PART", TABLE, ALTER_TABLE) \ + M(ALTER_EXPORT_PARTITION, "ALTER EXPORT PART, EXPORT PARTITION, EXPORT PART", TABLE, ALTER_TABLE) \ M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \ M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \ M(ALTER_UNLOCK_SNAPSHOT, "UNLOCK SNAPSHOT", TABLE, ALTER_TABLE) \ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0f7a3cf851b1..510c8b7bfa4b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -130,6 +130,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage/Azure) add_headers_and_sources(dbms Storages/ObjectStorage/S3) add_headers_and_sources(dbms Storages/ObjectStorage/HDFS) add_headers_and_sources(dbms Storages/ObjectStorage/Local) +add_headers_and_sources(dbms Storages/ObjectStorage/MergeTree) add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes) add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/Iceberg) add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/DeltaLake) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index ea3bd714930f..97bc7d309ac5 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -10,6 +10,7 @@ M(Merge, "Number of executing background merges") \ M(MergeParts, "Number of source parts participating in current background merges") \ M(Move, "Number of currently executing moves") \ + M(Export, "Number of currently executing exports") \ M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \ M(ReplicatedFetch, "Number of data parts being fetched from replica") \ M(ReplicatedSend, "Number of data parts being sent to replicas") \ diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index fc964e21892f..c8986e3053c6 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -99,6 +99,7 @@ namespace DB DECLARE(UInt64, max_unexpected_parts_loading_thread_pool_size, 8, R"(The number of threads to load inactive set of data parts (Unexpected ones) at startup.)", 0) \ DECLARE(UInt64, max_parts_cleaning_thread_pool_size, 128, R"(The number of threads for concurrent removal of inactive data parts.)", 0) \ DECLARE(UInt64, max_mutations_bandwidth_for_server, 0, R"(The maximum read speed of all mutations on server in bytes per second. Zero means unlimited.)", 0) \ + DECLARE(UInt64, max_exports_bandwidth_for_server, 0, R"(The maximum read speed of all exports on server in bytes per second. Zero means unlimited.)", 0) \ DECLARE(UInt64, max_merges_bandwidth_for_server, 0, R"(The maximum read speed of all merges on server in bytes per second. Zero means unlimited.)", 0) \ DECLARE(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.)", 0) \ DECLARE(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated sends. 
Zero means unlimited.)", 0) \ @@ -1064,6 +1065,7 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_ See [Controlling behavior on server CPU overload](/operations/settings/server-overload) for more details. )", 0) \ DECLARE(Float, distributed_cache_keep_up_free_connections_ratio, 0.1f, "Soft limit for number of active connection distributed cache will try to keep free. After the number of free connections goes below distributed_cache_keep_up_free_connections_ratio * max_connections, connections with oldest activity will be closed until the number goes above the limit.", 0) \ + DECLARE(UInt64, export_merge_tree_partition_max_retries, 3, "Max number of retries for exporting merge tree partition. Currently not persisted across re-starts", 0) \ // clang-format on diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 77db44fe35d7..3456e1824084 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6874,6 +6874,9 @@ Possible values: DECLARE_WITH_ALIAS(Bool, allow_experimental_time_series_aggregate_functions, false, R"( Experimental timeSeries* aggregate functions for Prometheus-like timeseries resampling, rate, delta calculation. )", EXPERIMENTAL, allow_experimental_ts_to_grid_aggregate_function) \ + DECLARE_WITH_ALIAS(Bool, allow_experimental_export_merge_tree_partition, false, R"( +Experimental export merge tree partition. +)", EXPERIMENTAL, allow_experimental_export_merge_tree_partition) \ \ /* ####################################################### */ \ /* ############ END OF EXPERIMENTAL FEATURES ############# */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 89a550f3326a..c75d08b4e046 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -71,6 +71,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() addSettingsChanges(settings_changes_history, "25.6.5.2000", { {"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"}, + {"allow_experimental_export_merge_tree_partition", true, false, "Enable experimental export merge tree partition"}, }); addSettingsChanges(settings_changes_history, "25.6", { diff --git a/src/Databases/DatabaseS3.cpp b/src/Databases/DatabaseS3.cpp index 2a42aa744909..2b657c75da96 100644 --- a/src/Databases/DatabaseS3.cpp +++ b/src/Databases/DatabaseS3.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 0d9464b1ad7e..824bc0a52f25 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -96,6 +96,7 @@ struct RelativePathWithMetadata virtual ~RelativePathWithMetadata() = default; virtual std::string getFileName() const { return std::filesystem::path(relative_path).filename(); } + virtual std::string getFileNameWithoutExtension() const { return std::filesystem::path(relative_path).stem(); } virtual std::string getPath() const { return relative_path; } virtual bool isArchive() const { return false; } virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index db22070bd7f6..8b1cd15c542c 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -568,8 +568,12 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible( auto 
format_settings = _format_settings ? *_format_settings : getFormatSettings(context); const Settings & settings = context->getSettingsRef(); - if (settings[Setting::output_format_parallel_formatting] && getCreators(name).supports_parallel_formatting - && !settings[Setting::output_format_json_array_of_rows]) + const bool parallel_formatting_enabled = settings[Setting::output_format_parallel_formatting]; + const bool supports_parallel_formatting = getCreators(name).supports_parallel_formatting; + + bool are_we_doing_parallel_formatting = parallel_formatting_enabled && supports_parallel_formatting && !settings[Setting::output_format_json_array_of_rows]; + + if (are_we_doing_parallel_formatting) { auto formatter_creator = [output_getter, sample, format_settings] (WriteBuffer & output) -> OutputFormatPtr { diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index db58e2352267..f226ecb98972 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -288,6 +288,7 @@ namespace ServerSetting extern const ServerSettingsUInt64 max_local_write_bandwidth_for_server; extern const ServerSettingsUInt64 max_merges_bandwidth_for_server; extern const ServerSettingsUInt64 max_mutations_bandwidth_for_server; + extern const ServerSettingsUInt64 max_exports_bandwidth_for_server; extern const ServerSettingsUInt64 max_remote_read_network_bandwidth_for_server; extern const ServerSettingsUInt64 max_remote_write_network_bandwidth_for_server; extern const ServerSettingsUInt64 max_replicated_fetches_network_bandwidth_for_server; @@ -504,6 +505,8 @@ struct ContextSharedPart : boost::noncopyable mutable ThrottlerPtr mutations_throttler; /// A server-wide throttler for mutations mutable ThrottlerPtr merges_throttler; /// A server-wide throttler for merges + mutable ThrottlerPtr exports_throttler; /// A server-wide throttler for exports + MultiVersion macros; /// Substitutions extracted from config. std::unique_ptr ddl_worker TSA_GUARDED_BY(mutex); /// Process ddl commands from zk. 
LoadTaskPtr ddl_worker_startup_task; /// To postpone `ddl_worker->startup()` after all tables startup @@ -992,6 +995,9 @@ struct ContextSharedPart : boost::noncopyable if (auto bandwidth = server_settings[ServerSetting::max_merges_bandwidth_for_server]) merges_throttler = std::make_shared(bandwidth); + + if (auto bandwidth = server_settings[ServerSetting::max_exports_bandwidth_for_server]) + exports_throttler = std::make_shared(bandwidth); } }; @@ -4041,6 +4047,11 @@ ThrottlerPtr Context::getMergesThrottler() const return shared->merges_throttler; } +ThrottlerPtr Context::getExportsThrottler() const +{ + return shared->exports_throttler; +} + void Context::reloadRemoteThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const { if (read_bandwidth) diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index f26899ea3c72..bff74c55bc11 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1635,6 +1635,7 @@ class Context: public ContextData, public std::enable_shared_from_this ThrottlerPtr getMutationsThrottler() const; ThrottlerPtr getMergesThrottler() const; + ThrottlerPtr getExportsThrottler() const; void reloadRemoteThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const; void reloadLocalThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const; diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index 262b4ce13cfd..81c8a7e5b6d0 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -502,6 +502,12 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS required_access.emplace_back(AccessType::ALTER_DELETE | AccessType::INSERT, database, table); break; } + case ASTAlterCommand::EXPORT_PARTITION: + { + required_access.emplace_back(AccessType::ALTER_EXPORT_PARTITION, database, table); + required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table); + break; + } case ASTAlterCommand::FETCH_PARTITION: { required_access.emplace_back(AccessType::ALTER_FETCH_PARTITION, database, table); diff --git a/src/Interpreters/PartLog.cpp b/src/Interpreters/PartLog.cpp index 02c6f6e573b0..adec9baf659e 100644 --- a/src/Interpreters/PartLog.cpp +++ b/src/Interpreters/PartLog.cpp @@ -69,6 +69,7 @@ ColumnsDescription PartLogElement::getColumnsDescription() {"MovePart", static_cast(MOVE_PART)}, {"MergePartsStart", static_cast(MERGE_PARTS_START)}, {"MutatePartStart", static_cast(MUTATE_PART_START)}, + {"ExportPart", static_cast(EXPORT_PART)}, } ); @@ -109,7 +110,8 @@ ColumnsDescription PartLogElement::getColumnsDescription() "RemovePart — Removing or detaching a data part using [DETACH PARTITION](/sql-reference/statements/alter/partition#detach-partitionpart)." "MutatePartStart — Mutating of a data part has started, " "MutatePart — Mutating of a data part has finished, " - "MovePart — Moving the data part from the one disk to another one."}, + "MovePart — Moving the data part from the one disk to another one, " + "ExportPart — Exporting the data part from a MergeTree table to another storage (e.g., object storage)."}, {"merge_reason", std::move(merge_reason_datatype), "The reason for the event with type MERGE_PARTS.
Can have one of the following values: " "NotAMerge — The current event has the type other than MERGE_PARTS, " diff --git a/src/Interpreters/PartLog.h b/src/Interpreters/PartLog.h index 44d2fb413c5f..4f58069dae55 100644 --- a/src/Interpreters/PartLog.h +++ b/src/Interpreters/PartLog.h @@ -30,6 +30,7 @@ struct PartLogElement MOVE_PART = 6, MERGE_PARTS_START = 7, MUTATE_PART_START = 8, + EXPORT_PART = 9, }; /// Copy of MergeAlgorithm since values are written to disk. diff --git a/src/Parsers/ASTAlterQuery.cpp b/src/Parsers/ASTAlterQuery.cpp index cdf8b558fd61..fac9724694de 100644 --- a/src/Parsers/ASTAlterQuery.cpp +++ b/src/Parsers/ASTAlterQuery.cpp @@ -355,6 +355,29 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett ostr << quoteString(move_destination_name); } } + else if (type == ASTAlterCommand::EXPORT_PARTITION) + { + ostr << (settings.hilite ? hilite_keyword : "") << "EXPORT " << (part ? "PART " : "PARTITION ") + << (settings.hilite ? hilite_none : ""); + partition->format(ostr, settings, state, frame); + ostr << " TO "; + switch (move_destination_type) + { + case DataDestinationType::TABLE: + ostr << "TABLE "; + if (!to_database.empty()) + { + ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_database) + << (settings.hilite ? hilite_none : "") << "."; + } + ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_table) + << (settings.hilite ? hilite_none : ""); + return; + default: + break; + } + + } else if (type == ASTAlterCommand::REPLACE_PARTITION) { ostr << (settings.hilite ? hilite_keyword : "") << (replace ? "REPLACE" : "ATTACH") << " PARTITION " diff --git a/src/Parsers/ASTAlterQuery.h b/src/Parsers/ASTAlterQuery.h index 3867a86cf797..7009e48fb7f5 100644 --- a/src/Parsers/ASTAlterQuery.h +++ b/src/Parsers/ASTAlterQuery.h @@ -71,6 +71,7 @@ class ASTAlterCommand : public IAST FREEZE_ALL, UNFREEZE_PARTITION, UNFREEZE_ALL, + EXPORT_PARTITION, DELETE, UPDATE, diff --git a/src/Parsers/CommonParsers.h b/src/Parsers/CommonParsers.h index e4db7beb9d4e..a634df26603a 100644 --- a/src/Parsers/CommonParsers.h +++ b/src/Parsers/CommonParsers.h @@ -331,6 +331,7 @@ namespace DB MR_MACROS(MONTHS, "MONTHS") \ MR_MACROS(MOVE_PART, "MOVE PART") \ MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \ + MR_MACROS(EXPORT_PARTITION, "EXPORT PARTITION") \ MR_MACROS(MOVE, "MOVE") \ MR_MACROS(MS, "MS") \ MR_MACROS(MUTATION, "MUTATION") \ diff --git a/src/Parsers/ParserAlterQuery.cpp b/src/Parsers/ParserAlterQuery.cpp index 2c127e6ff1e1..7278a9152787 100644 --- a/src/Parsers/ParserAlterQuery.cpp +++ b/src/Parsers/ParserAlterQuery.cpp @@ -82,6 +82,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected ParserKeyword s_forget_partition(Keyword::FORGET_PARTITION); ParserKeyword s_move_partition(Keyword::MOVE_PARTITION); ParserKeyword s_move_part(Keyword::MOVE_PART); + ParserKeyword s_export_partition(Keyword::EXPORT_PARTITION); ParserKeyword s_drop_detached_partition(Keyword::DROP_DETACHED_PARTITION); ParserKeyword s_drop_detached_part(Keyword::DROP_DETACHED_PART); ParserKeyword s_fetch_partition(Keyword::FETCH_PARTITION); @@ -564,6 +565,22 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected command->move_destination_name = ast_space_name->as().value.safeGet(); } } + else if (s_export_partition.ignore(pos, expected)) + { + if (!parser_partition.parse(pos, command_partition, expected)) + return false; + + command->type = ASTAlterCommand::EXPORT_PARTITION; + + if 
(!s_to_table.ignore(pos, expected)) + { + return false; + } + + if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table)) + return false; + command->move_destination_type = DataDestinationType::TABLE; + } else if (s_add_constraint.ignore(pos, expected)) { if (s_if_not_exists.ignore(pos, expected)) diff --git a/src/Storages/IPartitionStrategy.cpp b/src/Storages/IPartitionStrategy.cpp index 5ebdb249e661..676434869ce5 100644 --- a/src/Storages/IPartitionStrategy.cpp +++ b/src/Storages/IPartitionStrategy.cpp @@ -332,32 +332,27 @@ ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) return block_with_partition_by_expr.getByName(actions_with_column_name.column_name).column; } -ColumnRawPtrs HiveStylePartitionStrategy::getFormatChunkColumns(const Chunk & chunk) +Chunk HiveStylePartitionStrategy::getFormatChunk(const Chunk & chunk) { - ColumnRawPtrs result; + Chunk result; + if (partition_columns_in_data_file) { for (const auto & column : chunk.getColumns()) { - result.emplace_back(column.get()); + result.addColumn(column); } return result; } - if (chunk.getNumColumns() != sample_block.columns()) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Incorrect number of columns in chunk. Expected {}, found {}", - sample_block.columns(), chunk.getNumColumns()); - } + chassert(chunk.getColumns().size() == sample_block.columns()); for (size_t i = 0; i < sample_block.columns(); i++) { if (!partition_columns_name_set.contains(sample_block.getByPosition(i).name)) { - result.emplace_back(chunk.getColumns()[i].get()); + result.addColumn(chunk.getColumns()[i]); } } diff --git a/src/Storages/IPartitionStrategy.h b/src/Storages/IPartitionStrategy.h index bc90d7f03461..cfd6294309a0 100644 --- a/src/Storages/IPartitionStrategy.h +++ b/src/Storages/IPartitionStrategy.h @@ -29,21 +29,18 @@ struct IPartitionStrategy virtual ColumnPtr computePartitionKey(const Chunk & chunk) = 0; - virtual std::string getPathForRead(const std::string & prefix) = 0; - virtual std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) = 0; - - virtual ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk) + ColumnPtr computePartitionKey(Block & block) { - ColumnRawPtrs result_columns; - - for (const auto & column : chunk.getColumns()) - { - result_columns.emplace_back(column.get()); - } + actions_with_column_name.actions->execute(block); - return result_columns; + return block.getByName(actions_with_column_name.column_name).column; } + virtual std::string getPathForRead(const std::string & prefix) = 0; + virtual std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) = 0; + + virtual Chunk getFormatChunk(const Chunk & chunk) { return chunk.clone(); } + virtual Block getFormatHeader() { return sample_block; } NamesAndTypesList getPartitionColumns() const; @@ -53,6 +50,7 @@ struct IPartitionStrategy const KeyDescription partition_key_description; const Block sample_block; ContextPtr context; + PartitionExpressionActionsAndColumnName actions_with_column_name; }; /* @@ -91,9 +89,6 @@ struct WildcardPartitionStrategy : IPartitionStrategy ColumnPtr computePartitionKey(const Chunk & chunk) override; std::string getPathForRead(const std::string & prefix) override; std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) override; - -private: - PartitionExpressionActionsAndColumnName actions_with_column_name; }; /* @@ -114,14 +109,13 @@ struct HiveStylePartitionStrategy : 
IPartitionStrategy std::string getPathForRead(const std::string & prefix) override; std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) override; - ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk) override; + Chunk getFormatChunk(const Chunk & chunk) override; Block getFormatHeader() override; private: const std::string file_format; const bool partition_columns_in_data_file; std::unordered_set partition_columns_name_set; - PartitionExpressionActionsAndColumnName actions_with_column_name; Block block_without_partition_columns; }; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 006b9e377f4b..1ecb7ddb9510 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -18,9 +18,12 @@ #include #include #include +#include #include +#include "MergeTree/RangesInDataPart.h" + namespace DB { @@ -67,6 +70,12 @@ class ConditionSelectivityEstimator; class ActionsDAG; +class MergeTreeData; + +class IMergeTreeDataPart; + +struct MergeTreePartImportStats; + /** Storage. Describes the table. Responsible for * - storage of the table data; * - the definition in which files (or not in files) the data is stored; @@ -205,6 +214,15 @@ class IStorage : public std::enable_shared_from_this, public TypePromo virtuals.set(std::make_unique(std::move(virtuals_))); } + virtual void commitExportPartitionTransaction( + const String & /* transaction_id */, + const String & /* partition_id */, + const Strings & /* exported_paths */, + ContextPtr /* local_context */) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "commitExportPartitionTransaction is not implemented for storage type {}", getName()); + } + /// Return list of virtual columns (like _part, _table, etc). In the vast /// majority of cases virtual columns are static constant part of Storage /// class and don't depend on Storage object. But sometimes we have fake @@ -436,6 +454,21 @@ class IStorage : public std::enable_shared_from_this, public TypePromo ContextPtr /*context*/, bool /*async_insert*/); + virtual bool supportsImportMergeTreePartition() const { return false; } + + virtual void importMergeTreePartition( + const MergeTreeData &, + const std::vector &, + ContextPtr /*context*/, + std::function) {} + + virtual void importMergeTreePart( + const MergeTreeData &, + const DataPartPtr &, + ContextPtr, + std::function + ) {} + /** Writes the data to a table in distributed manner. * It is supposed that implementation looks into SELECT part of the query and executes distributed * INSERT SELECT if it is possible with current storage as a receiver and query SELECT part as a producer.
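The three virtual methods added to IStorage above define the destination-side contract for partition export: a storage advertises support, receives parts one at a time, and finally records a commit marker once the whole partition has been uploaded. Below is a minimal sketch of a destination storage wiring these hooks together; it is illustrative only: the class name StorageExampleExportTarget, the example path prefix, and the override bodies are assumptions for this sketch, not part of the patch, and the rest of the IStorage interface is omitted.

/// Sketch only: a hypothetical destination storage that opts into partition export.
class StorageExampleExportTarget : public IStorage
{
public:
    String getName() const override { return "ExampleExportTarget"; }

    /// Advertise that parts exported from a MergeTree table can be written here.
    bool supportsImportMergeTreePartition() const override { return true; }

    /// Convert a single source part into the destination format and report the outcome
    /// through part_log, so the source table can update its export manifest.
    void importMergeTreePart(
        const MergeTreeData & /* source */,
        const DataPartPtr & part,
        ContextPtr /* context */,
        std::function<void(MergeTreePartImportStats)> part_log) override
    {
        MergeTreePartImportStats stats;
        stats.part = part;
        stats.file_path = "example_prefix/" + part->name; /// hypothetical destination path
        /// ... read the part and write it to the destination here ...
        part_log(stats);
    }

    /// Called once every part of the partition has an uploaded path in the manifest.
    void commitExportPartitionTransaction(
        const String & /* transaction_id */,
        const String & /* partition_id */,
        const Strings & /* exported_paths */,
        ContextPtr /* local_context */) override
    {
        /// Persist a commit marker (e.g. an object listing the exported paths),
        /// so the export becomes visible once and is not repeated.
    }
};

In this patch the real implementation is provided by StorageObjectStorage further down (importMergeTreePart / commitExportPartitionTransaction), and the per-part export tasks on the MergeTree side call these methods, committing only once the manifest shows every part as uploaded.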
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index aa3ac075a5a7..3c82f7a24187 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -5940,6 +5940,11 @@ Pipe MergeTreeData::alterPartition( } } break; + case PartitionCommand::EXPORT_PARTITION: + { + exportPartitionToTable(command, query_context); + break; + } case PartitionCommand::DROP_DETACHED_PARTITION: dropDetached(command.partition, command.part, query_context); diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 89d3507266e0..7d1f6b1f9a34 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -901,6 +902,8 @@ class MergeTreeData : public IStorage, public WithMutableContext /// Moves partition to specified Table void movePartitionToTable(const PartitionCommand & command, ContextPtr query_context); + virtual void exportPartitionToTable(const PartitionCommand &, ContextPtr) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "export not implemented");} + /// Checks that Partition could be dropped right now /// Otherwise - throws an exception with detailed information. /// We do not use mutex because it is not very important that the size could change during the operation. @@ -964,6 +967,7 @@ class MergeTreeData : public IStorage, public WithMutableContext bool must_on_same_disk); virtual std::vector getMutationsStatus() const = 0; + virtual std::vector getExportsStatus() const { return {}; } /// Returns true if table can create new parts with adaptive granularity /// Has additional constraint in replicated version @@ -1149,7 +1153,7 @@ class MergeTreeData : public IStorage, public WithMutableContext /// Schedules background job to like merge/mutate/fetch an executor virtual bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) = 0; /// Schedules job to move parts between disks/volumes and so on. - bool scheduleDataMovingJob(BackgroundJobsAssignee & assignee); + virtual bool scheduleDataMovingJob(BackgroundJobsAssignee & assignee); bool areBackgroundMovesNeeded() const; @@ -1230,6 +1234,8 @@ class MergeTreeData : public IStorage, public WithMutableContext friend class MergeTask; friend class IPartMetadataManager; friend class IMergedBlockOutputStream; // for access to log + friend class ExportPartitionPlainMergeTreeTask; + friend class ExportPartPlainMergeTreeTask; bool require_part_metadata; diff --git a/src/Storages/MergeTree/MergeTreeExportManifest.h b/src/Storages/MergeTree/MergeTreeExportManifest.h new file mode 100644 index 000000000000..407dc579cd18 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeExportManifest.h @@ -0,0 +1,217 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +/** + * JSON manifest for exporting a set of parts to object storage. + * Layout on disk (pretty-printed JSON): + * { + * "transaction_id": "", + * "partition_id": "", + * "destination": ".", + * "create_time": , + * "status": "", + * "parts": [ {"part_name": "name", "remote_path": "path-or-empty"}, ... 
] + * } + */ +struct MergeTreeExportManifest +{ + using DataPartPtr = std::shared_ptr; + + enum class Status { + pending, + completed, + failed + }; + + MergeTreeExportManifest() + : destination_storage_id(StorageID::createEmpty()) + , status(Status::pending) + {} + + struct Item + { + String part_name; + String remote_path; // empty until uploaded + bool in_progress = false; /// this is just a hackish workaround for now + DataPartPtr part; // hold reference to part so it does not get deleted from disk even if it is outdated. Should be null once we are done with it + }; + + + String transaction_id; + String partition_id; + StorageID destination_storage_id; + time_t create_time = 0; + std::vector items; + Status status = Status::pending; + + std::filesystem::path file_path; + DiskPtr disk; + + static std::shared_ptr create( + const DiskPtr & disk_, + const String & path_prefix, + const String & transaction_id_, + const String & partition_id_, + const StorageID & destination_storage_id_, + const std::vector & data_parts) + { + auto manifest = std::make_shared(); + manifest->disk = disk_; + manifest->transaction_id = transaction_id_; + manifest->partition_id = partition_id_; + manifest->destination_storage_id = destination_storage_id_; + manifest->create_time = std::time(nullptr); + manifest->file_path = std::filesystem::path(path_prefix) / ("export_partition_" + partition_id_ + "_transaction_" + transaction_id_ + ".json"); + manifest->items.reserve(data_parts.size()); + for (const auto & data_part : data_parts) + manifest->items.emplace_back(data_part->name, "", false, data_part); + manifest->write(); + return manifest; + } + + /// will not fill parts ref, maybe I should. + static std::shared_ptr read( + const DiskPtr & disk_, + const String & file_path_) + { + auto manifest = std::make_shared(); + manifest->disk = disk_; + manifest->file_path = file_path_; + + auto in = disk_->readFile(file_path_, ReadSettings{}); + + String json_str; + readStringUntilEOF(json_str, *in); + + Poco::JSON::Parser parser; + Poco::Dynamic::Var json = parser.parse(json_str); + const Poco::JSON::Object::Ptr & root = json.extract(); + if (!root) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid export manifest JSON: {}", file_path_); + + manifest->transaction_id = root->getValue("transaction_id"); + manifest->partition_id = root->getValue("partition_id"); + const auto destination = root->getValue("destination"); + manifest->destination_storage_id = StorageID(QualifiedTableName::parseFromString(destination)); + + manifest->create_time = root->getValue("create_time"); + + String status_str = root->getValue("status"); + manifest->status = magic_enum::enum_cast(status_str).value(); + + manifest->items.clear(); + auto parts = root->get("parts").extract(); + for (unsigned int i = 0; i < parts->size(); ++i) + { + const auto part_obj = parts->getObject(i); + Item item; + item.part_name = part_obj->getValue("part_name"); + item.remote_path = part_obj->getValue("remote_path"); + manifest->items.push_back(std::move(item)); + } + + return manifest; + } + + void write() const + { + auto out = disk->writeFile(file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite); + + Poco::JSON::Object::Ptr root(new Poco::JSON::Object()); + root->set("transaction_id", transaction_id); + root->set("partition_id", partition_id); + root->set("destination", destination_storage_id.getQualifiedName().getFullName()); + root->set("create_time", static_cast(create_time)); + root->set("status", String(magic_enum::enum_name(status))); + + 
Poco::JSON::Array::Ptr parts(new Poco::JSON::Array()); + for (const auto & i : items) + { + Poco::JSON::Object::Ptr obj(new Poco::JSON::Object()); + obj->set("part_name", i.part_name); + obj->set("remote_path", i.remote_path); + parts->add(obj); + } + root->set("parts", parts); + + std::ostringstream oss; + Poco::JSON::Stringifier::stringify(root, oss, 2); + const std::string s = oss.str(); + out->write(s.data(), s.size()); + out->finalize(); + out->sync(); + } + + void deleteFile() const + { + disk->removeFile(file_path); + } + + void updateRemotePathAndWrite(const String & part_name, const String & remote_path) + { + for (auto & i : items) + { + if (i.part_name == part_name) + { + i.remote_path = remote_path; + break; + } + } + write(); + } + + void setInProgress(const String & part_name) + { + for (auto & i : items) + { + if (i.part_name == part_name) + i.in_progress = true; + } + } + + std::vector pendingParts() const + { + std::vector res; + for (const auto & i : items) + if (i.remote_path.empty()) + res.push_back(i.part_name); + return res; + } + + std::vector exportedPaths() const + { + std::vector res; + res.reserve(items.size()); + + for (const auto & i : items) + { + if (!i.remote_path.empty()) + { + res.push_back(i.remote_path); + } + } + + return res; + } +}; + +} diff --git a/src/Storages/MergeTree/MergeTreeExportStatus.cpp b/src/Storages/MergeTree/MergeTreeExportStatus.cpp new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/Storages/MergeTree/MergeTreeExportStatus.h b/src/Storages/MergeTree/MergeTreeExportStatus.h new file mode 100644 index 000000000000..7f6905c2f1a4 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeExportStatus.h @@ -0,0 +1,25 @@ +#pragma once + +#include +#include +#include +#include + + +namespace DB +{ + +struct MergeTreeExportStatus +{ + String source_database; + String source_table; + String destination_database; + String destination_table; + String transaction_id; + time_t create_time = 0; + std::vector parts_to_do_names; + MergeTreeExportManifest::Status status; +}; + +} + diff --git a/src/Storages/MergeTree/MergeTreePartition.cpp b/src/Storages/MergeTree/MergeTreePartition.cpp index 445fc8846da3..19c33cbca5ed 100644 --- a/src/Storages/MergeTree/MergeTreePartition.cpp +++ b/src/Storages/MergeTree/MergeTreePartition.cpp @@ -479,6 +479,22 @@ void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Bl } } +Block MergeTreePartition::getBlockWithPartitionValues(const NamesAndTypesList & partition_columns) const +{ + chassert(partition_columns.size() == value.size()); + + Block result; + + std::size_t i = 0; + for (const auto & partition_column : partition_columns) + { + auto column = partition_column.type->createColumnConst(1, value[i++]); + result.insert({column, partition_column.type, partition_column.name}); + } + + return result; +} + NamesAndTypesList MergeTreePartition::executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context) { auto adjusted_partition_key = adjustPartitionKey(metadata_snapshot, context); diff --git a/src/Storages/MergeTree/MergeTreePartition.h b/src/Storages/MergeTree/MergeTreePartition.h index 4338b216cdb8..811cfdc2a90c 100644 --- a/src/Storages/MergeTree/MergeTreePartition.h +++ b/src/Storages/MergeTree/MergeTreePartition.h @@ -60,6 +60,8 @@ struct MergeTreePartition void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context); + Block getBlockWithPartitionValues(const NamesAndTypesList & 
partition_columns) const; + /// Adjust partition key and execute its expression on block. Return sample block according to used expression. static NamesAndTypesList executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context); diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index 9335e08fa4c2..d74a04744a11 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -140,6 +140,9 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( case Merge: read_settings.local_throttler = context->getMergesThrottler(); break; + case Export: + read_settings.local_throttler = context->getExportsThrottler(); + break; } read_settings.remote_throttler = read_settings.local_throttler; diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.h b/src/Storages/MergeTree/MergeTreeSequentialSource.h index abba230d9e79..a858adf33bb5 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.h +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.h @@ -15,6 +15,7 @@ enum MergeTreeSequentialSourceType { Mutation, Merge, + Export, }; /// Create stream for reading single part from MergeTree. diff --git a/src/Storages/ObjectStorage/FilePathGenerator.h b/src/Storages/ObjectStorage/FilePathGenerator.h new file mode 100644 index 000000000000..c2ba6a6f5d94 --- /dev/null +++ b/src/Storages/ObjectStorage/FilePathGenerator.h @@ -0,0 +1,61 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + struct ObjectStorageFilePathGenerator + { + virtual ~ObjectStorageFilePathGenerator() = default; + virtual std::string getWritingPath(const std::string & partition_id) const = 0; + virtual std::string getReadingPath() const = 0; + }; + + struct ObjectStorageWildcardFilePathGenerator : ObjectStorageFilePathGenerator + { + explicit ObjectStorageWildcardFilePathGenerator(const std::string & raw_path_) : raw_path(raw_path_) {} + + std::string getWritingPath(const std::string & partition_id) const override + { + return PartitionedSink::replaceWildcards(raw_path, partition_id); + } + + std::string getReadingPath() const override + { + return raw_path; + } + + private: + std::string raw_path; + + }; + + struct ObjectStorageAppendFilePathGenerator : ObjectStorageFilePathGenerator + { + explicit ObjectStorageAppendFilePathGenerator( + const std::string & raw_path_, + const std::string & file_format_, + const std::shared_ptr & filename_generator_) + : raw_path(raw_path_), file_format(Poco::toLower(file_format_)), filename_generator(filename_generator_){} + + std::string getWritingPath(const std::string & partition_id) const override + { + return raw_path + "/" + partition_id + "/" + filename_generator->generate() + "." + file_format; + } + + std::string getReadingPath() const override + { + return raw_path + "**." 
+ file_format; + } + + private: + std::string raw_path; + std::string file_format; + + std::shared_ptr filename_generator; + }; + +} diff --git a/src/Storages/ObjectStorage/MergeTree/ExportPartPlainMergeTreeTask.cpp b/src/Storages/ObjectStorage/MergeTree/ExportPartPlainMergeTreeTask.cpp new file mode 100644 index 000000000000..e58693291115 --- /dev/null +++ b/src/Storages/ObjectStorage/MergeTree/ExportPartPlainMergeTreeTask.cpp @@ -0,0 +1,218 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +ExportPartPlainMergeTreeTask::ExportPartPlainMergeTreeTask( + StorageMergeTree & storage_, + const DataPartPtr & part_to_export_, + const StoragePtr & destination_storage_, + ContextPtr context_, + std::shared_ptr manifest_, + IExecutableTask::TaskResultCallback & task_result_callback_, + size_t max_retries_) + : storage(storage_) + , part_to_export(part_to_export_) + , destination_storage(destination_storage_) + , context(std::move(context_)) + , manifest(std::move(manifest_)) + , task_result_callback(task_result_callback_) + , max_retries(max_retries_) +{ + UInt64 transaction_id = std::stoull(manifest->transaction_id); + priority.value = transaction_id; +} + +StorageID ExportPartPlainMergeTreeTask::getStorageID() const +{ + return storage.getStorageID(); +} + +String ExportPartPlainMergeTreeTask::getQueryId() const +{ + return getStorageID().getShortName() + "::export_partition::" + manifest->transaction_id; +} + +bool ExportPartPlainMergeTreeTask::executeStep() +{ + if (cancelled) + return false; + + switch (state) + { + case State::NEED_PREPARE: + { + prepare(); + state = State::NEED_EXECUTE; + return true; + } + case State::NEED_EXECUTE: + { + if (executeExport()) + { + state = State::NEED_COMMIT; + } + else if (retry_count < max_retries) + { + retry_count++; + LOG_INFO(getLogger("ExportPartPlainMergeTreeTask"), + "Retrying export attempt {} for part {}", + retry_count, part_to_export->name); + state = State::NEED_EXECUTE; + } + else + { + state = State::FAILED; + } + + return true; + } + case State::NEED_COMMIT: + { + if (commitExport()) + { + state = State::SUCCESS; + } + else if (retry_count < max_retries) + { + retry_count++; + LOG_INFO(getLogger("ExportPartPlainMergeTreeTask"), + "Retrying export attempt {} for part {}", + retry_count, part_to_export->name); + state = State::NEED_COMMIT; + } + else + { + state = State::FAILED; + } + + return true; + } + case State::FAILED: + { + std::lock_guard lock(storage.export_partition_transaction_id_to_manifest_mutex); + + manifest->status = MergeTreeExportManifest::Status::failed; + manifest->write(); + + /// doesn't sound ideal, but it is actually ok to allow this partition to be re-exported as soon as a single part fails + /// this is because the ongoing export will never commit, so it won't cause duplicates + storage.already_exported_partition_ids.erase(manifest->partition_id); + + return false; + } + case State::SUCCESS: + { + return false; + } + } + + return false; +} + + +void ExportPartPlainMergeTreeTask::prepare() +{ + stopwatch_ptr = std::make_unique(); +} + +bool ExportPartPlainMergeTreeTask::executeExport() +{ + if (cancelled) + return false; + + std::function part_log_wrapper = [this](MergeTreePartImportStats stats) { + auto table_id = storage.getStorageID(); + + UInt64 elapsed_ns = stopwatch_ptr->elapsedNanoseconds(); + + storage.writePartLog( + PartLogElement::Type::EXPORT_PART, + stats.status, + elapsed_ns, + stats.part->name, + stats.part, + {stats.part}, + nullptr, + nullptr); + + 
if (stats.status.code != 0) + { + LOG_INFO(getLogger("ExportPartPlainMergeTreeTask"), "Error exporting part {}: {}", stats.part->name, stats.status.message); + return; + } + + std::lock_guard lock(storage.export_partition_transaction_id_to_manifest_mutex); + + storage.export_partition_transaction_id_to_manifest[manifest->transaction_id]->updateRemotePathAndWrite( + stats.part->name, + stats.file_path); + }; + + try + { + auto context_copy = Context::createCopy(context); + + /// Manually disable parallelism because the idea is to control parallelism with tasks, not with formatting + context_copy->setSetting("output_format_parallel_formatting", false); + context_copy->setSetting("max_threads", 1); + + destination_storage->importMergeTreePart( + storage, + part_to_export, + context_copy, + part_log_wrapper); + + return true; + } + catch (...) + { + LOG_ERROR(getLogger("ExportPartPlainMergeTreeTask"), "Failed to export part {}", part_to_export->name); + + return false; + } +} + +bool ExportPartPlainMergeTreeTask::commitExport() +{ + std::lock_guard lock(storage.export_partition_transaction_id_to_manifest_mutex); + + if (manifest->exportedPaths().size() == manifest->items.size()) + { + destination_storage->commitExportPartitionTransaction( + manifest->transaction_id, + manifest->partition_id, + manifest->exportedPaths(), + context); + manifest->status = MergeTreeExportManifest::Status::completed; + manifest->write(); + storage.export_partition_transaction_id_to_manifest.erase(manifest->transaction_id); + LOG_INFO(getLogger("ExportPartPlainMergeTreeTask"), + "Successfully committed export transaction {} for partition {}", + manifest->transaction_id, manifest->partition_id); + } + else + LOG_INFO(getLogger("ExportPartPlainMergeTreeTask"), "Not all parts have been exported yet for transaction id {}, not committing yet", manifest->transaction_id); + + return true; +} + +void ExportPartPlainMergeTreeTask::onCompleted() +{ + bool success = (state == State::SUCCESS); + task_result_callback(success); +} + +void ExportPartPlainMergeTreeTask::cancel() noexcept +{ + cancelled = true; +} + +} diff --git a/src/Storages/ObjectStorage/MergeTree/ExportPartPlainMergeTreeTask.h b/src/Storages/ObjectStorage/MergeTree/ExportPartPlainMergeTreeTask.h new file mode 100644 index 000000000000..cc1b40e60a94 --- /dev/null +++ b/src/Storages/ObjectStorage/MergeTree/ExportPartPlainMergeTreeTask.h @@ -0,0 +1,70 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class MergeTreeData; + +class ExportPartPlainMergeTreeTask : public IExecutableTask +{ +public: + ExportPartPlainMergeTreeTask( + StorageMergeTree & storage_, + const DataPartPtr & part_to_export_, + const StoragePtr & destination_storage_, + ContextPtr context_, + std::shared_ptr manifest_, + IExecutableTask::TaskResultCallback & task_result_callback_, + size_t max_retries_); + + void onCompleted() override; + bool executeStep() override; + void cancel() noexcept override; + StorageID getStorageID() const override; + Priority getPriority() const override { return priority; } + String getQueryId() const override; + +private: + void prepare(); + bool executeExport(); + bool commitExport(); + bool exportedAllIndividualParts() const; + + enum class State : uint8_t + { + NEED_PREPARE, + NEED_EXECUTE, + NEED_COMMIT, + FAILED, + SUCCESS + }; + + State state{State::NEED_PREPARE}; + + StorageMergeTree & storage; + DataPartPtr part_to_export; + StoragePtr destination_storage;
ContextPtr context; + std::shared_ptr manifest; + IExecutableTask::TaskResultCallback task_result_callback; + + size_t max_retries; + size_t retry_count = 0; + Priority priority; + std::unique_ptr stopwatch_ptr; + + bool cancelled = false; + std::exception_ptr current_exception; +}; + +using ExportPartPlainMergeTreeTaskPtr = std::shared_ptr; + +} diff --git a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageMergeTreePartImporterSink.cpp b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageMergeTreePartImporterSink.cpp new file mode 100644 index 000000000000..bb2d6514956e --- /dev/null +++ b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageMergeTreePartImporterSink.cpp @@ -0,0 +1,66 @@ +#include + +namespace DB +{ + +StorageObjectStorageMergeTreePartImporterSink::StorageObjectStorageMergeTreePartImporterSink( + const DataPartPtr & part_, + const std::string & path_, + const ObjectStoragePtr & object_storage_, + const ConfigurationPtr & configuration_, + const std::optional & format_settings_, + const Block & sample_block_, + const std::function & part_log_, + const ContextPtr & context_) + : SinkToStorage(sample_block_) + , object_storage(object_storage_) + , configuration(configuration_) + , format_settings(format_settings_) + , sample_block(sample_block_) + , context(context_) + , part_log(part_log_) +{ + stats.part = part_; + stats.file_path = path_; + sink = std::make_shared( + stats.file_path, + object_storage, + configuration, + format_settings, + sample_block, + context); +} + +String StorageObjectStorageMergeTreePartImporterSink::getName() const +{ + return "StorageObjectStorageMergeTreePartImporterSink"; +} + +void StorageObjectStorageMergeTreePartImporterSink::consume(Chunk & chunk) +{ + sink->consume(chunk); + stats.read_bytes += chunk.bytes(); + stats.read_rows += chunk.getNumRows(); +} + +void StorageObjectStorageMergeTreePartImporterSink::onFinish() +{ + sink->onFinish(); + + if (const auto object_metadata = object_storage->tryGetObjectMetadata(stats.file_path)) + { + stats.bytes_on_disk = object_metadata->size_bytes; + } + + part_log(stats); +} + +void StorageObjectStorageMergeTreePartImporterSink::onException(std::exception_ptr exception) +{ + sink->onException(exception); + + stats.status = ExecutionStatus(-1, "Error importing part"); + part_log(stats); +} + +} diff --git a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageMergeTreePartImporterSink.h b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageMergeTreePartImporterSink.h new file mode 100644 index 000000000000..21f435a08e4b --- /dev/null +++ b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageMergeTreePartImporterSink.h @@ -0,0 +1,61 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +struct MergeTreePartImportStats +{ + ExecutionStatus status; + std::size_t bytes_on_disk = 0; + std::size_t read_rows = 0; + std::size_t read_bytes = 0; + std::string file_path = ""; + DataPartPtr part = nullptr; +}; + +/* + * Wrapper around `StorageObjectsStorageSink` that takes care of accounting & metrics for partition export + */ +class StorageObjectStorageMergeTreePartImporterSink : public SinkToStorage +{ +public: + using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr; + + StorageObjectStorageMergeTreePartImporterSink( + const DataPartPtr & part_, + const std::string & path_, + const ObjectStoragePtr & object_storage_, + const ConfigurationPtr & configuration_, + const std::optional & format_settings_, + const Block & 
sample_block_, + const std::function & part_log_, + const ContextPtr & context_); + + String getName() const override; + + void consume(Chunk & chunk) override; + + void onFinish() override; + + void onException(std::exception_ptr exception) override; + +private: + std::shared_ptr sink; + ObjectStoragePtr object_storage; + ConfigurationPtr configuration; + std::optional format_settings; + Block sample_block; + ContextPtr context; + std::function part_log; + + MergeTreePartImportStats stats; +}; + +} diff --git a/src/Storages/ObjectStorage/ObjectStorageFilenameGenerator.h b/src/Storages/ObjectStorage/ObjectStorageFilenameGenerator.h new file mode 100644 index 000000000000..0dcdb26b719f --- /dev/null +++ b/src/Storages/ObjectStorage/ObjectStorageFilenameGenerator.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include + +namespace DB +{ + +struct ObjectStorageFilenameGenerator +{ + virtual ~ObjectStorageFilenameGenerator() = default; + virtual std::string generate() const = 0; +}; + +struct NoOpObjectStorageFilenameGenerator : ObjectStorageFilenameGenerator +{ + std::string generate() const override + { + return ""; + } +}; + +struct SnowflakeObjectStorageFilenameGenerator : ObjectStorageFilenameGenerator +{ + std::string generate() const override + { + return std::to_string(generateSnowflakeID()); + } + +private: + std::string file_format; +}; + +struct PredefinedObjectStorageFilenameGenerator : public ObjectStorageFilenameGenerator +{ + explicit PredefinedObjectStorageFilenameGenerator(const std::string & filename_) + : file_name(filename_) {} + + std::string generate() const override + { + return file_name; + } + +private: + std::string file_name; +}; + +} diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index f910691767aa..eba74d5758e0 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -1,13 +1,15 @@ #include +#include #include +#include "MergeTree/StorageObjectStorageMergeTreePartImporterSink.h" -#include #include #include -#include #include -#include #include +#include +#include +#include #include #include @@ -20,6 +22,8 @@ #include #include #include +#include +#include #include #include #include @@ -29,8 +33,13 @@ #include #include #include +#include #include +#include +#include +#include +#include namespace DB { @@ -538,6 +547,124 @@ void StorageObjectStorage::read( query_plan.addStep(std::move(read_step)); } +bool StorageObjectStorage::supportsImportMergeTreePartition() const +{ + return configuration->partition_strategy != nullptr && configuration->partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE; +} + +void StorageObjectStorage::importMergeTreePart( + const MergeTreeData & merge_tree_data, + const DataPartPtr & data_part, + ContextPtr local_context, + std::function part_log +) +{ + auto metadata_snapshot = merge_tree_data.getInMemoryMetadataPtr(); + Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); + StorageSnapshotPtr storage_snapshot = merge_tree_data.getStorageSnapshot(metadata_snapshot, local_context); + + QueryPlan plan; + + /// using the mutations type for now. 
This impacts in the throttling strategy + MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export; + + /// todo implement these settings + bool apply_deleted_mask = true; + bool read_with_direct_io = false; + bool prefetch = false; + + std::string partition_key; + + if (configuration->partition_strategy) + { + const auto partition_columns = configuration->partition_strategy->getPartitionColumns(); + + auto block_with_partition_values = data_part->partition.getBlockWithPartitionValues(partition_columns); + + const auto column_with_partition_key = configuration->partition_strategy->computePartitionKey(block_with_partition_values); + + if (!column_with_partition_key->empty()) + { + partition_key = column_with_partition_key->getDataAt(0).toString(); + } + } + + const auto file_path = configuration->file_path_generator->getWritingPath(partition_key); + + MergeTreeData::IMutationsSnapshot::Params params + { + .metadata_version = metadata_snapshot->getMetadataVersion(), + .min_part_metadata_version = data_part->getMetadataVersion(), + }; + + auto mutations_snapshot = merge_tree_data.getMutationsSnapshot(params); + + auto alter_conversions = MergeTreeData::getAlterConversionsForPart( + data_part, + mutations_snapshot, + local_context); + + QueryPlan plan_for_part; + + createReadFromPartStep( + read_type, + plan_for_part, + merge_tree_data, + storage_snapshot, + RangesInDataPart(data_part), + alter_conversions, + nullptr, + columns_to_read, + nullptr, + apply_deleted_mask, + std::nullopt, + read_with_direct_io, + prefetch, + local_context, + getLogger("ExportPartition")); + + QueryPlanOptimizationSettings optimization_settings(local_context); + auto pipeline_settings = BuildQueryPipelineSettings(local_context); + auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings); + auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); + + auto sink = std::make_shared( + data_part, + file_path, + object_storage, + configuration, + format_settings, + metadata_snapshot->getSampleBlock(), + part_log, + local_context + ); + + pipeline.complete(sink); + + CompletedPipelineExecutor exec(pipeline); + exec.execute(); +} + +void StorageObjectStorage::commitExportPartitionTransaction(const String & transaction_id, const String & partition_id, const Strings & exported_paths, ContextPtr local_context) +{ + const String commit_object = configuration->getRawPath().path + "/commit_" + partition_id + "_" + transaction_id; + + /// if file already exists, nothing to be done + if (object_storage->exists(StoredObject(commit_object))) + { + LOG_DEBUG(getLogger("StorageObjectStorage"), "Commit file already exists, nothing to be done: {}", commit_object); + return; + } + + auto out = object_storage->writeObject(StoredObject(commit_object), WriteMode::Rewrite, /* attributes= */ {}, DBMS_DEFAULT_BUFFER_SIZE, local_context->getWriteSettings()); + for (const auto & p : exported_paths) + { + out->write(p.data(), p.size()); + out->write("\n", 1); + } + out->finalize(); +} + SinkToStoragePtr StorageObjectStorage::write( const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, @@ -577,7 +704,8 @@ SinkToStoragePtr StorageObjectStorage::write( if (configuration->partition_strategy) { - return std::make_shared(object_storage, configuration, format_settings, sample_block, local_context); + auto sink_creator = std::make_shared(object_storage, configuration, configuration->file_path_generator, format_settings, sample_block, local_context); + return 
std::make_shared(configuration->partition_strategy, sink_creator, local_context, sample_block); } auto paths = configuration->getPaths(); @@ -757,6 +885,18 @@ void StorageObjectStorage::Configuration::initialize( configuration_to_initialize.partition_strategy_type = PartitionStrategyFactory::StrategyType::WILDCARD; } + if (configuration_to_initialize.partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE) + { + configuration_to_initialize.file_path_generator = std::make_shared( + configuration_to_initialize.getRawPath().path, + configuration_to_initialize.format, + std::make_shared()); + } + else + { + configuration_to_initialize.file_path_generator = std::make_shared(configuration_to_initialize.getRawPath().path); + } + if (configuration_to_initialize.format == "auto") { if (configuration_to_initialize.isDataLakeConfiguration()) @@ -878,3 +1018,4 @@ void StorageObjectStorage::Configuration::assertInitialized() const } } + diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 0c75431cd1c1..ac8c7de72721 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -7,6 +7,7 @@ #include #include #include +#include "Storages/MergeTree/RangesInDataPart.h" #include #include #include @@ -16,7 +17,7 @@ #include -#include +#include namespace DB { @@ -98,6 +99,18 @@ class StorageObjectStorage : public IStorage ContextPtr context, bool async_insert) override; + bool supportsImportMergeTreePartition() const override; + + void importMergeTreePart( + const MergeTreeData & merge_tree_data, + const DataPartPtr & data_part, + ContextPtr, + std::function part_log + ) override; + + /// Write an export commit file containing the list of exported remote paths + void commitExportPartitionTransaction(const String & transaction_id, const String & partition_id, const Strings & exported_paths, ContextPtr local_context) override; + void truncate( const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, @@ -311,6 +324,7 @@ class StorageObjectStorage::Configuration bool if_not_updated_before, bool check_consistent_with_previous_metadata); + const StorageObjectStorageSettings & getSettingsRef() const; virtual const DataLakeStorageSettings & getDataLakeSettings() const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getDataLakeSettings() is not implemented for configuration type {}", getTypeName()); @@ -324,6 +338,7 @@ class StorageObjectStorage::Configuration /// And alternative is with hive partitioning, when they are contained in file path. 
bool partition_columns_in_data_file = true; std::shared_ptr partition_strategy; + std::shared_ptr file_path_generator; protected: virtual void fromNamedCollection(const NamedCollection & collection, ContextPtr context) = 0; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp index e23787b5aa6b..08a272c9d1f2 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp @@ -130,12 +130,13 @@ void StorageObjectStorageSink::cancelBuffers() PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink( ObjectStoragePtr object_storage_, ConfigurationPtr configuration_, + const std::shared_ptr & file_path_generator_, std::optional format_settings_, const Block & sample_block_, ContextPtr context_) - : PartitionedSink(configuration_->partition_strategy, context_, sample_block_) - , object_storage(object_storage_) + : object_storage(object_storage_) , configuration(configuration_) + , file_path_generator(file_path_generator_) , query_settings(configuration_->getQuerySettings(context_)) , format_settings(format_settings_) , sample_block(sample_block_) @@ -151,7 +152,7 @@ StorageObjectStorageSink::~StorageObjectStorageSink() SinkPtr PartitionedStorageObjectStorageSink::createSinkForPartition(const String & partition_id) { - auto file_path = configuration->getPathForWrite(partition_id).path; + auto file_path = file_path_generator->getWritingPath(partition_id); validateNamespace(configuration->getNamespace(), configuration); validateKey(file_path); @@ -167,7 +168,7 @@ SinkPtr PartitionedStorageObjectStorageSink::createSinkForPartition(const String object_storage, configuration, format_settings, - partition_strategy->getFormatHeader(), + configuration->partition_strategy->getFormatHeader(), context ); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.h b/src/Storages/ObjectStorage/StorageObjectStorageSink.h index ebfee5ab96e6..d46bd53bb355 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.h @@ -8,6 +8,7 @@ namespace DB { class StorageObjectStorageSink : public SinkToStorage { +friend class StorageObjectStorageMergeTreePartImporterSink; public: using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr; @@ -38,7 +39,7 @@ class StorageObjectStorageSink : public SinkToStorage void cancelBuffers(); }; -class PartitionedStorageObjectStorageSink : public PartitionedSink +class PartitionedStorageObjectStorageSink : public PartitionedSink::SinkCreator { public: using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr; @@ -46,6 +47,7 @@ class PartitionedStorageObjectStorageSink : public PartitionedSink PartitionedStorageObjectStorageSink( ObjectStoragePtr object_storage_, ConfigurationPtr configuration_, + const std::shared_ptr & file_path_generator_, std::optional format_settings_, const Block & sample_block_, ContextPtr context_); @@ -55,6 +57,7 @@ class PartitionedStorageObjectStorageSink : public PartitionedSink private: ObjectStoragePtr object_storage; ConfigurationPtr configuration; + std::shared_ptr file_path_generator; const StorageObjectStorage::QuerySettings query_settings; const std::optional format_settings; diff --git a/src/Storages/PartitionCommands.cpp b/src/Storages/PartitionCommands.cpp index 41cc03552458..6fb3c2bfd6d3 100644 --- a/src/Storages/PartitionCommands.cpp +++ b/src/Storages/PartitionCommands.cpp @@ -52,6 +52,17 @@ std::optional 
PartitionCommand::parse(const ASTAlterCommand * res.part = command_ast->part; return res; } + if (command_ast->type == ASTAlterCommand::EXPORT_PARTITION) + { + PartitionCommand res; + res.type = EXPORT_PARTITION; + res.partition = command_ast->partition->clone(); + res.part = command_ast->part; + res.move_destination_type = PartitionCommand::MoveDestinationType::TABLE; + res.to_database = command_ast->to_database; + res.to_table = command_ast->to_table; + return res; + } if (command_ast->type == ASTAlterCommand::MOVE_PARTITION) { PartitionCommand res; diff --git a/src/Storages/PartitionCommands.h b/src/Storages/PartitionCommands.h index 917e510f24b4..6a844567d073 100644 --- a/src/Storages/PartitionCommands.h +++ b/src/Storages/PartitionCommands.h @@ -33,6 +33,7 @@ struct PartitionCommand UNFREEZE_ALL_PARTITIONS, UNFREEZE_PARTITION, REPLACE_PARTITION, + EXPORT_PARTITION, }; Type type = UNKNOWN; diff --git a/src/Storages/PartitionedSink.cpp b/src/Storages/PartitionedSink.cpp index ec43e4b4ca1f..1a852975e424 100644 --- a/src/Storages/PartitionedSink.cpp +++ b/src/Storages/PartitionedSink.cpp @@ -6,10 +6,9 @@ #include #include - #include - #include +#include namespace DB @@ -22,12 +21,14 @@ namespace ErrorCodes PartitionedSink::PartitionedSink( std::shared_ptr partition_strategy_, + std::shared_ptr sink_creator_, ContextPtr context_, - const Block & sample_block_) - : SinkToStorage(sample_block_) + const Block & source_header_) + : SinkToStorage(source_header_) , partition_strategy(partition_strategy_) + , sink_creator(sink_creator_) , context(context_) - , sample_block(sample_block_) + , source_header(source_header_) { } @@ -37,7 +38,7 @@ SinkPtr PartitionedSink::getSinkForPartitionKey(StringRef partition_key) auto it = partition_id_to_sink.find(partition_key); if (it == partition_id_to_sink.end()) { - auto sink = createSinkForPartition(partition_key.toString()); + auto sink = sink_creator->createSinkForPartition(partition_key.toString()); std::tie(it, std::ignore) = partition_id_to_sink.emplace(partition_key, sink); } @@ -49,7 +50,8 @@ void PartitionedSink::consume(Chunk & source_chunk) const ColumnPtr partition_by_result_column = partition_strategy->computePartitionKey(source_chunk); /// Not all columns are serialized using the format writer (e.g, hive partitioning stores partition columns in the file path) - const auto columns_to_consume = partition_strategy->getFormatChunkColumns(source_chunk); + auto format_chunk = partition_strategy->getFormatChunk(source_chunk); + const auto & columns_to_consume = format_chunk.getColumns(); if (columns_to_consume.empty()) { @@ -146,8 +148,11 @@ String PartitionedSink::replaceWildcards(const String & haystack, const String & PartitionedSink::~PartitionedSink() { if (isCancelled()) - for (auto & item : partition_id_to_sink) + { + for (auto &item: partition_id_to_sink) item.second->cancel(); + } + } } diff --git a/src/Storages/PartitionedSink.h b/src/Storages/PartitionedSink.h index 481230792db0..9ee120ad101d 100644 --- a/src/Storages/PartitionedSink.h +++ b/src/Storages/PartitionedSink.h @@ -17,12 +17,26 @@ namespace DB class PartitionedSink : public SinkToStorage { public: + struct SinkCreator + { + virtual ~SinkCreator() = default; + virtual SinkPtr createSinkForPartition(const String & partition_id) = 0; + }; + static constexpr auto PARTITION_ID_WILDCARD = "{_partition_id}"; PartitionedSink( std::shared_ptr partition_strategy_, + std::shared_ptr sink_creator_, ContextPtr context_, - const Block & sample_block_); + const Block & source_header_ + ); + + 
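/// Editorial note (not part of the original patch): PartitionedSink is no longer subclassed per storage. Each storage now supplies a SinkCreator implementation (PartitionedStorageObjectStorageSink, PartitionedStorageFileSink, PartitionedStorageURLSink in this diff), and PartitionedSink simply asks that creator for one sink per partition id computed by the partition strategy.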
struct ChunkSplitStatistics + { + uint64_t time_spent_on_partition_calculation = 0; + uint64_t time_spent_on_chunk_split = 0; + }; ~PartitionedSink() override; @@ -34,18 +48,15 @@ class PartitionedSink : public SinkToStorage void onFinish() override; - virtual SinkPtr createSinkForPartition(const String & partition_id) = 0; - static void validatePartitionKey(const String & str, bool allow_slash); static String replaceWildcards(const String & haystack, const String & partition_id); -protected: - std::shared_ptr partition_strategy; - private: + std::shared_ptr partition_strategy; + std::shared_ptr sink_creator; ContextPtr context; - Block sample_block; + Block source_header; absl::flat_hash_map partition_id_to_sink; HashMapWithSavedHash partition_id_to_chunk_index; diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 5a3637ec33c2..f2ca6372937e 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -74,6 +74,7 @@ #include #include +#include namespace ProfileEvents { @@ -1942,7 +1943,7 @@ class StorageFileSink final : public SinkToStorage, WithContext std::unique_lock lock; }; -class PartitionedStorageFileSink : public PartitionedSink +class PartitionedStorageFileSink : public PartitionedSink::SinkCreator { public: PartitionedStorageFileSink( @@ -1957,7 +1958,7 @@ class PartitionedStorageFileSink : public PartitionedSink const String format_name_, ContextPtr context_, int flags_) - : PartitionedSink(partition_strategy_, context_, metadata_snapshot_->getSampleBlock()) + : partition_strategy(partition_strategy_) , path(path_) , metadata_snapshot(metadata_snapshot_) , table_name_for_log(table_name_for_log_) @@ -1973,11 +1974,13 @@ class PartitionedStorageFileSink : public PartitionedSink SinkPtr createSinkForPartition(const String & partition_id) override { - std::string filepath = partition_strategy->getPathForWrite(path, partition_id); + const auto file_path_generator = std::make_shared(path); + std::string filepath = file_path_generator->getWritingPath(partition_id); fs::create_directories(fs::path(filepath).parent_path()); - validatePartitionKey(filepath, true); + PartitionedSink::validatePartitionKey(filepath, true); + checkCreationIsAllowed(context, context->getUserFilesPath(), filepath, /*can_be_directory=*/ true); return std::make_shared( metadata_snapshot, @@ -1994,6 +1997,7 @@ class PartitionedStorageFileSink : public PartitionedSink } private: + std::shared_ptr partition_strategy; const String path; StorageMetadataPtr metadata_snapshot; String table_name_for_log; @@ -2045,7 +2049,7 @@ SinkToStoragePtr StorageFile::write( has_wildcards, /* partition_columns_in_data_file */true); - return std::make_shared( + auto sink_creator = std::make_shared( partition_strategy, metadata_snapshot, getStorageID().getNameForLogs(), @@ -2057,6 +2061,13 @@ SinkToStoragePtr StorageFile::write( format_name, context, flags); + + return std::make_shared( + partition_strategy, + sink_creator, + context, + metadata_snapshot->getSampleBlock() + ); } String path; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 716733224dbf..c17140155a30 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -45,8 +46,17 @@ #include #include #include -#include "Core/BackgroundSchedulePool.h" -#include "Core/Names.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include 
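/// Editorial summary of the export flow added below (not part of the original patch):
///   - exportPartitionToTable() validates the destination (same physical columns and partition key, supportsImportMergeTreePartition()), blocks merges for the partition, writes a MergeTreeExportManifest ("export_*" file on disk) listing the parts, and triggers the background moves assignee.
///   - scheduleDataMovingJob() picks pending manifest items and schedules one export task per part.
///   - loadExportPartition()/readExportPartitionManifests() reload unfinished manifests at startup, so in-flight exports can resume after a restart.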
namespace DB { @@ -72,6 +82,12 @@ namespace Setting extern const SettingsBool parallel_replicas_for_non_replicated_merge_tree; extern const SettingsBool throw_on_unsupported_query_inside_transaction; extern const SettingsUInt64 max_parts_to_move; + extern const SettingsBool allow_experimental_export_merge_tree_partition; +} + +namespace ServerSetting +{ + extern const ServerSettingsUInt64 export_merge_tree_partition_max_retries; } namespace MergeTreeSetting @@ -109,6 +125,7 @@ namespace ErrorCodes extern const int TABLE_IS_READ_ONLY; extern const int TOO_MANY_PARTS; extern const int PART_IS_LOCKED; + extern const int INCOMPATIBLE_COLUMNS; } namespace ActionLocks @@ -171,6 +188,7 @@ StorageMergeTree::StorageMergeTree( increment.set(getMaxBlockNumber()); loadMutations(); + loadExportPartition(); loadDeduplicationLog(); prewarmCaches( @@ -484,6 +502,86 @@ void StorageMergeTree::alter( } } +/* + * For now, this function is meant to be used when exporting to different formats (i.e, the case where data needs to be re-encoded / serialized) + * For the cases where this is not necessary, there are way more optimal ways of doing that, such as hard links implemented by `movePartitionToTable` + * */ +void StorageMergeTree::exportPartitionToTable(const PartitionCommand & command, ContextPtr query_context) +{ + if (!query_context->getSettingsRef()[Setting::allow_experimental_export_merge_tree_partition]) + { + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, + "Exporting merge tree partition is experimental. Set `allow_experimental_export_merge_tree_partition` to enable it"); + } + + String dest_database = query_context->resolveDatabase(command.to_database); + auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context); + + if (dest_storage->getStorageID() == this->getStorageID()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Exporting to the same table is not allowed"); + } + + if (!dest_storage->supportsImportMergeTreePartition()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Destination storage {} does not support importing merge tree partitions", dest_storage->getName()); + + auto query_to_string = [] (const ASTPtr & ast) + { + return ast ? ast->formatWithSecretsOneLine() : ""; + }; + + auto src_snapshot = getInMemoryMetadataPtr(); + auto destination_snapshot = dest_storage->getInMemoryMetadataPtr(); + + if (destination_snapshot->getColumns().getAllPhysical().sizeOfDifference(src_snapshot->getColumns().getAllPhysical())) + throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure"); + + if (query_to_string(src_snapshot->getPartitionKeyAST()) != query_to_string(destination_snapshot->getPartitionKeyAST())) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key"); + + String partition_id = getPartitionIDFromQuery(command.partition, getContext()); + + auto lock1 = lockForShare( + query_context->getCurrentQueryId(), + query_context->getSettingsRef()[Setting::lock_acquire_timeout]); + auto merges_blocker = stopMergesAndWaitForPartition(partition_id); + + /// todo is getVisible the right api? 
Shouldn't it be get parts for internal usage + auto all_parts = getVisibleDataPartsVectorInPartition(getContext(), partition_id); + + if (all_parts.empty()) + { + LOG_INFO(log, "No parts to export for partition {}, skipping", partition_id); + return; + } + + { + std::lock_guard lock_background_mutex(currently_processing_in_background_mutex); + + if (!already_exported_partition_ids.emplace(partition_id).second) + { + throw Exception(ErrorCodes::PART_IS_LOCKED, "Partition {} has already been exported", partition_id); + } + } + + const auto transaction_id = std::to_string(generateSnowflakeID()); + + const auto manifest = MergeTreeExportManifest::create( + getStoragePolicy()->getAnyDisk(), + relative_data_path, + transaction_id, + partition_id, + dest_storage->getStorageID(), + all_parts); + + { + std::lock_guard lock(export_partition_transaction_id_to_manifest_mutex); + + export_partition_transaction_id_to_manifest[transaction_id] = manifest; + } + + background_moves_assignee.trigger(); +} /// While exists, marks parts as 'currently_merging_mutating_parts' and reserves free space on filesystem. CurrentlyMergingPartsTagger::CurrentlyMergingPartsTagger( @@ -839,6 +937,33 @@ std::map StorageMergeTree::getUnfinishedMutationC return result; } + +std::vector StorageMergeTree::getExportsStatus() const +{ + std::lock_guard lock(export_partition_transaction_id_to_manifest_mutex); + std::vector result; + + auto source_database = getStorageID().database_name; + auto source_table = getStorageID().table_name; + + for (const auto & [transaction_id, manifest] : export_partition_transaction_id_to_manifest) + { + MergeTreeExportStatus status; + + status.transaction_id = transaction_id; + status.source_database = source_database; + status.source_table = source_table; + status.destination_database = manifest->destination_storage_id.database_name; + status.destination_table = manifest->destination_storage_id.table_name; + status.create_time = manifest->create_time; + status.parts_to_do_names = manifest->pendingParts(); + status.status = manifest->status; + + result.emplace_back(std::move(status)); + } + return result; +} + std::vector StorageMergeTree::getMutationsStatus() const { std::lock_guard lock(currently_processing_in_background_mutex); @@ -1002,6 +1127,125 @@ void StorageMergeTree::loadMutations() increment.value = std::max(increment.value.load(), current_mutations_by_version.rbegin()->first); } +void StorageMergeTree::readExportPartitionManifests() +{ + static const auto states = {MergeTreeDataPartState::Active, MergeTreeDataPartState::Deleting, MergeTreeDataPartState::Outdated, MergeTreeDataPartState::DeleteOnDestroy}; + for (const auto & disk : getDisks()) + { + for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next()) + { + const auto & name = it->name(); + if (startsWith(name, "export_")) + { + try + { + auto manifest = MergeTreeExportManifest::read(disk, fs::path(relative_data_path) / name); + + if (manifest->status != MergeTreeExportManifest::Status::failed) + { + already_exported_partition_ids.insert(manifest->partition_id); + + if (manifest->status == MergeTreeExportManifest::Status::completed) + { + LOG_INFO( + log, + "Export transaction {} of partition {} to destination storage {} already completed, skipping", + manifest->transaction_id, + manifest->partition_id, + manifest->destination_storage_id.getNameForLogs()); + continue; + } + } + + for (auto & item : manifest->items) + { + /// if this part has not been pushed yet + if (item.remote_path.empty()) + { + 
item.part = getPartIfExists(item.part_name, states); + + if (!item.part) + { + LOG_ERROR(log, "Part {} is present in the manifest file {}, but not found in the storage {}", + item.part_name, + manifest->transaction_id, + getStorageID().getNameForLogs()); + + manifest->status = MergeTreeExportManifest::Status::failed; + manifest->write(); + already_exported_partition_ids.erase(manifest->partition_id); + continue; + } + } + } + + export_partition_transaction_id_to_manifest.emplace(manifest->transaction_id, manifest); + + LOG_DEBUG(log, "Loaded export transaction manifest: {} (transaction_id: {})", name, manifest->transaction_id); + } + catch (const std::exception & ex) + { + LOG_ERROR(log, "Failed to load export transaction manifest {}: {}", name, ex.what()); + } + } + } + } + + background_moves_assignee.trigger(); +} + +void StorageMergeTree::resumeExportPartitionTasks() +{ + /// Initially I opted for having two separate methods: read and resume because I wanted to schedule the tasks in order + /// but it turns out the background executor schedules tasks based on their priority, so it is likely this is not needed anymore. + // for (const auto & [transaction_id, manifest] : export_partition_transaction_id_to_manifest) + // { + // if (manifest->status != MergeTreeExportManifest::Status::pending) + // continue; + + // auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest->destination_storage_id, getContext()); + // if (!destination_storage) + // { + // LOG_ERROR(log, "Failed to reconstruct destination storage: {}", manifest->destination_storage_id.getNameForLogs()); + // continue; + // } + + // auto pending_part_names = manifest->pendingParts(); + + // /// apparently, it is possible that pending parts are empty + // /// if it is empty, I have to somehow commit and mark as completed.. 
+ + // std::vector parts_to_export; + + // for (const auto & part_name : pending_part_names) + // { + // auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active}); + + // if (!part) + // { + // LOG_ERROR(log, "Part {} is present in the manifest file {}, but not found in the storage {}", + // part_name, + // manifest->transaction_id, + // getStorageID().getNameForLogs()); + // manifest->status = MergeTreeExportManifest::Status::failed; + // manifest->write(); + + // already_exported_partition_ids.erase(manifest->partition_id); + // continue; + // } + + // parts_to_export.emplace_back(part); + // } + // } +} + +void StorageMergeTree::loadExportPartition() +{ + readExportPartitionManifests(); + resumeExportPartitionTasks(); +} + + std::expected StorageMergeTree::selectPartsToMerge( const StorageMetadataPtr & metadata_snapshot, bool aggressive, @@ -1405,6 +1649,66 @@ UInt32 StorageMergeTree::getMaxLevelInBetween(const PartProperties & left, const return level; } + +bool StorageMergeTree::scheduleDataMovingJob(BackgroundJobsAssignee & assignee) +{ + if (MergeTreeData::scheduleDataMovingJob(assignee)) + { + return true; + } + + /// Try to schedule one export part task if any pending export exists + { + std::lock_guard lock(export_partition_transaction_id_to_manifest_mutex); + for (const auto & [transaction_id, manifest] : export_partition_transaction_id_to_manifest) + { + if (manifest->status != MergeTreeExportManifest::Status::pending) + continue; + + auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest->destination_storage_id, getContext()); + if (!destination_storage) + { + LOG_ERROR(log, "Failed to reconstruct destination storage: {}", manifest->destination_storage_id.getNameForLogs()); + continue; + } + + static const auto states = {MergeTreeDataPartState::Active, MergeTreeDataPartState::Deleting, MergeTreeDataPartState::Outdated, MergeTreeDataPartState::DeleteOnDestroy}; + for (auto & item : manifest->items) + { + if (item.in_progress) + continue; + + auto part = getPartIfExists(item.part_name, states); + if (!part) + { + LOG_ERROR(log, "Part {} is present in the manifest file {}, but not found in the storage {}", + item.part_name, + manifest->transaction_id, + getStorageID().getNameForLogs()); + manifest->status = MergeTreeExportManifest::Status::failed; + manifest->write(); + already_exported_partition_ids.erase(manifest->partition_id); + continue; + } + + auto task = std::make_shared( + *this, + part, + destination_storage, + getContext(), + manifest, + moves_assignee_trigger, + getContext()->getServerSettings()[ServerSetting::export_merge_tree_partition_max_retries]); + item.in_progress = background_moves_assignee.scheduleMoveTask(task); + + /// todo arthur is returning true always correct? + return true; + } + } + } + return false; +} + bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) { if (shutdown_called) @@ -2497,6 +2801,7 @@ ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type) return {}; } +/// TODO arthur do I need to do something about this? 
void StorageMergeTree::onActionLockRemove(StorageActionBlockType action_type) { if (action_type == ActionLocks::PartsMerge || action_type == ActionLocks::PartsTTLMerge) @@ -2691,8 +2996,8 @@ MutationCounters StorageMergeTree::getMutationCounters() const void StorageMergeTree::startBackgroundMovesIfNeeded() { - if (areBackgroundMovesNeeded()) - background_moves_assignee.start(); + /// always starting it regardless of areBackgroundMovesNeeded() because we need it for exports + background_moves_assignee.start(); } std::unique_ptr StorageMergeTree::getDefaultSettings() const diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 36ac29f2918b..ffc86b5ec851 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -17,6 +17,7 @@ #include #include +#include namespace DB @@ -95,6 +96,8 @@ class StorageMergeTree final : public MergeTreeData /// Return introspection information about currently processing or recently processed mutations. std::vector getMutationsStatus() const override; + std::vector getExportsStatus() const override; + CancellationCode killMutation(const String & mutation_id) override; /// Makes backup entries to backup the data of the storage. @@ -116,6 +119,8 @@ class StorageMergeTree final : public MergeTreeData bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override; + bool scheduleDataMovingJob(BackgroundJobsAssignee & assignee) override; + std::map getUnfinishedMutationCommands() const override; MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); } @@ -151,7 +156,15 @@ class StorageMergeTree final : public MergeTreeData /// This set have to be used with `currently_processing_in_background_mutex`. DataParts currently_merging_mutating_parts; + /// Should be used with `currently_processing_in_background_mutex`. + /// Holds partition ids that have already been exported. + /// A partition can be exported only once. + std::unordered_set already_exported_partition_ids; + std::map current_mutations_by_version; + + std::map> export_partition_transaction_id_to_manifest; + mutable std::mutex export_partition_transaction_id_to_manifest_mutex; /// mutable because of getExportsStatus /// Unfinished mutations that are required for AlterConversions. MutationCounters mutation_counters; @@ -167,6 +180,14 @@ class StorageMergeTree final : public MergeTreeData std::map mutation_prepared_sets_cache; void loadMutations(); + /// Load persisted export partition locks and re-apply partition-level merge blockers. + void loadExportPartition(); + + /// Read export partition manifests from disk and populate internal data structures + void readExportPartitionManifests(); + + /// Restart export process for parts that were being exported before restart + void resumeExportPartitionTasks(); /// Load and initialize deduplication logs. Even if deduplication setting /// equals zero creates object with deduplication window equals zero. 
@@ -205,6 +226,7 @@ class StorageMergeTree final : public MergeTreeData void setMutationCSN(const String & mutation_id, CSN csn) override; friend struct CurrentlyMergingPartsTagger; + friend struct CurrentlyExportingPartsTagger; friend class MergeTreeMergePredicate; std::expected selectPartsToMerge( @@ -250,6 +272,7 @@ class StorageMergeTree final : public MergeTreeData void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr context) override; void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr context) override; + void exportPartitionToTable(const PartitionCommand & command, ContextPtr query_context) override; bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; /// Update mutation entries after part mutation execution. May reset old /// errors if mutation was successful. Otherwise update last_failed* fields @@ -288,6 +311,8 @@ class StorageMergeTree final : public MergeTreeData friend class MergeTreeData; friend class MergePlainMergeTreeTask; friend class MutatePlainMergeTreeTask; + friend class ExportPartitionPlainMergeTreeTask; + friend class ExportPartPlainMergeTreeTask; struct DataValidationTasks : public IStorage::DataValidationTasksBase { @@ -381,19 +406,19 @@ class StorageMergeTree final : public MergeTreeData void resetMutationFailures() { - std::unique_lock _lock(parts_info_lock); + std::unique_lock parts_lock(parts_info_lock); failed_mutation_parts.clear(); } void removePartFromFailed(const String & part_name) { - std::unique_lock _lock(parts_info_lock); + std::unique_lock parts_lock(parts_info_lock); failed_mutation_parts.erase(part_name); } void addPartMutationFailure (const String& part_name, size_t max_postpone_time_ms_) { - std::unique_lock _lock(parts_info_lock); + std::unique_lock parts_lock(parts_info_lock); auto part_info_it = failed_mutation_parts.find(part_name); if (part_info_it == failed_mutation_parts.end()) { @@ -406,8 +431,7 @@ class StorageMergeTree final : public MergeTreeData bool partCanBeMutated(const String& part_name) { - - std::unique_lock _lock(parts_info_lock); + std::unique_lock parts_lock(parts_info_lock); auto iter = failed_mutation_parts.find(part_name); if (iter == failed_mutation_parts.end()) return true; diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 354fd8c3b2b5..575f1cebb2b4 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -56,6 +56,11 @@ #include #include +#include "PartitionedSink.h" +#include "Formats/EscapingRuleUtils.h" +#include "Interpreters/convertFieldToType.h" +#include "ObjectStorage/FilePathGenerator.h" + namespace ProfileEvents { extern const Event EngineFileLikeReadFiles; @@ -729,7 +734,7 @@ void StorageURLSink::cancelBuffers() write_buf->cancel(); } -class PartitionedStorageURLSink : public PartitionedSink +class PartitionedStorageURLSink : public PartitionedSink::SinkCreator { public: PartitionedStorageURLSink( @@ -743,7 +748,7 @@ class PartitionedStorageURLSink : public PartitionedSink const CompressionMethod compression_method_, const HTTPHeaderEntries & headers_, const String & http_method_) - : PartitionedSink(partition_strategy_, context_, sample_block_) + : partition_strategy(partition_strategy_) , uri(uri_) , format(format_) , format_settings(format_settings_) @@ -758,7 +763,8 @@ class PartitionedStorageURLSink : public PartitionedSink SinkPtr createSinkForPartition(const String & partition_id) override { - std::string partition_path = 
partition_strategy->getPathForWrite(uri, partition_id); + const auto file_path_generator = std::make_shared(uri); + std::string partition_path = file_path_generator->getWritingPath(partition_id); context->getRemoteHostFilter().checkURL(Poco::URI(partition_path)); return std::make_shared( @@ -766,6 +772,7 @@ class PartitionedStorageURLSink : public PartitionedSink } private: + std::shared_ptr partition_strategy; const String uri; const String format; const std::optional format_settings; @@ -1408,7 +1415,7 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad has_wildcards, /* partition_columns_in_data_file */true); - return std::make_shared( + auto sink_creator = std::make_shared( partition_strategy, uri, format_name, @@ -1419,6 +1426,8 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad compression_method, headers, http_method); + + return std::make_shared(partition_strategy, sink_creator, context, metadata_snapshot->getSampleBlock()); } return std::make_shared( diff --git a/src/Storages/System/StorageSystemExports.cpp b/src/Storages/System/StorageSystemExports.cpp new file mode 100644 index 000000000000..eb6fe90b602e --- /dev/null +++ b/src/Storages/System/StorageSystemExports.cpp @@ -0,0 +1,134 @@ +#include +#include +#include +#include +#include "Columns/ColumnString.h" +#include "DataTypes/DataTypeString.h" +#include "DataTypes/DataTypesNumber.h" +#include "Storages/MergeTree/MergeTreeExportStatus.h" +#include "Storages/VirtualColumnUtils.h" +#include +#include +#include + + +namespace DB +{ + +ColumnsDescription StorageSystemExports::getColumnsDescription() +{ + return ColumnsDescription + { + {"source_database", std::make_shared(), "Name of the source database."}, + {"source_table", std::make_shared(), "Name of the source table."}, + {"destination_database", std::make_shared(), "Name of the destination database."}, + {"destination_table", std::make_shared(), "Name of the destination table."}, + {"transaction_id", std::make_shared(), "The ID of the export transaction."}, + {"create_time", std::make_shared(), "Date and time when the export command was submitted for execution."}, + {"parts_to_do_names", std::make_shared(std::make_shared()), "An array of names of data parts that need to be exported for the transaction to complete."}, + {"parts_to_do", std::make_shared(), "The number of data parts that need to be exported for the transaction to complete."}, + {"status", std::make_shared(), "The status of the export transaction."}, + // {"latest_failed_part_name", std::make_shared(), "The name of the most recent part that could not be exported."}, + // {"latest_fail_time", std::make_shared(), "The date and time of the most recent part export failure."}, + // {"latest_fail_reason", std::make_shared(), "The exception message that caused the most recent part export failure."}, + }; +} + +void StorageSystemExports::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector) const +{ + const auto access = context->getAccess(); + const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES); + + /// Collect a set of *MergeTree tables. 
+ std::map> merge_tree_tables; + for (const auto & db : DatabaseCatalog::instance().getDatabases()) + { + /// Check if database can contain MergeTree tables + if (!db.second->canContainMergeTreeTables()) + continue; + + const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first); + + for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) + { + const auto & table = iterator->table(); + if (!table) + continue; + + if (!dynamic_cast(table.get())) + continue; + + if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name())) + continue; + + merge_tree_tables[db.first][iterator->name()] = table; + } + } + + MutableColumnPtr col_source_database_export = ColumnString::create(); + MutableColumnPtr col_source_table_export = ColumnString::create(); + + for (auto & db : merge_tree_tables) + { + for (auto & table : db.second) + { + col_source_database_export->insert(db.first); + col_source_table_export->insert(table.first); + } + } + + ColumnPtr col_source_database = std::move(col_source_database_export); + ColumnPtr col_source_table = std::move(col_source_table_export); + + /// Determine what tables are needed by the conditions in the query. + { + Block filtered_block + { + { col_source_database, std::make_shared(), "source_database" }, + { col_source_table, std::make_shared(), "source_table" }, + }; + + VirtualColumnUtils::filterBlockWithPredicate(predicate, filtered_block, context); + + if (!filtered_block.rows()) + return; + + col_source_database = filtered_block.getByName("source_database").column; + col_source_table = filtered_block.getByName("source_table").column; + } + + for (size_t i_storage = 0; i_storage < col_source_database->size(); ++i_storage) + { + auto database = (*col_source_database)[i_storage].safeGet(); + auto table = (*col_source_table)[i_storage].safeGet(); + + std::vector statuses; + { + const IStorage * storage = merge_tree_tables[database][table].get(); + if (const auto * merge_tree = dynamic_cast(storage)) + statuses = merge_tree->getExportsStatus(); + } + + for (const MergeTreeExportStatus & status : statuses) + { + /// fill based on getColumnsDescription + Array parts_to_do_names; + parts_to_do_names.reserve(status.parts_to_do_names.size()); + for (const String & part_name : status.parts_to_do_names) + parts_to_do_names.emplace_back(part_name); + + size_t col_num = 0; + res_columns[col_num++]->insert(status.source_database); + res_columns[col_num++]->insert(status.source_table); + res_columns[col_num++]->insert(status.destination_database); + res_columns[col_num++]->insert(status.destination_table); + res_columns[col_num++]->insert(status.transaction_id); + res_columns[col_num++]->insert(status.create_time); + res_columns[col_num++]->insert(parts_to_do_names); + res_columns[col_num++]->insert(status.parts_to_do_names.size()); + res_columns[col_num++]->insert(String(magic_enum::enum_name(status.status))); + } + } +} + +} diff --git a/src/Storages/System/StorageSystemExports.h b/src/Storages/System/StorageSystemExports.h new file mode 100644 index 000000000000..e13fbfa26aaa --- /dev/null +++ b/src/Storages/System/StorageSystemExports.h @@ -0,0 +1,25 @@ +#pragma once + +#include + + +namespace DB +{ + +class Context; + + +class StorageSystemExports final : public IStorageSystemOneBlock +{ +public: + std::string getName() const override { return "SystemExports"; } + + static ColumnsDescription getColumnsDescription(); + +protected: 
+ using IStorageSystemOneBlock::IStorageSystemOneBlock; + + void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector) const override; +}; + +} diff --git a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp index 9249fb3530b6..4f2977d1d07b 100644 --- a/src/Storages/System/attachSystemTables.cpp +++ b/src/Storages/System/attachSystemTables.cpp @@ -106,6 +106,8 @@ #include +#include "StorageSystemExports.h" + #if defined(__ELF__) && !defined(OS_FREEBSD) #include #endif @@ -206,6 +208,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b attach(context, system_database, "histogram_metrics", "Contains histogram metrics which can be calculated instantly and exported in the Prometheus format. For example, the keeper response time. This table is always up to date."); attach(context, system_database, "merges", "Contains a list of merges currently executing merges of MergeTree tables and their progress. Each merge operation is represented by a single row."); attach(context, system_database, "moves", "Contains information about in-progress data part moves of MergeTree tables. Each data part movement is represented by a single row."); + attach(context, system_database, "exports", "Contains information about in-progress data part exports of MergeTree tables. Each data part export is represented by a single row."); attach(context, system_database, "mutations", "Contains a list of mutations and their progress. Each mutation command is represented by a single row."); attachNoDescription(context, system_database, "replicas", "Contains information and status of all table replicas on current server. Each replica is represented by a single row."); attach(context, system_database, "replication_queue", "Contains information about tasks from replication queues stored in ClickHouse Keeper, or ZooKeeper, for each table replica."); diff --git a/src/TableFunctions/ITableFunction.h b/src/TableFunctions/ITableFunction.h index fe971bdbed7c..44039b09ea8f 100644 --- a/src/TableFunctions/ITableFunction.h +++ b/src/TableFunctions/ITableFunction.h @@ -15,6 +15,7 @@ namespace DB { class Context; +class ASTInsertQuery; /** Interface for table functions.
* diff --git a/tests/integration/test_export_partition_disaster_recovery/__init__.py b/tests/integration/test_export_partition_disaster_recovery/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/test_export_partition_disaster_recovery/configs/named_collections.xml b/tests/integration/test_export_partition_disaster_recovery/configs/named_collections.xml new file mode 100644 index 000000000000..573822539c50 --- /dev/null +++ b/tests/integration/test_export_partition_disaster_recovery/configs/named_collections.xml @@ -0,0 +1,9 @@ + + + + http://minio1:9001/root/data + minio + ClickHouse_Minio_P@ssw0rd + + + diff --git a/tests/integration/test_export_partition_disaster_recovery/test.py b/tests/integration/test_export_partition_disaster_recovery/test.py new file mode 100644 index 000000000000..c4699410657a --- /dev/null +++ b/tests/integration/test_export_partition_disaster_recovery/test.py @@ -0,0 +1,104 @@ +import logging +import pytest +import random +import string +import time +from typing import Optional + +from helpers.cluster import ClickHouseCluster +from helpers.network import PartitionManager + +@pytest.fixture(scope="module") +def cluster(): + try: + cluster = ClickHouseCluster(__file__) + cluster.add_instance( + "node", + main_configs=["configs/named_collections.xml"], + user_configs=[], + with_minio=True, + stay_alive=True, + ) + cluster.add_instance( + "node2", + main_configs=["configs/named_collections.xml"], + user_configs=[], + with_minio=True, + ) + logging.info("Starting cluster...") + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def test_export_partition_with_network_delays(cluster): + """Test server kill during export with network delays.""" + node = cluster.instances["node"] + table_name = "disaster_test_network" + s3_table = "destination_s3_network" + + node.query(f""" + CREATE TABLE {table_name} ( + id UInt64, + year UInt16, + data String + ) ENGINE = MergeTree() + PARTITION BY year + ORDER BY id + """) + + node.query(f"INSERT INTO {table_name} VALUES (1, 2020, 'a'), (2, 2020, 'b'), (3, 2021, 'c')") + + node.query(f""" + CREATE TABLE {s3_table} ( + id UInt64, + year UInt16, + data String, + ) ENGINE = S3(s3_conn, filename='disaster-recovery-network', format=Parquet, partition_strategy='hive') + PARTITION BY year + """) + + node2 = cluster.instances["node2"] + node2.query(f""" + CREATE TABLE {s3_table} ( + id UInt64, + year UInt16, + data String, + ) ENGINE = S3(s3_conn, filename='disaster-recovery-network', format=Parquet, partition_strategy='hive') + PARTITION BY year + """) + + with PartitionManager() as pm: + pm.add_network_delay(node, delay_ms=1000) # 5 second delays + + export_queries = f""" + ALTER TABLE {table_name} + EXPORT PARTITION 2020 TO TABLE {s3_table} + SETTINGS allow_experimental_export_merge_tree_partition=1; + ALTER TABLE {table_name} + EXPORT PARTITION 2021 TO TABLE {s3_table} + SETTINGS allow_experimental_export_merge_tree_partition=1; + """ + + node.query(export_queries) + + # Kill server + logging.info("Killing server during network-delayed export") + node.stop_clickhouse(kill=True) + + # check s3 to make sure no data was written + assert node2.query(f"SELECT count() FROM {s3_table} where year = 2020") == '0\n', "Partition 2020 was written to S3 during network delay crash" + + assert node2.query(f"SELECT count() FROM {s3_table} where year = 2021") == '0\n', "Partition 2021 was written to S3 during network delay crash" + + node.start_clickhouse() + + # wait for the export to 
resume and complete + time.sleep(5) + + # verify that the export has been resumed and completed + assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") != '0\n', "Export of partition 2020 did not resume after crash" + + assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2021") != '0\n', "Export of partition 2021 did not resume after crash" + diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference index 268959bb444c..56f2e76ed3a1 100644 --- a/tests/queries/0_stateless/01271_show_privileges.reference +++ b/tests/queries/0_stateless/01271_show_privileges.reference @@ -43,6 +43,7 @@ ALTER TTL ['ALTER MODIFY TTL','MODIFY TTL'] TABLE ALTER TABLE ALTER MATERIALIZE TTL ['MATERIALIZE TTL'] TABLE ALTER TABLE ALTER SETTINGS ['ALTER SETTING','ALTER MODIFY SETTING','MODIFY SETTING','RESET SETTING'] TABLE ALTER TABLE ALTER MOVE PARTITION ['ALTER MOVE PART','MOVE PARTITION','MOVE PART'] TABLE ALTER TABLE +ALTER EXPORT PARTITION ['ALTER EXPORT PART','EXPORT PARTITION','EXPORT PART'] TABLE ALTER TABLE ALTER FETCH PARTITION ['ALTER FETCH PART','FETCH PARTITION'] TABLE ALTER TABLE ALTER FREEZE PARTITION ['FREEZE PARTITION','UNFREEZE'] TABLE ALTER TABLE ALTER UNLOCK SNAPSHOT ['UNLOCK SNAPSHOT'] TABLE ALTER TABLE diff --git a/tests/queries/0_stateless/03572_export_mt_part_to_object_storage.reference b/tests/queries/0_stateless/03572_export_mt_part_to_object_storage.reference new file mode 100644 index 000000000000..760fd247967a --- /dev/null +++ b/tests/queries/0_stateless/03572_export_mt_part_to_object_storage.reference @@ -0,0 +1,33 @@ +---- Querying merge tree for comparison. It should include both partitions (2020 and 2021) +1 2020 +2 2020 +3 2020 +4 2021 +5 2020 +6 2020 +---- Make sure only the partition 2020 has been exported +test/s3_table_NAME/year=2020/SNOWFLAKE_ID.parquet 1 2020 +test/s3_table_NAME/year=2020/SNOWFLAKE_ID.parquet 2 2020 +test/s3_table_NAME/year=2020/SNOWFLAKE_ID.parquet 3 2020 +test/s3_table_NAME/year=2020/SNOWFLAKE_ID.parquet 5 2020 +test/s3_table_NAME/year=2020/SNOWFLAKE_ID.parquet 6 2020 +---- It should not be allowed to export the same partition twice +---- Check for commit file for partition 2020 +s3_table_NAME/year=2020//SNOWFLAKE_ID.parquet +s3_table_NAME/year=2020//SNOWFLAKE_ID.parquet +---- Finally, export the other partition (2021) +---- Assert both partitions are there +test/s3_table_NAME/year=2020/SNOWFLAKE_ID.parquet 1 2020 +test/s3_table_NAME/year=2020/SNOWFLAKE_ID.parquet 2 2020 +test/s3_table_NAME/year=2020/SNOWFLAKE_ID.parquet 3 2020 +test/s3_table_NAME/year=2021/SNOWFLAKE_ID.parquet 4 2021 +test/s3_table_NAME/year=2020/SNOWFLAKE_ID.parquet 5 2020 +test/s3_table_NAME/year=2020/SNOWFLAKE_ID.parquet 6 2020 +---- Round trip check: create a new MergeTree table as SELECT * from s3_table +---- Data in roundtrip MergeTree table (should match s3_table) +1 2020 +2 2020 +3 2020 +4 2021 +5 2020 +6 2020 diff --git a/tests/queries/0_stateless/03572_export_mt_part_to_object_storage.sh b/tests/queries/0_stateless/03572_export_mt_part_to_object_storage.sh new file mode 100755 index 000000000000..405faf2ddc98 --- /dev/null +++ b/tests/queries/0_stateless/03572_export_mt_part_to_object_storage.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CURDIR"/../shell_config.sh + +mt_table="mt_table_${RANDOM}" +s3_table="s3_table_${RANDOM}" +mt_table_roundtrip="mt_table_roundtrip_${RANDOM}" + +query() { + $CLICKHOUSE_CLIENT --query "$1" +} + +query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip" + +query "CREATE TABLE $mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()" +query "CREATE TABLE $s3_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table', format=Parquet, partition_strategy='hive') PARTITION BY year" + +query "SYSTEM STOP MERGES" + +query "INSERT INTO $mt_table VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)" +query "INSERT INTO $mt_table VALUES (5, 2020), (6, 2020)" + +query "ALTER TABLE $mt_table EXPORT PARTITION ID '2020' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_partition = 1" + +echo "---- Querying merge tree for comparison. It should include both partitions (2020 and 2021)" +query "SELECT * FROM $mt_table ORDER BY id" + +echo "---- Make sure only the partition 2020 has been exported" +query "SELECT DISTINCT ON (id) replaceRegexpAll(replaceRegexpAll(_path, '$s3_table', 's3_table_NAME'), '[^/]+\\.parquet', 'SNOWFLAKE_ID.parquet'), * FROM $s3_table ORDER BY id" + +echo "---- It should not be allowed to export the same partition twice" +query "ALTER TABLE $mt_table EXPORT PARTITION ID '2020' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_partition = 1 -- {serverError PART_IS_LOCKED}" + +echo "---- Check for commit file for partition 2020" +$CLICKHOUSE_CLIENT --query "SELECT replaceRegexpAll(replaceRegexpAll(remote_file_path, '$s3_table', 's3_table_NAME'), '[^/]+\\.parquet', 'SNOWFLAKE_ID.parquet') FROM s3(s3_conn, filename='$s3_table/commit_2020_*', format='LineAsString', structure='remote_file_path String')" + +echo "---- Finally, export the other partition (2021)" +query "ALTER TABLE $mt_table EXPORT PARTITION ID '2021' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_partition = 1" + +echo "---- Assert both partitions are there" +query "SELECT DISTINCT ON (id) replaceRegexpAll(replaceRegexpAll(_path, '$s3_table', 's3_table_NAME'), '[^/]+\\.parquet', 'SNOWFLAKE_ID.parquet'), * FROM $s3_table ORDER BY id" + +echo "---- Round trip check: create a new MergeTree table as SELECT * from s3_table" + +query "CREATE TABLE $mt_table_roundtrip ENGINE = MergeTree() PARTITION BY year ORDER BY tuple() AS SELECT * FROM $s3_table" + +echo "---- Data in roundtrip MergeTree table (should match s3_table)" +query "SELECT DISTINCT ON (id) * FROM $mt_table_roundtrip ORDER BY id" + +query "SYSTEM START MERGES" +query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip" diff --git a/tests/queries/0_stateless/03572_export_mt_part_to_object_storage_simple.reference b/tests/queries/0_stateless/03572_export_mt_part_to_object_storage_simple.reference new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/queries/0_stateless/03572_export_mt_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_mt_part_to_object_storage_simple.sql new file mode 100644 index 000000000000..7c84ff608c54 --- /dev/null +++ b/tests/queries/0_stateless/03572_export_mt_part_to_object_storage_simple.sql @@ -0,0 +1,21 @@ +-- Tags: no-parallel + +DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table; + +CREATE TABLE 03572_mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple(); + +INSERT INTO 03572_mt_table VALUES (1, 2020); + +-- Create a table with a 
different partition key and export a partition to it. It should throw +CREATE TABLE 03572_invalid_schema_table (id UInt64, x UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table', format='Parquet', partition_strategy='hive') PARTITION BY x; + +ALTER TABLE 03572_mt_table EXPORT PARTITION ID '2020' TO TABLE 03572_invalid_schema_table +SETTINGS allow_experimental_export_merge_tree_partition = 1; -- {serverError INCOMPATIBLE_COLUMNS} + +DROP TABLE 03572_invalid_schema_table; + +-- The only partition strategy that supports exports is hive. Wildcard should throw +CREATE TABLE 03572_invalid_schema_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table/{_partition_id}', format='Parquet', partition_strategy='wildcard') PARTITION BY (id, year); + +ALTER TABLE 03572_mt_table EXPORT PARTITION ID '2020' TO TABLE 03572_invalid_schema_table +SETTINGS allow_experimental_export_merge_tree_partition = 1; -- {serverError NOT_IMPLEMENTED}
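
-- Editorial addition (not in the original test): drop the tables at the end as well, mirroring the cleanup at the top, so this no-parallel test leaves nothing behind.
DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table;

-- For reference (kept as a comment so the empty .reference file stays valid), an in-flight export can be inspected via the system table added in this PR:
-- SELECT source_table, destination_table, parts_to_do, status FROM system.exports;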