Skip to content

Commit

Permalink
Improve DQMEDAnalyzer interface and other backward compatible changes…
Browse files Browse the repository at this point in the history
… related to multithreading/standalone-saving.
  • Loading branch information
rovere committed Nov 29, 2013
1 parent 2c5fd11 commit c48c5fb
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 93 deletions.
9 changes: 7 additions & 2 deletions DQM/HLXMonitor/src/HLXMonitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -818,8 +818,13 @@ void HLXMonitor::SaveDQMFile(){

for( size_t i = 0, e = systems.size(); i != e; ++i )
if (systems[i] != "Reference")
dbe_->save( tempStreamer.str(), systems[i], "^(Reference/)?([^/]+)", rewrite,
(DQMStore::SaveReferenceTag)saveReference_, saveReferenceQMin_);
// TODO(rovere): fix the saving to comply w/ the DQM-multithread
dbe_->save( tempStreamer.str(),
systems[i],
"^(Reference/)?([^/]+)",
rewrite,
0,
(DQMStore::SaveReferenceTag)saveReference_, saveReferenceQMin_);

//dbe_->save(tempStreamer.str());
}
Expand Down
3 changes: 3 additions & 0 deletions DQMServices/Components/python/DQMFileSaver_cfi.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
version = cms.untracked.int32(1),
# runIsComplete
runIsComplete = cms.untracked.bool(False),
# Enable MultiThread behaviour, i.e., save only MonitorElements
# indexed by the current Run
enableMultiThread = cms.untracked.bool(False),

# Save file every N lumi sections (-1: disabled)
saveByLumiSection = cms.untracked.int32(-1),
Expand Down
35 changes: 23 additions & 12 deletions DQMServices/Components/src/DQMFileSaver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ DQMFileSaver::saveForOffline(const std::string &workflow, int run, int lumi)
}

