Skip to content
Closed
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
9c0be2e
squash export mt part to obj storage
arthurpassos Jul 28, 2025
65397b8
fix build1
arthurpassos Jul 28, 2025
55a7ac1
fix build for sure
arthurpassos Jul 29, 2025
92f2f33
extension to lower
arthurpassos Jul 29, 2025
37ea31f
add tests and fix prefix
arthurpassos Jul 29, 2025
387cae4
fix test
arthurpassos Jul 29, 2025
43abc4c
reduce changes
arthurpassos Jul 30, 2025
c7003ad
reduce changes even further
arthurpassos Jul 30, 2025
bb156ab
some adjustments
arthurpassos Jul 30, 2025
bb742af
rmv unused files
arthurpassos Jul 30, 2025
4bac44a
rename a few things
arthurpassos Jul 30, 2025
b02789e
Merge branch 'antalya-25.6.5' into export_mt_part_to_object_storage
arthurpassos Aug 19, 2025
ea3a2a5
rewind the part names logic
arthurpassos Aug 19, 2025
180fda8
tmp
arthurpassos Aug 21, 2025
45bf82b
good for a demo
arthurpassos Aug 22, 2025
41020a1
do not drop parts, lock partition for further exports
arthurpassos Aug 23, 2025
61928e4
add partition_id to commit filename, remove unused code and refactor …
arthurpassos Aug 25, 2025
f8ad06f
simplify the code a bit
arthurpassos Aug 25, 2025
1859244
rename from commit id to transaction id
arthurpassos Aug 25, 2025
cdfa5ab
use snowflakeid as transaction id
arthurpassos Aug 25, 2025
9f9fcb2
add back the sync behavior
arthurpassos Aug 25, 2025
bfb72ae
minor changes
arthurpassos Aug 25, 2025
7dbb53f
add missing include for build
arthurpassos Aug 26, 2025
2506663
freakin ai code suggestions..
arthurpassos Aug 26, 2025
6bc7c09
add roundtrip check
arthurpassos Aug 26, 2025
8e08991
opsy
arthurpassos Aug 27, 2025
54bf678
remove export part, add some partition exp sanity checking, change ex…
arthurpassos Aug 27, 2025
71bc26f
add tests
arthurpassos Aug 27, 2025
b489f83
Refactor to use a background task instead of inline code
arthurpassos Sep 1, 2025
ff68ba9
small stuff
arthurpassos Sep 2, 2025
d3697c7
Merge branch 'antalya-25.6.5' into export_mt_part_to_object_storage
arthurpassos Sep 2, 2025
44c697c
fix test
arthurpassos Sep 2, 2025
8f171b8
fiox tests
arthurpassos Sep 2, 2025
8a51270
implement single part task
arthurpassos Sep 3, 2025
af3352b
fix privileges test
arthurpassos Sep 3, 2025
502b501
improve system.exports, show failed exports
arthurpassos Sep 3, 2025
34f7130
opsy
arthurpassos Sep 3, 2025
a39d63a
remove old partition task
arthurpassos Sep 3, 2025
6663949
rmv no longer used method
arthurpassos Sep 3, 2025
46cda68
fix settingschangehist?
arthurpassos Sep 4, 2025
d4f01f1
remove exportslist
arthurpassos Sep 4, 2025
4c39630
small changes
arthurpassos Sep 4, 2025
46b1724
try to remove deadlock, fail
arthurpassos Sep 4, 2025
543ff36
use the background scheduler instead of scheduling upon every request…
arthurpassos Sep 4, 2025
61e43cf
do not lock parts, only hold references so they are not deleted from …
arthurpassos Sep 7, 2025
646a69f
do not capture exception in importer sink
arthurpassos Sep 7, 2025
20244c3
disable file level parallelism
arthurpassos Sep 8, 2025
8f2557f
set max_retries
arthurpassos Sep 8, 2025
ff551f6
exports throtler
arthurpassos Sep 8, 2025
bad3bc0
add comment
arthurpassos Sep 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ enum class AccessType : uint8_t
enabled implicitly by the grant ALTER_TABLE */\
M(ALTER_SETTINGS, "ALTER SETTING, ALTER MODIFY SETTING, MODIFY SETTING, RESET SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\
M(ALTER_MOVE_PARTITION, "ALTER MOVE PART, MOVE PARTITION, MOVE PART", TABLE, ALTER_TABLE) \
M(ALTER_EXPORT_PARTITION, "ALTER EXPORT PART, EXPORT PARTITION, EXPORT PART", TABLE, ALTER_TABLE) \
M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
M(ALTER_UNLOCK_SNAPSHOT, "UNLOCK SNAPSHOT", TABLE, ALTER_TABLE) \
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage/Azure)
add_headers_and_sources(dbms Storages/ObjectStorage/S3)
add_headers_and_sources(dbms Storages/ObjectStorage/HDFS)
add_headers_and_sources(dbms Storages/ObjectStorage/Local)
add_headers_and_sources(dbms Storages/ObjectStorage/MergeTree)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/Iceberg)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/DeltaLake)
Expand Down
1 change: 1 addition & 0 deletions src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
M(Merge, "Number of executing background merges") \
M(MergeParts, "Number of source parts participating in current background merges") \
M(Move, "Number of currently executing moves") \
M(Export, "Number of currently executing exports") \
M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \
M(ReplicatedFetch, "Number of data parts being fetched from replica") \
M(ReplicatedSend, "Number of data parts being sent to replicas") \
Expand Down
6 changes: 6 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6874,6 +6874,12 @@ Possible values:
DECLARE_WITH_ALIAS(Bool, allow_experimental_time_series_aggregate_functions, false, R"(
Experimental timeSeries* aggregate functions for Prometheus-like timeseries resampling, rate, delta calculation.
)", EXPERIMENTAL, allow_experimental_ts_to_grid_aggregate_function) \
DECLARE_WITH_ALIAS(Bool, allow_experimental_export_merge_tree_partition, false, R"(
Experimental export merge tree partition.
)", EXPERIMENTAL, allow_experimental_export_merge_tree_partition) \
DECLARE_WITH_ALIAS(Bool, export_merge_tree_partition_background_execution, true, R"(
Process exports asynchronously in background threads
)", EXPERIMENTAL, export_merge_tree_partition_background_execution) \
\
/* ####################################################### */ \
/* ############ END OF EXPERIMENTAL FEATURES ############# */ \
Expand Down
1 change: 1 addition & 0 deletions src/Databases/DatabaseS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Storages/IStorage.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Parsers/ASTInsertQuery.h>

