
Commit 4803d1b

Implement a new --failing-and-slow-first command line argument to test runner. (#24624)
This keeps track of the results of the previous test run; on subsequent runs, failing tests are run first, then skipped tests, and last, successful tests in slowest-first order. This improves the parallelism throughput of the suite.

Also add support for --failfast in the multithreaded test suite, to stop suite runs quickly at the first test failure. Together, the `--failfast` and `--failing-and-slow-first` flags can help achieve < 10 second test suite runs on CI when the suite is failing.

Example `core0` runtime with `test/runner core0` on a 16-core/32-thread system:

```
Total core time: 2818.016s. Wallclock time: 118.083s. Parallelization: 23.86x.
```

Same suite runtime with `test/runner --failing-and-slow-first core0`:

```
Total core time: 2940.180s. Wallclock time: 94.027s. Parallelization: 31.27x.
```

This gains better throughput and a 20.37% reduction in test suite wall time.
1 parent 1ba28a0 commit 4803d1b
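As a usage note, a CI run that wants both fast failure detection and the improved scheduling would combine the two flags in a single invocation (suite name `core0` used here as an example, as in the commit message above):

```
test/runner --failfast --failing-and-slow-first core0
```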

File tree

3 files changed: +157 −15 lines


test/common.py

Lines changed: 9 additions & 0 deletions
```diff
@@ -15,6 +15,7 @@
 import hashlib
 import io
 import itertools
+import json
 import logging
 import multiprocessing
 import os
@@ -122,6 +123,7 @@ def configure(data_dir):
 
 TEST_ROOT = path_from_root('test')
 LAST_TEST = path_from_root('out/last_test.txt')
+PREVIOUS_TEST_RUN_RESULTS_FILE = path_from_root('out/previous_test_run_results.json')
 
 DEFAULT_BROWSER_DATA_DIR = path_from_root('out/browser-profile')
 
@@ -142,6 +144,13 @@ def configure(data_dir):
 requires_network = unittest.skipIf(os.getenv('EMTEST_SKIP_NETWORK_TESTS'), 'This test requires network access')
 
 
+def load_previous_test_run_results():
+  try:
+    return json.load(open(PREVIOUS_TEST_RUN_RESULTS_FILE))
+  except FileNotFoundError:
+    return {}
+
+
 def test_file(*path_components):
   """Construct a path relative to the emscripten "tests" directory."""
   return str(Path(TEST_ROOT, *path_components))
```
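For illustration, here is a sketch of what `out/previous_test_run_results.json` might contain once populated. The field names match those written in test/parallel_testsuite.py below, but the test-name keys and values are hypothetical (the exact keys depend on how unittest stringifies each test):

```python
# Hypothetical contents of out/previous_test_run_results.json, as returned
# by load_previous_test_run_results(). Records are kept under two keys.
example = {
  # Record keyed by the full, suite-qualified test name
  'test_hello_world (test_core.core0.test_hello_world)': {
    'result': 'success',      # outcome string recorded by BufferedParallelTestResult
    'duration': 3.41,         # wallclock seconds from the last run
    'fail_frequency': 0.25,   # exponential moving average of fail history
  },
  # A second, suite-agnostic record under the bare test name, so a failure
  # seen in one suite can reorder the same test in other suites
  'test_hello_world': {
    'result': 'success',
    'duration': 3.41,
    'fail_frequency': 0.25,
  },
}
```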

test/parallel_testsuite.py

Lines changed: 59 additions & 6 deletions
```diff
@@ -3,6 +3,7 @@
 # University of Illinois/NCSA Open Source License. Both these licenses can be
 # found in the LICENSE file.
 
+import json
 import multiprocessing
 import os
 import sys
@@ -19,7 +20,12 @@
 seen_class = set()
 
 
-def run_test(test):
+def run_test(test, failfast_event):
+  # If failfast mode is in effect and any of the tests have failed,
+  # then we should abort executing further tests immediately.
+  if failfast_event and failfast_event.is_set():
+    return None
+
   olddir = os.getcwd()
   result = BufferedParallelTestResult()
   temp_dir = tempfile.mkdtemp(prefix='emtest_')
@@ -29,10 +35,16 @@ def run_test(test):
       seen_class.add(test.__class__)
       test.__class__.setUpClass()
     test(result)
+
+    # Alert all other multiprocess pool runners that they need to stop executing further tests.
+    if failfast_event is not None and result.test_result not in ['success', 'skipped']:
+      failfast_event.set()
   except unittest.SkipTest as e:
     result.addSkip(test, e)
   except Exception as e:
     result.addError(test, e)
+    if failfast_event is not None:
+      failfast_event.set()
   # Before attempting to delete the tmp dir make sure the current
   # working directory is not within it.
   os.chdir(olddir)
@@ -46,9 +58,11 @@ class ParallelTestSuite(unittest.BaseTestSuite):
   Creates worker threads, manages the task queue, and combines the results.
   """
 
-  def __init__(self, max_cores):
+  def __init__(self, max_cores, options):
     super().__init__()
     self.max_cores = max_cores
+    self.failfast = options.failfast
+    self.failing_and_slow_first = options.failing_and_slow_first
 
   def addTest(self, test):
     super().addTest(test)
@@ -61,12 +75,42 @@ def run(self, result):
     # inherited by the child process, but can lead to hard-to-debug windows-only
     # issues.
     # multiprocessing.set_start_method('spawn')
-    tests = list(self.reversed_tests())
+
+    # If we are running with --failing-and-slow-first, then the test list has been
+    # pre-sorted based on previous test run results. Otherwise run the tests in
+    # reverse alphabetical order.
+    tests = list(self if self.failing_and_slow_first else self.reversed_tests())
     use_cores = cap_max_workers_in_pool(min(self.max_cores, len(tests), num_cores()))
     print('Using %s parallel test processes' % use_cores)
-    pool = multiprocessing.Pool(use_cores)
-    results = [pool.apply_async(run_test, (t,)) for t in tests]
-    results = [r.get() for r in results]
+    with multiprocessing.Manager() as manager:
+      pool = multiprocessing.Pool(use_cores)
+      failfast_event = manager.Event() if self.failfast else None
+      results = [pool.apply_async(run_test, (t, failfast_event)) for t in tests]
+      results = [r.get() for r in results]
+      results = [r for r in results if r is not None]
+
+      if self.failing_and_slow_first:
+        previous_test_run_results = common.load_previous_test_run_results()
+        for r in results:
+          # Save a test result record with the specific suite name (e.g. "core0.test_foo")
+          test_failed = r.test_result not in ['success', 'skipped']
+
+          def update_test_results_to(test_name):
+            fail_frequency = previous_test_run_results[test_name]['fail_frequency'] if test_name in previous_test_run_results else int(test_failed)
+            # Apply an exponential moving average with 50% weighting to merge the previous fail frequency with the new one
+            fail_frequency = (fail_frequency + int(test_failed)) / 2
+            previous_test_run_results[test_name] = {
+              'result': r.test_result,
+              'duration': r.test_duration,
+              'fail_frequency': fail_frequency,
+            }
+
+          update_test_results_to(r.test_name)
+          # Also save a test result record without the suite name (e.g. just "test_foo"). This enables different suite runs
+          # to order tests for quick --failfast termination, in case a test fails in multiple suites.
+          update_test_results_to(r.test_name.split(' ')[0])
+
+        json.dump(previous_test_run_results, open(common.PREVIOUS_TEST_RUN_RESULTS_FILE, 'w'), indent=2)
     pool.close()
     pool.join()
     return self.combine_results(result, results)
@@ -104,6 +148,8 @@ class BufferedParallelTestResult:
   def __init__(self):
     self.buffered_result = None
     self.test_duration = 0
+    self.test_result = 'errored'
+    self.test_name = ''
 
   @property
   def test(self):
@@ -122,6 +168,7 @@ def updateResult(self, result):
     result.core_time += self.test_duration
 
   def startTest(self, test):
+    self.test_name = str(test)
     self.start_time = time.perf_counter()
 
   def stopTest(self, test):
@@ -132,26 +179,32 @@ def stopTest(self, test):
   def addSuccess(self, test):
     print(test, '... ok (%.2fs)' % (self.calculateElapsed()), file=sys.stderr)
     self.buffered_result = BufferedTestSuccess(test)
+    self.test_result = 'success'
 
   def addExpectedFailure(self, test, err):
     print(test, '... expected failure (%.2fs)' % (self.calculateElapsed()), file=sys.stderr)
     self.buffered_result = BufferedTestExpectedFailure(test, err)
+    self.test_result = 'expected failure'
 
   def addUnexpectedSuccess(self, test):
     print(test, '... unexpected success (%.2fs)' % (self.calculateElapsed()), file=sys.stderr)
     self.buffered_result = BufferedTestUnexpectedSuccess(test)
+    self.test_result = 'unexpected success'
 
   def addSkip(self, test, reason):
     print(test, "... skipped '%s'" % reason, file=sys.stderr)
     self.buffered_result = BufferedTestSkip(test, reason)
+    self.test_result = 'skipped'
 
   def addFailure(self, test, err):
     print(test, '... FAIL', file=sys.stderr)
     self.buffered_result = BufferedTestFailure(test, err)
+    self.test_result = 'failed'
 
   def addError(self, test, err):
     print(test, '... ERROR', file=sys.stderr)
     self.buffered_result = BufferedTestError(test, err)
+    self.test_result = 'errored'
 
 
 class BufferedTestBase:
```
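As a side note, the 50%-weighted exponential moving average in `update_test_results_to` above makes each new run count as much as all prior history combined, so a newly flaky test rises in priority after just a couple of runs and decays back once fixed. A minimal sketch of the arithmetic:

```python
# Minimal sketch of the fail-frequency update used above:
# new_freq = (old_freq + failed) / 2
def update_fail_frequency(previous, failed_now):
  return (previous + int(failed_now)) / 2

freq = 0.0
for failed in (True, True, False, False, False):
  freq = update_fail_frequency(freq, failed)
  print(freq)
# Prints: 0.5, 0.75, 0.375, 0.1875, 0.09375 -- failures pull the frequency
# up fast; consecutive passes decay it back toward zero.
```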

test/runner.py

Lines changed: 89 additions & 9 deletions
```diff
@@ -30,6 +30,7 @@
 import sys
 import unittest
 import time
+from functools import cmp_to_key
 
 # Setup
 
@@ -270,8 +271,84 @@ def error_on_legacy_suite_names(args):
     utils.exit_with_error('`%s` test suite has been replaced with `%s`', a, new)
 
 
-def load_test_suites(args, modules, start_at, repeat):
-  found_start = not start_at
+# Creates a sorter that finds the best possible order to run the tests in.
+# Generally this is slowest-first to maximize parallelization, but if running
+# with fail-fast, then tests with a recent known failure frequency are run
+# first, followed by the rest, slowest first.
+def create_test_run_sorter(failfast):
+  previous_test_run_results = common.load_previous_test_run_results()
+
+  def read_approx_fail_freq(test_name):
+    if test_name in previous_test_run_results and 'fail_frequency' in previous_test_run_results[test_name]:
+      # Quantize the float value to relatively fine-grained buckets for sorting.
+      # This bucketization is needed to merge two competing sorting goals: we may
+      # want to fail early (so tests with a previous history of failures should sort first),
+      # but we also want to run the slowest tests first.
+      # We cannot sort for both goals at the same time, so have failure frequency
+      # take priority over test runtime, and quantize the failures to distinct
+      # frequencies, to be able to then sort by test runtime inside the same failure
+      # frequency bucket.
+      NUM_BUCKETS = 20
+      return round(previous_test_run_results[test_name]['fail_frequency'] * NUM_BUCKETS) / NUM_BUCKETS
+    return 0
+
+  def sort_tests_failing_and_slowest_first_comparator(x, y):
+    x = str(x)
+    y = str(y)
+
+    # Look at the number of times this test has failed, and order by failure count first.
+    # Only do this in --failfast, when we are looking to fail early. (otherwise sorting by last test run duration is more productive)
+    if failfast:
+      x_fail_freq = read_approx_fail_freq(x)
+      y_fail_freq = read_approx_fail_freq(y)
+      if x_fail_freq != y_fail_freq:
+        return y_fail_freq - x_fail_freq
+
+      # Look at the number of times this test has failed overall in any other suite, and order by failure count first
+      x_fail_freq = read_approx_fail_freq(x.split(' ')[0])
+      y_fail_freq = read_approx_fail_freq(y.split(' ')[0])
+      if x_fail_freq != y_fail_freq:
+        return y_fail_freq - x_fail_freq
+
+    if x in previous_test_run_results:
+      X = previous_test_run_results[x]
+
+      # If test Y has not been run even once, run Y before X
+      if y not in previous_test_run_results:
+        return 1
+      Y = previous_test_run_results[y]
+
+      # If both X and Y have been run before, order the tests based on what the previous result was (failures first, skips very last).
+      # N.b. it is important to sandwich all skipped tests between fails and successes. This is to maximize the chances that when
+      # a failing test is detected, the other cores will fail-fast as well. (successful tests are run slowest-first to help
+      # scheduling)
+      order_by_result = {'errored': 0, 'failed': 1, 'expected failure': 2, 'unexpected success': 3, 'skipped': 4, 'success': 5}
+      x_result = order_by_result[X['result']]
+      y_result = order_by_result[Y['result']]
+      if x_result != y_result:
+        return x_result - y_result
+
+      # Finally, order by test duration from the last run
+      if X['duration'] != Y['duration']:
+        if X['result'] == 'success':
+          # If both tests were successful, run the slower test first to improve parallelism
+          return Y['duration'] - X['duration']
+        else:
+          # If both tests were failing, run the quicker test first to improve --failfast detection time
+          return X['duration'] - Y['duration']
+
+    # If test X has not been run even once, but Y has, run X before Y
+    if y in previous_test_run_results:
+      return -1
+
+    # Neither test has been run before, so run them in alphabetical order
+    return (x > y) - (x < y)
+
+  return sort_tests_failing_and_slowest_first_comparator
+
+
+def load_test_suites(args, modules, options):
+  found_start = not options.start_at
 
   loader = unittest.TestLoader()
   error_on_legacy_suite_names(args)
@@ -291,20 +368,22 @@ def load_test_suites(args, modules, start_at, repeat):
     if names_in_module:
       loaded_tests = loader.loadTestsFromNames(sorted(names_in_module), m)
       tests = flattened_tests(loaded_tests)
-      suite = suite_for_module(m, tests)
+      suite = suite_for_module(m, tests, options)
+      if options.failing_and_slow_first:
+        tests = sorted(tests, key=cmp_to_key(create_test_run_sorter(options.failfast)))
       for test in tests:
         if not found_start:
           # Skip over tests until we find the start
-          if test.id().endswith(start_at):
+          if test.id().endswith(options.start_at):
             found_start = True
           else:
             continue
-        for _x in range(repeat):
+        for _x in range(options.repeat):
           total_tests += 1
           suite.addTest(test)
       suites.append((m.__name__, suite))
   if not found_start:
-    utils.exit_with_error(f'unable to find --start-at test: {start_at}')
+    utils.exit_with_error(f'unable to find --start-at test: {options.start_at}')
   if total_tests == 1 or parallel_testsuite.num_cores() == 1:
     # TODO: perhaps leave it at 2 if it was 2 before?
     common.EMTEST_SAVE_DIR = 1
@@ -318,13 +397,13 @@ def flattened_tests(loaded_tests):
   return tests
 
 
-def suite_for_module(module, tests):
+def suite_for_module(module, tests, options):
   suite_supported = module.__name__ in ('test_core', 'test_other', 'test_posixtest')
   if not common.EMTEST_SAVE_DIR and not shared.DEBUG:
     has_multiple_tests = len(tests) > 1
     has_multiple_cores = parallel_testsuite.num_cores() > 1
     if suite_supported and has_multiple_tests and has_multiple_cores:
-      return parallel_testsuite.ParallelTestSuite(len(tests))
+      return parallel_testsuite.ParallelTestSuite(len(tests), options)
   return unittest.TestSuite()
 
 
@@ -398,6 +477,7 @@ def parse_args():
                       help='Use the default CI browser configuration.')
   parser.add_argument('tests', nargs='*')
   parser.add_argument('--failfast', action='store_true')
+  parser.add_argument('--failing-and-slow-first', action='store_true', help='Run failing tests first, then sorted by slowest first. Combine with --failfast for fast fail-early CI runs.')
   parser.add_argument('--start-at', metavar='NAME', help='Skip all tests up until <NAME>')
   parser.add_argument('--continue', dest='_continue', action='store_true',
                       help='Resume from the last run test.'
@@ -496,7 +576,7 @@ def prepend_default(arg):
   if os.path.exists(common.LAST_TEST):
     options.start_at = utils.read_file(common.LAST_TEST).strip()
 
-  suites, unmatched_tests = load_test_suites(tests, modules, options.start_at, options.repeat)
+  suites, unmatched_tests = load_test_suites(tests, modules, options)
   if unmatched_tests:
     print('ERROR: could not find the following tests: ' + ' '.join(unmatched_tests))
     return 1
```
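To make the interplay between the two sorting goals concrete, here is a small standalone sketch of the bucketing in `read_approx_fail_freq` above: fail frequencies that land in the same 1/20-wide bucket compare as equal, so the comparator falls through to ordering by duration within that bucket (the input values below are illustrative):

```python
NUM_BUCKETS = 20

def approx(fail_frequency):
  # Same quantization as read_approx_fail_freq() above
  return round(fail_frequency * NUM_BUCKETS) / NUM_BUCKETS

print(approx(0.51), approx(0.49))  # 0.5 0.5  -> same bucket: these two sort by duration
print(approx(0.51), approx(0.26))  # 0.5 0.25 -> different buckets: more-failing test runs first
```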
