Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions tree/dataframe/inc/ROOT/RDF/InterfaceUtils.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -293,19 +293,13 @@ BuildAction(const ColumnNames_t &colNames, const std::shared_ptr<SnapshotHelperA

std::unique_ptr<RActionBase> actionPtr;
if (snapHelperArgs->fToNTuple) {
if (!ROOT::IsImplicitMTEnabled()) {
// single-thread snapshot
using Helper_t = UntypedSnapshotRNTupleHelper;
using Action_t = RActionSnapshot<Helper_t, PrevNodeType>;
// We use the same helper for single- and multi-thread snapshot.
using Helper_t = UntypedSnapshotRNTupleHelper;
using Action_t = RActionSnapshot<Helper_t, PrevNodeType>;

actionPtr.reset(new Action_t(Helper_t(filename, dirname, treename, colNames, outputColNames, options, inputLM,
outputLM, std::move(isDefine), colTypeIDs),
colNames, colTypeIDs, prevNode, colRegister));
} else {
// multi-thread snapshot to RNTuple is not yet supported
// TODO(fdegeus) Add MT snapshotting
throw std::runtime_error("Snapshot: Snapshotting to RNTuple with IMT enabled is not supported yet.");
}
actionPtr.reset(new Action_t(Helper_t(nSlots, filename, dirname, treename, colNames, outputColNames, options,
inputLM, outputLM, colTypeIDs),
colNames, colTypeIDs, prevNode, colRegister));
} else {
if (!ROOT::IsImplicitMTEnabled()) {
// single-thread snapshot
Expand Down
29 changes: 18 additions & 11 deletions tree/dataframe/inc/ROOT/RDF/SnapshotHelpers.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ class TBranch;
class TFile;

namespace ROOT {
class RNTupleWriter;
namespace Experimental {
class RNTupleFillContext;
class RNTupleParallelWriter;
} // namespace Experimental
class REntry;
class RFieldToken;
class TBufferMerger;
class TBufferMergerFile;
} // namespace ROOT
Expand Down Expand Up @@ -63,19 +67,20 @@ class R__CLING_PTRCHECK(off) UntypedSnapshotRNTupleHelper final : public RAction
ROOT::Detail::RDF::RLoopManager *fOutputLoopManager;
ColumnNames_t fInputFieldNames; // This contains the resolved aliases
ColumnNames_t fOutputFieldNames;
std::unique_ptr<ROOT::RNTupleWriter> fWriter;
std::unique_ptr<ROOT::Experimental::RNTupleParallelWriter> fWriter;
std::vector<ROOT::RFieldToken> fFieldTokens;

ROOT::REntry *fOutputEntry;

std::vector<bool> fIsDefine;
unsigned int fNSlots;
std::vector<std::shared_ptr<ROOT::Experimental::RNTupleFillContext>> fFillContexts;
std::vector<std::unique_ptr<ROOT::REntry>> fEntries;

std::vector<const std::type_info *> fInputColumnTypeIDs; // Types for the input columns

public:
UntypedSnapshotRNTupleHelper(std::string_view filename, std::string_view dirname, std::string_view ntuplename,
const ColumnNames_t &vfnames, const ColumnNames_t &fnames,
UntypedSnapshotRNTupleHelper(unsigned int nSlots, std::string_view filename, std::string_view dirname,
std::string_view ntuplename, const ColumnNames_t &vfnames, const ColumnNames_t &fnames,
const RSnapshotOptions &options, ROOT::Detail::RDF::RLoopManager *inputLM,
ROOT::Detail::RDF::RLoopManager *outputLM, std::vector<bool> &&isDefine,
ROOT::Detail::RDF::RLoopManager *outputLM,
const std::vector<const std::type_info *> &colTypeIDs);

UntypedSnapshotRNTupleHelper(const UntypedSnapshotRNTupleHelper &) = delete;
Expand All @@ -84,11 +89,13 @@ public:
UntypedSnapshotRNTupleHelper &operator=(UntypedSnapshotRNTupleHelper &&) noexcept;
~UntypedSnapshotRNTupleHelper() final;

void InitTask(TTreeReader *, unsigned int /* slot */) {}
void Initialize();

void Exec(unsigned int slot, const std::vector<void *> &values);

void Exec(unsigned int /* slot */, const std::vector<void *> &values);
void InitTask(TTreeReader *, unsigned int slot);

void Initialize();
void FinalizeTask(unsigned int slot);

void Finalize();

Expand Down
73 changes: 48 additions & 25 deletions tree/dataframe/src/RDFSnapshotHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
#include <ROOT/RDF/SnapshotHelpers.hxx>

#include <ROOT/REntry.hxx>
#include <ROOT/RFieldToken.hxx>
#include <ROOT/RNTuple.hxx>
#include <ROOT/RNTupleDS.hxx>
#include <ROOT/RNTupleWriter.hxx>
#include <ROOT/RNTupleFillContext.hxx>
#include <ROOT/RNTupleParallelWriter.hxx>
#include <ROOT/RTTreeDS.hxx>
#include <ROOT/TBufferMerger.hxx>

Expand Down Expand Up @@ -799,22 +801,21 @@ ROOT::Internal::RDF::UntypedSnapshotTTreeHelperMT::MakeNew(void *newName, std::s
}

ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::UntypedSnapshotRNTupleHelper(
std::string_view filename, std::string_view dirname, std::string_view ntuplename, const ColumnNames_t &vfnames,
const ColumnNames_t &fnames, const RSnapshotOptions &options, ROOT::Detail::RDF::RLoopManager *inputLM,
ROOT::Detail::RDF::RLoopManager *outputLM, std::vector<bool> &&isDefine,
unsigned int nSlots, std::string_view filename, std::string_view dirname, std::string_view ntuplename,
const ColumnNames_t &vfnames, const ColumnNames_t &fnames, const RSnapshotOptions &options,
ROOT::Detail::RDF::RLoopManager *inputLM, ROOT::Detail::RDF::RLoopManager *outputLM,
const std::vector<const std::type_info *> &colTypeIDs)
: fFileName(filename),
fDirName(dirname),
fNTupleName(ntuplename),
fOutputFile(nullptr),
fOptions(options),
fInputLoopManager(inputLM),
fOutputLoopManager(outputLM),
fInputFieldNames(vfnames),
fOutputFieldNames(ReplaceDotWithUnderscore(fnames)),
fWriter(nullptr),
fOutputEntry(nullptr),
fIsDefine(std::move(isDefine)),
fNSlots(nSlots),
fFillContexts(nSlots),
fEntries(nSlots),
fInputColumnTypeIDs(colTypeIDs)
{
EnsureValidSnapshotRNTupleOutput(fOptions, fNTupleName, fFileName);
Expand All @@ -828,23 +829,15 @@ ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper &ROOT::Internal::RDF::UntypedS

ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::~UntypedSnapshotRNTupleHelper()
{
if (!fNTupleName.empty() && !fOutputLoopManager->GetDataSource() && fOptions.fLazy)
if (!fNTupleName.empty() /* not moved from */ && !fOutputFile /* did not run */ && fOptions.fLazy)
Warning("Snapshot", "A lazy Snapshot action was booked but never triggered.");
}

void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Exec(unsigned int /* slot */, const std::vector<void *> &values)
{
assert(values.size() == fOutputFieldNames.size());
for (decltype(values.size()) i = 0; i < values.size(); i++) {
fOutputEntry->BindRawPtr(fOutputFieldNames[i], values[i]);
}
fWriter->Fill();
}

void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Initialize()
{
auto model = ROOT::RNTupleModel::Create();
auto model = ROOT::RNTupleModel::CreateBare();
auto nFields = fOutputFieldNames.size();
fFieldTokens.resize(nFields);
for (decltype(nFields) i = 0; i < nFields; i++) {
// Need to retrieve the type of every field to create as a string
// If the input type for a field does not have RTTI, internally we store it as the tag UseNativeDataType. When
Expand All @@ -854,8 +847,9 @@ void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Initialize()
fInputFieldNames[i], fOptions.fVector2RVec)
: ROOT::Internal::RDF::TypeID2TypeName(*fInputColumnTypeIDs[i]);
model->AddField(ROOT::RFieldBase::Create(fOutputFieldNames[i], typeName).Unwrap());
fFieldTokens[i] = model->GetToken(fOutputFieldNames[i]);
}
fOutputEntry = &model->GetDefaultEntry();
model->Freeze();

ROOT::RNTupleWriteOptions writeOptions;
writeOptions.SetCompression(fOptions.fCompressionAlgorithm, fOptions.fCompressionLevel);
Expand All @@ -874,11 +868,41 @@ void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Initialize()
outputDir = fOutputFile->mkdir(fDirName.c_str());
}

fWriter = ROOT::RNTupleWriter::Append(std::move(model), fNTupleName, *outputDir, writeOptions);
fWriter = ROOT::Experimental::RNTupleParallelWriter::Append(std::move(model), fNTupleName, *outputDir, writeOptions);
}

/// Prepare the per-slot writing state before a task runs on `slot`.
///
/// If the slot has no fill context yet, one is created from the writer, together with a bare entry built from that
/// context's model. A slot that already has a fill context is left untouched, so the context and entry are reused.
/// The TTreeReader argument is unused.
void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::InitTask(TTreeReader *, unsigned int slot)
{
   auto &slotContext = fFillContexts[slot];
   if (slotContext)
      return; // this slot has already been set up

   slotContext = fWriter->CreateFillContext();
   // The entry is created from the model of the slot's own fill context.
   fEntries[slot] = slotContext->GetModel().CreateBareEntry();
}

void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Exec(unsigned int slot, const std::vector<void *> &values)
{
auto &fillContext = fFillContexts[slot];
auto &outputEntry = fEntries[slot];
assert(values.size() == fFieldTokens.size());
for (decltype(values.size()) i = 0; i < values.size(); i++) {
outputEntry->BindRawPtr(fFieldTokens[i], values[i]);
}
fillContext->Fill(*outputEntry);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it meant to be FillNoFlush?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not needed here because UntypedSnapshotRNTupleHelper exclusively owns the TFile and there is only one RNTupleParallelWriter appending to it. Since no locking is needed on the user / "framework" side, there is no benefit to using FillNoFlush; it would only make the code longer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fillContext->Fill(*outputEntry);
// Any synchronization needed is handled by the underlying `RNTupleParallelWriter`
// which has exclusive access to its `TFile`.
fillContext->Fill(*outputEntry);

}

void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::FinalizeTask(unsigned int slot)
{
// In principle we would not need to flush a cluster here, but we want to benefit from parallelism for compression.
// NB: RNTupleFillContext::FlushCluster() is a nop if there is no new entry since the last flush.
fFillContexts[slot]->FlushCluster();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With Snapshot to TTree, users can specify the size of the output TTree clusters (expressed in number of entries) via fAutoFlush. I wonder how this line impacts that feature, which at the moment is not supported for Snapshot to RNTuple, but I imagine it will be requested at some point.

}

void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Finalize()
{
// First clear and destroy all entries, which were created from the RNTupleFillContexts.
fEntries.clear();
fFillContexts.clear();
// Then destroy the RNTupleParallelWriter and write the metadata.
fWriter.reset();
// We can now set the data source of the loop manager for the RDataFrame that is returned by the Snapshot call.
fOutputLoopManager->SetDataSource(std::make_unique<ROOT::RDF::RNTupleDS>(fDirName + "/" + fNTupleName, fFileName));
Expand All @@ -899,8 +923,7 @@ ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper
ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::MakeNew(void *newName)
{
const std::string finalName = *reinterpret_cast<const std::string *>(newName);
return UntypedSnapshotRNTupleHelper{finalName, fDirName, fNTupleName,
fInputFieldNames, fOutputFieldNames, fOptions,
fInputLoopManager, fOutputLoopManager, std::vector<bool>(fIsDefine),
fInputColumnTypeIDs};
return UntypedSnapshotRNTupleHelper{
fNSlots, finalName, fDirName, fNTupleName, fInputFieldNames,
fOutputFieldNames, fOptions, fInputLoopManager, fOutputLoopManager, fInputColumnTypeIDs};
}
34 changes: 24 additions & 10 deletions tree/dataframe/test/dataframe_snapshot_ntuple.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ TEST(RDFSnapshotRNTuple, FromScratch)
}
}

// Books a lazy Snapshot to RNTuple, dereferences the result, and then drops the result handle.
// NOTE(review): presumably this guards against a spurious "booked but never triggered" warning once the
// snapshot has actually run -- confirm against the helper's destructor.
TEST(RDFSnapshotRNTuple, LazyTriggered)
{
   FileRAII fileGuard{"RDFSnapshotRNTuple_lazy.root"};
   ROOT::RDataFrame df(1);

   ROOT::RDF::RSnapshotOptions snapshotOpts;
   snapshotOpts.fLazy = true;
   snapshotOpts.fOutputFormat = ROOT::RDF::ESnapshotOutputFormat::kRNTuple;

   auto snapshotResult = df.Snapshot("t", fileGuard.GetPath(), {"rdfentry_"}, snapshotOpts);
   // Dereference the result; going by the test name, this is what triggers the lazy action.
   *snapshotResult;
   // Release the result handle while the output file guard is still alive.
   snapshotResult = {};
}

void BookLazySnapshot(std::string_view filename)
{
auto d = ROOT::RDataFrame(1);
Expand Down Expand Up @@ -614,23 +626,25 @@ struct TIMTEnabler {
~TIMTEnabler() { ROOT::DisableImplicitMT(); }
};

TEST(RDFSnapshotRNTuple, ThrowIfMT)
TEST(RDFSnapshotRNTuple, WithMT)
{
TIMTEnabler _(4);

FileRAII fileGuard{"RDFSnapshotRNTuple_throw_if_mt.root"};
FileRAII fileGuard{"RDFSnapshotRNTuple_mt.root"};

auto df = ROOT::RDataFrame(25ull).Define("x", [] { return 10; });
auto df = ROOT::RDataFrame(25ull).Define("x", [](ULong64_t e) { return e; }, {"rdfentry_"});

RSnapshotOptions opts;
opts.fOutputFormat = ROOT::RDF::ESnapshotOutputFormat::kRNTuple;

try {
auto sdf = df.Snapshot("ntuple", fileGuard.GetPath(), {"x"}, opts);
*sdf;
FAIL() << "MT snapshotting to RNTuple is not supported yet";
} catch (const std::runtime_error &err) {
EXPECT_STREQ(err.what(), "Snapshot: Snapshotting to RNTuple with IMT enabled is not supported yet.");
}
auto sdf = df.Snapshot("ntuple", fileGuard.GetPath(), {"x"}, opts);
*sdf;

auto sum = sdf->Sum<std::uint64_t>("x");
EXPECT_EQ(300, sum.GetValue());

auto reader = RNTupleReader::Open("ntuple", fileGuard.GetPath());
EXPECT_EQ(25, reader->GetNEntries());
// There should be more than one cluster, but this is not guaranteed because of scheduling...
}
#endif // R__USE_IMT
Loading