#include <boost/algorithm/string.hpp>
#include <filesystem>
Expand Down
1 change: 1 addition & 0 deletions src/Disks/ObjectStorages/IObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ struct RelativePathWithMetadata
virtual ~RelativePathWithMetadata() = default;

virtual std::string getFileName() const { return std::filesystem::path(relative_path).filename(); }
virtual std::string getFileNameWithoutExtension() const { return std::filesystem::path(relative_path).stem(); }
virtual std::string getPath() const { return relative_path; }
virtual bool isArchive() const { return false; }
virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
Expand Down
4 changes: 4 additions & 0 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <Storages/MarkCache.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MovesList.h>
#include <Storages/ExportsList.h>
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
Expand Down Expand Up @@ -463,6 +464,7 @@ struct ContextSharedPart : boost::noncopyable
GlobalOvercommitTracker global_overcommit_tracker;
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
ExportsList exports_list; /// The list of executing exports MergeTree -> Object storage
ReplicatedFetchList replicated_fetch_list;
RefreshSet refresh_set; /// The list of active refreshes (for MaterializedView)
ConfigurationPtr users_config TSA_GUARDED_BY(mutex); /// Config with the users, profiles and quotas sections.
Expand Down Expand Up @@ -1148,6 +1150,8 @@ MergeList & Context::getMergeList() { return shared->merge_list; }
const MergeList & Context::getMergeList() const { return shared->merge_list; }
MovesList & Context::getMovesList() { return shared->moves_list; }
const MovesList & Context::getMovesList() const { return shared->moves_list; }
ExportsList & Context::getExportsList() { return shared->exports_list; }
const ExportsList & Context::getExportsList() const { return shared->exports_list; }
ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; }
const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; }
RefreshSet & Context::getRefreshSet() { return shared->refresh_set; }
Expand Down
4 changes: 4 additions & 0 deletions src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class AsynchronousMetrics;
class BackgroundSchedulePool;
class MergeList;
class MovesList;
class ExportsList;
class ReplicatedFetchList;
class RefreshSet;
class Cluster;
Expand Down Expand Up @@ -1141,6 +1142,9 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
MovesList & getMovesList();
const MovesList & getMovesList() const;

