diff --git a/tree/dataframe/inc/ROOT/RDF/InterfaceUtils.hxx b/tree/dataframe/inc/ROOT/RDF/InterfaceUtils.hxx index a7b2fdfb81ed4..cab252e336494 100644 --- a/tree/dataframe/inc/ROOT/RDF/InterfaceUtils.hxx +++ b/tree/dataframe/inc/ROOT/RDF/InterfaceUtils.hxx @@ -293,19 +293,13 @@ BuildAction(const ColumnNames_t &colNames, const std::shared_ptr actionPtr; if (snapHelperArgs->fToNTuple) { - if (!ROOT::IsImplicitMTEnabled()) { - // single-thread snapshot - using Helper_t = UntypedSnapshotRNTupleHelper; - using Action_t = RActionSnapshot; + // We use the same helper for single- and multi-thread snapshot. + using Helper_t = UntypedSnapshotRNTupleHelper; + using Action_t = RActionSnapshot; - actionPtr.reset(new Action_t(Helper_t(filename, dirname, treename, colNames, outputColNames, options, inputLM, - outputLM, std::move(isDefine), colTypeIDs), - colNames, colTypeIDs, prevNode, colRegister)); - } else { - // multi-thread snapshot to RNTuple is not yet supported - // TODO(fdegeus) Add MT snapshotting - throw std::runtime_error("Snapshot: Snapshotting to RNTuple with IMT enabled is not supported yet."); - } + actionPtr.reset(new Action_t(Helper_t(nSlots, filename, dirname, treename, colNames, outputColNames, options, + inputLM, outputLM, colTypeIDs), + colNames, colTypeIDs, prevNode, colRegister)); } else { if (!ROOT::IsImplicitMTEnabled()) { // single-thread snapshot diff --git a/tree/dataframe/inc/ROOT/RDF/SnapshotHelpers.hxx b/tree/dataframe/inc/ROOT/RDF/SnapshotHelpers.hxx index e7ebca8041e95..ef10695b6c4ca 100644 --- a/tree/dataframe/inc/ROOT/RDF/SnapshotHelpers.hxx +++ b/tree/dataframe/inc/ROOT/RDF/SnapshotHelpers.hxx @@ -30,8 +30,12 @@ class TBranch; class TFile; namespace ROOT { -class RNTupleWriter; +namespace Experimental { +class RNTupleFillContext; +class RNTupleParallelWriter; +} // namespace Experimental class REntry; +class RFieldToken; class TBufferMerger; class TBufferMergerFile; } // namespace ROOT @@ -63,19 +67,20 @@ class R__CLING_PTRCHECK(off) UntypedSnapshotRNTupleHelper final : public RAction ROOT::Detail::RDF::RLoopManager *fOutputLoopManager; ColumnNames_t fInputFieldNames; // This contains the resolved aliases ColumnNames_t fOutputFieldNames; - std::unique_ptr fWriter; + std::unique_ptr fWriter; + std::vector fFieldTokens; - ROOT::REntry *fOutputEntry; - - std::vector fIsDefine; + unsigned int fNSlots; + std::vector> fFillContexts; + std::vector> fEntries; std::vector fInputColumnTypeIDs; // Types for the input columns public: - UntypedSnapshotRNTupleHelper(std::string_view filename, std::string_view dirname, std::string_view ntuplename, - const ColumnNames_t &vfnames, const ColumnNames_t &fnames, + UntypedSnapshotRNTupleHelper(unsigned int nSlots, std::string_view filename, std::string_view dirname, + std::string_view ntuplename, const ColumnNames_t &vfnames, const ColumnNames_t &fnames, const RSnapshotOptions &options, ROOT::Detail::RDF::RLoopManager *inputLM, - ROOT::Detail::RDF::RLoopManager *outputLM, std::vector &&isDefine, + ROOT::Detail::RDF::RLoopManager *outputLM, const std::vector &colTypeIDs); UntypedSnapshotRNTupleHelper(const UntypedSnapshotRNTupleHelper &) = delete; @@ -84,11 +89,13 @@ public: UntypedSnapshotRNTupleHelper &operator=(UntypedSnapshotRNTupleHelper &&) noexcept; ~UntypedSnapshotRNTupleHelper() final; - void InitTask(TTreeReader *, unsigned int /* slot */) {} + void Initialize(); + + void Exec(unsigned int slot, const std::vector &values); - void Exec(unsigned int /* slot */, const std::vector &values); + void InitTask(TTreeReader *, unsigned int slot); - void Initialize(); + void FinalizeTask(unsigned int slot); void Finalize(); diff --git a/tree/dataframe/src/RDFSnapshotHelpers.cxx b/tree/dataframe/src/RDFSnapshotHelpers.cxx index 0bbeda451cefe..89b04e7d591f0 100644 --- a/tree/dataframe/src/RDFSnapshotHelpers.cxx +++ b/tree/dataframe/src/RDFSnapshotHelpers.cxx @@ -20,9 +20,11 @@ #include #include +#include #include #include -#include +#include +#include #include #include @@ -799,22 +801,21 @@ ROOT::Internal::RDF::UntypedSnapshotTTreeHelperMT::MakeNew(void *newName, std::s } ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::UntypedSnapshotRNTupleHelper( - std::string_view filename, std::string_view dirname, std::string_view ntuplename, const ColumnNames_t &vfnames, - const ColumnNames_t &fnames, const RSnapshotOptions &options, ROOT::Detail::RDF::RLoopManager *inputLM, - ROOT::Detail::RDF::RLoopManager *outputLM, std::vector &&isDefine, + unsigned int nSlots, std::string_view filename, std::string_view dirname, std::string_view ntuplename, + const ColumnNames_t &vfnames, const ColumnNames_t &fnames, const RSnapshotOptions &options, + ROOT::Detail::RDF::RLoopManager *inputLM, ROOT::Detail::RDF::RLoopManager *outputLM, const std::vector &colTypeIDs) : fFileName(filename), fDirName(dirname), fNTupleName(ntuplename), - fOutputFile(nullptr), fOptions(options), fInputLoopManager(inputLM), fOutputLoopManager(outputLM), fInputFieldNames(vfnames), fOutputFieldNames(ReplaceDotWithUnderscore(fnames)), - fWriter(nullptr), - fOutputEntry(nullptr), - fIsDefine(std::move(isDefine)), + fNSlots(nSlots), + fFillContexts(nSlots), + fEntries(nSlots), fInputColumnTypeIDs(colTypeIDs) { EnsureValidSnapshotRNTupleOutput(fOptions, fNTupleName, fFileName); @@ -828,23 +829,15 @@ ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper &ROOT::Internal::RDF::UntypedS ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::~UntypedSnapshotRNTupleHelper() { - if (!fNTupleName.empty() && !fOutputLoopManager->GetDataSource() && fOptions.fLazy) + if (!fNTupleName.empty() /* not moved from */ && !fOutputFile /* did not run */ && fOptions.fLazy) Warning("Snapshot", "A lazy Snapshot action was booked but never triggered."); } -void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Exec(unsigned int /* slot */, const std::vector &values) -{ - assert(values.size() == fOutputFieldNames.size()); - for (decltype(values.size()) i = 0; i < values.size(); i++) { - fOutputEntry->BindRawPtr(fOutputFieldNames[i], values[i]); - } - fWriter->Fill(); -} - void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Initialize() { - auto model = ROOT::RNTupleModel::Create(); + auto model = ROOT::RNTupleModel::CreateBare(); auto nFields = fOutputFieldNames.size(); + fFieldTokens.resize(nFields); for (decltype(nFields) i = 0; i < nFields; i++) { // Need to retrieve the type of every field to create as a string // If the input type for a field does not have RTTI, internally we store it as the tag UseNativeDataType. When @@ -854,8 +847,9 @@ void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Initialize() fInputFieldNames[i], fOptions.fVector2RVec) : ROOT::Internal::RDF::TypeID2TypeName(*fInputColumnTypeIDs[i]); model->AddField(ROOT::RFieldBase::Create(fOutputFieldNames[i], typeName).Unwrap()); + fFieldTokens[i] = model->GetToken(fOutputFieldNames[i]); } - fOutputEntry = &model->GetDefaultEntry(); + model->Freeze(); ROOT::RNTupleWriteOptions writeOptions; writeOptions.SetCompression(fOptions.fCompressionAlgorithm, fOptions.fCompressionLevel); @@ -874,11 +868,41 @@ void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Initialize() outputDir = fOutputFile->mkdir(fDirName.c_str()); } - fWriter = ROOT::RNTupleWriter::Append(std::move(model), fNTupleName, *outputDir, writeOptions); + fWriter = ROOT::Experimental::RNTupleParallelWriter::Append(std::move(model), fNTupleName, *outputDir, writeOptions); +} + +void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::InitTask(TTreeReader *, unsigned int slot) +{ + if (!fFillContexts[slot]) { + fFillContexts[slot] = fWriter->CreateFillContext(); + fEntries[slot] = fFillContexts[slot]->GetModel().CreateBareEntry(); + } +} + +void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Exec(unsigned int slot, const std::vector &values) +{ + auto &fillContext = fFillContexts[slot]; + auto &outputEntry = fEntries[slot]; + assert(values.size() == fFieldTokens.size()); + for (decltype(values.size()) i = 0; i < values.size(); i++) { + outputEntry->BindRawPtr(fFieldTokens[i], values[i]); + } + fillContext->Fill(*outputEntry); +} + +void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::FinalizeTask(unsigned int slot) +{ + // In principle we would not need to flush a cluster here, but we want to benefit from parallelism for compression. + // NB: RNTupleFillContext::FlushCluster() is a nop if there is no new entry since the last flush. + fFillContexts[slot]->FlushCluster(); } void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Finalize() { + // First clear and destroy all entries, which were created from the RNTupleFillContexts. + fEntries.clear(); + fFillContexts.clear(); + // Then destroy the RNTupleParallelWriter and write the metadata. fWriter.reset(); // We can now set the data source of the loop manager for the RDataFrame that is returned by the Snapshot call. fOutputLoopManager->SetDataSource(std::make_unique(fDirName + "/" + fNTupleName, fFileName)); @@ -899,8 +923,7 @@ ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::MakeNew(void *newName) { const std::string finalName = *reinterpret_cast(newName); - return UntypedSnapshotRNTupleHelper{finalName, fDirName, fNTupleName, - fInputFieldNames, fOutputFieldNames, fOptions, - fInputLoopManager, fOutputLoopManager, std::vector(fIsDefine), - fInputColumnTypeIDs}; + return UntypedSnapshotRNTupleHelper{ + fNSlots, finalName, fDirName, fNTupleName, fInputFieldNames, + fOutputFieldNames, fOptions, fInputLoopManager, fOutputLoopManager, fInputColumnTypeIDs}; } diff --git a/tree/dataframe/test/dataframe_snapshot_ntuple.cxx b/tree/dataframe/test/dataframe_snapshot_ntuple.cxx index 5a1c268f848e2..64323c0650f4f 100644 --- a/tree/dataframe/test/dataframe_snapshot_ntuple.cxx +++ b/tree/dataframe/test/dataframe_snapshot_ntuple.cxx @@ -72,6 +72,18 @@ TEST(RDFSnapshotRNTuple, FromScratch) } } +TEST(RDFSnapshotRNTuple, LazyTriggered) +{ + FileRAII fileGuard{"RDFSnapshotRNTuple_lazy.root"}; + auto d = ROOT::RDataFrame(1); + ROOT::RDF::RSnapshotOptions opts; + opts.fOutputFormat = ROOT::RDF::ESnapshotOutputFormat::kRNTuple; + opts.fLazy = true; + auto r = d.Snapshot("t", fileGuard.GetPath(), {"rdfentry_"}, opts); + *r; + r = {}; +} + void BookLazySnapshot(std::string_view filename) { auto d = ROOT::RDataFrame(1); @@ -614,23 +626,25 @@ struct TIMTEnabler { ~TIMTEnabler() { ROOT::DisableImplicitMT(); } }; -TEST(RDFSnapshotRNTuple, ThrowIfMT) +TEST(RDFSnapshotRNTuple, WithMT) { TIMTEnabler _(4); - FileRAII fileGuard{"RDFSnapshotRNTuple_throw_if_mt.root"}; + FileRAII fileGuard{"RDFSnapshotRNTuple_mt.root"}; - auto df = ROOT::RDataFrame(25ull).Define("x", [] { return 10; }); + auto df = ROOT::RDataFrame(25ull).Define("x", [](ULong64_t e) { return e; }, {"rdfentry_"}); RSnapshotOptions opts; opts.fOutputFormat = ROOT::RDF::ESnapshotOutputFormat::kRNTuple; - try { - auto sdf = df.Snapshot("ntuple", fileGuard.GetPath(), {"x"}, opts); - *sdf; - FAIL() << "MT snapshotting to RNTuple is not supported yet"; - } catch (const std::runtime_error &err) { - EXPECT_STREQ(err.what(), "Snapshot: Snapshotting to RNTuple with IMT enabled is not supported yet."); - } + auto sdf = df.Snapshot("ntuple", fileGuard.GetPath(), {"x"}, opts); + *sdf; + + auto sum = sdf->Sum("x"); + EXPECT_EQ(300, sum.GetValue()); + + auto reader = RNTupleReader::Open("ntuple", fileGuard.GetPath()); + EXPECT_EQ(25, reader->GetNEntries()); + // There should be more than one cluster, but this is not guaranteed because of scheduling... } #endif // R__USE_IMT