diff --git a/sources/Launcher/DYNSystematicAnalysisLauncher.cpp b/sources/Launcher/DYNSystematicAnalysisLauncher.cpp index 60ce5a1e..3f74bd8c 100644 --- a/sources/Launcher/DYNSystematicAnalysisLauncher.cpp +++ b/sources/Launcher/DYNSystematicAnalysisLauncher.cpp @@ -63,50 +63,88 @@ static DYN::TraceStream TraceInfo(const std::string& tag = "") { void SystematicAnalysisLauncher::launch() { boost::posix_time::ptime t0 = boost::posix_time::second_clock::local_time(); + boost::shared_ptr scenarios = multipleJobs_->getScenarios(); if (!scenarios) { throw DYNAlgorithmsError(SystematicAnalysisTaskNotFound); } const std::string& baseJobsFile = scenarios->getJobsFile(); const std::vector >& events = scenarios->getScenarios(); + inputs_.readInputs(workingDirectory_, baseJobsFile); - auto& context = multiprocessing::context(); + bool isSingleThread = multiprocessing::context().nbProcs() == 1; + bool isServerThread = !isSingleThread && multiprocessing::context().isRootProc(); + bool isWorkerThread = !isSingleThread && !multiprocessing::context().isRootProc(); - if (context.isRootProc()) { - // only required for root proc - results_.resize(events.size()); + if (isSingleThread || isWorkerThread) { // worker mode : actually run the simulations + int scenId = -1; + while (getNextScenId(scenId, events.size())) { + createWorkingDir(events[scenId]->getId()); + exportResult(launchScenario(events[scenId])); + } + } else { // isServerThread + serverLoop(events.size()); } - multiprocessing::forEach(0, events.size(), [this, &events](unsigned int i){ - std::string workingDir = createAbsolutePath(events[i]->getId(), workingDirectory_); - if (!exists(workingDir)) - createDirectory(workingDir); - else if (!isDirectory(workingDir)) - throw DYNAlgorithmsError(DirectoryDoesNotExist, workingDir); - }); + if (isSingleThread || isServerThread) + collectResults(events); - inputs_.readInputs(workingDirectory_, baseJobsFile); + boost::posix_time::ptime t1 = boost::posix_time::second_clock::local_time(); + boost::posix_time::time_duration diff = t1 - t0; + TraceInfo(logTag_) << DYNAlgorithmsLog(AlgorithmsWallTime, "Systematic analysis", diff.total_milliseconds()/1000) << Trace::endline; +} - multiprocessing::forEach(0, events.size(), [this, &events](unsigned int i){ - auto result = launchScenario(events[i]); - exportResult(result); - }); +void +SystematicAnalysisLauncher::createWorkingDir(const std::string & path) { + std::string workingDir = createAbsolutePath(path, workingDirectory_); + if (!exists(workingDir)) + createDirectory(workingDir); + else if (!isDirectory(workingDir)) + throw DYNAlgorithmsError(DirectoryDoesNotExist, workingDir); +} - multiprocessing::Context::sync(); +bool +SystematicAnalysisLauncher::getNextScenId(int &scenId, int maxId) { + ++scenId; // monothread : simply increment ID - // Update results for root proc - if (context.isRootProc()) { - for (unsigned int i = 0; i < events.size(); i++) { - const auto& scenario = events.at(i); - results_.at(i) = importResult(scenario->getId()); - cleanResult(scenario->getId()); - } +#ifdef _MPI_ + if (multiprocessing::context().nbProcs() > 1) { // multithread : request ID to server (root process) instead, who centralizes incrementations + MPI_Send(nullptr, 0, MPI_INT, 0, 0, MPI_COMM_WORLD); + MPI_Recv(&scenId, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); } - boost::posix_time::ptime t1 = boost::posix_time::second_clock::local_time(); - boost::posix_time::time_duration diff = t1 - t0; - TraceInfo(logTag_) << DYNAlgorithmsLog(AlgorithmsWallTime, "Systematic analysis", diff.total_milliseconds()/1000) << Trace::endline; +#endif // _MPI_ + + return scenId < maxId; } +void +SystematicAnalysisLauncher::serverLoop(int maxId) { +#ifdef _MPI_ + std::set workersLeft; + for (int i=1; i < multiprocessing::context().nbProcs(); ++i) + workersLeft.insert(i); + + int scenId = 0; + while (workersLeft.size() > 0) { + MPI_Status senderInfo; + MPI_Recv(nullptr, 0, MPI_INT, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &senderInfo); + MPI_Send(&scenId, 1, MPI_INT, senderInfo.MPI_SOURCE, 0, MPI_COMM_WORLD); + if (scenId >= maxId) + workersLeft.erase(senderInfo.MPI_SOURCE); + ++scenId; + } +#endif // _MPI_ +} + +void +SystematicAnalysisLauncher::collectResults(const std::vector >& events) { + results_.resize(events.size()); + for (unsigned int i = 0; i < events.size(); i++) { + const auto& scenario = events.at(i); + results_.at(i) = importResult(scenario->getId()); + cleanResult(scenario->getId()); + } +} SimulationResult SystematicAnalysisLauncher::launchScenario(const boost::shared_ptr& scenario) { if (multiprocessing::context().nbProcs() == 1) diff --git a/sources/Launcher/DYNSystematicAnalysisLauncher.h b/sources/Launcher/DYNSystematicAnalysisLauncher.h index b7679bc1..76d93435 100644 --- a/sources/Launcher/DYNSystematicAnalysisLauncher.h +++ b/sources/Launcher/DYNSystematicAnalysisLauncher.h @@ -57,6 +57,33 @@ class SystematicAnalysisLauncher : public RobustnessAnalysisLauncher { */ SimulationResult launchScenario(const boost::shared_ptr& scenario); + /** + * creates the appropriate working directory for a given scenario, if it does not already exist + * @param path pathname to create + */ + void createWorkingDir(const std::string & path); + + /** + * obtain the ID of the next scenario to simulate, either by incrementing it locally or requesting it remotely + * @param scenId current ID to update + * @param maxId first invalid ID on the scenario list + * @return true if the updated ID is valid and to be processed, false otherwise + */ + bool getNextScenId(int & scenId, int maxId); + + /** + * imports all simulation results from (possibly network) disk + * @param events the list of scenarios whose simulations results are to be collected + */ + void collectResults(const std::vector > & events); + + /** + * distributes scenario IDs by answering requests and incrementing a local counter, returns when all workers + * have been dealt an invalid ID and thus terminated + * @param maxId first invalid ID on the scenario list + */ + void serverLoop(int maxId); + private: std::vector results_; ///< results of the systematic analysis }; diff --git a/util/envDynawoAlgorithms.sh b/util/envDynawoAlgorithms.sh index a67c72f2..ce010ae8 100755 --- a/util/envDynawoAlgorithms.sh +++ b/util/envDynawoAlgorithms.sh @@ -1020,6 +1020,9 @@ launch_CS() { launch_SA() { export_preload if [ "${DYNAWO_USE_MPI}" == "YES" ]; then + if (($NBPROCS > 1)); then + NBPROCS=$((NBPROCS+1)) # n workers (what the user has in mind) + 1 quasi-transparent server + fi "$MPIRUN_PATH" -np $NBPROCS $DYNAWO_ALGORITHMS_INSTALL_DIR/bin/dynawoAlgorithms --simulationType SA $@ else $DYNAWO_ALGORITHMS_INSTALL_DIR/bin/dynawoAlgorithms --simulationType SA $@