OLD | NEW |
1 # Copyright 2014 The Chromium Authors. All rights reserved. | 1 # Copyright 2014 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import Queue | 5 import Queue |
6 import glob | 6 import glob |
7 import logging | 7 import logging |
8 import multiprocessing | 8 import multiprocessing |
9 import os | 9 import os |
10 import re | 10 import re |
11 import shutil | 11 import shutil |
12 import signal | 12 import signal |
13 import sys | 13 import sys |
14 import tempfile | 14 import tempfile |
15 import traceback | 15 import traceback |
16 | 16 |
17 from cStringIO import StringIO | 17 from cStringIO import StringIO |
18 | 18 |
| 19 from expect_tests.tempdir import TempDir |
19 from expect_tests.type_definitions import ( | 20 from expect_tests.type_definitions import ( |
20 Test, UnknownError, TestError, NoMatchingTestsError, MultiTest, | 21 Test, UnknownError, TestError, NoMatchingTestsError, MultiTest, |
21 Result, ResultStageAbort) | 22 Result, ResultStageAbort) |
22 | 23 |
23 from expect_tests import util | 24 from expect_tests import util |
24 | 25 |
25 | 26 |
26 OLD_TEMP_DIR = None | |
27 TEMP_PID_FILE = '.expect_tests_pidfile' | |
28 | |
29 | |
30 class ResetableStringIO(object): | 27 class ResetableStringIO(object): |
31 def __init__(self): | 28 def __init__(self): |
32 self._stream = StringIO() | 29 self._stream = StringIO() |
33 | 30 |
34 def reset(self): | 31 def reset(self): |
35 self._stream = StringIO() | 32 self._stream = StringIO() |
36 | 33 |
37 def __getattr__(self, key): | 34 def __getattr__(self, key): |
38 return getattr(self._stream, key) | 35 return getattr(self._stream, key) |
39 | 36 |
40 | 37 |
41 def set_temp_dir(): | |
42 """Provides an automatically-cleaned temporary directory base for tests.""" | |
43 global OLD_TEMP_DIR | |
44 | |
45 if OLD_TEMP_DIR: | |
46 return | |
47 | |
48 suffix = '.expect_tests_temp' | |
49 tempdir = tempfile.mkdtemp(suffix) | |
50 for p in glob.glob(os.path.join(os.path.dirname(tempdir), '*'+suffix)): | |
51 if p == tempdir: | |
52 continue | |
53 | |
54 pfile = os.path.join(p, TEMP_PID_FILE) | |
55 if os.path.exists(pfile): | |
56 with open(pfile, 'rb') as f: | |
57 try: | |
58 os.kill(int(f.read()), 0) | |
59 continue | |
60 except (OSError, TypeError): | |
61 pass | |
62 | |
63 shutil.rmtree(p, ignore_errors=True) | |
64 | |
65 with open(os.path.join(tempdir, TEMP_PID_FILE), 'wb') as f: | |
66 f.write(str(os.getpid())) | |
67 | |
68 OLD_TEMP_DIR = tempfile.tempdir | |
69 tempfile.tempdir = tempdir | |
70 | |
71 | |
72 def clear_temp_dir(): | |
73 global OLD_TEMP_DIR | |
74 if OLD_TEMP_DIR: | |
75 # Try to nuke the pidfile first | |
76 pfile = os.path.join(tempfile.tempdir, TEMP_PID_FILE) | |
77 try: | |
78 os.unlink(pfile) | |
79 except OSError as e: | |
80 print >> sys.stderr, "Error removing %r: %s" % (pfile, e) | |
81 shutil.rmtree(tempfile.tempdir, ignore_errors=True) | |
82 tempfile.temdir = OLD_TEMP_DIR | |
83 OLD_TEMP_DIR = None | |
84 | |
85 | |
86 def gen_loop_process(gens, test_queue, result_queue, opts, kill_switch, | 38 def gen_loop_process(gens, test_queue, result_queue, opts, kill_switch, |
87 cover_ctx, temp_dir): | 39 cover_ctx, temp_dir): |
88 """Generate `Test`s from |gens|, and feed them into |test_queue|. | 40 """Generate `Test`s from |gens|, and feed them into |test_queue|. |
89 | 41 |
90 Non-Test instances will be translated into `UnknownError` objects. | 42 Non-Test instances will be translated into `UnknownError` objects. |
91 | 43 |
92 @param gens: list of generators yielding Test() instances. | 44 @param gens: list of generators yielding Test() instances. |
93 @type test_queue: multiprocessing.Queue() | 45 @type test_queue: multiprocessing.Queue() |
94 @type result_queue: multiprocessing.Queue() | 46 @type result_queue: multiprocessing.Queue() |
95 @type opts: argparse.Namespace | 47 @type opts: argparse.Namespace |
(...skipping 158 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
254 signal.signal(signal.SIGTERM, signal.SIG_DFL) | 206 signal.signal(signal.SIGTERM, signal.SIG_DFL) |
255 signal.signal(signal.SIGINT, handle_killswitch) | 207 signal.signal(signal.SIGINT, handle_killswitch) |
256 signal.signal(signal.SIGTERM, handle_killswitch) | 208 signal.signal(signal.SIGTERM, handle_killswitch) |
257 | 209 |
258 # This flag is set when test generation has finished. | 210 # This flag is set when test generation has finished. |
259 test_gen_finished = multiprocessing.Event() | 211 test_gen_finished = multiprocessing.Event() |
260 | 212 |
261 test_queue = multiprocessing.Queue() | 213 test_queue = multiprocessing.Queue() |
262 result_queue = multiprocessing.Queue() | 214 result_queue = multiprocessing.Queue() |
263 | 215 |
264 set_temp_dir() | 216 with TempDir() as temp_dir: |
| 217 test_gen_args = ( |
| 218 test_gens, test_queue, result_queue, opts, kill_switch, cover_ctx, |
| 219 temp_dir |
| 220 ) |
265 | 221 |
266 test_gen_args = ( | 222 procs = [] |
267 test_gens, test_queue, result_queue, opts, kill_switch, | 223 if opts.handler.SKIP_RUNLOOP: |
268 cover_ctx, tempfile.tempdir | 224 gen_loop_process(*test_gen_args) |
269 ) | 225 else: |
| 226 procs = [multiprocessing.Process( |
| 227 target=gen_loop_process, args=test_gen_args, |
| 228 name='gen_loop_process') |
| 229 ] |
270 | 230 |
271 procs = [] | 231 procs += [ |
272 if opts.handler.SKIP_RUNLOOP: | 232 multiprocessing.Process( |
273 gen_loop_process(*test_gen_args) | 233 target=run_loop_process, |
274 else: | 234 args=(test_queue, result_queue, opts, kill_switch, |
275 procs = [multiprocessing.Process( | 235 test_gen_finished, cover_ctx, temp_dir), |
276 target=gen_loop_process, args=test_gen_args)] | 236 name='run_loop_process %d' % job_num) |
| 237 for job_num in xrange(opts.jobs) |
| 238 ] |
277 | 239 |
278 procs += [ | 240 for p in procs: |
279 multiprocessing.Process( | 241 p.daemon = True |
280 target=run_loop_process, args=( | 242 p.start() |
281 test_queue, result_queue, opts, kill_switch, test_gen_finished, | |
282 cover_ctx, tempfile.tempdir)) | |
283 for _ in xrange(opts.jobs) | |
284 ] | |
285 | 243 |
286 for p in procs: | 244 error = False |
287 p.daemon = True | |
288 p.start() | |
289 | 245 |
290 error = False | 246 try: |
| 247 def generate_objects(): |
| 248 while not kill_switch.is_set(): |
| 249 while not kill_switch.is_set(): |
| 250 try: |
| 251 yield result_queue.get(timeout=0.1) |
| 252 except Queue.Empty: |
| 253 break |
291 | 254 |
292 try: | 255 # Check if gen_loop_process has terminated. |
293 def generate_objects(): | 256 if procs and not procs[0].is_alive(): |
294 while not kill_switch.is_set(): | 257 # Signal all run_loop_process that they can exit. |
| 258 test_gen_finished.set() |
| 259 if not any(p.is_alive() for p in procs): |
| 260 break |
| 261 |
| 262 # Get everything still in the queue. Still need timeout, but since |
| 263 # nothing is going to be adding stuff to the queue, use a very short |
| 264 # timeout. |
295 while not kill_switch.is_set(): | 265 while not kill_switch.is_set(): |
296 try: | 266 try: |
297 yield result_queue.get(timeout=0.1) | 267 yield result_queue.get(timeout=0.00001) |
298 except Queue.Empty: | 268 except Queue.Empty: |
299 break | 269 break |
300 | 270 |
301 # Check if gen_loop_process has terminated. | 271 if kill_switch.is_set(): |
302 if procs and not procs[0].is_alive(): | 272 raise ResultStageAbort() |
303 # Signal all run_loop_process that they can exit. | 273 error = opts.handler.result_stage_loop(opts, generate_objects()) |
304 test_gen_finished.set() | 274 except ResultStageAbort: |
305 if not any(p.is_alive() for p in procs): | 275 pass |
306 break | |
307 | |
308 # Get everything still in the queue. Still need timeout, but since nothing | |
309 # is going to be adding stuff to the queue, use a very short timeout. | |
310 while not kill_switch.is_set(): | |
311 try: | |
312 yield result_queue.get(timeout=0.00001) | |
313 except Queue.Empty: | |
314 break | |
315 | |
316 if kill_switch.is_set(): | |
317 raise ResultStageAbort() | |
318 error = opts.handler.result_stage_loop(opts, generate_objects()) | |
319 except ResultStageAbort: | |
320 pass | |
321 finally: | |
322 clear_temp_dir() | |
323 | 276 |
324 for p in procs: | 277 for p in procs: |
325 p.join() | 278 p.join() |
326 | 279 |
327 if not kill_switch.is_set() and not result_queue.empty(): | 280 if not kill_switch.is_set() and not result_queue.empty(): |
328 error = True | 281 error = True |
329 | 282 |
330 return error, kill_switch.is_set() | 283 return error, kill_switch.is_set() |
OLD | NEW |