OLD | NEW |
| (Empty) |
1 # Copyright 2014 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 import Queue | |
6 import glob | |
7 import logging | |
8 import multiprocessing | |
9 import re | |
10 import signal | |
11 import traceback | |
12 | |
13 from cStringIO import StringIO | |
14 | |
15 from .type_definitions import ( | |
16 Test, UnknownError, TestError, NoMatchingTestsError, | |
17 Result, ResultStageAbort) | |
18 | |
19 from . import util | |
20 | |
21 | |
22 class ResetableStringIO(object): | |
23 def __init__(self): | |
24 self._stream = StringIO() | |
25 | |
26 def reset(self): | |
27 self._stream = StringIO() | |
28 | |
29 def __getattr__(self, key): | |
30 return getattr(self._stream, key) | |
31 | |
32 | |
33 def gen_loop_process(gen, test_queue, result_queue, opts, kill_switch, | |
34 cover_ctx): | |
35 """Generate `Test`'s from |gen|, and feed them into |test_queue|. | |
36 | |
37 Non-Test instances will be translated into `UnknownError` objects. | |
38 | |
39 On completion, feed |opts.jobs| None objects into |test_queue|. | |
40 | |
41 @param gen: generator yielding Test() instances. | |
42 @type test_queue: multiprocessing.Queue() | |
43 @type result_queue: multiprocessing.Queue() | |
44 @type opts: argparse.Namespace | |
45 @type kill_switch: multiprocessing.Event() | |
46 @type cover_ctx: cover.CoverageContext().create_subprocess_context() | |
47 """ | |
48 # Implicitly append '*'' to globs that don't specify it. | |
49 globs = ['%s%s' % (g, '*' if '*' not in g else '') for g in opts.test_glob] | |
50 | |
51 matcher = re.compile( | |
52 '^%s$' % '|'.join('(?:%s)' % glob.fnmatch.translate(g) | |
53 for g in globs if g[0] != '-')) | |
54 if matcher.pattern == '^$': | |
55 matcher = re.compile('^.*$') | |
56 | |
57 neg_matcher = re.compile( | |
58 '^%s$' % '|'.join('(?:%s)' % glob.fnmatch.translate(g[1:]) | |
59 for g in globs if g[0] == '-')) | |
60 | |
61 def generate_tests(): | |
62 paths_seen = set() | |
63 seen_tests = False | |
64 try: | |
65 with cover_ctx: | |
66 gen_inst = gen() | |
67 | |
68 while not kill_switch.is_set(): | |
69 with cover_ctx: | |
70 root_test = next(gen_inst) | |
71 | |
72 if kill_switch.is_set(): | |
73 break | |
74 | |
75 ok_tests = [] | |
76 subtests = root_test.tests | |
77 | |
78 for subtest in subtests: | |
79 if not isinstance(subtest, Test): | |
80 result_queue.put_nowait( | |
81 UnknownError('Got non-[Multi]Test isinstance from generator: %r' | |
82 % subtest)) | |
83 continue | |
84 | |
85 test_path = subtest.expect_path() | |
86 if test_path is not None and test_path in paths_seen: | |
87 result_queue.put_nowait( | |
88 TestError(subtest, 'Duplicate expectation path!')) | |
89 else: | |
90 if test_path is not None: | |
91 paths_seen.add(test_path) | |
92 name = subtest.name | |
93 if not neg_matcher.match(name) and matcher.match(name): | |
94 ok_tests.append(subtest) | |
95 | |
96 if ok_tests: | |
97 seen_tests = True | |
98 yield root_test.restrict(ok_tests) | |
99 | |
100 if not seen_tests: | |
101 result_queue.put_nowait(NoMatchingTestsError()) | |
102 except StopIteration: | |
103 pass | |
104 except KeyboardInterrupt: | |
105 pass | |
106 finally: | |
107 for _ in xrange(opts.jobs): | |
108 test_queue.put_nowait(None) | |
109 | |
110 | |
111 next_stage = (result_queue if opts.handler.SKIP_RUNLOOP else test_queue) | |
112 opts.handler.gen_stage_loop(opts, generate_tests(), next_stage.put_nowait, | |
113 result_queue.put_nowait) | |
114 | |
115 | |
116 def run_loop_process(test_queue, result_queue, opts, kill_switch, cover_ctx): | |
117 """Consume `Test` instances from |test_queue|, run them, and yield the results | |
118 into opts.run_stage_loop(). | |
119 | |
120 Generates coverage data as a side-effect. | |
121 | |
122 @type test_queue: multiprocessing.Queue() | |
123 @type result_queue: multiprocessing.Queue() | |
124 @type opts: argparse.Namespace | |
125 @type kill_switch: multiprocessing.Event() | |
126 @type cover_ctx: cover.CoverageContext().create_subprocess_context() | |
127 """ | |
128 logstream = ResetableStringIO() | |
129 logger = logging.getLogger() | |
130 logger.setLevel(logging.DEBUG) | |
131 shandler = logging.StreamHandler(logstream) | |
132 shandler.setFormatter( | |
133 logging.Formatter('%(levelname)s: %(message)s')) | |
134 logger.addHandler(shandler) | |
135 | |
136 SKIP = object() | |
137 def process_test(subtest): | |
138 logstream.reset() | |
139 with cover_ctx(include=subtest.coverage_includes()): | |
140 subresult = subtest.run() | |
141 if isinstance(subresult, TestError): | |
142 result_queue.put_nowait(subresult) | |
143 return SKIP | |
144 elif not isinstance(subresult, Result): | |
145 result_queue.put_nowait( | |
146 TestError( | |
147 subtest, | |
148 'Got non-Result instance from test: %r' % subresult)) | |
149 return SKIP | |
150 return subresult | |
151 | |
152 def generate_tests_results(): | |
153 try: | |
154 while not kill_switch.is_set(): | |
155 try: | |
156 test = test_queue.get(timeout=0.1) | |
157 if test is None: | |
158 break | |
159 except Queue.Empty: | |
160 continue | |
161 | |
162 try: | |
163 for subtest, subresult in test.process(process_test): | |
164 if subresult is not SKIP: | |
165 yield subtest, subresult, logstream.getvalue().splitlines() | |
166 except Exception: | |
167 result_queue.put_nowait( | |
168 TestError(test, traceback.format_exc(), | |
169 logstream.getvalue().splitlines())) | |
170 except KeyboardInterrupt: | |
171 pass | |
172 | |
173 opts.handler.run_stage_loop(opts, generate_tests_results(), | |
174 result_queue.put_nowait) | |
175 | |
176 | |
177 def result_loop(test_gen, cover_ctx, opts): | |
178 kill_switch = multiprocessing.Event() | |
179 def handle_killswitch(*_): | |
180 kill_switch.set() | |
181 # Reset the signal to DFL so that double ctrl-C kills us for sure. | |
182 signal.signal(signal.SIGINT, signal.SIG_DFL) | |
183 signal.signal(signal.SIGTERM, signal.SIG_DFL) | |
184 signal.signal(signal.SIGINT, handle_killswitch) | |
185 signal.signal(signal.SIGTERM, handle_killswitch) | |
186 | |
187 test_queue = multiprocessing.Queue() | |
188 result_queue = multiprocessing.Queue() | |
189 | |
190 gen_cover_ctx = cover_ctx | |
191 if cover_ctx.enabled: | |
192 gen_cover_ctx = cover_ctx(include=util.get_cover_list(test_gen)) | |
193 | |
194 test_gen_args = ( | |
195 test_gen, test_queue, result_queue, opts, kill_switch, gen_cover_ctx) | |
196 | |
197 procs = [] | |
198 if opts.handler.SKIP_RUNLOOP: | |
199 gen_loop_process(*test_gen_args) | |
200 else: | |
201 procs = [multiprocessing.Process( | |
202 target=gen_loop_process, args=test_gen_args)] | |
203 | |
204 procs += [ | |
205 multiprocessing.Process( | |
206 target=run_loop_process, args=( | |
207 test_queue, result_queue, opts, kill_switch, cover_ctx)) | |
208 for _ in xrange(opts.jobs) | |
209 ] | |
210 | |
211 for p in procs: | |
212 p.daemon = True | |
213 p.start() | |
214 | |
215 error = False | |
216 try: | |
217 def generate_objects(): | |
218 while not kill_switch.is_set(): | |
219 while not kill_switch.is_set(): | |
220 try: | |
221 yield result_queue.get(timeout=0.1) | |
222 except Queue.Empty: | |
223 break | |
224 | |
225 if not any(p.is_alive() for p in procs): | |
226 break | |
227 | |
228 # Get everything still in the queue. Still need timeout, but since nothing | |
229 # is going to be adding stuff to the queue, use a very short timeout. | |
230 while not kill_switch.is_set(): | |
231 try: | |
232 yield result_queue.get(timeout=0.00001) | |
233 except Queue.Empty: | |
234 break | |
235 | |
236 if kill_switch.is_set(): | |
237 raise ResultStageAbort() | |
238 error = opts.handler.result_stage_loop(opts, generate_objects()) | |
239 except ResultStageAbort: | |
240 pass | |
241 | |
242 for p in procs: | |
243 p.join() | |
244 | |
245 if not kill_switch.is_set() and not result_queue.empty(): | |
246 error = True | |
247 | |
248 return error, kill_switch.is_set() | |
OLD | NEW |