dbe_->save(filename,
"",
"^(Reference/)?([^/]+)",
rewrite,
(DQMStore::SaveReferenceTag) saveReference_,
saveReferenceQMin_,
fileUpdate_);
"",
"^(Reference/)?([^/]+)",
rewrite,
enableMultiThread_ ? run : 0,
(DQMStore::SaveReferenceTag) saveReference_,
saveReferenceQMin_,
fileUpdate_);
}
else // save EventInfo folders for luminosity sections
{
Expand All @@ -91,10 +92,12 @@ DQMFileSaver::saveForOffline(const std::string &workflow, int run, int lumi)
dbe_->cd();
std::cout << systems[i] << " " ;
dbe_->save(filename,
systems[i]+"/EventInfo", "^(Reference/)?([^/]+)", rewrite,
DQMStore::SaveWithoutReference,
dqm::qstatus::STATUS_OK,
fileUpdate_);
systems[i]+"/EventInfo", "^(Reference/)?([^/]+)",
rewrite,
enableMultiThread_ ? run : 0,
DQMStore::SaveWithoutReference,
dqm::qstatus::STATUS_OK,
fileUpdate_);
// from now on update newly created file
if (fileUpdate_=="RECREATE") fileUpdate_="UPDATE";
}
Expand Down Expand Up @@ -122,8 +125,15 @@ doSaveForOnline(std::list<std::string> &pastSavedFiles,
DQMStore::SaveReferenceTag saveref,
int saveRefQMin)
{
store->save(filename, directory , rxpat,
rewrite, saveref, saveRefQMin);
// TODO(rovere): fix the online case. so far we simply rely on the
// fact that we assume we will not run multithreaded in online.
store->save(filename,
directory ,
rxpat,
rewrite,
0,
saveref,
saveRefQMin);
pastSavedFiles.push_back(filename);
if (pastSavedFiles.size() > numKeepSavedFiles)
{
Expand Down Expand Up @@ -205,6 +215,7 @@ DQMFileSaver::DQMFileSaver(const edm::ParameterSet &ps)
dirName_ ("."),
version_ (1),
runIsComplete_ (false),
enableMultiThread_(ps.getUntrackedParameter<bool>("enableMultiThread", false)),
saveByLumiSection_ (-1),
saveByEvent_ (-1),
saveByMinute_ (-1),
Expand Down
1 change: 1 addition & 0 deletions DQMServices/Components/src/DQMFileSaver.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class DQMFileSaver : public edm::EDAnalyzer
std::string dirName_;
int version_;
bool runIsComplete_;
bool enableMultiThread_;

int saveByLumiSection_;
int saveByEvent_;
Expand Down
35 changes: 19 additions & 16 deletions DQMServices/Core/interface/DQMEDAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//<<<<<< INCLUDES >>>>>>
#include "FWCore/Framework/interface/stream/EDAnalyzer.h"
#include "FWCore/Framework/interface/stream/EDAnalyzerAdaptor.h"
#include "DQMServices/Core/interface/DQMStore.h"
//<<<<<< PUBLIC DEFINES >>>>>>
//<<<<<< PUBLIC CONSTANTS >>>>>>
//<<<<<< PUBLIC TYPES >>>>>>
Expand All @@ -13,41 +14,43 @@

namespace edm {class StreamID;}

namespace dqmDetails {struct NoCache {};}


class DQMEDAnalyzer
: public edm::stream::EDAnalyzer<edm::RunSummaryCache<int>,
edm::LuminosityBlockSummaryCache<int> >
: public edm::stream::EDAnalyzer<edm::RunSummaryCache<dqmDetails::NoCache>,
edm::LuminosityBlockSummaryCache<dqmDetails::NoCache> >
{
public:
public:
DQMEDAnalyzer(void);
// implicit copy constructor
// implicit assignment operator
// implicit destructor
virtual void beginStream(edm::StreamID id) final;
static std::shared_ptr<int> globalBeginRunSummary(edm::Run const&,
edm::EventSetup const&,
RunContext const*);
virtual void beginRun(edm::Run const &, edm::EventSetup const&) final;
static std::shared_ptr<dqmDetails::NoCache> globalBeginRunSummary(edm::Run const&,
edm::EventSetup const&,
RunContext const*);
virtual void endRunSummary(edm::Run const&,
edm::EventSetup const&,
int*) const = 0;
dqmDetails::NoCache*) const final;
static void globalEndRunSummary(edm::Run const&,
edm::EventSetup const&,
RunContext const*,
int*);
static std::shared_ptr<int> globalBeginLuminosityBlockSummary(edm::LuminosityBlock const&,
edm::EventSetup const&,
LuminosityBlockContext const*);
dqmDetails::NoCache*);
static std::shared_ptr<dqmDetails::NoCache> globalBeginLuminosityBlockSummary(edm::LuminosityBlock const&,
edm::EventSetup const&,
LuminosityBlockContext const*);
virtual void endLuminosityBlockSummary(edm::LuminosityBlock const&,
edm::EventSetup const&,
int*) const = 0;
dqmDetails::NoCache*) const final;
static void globalEndLuminosityBlockSummary(edm::LuminosityBlock const&,
edm::EventSetup const&,
LuminosityBlockContext const*,
int*);
dqmDetails::NoCache*);
uint32_t streamId() const {return stream_id_;}
virtual void bookHistograms(edm::Run const&,
uint32_t streamId,
uint32_t moduleId) = 0;
virtual void dqmBeginRun(edm::Run const&, edm::EventSetup const&) {}
virtual void bookHistograms(DQMStore::IBooker &) = 0;

private:
uint32_t stream_id_;
Expand Down
1 change: 1 addition & 0 deletions DQMServices/Core/interface/DQMStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ class DQMStore
const std::string &path = "",
const std::string &pattern = "",
const std::string &rewrite = "",
const uint32_t run = 0,
SaveReferenceTag ref = SaveWithReference,
int minStatus = dqm::qstatus::STATUS_OK,
const std::string &fileupdate = "RECREATE");
Expand Down
5 changes: 5 additions & 0 deletions DQMServices/Core/interface/MonitorElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,11 @@ class MonitorElement

const uint32_t getTag(void) const
{ return data_.tag; }

