23
23
#include < ROOT/RFieldToken.hxx>
24
24
#include < ROOT/RNTuple.hxx>
25
25
#include < ROOT/RNTupleDS.hxx>
26
- #include < ROOT/RNTupleWriter.hxx>
26
+ #include < ROOT/RNTupleFillContext.hxx>
27
+ #include < ROOT/RNTupleParallelWriter.hxx>
27
28
#include < ROOT/RTTreeDS.hxx>
28
29
#include < ROOT/TBufferMerger.hxx>
29
30
@@ -800,9 +801,10 @@ ROOT::Internal::RDF::UntypedSnapshotTTreeHelperMT::MakeNew(void *newName, std::s
800
801
}
801
802
802
803
ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::UntypedSnapshotRNTupleHelper (
803
- std::string_view filename, std::string_view dirname, std::string_view ntuplename, const ColumnNames_t &vfnames,
804
- const ColumnNames_t &fnames, const RSnapshotOptions &options, ROOT::Detail::RDF::RLoopManager *inputLM,
805
- ROOT::Detail::RDF::RLoopManager *outputLM, const std::vector<const std::type_info *> &colTypeIDs)
804
+ unsigned int nSlots, std::string_view filename, std::string_view dirname, std::string_view ntuplename,
805
+ const ColumnNames_t &vfnames, const ColumnNames_t &fnames, const RSnapshotOptions &options,
806
+ ROOT::Detail::RDF::RLoopManager *inputLM, ROOT::Detail::RDF::RLoopManager *outputLM,
807
+ const std::vector<const std::type_info *> &colTypeIDs)
806
808
: fFileName(filename),
807
809
fDirName(dirname),
808
810
fNTupleName(ntuplename),
@@ -811,6 +813,9 @@ ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::UntypedSnapshotRNTupleHelper(
811
813
fOutputLoopManager(outputLM),
812
814
fInputFieldNames(vfnames),
813
815
fOutputFieldNames(ReplaceDotWithUnderscore(fnames)),
816
+ fNSlots(nSlots),
817
+ fFillContexts(nSlots),
818
+ fEntries(nSlots),
814
819
fInputColumnTypeIDs(colTypeIDs)
815
820
{
816
821
EnsureValidSnapshotRNTupleOutput (fOptions , fNTupleName , fFileName );
@@ -845,7 +850,6 @@ void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Initialize()
845
850
fFieldTokens [i] = model->GetToken (fOutputFieldNames [i]);
846
851
}
847
852
model->Freeze ();
848
- fOutputEntry = model->CreateBareEntry ();
849
853
850
854
ROOT::RNTupleWriteOptions writeOptions;
851
855
writeOptions.SetCompression (fOptions .fCompressionAlgorithm , fOptions .fCompressionLevel );
@@ -864,20 +868,43 @@ void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Initialize()
864
868
outputDir = fOutputFile ->mkdir (fDirName .c_str ());
865
869
}
866
870
867
- fWriter = ROOT::RNTupleWriter::Append (std::move (model), fNTupleName , *outputDir, writeOptions);
871
+ // The RNTupleParallelWriter has exclusive access to the underlying TFile, no further synchronization is needed for
872
+ // calls to Fill() (in Exec) and FlushCluster (in FinalizeTask).
873
+ fWriter = ROOT::Experimental::RNTupleParallelWriter::Append (std::move (model), fNTupleName , *outputDir, writeOptions);
874
+ }
875
+
876
+ void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::InitTask (TTreeReader *, unsigned int slot)
877
+ {
878
+ if (!fFillContexts [slot]) {
879
+ fFillContexts [slot] = fWriter ->CreateFillContext ();
880
+ fEntries [slot] = fFillContexts [slot]->GetModel ().CreateBareEntry ();
881
+ }
868
882
}
869
883
870
- void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Exec (unsigned int /* slot */ , const std::vector<void *> &values)
884
+ void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Exec (unsigned int slot, const std::vector<void *> &values)
871
885
{
886
+ auto &fillContext = fFillContexts [slot];
887
+ auto &outputEntry = fEntries [slot];
872
888
assert (values.size () == fFieldTokens .size ());
873
889
for (decltype (values.size ()) i = 0 ; i < values.size (); i++) {
874
- fOutputEntry ->BindRawPtr (fFieldTokens [i], values[i]);
890
+ outputEntry ->BindRawPtr (fFieldTokens [i], values[i]);
875
891
}
876
- fWriter ->Fill (*fOutputEntry );
892
+ fillContext->Fill (*outputEntry);
893
+ }
894
+
895
+ void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::FinalizeTask (unsigned int slot)
896
+ {
897
+ // In principle we would not need to flush a cluster here, but we want to benefit from parallelism for compression.
898
+ // NB: RNTupleFillContext::FlushCluster() is a nop if there is no new entry since the last flush.
899
+ fFillContexts [slot]->FlushCluster ();
877
900
}
878
901
879
902
void ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::Finalize ()
880
903
{
904
+ // First clear and destroy all entries, which were created from the RNTupleFillContexts.
905
+ fEntries .clear ();
906
+ fFillContexts .clear ();
907
+ // Then destroy the RNTupleParallelWriter and write the metadata.
881
908
fWriter .reset ();
882
909
// We can now set the data source of the loop manager for the RDataFrame that is returned by the Snapshot call.
883
910
fOutputLoopManager ->SetDataSource (std::make_unique<ROOT::RDF::RNTupleDS>(fDirName + " /" + fNTupleName , fFileName ));
@@ -898,7 +925,7 @@ ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper
898
925
ROOT::Internal::RDF::UntypedSnapshotRNTupleHelper::MakeNew (void *newName)
899
926
{
900
927
const std::string finalName = *reinterpret_cast <const std::string *>(newName);
901
- return UntypedSnapshotRNTupleHelper{finalName, fDirName , fNTupleName ,
902
- fInputFieldNames , fOutputFieldNames , fOptions ,
903
- fInputLoopManager , fOutputLoopManager , fInputColumnTypeIDs };
928
+ return UntypedSnapshotRNTupleHelper{
929
+ fNSlots , finalName, fDirName , fNTupleName , fInputFieldNames ,
930
+ fOutputFieldNames , fOptions , fInputLoopManager , fOutputLoopManager , fInputColumnTypeIDs };
904
931
}
0 commit comments