Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(329)

Side by Side Diff: expect_tests/pipeline.py

Issue 556313004: Added TempDir (Closed) Base URL: https://chromium.googlesource.com/infra/testing/expect_tests@shebang
Patch Set: Added process names Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | expect_tests/tempdir.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2014 The Chromium Authors. All rights reserved. 1 # Copyright 2014 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import Queue 5 import Queue
6 import glob 6 import glob
7 import logging 7 import logging
8 import multiprocessing 8 import multiprocessing
9 import os 9 import os
10 import re 10 import re
11 import shutil 11 import shutil
12 import signal 12 import signal
13 import sys 13 import sys
14 import tempfile 14 import tempfile
15 import traceback 15 import traceback
16 16
17 from cStringIO import StringIO 17 from cStringIO import StringIO
18 18
19 from expect_tests.tempdir import TempDir
19 from expect_tests.type_definitions import ( 20 from expect_tests.type_definitions import (
20 Test, UnknownError, TestError, NoMatchingTestsError, MultiTest, 21 Test, UnknownError, TestError, NoMatchingTestsError, MultiTest,
21 Result, ResultStageAbort) 22 Result, ResultStageAbort)
22 23
23 from expect_tests import util 24 from expect_tests import util
24 25
25 26
26 OLD_TEMP_DIR = None
27 TEMP_PID_FILE = '.expect_tests_pidfile'
28
29
30 class ResetableStringIO(object): 27 class ResetableStringIO(object):
31 def __init__(self): 28 def __init__(self):
32 self._stream = StringIO() 29 self._stream = StringIO()
33 30
34 def reset(self): 31 def reset(self):
35 self._stream = StringIO() 32 self._stream = StringIO()
36 33
37 def __getattr__(self, key): 34 def __getattr__(self, key):
38 return getattr(self._stream, key) 35 return getattr(self._stream, key)
39 36
40 37
41 def set_temp_dir():
42 """Provides an automatically-cleaned temporary directory base for tests."""
43 global OLD_TEMP_DIR
44
45 if OLD_TEMP_DIR:
46 return
47
48 suffix = '.expect_tests_temp'
49 tempdir = tempfile.mkdtemp(suffix)
50 for p in glob.glob(os.path.join(os.path.dirname(tempdir), '*'+suffix)):
51 if p == tempdir:
52 continue
53
54 pfile = os.path.join(p, TEMP_PID_FILE)
55 if os.path.exists(pfile):
56 with open(pfile, 'rb') as f:
57 try:
58 os.kill(int(f.read()), 0)
59 continue
60 except (OSError, TypeError):
61 pass
62
63 shutil.rmtree(p, ignore_errors=True)
64
65 with open(os.path.join(tempdir, TEMP_PID_FILE), 'wb') as f:
66 f.write(str(os.getpid()))
67
68 OLD_TEMP_DIR = tempfile.tempdir
69 tempfile.tempdir = tempdir
70
71
72 def clear_temp_dir():
73 global OLD_TEMP_DIR
74 if OLD_TEMP_DIR:
75 # Try to nuke the pidfile first
76 pfile = os.path.join(tempfile.tempdir, TEMP_PID_FILE)
77 try:
78 os.unlink(pfile)
79 except OSError as e:
80 print >> sys.stderr, "Error removing %r: %s" % (pfile, e)
81 shutil.rmtree(tempfile.tempdir, ignore_errors=True)
82 tempfile.temdir = OLD_TEMP_DIR
83 OLD_TEMP_DIR = None
84
85
86 def gen_loop_process(gens, test_queue, result_queue, opts, kill_switch, 38 def gen_loop_process(gens, test_queue, result_queue, opts, kill_switch,
87 cover_ctx, temp_dir): 39 cover_ctx, temp_dir):
88 """Generate `Test`s from |gens|, and feed them into |test_queue|. 40 """Generate `Test`s from |gens|, and feed them into |test_queue|.
89 41
90 Non-Test instances will be translated into `UnknownError` objects. 42 Non-Test instances will be translated into `UnknownError` objects.
91 43
92 @param gens: list of generators yielding Test() instances. 44 @param gens: list of generators yielding Test() instances.
93 @type test_queue: multiprocessing.Queue() 45 @type test_queue: multiprocessing.Queue()
94 @type result_queue: multiprocessing.Queue() 46 @type result_queue: multiprocessing.Queue()
95 @type opts: argparse.Namespace 47 @type opts: argparse.Namespace
(...skipping 158 matching lines...) Expand 10 before | Expand all | Expand 10 after
254 signal.signal(signal.SIGTERM, signal.SIG_DFL) 206 signal.signal(signal.SIGTERM, signal.SIG_DFL)
255 signal.signal(signal.SIGINT, handle_killswitch) 207 signal.signal(signal.SIGINT, handle_killswitch)
256 signal.signal(signal.SIGTERM, handle_killswitch) 208 signal.signal(signal.SIGTERM, handle_killswitch)
257 209
258 # This flag is set when test generation has finished. 210 # This flag is set when test generation has finished.
259 test_gen_finished = multiprocessing.Event() 211 test_gen_finished = multiprocessing.Event()
260 212
261 test_queue = multiprocessing.Queue() 213 test_queue = multiprocessing.Queue()
262 result_queue = multiprocessing.Queue() 214 result_queue = multiprocessing.Queue()
263 215
264 set_temp_dir() 216 with TempDir() as temp_dir:
217 test_gen_args = (
218 test_gens, test_queue, result_queue, opts, kill_switch, cover_ctx,
219 temp_dir
220 )
265 221
266 test_gen_args = ( 222 procs = []
267 test_gens, test_queue, result_queue, opts, kill_switch, 223 if opts.handler.SKIP_RUNLOOP:
268 cover_ctx, tempfile.tempdir 224 gen_loop_process(*test_gen_args)
269 ) 225 else:
226 procs = [multiprocessing.Process(
227 target=gen_loop_process, args=test_gen_args,
228 name='gen_loop_process')
229 ]
270 230
271 procs = [] 231 procs += [
272 if opts.handler.SKIP_RUNLOOP: 232 multiprocessing.Process(
273 gen_loop_process(*test_gen_args) 233 target=run_loop_process,
274 else: 234 args=(test_queue, result_queue, opts, kill_switch,
275 procs = [multiprocessing.Process( 235 test_gen_finished, cover_ctx, temp_dir),
276 target=gen_loop_process, args=test_gen_args)] 236 name='run_loop_process %d' % job_num)
237 for job_num in xrange(opts.jobs)
238 ]
277 239
278 procs += [ 240 for p in procs:
279 multiprocessing.Process( 241 p.daemon = True
280 target=run_loop_process, args=( 242 p.start()
281 test_queue, result_queue, opts, kill_switch, test_gen_finished,
282 cover_ctx, tempfile.tempdir))
283 for _ in xrange(opts.jobs)
284 ]
285 243
286 for p in procs: 244 error = False
287 p.daemon = True
288 p.start()
289 245
290 error = False 246 try:
247 def generate_objects():
248 while not kill_switch.is_set():
249 while not kill_switch.is_set():
250 try:
251 yield result_queue.get(timeout=0.1)
252 except Queue.Empty:
253 break
291 254
292 try: 255 # Check if gen_loop_process has terminated.
293 def generate_objects(): 256 if procs and not procs[0].is_alive():
294 while not kill_switch.is_set(): 257 # Signal all run_loop_process that they can exit.
258 test_gen_finished.set()
259 if not any(p.is_alive() for p in procs):
260 break
261
262 # Get everything still in the queue. Still need timeout, but since
263 # nothing is going to be adding stuff to the queue, use a very short
264 # timeout.
295 while not kill_switch.is_set(): 265 while not kill_switch.is_set():
296 try: 266 try:
297 yield result_queue.get(timeout=0.1) 267 yield result_queue.get(timeout=0.00001)
298 except Queue.Empty: 268 except Queue.Empty:
299 break 269 break
300 270
301 # Check if gen_loop_process has terminated. 271 if kill_switch.is_set():
302 if procs and not procs[0].is_alive(): 272 raise ResultStageAbort()
303 # Signal all run_loop_process that they can exit. 273 error = opts.handler.result_stage_loop(opts, generate_objects())
304 test_gen_finished.set() 274 except ResultStageAbort:
305 if not any(p.is_alive() for p in procs): 275 pass
306 break
307
308 # Get everything still in the queue. Still need timeout, but since nothing
309 # is going to be adding stuff to the queue, use a very short timeout.
310 while not kill_switch.is_set():
311 try:
312 yield result_queue.get(timeout=0.00001)
313 except Queue.Empty:
314 break
315
316 if kill_switch.is_set():
317 raise ResultStageAbort()
318 error = opts.handler.result_stage_loop(opts, generate_objects())
319 except ResultStageAbort:
320 pass
321 finally:
322 clear_temp_dir()
323 276
324 for p in procs: 277 for p in procs:
325 p.join() 278 p.join()
326 279
327 if not kill_switch.is_set() and not result_queue.empty(): 280 if not kill_switch.is_set() and not result_queue.empty():
328 error = True 281 error = True
329 282
330 return error, kill_switch.is_set() 283 return error, kill_switch.is_set()
OLDNEW
« no previous file with comments | « no previous file | expect_tests/tempdir.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698