const uint32_t run(void) const {return data_.run;}
const uint32_t lumi(void) const {return data_.lumi;}
const uint32_t streamId(void) const {return data_.streamId;}
const uint32_t moduleId(void) const {return data_.moduleId;}
};

#endif // DQMSERVICES_CORE_MONITOR_ELEMENT_H
Expand Down
35 changes: 31 additions & 4 deletions DQMServices/Core/python/test/dqm_testMultiThread_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,46 @@
process = cms.Process("DQMMULTITHREAD")
process.load("DQMServices.Core.DQM_cfg")

### standard MessageLoggerConfiguration
process.load("FWCore.MessageService.MessageLogger_cfi")
process.MessageLogger.debugModules.append('*')
process.MessageLogger.categories.append("DQMEDAnalyzer")
process.MessageLogger.cout = cms.untracked.PSet(
threshold = cms.untracked.string('DEBUG'),
default = cms.untracked.PSet( limit = cms.untracked.int32(-1) ))

process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(20)
input = cms.untracked.int32(200)
)

process.source = cms.Source("EmptySource",
numberEventsInRun = cms.untracked.uint32(5),
numberEventsInRun = cms.untracked.uint32(50),
firstLuminosityBlock = cms.untracked.uint32(1),
firstEvent = cms.untracked.uint32(1),
firstRun = cms.untracked.uint32(1),
numberEventsInLuminosityBlock = cms.untracked.uint32(1))

process.dqm_multi_thread = cms.EDAnalyzer("DQMTestMultiThread")
process.load("DQMServices.Components.DQMFileSaver_cfi")
process.dqmSaver.saveByRun = cms.untracked.int32(1)
process.dqmSaver.workflow = cms.untracked.string("/My/Test/Workflow")
process.dqmSaver.enableMultiThread = cms.untracked.bool(True)

process.load("DQMServices.Components.DQMStoreStats_cfi")

process.dqm_multi_thread_a = cms.EDAnalyzer("DQMTestMultiThread",
folder = cms.untracked.string("A_Folder/Module"))
process.dqm_multi_thread_b = cms.EDAnalyzer("DQMTestMultiThread",
folder = cms.untracked.string("B_Folder/Module"))

process.p = cms.Path(process.dqm_multi_thread_a
* process.dqm_multi_thread_b
* process.dqmStoreStats
* process.dqmSaver)

process.options = cms.untracked.PSet(
numberOfStreams = cms.untracked.uint32( 5 ),
numberOfThreads = cms.untracked.uint32( 5 ),
)

process.p = cms.Path(process.dqm_multi_thread)

#process.Tracer = cms.Service('Tracer')
63 changes: 57 additions & 6 deletions DQMServices/Core/src/DQMEDAnalyzer.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
#include "DQMServices/Core/interface/DQMEDAnalyzer.h"
#include "DQMServices/Core/interface/DQMStore.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/Framework/interface/Run.h"
#include "FWCore/Framework/interface/LuminosityBlock.h"
#include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
#include "FWCore/Utilities/interface/StreamID.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"

DQMEDAnalyzer::DQMEDAnalyzer() {}

Expand All @@ -8,31 +14,76 @@ void DQMEDAnalyzer::beginStream(edm::StreamID id)
stream_id_ = id.value();
}

std::shared_ptr<int>
void DQMEDAnalyzer::beginRun(edm::Run const &iRun,
edm::EventSetup const &iSetup) {
DQMStore * store = edm::Service<DQMStore>().operator->();
store->bookTransaction([this](DQMStore::IBooker &b) {
this->bookHistograms(b);
},
iRun.run(),
streamId(),
iRun.moduleCallingContext()->moduleDescription()->id());
dqmBeginRun(iRun, iSetup);
}


std::shared_ptr<dqmDetails::NoCache>
DQMEDAnalyzer::globalBeginRunSummary(edm::Run const&,
edm::EventSetup const&,
RunContext const*)
{
return 0;
return nullptr;
}

