| Index: expect_tests/pipeline.py
|
| diff --git a/expect_tests/pipeline.py b/expect_tests/pipeline.py
|
| index 99e409c2f4168edf203ade2fb7eee5cb2a855ab2..ecb894ce50c8b12baa0b2ad366f33e0dadcb5a16 100644
|
| --- a/expect_tests/pipeline.py
|
| +++ b/expect_tests/pipeline.py
|
| @@ -16,6 +16,7 @@ import traceback
|
|
|
| from cStringIO import StringIO
|
|
|
| +from expect_tests.tempdir import TempDir
|
| from expect_tests.type_definitions import (
|
| Test, UnknownError, TestError, NoMatchingTestsError, MultiTest,
|
| Result, ResultStageAbort)
|
| @@ -23,10 +24,6 @@ from expect_tests.type_definitions import (
|
| from expect_tests import util
|
|
|
|
|
| -OLD_TEMP_DIR = None
|
| -TEMP_PID_FILE = '.expect_tests_pidfile'
|
| -
|
| -
|
| class ResetableStringIO(object):
|
| def __init__(self):
|
| self._stream = StringIO()
|
| @@ -38,51 +35,6 @@ class ResetableStringIO(object):
|
| return getattr(self._stream, key)
|
|
|
|
|
| -def set_temp_dir():
|
| - """Provides an automatically-cleaned temporary directory base for tests."""
|
| - global OLD_TEMP_DIR
|
| -
|
| - if OLD_TEMP_DIR:
|
| - return
|
| -
|
| - suffix = '.expect_tests_temp'
|
| - tempdir = tempfile.mkdtemp(suffix)
|
| - for p in glob.glob(os.path.join(os.path.dirname(tempdir), '*'+suffix)):
|
| - if p == tempdir:
|
| - continue
|
| -
|
| - pfile = os.path.join(p, TEMP_PID_FILE)
|
| - if os.path.exists(pfile):
|
| - with open(pfile, 'rb') as f:
|
| - try:
|
| - os.kill(int(f.read()), 0)
|
| - continue
|
| - except (OSError, TypeError):
|
| - pass
|
| -
|
| - shutil.rmtree(p, ignore_errors=True)
|
| -
|
| - with open(os.path.join(tempdir, TEMP_PID_FILE), 'wb') as f:
|
| - f.write(str(os.getpid()))
|
| -
|
| - OLD_TEMP_DIR = tempfile.tempdir
|
| - tempfile.tempdir = tempdir
|
| -
|
| -
|
| -def clear_temp_dir():
|
| - global OLD_TEMP_DIR
|
| - if OLD_TEMP_DIR:
|
| - # Try to nuke the pidfile first
|
| - pfile = os.path.join(tempfile.tempdir, TEMP_PID_FILE)
|
| - try:
|
| - os.unlink(pfile)
|
| - except OSError as e:
|
| - print >> sys.stderr, "Error removing %r: %s" % (pfile, e)
|
| - shutil.rmtree(tempfile.tempdir, ignore_errors=True)
|
| - tempfile.temdir = OLD_TEMP_DIR
|
| - OLD_TEMP_DIR = None
|
| -
|
| -
|
| def gen_loop_process(gens, test_queue, result_queue, opts, kill_switch,
|
| cover_ctx, temp_dir):
|
| """Generate `Test`s from |gens|, and feed them into |test_queue|.
|
| @@ -261,65 +213,66 @@ def result_loop(test_gens, cover_ctx, opts):
|
| test_queue = multiprocessing.Queue()
|
| result_queue = multiprocessing.Queue()
|
|
|
| - set_temp_dir()
|
| -
|
| - test_gen_args = (
|
| - test_gens, test_queue, result_queue, opts, kill_switch,
|
| - cover_ctx, tempfile.tempdir
|
| - )
|
| -
|
| - procs = []
|
| - if opts.handler.SKIP_RUNLOOP:
|
| - gen_loop_process(*test_gen_args)
|
| - else:
|
| - procs = [multiprocessing.Process(
|
| - target=gen_loop_process, args=test_gen_args)]
|
| -
|
| - procs += [
|
| - multiprocessing.Process(
|
| - target=run_loop_process, args=(
|
| - test_queue, result_queue, opts, kill_switch, test_gen_finished,
|
| - cover_ctx, tempfile.tempdir))
|
| - for _ in xrange(opts.jobs)
|
| - ]
|
| + with TempDir() as temp_dir:
|
| + test_gen_args = (
|
| + test_gens, test_queue, result_queue, opts, kill_switch, cover_ctx,
|
| + temp_dir
|
| + )
|
| +
|
| + procs = []
|
| + if opts.handler.SKIP_RUNLOOP:
|
| + gen_loop_process(*test_gen_args)
|
| + else:
|
| + procs = [multiprocessing.Process(
|
| + target=gen_loop_process, args=test_gen_args,
|
| + name='gen_loop_process')
|
| + ]
|
| +
|
| + procs += [
|
| + multiprocessing.Process(
|
| + target=run_loop_process,
|
| + args=(test_queue, result_queue, opts, kill_switch,
|
| + test_gen_finished, cover_ctx, temp_dir),
|
| + name='run_loop_process %d' % job_num)
|
| + for job_num in xrange(opts.jobs)
|
| + ]
|
| +
|
| + for p in procs:
|
| + p.daemon = True
|
| + p.start()
|
| +
|
| + error = False
|
|
|
| - for p in procs:
|
| - p.daemon = True
|
| - p.start()
|
| -
|
| - error = False
|
| + try:
|
| + def generate_objects():
|
| + while not kill_switch.is_set():
|
| + while not kill_switch.is_set():
|
| + try:
|
| + yield result_queue.get(timeout=0.1)
|
| + except Queue.Empty:
|
| + break
|
| +
|
| + # Check if gen_loop_process has terminated.
|
| + if procs and not procs[0].is_alive():
|
| + # Signal all run_loop_process that they can exit.
|
| + test_gen_finished.set()
|
| + if not any(p.is_alive() for p in procs):
|
| + break
|
|
|
| - try:
|
| - def generate_objects():
|
| - while not kill_switch.is_set():
|
| + # Get everything still in the queue. Still need timeout, but since
|
| + # nothing is going to be adding stuff to the queue, use a very short
|
| + # timeout.
|
| while not kill_switch.is_set():
|
| try:
|
| - yield result_queue.get(timeout=0.1)
|
| + yield result_queue.get(timeout=0.00001)
|
| except Queue.Empty:
|
| break
|
|
|
| - # Check if gen_loop_process has terminated.
|
| - if procs and not procs[0].is_alive():
|
| - # Signal all run_loop_process that they can exit.
|
| - test_gen_finished.set()
|
| - if not any(p.is_alive() for p in procs):
|
| - break
|
| -
|
| - # Get everything still in the queue. Still need timeout, but since nothing
|
| - # is going to be adding stuff to the queue, use a very short timeout.
|
| - while not kill_switch.is_set():
|
| - try:
|
| - yield result_queue.get(timeout=0.00001)
|
| - except Queue.Empty:
|
| - break
|
| -
|
| - if kill_switch.is_set():
|
| - raise ResultStageAbort()
|
| - error = opts.handler.result_stage_loop(opts, generate_objects())
|
| - except ResultStageAbort:
|
| - pass
|
| - finally:
|
| - clear_temp_dir()
|
| + if kill_switch.is_set():
|
| + raise ResultStageAbort()
|
| + error = opts.handler.result_stage_loop(opts, generate_objects())
|
| + except ResultStageAbort:
|
| + pass
|
|
|
| for p in procs:
|
| p.join()
|
|
|