Index: expect_tests/pipeline.py |
diff --git a/expect_tests/pipeline.py b/expect_tests/pipeline.py |
index a717289e5dd1fe333393b6bb9c5480822995b86a..3cdc3e6b7432bba0300c191ab3f082d547ce83b2 100644 |
--- a/expect_tests/pipeline.py |
+++ b/expect_tests/pipeline.py |
@@ -16,6 +16,7 @@ import traceback |
from cStringIO import StringIO |
+from expect_tests.tempdir import TempDir |
from expect_tests.type_definitions import ( |
Test, UnknownError, TestError, NoMatchingTestsError, MultiTest, |
Result, ResultStageAbort) |
@@ -23,10 +24,6 @@ from expect_tests.type_definitions import ( |
from expect_tests import util |
-OLD_TEMP_DIR = None |
-TEMP_PID_FILE = '.expect_tests_pidfile' |
- |
- |
class ResetableStringIO(object): |
def __init__(self): |
self._stream = StringIO() |
@@ -38,51 +35,6 @@ class ResetableStringIO(object): |
return getattr(self._stream, key) |
-def set_temp_dir(): |
- """Provides an automatically-cleaned temporary directory base for tests.""" |
- global OLD_TEMP_DIR |
- |
- if OLD_TEMP_DIR: |
- return |
- |
- suffix = '.expect_tests_temp' |
- tempdir = tempfile.mkdtemp(suffix) |
- for p in glob.glob(os.path.join(os.path.dirname(tempdir), '*'+suffix)): |
- if p == tempdir: |
- continue |
- |
- pfile = os.path.join(p, TEMP_PID_FILE) |
- if os.path.exists(pfile): |
- with open(pfile, 'rb') as f: |
- try: |
- os.kill(int(f.read()), 0) |
- continue |
- except (OSError, TypeError): |
- pass |
- |
- shutil.rmtree(p, ignore_errors=True) |
- |
- with open(os.path.join(tempdir, TEMP_PID_FILE), 'wb') as f: |
- f.write(str(os.getpid())) |
- |
- OLD_TEMP_DIR = tempfile.tempdir |
- tempfile.tempdir = tempdir |
- |
- |
-def clear_temp_dir(): |
- global OLD_TEMP_DIR |
- if OLD_TEMP_DIR: |
- # Try to nuke the pidfile first |
- pfile = os.path.join(tempfile.tempdir, TEMP_PID_FILE) |
- try: |
- os.unlink(pfile) |
- except OSError as e: |
- print >> sys.stderr, "Error removing %r: %s" % (pfile, e) |
- shutil.rmtree(tempfile.tempdir, ignore_errors=True) |
- tempfile.temdir = OLD_TEMP_DIR |
- OLD_TEMP_DIR = None |
- |
- |
def gen_loop_process(gens, test_queue, result_queue, opts, kill_switch, |
cover_ctx, temp_dir): |
"""Generate `Test`s from |gens|, and feed them into |test_queue|. |
@@ -262,61 +214,59 @@ def result_loop(test_gens, cover_ctx, opts): |
test_queue = multiprocessing.Queue() |
result_queue = multiprocessing.Queue() |
- set_temp_dir() |
- |
- test_gen_args = ( |
- test_gens, test_queue, result_queue, opts, kill_switch, cover_ctx, |
- tempfile.tempdir |
- ) |
- |
- procs = [] |
- if opts.handler.SKIP_RUNLOOP: |
- gen_loop_process(*test_gen_args) |
- else: |
- procs = [multiprocessing.Process( |
- target=gen_loop_process, args=test_gen_args)] |
- |
- procs += [ |
- multiprocessing.Process( |
- target=run_loop_process, args=( |
- test_queue, result_queue, opts, kill_switch, cover_ctx, |
- tempfile.tempdir)) |
- for _ in xrange(opts.jobs) |
- ] |
+ with TempDir() as temp_dir: |
+ test_gen_args = ( |
+ test_gens, test_queue, result_queue, opts, kill_switch, cover_ctx, |
+ temp_dir |
+ ) |
+ |
+ procs = [] |
+ if opts.handler.SKIP_RUNLOOP: |
+ gen_loop_process(*test_gen_args) |
+ else: |
+ procs = [multiprocessing.Process( |
+ target=gen_loop_process, args=test_gen_args)] |
+ |
+ procs += [ |
+ multiprocessing.Process( |
+ target=run_loop_process, args=( |
+ test_queue, result_queue, opts, kill_switch, cover_ctx, |
+ temp_dir)) |
+ for _ in xrange(opts.jobs) |
+ ] |
+ |
+ for p in procs: |
+ p.daemon = True |
+ p.start() |
+ |
+ error = False |
- for p in procs: |
- p.daemon = True |
- p.start() |
+ try: |
+ def generate_objects(): |
+ while not kill_switch.is_set(): |
+ while not kill_switch.is_set(): |
+ try: |
+ yield result_queue.get(timeout=0.1) |
+ except Queue.Empty: |
+ break |
- error = False |
+ if not any(p.is_alive() for p in procs): |
+ break |
- try: |
- def generate_objects(): |
- while not kill_switch.is_set(): |
+ # Get everything still in the queue. Still need timeout, but since |
+ # nothing is going to be adding stuff to the queue, use a very short |
+ # timeout. |
while not kill_switch.is_set(): |
try: |
- yield result_queue.get(timeout=0.1) |
+ yield result_queue.get(timeout=0.00001) |
except Queue.Empty: |
break |
- if not any(p.is_alive() for p in procs): |
- break |
- |
- # Get everything still in the queue. Still need timeout, but since nothing |
- # is going to be adding stuff to the queue, use a very short timeout. |
- while not kill_switch.is_set(): |
- try: |
- yield result_queue.get(timeout=0.00001) |
- except Queue.Empty: |
- break |
- |
- if kill_switch.is_set(): |
- raise ResultStageAbort() |
- error = opts.handler.result_stage_loop(opts, generate_objects()) |
- except ResultStageAbort: |
- pass |
- finally: |
- clear_temp_dir() |
+ if kill_switch.is_set(): |
+ raise ResultStageAbort() |
+ error = opts.handler.result_stage_loop(opts, generate_objects()) |
+ except ResultStageAbort: |
+ pass |
for p in procs: |
p.join() |