void DQMEDAnalyzer::endLuminosityBlockSummary(edm::LuminosityBlock const &iLumi ,
edm::EventSetup const &iSetup,
dqmDetails::NoCache*) const {
DQMStore * store = edm::Service<DQMStore>().operator->();
assert(store);
LogDebug("DQMEDAnalyzer") << "Merging Lumi local MEs ("
<< iLumi.run() << ", "
<< iLumi.id().luminosityBlock() << ", "
<< stream_id_ << ", "
<< iLumi.moduleCallingContext()->moduleDescription()->id()
<< ") into the DQMStore@" << store << std::endl;
store->mergeAndResetMEsLuminositySummaryCache(iLumi.run(),
iLumi.id().luminosityBlock(),
stream_id_,
iLumi.moduleCallingContext()->moduleDescription()->id());
}

void DQMEDAnalyzer::endRunSummary(edm::Run const &iRun ,
edm::EventSetup const &iSetup,
dqmDetails::NoCache*) const {
DQMStore * store = edm::Service<DQMStore>().operator->();
assert(store);
LogDebug("DQMEDAnalyzer") << "Merging Run local MEs ("
<< iRun.run() << ", "
<< stream_id_ << ", "
<< iRun.moduleCallingContext()->moduleDescription()->id()
<< ") into the DQMStore@" << store << std::endl;
store->mergeAndResetMEsRunSummaryCache(iRun.run(),
stream_id_,
iRun.moduleCallingContext()->moduleDescription()->id());
}

void DQMEDAnalyzer::globalEndRunSummary(edm::Run const&,
edm::EventSetup const&,
RunContext const*,
int*)
dqmDetails::NoCache*)
{}

std::shared_ptr<int>
std::shared_ptr<dqmDetails::NoCache>
DQMEDAnalyzer::globalBeginLuminosityBlockSummary(edm::LuminosityBlock const&,
edm::EventSetup const&,
LuminosityBlockContext const*)
{
return 0;
return nullptr;
}

void DQMEDAnalyzer::globalEndLuminosityBlockSummary(edm::LuminosityBlock const&,
edm::EventSetup const&,
LuminosityBlockContext const*,
int*)
dqmDetails::NoCache*)
{}

20 changes: 19 additions & 1 deletion DQMServices/Core/src/DQMStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2267,6 +2267,7 @@ DQMStore::save(const std::string &filename,
const std::string &path /* = "" */,
const std::string &pattern /* = "" */,
const std::string &rewrite /* = "" */,
const uint32_t run /* = 0 */,
SaveReferenceTag ref /* = SaveWithReference */,
int minStatus /* = dqm::qstatus::STATUS_OK */,
const std::string &fileupdate /* = RECREATE */)
Expand Down Expand Up @@ -2323,14 +2324,31 @@ DQMStore::save(const std::string &filename,
continue;

// Loop over monitor elements in this directory.
MonitorElement proto(&*di, std::string());
MonitorElement proto(&*di, std::string(), run, 0, 0);
mi = data_.lower_bound(proto);
std::cout <<"DQMStore::save() " << run << " " << *di << std::endl;
for ( ; mi != me && isSubdirectory(*di, *mi->data_.dirname); ++mi)
{
if (verbose_ > 1)
std::cout << "Run: " << (*mi).run()
<< " Lumi: " << (*mi).lumi()
<< " LumiFlag: " << (*mi).getLumiFlag()
<< " streamId: " << (*mi).streamId()
<< " moduleId: " << (*mi).moduleId()
<< " fullpathname: " << (*mi).getPathname() << std::endl;
// Skip if it isn't a direct child.
if (*di != *mi->data_.dirname)
continue;

// Keep backward compatibility with the old way of
// booking/handlind MonitorElements into the DQMStore. If run is
// 0 it means that a booking happened w/ the old non-threadsafe
// style, and we have to ignore the streamId and moduleId as a
// consequence.

if (run != 0 && (mi->data_.streamId !=0 || mi->data_.moduleId !=0))
continue;

// Handle reference histograms, with three distinct cases:
// 1) Skip all references entirely on saving.
// 2) Blanket saving of all references.
Expand Down
Loading

0 comments on commit c48c5fb

Please sign in to comment.