Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3523)

Unified Diff: expect_tests/pipeline.py

Issue 556313004: Added TempDir (Closed) Base URL: https://chromium.googlesource.com/infra/testing/expect_tests@shebang
Patch Set: Added process names Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | expect_tests/tempdir.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: expect_tests/pipeline.py
diff --git a/expect_tests/pipeline.py b/expect_tests/pipeline.py
index 99e409c2f4168edf203ade2fb7eee5cb2a855ab2..ecb894ce50c8b12baa0b2ad366f33e0dadcb5a16 100644
--- a/expect_tests/pipeline.py
+++ b/expect_tests/pipeline.py
@@ -16,6 +16,7 @@ import traceback
from cStringIO import StringIO
+from expect_tests.tempdir import TempDir
from expect_tests.type_definitions import (
Test, UnknownError, TestError, NoMatchingTestsError, MultiTest,
Result, ResultStageAbort)
@@ -23,10 +24,6 @@ from expect_tests.type_definitions import (
from expect_tests import util
-OLD_TEMP_DIR = None
-TEMP_PID_FILE = '.expect_tests_pidfile'
-
-
class ResetableStringIO(object):
def __init__(self):
self._stream = StringIO()
@@ -38,51 +35,6 @@ class ResetableStringIO(object):
return getattr(self._stream, key)
-def set_temp_dir():
- """Provides an automatically-cleaned temporary directory base for tests."""
- global OLD_TEMP_DIR
-
- if OLD_TEMP_DIR:
- return
-
- suffix = '.expect_tests_temp'
- tempdir = tempfile.mkdtemp(suffix)
- for p in glob.glob(os.path.join(os.path.dirname(tempdir), '*'+suffix)):
- if p == tempdir:
- continue
-
- pfile = os.path.join(p, TEMP_PID_FILE)
- if os.path.exists(pfile):
- with open(pfile, 'rb') as f:
- try:
- os.kill(int(f.read()), 0)
- continue
- except (OSError, TypeError):
- pass
-
- shutil.rmtree(p, ignore_errors=True)
-
- with open(os.path.join(tempdir, TEMP_PID_FILE), 'wb') as f:
- f.write(str(os.getpid()))
-
- OLD_TEMP_DIR = tempfile.tempdir
- tempfile.tempdir = tempdir
-
-
-def clear_temp_dir():
- global OLD_TEMP_DIR
- if OLD_TEMP_DIR:
- # Try to nuke the pidfile first
- pfile = os.path.join(tempfile.tempdir, TEMP_PID_FILE)
- try:
- os.unlink(pfile)
- except OSError as e:
- print >> sys.stderr, "Error removing %r: %s" % (pfile, e)
- shutil.rmtree(tempfile.tempdir, ignore_errors=True)
- tempfile.temdir = OLD_TEMP_DIR
- OLD_TEMP_DIR = None
-
-
def gen_loop_process(gens, test_queue, result_queue, opts, kill_switch,
cover_ctx, temp_dir):
"""Generate `Test`s from |gens|, and feed them into |test_queue|.
@@ -261,65 +213,66 @@ def result_loop(test_gens, cover_ctx, opts):
test_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
- set_temp_dir()
-
- test_gen_args = (
- test_gens, test_queue, result_queue, opts, kill_switch,
- cover_ctx, tempfile.tempdir
- )
-
- procs = []
- if opts.handler.SKIP_RUNLOOP:
- gen_loop_process(*test_gen_args)
- else:
- procs = [multiprocessing.Process(
- target=gen_loop_process, args=test_gen_args)]
-
- procs += [
- multiprocessing.Process(
- target=run_loop_process, args=(
- test_queue, result_queue, opts, kill_switch, test_gen_finished,
- cover_ctx, tempfile.tempdir))
- for _ in xrange(opts.jobs)
- ]
+ with TempDir() as temp_dir:
+ test_gen_args = (
+ test_gens, test_queue, result_queue, opts, kill_switch, cover_ctx,
+ temp_dir
+ )
+
+ procs = []
+ if opts.handler.SKIP_RUNLOOP:
+ gen_loop_process(*test_gen_args)
+ else:
+ procs = [multiprocessing.Process(
+ target=gen_loop_process, args=test_gen_args,
+ name='gen_loop_process')
+ ]
+
+ procs += [
+ multiprocessing.Process(
+ target=run_loop_process,
+ args=(test_queue, result_queue, opts, kill_switch,
+ test_gen_finished, cover_ctx, temp_dir),
+ name='run_loop_process %d' % job_num)
+ for job_num in xrange(opts.jobs)
+ ]
+
+ for p in procs:
+ p.daemon = True
+ p.start()
+
+ error = False
- for p in procs:
- p.daemon = True
- p.start()
-
- error = False
+ try:
+ def generate_objects():
+ while not kill_switch.is_set():
+ while not kill_switch.is_set():
+ try:
+ yield result_queue.get(timeout=0.1)
+ except Queue.Empty:
+ break
+
+ # Check if gen_loop_process has terminated.
+ if procs and not procs[0].is_alive():
+ # Signal all run_loop_process that they can exit.
+ test_gen_finished.set()
+ if not any(p.is_alive() for p in procs):
+ break
- try:
- def generate_objects():
- while not kill_switch.is_set():
+ # Get everything still in the queue. Still need timeout, but since
+ # nothing is going to be adding stuff to the queue, use a very short
+ # timeout.
while not kill_switch.is_set():
try:
- yield result_queue.get(timeout=0.1)
+ yield result_queue.get(timeout=0.00001)
except Queue.Empty:
break
- # Check if gen_loop_process has terminated.
- if procs and not procs[0].is_alive():
- # Signal all run_loop_process that they can exit.
- test_gen_finished.set()
- if not any(p.is_alive() for p in procs):
- break
-
- # Get everything still in the queue. Still need timeout, but since nothing
- # is going to be adding stuff to the queue, use a very short timeout.
- while not kill_switch.is_set():
- try:
- yield result_queue.get(timeout=0.00001)
- except Queue.Empty:
- break
-
- if kill_switch.is_set():
- raise ResultStageAbort()
- error = opts.handler.result_stage_loop(opts, generate_objects())
- except ResultStageAbort:
- pass
- finally:
- clear_temp_dir()
+ if kill_switch.is_set():
+ raise ResultStageAbort()
+ error = opts.handler.result_stage_loop(opts, generate_objects())
+ except ResultStageAbort:
+ pass
for p in procs:
p.join()
« no previous file with comments | « no previous file | expect_tests/tempdir.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698