Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ntuple] add TFileMerger options to mirror RNTupleMergeOptions #17299

Merged
merged 3 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README/ReleaseNotes/v636/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ The following people have contributed to this new version:

## IO

* New options have been added to TFileMerger (which can be passed as whitespace-separated TStrings via `TFileMerger::SetMergeOptions`)
* "FirstSrcCompression": when merging multiple files, instructs the class-specific merger to use the same compression as the
first object of the destination's class for the destination. Currently only recognized by the RNTuple merger;
* "DefaultCompression": specifies that the merged output should use the class-specific default compression. Currently only
meaningful for RNTuple, which has a default compression different from the TFile's default compression (ZSTD instead of ZLIB).
This option is automatically set by `hadd` when no other compression option is specified;
* "rntuple.MergingMode=(Filter|Union|Strict)": RNTuple-specific option that specifies the merging mode that should be used by
the RNTupleMerger (see
[RNTupleMergeOptions](https://root.cern/doc/v634/structROOT_1_1Experimental_1_1Internal_1_1RNTupleMergeOptions.html));
* "rntuple.ErrBehavior=(Abort|Skip)": RNTuple-specific option that specifies the behavior of the RNTupleMerger on error (see link above);
* "rntuple.ExtraVerbose": RNTuple-specific option that tells the RNTupleMerger to emit more information during the merge process.

## RDataFrame

## Tutorials and Code Examples
Expand Down
4 changes: 2 additions & 2 deletions main/src/hadd.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -674,10 +674,10 @@ int main(int argc, char **argv)
else
newcomp = ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault;
delete firstInput;
fileMerger.SetMergeOptions(TString("first_source_compression"));
fileMerger.SetMergeOptions(TString("FirstSrcCompression"));
} else {
newcomp = ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault;
fileMerger.SetMergeOptions(TString("default_compression"));
fileMerger.SetMergeOptions(TString("DefaultCompression"));
}
}
if (verbosity > 1) {
Expand Down
9 changes: 9 additions & 0 deletions tree/ntuple/v7/inc/ROOT/RNTupleMerger.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ struct RSealedPageMergeData;

class RClusterPool;

/// Set of merging options to pass to RNTupleMerger.
/// If you're using the merger through TFileMerger you need to give it string-based options instead.
/// Here is the mapping for the TFileMerger options:
pcanal marked this conversation as resolved.
Show resolved Hide resolved
/// - "rntuple.MergingMode=(Filter|Union|...)" -> sets fMergingMode
/// - "rntuple.ErrBehavior=(Abort|Skip|...)" -> sets fErrBehavior
/// - "rntuple.ExtraVerbose" -> sets fExtraVerbose to true
/// Rules about the string-based options:
/// 1. there must be no space around the separators (i.e. `.` and `=`)
/// 2. all string matching is case insensitive
struct RNTupleMergeOptions {
/// If `fCompressionSettings == kNTupleUnknownCompression` (the default), the merger will not change the
/// compression of any of its sources (fast merging). Otherwise, all sources will be converted to the specified
Expand Down
70 changes: 64 additions & 6 deletions tree/ntuple/v7/src/RNTupleMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,63 @@
#include <algorithm>
#include <deque>
#include <inttypes.h> // for PRIu64
#include <initializer_list>
#include <unordered_map>
#include <vector>

using namespace ROOT::Experimental;
using namespace ROOT::Experimental::Internal;

// TFile options parsing
// -------------------------------------------------------------------------------------
static bool BeginsWithDelimitedWord(const TString &str, const char *word)
{
const Ssiz_t wordLen = strlen(word);
if (str.Length() < wordLen)
return false;
if (!str.BeginsWith(word, TString::ECaseCompare::kIgnoreCase))
return false;
return str.Length() == wordLen || str(wordLen) == ' ';
}

template <typename T>
static std::optional<T> ParseStringOption(const TString &opts, const char *pattern,
std::initializer_list<std::pair<const char *, T>> validValues)
{
const Ssiz_t patternLen = strlen(pattern);
assert(pattern[patternLen - 1] == '='); // we want to parse options with the format `option=Value`
if (auto idx = opts.Index(pattern, 0, TString::ECaseCompare::kIgnoreCase);
idx >= 0 && opts.Length() > idx + patternLen) {
auto sub = TString(opts(idx + patternLen, opts.Length() - idx - patternLen));
for (const auto &[name, value] : validValues) {
if (BeginsWithDelimitedWord(sub, name)) {
return value;
}
}
}
return std::nullopt;
}

static std::optional<ENTupleMergingMode> ParseOptionMergingMode(const TString &opts)
{
return ParseStringOption<ENTupleMergingMode>(opts, "rntuple.MergingMode=",
{
{"Filter", ENTupleMergingMode::kFilter},
{"Union", ENTupleMergingMode::kUnion},
{"Strict", ENTupleMergingMode::kStrict},
});
}

static std::optional<ENTupleMergeErrBehavior> ParseOptionErrBehavior(const TString &opts)
{
return ParseStringOption<ENTupleMergeErrBehavior>(opts, "rntuple.ErrBehavior=",
{
{"Abort", ENTupleMergeErrBehavior::kAbort},
{"Skip", ENTupleMergeErrBehavior::kSkip},
});
}
// -------------------------------------------------------------------------------------

// Entry point for TFileMerger. Internally calls RNTupleMerger::Merge().
Long64_t ROOT::RNTuple::Merge(TCollection *inputs, TFileMergeInfo *mergeInfo)
// IMPORTANT: this function must not throw, as it is used in exception-unsafe code (TFileMerger).
Expand Down Expand Up @@ -78,13 +129,13 @@ try {
// pointer we just got.
}

const bool defaultComp = mergeInfo->fOptions.Contains("default_compression");
const bool firstSrcComp = mergeInfo->fOptions.Contains("first_source_compression");
const bool defaultComp = mergeInfo->fOptions.Contains("DefaultCompression");
const bool firstSrcComp = mergeInfo->fOptions.Contains("FirstSrcCompression");
const bool extraVerbose = mergeInfo->fOptions.Contains("rntuple.ExtraVerbose");
if (defaultComp && firstSrcComp) {
// this should never happen through hadd, but a user may call RNTuple::Merge() from custom code...
Warning(
"RNTuple::Merge",
"Passed both options \"default_compression\" and \"first_source_compression\": only the latter will apply.");
// this should never happen through hadd, but a user may call RNTuple::Merge() from custom code.
Warning("RNTuple::Merge", "Passed both options \"DefaultCompression\" and \"FirstSrcCompression\": "
"only the latter will apply.");
}
int compression = kNTupleUnknownCompression;
if (firstSrcComp) {
Expand Down Expand Up @@ -167,6 +218,13 @@ try {
RNTupleMerger merger;
RNTupleMergeOptions mergerOpts;
mergerOpts.fCompressionSettings = compression;
mergerOpts.fExtraVerbose = extraVerbose;
if (auto mergingMode = ParseOptionMergingMode(mergeInfo->fOptions)) {
mergerOpts.fMergingMode = *mergingMode;
}
if (auto errBehavior = ParseOptionErrBehavior(mergeInfo->fOptions)) {
mergerOpts.fErrBehavior = *errBehavior;
}
merger.Merge(sourcePtrs, *destination, mergerOpts).ThrowOnError();

// Provide the caller with a merged anchor object (even though we've already
Expand Down
93 changes: 92 additions & 1 deletion tree/ntuple/v7/test/ntuple_merger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <zlib.h>
#include "gmock/gmock.h"

using ROOT::TestSupport::CheckDiagsRAII;

namespace {

// Reads an integer from a little-endian 4 byte buffer
Expand Down Expand Up @@ -1441,7 +1443,7 @@ TEST_P(RNTupleMergerCheckEncoding, CorrectEncoding)
fileMerger.AddFile(nt2.get());
// If `useDefaultComp` is true, it's as if we were calling hadd without a -f* flag
if (useDefaultComp)
fileMerger.SetMergeOptions(TString("default_compression"));
fileMerger.SetMergeOptions(TString("DefaultCompression"));
fileMerger.Merge();

EXPECT_TRUE(VerifyPageCompression(fileGuard3.GetPath(), expectedComp));
Expand Down Expand Up @@ -1479,3 +1481,92 @@ INSTANTIATE_TEST_SUITE_P(Seq, RNTupleMergerCheckEncoding,
::testing::Values(0, 101, 207, 404, 505),
// use default compression
::testing::Values(true, false)));

// Merges two RNTuples with disjoint fields ("foo" vs "bar") through TFileMerger, selecting the
// merging mode via the string option "rntuple.MergingMode=...". The test expects Merge() to
// return false for Filter and Strict, and true for Union.
TEST(RNTupleMerger, MergeAsymmetric1TFileMerger)
{
// Exactly the same test as MergeAsymmetric1, but passing through TFileMerger.

// Write two test ntuples to be merged
// First input: RNTuple "ntuple" with a single int field "foo", 10 entries.
FileRaii fileGuard1("test_ntuple_merge_in_1.root");
{
auto model = RNTupleModel::Create();
auto fieldFoo = model->MakeField<int>("foo");
auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard1.GetPath());
for (size_t i = 0; i < 10; ++i) {
*fieldFoo = i * 123;
ntuple->Fill();
}
}

// Second input: RNTuple with the same name "ntuple" but a single int field "bar", 10 entries.
FileRaii fileGuard2("test_ntuple_merge_in_2.root");
{
auto model = RNTupleModel::Create();
auto fieldBar = model->MakeField<int>("bar");
auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard2.GetPath());
for (size_t i = 0; i < 10; ++i) {
*fieldBar = i * 765;
ntuple->Fill();
}
}

// Now merge the inputs
FileRaii fileGuard3("test_ntuple_merge_out.root");
{
// Gather the input sources
// NOTE(review): `sources` and `sourcePtrs` are filled here but never used in the rest of this
// test (the merges below go through TFileMerger) -- looks like a leftover from
// MergeAsymmetric1; confirm whether it can be dropped.
std::vector<std::unique_ptr<RPageSource>> sources;
sources.push_back(RPageSource::Create("ntuple", fileGuard1.GetPath(), RNTupleReadOptions()));
sources.push_back(RPageSource::Create("ntuple", fileGuard2.GetPath(), RNTupleReadOptions()));
std::vector<RPageSource *> sourcePtrs;
for (const auto &s : sources) {
sourcePtrs.push_back(s.get());
}

// Now Merge the inputs
// We expect this to fail in Filter and Strict mode since the fields between the sources do NOT match
// Each case below runs in its own scope with freshly opened input files, a fresh TFileMerger
// and its own CheckDiagsRAII; all three write to the same output path with "RECREATE".
{
// Case 1: Filter mode -- Merge() is expected to return false.
auto nt1 = std::unique_ptr<TFile>(TFile::Open(fileGuard1.GetPath().c_str()));
auto nt2 = std::unique_ptr<TFile>(TFile::Open(fileGuard2.GetPath().c_str()));
TFileMerger fileMerger(kFALSE, kFALSE);
fileMerger.OutputFile(fileGuard3.GetPath().c_str(), "RECREATE");
fileMerger.AddFile(nt1.get());
fileMerger.AddFile(nt2.get());
fileMerger.SetMergeOptions(TString("rntuple.MergingMode=Filter"));
// Register the diagnostics expected from the failing merge: three required errors and one
// optional warning. (Presumably checked when `diags` leaves scope -- confirm against
// CheckDiagsRAII.)
CheckDiagsRAII diags;
diags.requiredDiag(kError, "TFileMerger::Merge", "error during merge", false);
diags.requiredDiag(kError, "RNTuple::Merge", "missing the following field", false);
diags.requiredDiag(kError, "TFileMerger::MergeRecursive", "Could NOT merge RNTuples!", false);
diags.optionalDiag(kWarning, "TFileMerger::MergeRecursive", "Merging RNTuples is experimental", false);
auto res = fileMerger.Merge();
EXPECT_FALSE(res);
}
{
// Case 2: Strict mode -- same expectations as Filter mode.
auto nt1 = std::unique_ptr<TFile>(TFile::Open(fileGuard1.GetPath().c_str()));
auto nt2 = std::unique_ptr<TFile>(TFile::Open(fileGuard2.GetPath().c_str()));
TFileMerger fileMerger(kFALSE, kFALSE);
fileMerger.OutputFile(fileGuard3.GetPath().c_str(), "RECREATE");
fileMerger.AddFile(nt1.get());
fileMerger.AddFile(nt2.get());
fileMerger.SetMergeOptions(TString("rntuple.MergingMode=Strict"));
CheckDiagsRAII diags;
diags.requiredDiag(kError, "TFileMerger::Merge", "error during merge", false);
diags.requiredDiag(kError, "RNTuple::Merge", "missing the following field", false);
diags.requiredDiag(kError, "TFileMerger::MergeRecursive", "Could NOT merge RNTuples!", false);
diags.optionalDiag(kWarning, "TFileMerger::MergeRecursive", "Merging RNTuples is experimental", false);
auto res = fileMerger.Merge();
EXPECT_FALSE(res);
}
{
// Case 3: Union mode -- Merge() is expected to return true; only the optional
// "experimental" warning is registered, no errors.
auto nt1 = std::unique_ptr<TFile>(TFile::Open(fileGuard1.GetPath().c_str()));
auto nt2 = std::unique_ptr<TFile>(TFile::Open(fileGuard2.GetPath().c_str()));
TFileMerger fileMerger(kFALSE, kFALSE);
fileMerger.OutputFile(fileGuard3.GetPath().c_str(), "RECREATE");
fileMerger.AddFile(nt1.get());
fileMerger.AddFile(nt2.get());
fileMerger.SetMergeOptions(TString("rntuple.MergingMode=Union"));
CheckDiagsRAII diags;
diags.optionalDiag(kWarning, "TFileMerger::MergeRecursive", "Merging RNTuples is experimental", false);
auto res = fileMerger.Merge();
EXPECT_TRUE(res);
}
}
}
Loading