| Index: third_party/typ/typ/runner.py
|
| diff --git a/third_party/typ/typ/runner.py b/third_party/typ/typ/runner.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..638c94db279b6ff7f428b8b675b804b079792405
|
| --- /dev/null
|
| +++ b/third_party/typ/typ/runner.py
|
| @@ -0,0 +1,790 @@
|
| +# Copyright 2014 Google Inc. All rights reserved.
|
| +#
|
| +# Licensed under the Apache License, Version 2.0 (the "License");
|
| +# you may not use this file except in compliance with the License.
|
| +# You may obtain a copy of the License at
|
| +#
|
| +# http://www.apache.org/licenses/LICENSE-2.0
|
| +#
|
| +# Unless required by applicable law or agreed to in writing, software
|
| +# distributed under the License is distributed on an "AS IS" BASIS,
|
| +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +# See the License for the specific language governing permissions and
|
| +# limitations under the License.
|
| +
|
| +import fnmatch
|
| +import importlib
|
| +import inspect
|
| +import json
|
| +import pdb
|
| +import unittest
|
| +
|
| +from collections import OrderedDict
|
| +
|
| +from typ import json_results
|
| +from typ.arg_parser import ArgumentParser
|
| +from typ.host import Host
|
| +from typ.pool import make_pool
|
| +from typ.stats import Stats
|
| +from typ.printer import Printer
|
| +from typ.test_case import TestCase as TypTestCase
|
| +from typ.version import VERSION
|
| +
|
| +
|
| +Result = json_results.Result
|
| +ResultSet = json_results.ResultSet
|
| +ResultType = json_results.ResultType
|
| +
|
| +
|
| +class TestInput(object):
|
| +
|
| + def __init__(self, name, msg='', timeout=None, expected=None):
|
| + self.name = name
|
| + self.msg = msg
|
| + self.timeout = timeout
|
| + self.expected = expected
|
| +
|
| +
|
| +class TestSet(object):
|
| +
|
| + def __init__(self, parallel_tests=None, isolated_tests=None,
|
| + tests_to_skip=None, context=None, setup_fn=None,
|
| + teardown_fn=None):
|
| +
|
| + def promote(tests):
|
| + tests = tests or []
|
| + return [test if isinstance(test, TestInput) else TestInput(test)
|
| + for test in tests]
|
| +
|
| + self.parallel_tests = promote(parallel_tests)
|
| + self.isolated_tests = promote(isolated_tests)
|
| + self.tests_to_skip = promote(tests_to_skip)
|
| + self.context = context
|
| + self.setup_fn = setup_fn
|
| + self.teardown_fn = teardown_fn
|
| +
|
| +
|
| +class _AddTestsError(Exception):
|
| + pass
|
| +
|
| +
|
| +class Runner(object):
|
| +
|
| + def __init__(self, host=None, loader=None):
|
| + self.host = host or Host()
|
| + self.loader = loader or unittest.loader.TestLoader()
|
| + self.printer = None
|
| + self.stats = None
|
| + self.cov = None
|
| + self.coverage_source = None
|
| + self.top_level_dir = None
|
| + self.args = None
|
| +
|
| + # initialize self.args to the defaults.
|
| + parser = ArgumentParser(self.host)
|
| + self.parse_args(parser, [])
|
| +
|
| + def main(self, argv=None):
|
| + parser = ArgumentParser(self.host)
|
| + self.parse_args(parser, argv)
|
| + if parser.exit_status is not None:
|
| + return parser.exit_status
|
| +
|
| + try:
|
| + ret, _, _ = self.run()
|
| + return ret
|
| + except KeyboardInterrupt:
|
| + self.print_("interrupted, exiting", stream=self.host.stderr)
|
| + return 130
|
| +
|
| + def parse_args(self, parser, argv):
|
| + self.args = parser.parse_args(args=argv)
|
| + if parser.exit_status is not None:
|
| + return
|
| +
|
| + def print_(self, msg='', end='\n', stream=None):
|
| + self.host.print_(msg, end, stream=stream)
|
| +
|
| + def run(self, test_set=None, classifier=None, context=None,
|
| + setup_fn=None, teardown_fn=None):
|
| + ret = 0
|
| + h = self.host
|
| +
|
| + if self.args.version:
|
| + self.print_(VERSION)
|
| + return ret, None, None
|
| +
|
| + ret = self._set_up_runner()
|
| + if ret: # pragma: untested
|
| + return ret, None, None
|
| +
|
| + find_start = h.time()
|
| + if self.cov: # pragma: no cover
|
| + self.cov.erase()
|
| + self.cov.start()
|
| +
|
| + full_results = None
|
| + result_set = ResultSet()
|
| +
|
| + if not test_set:
|
| + ret, test_set = self.find_tests(self.args, classifier, context,
|
| + setup_fn, teardown_fn)
|
| + find_end = h.time()
|
| +
|
| + if not ret:
|
| + ret, full_results = self._run_tests(result_set, test_set)
|
| +
|
| + if self.cov: # pragma: no cover
|
| + self.cov.stop()
|
| + self.cov.save()
|
| + test_end = h.time()
|
| +
|
| + trace = self._trace_from_results(result_set)
|
| + if full_results:
|
| + self._summarize(full_results)
|
| + self.write_results(full_results)
|
| + upload_ret = self.upload_results(full_results)
|
| + if not ret:
|
| + ret = upload_ret
|
| + reporting_end = h.time()
|
| + self._add_trace_event(trace, 'run', find_start, reporting_end)
|
| + self._add_trace_event(trace, 'discovery', find_start, find_end)
|
| + self._add_trace_event(trace, 'testing', find_end, test_end)
|
| + self._add_trace_event(trace, 'reporting', test_end, reporting_end)
|
| + self.write_trace(trace)
|
| + self.report_coverage()
|
| + else:
|
| + upload_ret = 0
|
| +
|
| + return ret, full_results, trace
|
| +
|
| + def _set_up_runner(self):
|
| + h = self.host
|
| + args = self.args
|
| +
|
| + self.stats = Stats(args.status_format, h.time, args.jobs)
|
| + self.printer = Printer(
|
| + self.print_, args.overwrite, args.terminal_width)
|
| +
|
| + self.top_level_dir = args.top_level_dir
|
| + if not self.top_level_dir:
|
| + if args.tests and h.exists(args.tests[0]):
|
| + # TODO: figure out what to do if multiple files are
|
| + # specified and they don't all have the same correct
|
| + # top level dir.
|
| + top_dir = h.dirname(args.tests[0])
|
| + else:
|
| + top_dir = h.getcwd()
|
| + while h.exists(top_dir, '__init__.py'):
|
| + top_dir = h.dirname(top_dir)
|
| + self.top_level_dir = h.abspath(top_dir)
|
| +
|
| + h.add_to_path(self.top_level_dir)
|
| +
|
| + for path in args.path:
|
| + h.add_to_path(path)
|
| +
|
| + if args.coverage: # pragma: no cover
|
| + try:
|
| + import coverage
|
| + except ImportError:
|
| + h.print_("Error: coverage is not installed")
|
| + return 1
|
| + source = self.args.coverage_source
|
| + if not source:
|
| + source = [self.top_level_dir] + self.args.path
|
| + self.coverage_source = source
|
| + self.cov = coverage.coverage(source=self.coverage_source,
|
| + data_suffix=True)
|
| + self.cov.erase()
|
| + return 0
|
| +
|
| + def find_tests(self, args, classifier=None,
|
| + context=None, setup_fn=None, teardown_fn=None):
|
| + if not context and self.args.context: # pragma: untested
|
| + context = json.loads(self.args.context)
|
| + if not setup_fn and self.args.setup: # pragma: untested
|
| + setup_fn = _import_name(self.args.setup)
|
| + if not teardown_fn and self.args.teardown: # pragma: untested
|
| + teardown_fn = _import_name(self.args.teardown)
|
| +
|
| + test_set = self._make_test_set(context=context,
|
| + setup_fn=setup_fn,
|
| + teardown_fn=teardown_fn)
|
| +
|
| + names = self._name_list_from_args(args)
|
| + classifier = classifier or _default_classifier(args)
|
| +
|
| + for name in names:
|
| + try:
|
| + self._add_tests_to_set(test_set, args.suffixes,
|
| + self.top_level_dir, classifier, name)
|
| + except (AttributeError, ImportError, SyntaxError
|
| + ) as e: # pragma: untested
|
| + self.print_('Failed to load "%s": %s' % (name, e))
|
| + return 1, None
|
| + except _AddTestsError as e: # pragma: untested
|
| + self.print_(str(e))
|
| + return 1, None
|
| +
|
| + # TODO: Add support for discovering setupProcess/teardownProcess?
|
| +
|
| + test_set.parallel_tests = _sort_inputs(test_set.parallel_tests)
|
| + test_set.isolated_tests = _sort_inputs(test_set.isolated_tests)
|
| + test_set.tests_to_skip = _sort_inputs(test_set.tests_to_skip)
|
| + return 0, test_set
|
| +
|
| + def _name_list_from_args(self, args):
|
| + if args.tests:
|
| + names = args.tests
|
| + elif args.file_list:
|
| + if args.file_list == '-':
|
| + s = self.host.stdin.read()
|
| + else:
|
| + s = self.host.read_text_file(args.file_list)
|
| + names = [line.strip() for line in s.splitlines()]
|
| + else:
|
| + names = ['.']
|
| + return names
|
| +
|
| + def _add_tests_to_set(self, test_set, suffixes, top_level_dir, classifier,
|
| + name):
|
| + h = self.host
|
| + loader = self.loader
|
| + add_tests = _test_adder(test_set, classifier)
|
| +
|
| + if h.isfile(name):
|
| + rpath = h.relpath(name, top_level_dir)
|
| + if rpath.endswith('.py'):
|
| + rpath = rpath[:-3]
|
| + module = rpath.replace(h.sep, '.')
|
| + add_tests(loader.loadTestsFromName(module))
|
| + elif h.isdir(name):
|
| + for suffix in suffixes:
|
| + add_tests(loader.discover(name, suffix, top_level_dir))
|
| + else:
|
| + possible_dir = name.replace('.', h.sep)
|
| + if h.isdir(top_level_dir, possible_dir):
|
| + for suffix in suffixes:
|
| + path = h.join(top_level_dir, possible_dir)
|
| + suite = loader.discover(path, suffix, top_level_dir)
|
| + add_tests(suite)
|
| + else:
|
| + add_tests(loader.loadTestsFromName(name))
|
| +
|
| + def _run_tests(self, result_set, test_set):
|
| + h = self.host
|
| + if not test_set.parallel_tests and not test_set.isolated_tests:
|
| + self.print_('No tests to run.')
|
| + return 1, None
|
| +
|
| + all_tests = [ti.name for ti in
|
| + _sort_inputs(test_set.parallel_tests +
|
| + test_set.isolated_tests +
|
| + test_set.tests_to_skip)]
|
| +
|
| + if self.args.list_only:
|
| + self.print_('\n'.join(all_tests))
|
| + return 0, None
|
| +
|
| + self._run_one_set(self.stats, result_set, test_set)
|
| +
|
| + failed_tests = json_results.failed_test_names(result_set)
|
| + retry_limit = self.args.retry_limit
|
| +
|
| + while retry_limit and failed_tests:
|
| + if retry_limit == self.args.retry_limit:
|
| + self.flush()
|
| + self.args.overwrite = False
|
| + self.printer.should_overwrite = False
|
| + self.args.verbose = min(self.args.verbose, 1)
|
| +
|
| + self.print_('')
|
| + self.print_('Retrying failed tests (attempt #%d of %d)...' %
|
| + (self.args.retry_limit - retry_limit + 1,
|
| + self.args.retry_limit))
|
| + self.print_('')
|
| +
|
| + stats = Stats(self.args.status_format, h.time, 1)
|
| + stats.total = len(failed_tests)
|
| + tests_to_retry = self._make_test_set(
|
| + isolated_tests=[TestInput(name) for name in failed_tests],
|
| + context=test_set.context,
|
| + setup_fn=test_set.setup_fn,
|
| + teardown_fn=test_set.teardown_fn)
|
| + retry_set = ResultSet()
|
| + self._run_one_set(stats, retry_set, tests_to_retry)
|
| + result_set.results.extend(retry_set.results)
|
| + failed_tests = json_results.failed_test_names(retry_set)
|
| + retry_limit -= 1
|
| +
|
| + if retry_limit != self.args.retry_limit:
|
| + self.print_('')
|
| +
|
| + full_results = json_results.make_full_results(self.args.metadata,
|
| + int(h.time()),
|
| + all_tests, result_set)
|
| +
|
| + return (json_results.exit_code_from_full_results(full_results),
|
| + full_results)
|
| +
|
| + def _make_test_set(self, parallel_tests=None, isolated_tests=None,
|
| + tests_to_skip=None, context=None, setup_fn=None,
|
| + teardown_fn=None):
|
| + parallel_tests = parallel_tests or []
|
| + isolated_tests = isolated_tests or []
|
| + tests_to_skip = tests_to_skip or []
|
| + return TestSet(_sort_inputs(parallel_tests),
|
| + _sort_inputs(isolated_tests),
|
| + _sort_inputs(tests_to_skip),
|
| + context, setup_fn, teardown_fn)
|
| +
|
| + def _run_one_set(self, stats, result_set, test_set):
|
| + stats.total = (len(test_set.parallel_tests) +
|
| + len(test_set.isolated_tests) +
|
| + len(test_set.tests_to_skip))
|
| + self._skip_tests(stats, result_set, test_set.tests_to_skip)
|
| + self._run_list(stats, result_set, test_set,
|
| + test_set.parallel_tests, self.args.jobs)
|
| + self._run_list(stats, result_set, test_set,
|
| + test_set.isolated_tests, 1)
|
| +
|
| + def _skip_tests(self, stats, result_set, tests_to_skip):
|
| + for test_input in tests_to_skip:
|
| + last = self.host.time()
|
| + stats.started += 1
|
| + self._print_test_started(stats, test_input)
|
| + now = self.host.time()
|
| + result = Result(test_input.name, actual=ResultType.Skip,
|
| + started=last, took=(now - last), worker=0,
|
| + expected=[ResultType.Skip],
|
| + out=test_input.msg)
|
| + result_set.add(result)
|
| + stats.finished += 1
|
| + self._print_test_finished(stats, result)
|
| +
|
| + def _run_list(self, stats, result_set, test_set, test_inputs, jobs):
|
| + h = self.host
|
| + running_jobs = set()
|
| +
|
| + jobs = min(len(test_inputs), jobs)
|
| + if not jobs:
|
| + return
|
| +
|
| + child = _Child(self, self.loader, test_set)
|
| + pool = make_pool(h, jobs, _run_one_test, child,
|
| + _setup_process, _teardown_process)
|
| + try:
|
| + while test_inputs or running_jobs:
|
| + while test_inputs and (len(running_jobs) < self.args.jobs):
|
| + test_input = test_inputs.pop(0)
|
| + stats.started += 1
|
| + pool.send(test_input)
|
| + running_jobs.add(test_input.name)
|
| + self._print_test_started(stats, test_input)
|
| +
|
| + result = pool.get()
|
| + running_jobs.remove(result.name)
|
| + result_set.add(result)
|
| + stats.finished += 1
|
| + self._print_test_finished(stats, result)
|
| + pool.close()
|
| + finally:
|
| + pool.join()
|
| +
|
| + def _print_test_started(self, stats, test_input):
|
| + if not self.args.quiet and self.args.overwrite:
|
| + self.update(stats.format() + test_input.name,
|
| + elide=(not self.args.verbose))
|
| +
|
| + def _print_test_finished(self, stats, result):
|
| + stats.add_time()
|
| + if result.actual == ResultType.Failure:
|
| + result_str = ' failed'
|
| + elif result.actual == ResultType.Skip:
|
| + result_str = ' was skipped'
|
| + elif result.actual == ResultType.Pass:
|
| + result_str = ' passed'
|
| + else: # pragma: untested
|
| + raise ValueError('Unimplemented result type %s' % result)
|
| + if result.unexpected:
|
| + result_str += ' unexpectedly'
|
| + if self.args.timing:
|
| + timing_str = ' %.4fs' % result.took
|
| + else:
|
| + timing_str = ''
|
| + suffix = '%s%s' % (result_str, timing_str)
|
| + out = result.out
|
| + err = result.err
|
| + if result.code:
|
| + if out or err:
|
| + suffix += ':\n'
|
| + self.update(stats.format() + result.name + suffix, elide=False)
|
| + for l in out.splitlines(): # pragma: untested
|
| + self.print_(' %s' % l)
|
| + for l in err.splitlines(): # pragma: untested
|
| + self.print_(' %s' % l)
|
| + elif not self.args.quiet:
|
| + if self.args.verbose > 1 and (out or err): # pragma: untested
|
| + suffix += ':\n'
|
| + self.update(stats.format() + result.name + suffix,
|
| + elide=(not self.args.verbose))
|
| + if self.args.verbose > 1: # pragma: untested
|
| + for l in out.splitlines():
|
| + self.print_(' %s' % l)
|
| + for l in err.splitlines():
|
| + self.print_(' %s' % l)
|
| + if self.args.verbose:
|
| + self.flush()
|
| +
|
| + def update(self, msg, elide):
|
| + self.printer.update(msg, elide)
|
| +
|
| + def flush(self):
|
| + self.printer.flush()
|
| +
|
| + def _summarize(self, full_results):
|
| + num_tests = self.stats.finished
|
| + num_failures = json_results.num_failures(full_results)
|
| +
|
| + if self.args.quiet and num_failures == 0:
|
| + return
|
| +
|
| + if self.args.timing:
|
| + timing_clause = ' in %.1fs' % (self.host.time() -
|
| + self.stats.started_time)
|
| + else:
|
| + timing_clause = ''
|
| + self.update('%d test%s run%s, %d failure%s.' %
|
| + (num_tests,
|
| + '' if num_tests == 1 else 's',
|
| + timing_clause,
|
| + num_failures,
|
| + '' if num_failures == 1 else 's'), elide=False)
|
| + self.print_()
|
| +
|
| + def write_trace(self, trace):
|
| + if self.args.write_trace_to:
|
| + self.host.write_text_file(
|
| + self.args.write_trace_to,
|
| + json.dumps(trace, indent=2) + '\n')
|
| +
|
| + def write_results(self, full_results):
|
| + if self.args.write_full_results_to:
|
| + self.host.write_text_file(
|
| + self.args.write_full_results_to,
|
| + json.dumps(full_results, indent=2) + '\n')
|
| +
|
| + def upload_results(self, full_results):
|
| + h = self.host
|
| + if not self.args.test_results_server:
|
| + return 0
|
| +
|
| + url, content_type, data = json_results.make_upload_request(
|
| + self.args.test_results_server, self.args.builder_name,
|
| + self.args.master_name, self.args.test_type,
|
| + full_results)
|
| +
|
| + try:
|
| + h.fetch(url, data, {'Content-Type': content_type})
|
| + return 0
|
| + except Exception as e:
|
| + h.print_('Uploading the JSON results raised "%s"' % str(e))
|
| + return 1
|
| +
|
| + def report_coverage(self):
|
| + if self.args.coverage: # pragma: no cover
|
| + self.host.print_()
|
| + import coverage
|
| + cov = coverage.coverage(data_suffix=True)
|
| + cov.combine()
|
| + cov.report(show_missing=self.args.coverage_show_missing,
|
| + omit=self.args.coverage_omit)
|
| +
|
| + def _add_trace_event(self, trace, name, start, end):
|
| + event = {
|
| + 'name': name,
|
| + 'ts': int((start - self.stats.started_time) * 1000000),
|
| + 'dur': int((end - start) * 1000000),
|
| + 'ph': 'X',
|
| + 'pid': self.host.getpid(),
|
| + 'tid': 0,
|
| + }
|
| + trace['traceEvents'].append(event)
|
| +
|
| + def _trace_from_results(self, result_set):
|
| + trace = OrderedDict()
|
| + trace['traceEvents'] = []
|
| + trace['otherData'] = {}
|
| + for m in self.args.metadata:
|
| + k, v = m.split('=')
|
| + trace['otherData'][k] = v
|
| +
|
| + for result in result_set.results:
|
| + started = int((result.started - self.stats.started_time) * 1000000)
|
| + took = int(result.took * 1000000)
|
| + event = OrderedDict()
|
| + event['name'] = result.name
|
| + event['dur'] = took
|
| + event['ts'] = started
|
| + event['ph'] = 'X' # "Complete" events
|
| + event['pid'] = result.pid
|
| + event['tid'] = result.worker
|
| +
|
| + args = OrderedDict()
|
| + args['expected'] = sorted(str(r) for r in result.expected)
|
| + args['actual'] = str(result.actual)
|
| + args['out'] = result.out
|
| + args['err'] = result.err
|
| + args['code'] = result.code
|
| + args['unexpected'] = result.unexpected
|
| + args['flaky'] = result.flaky
|
| + event['args'] = args
|
| +
|
| + trace['traceEvents'].append(event)
|
| + return trace
|
| +
|
| +
|
| +def _matches(name, globs):
|
| + return any(fnmatch.fnmatch(name, glob) for glob in globs)
|
| +
|
| +
|
| +def _default_classifier(args):
|
| + def default_classifier(test_set, test):
|
| + name = test.id()
|
| + if _matches(name, args.skip):
|
| + test_set.tests_to_skip.append(TestInput(name,
|
| + 'skipped by request'))
|
| + elif _matches(name, args.isolate):
|
| + test_set.isolated_tests.append(TestInput(name))
|
| + else:
|
| + test_set.parallel_tests.append(TestInput(name))
|
| + return default_classifier
|
| +
|
| +
|
| +def _test_adder(test_set, classifier):
|
| + def add_tests(obj):
|
| + if isinstance(obj, unittest.suite.TestSuite):
|
| + for el in obj:
|
| + add_tests(el)
|
| + elif (obj.id().startswith('unittest.loader.LoadTestsFailure') or
|
| + obj.id().startswith('unittest.loader.ModuleImportFailure')
|
| + ): # pragma: untested
|
| + # Access to protected member pylint: disable=W0212
|
| + module_name = obj._testMethodName
|
| + try:
|
| + method = getattr(obj, obj._testMethodName)
|
| + method()
|
| + except Exception as e:
|
| + if 'LoadTests' in obj.id():
|
| + raise _AddTestsError('%s.load_tests() failed: %s'
|
| + % (module_name, str(e)))
|
| + else:
|
| + raise _AddTestsError(str(e))
|
| + else:
|
| + assert isinstance(obj, unittest.TestCase)
|
| + classifier(test_set, obj)
|
| + return add_tests
|
| +
|
| +
|
| +class _Child(object):
|
| +
|
| + def __init__(self, parent, loader, test_set):
|
| + self.host = None
|
| + self.worker_num = None
|
| + self.debugger = parent.args.debugger
|
| + self.coverage = parent.args.coverage and parent.args.jobs > 1
|
| + self.coverage_source = parent.coverage_source
|
| + self.dry_run = parent.args.dry_run
|
| + self.loader = loader
|
| + self.passthrough = parent.args.passthrough
|
| + self.context = test_set.context
|
| + self.setup_fn = test_set.setup_fn
|
| + self.teardown_fn = test_set.teardown_fn
|
| + self.context_after_setup = None
|
| + self.top_level_dir = parent.top_level_dir
|
| + self.loaded_suites = {}
|
| + self.cov = None
|
| +
|
| +
|
| +def _setup_process(host, worker_num, child):
|
| + child.host = host
|
| + child.worker_num = worker_num
|
| +
|
| + if child.coverage: # pragma: untested
|
| + import coverage
|
| + child.cov = coverage.coverage(source=child.coverage_source,
|
| + data_suffix=True)
|
| + child.cov._warn_no_data = False
|
| + child.cov.start()
|
| +
|
| + if child.setup_fn: # pragma: untested
|
| + child.context_after_setup = child.setup_fn(child, child.context)
|
| + else:
|
| + child.context_after_setup = child.context
|
| + return child
|
| +
|
| +
|
| +def _teardown_process(child):
|
| + if child.teardown_fn: # pragma: untested
|
| + child.teardown_fn(child, child.context_after_setup)
|
| + # TODO: Return a more structured result, including something from
|
| + # the teardown function?
|
| +
|
| + if child.cov: # pragma: untested
|
| + child.cov.stop()
|
| + child.cov.save()
|
| +
|
| + return child.worker_num
|
| +
|
| +
|
| +def _import_name(name): # pragma: untested
|
| + module_name, function_name = name.rsplit('.', 1)
|
| + module = importlib.import_module(module_name)
|
| + return getattr(module, function_name)
|
| +
|
| +
|
| +def _run_one_test(child, test_input):
|
| + h = child.host
|
| + pid = h.getpid()
|
| + test_name = test_input.name
|
| +
|
| + start = h.time()
|
| + if child.dry_run:
|
| + return Result(test_name, ResultType.Pass, start, 0, child.worker_num,
|
| + pid=pid)
|
| +
|
| + if h.is_python3 and child.debugger: # pragma: untested
|
| + h.set_debugging(True)
|
| +
|
| + # It is important to capture the output before loading the test
|
| + # to ensure that
|
| + # 1) the loader doesn't logs something we don't captured
|
| + # 2) neither the loader nor the test case grab a reference to the
|
| + # uncaptured stdout or stderr that later is used when the test is run.
|
| + # This comes up when using the FakeTestLoader and testing typ itself,
|
| + # but could come up when testing non-typ code as well.
|
| + h.capture_output(divert=not child.passthrough)
|
| +
|
| + try:
|
| + suite = child.loader.loadTestsFromName(test_name)
|
| + except Exception as e: # pragma: untested
|
| + suite = _load_via_load_tests(child, test_name)
|
| + if not suite:
|
| + # TODO: Figure out how to handle failures here.
|
| + err = 'failed to load %s: %s' % (test_name, str(e))
|
| + h.restore_output()
|
| + return Result(test_name, ResultType.Failure, start, 0,
|
| + child.worker_num, unexpected=True, code=1,
|
| + err=err, pid=pid)
|
| +
|
| + tests = list(suite)
|
| + assert len(tests) == 1
|
| + test_case = tests[0]
|
| + if isinstance(test_case, TypTestCase):
|
| + test_case.child = child
|
| + test_case.context = child.context_after_setup
|
| +
|
| + test_result = unittest.TestResult()
|
| + out = ''
|
| + err = ''
|
| + try:
|
| + if child.debugger: # pragma: untested
|
| + # Access to protected member pylint: disable=W0212
|
| + test_func = getattr(test_case, test_case._testMethodName)
|
| + fname = inspect.getsourcefile(test_func)
|
| + lineno = inspect.getsourcelines(test_func)[1] + 1
|
| + dbg = pdb.Pdb(stdout=h.stdout)
|
| + dbg.set_break(fname, lineno)
|
| + dbg.runcall(suite.run, test_result)
|
| + else:
|
| + suite.run(test_result)
|
| + finally:
|
| + out, err = h.restore_output()
|
| +
|
| + took = h.time() - start
|
| + return _result_from_test_result(test_result, test_name, start, took, out,
|
| + err, child.worker_num, pid)
|
| +
|
| +
|
| +def _result_from_test_result(test_result, test_name, start, took, out, err,
|
| + worker_num, pid):
|
| + flaky = False
|
| + if test_result.failures:
|
| + expected = [ResultType.Pass]
|
| + actual = ResultType.Failure
|
| + code = 1
|
| + unexpected = True
|
| + err = err + test_result.failures[0][1]
|
| + elif test_result.errors: # pragma: untested
|
| + expected = [ResultType.Pass]
|
| + actual = ResultType.Failure
|
| + code = 1
|
| + unexpected = True
|
| + err = err + test_result.errors[0][1]
|
| + elif test_result.skipped: # pragma: untested
|
| + expected = [ResultType.Skip]
|
| + actual = ResultType.Skip
|
| + err = err + test_result.skipped[0][1]
|
| + code = 0
|
| + unexpected = False
|
| + elif test_result.expectedFailures: # pragma: untested
|
| + expected = [ResultType.Failure]
|
| + actual = ResultType.Failure
|
| + code = 1
|
| + err = err + test_result.expectedFailures[0][1]
|
| + unexpected = False
|
| + elif test_result.unexpectedSuccesses: # pragma: untested
|
| + expected = [ResultType.Failure]
|
| + actual = ResultType.Pass
|
| + code = 0
|
| + unexpected = True
|
| + else:
|
| + expected = [ResultType.Pass]
|
| + actual = ResultType.Pass
|
| + code = 0
|
| + unexpected = False
|
| +
|
| + return Result(test_name, actual, start, took, worker_num,
|
| + expected, unexpected, flaky, code, out, err, pid)
|
| +
|
| +
|
| +def _load_via_load_tests(child, test_name): # pragma: untested
|
| + # If we couldn't import a test directly, the test may be only loadable
|
| + # via unittest's load_tests protocol. See if we can find a load_tests
|
| + # entry point that will work for this test.
|
| + loader = child.loader
|
| + comps = test_name.split('.')
|
| +
|
| + while comps:
|
| + name = '.'.join(comps)
|
| + module = None
|
| + suite = None
|
| + if name not in child.loaded_suites:
|
| + try:
|
| + module = importlib.import_module(name)
|
| + except ImportError:
|
| + pass
|
| + if module:
|
| + try:
|
| + suite = loader.loadTestsFromModule(module)
|
| + except Exception:
|
| + # TODO: Figure out how to handle errors here
|
| + pass
|
| + child.loaded_suites[name] = suite
|
| + suite = child.loaded_suites[name]
|
| + if suite:
|
| + for test_case in suite:
|
| + if not isinstance(test_case, unittest.TestCase):
|
| + pass # pdb.set_trace()
|
| + if test_case.id() == test_name:
|
| + new_suite = unittest.TestSuite()
|
| + new_suite.addTest(test_case)
|
| + return new_suite
|
| + comps.pop()
|
| + if not comps:
|
| + return None
|
| +
|
| +
|
| +def _sort_inputs(inps):
|
| + return sorted(inps, key=lambda inp: inp.name)
|
|
|