ExportsList & getExportsList();
const ExportsList & getExportsList() const;

ReplicatedFetchList & getReplicatedFetchList();
const ReplicatedFetchList & getReplicatedFetchList() const;

Expand Down
6 changes: 6 additions & 0 deletions src/Interpreters/InterpreterAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,12 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
required_access.emplace_back(AccessType::ALTER_DELETE | AccessType::INSERT, database, table);
break;
}
case ASTAlterCommand::EXPORT_PARTITION:
{
required_access.emplace_back(AccessType::ALTER_EXPORT_PARTITION, database, table);
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
break;
}
case ASTAlterCommand::FETCH_PARTITION:
{
required_access.emplace_back(AccessType::ALTER_FETCH_PARTITION, database, table);
Expand Down
4 changes: 3 additions & 1 deletion src/Interpreters/PartLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ ColumnsDescription PartLogElement::getColumnsDescription()
{"MovePart", static_cast<Int8>(MOVE_PART)},
{"MergePartsStart", static_cast<Int8>(MERGE_PARTS_START)},
{"MutatePartStart", static_cast<Int8>(MUTATE_PART_START)},
{"ExportPart", static_cast<Int8>(EXPORT_PART)},
}
);

Expand Down Expand Up @@ -109,7 +110,8 @@ ColumnsDescription PartLogElement::getColumnsDescription()
"RemovePart — Removing or detaching a data part using [DETACH PARTITION](/sql-reference/statements/alter/partition#detach-partitionpart)."
"MutatePartStart — Mutating of a data part has started, "
"MutatePart — Mutating of a data part has finished, "
"MovePart — Moving the data part from the one disk to another one."},
"MovePart — Moving the data part from the one disk to another one."
"ExportPart — Exporting the data part from a merge tree table to one (e.g, object storage)."},
{"merge_reason", std::move(merge_reason_datatype),
"The reason for the event with type MERGE_PARTS. Can have one of the following values: "
"NotAMerge — The current event has the type other than MERGE_PARTS, "
Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/PartLog.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ struct PartLogElement
MOVE_PART = 6,
MERGE_PARTS_START = 7,
MUTATE_PART_START = 8,
EXPORT_PART = 9,
};

/// Copy of MergeAlgorithm since values are written to disk.
Expand Down
23 changes: 23 additions & 0 deletions src/Parsers/ASTAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,29 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett
ostr << quoteString(move_destination_name);
}
}
else if (type == ASTAlterCommand::EXPORT_PARTITION)
{
ostr << (settings.hilite ? hilite_keyword : "") << "EXPORT " << (part ? "PART " : "PARTITION ")
<< (settings.hilite ? hilite_none : "");
partition->format(ostr, settings, state, frame);
ostr << " TO ";
switch (move_destination_type)
{
case DataDestinationType::TABLE:
ostr << "TABLE ";
if (!to_database.empty())
{
ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_database)
<< (settings.hilite ? hilite_none : "") << ".";
}
ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_table)
<< (settings.hilite ? hilite_none : "");
return;
default:
break;
}

}
else if (type == ASTAlterCommand::REPLACE_PARTITION)
{
ostr << (settings.hilite ? hilite_keyword : "") << (replace ? "REPLACE" : "ATTACH") << " PARTITION "
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTAlterQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class ASTAlterCommand : public IAST
FREEZE_ALL,
UNFREEZE_PARTITION,
UNFREEZE_ALL,
EXPORT_PARTITION,

DELETE,
UPDATE,
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/CommonParsers.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ namespace DB
MR_MACROS(MONTHS, "MONTHS") \
MR_MACROS(MOVE_PART, "MOVE PART") \
MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \
MR_MACROS(EXPORT_PARTITION, "EXPORT PARTITION") \
MR_MACROS(MOVE, "MOVE") \
MR_MACROS(MS, "MS") \
MR_MACROS(MUTATION, "MUTATION") \
Expand Down
17 changes: 17 additions & 0 deletions src/Parsers/ParserAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_forget_partition(Keyword::FORGET_PARTITION);
ParserKeyword s_move_partition(Keyword::MOVE_PARTITION);
ParserKeyword s_move_part(Keyword::MOVE_PART);
ParserKeyword s_export_partition(Keyword::EXPORT_PARTITION);
ParserKeyword s_drop_detached_partition(Keyword::DROP_DETACHED_PARTITION);
ParserKeyword s_drop_detached_part(Keyword::DROP_DETACHED_PART);
ParserKeyword s_fetch_partition(Keyword::FETCH_PARTITION);
Expand Down Expand Up @@ -564,6 +565,22 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->move_destination_name = ast_space_name->as<ASTLiteral &>().value.safeGet<String>();
}
}
else if (s_export_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command_partition, expected))
return false;

