Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 65 additions & 27 deletions sources/Launcher/DYNSystematicAnalysisLauncher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,50 +63,88 @@ static DYN::TraceStream TraceInfo(const std::string& tag = "") {
void
SystematicAnalysisLauncher::launch() {
boost::posix_time::ptime t0 = boost::posix_time::second_clock::local_time();

boost::shared_ptr<Scenarios> scenarios = multipleJobs_->getScenarios();
if (!scenarios) {
throw DYNAlgorithmsError(SystematicAnalysisTaskNotFound);
}
const std::string& baseJobsFile = scenarios->getJobsFile();
const std::vector<boost::shared_ptr<Scenario> >& events = scenarios->getScenarios();
inputs_.readInputs(workingDirectory_, baseJobsFile);

auto& context = multiprocessing::context();
bool isSingleThread = multiprocessing::context().nbProcs() == 1;
bool isServerThread = !isSingleThread && multiprocessing::context().isRootProc();
bool isWorkerThread = !isSingleThread && !multiprocessing::context().isRootProc();

if (context.isRootProc()) {
// only required for root proc
results_.resize(events.size());
if (isSingleThread || isWorkerThread) { // worker mode : actually run the simulations
int scenId = -1;
while (getNextScenId(scenId, events.size())) {
createWorkingDir(events[scenId]->getId());
exportResult(launchScenario(events[scenId]));
}
} else { // isServerThread
serverLoop(events.size());
}

multiprocessing::forEach(0, events.size(), [this, &events](unsigned int i){
std::string workingDir = createAbsolutePath(events[i]->getId(), workingDirectory_);
if (!exists(workingDir))
createDirectory(workingDir);
else if (!isDirectory(workingDir))
throw DYNAlgorithmsError(DirectoryDoesNotExist, workingDir);
});
if (isSingleThread || isServerThread)
collectResults(events);

inputs_.readInputs(workingDirectory_, baseJobsFile);
boost::posix_time::ptime t1 = boost::posix_time::second_clock::local_time();
boost::posix_time::time_duration diff = t1 - t0;
TraceInfo(logTag_) << DYNAlgorithmsLog(AlgorithmsWallTime, "Systematic analysis", diff.total_milliseconds()/1000) << Trace::endline;
}

multiprocessing::forEach(0, events.size(), [this, &events](unsigned int i){
auto result = launchScenario(events[i]);
exportResult(result);
});
void
SystematicAnalysisLauncher::createWorkingDir(const std::string & path) {
std::string workingDir = createAbsolutePath(path, workingDirectory_);
if (!exists(workingDir))
createDirectory(workingDir);
else if (!isDirectory(workingDir))
throw DYNAlgorithmsError(DirectoryDoesNotExist, workingDir);
}

multiprocessing::Context::sync();
bool
SystematicAnalysisLauncher::getNextScenId(int &scenId, int maxId) {
++scenId; // monothread : simply increment ID

// Update results for root proc
if (context.isRootProc()) {
for (unsigned int i = 0; i < events.size(); i++) {
const auto& scenario = events.at(i);
results_.at(i) = importResult(scenario->getId());
cleanResult(scenario->getId());
}
#ifdef _MPI_
if (multiprocessing::context().nbProcs() > 1) { // multithread : request ID to server (root process) instead, who centralizes incrementations
MPI_Send(nullptr, 0, MPI_INT, 0, 0, MPI_COMM_WORLD);
MPI_Recv(&scenId, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
boost::posix_time::ptime t1 = boost::posix_time::second_clock::local_time();
boost::posix_time::time_duration diff = t1 - t0;
TraceInfo(logTag_) << DYNAlgorithmsLog(AlgorithmsWallTime, "Systematic analysis", diff.total_milliseconds()/1000) << Trace::endline;
#endif // _MPI_

return scenId < maxId;
}

void
SystematicAnalysisLauncher::serverLoop(int maxId) {
#ifdef _MPI_
std::set<int> workersLeft;
for (int i=1; i < multiprocessing::context().nbProcs(); ++i)
workersLeft.insert(i);

int scenId = 0;
while (workersLeft.size() > 0) {
MPI_Status senderInfo;
MPI_Recv(nullptr, 0, MPI_INT, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &senderInfo);
MPI_Send(&scenId, 1, MPI_INT, senderInfo.MPI_SOURCE, 0, MPI_COMM_WORLD);
if (scenId >= maxId)
workersLeft.erase(senderInfo.MPI_SOURCE);
++scenId;
}
#endif // _MPI_
}

void
SystematicAnalysisLauncher::collectResults(const std::vector<boost::shared_ptr<Scenario> >& events) {
results_.resize(events.size());
for (unsigned int i = 0; i < events.size(); i++) {
const auto& scenario = events.at(i);
results_.at(i) = importResult(scenario->getId());
cleanResult(scenario->getId());
}
}
SimulationResult
SystematicAnalysisLauncher::launchScenario(const boost::shared_ptr<Scenario>& scenario) {
if (multiprocessing::context().nbProcs() == 1)
Expand Down
27 changes: 27 additions & 0 deletions sources/Launcher/DYNSystematicAnalysisLauncher.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,33 @@ class SystematicAnalysisLauncher : public RobustnessAnalysisLauncher {
*/
SimulationResult launchScenario(const boost::shared_ptr<Scenario>& scenario);

/**
* creates the appropriate working directory for a given scenario, if it does not already exist
* @param path pathname to create
*/
void createWorkingDir(const std::string & path);

/**
* obtain the ID of the next scenario to simulate, either by incrementing it locally or requesting it remotely
* @param scenId current ID to update
* @param maxId first invalid ID on the scenario list
* @return true if the updated ID is valid and to be processed, false otherwise
*/
bool getNextScenId(int & scenId, int maxId);

/**
* imports all simulation results from (possibly network) disk
* @param events the list of scenarios whose simulations results are to be collected
*/
void collectResults(const std::vector<boost::shared_ptr<Scenario> > & events);

/**
* distributes scenario IDs by answering requests and incrementing a local counter, returns when all workers
* have been dealt an invalid ID and thus terminated
* @param maxId first invalid ID on the scenario list
*/
void serverLoop(int maxId);

private:
std::vector<SimulationResult> results_; ///< results of the systematic analysis
};
Expand Down
3 changes: 3 additions & 0 deletions util/envDynawoAlgorithms.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,9 @@ launch_CS() {
launch_SA() {
export_preload
if [ "${DYNAWO_USE_MPI}" == "YES" ]; then
if (($NBPROCS > 1)); then
NBPROCS=$((NBPROCS+1)) # n workers (what the user has in mind) + 1 quasi-transparent server
fi
"$MPIRUN_PATH" -np $NBPROCS $DYNAWO_ALGORITHMS_INSTALL_DIR/bin/dynawoAlgorithms --simulationType SA $@
else
$DYNAWO_ALGORITHMS_INSTALL_DIR/bin/dynawoAlgorithms --simulationType SA $@
Expand Down
Loading