Conversation

@arthurpassos (Collaborator) commented Jul 28, 2025

Changelog category (leave one):

  • New Feature

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Implement exporting partitions from MergeTree tables to object storage in a different format (e.g., Parquet). The files are converted to the destination format in memory.

Syntax: ALTER TABLE merge_tree_table EXPORT PARTITION ID 'ABC' TO TABLE 's3_hive_table'.

Related settings: export_merge_tree_partition_background_execution and allow_experimental_export_merge_tree_partition.

  1. The destination file names and paths are, for now, decided by the destination engine (I am only testing and thinking about S3 with Hive-style partitioning, so <table_root>/pkey1=pvalue1/.../pkeyn=pvaluen/<snowflakeid>.parquet). Most likely we will not be using Snowflake IDs for the filenames in the future.
  2. A commit file should be uploaded at the end of the execution to signal the completion of the transaction; the filename is commit_<partition_id>_<transaction_id>. It shall contain the list of files that were uploaded in that transaction (see the sketch after this list).
  3. A partition cannot be exported twice. The limitation comes from the fact that, upon re-export, we don't have a reliable way of telling which parts should be exported (we can't duplicate data): parts might have been merged with un-exported parts, etc. Perhaps we could lock these parts from merges and mutations forever? That is a question for the audience.
  4. While a partition is being exported, the set of parts collected for that export cannot be merged or mutated.
  5. Exports should be able to recover from hard failures/disasters (hard restart or crash). This is controlled using export manifests that are written to disk.
  6. Upon restart, exports are scheduled based on when they were created.
  7. For now, exports are scheduled in the same queue as disk moves. I still need to decide whether I'll create yet another queue or re-use one of the existing ones.
  8. I have not tested how it behaves on soft failures (e.g., one stream out of several failing). I suspect that is not properly implemented yet.
  9. Export manifests are written on anyDisk for now.
  10. The number of streams should be equal to max_threads.
  11. There is some half-baked observability via system.exports and system.part_log.
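
To make the commit-file contract in item 2 concrete, here is a minimal, self-contained sketch. Only the commit_<partition_id>_<transaction_id> pattern and the "list of uploaded files" body come from the description above; the helper names are hypothetical.

// Hypothetical helpers mirroring the documented commit-file contract.
#include <string>
#include <vector>

/// Object key of the commit marker: commit_<partition_id>_<transaction_id>.
std::string makeCommitFileName(const std::string & partition_id, const std::string & transaction_id)
{
    return "commit_" + partition_id + "_" + transaction_id;
}

/// Body of the commit file: the files uploaded in that transaction, one per line.
std::string makeCommitFileContents(const std::vector<std::string> & uploaded_files)
{
    std::string contents;
    for (const auto & file : uploaded_files)
        contents += file + "\n";
    return contents;
}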

Documentation entry for user-facing changes

...

Exclude tests:

  • Fast test
  • Integration Tests
  • Stateless tests
  • Stateful tests
  • Performance tests
  • All with ASAN
  • All with TSAN
  • All with MSAN
  • All with UBSAN
  • All with Coverage
  • All with Aarch64
  • All Regression
  • Disable CI Cache

github-actions bot commented Jul 28, 2025

Workflow [PR], commit [34f7130]

@svb-alt added the labels enhancement (New feature or request) and tiered storage (Antalya Roadmap: Tiered Storage) on Jul 30, 2025
@svb-alt linked an issue on Aug 8, 2025 that may be closed by this pull request
manifest->items.reserve(data_parts.size());
for (const auto & data_part : data_parts)
manifest->items.push_back({data_part->name, ""});
manifest->write();
Member:

check fsync_metadata
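
For context on the fsync_metadata remark: the idea is that the manifest write should be flushed to stable storage when that setting asks for it, so the manifest survives a hard crash. A rough, self-contained sketch using plain POSIX calls (not ClickHouse's Disk interface; names are illustrative, error handling elided):

#include <fcntl.h>
#include <unistd.h>
#include <string>

void writeManifestDurably(const std::string & path, const std::string & contents, bool fsync_metadata)
{
    int fd = ::open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd < 0)
        return;

    ssize_t written = ::write(fd, contents.data(), contents.size());
    (void)written;

    if (fsync_metadata)
        ::fsync(fd);    // flush the manifest to stable storage before considering it written

    ::close(fd);
}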


if (stats.status.code != 0)
{
LOG_INFO(getLogger("ExportMergeTreePartitionToObjectStorageTask"), "Error importing part {}: {}", stats.part->name, stats.status.message);
Member:

exporting?

Member:

a bit confusing import vs export.

Collaborator Author:

These are just stubs. I will polish the entire PR once we are OK with the approach and I have fixed all the concurrency issues.


std::vector<ExportsList::EntryPtr> export_list_entries;

for (const auto & data_part : data_parts)
Member:

Sequential iteration? I think we can make several parts run in parallel.

Collaborator Author:

They run in parallel. Each part gets its own pipeline composed of ReadFromMergeTree -> StorageObjectStorageMergeTreeImporterSink.

The N pipelines created for the N parts in a given partition are put under a single QueryPipeline export_pipeline that will execute the individual pipelines in parallel.

Member:

setNumThreads impacts the parallelism of the pipeline at different moments.
And you don't control how the work is distributed between the threads across the processors in the pipeline.

throw Exception(ErrorCodes::LOGICAL_ERROR, "Root pipeline is not completed");
}

export_pipeline.setNumThreads(local_context->getSettingsRef()[Setting::max_threads]);
Member:

I think every single export should be single-threaded (similar to merges).
We can get many threads by exporting more files in parallel (again - similar to merges).

This way it's simpler to control the parallelism / resources used by that BG work

Collaborator Author:

I think this code already does what you are asking for: each part is single-threaded, and many parts are parallelized according to max_threads.
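
To make this parallelism model concrete, here is a minimal, self-contained sketch of "each part single-threaded, up to max_threads parts in flight at once". It uses plain std::thread rather than ClickHouse's QueryPipeline, and all names are illustrative.

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <string>
#include <thread>
#include <vector>

/// Placeholder for the per-part pipeline: read the part, convert, upload.
void exportPart(const std::string & part_name)
{
    (void)part_name;
}

void exportPartsInParallel(const std::vector<std::string> & parts, size_t max_threads)
{
    std::atomic<size_t> next{0};
    std::vector<std::thread> workers;
    const size_t num_workers = std::min(max_threads, parts.size());

    for (size_t i = 0; i < num_workers; ++i)
    {
        workers.emplace_back([&]
        {
            /// Each worker handles one part at a time, single-threaded.
            for (size_t idx = next.fetch_add(1); idx < parts.size(); idx = next.fetch_add(1))
                exportPart(parts[idx]);
        });
    }

    for (auto & worker : workers)
        worker.join();
}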


if (!already_exported_partition_ids.emplace(partition_id).second)
{
throw Exception(ErrorCodes::PART_IS_LOCKED, "Partition {} has already been exported", partition_id);
Member:

option to reexport after changes?

Collaborator Author:

Well, while you were out we established that a partition cannot be exported more than once.

{
for (const auto & disk : getDisks())
{
for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
Member:

we need a cleanup of old ones

Collaborator Author:

Initially, I was deleting the manifests as soon as the commit file was uploaded. But then we changed the requirements so that a partition can be exported only once. To be able to lock these partitions upon restart, I opted for leaving the export manifests on disk.

If we change that requirement, then I'll delete them for sure.
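
A rough, self-contained sketch of the restart path described here, using std::filesystem rather than the IDisk interface; the manifest file naming is assumed, not taken from the PR. The point is only that the manifests left on disk can be scanned on start-up to rebuild the set of already-exported partitions, so a second export of the same partition can be rejected.

#include <filesystem>
#include <set>
#include <string>

std::set<std::string> collectExportedPartitionIds(const std::filesystem::path & data_path)
{
    std::set<std::string> exported;
    for (const auto & entry : std::filesystem::directory_iterator(data_path))
    {
        const std::string name = entry.path().filename().string();
        const std::string prefix = "export_manifest_";    /// assumed naming scheme
        if (name.rfind(prefix, 0) == 0)
            exported.insert(name.substr(prefix.size(), name.find('.') - prefix.size()));
    }
    return exported;
}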

void StorageObjectStorageMergeTreePartImporterSink::onException(std::exception_ptr)
{
/// we should not reach here
std::terminate();
Member:

are you sure?

Collaborator Author:

Nope, just stubs for now.

Part of the logic in this class is very hackish to keep the exceptions contained so that a single pipeline failure does not cause all the other pipelines to abort.
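
One generic way to contain a per-part failure without std::terminate is to capture the std::exception_ptr for each part and surface all failures once every part has been attempted, so a single failing pipeline does not abort the others. A sketch with illustrative names, not the PR's actual sink code:

#include <exception>
#include <iostream>
#include <string>
#include <vector>

struct PartExportResult
{
    std::string part_name;
    std::exception_ptr error;    /// null on success
};

void reportFailures(const std::vector<PartExportResult> & results)
{
    for (const auto & result : results)
    {
        if (!result.error)
            continue;
        try
        {
            std::rethrow_exception(result.error);
        }
        catch (const std::exception & e)
        {
            std::cerr << "Export of part " << result.part_name << " failed: " << e.what() << '\n';
        }
    }
}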

@@ -205,6 +214,15 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
virtuals.set(std::make_unique<VirtualColumnsDescription>(std::move(virtuals_)));
}

virtual void commitExportPartitionTransaction(
Member:

Maybe some better place for that? IStorage is too generic.

throw Exception(ErrorCodes::PART_IS_LOCKED, "Partition {} has already been exported", partition_id);
}

auto exports_tagger = std::make_shared<CurrentlyExportingPartsTagger>(std::move(all_parts), *this);
Member:

It will probably be problematic to do the same with replicated tables without messing with the replication queue.

I think that just holding the references to the parts should be enough (AFAIR they will stay on disk as inactive while you hold the reference, even if they get merged).

Labels: antalya-25.6, enhancement (New feature or request), tiered storage (Antalya Roadmap: Tiered Storage)
Projects: None yet
Development: Successfully merging this pull request may close the issue "ALTER TABLE EXPORT to external table"
3 participants