command->type = ASTAlterCommand::EXPORT_PARTITION;

if (!s_to_table.ignore(pos, expected))
{
return false;
}

if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
return false;
command->move_destination_type = DataDestinationType::TABLE;
}
else if (s_add_constraint.ignore(pos, expected))
{
if (s_if_not_exists.ignore(pos, expected))
Expand Down
34 changes: 34 additions & 0 deletions src/Storages/ExportsList.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include <Storages/ExportsList.h>
#include "base/getThreadId.h"

namespace DB
{

ExportsListElement::ExportsListElement(
const StorageID & source_table_id_,
const StorageID & destination_table_id_,
const std::string & part_name_,
const std::string & destination_path_)
: source_table_id(source_table_id_),
destination_table_id(destination_table_id_),
part_name(part_name_),
destination_path(destination_path_),
thread_id(getThreadId())
{
}

ExportInfo ExportsListElement::getInfo() const
{
ExportInfo res;
res.source_database = source_table_id.database_name;
res.source_table = source_table_id.table_name;
res.destination_database = destination_table_id.database_name;
res.destination_table = destination_table_id.table_name;
res.part_name = part_name;
res.destination_path = destination_path;
res.elapsed = watch.elapsedSeconds();
res.thread_id = thread_id;
return res;
}

}
62 changes: 62 additions & 0 deletions src/Storages/ExportsList.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#pragma once

#include <Storages/MergeTree/BackgroundProcessList.h>
#include <Interpreters/StorageID.h>
#include <Common/Stopwatch.h>
#include <Common/CurrentMetrics.h>
#include <boost/noncopyable.hpp>

namespace CurrentMetrics
{
extern const Metric Export;
}

namespace DB
{

struct ExportInfo
{
std::string source_database;
std::string destination_database;
std::string source_table;
std::string destination_table;
std::string part_name;
std::string destination_path;

Float64 elapsed;
UInt64 thread_id;
};

struct ExportsListElement : private boost::noncopyable
{
const StorageID source_table_id;
const StorageID destination_table_id;
const std::string part_name;
const std::string destination_path;

Stopwatch watch;
const UInt64 thread_id;

ExportsListElement(
const StorageID & source_table_id_,
const StorageID & destination_table_id_,
const std::string & part_name_,
const std::string & destination_path_);

ExportInfo getInfo() const;
};


/// List of currently processing moves
class ExportsList final : public BackgroundProcessList<ExportsListElement, ExportInfo>
{
private:
using Parent = BackgroundProcessList<ExportsListElement, ExportInfo>;

public:
ExportsList()
: Parent(CurrentMetrics::Export)
{}
};

}
17 changes: 6 additions & 11 deletions src/Storages/IPartitionStrategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,32 +332,27 @@ ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk)
return block_with_partition_by_expr.getByName(actions_with_column_name.column_name).column;
}

ColumnRawPtrs HiveStylePartitionStrategy::getFormatChunkColumns(const Chunk & chunk)
Chunk HiveStylePartitionStrategy::getFormatChunk(const Chunk & chunk)
{
ColumnRawPtrs result;
Chunk result;

if (partition_columns_in_data_file)
{
for (const auto & column : chunk.getColumns())
{
result.emplace_back(column.get());
result.addColumn(column);
}

return result;
}

if (chunk.getNumColumns() != sample_block.columns())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Incorrect number of columns in chunk. Expected {}, found {}",
sample_block.columns(), chunk.getNumColumns());
}
chassert(chunk.getColumns().size() == sample_block.columns());

for (size_t i = 0; i < sample_block.columns(); i++)
{
if (!partition_columns_name_set.contains(sample_block.getByPosition(i).name))
{
result.emplace_back(chunk.getColumns()[i].get());
result.addColumn(chunk.getColumns()[i]);
}
}

Expand Down
Loading
Loading