1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
tools / perry.py [blame]
#!/usr/bin/env python
# Copyright 2017 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Runs all permutations of pairs of tests in a gtest binary to attempt to
detect state leakage between tests.
Example invocation:
gn gen out/asan --args='is_asan=true enable_nacl=false is_debug=false'
ninja -C out/asan base_unittests
tools/perry.py out/asan/base_unittests > perry.log &
tail -f perry.log
You might want to run it in `screen` as it'll take a while.
"""
from __future__ import print_function
import argparse
import os
import multiprocessing
import subprocess
import sys
def _GetTestList(path_to_binary):
"""Returns a set of full test names.
Each test will be of the form "Case.Test". There will be a separate line
for each combination of Case/Test (there are often multiple tests in each
case).
"""
raw_output = subprocess.check_output([path_to_binary, "--gtest_list_tests"])
input_lines = raw_output.splitlines()
# The format of the gtest_list_tests output is:
# "Case1."
# " Test1 # <Optional extra stuff>"
# " Test2"
# "Case2."
# " Test1"
case_name = '' # Includes trailing dot.
test_set = set()
for line in input_lines:
if len(line) > 1:
if '#' in line:
line = line[:line.find('#')]
if line[0] == ' ':
# Indented means a test in previous case.
test_set.add(case_name + line.strip())
else:
# New test case.
case_name = line.strip()
return test_set
def _CheckForFailure(data):
test_binary, pair0, pair1 = data
p = subprocess.Popen(
[test_binary, '--gtest_repeat=5', '--gtest_shuffle',
'--gtest_filter=' + pair0 + ':' + pair1],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out, _ = p.communicate()
if p.returncode != 0:
return (pair0, pair1, out)
return None
def _PrintStatus(i, total, failed):
status = '%d of %d tested (%d failures)' % (i+1, total, failed)
print('\r%s%s' % (status, '\x1B[K'), end=' ')
sys.stdout.flush()
def main():
parser = argparse.ArgumentParser(description="Find failing pairs of tests.")
parser.add_argument('binary', help='Path to gtest binary or wrapper script.')
args = parser.parse_args()
print('Getting test list...')
all_tests = _GetTestList(args.binary)
permuted = [(args.binary, x, y) for x in all_tests for y in all_tests]
failed = []
pool = multiprocessing.Pool()
total_count = len(permuted)
for i, result in enumerate(pool.imap_unordered(
_CheckForFailure, permuted, 1)):
if result:
print('\n--gtest_filter=%s:%s failed\n\n%s\n\n' % (result[0], result[1],
result[2]))
failed.append(result)
_PrintStatus(i, total_count, len(failed))
pool.terminate()
pool.join()
if failed:
print('Failed pairs:')
for f in failed:
print(f[0], f[1])
return 0
if __name__ == '__main__':
sys.exit(main())