23
23
#include < ROOT/RFieldToken.hxx>
24
24
#include < ROOT/RNTuple.hxx>
25
25
#include < ROOT/RNTupleDS.hxx>
26
- #include < ROOT/RNTupleWriter.hxx>
26
+ #include < ROOT/RNTupleFillContext.hxx>
27
+ #include < ROOT/RNTupleParallelWriter.hxx>
27
28
#include < ROOT/RTTreeDS.hxx>
28
29
#include < ROOT/TBufferMerger.hxx>
29
30
@@ -800,9 +801,10 @@ ROOT::Internal::RDF::UntypedSnapshotTTreeHelperMT::MakeNew(void *newName, std::s
800
801
}
801
802
802
803
ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::UntypedSnapshotRNTupleHelper (
803
- std::string_view filename, std::string_view dirname, std::string_view ntuplename, const ColumnNames_t &vfnames,
804
- const ColumnNames_t &fnames, const RSnapshotOptions &options, ROOT::Detail::RDF::RLoopManager *inputLM,
805
- ROOT::Detail::RDF::RLoopManager *outputLM, const std::vector<const std::type_info *> &colTypeIDs)
804
+ unsigned int nSlots, std::string_view filename, std::string_view dirname, std::string_view ntuplename,
805
+ const ColumnNames_t &vfnames, const ColumnNames_t &fnames, const RSnapshotOptions &options,
806
+ ROOT::Detail::RDF::RLoopManager *inputLM, ROOT::Detail::RDF::RLoopManager *outputLM,
807
+ const std::vector<const std::type_info *> &colTypeIDs)
806
808
: fFileName(filename),
807
809
fDirName(dirname),
808
810
fNTupleName(ntuplename),
@@ -811,6 +813,9 @@ ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::UntypedSnapshotRNTupleHelper(
811
813
fOutputLoopManager(outputLM),
812
814
fInputFieldNames(vfnames),
813
815
fOutputFieldNames(ReplaceDotWithUnderscore(fnames)),
816
+ fNSlots(nSlots),
817
+ fFillContexts(nSlots),
818
+ fEntries(nSlots),
814
819
fInputColumnTypeIDs(colTypeIDs)
815
820
{
816
821
EnsureValidSnapshotRNTupleOutput (fOptions , fNTupleName , fFileName );
@@ -845,7 +850,6 @@ void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Initialize()
845
850
fFieldTokens [i] = model->GetToken (fOutputFieldNames [i]);
846
851
}
847
852
model->Freeze ();
848
- fOutputEntry = model->CreateBareEntry ();
849
853
850
854
ROOT::RNTupleWriteOptions writeOptions;
851
855
writeOptions.SetCompression (fOptions .fCompressionAlgorithm , fOptions .fCompressionLevel );
@@ -864,20 +868,41 @@ void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Initialize()
864
868
outputDir = fOutputFile ->mkdir (fDirName .c_str ());
865
869
}
866
870
867
- fWriter = ROOT::RNTupleWriter::Append (std::move (model), fNTupleName , *outputDir, writeOptions);
871
+ fWriter = ROOT::Experimental::RNTupleParallelWriter::Append (std::move (model), fNTupleName , *outputDir, writeOptions);
872
+ }
873
+
874
+ void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::InitTask (TTreeReader *, unsigned int slot)
875
+ {
876
+ if (!fFillContexts [slot]) {
877
+ fFillContexts [slot] = fWriter ->CreateFillContext ();
878
+ fEntries [slot] = fFillContexts [slot]->GetModel ().CreateBareEntry ();
879
+ }
868
880
}
869
881
870
- void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Exec (unsigned int /* slot */ , const std::vector<void *> &values)
882
+ void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Exec (unsigned int slot, const std::vector<void *> &values)
871
883
{
884
+ auto &fillContext = fFillContexts [slot];
885
+ auto &outputEntry = fEntries [slot];
872
886
assert (values.size () == fFieldTokens .size ());
873
887
for (decltype (values.size ()) i = 0 ; i < values.size (); i++) {
874
- fOutputEntry ->BindRawPtr (fFieldTokens [i], values[i]);
888
+ outputEntry ->BindRawPtr (fFieldTokens [i], values[i]);
875
889
}
876
- fWriter ->Fill (*fOutputEntry );
890
+ fillContext->Fill (*outputEntry);
891
+ }
892
+
893
+ void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::FinalizeTask (unsigned int slot)
894
+ {
895
+ // In principle we would not need to flush a cluster here, but we want to benefit from parallelism for compression.
896
+ // NB: RNTupleFillContext::FlushCluster() is a nop if there is no new entry since the last flush.
897
+ fFillContexts [slot]->FlushCluster ();
877
898
}
878
899
879
900
void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Finalize ()
880
901
{
902
+ // First clear and destroy all entries, which were created from the RNTupleFillContexts.
903
+ fEntries .clear ();
904
+ fFillContexts .clear ();
905
+ // Then destroy the RNTupleParallelWriter and write the metadata.
881
906
fWriter .reset ();
882
907
// We can now set the data source of the loop manager for the RDataFrame that is returned by the Snapshot call.
883
908
fOutputLoopManager ->SetDataSource (std::make_unique<ROOT::RDF::RNTupleDS>(fDirName + " /" + fNTupleName , fFileName ));
@@ -898,7 +923,7 @@ ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper
898
923
ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::MakeNew (void *newName)
899
924
{
900
925
const std::string finalName = *reinterpret_cast <const std::string *>(newName);
901
- return UntypedSnapshotRNTupleHelper{finalName, fDirName , fNTupleName ,
902
- fInputFieldNames , fOutputFieldNames , fOptions ,
903
- fInputLoopManager , fOutputLoopManager , fInputColumnTypeIDs };
926
+ return UntypedSnapshotRNTupleHelper{
927
+ fNSlots , finalName, fDirName , fNTupleName , fInputFieldNames ,
928
+ fOutputFieldNames , fOptions , fInputLoopManager , fOutputLoopManager , fInputColumnTypeIDs };
904
929
}
0 commit comments