From 2264a1208bfbd656e093908dac9c40f13daba61d Mon Sep 17 00:00:00 2001
From: Xin Yong
Date: Thu, 27 Jan 2022 21:41:30 +0800
Subject: [PATCH 1/3] include success rate in loadgen

---
 .gitmodules                    |   2 +-
 loadgen/bindings/c_api.cc      |   9 +-
 loadgen/bindings/c_api.h       |   2 +
 loadgen/bindings/python_api.cc |  25 ++-
 loadgen/loadgen.cc             | 371 ++++++++++++---------------------
 loadgen/system_under_test.h    |   7 +
 6 files changed, 168 insertions(+), 248 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index caa2d250c5..e6d02d44d0 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -12,4 +12,4 @@
 url = https://github.com/MIC-DKFZ/nnUNet.git
 [submodule "tools/submission/power-dev"]
 path = tools/submission/power-dev
- url = ../power-dev
+ url = https://github.com/mlcommons/power-dev.git
diff --git a/loadgen/bindings/c_api.cc b/loadgen/bindings/c_api.cc
index cb653c3a78..4b35971c44 100644
--- a/loadgen/bindings/c_api.cc
+++ b/loadgen/bindings/c_api.cc
@@ -30,11 +30,13 @@ class SystemUnderTestTrampoline : public SystemUnderTest {
 SystemUnderTestTrampoline(
 ClientData client_data, std::string name, IssueQueryCallback issue_cb,
 FlushQueriesCallback flush_queries_cb,
+ UserConstraintsMetCallback user_constraints_met_cb,
 ReportLatencyResultsCallback report_latency_results_cb)
 : client_data_(client_data),
 name_(std::move(name)),
 issue_cb_(issue_cb),
 flush_queries_cb_(flush_queries_cb),
+ user_constraints_met_cb_(user_constraints_met_cb),
 report_latency_results_cb_(report_latency_results_cb) {}
 ~SystemUnderTestTrampoline() override = default;
@@ -46,6 +48,8 @@ class SystemUnderTestTrampoline : public SystemUnderTest {
 void FlushQueries() override { (*flush_queries_cb_)(); }
+ bool UserConstraintsMet() override { (*user_constraints_met_cb_)(); }
+
 void ReportLatencyResults(
 const std::vector& latencies_ns) override {
 (*report_latency_results_cb_)(client_data_, latencies_ns.data(),
@@ -57,6 +61,7 @@ class SystemUnderTestTrampoline : public SystemUnderTest {
 std::string name_;
 IssueQueryCallback issue_cb_;
 FlushQueriesCallback flush_queries_cb_;
+ UserConstraintsMetCallback user_constraints_met_cb_;
 ReportLatencyResultsCallback report_latency_results_cb_;
 };
@@ -65,10 +70,10 @@ class SystemUnderTestTrampoline : public SystemUnderTest {
 void* ConstructSUT(ClientData client_data, const char* name,
 size_t name_length, IssueQueryCallback issue_cb,
 FlushQueriesCallback flush_queries_cb,
+ UserConstraintsMetCallback user_constraints_met_cb,
 ReportLatencyResultsCallback report_latency_results_cb) {
 SystemUnderTestTrampoline* sut = new SystemUnderTestTrampoline(
- client_data, std::string(name, name_length), issue_cb, flush_queries_cb,
- report_latency_results_cb);
+ client_data, std::string(name, name_length), issue_cb, flush_queries_cb, user_constraints_met_cb, report_latency_results_cb);
 return reinterpret_cast(sut);
 }
diff --git a/loadgen/bindings/c_api.h b/loadgen/bindings/c_api.h
index af6d5b38c9..64b5d5e32d 100644
--- a/loadgen/bindings/c_api.h
+++ b/loadgen/bindings/c_api.h
@@ -38,6 +38,7 @@ typedef uintptr_t ClientData;
 typedef void (*IssueQueryCallback)(ClientData, const QuerySample*, size_t);
 typedef void (*FlushQueriesCallback)();
+typedef bool (*UserConstraintsMetCallback)();
 typedef void (*ReportLatencyResultsCallback)(ClientData, const int64_t*,
 size_t);
 typedef void (*ResponseCallback)(ClientData, QuerySampleResponse*);
@@ -54,6 +55,7 @@ void QuerySamplesCompleteResponseCb(QuerySampleResponse* responses,
 /// \brief Create an opaque SUT pointer based on C callbacks.
void* ConstructSUT(ClientData client_data, const char* name, size_t name_length, IssueQueryCallback issue_cb, + UserConstraintsMetCallback user_constraints_met_cb, FlushQueriesCallback flush_queries_cb, ReportLatencyResultsCallback report_latency_results_cb); /// \brief Destroys the SUT created by ConstructSUT. diff --git a/loadgen/bindings/python_api.cc b/loadgen/bindings/python_api.cc index 2bcf257e51..686e3110ad 100644 --- a/loadgen/bindings/python_api.cc +++ b/loadgen/bindings/python_api.cc @@ -36,6 +36,7 @@ using IssueQueryCallback = std::function)>; using FastIssueQueriesCallback = std::function, std::vector)>; using FlushQueriesCallback = std::function; +using UserConstraintsMetCallback = std::function; using ReportLatencyResultsCallback = std::function)>; // Forwards SystemUnderTest calls to relevant callbacks. @@ -44,10 +45,12 @@ class SystemUnderTestTrampoline : public SystemUnderTest { SystemUnderTestTrampoline( std::string name, IssueQueryCallback issue_cb, FlushQueriesCallback flush_queries_cb, + UserConstraintsMetCallback user_constraints_met_cb, ReportLatencyResultsCallback report_latency_results_cb) : name_(std::move(name)), issue_cb_(issue_cb), flush_queries_cb_(flush_queries_cb), + user_constraints_met_cb_(user_constraints_met_cb), report_latency_results_cb_(report_latency_results_cb) {} ~SystemUnderTestTrampoline() override = default; @@ -60,6 +63,8 @@ class SystemUnderTestTrampoline : public SystemUnderTest { void FlushQueries() override { flush_queries_cb_(); } + bool UserConstraintsMet() override { return user_constraints_met_cb_(); } + void ReportLatencyResults( const std::vector& latencies_ns) override { pybind11::gil_scoped_acquire gil_acquirer; @@ -70,6 +75,7 @@ class SystemUnderTestTrampoline : public SystemUnderTest { std::string name_; IssueQueryCallback issue_cb_; FlushQueriesCallback flush_queries_cb_; + UserConstraintsMetCallback user_constraints_met_cb_; ReportLatencyResultsCallback report_latency_results_cb_; }; @@ -78,8 +84,10 @@ class FastSystemUnderTestTrampoline : public SystemUnderTestTrampoline { FastSystemUnderTestTrampoline( std::string name, FastIssueQueriesCallback fast_issue_cb, FlushQueriesCallback flush_queries_cb, + UserConstraintsMetCallback user_constraints_met_cb, ReportLatencyResultsCallback report_latency_results_cb) - : SystemUnderTestTrampoline(name, nullptr, flush_queries_cb, + : SystemUnderTestTrampoline(name, nullptr, flush_queries_cb, + user_constraints_met_cb, report_latency_results_cb), fast_issue_cb_(fast_issue_cb) {} ~FastSystemUnderTestTrampoline() override = default; @@ -148,9 +156,10 @@ namespace py { uintptr_t ConstructSUT(IssueQueryCallback issue_cb, FlushQueriesCallback flush_queries_cb, + UserConstraintsMetCallback user_constraints_met_cb, ReportLatencyResultsCallback report_latency_results_cb) { SystemUnderTestTrampoline* sut = new SystemUnderTestTrampoline( - "PySUT", issue_cb, flush_queries_cb, report_latency_results_cb); + "PySUT", issue_cb, flush_queries_cb, user_constraints_met_cb, report_latency_results_cb); return reinterpret_cast(sut); } @@ -163,9 +172,10 @@ void DestroySUT(uintptr_t sut) { uintptr_t ConstructFastSUT( FastIssueQueriesCallback fast_issue_cb, FlushQueriesCallback flush_queries_cb, + UserConstraintsMetCallback user_constraints_met_cb, ReportLatencyResultsCallback report_latency_results_cb) { FastSystemUnderTestTrampoline* sut = new FastSystemUnderTestTrampoline( - "PyFastSUT", fast_issue_cb, flush_queries_cb, report_latency_results_cb); + "PyFastSUT", fast_issue_cb, flush_queries_cb, 
user_constraints_met_cb, report_latency_results_cb); return reinterpret_cast(sut); } @@ -231,6 +241,7 @@ PYBIND11_MODULE(mlperf_loadgen, m) { pybind11::enum_(m, "TestScenario") .value("SingleStream", TestScenario::SingleStream) .value("MultiStream", TestScenario::MultiStream) + .value("MultiStreamFree", TestScenario::MultiStreamFree) .value("Server", TestScenario::Server) .value("Offline", TestScenario::Offline); @@ -248,12 +259,16 @@ PYBIND11_MODULE(mlperf_loadgen, m) { &TestSettings::single_stream_expected_latency_ns) .def_readwrite("single_stream_target_latency_percentile", &TestSettings::single_stream_target_latency_percentile) - .def_readwrite("multi_stream_expected_latency_ns", - &TestSettings::multi_stream_expected_latency_ns) + .def_readwrite("multi_stream_target_qps", + &TestSettings::multi_stream_target_qps) + .def_readwrite("multi_stream_target_latency_ns", + &TestSettings::multi_stream_target_latency_ns) .def_readwrite("multi_stream_target_latency_percentile", &TestSettings::multi_stream_target_latency_percentile) .def_readwrite("multi_stream_samples_per_query", &TestSettings::multi_stream_samples_per_query) + .def_readwrite("multi_stream_max_async_queries", + &TestSettings::multi_stream_max_async_queries) .def_readwrite("server_target_qps", &TestSettings::server_target_qps) .def_readwrite("server_target_latency_ns", &TestSettings::server_target_latency_ns) diff --git a/loadgen/loadgen.cc b/loadgen/loadgen.cc index 6ebf55af88..88397e01dd 100644 --- a/loadgen/loadgen.cc +++ b/loadgen/loadgen.cc @@ -31,7 +31,6 @@ limitations under the License. #include #include -#include "early_stopping.h" #include "issue_query_controller.h" #include "logging.h" #include "query_sample.h" @@ -51,7 +50,7 @@ namespace loadgen { /// loaded together. struct LoadableSampleSet { std::vector set; - const size_t sample_distribution_end; // Excludes padding in MultiStream. + const size_t sample_distribution_end; // Excludes padding in multi-stream. }; /// \brief Generates nanoseconds from a start time to multiple end times. @@ -232,7 +231,8 @@ std::vector GenerateQueries( std::mt19937 sample_rng(settings.sample_index_rng_seed); std::mt19937 schedule_rng(settings.schedule_rng_seed); - constexpr bool kIsMultiStream = scenario == TestScenario::MultiStream; + constexpr bool kIsMultiStream = scenario == TestScenario::MultiStream || + scenario == TestScenario::MultiStreamFree; const size_t sample_stride = kIsMultiStream ? samples_per_query : 1; auto sample_distribution = SampleDistribution( @@ -272,7 +272,7 @@ std::vector GenerateQueries( // This will not overflow, since GenerateLoadableSets adds padding at // the end of the loadable sets in the MultiStream scenario. // The padding allows the starting samples to be the same for each - // query with respect to samples_per_query. + // query as the value of samples_per_query increases. s = loaded_samples[sample_i++]; } } else if (scenario == TestScenario::Offline) { @@ -354,12 +354,14 @@ std::vector GenerateQueries( /// \todo Move to results.h/cc struct PerformanceResult { std::vector sample_latencies; - std::vector query_latencies; + std::vector query_latencies; // MultiStream only. + std::vector query_intervals; // MultiStream only. size_t queries_issued; double max_latency; double final_query_scheduled_time; // seconds from start. double final_query_issued_time; // seconds from start. double final_query_all_samples_done_time; // seconds from start. 
+ bool user_constraints_met; // indicates whether user-defined constraints are met
 };

 /// \brief Issues a series of pre-generated queries.
@@ -462,6 +464,9 @@ PerformanceResult IssueQueries(SystemUnderTest* sut,
 // Log contention counters after every test as a sanity check.
 GlobalLogger().LogContentionAndAllocations();
+ // Find out if user-defined constraints are met.
+ bool user_constraints_met = sut->UserConstraintsMet();
+
 // This properly accounts for the fact that the max completion time may not
 // belong to the final query. It also excludes any time spent postprocessing
 // in the loadgen itself after final completion, which may be significant
@@ -470,10 +475,7 @@ PerformanceResult IssueQueries(SystemUnderTest* sut,
 GlobalLogger().GetMaxCompletionTime();
 auto sut_active_duration = max_completion_time - start;
 LogDetail([start_for_power, sut_active_duration](AsyncDetail& detail) {
- auto end_for_power =
- start_for_power +
- std::chrono::duration_cast(
- sut_active_duration);
+ auto end_for_power = start_for_power + std::chrono::duration_cast(sut_active_duration);
 #if USE_NEW_LOGGING_FORMAT
 MLPERF_LOG_INTERVAL_START(detail, "power_begin",
 DateTimeStringForPower(start_for_power));
@@ -497,21 +499,38 @@ PerformanceResult IssueQueries(SystemUnderTest* sut,
 DurationToSeconds(final_query.all_samples_done_time - start);
 std::vector query_latencies;
- if (scenario == TestScenario::MultiStream) {
+ std::vector query_intervals;
+ if (scenario == TestScenario::MultiStream ||
+ scenario == TestScenario::MultiStreamFree) {
 query_latencies.resize(queries_issued);
+ query_intervals.resize(queries_issued);
 for (size_t i = 0; i < queries_issued; i++) {
 query_latencies[i] = DurationGeneratorNs{queries[i].scheduled_time}.delta(
 queries[i].all_samples_done_time);
+ if (i < queries_issued - settings.max_async_queries) {
+ // For all queries except the last few, take into account actual
+ // skipped intervals to the next query.
+ query_intervals[i] =
+ queries[i + settings.max_async_queries].scheduled_intervals;
+ } else {
+ // For the last queries, use query latency to guess if imaginary
+ // queries issued at the end would have skipped intervals.
+ query_intervals[i] =
+ std::ceil(settings.target_qps *
+ QuerySampleLatencyToSeconds(query_latencies[i]));
+ }
 }
 }
 return PerformanceResult{std::move(sample_latencies),
 std::move(query_latencies),
+ std::move(query_intervals),
 queries_issued,
 max_latency,
 final_query_scheduled_time,
 final_query_issued_time,
- final_query_all_samples_done_time};
+ final_query_all_samples_done_time,
+ user_constraints_met};
 }
 /// \brief Wraps PerformanceResult with relevant context to change how
@@ -524,30 +543,22 @@ struct PerformanceSummary {
 // Set by ProcessLatencies.
 size_t sample_count = 0;
- size_t query_count = 0;
- size_t overlatency_query_count = 0;
 QuerySampleLatency sample_latency_min = 0;
 QuerySampleLatency sample_latency_max = 0;
 QuerySampleLatency sample_latency_mean = 0;
- QuerySampleLatency query_latency_min = 0;
- QuerySampleLatency query_latency_max = 0;
- QuerySampleLatency query_latency_mean = 0;
 /// \brief The latency at a given percentile.
 struct PercentileEntry {
 const double percentile;
 QuerySampleLatency sample_latency = 0;
 QuerySampleLatency query_latency = 0; // MultiStream only.
+ size_t query_intervals = 0; // MultiStream only.
}; // Latency target percentile PercentileEntry target_latency_percentile{settings.target_latency_percentile}; PercentileEntry latency_percentiles[6] = {{.50}, {.90}, {.95}, {.97}, {.99}, {.999}}; - // Early stopping percentile estimates for SingleStream and MultiStream - QuerySampleLatency early_stopping_latency_ss = 0; - QuerySampleLatency early_stopping_latency_ms = 0; - #if defined(_WIN32) || defined(WIN32) || defined(_WIN64) || defined(WIN64) // MSVC complains if there is no explicit constructor. // (target_latency_percentile above depends on construction with settings) @@ -559,7 +570,6 @@ struct PerformanceSummary { void ProcessLatencies(); bool MinDurationMet(std::string* recommendation); - bool EarlyStopping(std::string* recommendation); bool MinQueriesMet(); bool MinSamplesMet(); bool HasPerfConstraints(); @@ -575,11 +585,11 @@ void PerformanceSummary::ProcessLatencies() { sample_count = pr.sample_latencies.size(); - QuerySampleLatency accumulated_sample_latency = 0; + QuerySampleLatency accumulated_latency = 0; for (auto latency : pr.sample_latencies) { - accumulated_sample_latency += latency; + accumulated_latency += latency; } - sample_latency_mean = accumulated_sample_latency / sample_count; + sample_latency_mean = accumulated_latency / sample_count; std::sort(pr.sample_latencies.begin(), pr.sample_latencies.end()); @@ -593,21 +603,9 @@ void PerformanceSummary::ProcessLatencies() { lp.sample_latency = pr.sample_latencies[sample_count * lp.percentile]; } - query_count = pr.queries_issued; - - // Count the number of overlatency queries. Only for Server scenario. Since in - // this scenario the number of samples per query is 1, sample_latencies are - // used. - if (settings.scenario == TestScenario::Server) { - QuerySampleLatency max_latency = settings.target_latency.count() + 1; - overlatency_query_count = - pr.sample_latencies.end() - - std::lower_bound(pr.sample_latencies.begin(), pr.sample_latencies.end(), - max_latency); - } - // MultiStream only after this point. 
- if (settings.scenario != TestScenario::MultiStream) { + if (settings.scenario != TestScenario::MultiStream && + settings.scenario != TestScenario::MultiStreamFree) { return; } @@ -616,156 +614,17 @@ void PerformanceSummary::ProcessLatencies() { assert(pr.query_latencies.size() == query_count); assert(pr.query_intervals.size() == query_count); std::sort(pr.query_latencies.begin(), pr.query_latencies.end()); - QuerySampleLatency accumulated_query_latency = 0; - for (auto latency : pr.query_latencies) { - accumulated_query_latency += latency; - } - query_latency_mean = accumulated_query_latency / query_count; - query_latency_min = pr.query_latencies.front(); - query_latency_max = pr.query_latencies.back(); + std::sort(pr.query_intervals.begin(), pr.query_intervals.end()); target_latency_percentile.query_latency = pr.query_latencies[query_count * target_latency_percentile.percentile]; + target_latency_percentile.query_intervals = + pr.query_intervals[query_count * target_latency_percentile.percentile]; for (auto& lp : latency_percentiles) { lp.query_latency = pr.query_latencies[query_count * lp.percentile]; + lp.query_intervals = pr.query_intervals[query_count * lp.percentile]; } } -bool PerformanceSummary::EarlyStopping(std::string* recommendation) { - recommendation->clear(); - - int64_t overlatency_queries_bound = (1 << 10); - int64_t queries_issued = pr.queries_issued; - MinPassingQueriesFinder find_min_passing; - double confidence = 0.99; - double tolerance = 0.0; - - ProcessLatencies(); - switch (settings.scenario) { - case TestScenario::SingleStream: { - // TODO: Grab multistream percentile from settings, instead of hardcoding. - double multi_stream_percentile = 0.99; - int64_t t = 1; - int64_t h_min = find_min_passing(1, target_latency_percentile.percentile, - tolerance, confidence); - int64_t h = h_min; - if (queries_issued < h_min + 1) { - *recommendation = - " * Only processed " + std::to_string(queries_issued) + - " queries.\n * Need to process at least " + - std::to_string(h_min + 1) + " queries for early stopping."; - return false; - } else { - for (int64_t i = 2; i <= overlatency_queries_bound; ++i) { - h = find_min_passing(i, target_latency_percentile.percentile, - tolerance, confidence); - if (queries_issued < h + i) { - t = i - 1; - break; - } - } - } - QuerySampleLatency percentile_estimate = - pr.sample_latencies[queries_issued - t]; - *recommendation = - " * Processed at least " + std::to_string(h_min + 1) + " queries (" + - std::to_string(queries_issued) + ").\n" + " * Would discard " + - std::to_string(t - 1) + " highest latency queries.\n" + - " * Early stopping " + - DoubleToString(target_latency_percentile.percentile * 100, 0) + - "th percentile estimate: " + std::to_string(percentile_estimate); - early_stopping_latency_ss = percentile_estimate; - - // Early stopping estimate for 99%ile (used for infering multi-stream from - // single-stream) - t = 1; - h_min = - find_min_passing(1, multi_stream_percentile, tolerance, confidence); - h = h_min; - if (queries_issued < h_min + 1) { - *recommendation += - "\n * Not enough queries processed for " + - DoubleToString(multi_stream_percentile * 100, 0) + - "th percentile\n" + - " early stopping estimate (would need to process at\n least " + - std::to_string(h_min + 1) + " total queries)."; - } else { - for (int64_t i = 2; i <= overlatency_queries_bound; ++i) { - h = find_min_passing(i, multi_stream_percentile, tolerance, - confidence); - if (queries_issued < h + i) { - t = i - 1; - break; - } - } - percentile_estimate = 
pr.sample_latencies[queries_issued - t]; - *recommendation += - "\n * Early stopping " + - DoubleToString(multi_stream_percentile * 100, 0) + - "th percentile estimate: " + std::to_string(percentile_estimate); - early_stopping_latency_ms = percentile_estimate; - } - break; - } - case TestScenario::Server: { - int64_t t = - std::count_if(pr.sample_latencies.begin(), pr.sample_latencies.end(), - [=](auto const& latency) { - return latency > settings.target_latency.count(); - }); - int64_t h = find_min_passing(t, target_latency_percentile.percentile, - tolerance, confidence); - if (queries_issued >= h + t) { - *recommendation = " * Run successful."; - } else { - *recommendation = " * Run unsuccessful.\n * Processed " + - std::to_string(queries_issued) + " queries.\n" + - " * Would need to run at least " + - std::to_string(h + t - queries_issued) + - " more queries,\n with the run being successful if " - "every additional\n query were under latency."; - return false; - } - break; - } - case TestScenario::MultiStream: { - int64_t t = 1; - int64_t h_min = find_min_passing(1, target_latency_percentile.percentile, - tolerance, confidence); - int64_t h = h_min; - if (queries_issued < h_min + 1) { - *recommendation = - " * Only processed " + std::to_string(queries_issued) + - " queries.\n * Need to process at least " + - std::to_string(h_min + 1) + " queries for early stopping."; - return false; - } else { - for (int64_t i = 2; i <= overlatency_queries_bound; ++i) { - h = find_min_passing(i, target_latency_percentile.percentile, - tolerance, confidence); - if (queries_issued < h + i) { - t = i - 1; - break; - } - } - } - QuerySampleLatency percentile_estimate = - pr.query_latencies[queries_issued - t]; - *recommendation = - " * Processed at least " + std::to_string(h_min + 1) + " queries (" + - std::to_string(queries_issued) + ").\n" + " * Would discard " + - std::to_string(t - 1) + " highest latency queries.\n" + - " * Early stopping " + - DoubleToString(target_latency_percentile.percentile * 100, 0) + - "th percentile estimate: " + std::to_string(percentile_estimate); - early_stopping_latency_ms = percentile_estimate; - break; - } - case TestScenario::Offline: - break; - } - return true; -} - bool PerformanceSummary::MinDurationMet(std::string* recommendation) { recommendation->clear(); const double min_duration = DurationToSeconds(settings.min_duration); @@ -779,6 +638,7 @@ bool PerformanceSummary::MinDurationMet(std::string* recommendation) { break; case TestScenario::SingleStream: case TestScenario::MultiStream: + case TestScenario::MultiStreamFree: min_duration_met = pr.final_query_issued_time >= min_duration; break; } @@ -788,11 +648,19 @@ bool PerformanceSummary::MinDurationMet(std::string* recommendation) { switch (settings.scenario) { case TestScenario::SingleStream: - case TestScenario::MultiStream: *recommendation = "Decrease the expected latency so the loadgen pre-generates more " "queries."; break; + case TestScenario::MultiStream: + *recommendation = + "MultiStream should always meet the minimum duration. 
" + "Please file a bug."; + break; + case TestScenario::MultiStreamFree: + *recommendation = + "Increase the target QPS so the loadgen pre-generates more queries."; + break; case TestScenario::Server: *recommendation = "Increase the target QPS so the loadgen pre-generates more queries."; @@ -815,7 +683,9 @@ bool PerformanceSummary::MinSamplesMet() { } bool PerformanceSummary::HasPerfConstraints() { - return settings.scenario == TestScenario::Server; + return settings.scenario == TestScenario::MultiStream || + settings.scenario == TestScenario::MultiStreamFree || + settings.scenario == TestScenario::Server; } bool PerformanceSummary::PerfConstraintsMet(std::string* recommendation) { @@ -823,11 +693,28 @@ bool PerformanceSummary::PerfConstraintsMet(std::string* recommendation) { bool perf_constraints_met = true; switch (settings.scenario) { case TestScenario::SingleStream: + break; case TestScenario::MultiStream: + ProcessLatencies(); + if (target_latency_percentile.query_intervals >= 2) { + *recommendation = "Reduce samples per query to improve latency."; + perf_constraints_met = false; + } + break; + case TestScenario::MultiStreamFree: + ProcessLatencies(); + if (target_latency_percentile.query_latency > + settings.target_latency.count()) { + *recommendation = "Reduce samples per query to improve latency."; + perf_constraints_met = false; + } break; case TestScenario::Server: ProcessLatencies(); - if (target_latency_percentile.sample_latency > + if (!pr.user_constraints_met) { + *recommendation = "User constraints not met. Recommend to reduce target QPS."; + perf_constraints_met = false; + } else if (target_latency_percentile.sample_latency > settings.target_latency.count()) { *recommendation = "Reduce target QPS to improve latency."; perf_constraints_met = false; @@ -858,9 +745,14 @@ void PerformanceSummary::LogSummary(AsyncSummary& summary) { break; } case TestScenario::MultiStream: { - summary(DoubleToString(target_latency_percentile.percentile * 100, 0) + - "th percentile latency (ns) : ", - target_latency_percentile.query_latency); + summary("Samples per query : ", settings.samples_per_query); + break; + } + case TestScenario::MultiStreamFree: { + double samples_per_second = pr.queries_issued * + settings.samples_per_query / + pr.final_query_all_samples_done_time; + summary("Samples per second : ", samples_per_second); break; } case TestScenario::Server: { @@ -887,15 +779,13 @@ void PerformanceSummary::LogSummary(AsyncSummary& summary) { std::string min_duration_recommendation; std::string perf_constraints_recommendation; - std::string early_stopping_recommendation; bool min_duration_met = MinDurationMet(&min_duration_recommendation); bool min_queries_met = MinQueriesMet() && MinSamplesMet(); - bool early_stopping_met = EarlyStopping(&early_stopping_recommendation); bool perf_constraints_met = PerfConstraintsMet(&perf_constraints_recommendation); - bool all_constraints_met = min_duration_met && min_queries_met && - perf_constraints_met && early_stopping_met; + bool all_constraints_met = + min_duration_met && min_queries_met && perf_constraints_met; summary("Result is : ", all_constraints_met ? "VALID" : "INVALID"); if (HasPerfConstraints()) { summary(" Performance constraints satisfied : ", @@ -903,7 +793,6 @@ void PerformanceSummary::LogSummary(AsyncSummary& summary) { } summary(" Min duration satisfied : ", min_duration_met ? "Yes" : "NO"); summary(" Min queries satisfied : ", min_queries_met ? "Yes" : "NO"); - summary(" Early stopping satisfied: ", early_stopping_met ? 
"Yes" : "NO"); if (!all_constraints_met) { summary("Recommendations:"); @@ -919,13 +808,6 @@ void PerformanceSummary::LogSummary(AsyncSummary& summary) { " See the detailed log for why this may have occurred."); } } - // Early stopping results - if (settings.scenario == TestScenario::SingleStream || - settings.scenario == TestScenario::Server || - settings.scenario == TestScenario::MultiStream) { - summary("Early Stopping Result:"); - summary(early_stopping_recommendation); - } summary( "\n" @@ -945,27 +827,38 @@ void PerformanceSummary::LogSummary(AsyncSummary& summary) { summary("Completed samples per second : ", DoubleToString(qps_as_completed)); summary(""); - } else if (settings.scenario == TestScenario::MultiStream) { - summary("Per-query latency: "); - summary("Min latency (ns) : ", query_latency_min); - summary("Max latency (ns) : ", query_latency_max); - summary("Mean latency (ns) : ", query_latency_mean); + } else if (settings.scenario == TestScenario::MultiStream || + settings.scenario == TestScenario::MultiStreamFree) { + double ms_per_interval = std::milli::den / settings.target_qps; + summary("Intervals between each IssueQuery: ", "qps", settings.target_qps, + "ms", ms_per_interval); for (auto& lp : latency_percentiles) { - summary( - DoubleToString(lp.percentile * 100) + " percentile latency (ns) : ", - lp.query_latency); + summary(DoubleToString(lp.percentile * 100) + " percentile : ", + lp.query_intervals); } - } - if (settings.scenario != TestScenario::MultiStream) { - summary("Min latency (ns) : ", sample_latency_min); - summary("Max latency (ns) : ", sample_latency_max); - summary("Mean latency (ns) : ", sample_latency_mean); + summary(""); + double target_ns = settings.target_latency.count(); + double target_ms = target_ns * std::milli::den / std::nano::den; + summary("Per-query latency: ", "target_ns", + settings.target_latency.count(), "target_ms", target_ms); for (auto& lp : latency_percentiles) { summary( DoubleToString(lp.percentile * 100) + " percentile latency (ns) : ", - lp.sample_latency); + lp.query_latency); } + + summary(""); + summary("Per-sample latency:"); + } + + summary("Min latency (ns) : ", sample_latency_min); + summary("Max latency (ns) : ", sample_latency_max); + summary("Mean latency (ns) : ", sample_latency_mean); + for (auto& lp : latency_percentiles) { + summary( + DoubleToString(lp.percentile * 100) + " percentile latency (ns) : ", + lp.sample_latency); } summary( @@ -983,14 +876,12 @@ void PerformanceSummary::LogDetail(AsyncDetail& detail) { // General validity checking std::string min_duration_recommendation; std::string perf_constraints_recommendation; - std::string early_stopping_recommendation; bool min_duration_met = MinDurationMet(&min_duration_recommendation); bool min_queries_met = MinQueriesMet() && MinSamplesMet(); bool perf_constraints_met = PerfConstraintsMet(&perf_constraints_recommendation); - bool early_stopping_met = EarlyStopping(&early_stopping_recommendation); - bool all_constraints_met = min_duration_met && min_queries_met && - perf_constraints_met && early_stopping_met; + bool all_constraints_met = + min_duration_met && min_queries_met && perf_constraints_met; MLPERF_LOG(detail, "result_validity", all_constraints_met ? 
"VALID" : "INVALID"); @@ -999,7 +890,6 @@ void PerformanceSummary::LogDetail(AsyncDetail& detail) { } MLPERF_LOG(detail, "result_min_duration_met", min_duration_met); MLPERF_LOG(detail, "result_min_queries_met", min_queries_met); - MLPERF_LOG(detail, "early_stopping_met", early_stopping_met); if (!all_constraints_met) { std::string recommendation; if (!perf_constraints_met) { @@ -1014,21 +904,14 @@ void PerformanceSummary::LogDetail(AsyncDetail& detail) { } MLPERF_LOG(detail, "result_invalid_reason", recommendation); } - MLPERF_LOG(detail, "early_stopping_result", early_stopping_recommendation); - - // Report number of queries - MLPERF_LOG(detail, "result_query_count", std::to_string(query_count)); - if (settings.scenario == TestScenario::Server) { - MLPERF_LOG(detail, "result_overlatency_query_count", - std::to_string(overlatency_query_count)); - } auto reportPerQueryLatencies = [&]() { - MLPERF_LOG(detail, "result_min_query_latency_ns", query_latency_min); - MLPERF_LOG(detail, "result_max_query_latency_ns", query_latency_max); - MLPERF_LOG(detail, "result_mean_query_latency_ns", query_latency_mean); for (auto& lp : latency_percentiles) { std::string percentile = DoubleToString(lp.percentile * 100); + MLPERF_LOG( + detail, + "result_" + percentile + "_percentile_num_intervals_between_queries", + lp.query_intervals); MLPERF_LOG(detail, "result_" + percentile + "_percentile_per_query_latency_ns", lp.query_latency); @@ -1042,16 +925,18 @@ void PerformanceSummary::LogDetail(AsyncDetail& detail) { double qps_wo_lg = 1 / QuerySampleLatencyToSeconds(sample_latency_mean); MLPERF_LOG(detail, "result_qps_with_loadgen_overhead", qps_w_lg); MLPERF_LOG(detail, "result_qps_without_loadgen_overhead", qps_wo_lg); - MLPERF_LOG(detail, "early_stopping_latency_ss", - early_stopping_latency_ss); - MLPERF_LOG(detail, "early_stopping_latency_ms", - early_stopping_latency_ms); + break; + } + case TestScenario::MultiStreamFree: { + double samples_per_second = pr.queries_issued * + settings.samples_per_query / + pr.final_query_all_samples_done_time; + MLPERF_LOG(detail, "result_samples_per_second", samples_per_second); + reportPerQueryLatencies(); break; } case TestScenario::MultiStream: { reportPerQueryLatencies(); - MLPERF_LOG(detail, "early_stopping_latency_ms", - early_stopping_latency_ms); break; } case TestScenario::Server: { @@ -1131,9 +1016,11 @@ std::vector GenerateLoadableSets( // Partition the samples into loadable sets. const size_t set_size = settings.performance_sample_count; - const size_t set_padding = (settings.scenario == TestScenario::MultiStream) - ? settings.samples_per_query - 1 - : 0; + const size_t set_padding = + (settings.scenario == TestScenario::MultiStream || + settings.scenario == TestScenario::MultiStreamFree) + ? settings.samples_per_query - 1 + : 0; std::vector loadable_set; loadable_set.reserve(set_size + set_padding); @@ -1153,7 +1040,7 @@ std::vector GenerateLoadableSets( } // Add padding for the multi stream scenario. Padding allows the - // starting sample to be the same for all SUTs, independent of the value + // startings sample to be the same for all SUTs, independent of the value // of samples_per_query, while enabling samples in a query to be contiguous. 
for (auto& loadable_set : result) {
 auto& set = loadable_set.set;
@@ -1436,7 +1323,9 @@ void FindPeakPerformanceMode(SystemUnderTest* sut, QuerySampleLibrary* qsl,
 #endif
 });
- if (scenario != TestScenario::Server) {
+ if (scenario != TestScenario::MultiStream &&
+ scenario != TestScenario::MultiStreamFree &&
+ scenario != TestScenario::Server) {
 LogDetail([unsupported_scenario = ToString(scenario)](AsyncDetail& detail) {
 #if USE_NEW_LOGGING_FORMAT
 MLPERF_LOG_ERROR(detail, "error_invalid_config",
@@ -1629,6 +1518,8 @@ struct RunFunctions {
 return GetCompileTime();
 case TestScenario::MultiStream:
 return GetCompileTime();
+ case TestScenario::MultiStreamFree:
+ return GetCompileTime();
 case TestScenario::Server:
 return GetCompileTime();
 case TestScenario::Offline:
diff --git a/loadgen/system_under_test.h b/loadgen/system_under_test.h
index 4b98e06b12..6850713517 100644
--- a/loadgen/system_under_test.h
+++ b/loadgen/system_under_test.h
@@ -59,6 +59,13 @@ class SystemUnderTest {
 /// This is especially useful in the server scenario.
 virtual void FlushQueries() = 0;
+ /// \brief Called after all queries issued in a series
+ /// have completed.
+ /// \details Clients can use this to indicate whether user-defined
+ /// constraints were met for the test. The final validity of the test
+ /// depends on both the user constraints and the performance constraints.
+ virtual bool UserConstraintsMet() = 0;
+
 /// \brief Reports the raw latency results to the SUT of each sample issued as
 /// recorded by the load generator. Units are nanoseconds.
 virtual void ReportLatencyResults(
From 50944fd5b1ca53e0a68ac0c52e9b5ba1096515d7 Mon Sep 17 00:00:00 2001
From: Xin Yong
Date: Thu, 27 Jan 2022 22:44:56 +0800
Subject: [PATCH 2/3] sync with upstream

---
 loadgen/bindings/python_api.cc |   9 +-
 loadgen/loadgen.cc             | 354 ++++++++++++++++++++++-----------
 2 files changed, 236 insertions(+), 127 deletions(-)

diff --git a/loadgen/bindings/python_api.cc b/loadgen/bindings/python_api.cc
index 686e3110ad..e9a5a426cb 100644
--- a/loadgen/bindings/python_api.cc
+++ b/loadgen/bindings/python_api.cc
@@ -241,7 +241,6 @@ PYBIND11_MODULE(mlperf_loadgen, m) {
 pybind11::enum_(m, "TestScenario")
 .value("SingleStream", TestScenario::SingleStream)
 .value("MultiStream", TestScenario::MultiStream)
- .value("MultiStreamFree", TestScenario::MultiStreamFree)
 .value("Server", TestScenario::Server)
 .value("Offline", TestScenario::Offline);
@@ -259,16 +258,12 @@ PYBIND11_MODULE(mlperf_loadgen, m) {
 .def_readwrite("single_stream_expected_latency_ns",
 &TestSettings::single_stream_expected_latency_ns)
 .def_readwrite("single_stream_target_latency_percentile",
 &TestSettings::single_stream_target_latency_percentile)
- .def_readwrite("multi_stream_target_qps",
- &TestSettings::multi_stream_target_qps)
- .def_readwrite("multi_stream_target_latency_ns",
- &TestSettings::multi_stream_target_latency_ns)
+ .def_readwrite("multi_stream_expected_latency_ns",
+ &TestSettings::multi_stream_expected_latency_ns)
 .def_readwrite("multi_stream_target_latency_percentile",
 &TestSettings::multi_stream_target_latency_percentile)
 .def_readwrite("multi_stream_samples_per_query",
 &TestSettings::multi_stream_samples_per_query)
- .def_readwrite("multi_stream_max_async_queries",
- &TestSettings::multi_stream_max_async_queries)
 .def_readwrite("server_target_qps", &TestSettings::server_target_qps)
 .def_readwrite("server_target_latency_ns",
 &TestSettings::server_target_latency_ns)
diff --git a/loadgen/loadgen.cc b/loadgen/loadgen.cc
index 88397e01dd..82e65a34bb 100644
--- a/loadgen/loadgen.cc
+++ b/loadgen/loadgen.cc
@@ -31,6 +31,7 @@
limitations under the License. #include #include +#include "early_stopping.h" #include "issue_query_controller.h" #include "logging.h" #include "query_sample.h" @@ -50,7 +51,7 @@ namespace loadgen { /// loaded together. struct LoadableSampleSet { std::vector set; - const size_t sample_distribution_end; // Excludes padding in multi-stream. + const size_t sample_distribution_end; // Excludes padding in MultiStream. }; /// \brief Generates nanoseconds from a start time to multiple end times. @@ -231,8 +232,7 @@ std::vector GenerateQueries( std::mt19937 sample_rng(settings.sample_index_rng_seed); std::mt19937 schedule_rng(settings.schedule_rng_seed); - constexpr bool kIsMultiStream = scenario == TestScenario::MultiStream || - scenario == TestScenario::MultiStreamFree; + constexpr bool kIsMultiStream = scenario == TestScenario::MultiStream; const size_t sample_stride = kIsMultiStream ? samples_per_query : 1; auto sample_distribution = SampleDistribution( @@ -272,7 +272,7 @@ std::vector GenerateQueries( // This will not overflow, since GenerateLoadableSets adds padding at // the end of the loadable sets in the MultiStream scenario. // The padding allows the starting samples to be the same for each - // query as the value of samples_per_query increases. + // query with respect to samples_per_query. s = loaded_samples[sample_i++]; } } else if (scenario == TestScenario::Offline) { @@ -354,8 +354,7 @@ std::vector GenerateQueries( /// \todo Move to results.h/cc struct PerformanceResult { std::vector sample_latencies; - std::vector query_latencies; // MultiStream only. - std::vector query_intervals; // MultiStream only. + std::vector query_latencies; size_t queries_issued; double max_latency; double final_query_scheduled_time; // seconds from start. @@ -499,32 +498,16 @@ PerformanceResult IssueQueries(SystemUnderTest* sut, DurationToSeconds(final_query.all_samples_done_time - start); std::vector query_latencies; - std::vector query_intervals; - if (scenario == TestScenario::MultiStream || - scenario == TestScenario::MultiStreamFree) { + if (scenario == TestScenario::MultiStream) { query_latencies.resize(queries_issued); - query_intervals.resize(queries_issued); for (size_t i = 0; i < queries_issued; i++) { query_latencies[i] = DurationGeneratorNs{queries[i].scheduled_time}.delta( queries[i].all_samples_done_time); - if (i < queries_issued - settings.max_async_queries) { - // For all queries except the last few, take into account actual - // skipped intervals to the next query. - query_intervals[i] = - queries[i + settings.max_async_queries].scheduled_intervals; - } else { - // For the last queries, use query latency to guess if imaginary - // queries issued at the end would have skipped intervals. - query_intervals[i] = - std::ceil(settings.target_qps * - QuerySampleLatencyToSeconds(query_latencies[i])); - } } } return PerformanceResult{std::move(sample_latencies), std::move(query_latencies), - std::move(query_intervals), queries_issued, max_latency, final_query_scheduled_time, @@ -543,22 +526,30 @@ struct PerformanceSummary { // Set by ProcessLatencies. size_t sample_count = 0; + size_t query_count = 0; + size_t overlatency_query_count = 0; QuerySampleLatency sample_latency_min = 0; QuerySampleLatency sample_latency_max = 0; QuerySampleLatency sample_latency_mean = 0; + QuerySampleLatency query_latency_min = 0; + QuerySampleLatency query_latency_max = 0; + QuerySampleLatency query_latency_mean = 0; /// \brief The latency at a given percentile. 
struct PercentileEntry { const double percentile; QuerySampleLatency sample_latency = 0; QuerySampleLatency query_latency = 0; // MultiStream only. - size_t query_intervals = 0; // MultiStream only. }; // Latency target percentile PercentileEntry target_latency_percentile{settings.target_latency_percentile}; PercentileEntry latency_percentiles[6] = {{.50}, {.90}, {.95}, {.97}, {.99}, {.999}}; + // Early stopping percentile estimates for SingleStream and MultiStream + QuerySampleLatency early_stopping_latency_ss = 0; + QuerySampleLatency early_stopping_latency_ms = 0; + #if defined(_WIN32) || defined(WIN32) || defined(_WIN64) || defined(WIN64) // MSVC complains if there is no explicit constructor. // (target_latency_percentile above depends on construction with settings) @@ -570,6 +561,7 @@ struct PerformanceSummary { void ProcessLatencies(); bool MinDurationMet(std::string* recommendation); + bool EarlyStopping(std::string* recommendation); bool MinQueriesMet(); bool MinSamplesMet(); bool HasPerfConstraints(); @@ -585,11 +577,11 @@ void PerformanceSummary::ProcessLatencies() { sample_count = pr.sample_latencies.size(); - QuerySampleLatency accumulated_latency = 0; + QuerySampleLatency accumulated_sample_latency = 0; for (auto latency : pr.sample_latencies) { - accumulated_latency += latency; + accumulated_sample_latency += latency; } - sample_latency_mean = accumulated_latency / sample_count; + sample_latency_mean = accumulated_sample_latency / sample_count; std::sort(pr.sample_latencies.begin(), pr.sample_latencies.end()); @@ -603,9 +595,21 @@ void PerformanceSummary::ProcessLatencies() { lp.sample_latency = pr.sample_latencies[sample_count * lp.percentile]; } + query_count = pr.queries_issued; + + // Count the number of overlatency queries. Only for Server scenario. Since in + // this scenario the number of samples per query is 1, sample_latencies are + // used. + if (settings.scenario == TestScenario::Server) { + QuerySampleLatency max_latency = settings.target_latency.count() + 1; + overlatency_query_count = + pr.sample_latencies.end() - + std::lower_bound(pr.sample_latencies.begin(), pr.sample_latencies.end(), + max_latency); + } + // MultiStream only after this point. 
- if (settings.scenario != TestScenario::MultiStream && - settings.scenario != TestScenario::MultiStreamFree) { + if (settings.scenario != TestScenario::MultiStream) { return; } @@ -614,17 +618,156 @@ void PerformanceSummary::ProcessLatencies() { assert(pr.query_latencies.size() == query_count); assert(pr.query_intervals.size() == query_count); std::sort(pr.query_latencies.begin(), pr.query_latencies.end()); - std::sort(pr.query_intervals.begin(), pr.query_intervals.end()); + QuerySampleLatency accumulated_query_latency = 0; + for (auto latency : pr.query_latencies) { + accumulated_query_latency += latency; + } + query_latency_mean = accumulated_query_latency / query_count; + query_latency_min = pr.query_latencies.front(); + query_latency_max = pr.query_latencies.back(); target_latency_percentile.query_latency = pr.query_latencies[query_count * target_latency_percentile.percentile]; - target_latency_percentile.query_intervals = - pr.query_intervals[query_count * target_latency_percentile.percentile]; for (auto& lp : latency_percentiles) { lp.query_latency = pr.query_latencies[query_count * lp.percentile]; - lp.query_intervals = pr.query_intervals[query_count * lp.percentile]; } } +bool PerformanceSummary::EarlyStopping(std::string* recommendation) { + recommendation->clear(); + + int64_t overlatency_queries_bound = (1 << 10); + int64_t queries_issued = pr.queries_issued; + MinPassingQueriesFinder find_min_passing; + double confidence = 0.99; + double tolerance = 0.0; + + ProcessLatencies(); + switch (settings.scenario) { + case TestScenario::SingleStream: { + // TODO: Grab multistream percentile from settings, instead of hardcoding. + double multi_stream_percentile = 0.99; + int64_t t = 1; + int64_t h_min = find_min_passing(1, target_latency_percentile.percentile, + tolerance, confidence); + int64_t h = h_min; + if (queries_issued < h_min + 1) { + *recommendation = + " * Only processed " + std::to_string(queries_issued) + + " queries.\n * Need to process at least " + + std::to_string(h_min + 1) + " queries for early stopping."; + return false; + } else { + for (int64_t i = 2; i <= overlatency_queries_bound; ++i) { + h = find_min_passing(i, target_latency_percentile.percentile, + tolerance, confidence); + if (queries_issued < h + i) { + t = i - 1; + break; + } + } + } + QuerySampleLatency percentile_estimate = + pr.sample_latencies[queries_issued - t]; + *recommendation = + " * Processed at least " + std::to_string(h_min + 1) + " queries (" + + std::to_string(queries_issued) + ").\n" + " * Would discard " + + std::to_string(t - 1) + " highest latency queries.\n" + + " * Early stopping " + + DoubleToString(target_latency_percentile.percentile * 100, 0) + + "th percentile estimate: " + std::to_string(percentile_estimate); + early_stopping_latency_ss = percentile_estimate; + + // Early stopping estimate for 99%ile (used for infering multi-stream from + // single-stream) + t = 1; + h_min = + find_min_passing(1, multi_stream_percentile, tolerance, confidence); + h = h_min; + if (queries_issued < h_min + 1) { + *recommendation += + "\n * Not enough queries processed for " + + DoubleToString(multi_stream_percentile * 100, 0) + + "th percentile\n" + + " early stopping estimate (would need to process at\n least " + + std::to_string(h_min + 1) + " total queries)."; + } else { + for (int64_t i = 2; i <= overlatency_queries_bound; ++i) { + h = find_min_passing(i, multi_stream_percentile, tolerance, + confidence); + if (queries_issued < h + i) { + t = i - 1; + break; + } + } + percentile_estimate = 
pr.sample_latencies[queries_issued - t]; + *recommendation += + "\n * Early stopping " + + DoubleToString(multi_stream_percentile * 100, 0) + + "th percentile estimate: " + std::to_string(percentile_estimate); + early_stopping_latency_ms = percentile_estimate; + } + break; + } + case TestScenario::Server: { + int64_t t = + std::count_if(pr.sample_latencies.begin(), pr.sample_latencies.end(), + [=](auto const& latency) { + return latency > settings.target_latency.count(); + }); + int64_t h = find_min_passing(t, target_latency_percentile.percentile, + tolerance, confidence); + if (queries_issued >= h + t) { + *recommendation = " * Run successful."; + } else { + *recommendation = " * Run unsuccessful.\n * Processed " + + std::to_string(queries_issued) + " queries.\n" + + " * Would need to run at least " + + std::to_string(h + t - queries_issued) + + " more queries,\n with the run being successful if " + "every additional\n query were under latency."; + return false; + } + break; + } + case TestScenario::MultiStream: { + int64_t t = 1; + int64_t h_min = find_min_passing(1, target_latency_percentile.percentile, + tolerance, confidence); + int64_t h = h_min; + if (queries_issued < h_min + 1) { + *recommendation = + " * Only processed " + std::to_string(queries_issued) + + " queries.\n * Need to process at least " + + std::to_string(h_min + 1) + " queries for early stopping."; + return false; + } else { + for (int64_t i = 2; i <= overlatency_queries_bound; ++i) { + h = find_min_passing(i, target_latency_percentile.percentile, + tolerance, confidence); + if (queries_issued < h + i) { + t = i - 1; + break; + } + } + } + QuerySampleLatency percentile_estimate = + pr.query_latencies[queries_issued - t]; + *recommendation = + " * Processed at least " + std::to_string(h_min + 1) + " queries (" + + std::to_string(queries_issued) + ").\n" + " * Would discard " + + std::to_string(t - 1) + " highest latency queries.\n" + + " * Early stopping " + + DoubleToString(target_latency_percentile.percentile * 100, 0) + + "th percentile estimate: " + std::to_string(percentile_estimate); + early_stopping_latency_ms = percentile_estimate; + break; + } + case TestScenario::Offline: + break; + } + return true; +} + bool PerformanceSummary::MinDurationMet(std::string* recommendation) { recommendation->clear(); const double min_duration = DurationToSeconds(settings.min_duration); @@ -638,7 +781,6 @@ bool PerformanceSummary::MinDurationMet(std::string* recommendation) { break; case TestScenario::SingleStream: case TestScenario::MultiStream: - case TestScenario::MultiStreamFree: min_duration_met = pr.final_query_issued_time >= min_duration; break; } @@ -648,19 +790,11 @@ bool PerformanceSummary::MinDurationMet(std::string* recommendation) { switch (settings.scenario) { case TestScenario::SingleStream: + case TestScenario::MultiStream: *recommendation = "Decrease the expected latency so the loadgen pre-generates more " "queries."; break; - case TestScenario::MultiStream: - *recommendation = - "MultiStream should always meet the minimum duration. 
" - "Please file a bug."; - break; - case TestScenario::MultiStreamFree: - *recommendation = - "Increase the target QPS so the loadgen pre-generates more queries."; - break; case TestScenario::Server: *recommendation = "Increase the target QPS so the loadgen pre-generates more queries."; @@ -683,9 +817,7 @@ bool PerformanceSummary::MinSamplesMet() { } bool PerformanceSummary::HasPerfConstraints() { - return settings.scenario == TestScenario::MultiStream || - settings.scenario == TestScenario::MultiStreamFree || - settings.scenario == TestScenario::Server; + return settings.scenario == TestScenario::Server; } bool PerformanceSummary::PerfConstraintsMet(std::string* recommendation) { @@ -693,21 +825,7 @@ bool PerformanceSummary::PerfConstraintsMet(std::string* recommendation) { bool perf_constraints_met = true; switch (settings.scenario) { case TestScenario::SingleStream: - break; case TestScenario::MultiStream: - ProcessLatencies(); - if (target_latency_percentile.query_intervals >= 2) { - *recommendation = "Reduce samples per query to improve latency."; - perf_constraints_met = false; - } - break; - case TestScenario::MultiStreamFree: - ProcessLatencies(); - if (target_latency_percentile.query_latency > - settings.target_latency.count()) { - *recommendation = "Reduce samples per query to improve latency."; - perf_constraints_met = false; - } break; case TestScenario::Server: ProcessLatencies(); @@ -745,14 +863,9 @@ void PerformanceSummary::LogSummary(AsyncSummary& summary) { break; } case TestScenario::MultiStream: { - summary("Samples per query : ", settings.samples_per_query); - break; - } - case TestScenario::MultiStreamFree: { - double samples_per_second = pr.queries_issued * - settings.samples_per_query / - pr.final_query_all_samples_done_time; - summary("Samples per second : ", samples_per_second); + summary(DoubleToString(target_latency_percentile.percentile * 100, 0) + + "th percentile latency (ns) : ", + target_latency_percentile.query_latency); break; } case TestScenario::Server: { @@ -779,13 +892,15 @@ void PerformanceSummary::LogSummary(AsyncSummary& summary) { std::string min_duration_recommendation; std::string perf_constraints_recommendation; + std::string early_stopping_recommendation; bool min_duration_met = MinDurationMet(&min_duration_recommendation); bool min_queries_met = MinQueriesMet() && MinSamplesMet(); + bool early_stopping_met = EarlyStopping(&early_stopping_recommendation); bool perf_constraints_met = PerfConstraintsMet(&perf_constraints_recommendation); - bool all_constraints_met = - min_duration_met && min_queries_met && perf_constraints_met; + bool all_constraints_met = min_duration_met && min_queries_met && + perf_constraints_met && early_stopping_met; summary("Result is : ", all_constraints_met ? "VALID" : "INVALID"); if (HasPerfConstraints()) { summary(" Performance constraints satisfied : ", @@ -793,6 +908,7 @@ void PerformanceSummary::LogSummary(AsyncSummary& summary) { } summary(" Min duration satisfied : ", min_duration_met ? "Yes" : "NO"); summary(" Min queries satisfied : ", min_queries_met ? "Yes" : "NO"); + summary(" Early stopping satisfied: ", early_stopping_met ? 
"Yes" : "NO"); if (!all_constraints_met) { summary("Recommendations:"); @@ -808,6 +924,13 @@ void PerformanceSummary::LogSummary(AsyncSummary& summary) { " See the detailed log for why this may have occurred."); } } + // Early stopping results + if (settings.scenario == TestScenario::SingleStream || + settings.scenario == TestScenario::Server || + settings.scenario == TestScenario::MultiStream) { + summary("Early Stopping Result:"); + summary(early_stopping_recommendation); + } summary( "\n" @@ -827,38 +950,27 @@ void PerformanceSummary::LogSummary(AsyncSummary& summary) { summary("Completed samples per second : ", DoubleToString(qps_as_completed)); summary(""); - } else if (settings.scenario == TestScenario::MultiStream || - settings.scenario == TestScenario::MultiStreamFree) { - double ms_per_interval = std::milli::den / settings.target_qps; - summary("Intervals between each IssueQuery: ", "qps", settings.target_qps, - "ms", ms_per_interval); - for (auto& lp : latency_percentiles) { - summary(DoubleToString(lp.percentile * 100) + " percentile : ", - lp.query_intervals); - } - - summary(""); - double target_ns = settings.target_latency.count(); - double target_ms = target_ns * std::milli::den / std::nano::den; - summary("Per-query latency: ", "target_ns", - settings.target_latency.count(), "target_ms", target_ms); + } else if (settings.scenario == TestScenario::MultiStream) { + summary("Per-query latency: "); + summary("Min latency (ns) : ", query_latency_min); + summary("Max latency (ns) : ", query_latency_max); + summary("Mean latency (ns) : ", query_latency_mean); for (auto& lp : latency_percentiles) { summary( DoubleToString(lp.percentile * 100) + " percentile latency (ns) : ", lp.query_latency); } - - summary(""); - summary("Per-sample latency:"); } - summary("Min latency (ns) : ", sample_latency_min); - summary("Max latency (ns) : ", sample_latency_max); - summary("Mean latency (ns) : ", sample_latency_mean); - for (auto& lp : latency_percentiles) { - summary( - DoubleToString(lp.percentile * 100) + " percentile latency (ns) : ", - lp.sample_latency); + if (settings.scenario != TestScenario::MultiStream) { + summary("Min latency (ns) : ", sample_latency_min); + summary("Max latency (ns) : ", sample_latency_max); + summary("Mean latency (ns) : ", sample_latency_mean); + for (auto& lp : latency_percentiles) { + summary( + DoubleToString(lp.percentile * 100) + " percentile latency (ns) : ", + lp.sample_latency); + } } summary( @@ -876,12 +988,14 @@ void PerformanceSummary::LogDetail(AsyncDetail& detail) { // General validity checking std::string min_duration_recommendation; std::string perf_constraints_recommendation; + std::string early_stopping_recommendation; bool min_duration_met = MinDurationMet(&min_duration_recommendation); bool min_queries_met = MinQueriesMet() && MinSamplesMet(); bool perf_constraints_met = PerfConstraintsMet(&perf_constraints_recommendation); - bool all_constraints_met = - min_duration_met && min_queries_met && perf_constraints_met; + bool early_stopping_met = EarlyStopping(&early_stopping_recommendation); + bool all_constraints_met = min_duration_met && min_queries_met && + perf_constraints_met && early_stopping_met; MLPERF_LOG(detail, "result_validity", all_constraints_met ? 
"VALID" : "INVALID"); @@ -890,6 +1004,7 @@ void PerformanceSummary::LogDetail(AsyncDetail& detail) { } MLPERF_LOG(detail, "result_min_duration_met", min_duration_met); MLPERF_LOG(detail, "result_min_queries_met", min_queries_met); + MLPERF_LOG(detail, "early_stopping_met", early_stopping_met); if (!all_constraints_met) { std::string recommendation; if (!perf_constraints_met) { @@ -904,14 +1019,21 @@ void PerformanceSummary::LogDetail(AsyncDetail& detail) { } MLPERF_LOG(detail, "result_invalid_reason", recommendation); } + MLPERF_LOG(detail, "early_stopping_result", early_stopping_recommendation); + + // Report number of queries + MLPERF_LOG(detail, "result_query_count", std::to_string(query_count)); + if (settings.scenario == TestScenario::Server) { + MLPERF_LOG(detail, "result_overlatency_query_count", + std::to_string(overlatency_query_count)); + } auto reportPerQueryLatencies = [&]() { + MLPERF_LOG(detail, "result_min_query_latency_ns", query_latency_min); + MLPERF_LOG(detail, "result_max_query_latency_ns", query_latency_max); + MLPERF_LOG(detail, "result_mean_query_latency_ns", query_latency_mean); for (auto& lp : latency_percentiles) { std::string percentile = DoubleToString(lp.percentile * 100); - MLPERF_LOG( - detail, - "result_" + percentile + "_percentile_num_intervals_between_queries", - lp.query_intervals); MLPERF_LOG(detail, "result_" + percentile + "_percentile_per_query_latency_ns", lp.query_latency); @@ -925,18 +1047,16 @@ void PerformanceSummary::LogDetail(AsyncDetail& detail) { double qps_wo_lg = 1 / QuerySampleLatencyToSeconds(sample_latency_mean); MLPERF_LOG(detail, "result_qps_with_loadgen_overhead", qps_w_lg); MLPERF_LOG(detail, "result_qps_without_loadgen_overhead", qps_wo_lg); - break; - } - case TestScenario::MultiStreamFree: { - double samples_per_second = pr.queries_issued * - settings.samples_per_query / - pr.final_query_all_samples_done_time; - MLPERF_LOG(detail, "result_samples_per_second", samples_per_second); - reportPerQueryLatencies(); + MLPERF_LOG(detail, "early_stopping_latency_ss", + early_stopping_latency_ss); + MLPERF_LOG(detail, "early_stopping_latency_ms", + early_stopping_latency_ms); break; } case TestScenario::MultiStream: { reportPerQueryLatencies(); + MLPERF_LOG(detail, "early_stopping_latency_ms", + early_stopping_latency_ms); break; } case TestScenario::Server: { @@ -1016,11 +1136,9 @@ std::vector GenerateLoadableSets( // Partition the samples into loadable sets. const size_t set_size = settings.performance_sample_count; - const size_t set_padding = - (settings.scenario == TestScenario::MultiStream || - settings.scenario == TestScenario::MultiStreamFree) - ? settings.samples_per_query - 1 - : 0; + const size_t set_padding = (settings.scenario == TestScenario::MultiStream) + ? settings.samples_per_query - 1 + : 0; std::vector loadable_set; loadable_set.reserve(set_size + set_padding); @@ -1040,7 +1158,7 @@ std::vector GenerateLoadableSets( } // Add padding for the multi stream scenario. Padding allows the - // startings sample to be the same for all SUTs, independent of the value + // starting sample to be the same for all SUTs, independent of the value // of samples_per_query, while enabling samples in a query to be contiguous. 
   for (auto& loadable_set : result) {
     auto& set = loadable_set.set;
@@ -1323,9 +1441,7 @@ void FindPeakPerformanceMode(SystemUnderTest* sut, QuerySampleLibrary* qsl,
 #endif
   });
 
-  if (scenario != TestScenario::MultiStream &&
-      scenario != TestScenario::MultiStreamFree &&
-      scenario != TestScenario::Server) {
+  if (scenario != TestScenario::Server) {
     LogDetail([unsupported_scenario = ToString(scenario)](AsyncDetail& detail) {
 #if USE_NEW_LOGGING_FORMAT
       MLPERF_LOG_ERROR(detail, "error_invalid_config",
@@ -1518,8 +1634,6 @@ struct RunFunctions {
       return GetCompileTime<TestScenario::SingleStream>();
     case TestScenario::MultiStream:
       return GetCompileTime<TestScenario::MultiStream>();
-    case TestScenario::MultiStreamFree:
-      return GetCompileTime<TestScenario::MultiStreamFree>();
     case TestScenario::Server:
      return GetCompileTime<TestScenario::Server>();
    case TestScenario::Offline:

From 1774bf34fbca64b44cebda1df3cd2449e6b07c82 Mon Sep 17 00:00:00 2001
From: Xin Yong
Date: Thu, 10 Mar 2022 23:10:45 +0800
Subject: [PATCH 3/3] add cb for reporting target qps

---
 loadgen/bindings/c_api.cc      |  8 +++++++-
 loadgen/bindings/c_api.h       |  4 +++-
 loadgen/bindings/python_api.cc | 17 +++++++++++++++--
 loadgen/loadgen.cc             |  3 +++
 loadgen/system_under_test.h    |  4 ++++
 5 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/loadgen/bindings/c_api.cc b/loadgen/bindings/c_api.cc
index 4b35971c44..ca03a51a03 100644
--- a/loadgen/bindings/c_api.cc
+++ b/loadgen/bindings/c_api.cc
@@ -31,12 +31,14 @@ class SystemUnderTestTrampoline : public SystemUnderTest {
       ClientData client_data, std::string name, IssueQueryCallback issue_cb,
       FlushQueriesCallback flush_queries_cb,
       UserConstraintsMetCallback user_constraints_met_cb,
+      ReportTargetQPSCallback report_target_qps_cb,
       ReportLatencyResultsCallback report_latency_results_cb)
       : client_data_(client_data),
         name_(std::move(name)),
        issue_cb_(issue_cb),
        flush_queries_cb_(flush_queries_cb),
        user_constraints_met_cb_(user_constraints_met_cb),
+        report_target_qps_cb_(report_target_qps_cb),
        report_latency_results_cb_(report_latency_results_cb) {}
   ~SystemUnderTestTrampoline() override = default;
 
@@ -50,6 +52,8 @@ class SystemUnderTestTrampoline : public SystemUnderTest {
 
   bool UserConstraintsMet() override { (*user_constraints_met_cb_)(); }
 
+  void ReportTargetQPS(const double target_qps) override { (*report_target_qps_cb_)(target_qps); }
+
   void ReportLatencyResults(
       const std::vector<QuerySampleLatency>& latencies_ns) override {
     (*report_latency_results_cb_)(client_data_, latencies_ns.data(),
@@ -62,6 +66,7 @@ class SystemUnderTestTrampoline : public SystemUnderTest {
   IssueQueryCallback issue_cb_;
   FlushQueriesCallback flush_queries_cb_;
   UserConstraintsMetCallback user_constraints_met_cb_;
+  ReportTargetQPSCallback report_target_qps_cb_;
   ReportLatencyResultsCallback report_latency_results_cb_;
 };
 
@@ -71,9 +76,10 @@
 void* ConstructSUT(ClientData client_data, const char* name,
                    size_t name_length, IssueQueryCallback issue_cb,
                    FlushQueriesCallback flush_queries_cb,
                    UserConstraintsMetCallback user_constraints_met_cb,
+                   ReportTargetQPSCallback report_target_qps_cb,
                    ReportLatencyResultsCallback report_latency_results_cb) {
   SystemUnderTestTrampoline* sut = new SystemUnderTestTrampoline(
-      client_data, std::string(name, name_length), issue_cb, flush_queries_cb, user_constraints_met_cb, report_latency_results_cb);
+      client_data, std::string(name, name_length), issue_cb, flush_queries_cb, user_constraints_met_cb, report_target_qps_cb, report_latency_results_cb);
   return reinterpret_cast<void*>(sut);
 }
 
diff --git a/loadgen/bindings/c_api.h b/loadgen/bindings/c_api.h
index 64b5d5e32d..758f03ba10 100644
--- a/loadgen/bindings/c_api.h
+++ b/loadgen/bindings/c_api.h
@@ -39,6 +39,7 @@ typedef uintptr_t ClientData;
 typedef void (*IssueQueryCallback)(ClientData, const QuerySample*, size_t);
 typedef void (*FlushQueriesCallback)();
 typedef bool (*UserConstraintsMetCallback)();
+typedef void (*ReportTargetQPSCallback)(const double target_qps);
 typedef void (*ReportLatencyResultsCallback)(ClientData, const int64_t*,
                                              size_t);
 typedef void (*ResponseCallback)(ClientData, QuerySampleResponse*);
@@ -55,8 +56,9 @@ void QuerySamplesCompleteResponseCb(QuerySampleResponse* responses,
 /// \brief Create an opaque SUT pointer based on C callbacks.
 void* ConstructSUT(ClientData client_data, const char* name,
                    size_t name_length, IssueQueryCallback issue_cb,
-                   UserConstraintsMetCallback user_constraints_met_cb,
                    FlushQueriesCallback flush_queries_cb,
+                   UserConstraintsMetCallback user_constraints_met_cb,
+                   ReportTargetQPSCallback report_target_qps_cb,
                    ReportLatencyResultsCallback report_latency_results_cb);
 /// \brief Destroys the SUT created by ConstructSUT.
 void DestroySUT(void* sut);
 
diff --git a/loadgen/bindings/python_api.cc b/loadgen/bindings/python_api.cc
index e9a5a426cb..ef5774806c 100644
--- a/loadgen/bindings/python_api.cc
+++ b/loadgen/bindings/python_api.cc
@@ -37,6 +37,7 @@ using FastIssueQueriesCallback =
     std::function<void(std::vector<ResponseId>, std::vector<QuerySampleIndex>)>;
 using FlushQueriesCallback = std::function<void()>;
 using UserConstraintsMetCallback = std::function<bool()>;
+using ReportTargetQPSCallback = std::function<void(double)>;
 using ReportLatencyResultsCallback = std::function<void(std::vector<int64_t>)>;
 
 // Forwards SystemUnderTest calls to relevant callbacks.
@@ -46,11 +47,13 @@ class SystemUnderTestTrampoline : public SystemUnderTest {
       std::string name, IssueQueryCallback issue_cb,
       FlushQueriesCallback flush_queries_cb,
       UserConstraintsMetCallback user_constraints_met_cb,
+      ReportTargetQPSCallback report_target_qps_cb,
       ReportLatencyResultsCallback report_latency_results_cb)
       : name_(std::move(name)),
        issue_cb_(issue_cb),
        flush_queries_cb_(flush_queries_cb),
        user_constraints_met_cb_(user_constraints_met_cb),
+        report_target_qps_cb_(report_target_qps_cb),
        report_latency_results_cb_(report_latency_results_cb) {}
   ~SystemUnderTestTrampoline() override = default;
 
@@ -65,6 +68,11 @@ class SystemUnderTestTrampoline : public SystemUnderTest {
 
   bool UserConstraintsMet() override { return user_constraints_met_cb_(); }
 
+  void ReportTargetQPS(const double target_qps) override {
+    pybind11::gil_scoped_acquire gil_acquirer;
+    report_target_qps_cb_(target_qps);
+  }
+
   void ReportLatencyResults(
       const std::vector<QuerySampleLatency>& latencies_ns) override {
     pybind11::gil_scoped_acquire gil_acquirer;
@@ -76,6 +84,7 @@ class SystemUnderTestTrampoline : public SystemUnderTest {
   IssueQueryCallback issue_cb_;
   FlushQueriesCallback flush_queries_cb_;
   UserConstraintsMetCallback user_constraints_met_cb_;
+  ReportTargetQPSCallback report_target_qps_cb_;
   ReportLatencyResultsCallback report_latency_results_cb_;
 };
 
@@ -85,9 +94,11 @@ class FastSystemUnderTestTrampoline : public SystemUnderTestTrampoline {
       std::string name, FastIssueQueriesCallback fast_issue_cb,
       FlushQueriesCallback flush_queries_cb,
       UserConstraintsMetCallback user_constraints_met_cb,
+      ReportTargetQPSCallback report_target_qps_cb,
       ReportLatencyResultsCallback report_latency_results_cb)
       : SystemUnderTestTrampoline(name, nullptr, flush_queries_cb,
                                   user_constraints_met_cb,
+                                  report_target_qps_cb,
                                   report_latency_results_cb),
         fast_issue_cb_(fast_issue_cb) {}
   ~FastSystemUnderTestTrampoline() override = default;
@@ -157,9 +168,10 @@ namespace py {
 
 uintptr_t ConstructSUT(IssueQueryCallback issue_cb,
                        FlushQueriesCallback flush_queries_cb,
                        UserConstraintsMetCallback user_constraints_met_cb,
+                       ReportTargetQPSCallback report_target_qps_cb,
                        ReportLatencyResultsCallback report_latency_results_cb) {
   SystemUnderTestTrampoline* sut = new SystemUnderTestTrampoline(
-      "PySUT", issue_cb, flush_queries_cb, user_constraints_met_cb, report_latency_results_cb);
+      "PySUT", issue_cb, flush_queries_cb, user_constraints_met_cb, report_target_qps_cb, report_latency_results_cb);
   return reinterpret_cast<uintptr_t>(sut);
 }
 
@@ -173,9 +185,10 @@ uintptr_t ConstructFastSUT(
     FastIssueQueriesCallback fast_issue_cb,
     FlushQueriesCallback flush_queries_cb,
     UserConstraintsMetCallback user_constraints_met_cb,
+    ReportTargetQPSCallback report_target_qps_cb,
     ReportLatencyResultsCallback report_latency_results_cb) {
   FastSystemUnderTestTrampoline* sut = new FastSystemUnderTestTrampoline(
-      "PyFastSUT", fast_issue_cb, flush_queries_cb, user_constraints_met_cb, report_latency_results_cb);
+      "PyFastSUT", fast_issue_cb, flush_queries_cb, user_constraints_met_cb, report_target_qps_cb, report_latency_results_cb);
   return reinterpret_cast<uintptr_t>(sut);
 }
 
diff --git a/loadgen/loadgen.cc b/loadgen/loadgen.cc
index 82e65a34bb..db0e644204 100644
--- a/loadgen/loadgen.cc
+++ b/loadgen/loadgen.cc
@@ -372,6 +372,9 @@ PerformanceResult IssueQueries(SystemUnderTest* sut,
                                const TestSettingsInternal& settings,
                                const LoadableSampleSet& loaded_sample_set,
                                SequenceGen* sequence_gen) {
+  // report target qps
+  sut->ReportTargetQPS(settings.target_qps);
+
   // Create reponse handler.
   ResponseDelegateDetailed response_logger;
   std::uniform_real_distribution<double> accuracy_log_offset_dist =
diff --git a/loadgen/system_under_test.h b/loadgen/system_under_test.h
index 6850713517..0cda54a5db 100644
--- a/loadgen/system_under_test.h
+++ b/loadgen/system_under_test.h
@@ -66,6 +66,10 @@ class SystemUnderTest {
   /// on both the user constraints and performance constraints
   virtual bool UserConstraintsMet() = 0;
 
+  /// \brief Reports the target qps to the SUT, as configured in the
+  /// load generator's test settings.
+  virtual void ReportTargetQPS(const double target_qps) = 0;
+
   /// \brief Reports the raw latency results to the SUT of each sample issued as
   /// recorded by the load generator. Units are nanoseconds.
   virtual void ReportLatencyResults(
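
For reference, a minimal SUT written against the patched system_under_test.h could wire up the two new hooks along the lines of the sketch below. This is illustrative only and not part of the patch series: the class name, the success-rate bookkeeping, and the 0.99 threshold are made up, and the pre-existing Name()/IssueQuery()/FlushQueries() signatures are assumed to be unchanged from the upstream header.

// Illustrative sketch only; names and thresholds are hypothetical.
#include <string>
#include <vector>

#include "loadgen.h"              // mlperf::QuerySamplesComplete
#include "system_under_test.h"    // mlperf::SystemUnderTest

class ExampleSUT : public mlperf::SystemUnderTest {
 public:
  const std::string& Name() const override { return name_; }

  void IssueQuery(const std::vector<mlperf::QuerySample>& samples) override {
    // A real SUT would run inference here and return output buffers;
    // this sketch completes every sample with an empty response.
    std::vector<mlperf::QuerySampleResponse> responses;
    responses.reserve(samples.size());
    for (const auto& s : samples) {
      responses.push_back({s.id, 0, 0});
    }
    mlperf::QuerySamplesComplete(responses.data(), responses.size());
  }

  void FlushQueries() override {}

  // Added by PATCH 1/3: the SUT reports whether its own (user-defined)
  // constraints were met, e.g. a success-rate threshold.
  bool UserConstraintsMet() override { return success_rate_ >= 0.99; }

  // Added by PATCH 3/3: loadgen passes settings.target_qps to the SUT at the
  // start of IssueQueries(), before any query is issued.
  void ReportTargetQPS(const double target_qps) override {
    target_qps_ = target_qps;
  }

  void ReportLatencyResults(
      const std::vector<mlperf::QuerySampleLatency>& latencies_ns) override {}

 private:
  std::string name_{"ExampleSUT"};
  double success_rate_{1.0};   // hypothetical bookkeeping updated elsewhere
  double target_qps_{0.0};
};

Because loadgen.cc calls sut->ReportTargetQPS(settings.target_qps) before issuing queries, the SUT sees the configured target rate before the first IssueQuery() arrives.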
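A C-API client would likewise pass the two extra callbacks to ConstructSUT in the order declared in the patched c_api.h. Again a sketch only: the callback bodies, the include path, and the surrounding test driver are placeholder assumptions.

// Illustrative sketch only; include path and callback bodies are assumptions.
#include <cstddef>
#include <cstdint>

#include "bindings/c_api.h"

static void IssueQueryCb(mlperf::c::ClientData, const mlperf::QuerySample*,
                         size_t) {
  // Run inference and complete the samples via mlperf::QuerySamplesComplete().
}
static void FlushQueriesCb() {}
static bool UserConstraintsMetCb() { return true; }
static void ReportTargetQPSCb(const double target_qps) { (void)target_qps; }
static void ReportLatencyResultsCb(mlperf::c::ClientData, const int64_t*,
                                   size_t) {}

int main() {
  const char name[] = "ExampleCSut";
  void* sut = mlperf::c::ConstructSUT(
      /*client_data=*/0, name, sizeof(name) - 1, IssueQueryCb, FlushQueriesCb,
      UserConstraintsMetCb, ReportTargetQPSCb, ReportLatencyResultsCb);
  // ... construct a QSL and run the test (e.g. via mlperf::c::StartTest) ...
  mlperf::c::DestroySUT(sut);
  return 0;
}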