Skip to content

Commit 27bec5f

Browse files
authored
Merge pull request cms-sw#43368 from smorovic/master-per-thread-microstate
DAQ - add per-tbb-thread microstate (14_0_X)
2 parents 890ce42 + b5d1cf2 commit 27bec5f

14 files changed

+332
-294
lines changed

EventFilter/Utilities/interface/FastMonitor.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,12 @@ namespace jsoncollector {
7777
std::string getCSVString(int sid = -1);
7878

7979
//fastpath file output
80-
void outputCSV(std::string const& path, std::string const& csvString);
80+
void outputCSV(std::string const& path, std::vector<std::string> const& csvString);
8181

8282
//provide merged variable back to user
8383
JsonMonitorable* getMergedIntJForLumi(std::string const& name, unsigned int forLumi);
8484

8585
// merges and outputs everything collected for the given stream to JSON file
86-
bool outputFullJSONs(std::string const& pathstem, std::string const& ext, unsigned int lumi, bool output = true);
8786
bool outputFullJSON(std::string const& path, unsigned int lumi, bool output = true);
8887

8988
//discard what was collected for a lumisection

EventFilter/Utilities/interface/FastMonitoringService.h

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include "FWCore/ParameterSet/interface/ParameterSet.h"
55
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
6+
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
67
#include "DataFormats/Provenance/interface/EventID.h"
78
#include "DataFormats/Provenance/interface/LuminosityBlockID.h"
89
#include "DataFormats/Provenance/interface/Timestamp.h"
@@ -20,6 +21,8 @@
2021
#include <queue>
2122
#include <sstream>
2223
#include <unordered_map>
24+
#include "oneapi/tbb/task_arena.h"
25+
#include "oneapi/tbb/task_scheduler_observer.h"
2326

2427
/*Description
2528
this is an evolution of the MicroStateService intended to be run in standalone multi-threaded cmsRun jobs
@@ -28,9 +31,6 @@
2831
moduledesc pointer to key into the map instead and no string or string pointers are used for the microstates.
2932
Only a pointer value is stored using relaxed ordering at the time of module execution which is fast.
3033
Only at snapshot time (every few seconds) is the map lookup performed to produce the snapshot.
31-
Path names use a similar logic. However path names are not accessible in the same way as later so they need to be
32-
when starting to run associated to the memory location of path name strings as accessible when path is executed.
33-
Path intermediate info will be called "ministate" :D
3434
The general counters and status variables (event number, number of processed events, number of passed and stored
3535
events, luminosity section etc.) are also monitored here.
3636
@@ -47,7 +47,10 @@ namespace edm {
4747

4848
namespace evf {
4949

50+
template <typename T>
51+
struct ContainableAtomic;
5052
class FastMonitoringThread;
53+
class ConcurrencyTracker;
5154

5255
namespace FastMonState {
5356

@@ -62,7 +65,11 @@ namespace evf {
6265
mBoL,
6366
mEoL,
6467
mGlobEoL,
65-
mCOUNT
68+
mFwk,
69+
mIdleSource,
70+
mEvent,
71+
mIgnore,
72+
mCOUNT,
6673
};
6774

6875
enum Macrostate {
@@ -153,19 +160,21 @@ namespace evf {
153160
};
154161
} // namespace FastMonState
155162

163+
constexpr int nSpecialModules = FastMonState::mCOUNT;
164+
//reserve output module space
165+
constexpr int nReservedModules = 128;
166+
156167
class FastMonitoringService : public MicroStateService {
157168
public:
158169
// the names of the states - some of them are never reached in an online app
159-
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT];
170+
static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT];
160171
static const std::string macroStateNames[FastMonState::MCOUNT];
161172
static const std::string inputStateNames[FastMonState::inCOUNT];
162173
// Reserved names for microstates
163-
static const std::string nopath_;
164174
FastMonitoringService(const edm::ParameterSet&, edm::ActivityRegistry&);
165175
~FastMonitoringService() override;
166176
static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
167177

168-
std::string makePathLegendaJson();
169178
std::string makeModuleLegendaJson();
170179
std::string makeInputLegendaJson();
171180

@@ -200,10 +209,6 @@ namespace evf {
200209
void preSourceEarlyTermination(edm::TerminationOrigin);
201210
void setExceptionDetected(unsigned int ls);
202211

203-
//this is still needed for use in special functions like DQM which are in turn framework services
204-
void setMicroState(FastMonState::Microstate);
205-
void setMicroState(edm::StreamID, FastMonState::Microstate);
206-
207212
void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
208213
void startedLookingForFile();
209214
void stoppedLookingForFile(unsigned int lumi);
@@ -223,29 +228,40 @@ namespace evf {
223228
//stores the given DAQSource pointer in daqInputSource_ (no null check is done here)
void setInputSource(DAQSource* inputSource) { daqInputSource_ = inputSource; }
224229
//stores the given input state into the atomic inputState_
void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }
225230
//stores the given input state into the atomic inputSupervisorState_
void setInStateSup(FastMonState::InputState inputState) { inputSupervisorState_ = inputState; }
231+
//available for other modules
232+
void setTMicrostate(FastMonState::Microstate m);
233+
234+
//returns tbb::this_task_arena::current_thread_index() converted to unsigned int
//NOTE(review): the TBB call returns int and is documented to yield a negative sentinel for a thread
//that is not in a task arena; that would wrap to a large unsigned value here - confirm callers bounds-check
static unsigned int getTID() { return tbb::this_task_arena::current_thread_index(); }
226235

227236
private:
228237
void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
229238

230239
void snapshotRunner();
231240

241+
//returns the numeric value of the StreamID carried by the given stream context
static unsigned int getSID(edm::StreamContext const& sc) { return sc.streamID().value(); }
242+
243+
//returns the numeric value of the given StreamID
static unsigned int getSID(edm::StreamID const& sid) { return sid.value(); }
244+
232245
//the actual monitoring thread is held by a separate class object for ease of maintenance
233-
std::shared_ptr<FastMonitoringThread> fmt_;
246+
std::unique_ptr<FastMonitoringThread> fmt_;
247+
std::unique_ptr<ConcurrencyTracker> ct_;
234248
//Encoding encModule_;
235249
//std::vector<Encoding> encPath_;
236250
FedRawDataInputSource* inputSource_ = nullptr;
237251
DAQSource* daqInputSource_ = nullptr;
238252
std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
239253
std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};
240254

241-
unsigned int nStreams_;
242-
unsigned int nThreads_;
255+
unsigned int nStreams_ = 0;
256+
unsigned int nMonThreads_ = 0;
257+
unsigned int nThreads_ = 0;
258+
bool tbbMonitoringMode_;
259+
bool tbbConcurrencyTracker_;
243260
int sleepTime_;
244261
unsigned int fastMonIntervals_;
245262
unsigned int snapCounter_ = 0;
246263
std::string microstateDefPath_, fastMicrostateDefPath_;
247-
std::string fastName_, fastPath_, slowName_;
248-
bool filePerFwkStream_;
264+
std::string fastName_, fastPath_;
249265

250266
//variables that are used by/monitored by FastMonitoringThread / FastMonitor
251267

@@ -272,9 +288,6 @@ namespace evf {
272288
//to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
273289
std::vector<std::atomic<bool>*> streamCounterUpdating_;
274290

275-
std::vector<std::atomic<bool>*> collectedPathList_;
276-
std::vector<bool> pathNamesReady_;
277-
278291
std::filesystem::path workingDirectory_, runDirectory_;
279292

280293
bool threadIDAvailable_ = false;
@@ -283,8 +296,6 @@ namespace evf {
283296

284297
std::string moduleLegendFile_;
285298
std::string moduleLegendFileJson_;
286-
std::string pathLegendFile_;
287-
std::string pathLegendFileJson_;
288299
std::string inputLegendFileJson_;
289300
unsigned int nOutputModules_ = 0;
290301

@@ -293,7 +304,13 @@ namespace evf {
293304
std::atomic<bool> has_source_exception_ = false;
294305
std::atomic<bool> has_data_exception_ = false;
295306
std::vector<unsigned int> exceptionInLS_;
296-
std::vector<std::string> fastPathList_;
307+
308+
//per stream
309+
std::vector<ContainableAtomic<const void*>> microstate_;
310+
std::vector<ContainableAtomic<unsigned char>> microstateAcqFlag_;
311+
//per thread
312+
std::vector<ContainableAtomic<const void*>> tmicrostate_;
313+
std::vector<ContainableAtomic<unsigned char>> tmicrostateAcqFlag_;
297314

298315
bool verbose_ = false;
299316
};

EventFilter/Utilities/interface/FastMonitoringThread.h

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,9 @@
1313

1414
namespace evf {
1515

16-
constexpr int nReservedModules = 128;
17-
constexpr int nSpecialModules = 10;
18-
constexpr int nReservedPaths = 1;
19-
20-
namespace FastMonState {
21-
enum Macrostate;
22-
}
16+
//namespace FastMonState {
17+
// enum Macrostate;
18+
//}
2319

2420
class FastMonitoringService;
2521

@@ -131,31 +127,23 @@ namespace evf {
131127
unsigned int varIndexThrougput_;
132128

133129
//per stream or per thread (the vectors below are sized to nMaxSlices in registerVariables)
130+
std::vector<unsigned int> tmicrostateEncoded_;
134131
std::vector<unsigned int> microstateEncoded_;
135-
std::vector<unsigned int> ministateEncoded_;
136132
std::vector<jsoncollector::AtomicMonUInt*> processed_;
137133
jsoncollector::IntJ fastPathProcessedJ_;
138-
std::vector<unsigned int> threadMicrostateEncoded_;
139134
std::vector<unsigned int> inputState_;
140135

141136
//tracking luminosity of a stream
142137
std::vector<unsigned int> streamLumi_;
143138

144139
//N bins for histograms
145140
unsigned int macrostateBins_;
146-
unsigned int ministateBins_;
147141
unsigned int microstateBins_;
148142
unsigned int inputstateBins_;
149143

150144
//global state
151145
std::atomic<FastMonState::Macrostate> macrostate_;
152146

153-
//per stream
154-
std::vector<ContainableAtomic<const std::string*>> ministate_;
155-
std::vector<ContainableAtomic<const void*>> microstate_;
156-
std::vector<ContainableAtomic<unsigned char>> microstateAcqFlag_;
157-
std::vector<ContainableAtomic<const void*>> threadMicrostate_;
158-
159147
FastMonEncoding encModule_;
160148
std::vector<FastMonEncoding> encPath_;
161149

@@ -180,7 +168,10 @@ namespace evf {
180168
}
181169

182170
//to be called after fast monitor is constructed
183-
void registerVariables(jsoncollector::FastMonitor* fm, unsigned int nStreams, unsigned int nThreads) {
171+
void registerVariables(jsoncollector::FastMonitor* fm,
172+
unsigned nMaxSlices,
173+
unsigned nMaxStreams,
174+
unsigned nMaxThreads) {
184175
//tell FM to track these global variables (for fast and slow monitoring)
185176
fm->registerGlobalMonitorable(&fastMacrostateJ_, true, &macrostateBins_);
186177
fm->registerGlobalMonitorable(&fastThroughputJ_, false);
@@ -189,27 +180,30 @@ namespace evf {
189180
fm->registerGlobalMonitorable(&fastLockWaitJ_, false);
190181
fm->registerGlobalMonitorable(&fastLockCountJ_, false);
191182

192-
for (unsigned int i = 0; i < nStreams; i++) {
183+
for (unsigned int i = 0; i < nMaxSlices; i++) {
193184
jsoncollector::AtomicMonUInt* p = new jsoncollector::AtomicMonUInt;
194185
*p = 0;
195186
processed_.push_back(p);
196187
streamLumi_.push_back(0);
197188
}
198189

199-
microstateEncoded_.resize(nStreams);
200-
ministateEncoded_.resize(nStreams);
201-
threadMicrostateEncoded_.resize(nThreads);
202-
inputState_.resize(nStreams);
203-
for (unsigned int j = 0; j < inputState_.size(); j++)
204-
inputState_[j] = 0;
190+
tmicrostateEncoded_.resize(nMaxSlices, FastMonState::mInvalid);
191+
for (unsigned int i = nMaxThreads; i < nMaxSlices; i++) {
192+
tmicrostateEncoded_[i] = FastMonState::mIgnore;
193+
}
194+
microstateEncoded_.resize(nMaxSlices, FastMonState::mInvalid);
195+
inputState_.resize(nMaxSlices, FastMonState::inInit);
196+
for (unsigned int i = nMaxStreams; i < nMaxSlices; i++) {
197+
microstateEncoded_[i] = FastMonState::mIgnore;
198+
inputState_[i] = FastMonState::inIgnore;
199+
}
200+
//for (unsigned int j = 0; j < nMaxStreams; j++)
201+
// inputState_[j] = 0;
205202

206203
//tell FM to track these int vectors
207-
fm->registerStreamMonitorableUIntVec("Ministate", &ministateEncoded_, true, &ministateBins_);
204+
fm->registerStreamMonitorableUIntVec("tMicrostate", &tmicrostateEncoded_, true, &microstateBins_);
208205

209-
if (nThreads <= nStreams) //no overlapping in module execution per stream
210-
fm->registerStreamMonitorableUIntVec("Microstate", &microstateEncoded_, true, &microstateBins_);
211-
else
212-
fm->registerStreamMonitorableUIntVec("Microstate", &threadMicrostateEncoded_, true, &microstateBins_);
206+
fm->registerStreamMonitorableUIntVec("Microstate", &microstateEncoded_, true, &microstateBins_);
213207

214208
fm->registerStreamMonitorableUIntVecAtomic("Processed", &processed_, false, nullptr);
215209

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#ifndef EventFilter_Utilities_SourceCommon_h
2+
#define EventFilter_Utilities_SourceCommon_h
3+
4+
/*
5+
* This header will host common definitions used by FedRawDataInputSource and DAQSource
6+
* */
7+
8+
#include "EventFilter/Utilities/interface/FastMonitoringService.h"
9+
10+
class IdleSourceSentry {
11+
public:
12+
IdleSourceSentry(evf::FastMonitoringService* fms) : fms_(fms) {
13+
if (fms_)
14+
fms_->setTMicrostate(evf::FastMonState::mIdleSource);
15+
}
16+
~IdleSourceSentry() {
17+
if (fms_)
18+
fms_->setTMicrostate(evf::FastMonState::mIdle);
19+
}
20+
21+
private:
22+
evf::FastMonitoringService* fms_;
23+
};
24+
25+
#endif

EventFilter/Utilities/plugins/microstatedef.jsd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"operation" : "histo"
66
},
77
{
8-
"name" : "Ministate",
8+
"name" : "tMicrostate",
99
"operation" : "histo"
1010
},
1111
{

EventFilter/Utilities/plugins/microstatedeffast.jsd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"operation" : "histo"
66
},
77
{
8-
"name" : "Ministate",
8+
"name" : "tMicrostate",
99
"operation" : "histo"
1010
},
1111
{

EventFilter/Utilities/src/DAQSource.cc

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
#include "DataFormats/Provenance/interface/EventID.h"
2020
#include "DataFormats/Provenance/interface/Timestamp.h"
2121

22-
#include "EventFilter/Utilities/interface/FastMonitoringService.h"
22+
#include "EventFilter/Utilities/interface/SourceCommon.h"
2323
#include "EventFilter/Utilities/interface/DataPointDefinition.h"
2424
#include "EventFilter/Utilities/interface/FFFNamingSchema.h"
2525
#include "EventFilter/Utilities/interface/crc32c.h"
@@ -373,11 +373,14 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
373373
if (!currentFile_.get()) {
374374
evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
375375
setMonState(inWaitInput);
376-
if (!fileQueue_.try_pop(currentFile_)) {
377-
//sleep until wakeup (only in single-buffer mode) or timeout
378-
std::unique_lock<std::mutex> lkw(mWakeup_);
379-
if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
380-
return evf::EvFDaqDirector::noFile;
376+
{
377+
IdleSourceSentry ids(fms_);
378+
if (!fileQueue_.try_pop(currentFile_)) {
379+
//sleep until wakeup (only in single-buffer mode) or timeout
380+
std::unique_lock<std::mutex> lkw(mWakeup_);
381+
if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
382+
return evf::EvFDaqDirector::noFile;
383+
}
381384
}
382385
status = currentFile_->status_;
383386
if (status == evf::EvFDaqDirector::runEnded) {
@@ -468,10 +471,13 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
468471
//multibuffer mode
469472
//wait for the current chunk to be added to the vector
470473
setMonState(inWaitChunk);
471-
while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
472-
usleep(10000);
473-
if (setExceptionState_)
474-
threadError();
474+
{
475+
IdleSourceSentry ids(fms_);
476+
while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
477+
usleep(10000);
478+
if (setExceptionState_)
479+
threadError();
480+
}
475481
}
476482
setMonState(inChunkReceived);
477483

@@ -508,13 +514,14 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
508514
currentFile_->rewindChunk(dataMode_->headerSize());
509515

510516
setMonState(inWaitChunk);
511-
512-
//do the copy to the beginning of the starting chunk. move pointers for next event in the next chunk
513-
chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);
514-
assert(chunkEnd);
515-
//mark to release old chunk
516-
chunkIsFree_ = true;
517-
517+
{
518+
IdleSourceSentry ids(fms_);
519+
//do the copy to the beginning of the starting chunk. move pointers for next event in the next chunk
520+
chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);
521+
assert(chunkEnd);
522+
//mark to release old chunk
523+
chunkIsFree_ = true;
524+
}
518525
setMonState(inChunkReceived);
519526
//header and payload are moved, update view
520527
dataMode_->makeDataBlockView(

0 commit comments

Comments
 (0)