Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(79)

Unified Diff: tools/telemetry/third_party/typ/typ/runner.py

Issue 1647513002: Delete tools/telemetry. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « tools/telemetry/third_party/typ/typ/printer.py ('k') | tools/telemetry/third_party/typ/typ/stats.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: tools/telemetry/third_party/typ/typ/runner.py
diff --git a/tools/telemetry/third_party/typ/typ/runner.py b/tools/telemetry/third_party/typ/typ/runner.py
deleted file mode 100644
index 38c0e1c554f3d8d3032a281e564da8194feb63e3..0000000000000000000000000000000000000000
--- a/tools/telemetry/third_party/typ/typ/runner.py
+++ /dev/null
@@ -1,938 +0,0 @@
-# Copyright 2014 Google Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import fnmatch
-import importlib
-import inspect
-import json
-import os
-import pdb
-import sys
-import unittest
-import traceback
-
-from collections import OrderedDict
-
-# This ensures that absolute imports of typ modules will work when
-# running typ/runner.py as a script even if typ is not installed.
-# We need this entry in addition to the one in __main__.py to ensure
-# that typ/runner.py works when invoked via subprocess on windows in
-# _spawn_main().
-path_to_file = os.path.realpath(__file__)
-if path_to_file.endswith('.pyc'): # pragma: no cover
- path_to_file = path_to_file[:-1]
-dir_above_typ = os.path.dirname(os.path.dirname(path_to_file))
-if dir_above_typ not in sys.path: # pragma: no cover
- sys.path.append(dir_above_typ)
-
-
-from typ import json_results
-from typ.arg_parser import ArgumentParser
-from typ.host import Host
-from typ.pool import make_pool
-from typ.stats import Stats
-from typ.printer import Printer
-from typ.test_case import TestCase as TypTestCase
-from typ.version import VERSION
-
-
-Result = json_results.Result
-ResultSet = json_results.ResultSet
-ResultType = json_results.ResultType
-
-
-def main(argv=None, host=None, win_multiprocessing=None, **defaults):
- host = host or Host()
- runner = Runner(host=host)
- if win_multiprocessing is not None:
- runner.win_multiprocessing = win_multiprocessing
- return runner.main(argv, **defaults)
-
-
-class TestInput(object):
-
- def __init__(self, name, msg='', timeout=None, expected=None):
- self.name = name
- self.msg = msg
- self.timeout = timeout
- self.expected = expected
-
-
-class TestSet(object):
-
- def __init__(self, parallel_tests=None, isolated_tests=None,
- tests_to_skip=None):
-
- def promote(tests):
- tests = tests or []
- return [test if isinstance(test, TestInput) else TestInput(test)
- for test in tests]
-
- self.parallel_tests = promote(parallel_tests)
- self.isolated_tests = promote(isolated_tests)
- self.tests_to_skip = promote(tests_to_skip)
-
-
-class WinMultiprocessing(object):
- ignore = 'ignore'
- importable = 'importable'
- spawn = 'spawn'
-
- values = [ignore, importable, spawn]
-
-
-class _AddTestsError(Exception):
- pass
-
-
-class Runner(object):
-
- def __init__(self, host=None):
- self.args = None
- self.classifier = None
- self.cov = None
- self.context = None
- self.coverage_source = None
- self.host = host or Host()
- self.loader = unittest.loader.TestLoader()
- self.printer = None
- self.setup_fn = None
- self.stats = None
- self.teardown_fn = None
- self.top_level_dir = None
- self.win_multiprocessing = WinMultiprocessing.spawn
- self.final_responses = []
-
- # initialize self.args to the defaults.
- parser = ArgumentParser(self.host)
- self.parse_args(parser, [])
-
- def main(self, argv=None, **defaults):
- parser = ArgumentParser(self.host)
- self.parse_args(parser, argv, **defaults)
- if parser.exit_status is not None:
- return parser.exit_status
-
- try:
- ret, _, _ = self.run()
- return ret
- except KeyboardInterrupt:
- self.print_("interrupted, exiting", stream=self.host.stderr)
- return 130
-
- def parse_args(self, parser, argv, **defaults):
- for attrname in defaults:
- if not hasattr(self.args, attrname):
- parser.error("Unknown default argument name '%s'" % attrname,
- bailout=False)
- return
- parser.set_defaults(**defaults)
- self.args = parser.parse_args(args=argv)
- if parser.exit_status is not None:
- return
-
- def print_(self, msg='', end='\n', stream=None):
- self.host.print_(msg, end, stream=stream)
-
- def run(self, test_set=None):
-
- ret = 0
- h = self.host
-
- if self.args.version:
- self.print_(VERSION)
- return ret, None, None
-
- should_spawn = self._check_win_multiprocessing()
- if should_spawn:
- return self._spawn(test_set)
-
- ret = self._set_up_runner()
- if ret: # pragma: no cover
- return ret, None, None
-
- find_start = h.time()
- if self.cov: # pragma: no cover
- self.cov.erase()
- self.cov.start()
-
- full_results = None
- result_set = ResultSet()
-
- if not test_set:
- ret, test_set = self.find_tests(self.args)
- find_end = h.time()
-
- if not ret:
- ret, full_results = self._run_tests(result_set, test_set)
-
- if self.cov: # pragma: no cover
- self.cov.stop()
- self.cov.save()
- test_end = h.time()
-
- trace = self._trace_from_results(result_set)
- if full_results:
- self._summarize(full_results)
- self._write(self.args.write_full_results_to, full_results)
- upload_ret = self._upload(full_results)
- if not ret:
- ret = upload_ret
- reporting_end = h.time()
- self._add_trace_event(trace, 'run', find_start, reporting_end)
- self._add_trace_event(trace, 'discovery', find_start, find_end)
- self._add_trace_event(trace, 'testing', find_end, test_end)
- self._add_trace_event(trace, 'reporting', test_end, reporting_end)
- self._write(self.args.write_trace_to, trace)
- self.report_coverage()
- else:
- upload_ret = 0
-
- return ret, full_results, trace
-
- def _check_win_multiprocessing(self):
- wmp = self.win_multiprocessing
-
- ignore, importable, spawn = WinMultiprocessing.values
-
- if wmp not in WinMultiprocessing.values:
- raise ValueError('illegal value %s for win_multiprocessing' %
- wmp)
-
- h = self.host
- if wmp == ignore and h.platform == 'win32': # pragma: win32
- raise ValueError('Cannot use WinMultiprocessing.ignore for '
- 'win_multiprocessing when actually running '
- 'on Windows.')
-
- if wmp == ignore or self.args.jobs == 1:
- return False
-
- if wmp == importable:
- if self._main_is_importable():
- return False
- raise ValueError('The __main__ module (%s) ' # pragma: no cover
- 'may not be importable' %
- sys.modules['__main__'].__file__)
-
- assert wmp == spawn
- return True
-
- def _main_is_importable(self): # pragma: untested
- path = sys.modules['__main__'].__file__
- if not path:
- return False
- if path.endswith('.pyc'):
- path = path[:-1]
- if not path.endswith('.py'):
- return False
- if path.endswith('__main__.py'):
- # main modules are not directly importable.
- return False
-
- path = self.host.realpath(path)
- for d in sys.path:
- if path.startswith(self.host.realpath(d)):
- return True
- return False # pragma: no cover
-
- def _spawn(self, test_set):
- # TODO: Handle picklable hooks, rather than requiring them to be None.
- assert self.classifier is None
- assert self.context is None
- assert self.setup_fn is None
- assert self.teardown_fn is None
- assert test_set is None
- h = self.host
-
- if self.args.write_trace_to: # pragma: untested
- should_delete_trace = False
- else:
- should_delete_trace = True
- fp = h.mktempfile(delete=False)
- fp.close()
- self.args.write_trace_to = fp.name
-
- if self.args.write_full_results_to: # pragma: untested
- should_delete_results = False
- else:
- should_delete_results = True
- fp = h.mktempfile(delete=False)
- fp.close()
- self.args.write_full_results_to = fp.name
-
- argv = ArgumentParser(h).argv_from_args(self.args)
- ret = h.call_inline([h.python_interpreter, path_to_file] + argv)
-
- trace = self._read_and_delete(self.args.write_trace_to,
- should_delete_trace)
- full_results = self._read_and_delete(self.args.write_full_results_to,
- should_delete_results)
- return ret, full_results, trace
-
- def _set_up_runner(self):
- h = self.host
- args = self.args
-
- self.stats = Stats(args.status_format, h.time, args.jobs)
- self.printer = Printer(
- self.print_, args.overwrite, args.terminal_width)
-
- self.top_level_dir = args.top_level_dir
- if not self.top_level_dir:
- if args.tests and h.isdir(args.tests[0]):
- # TODO: figure out what to do if multiple files are
- # specified and they don't all have the same correct
- # top level dir.
- d = h.realpath(h.dirname(args.tests[0]))
- if h.exists(d, '__init__.py'):
- top_dir = d
- else:
- top_dir = args.tests[0]
- else:
- top_dir = h.getcwd()
- while h.exists(top_dir, '__init__.py'):
- top_dir = h.dirname(top_dir)
- self.top_level_dir = h.realpath(top_dir)
-
- h.add_to_path(self.top_level_dir)
-
- for path in args.path:
- h.add_to_path(path)
-
- if args.coverage: # pragma: no cover
- try:
- import coverage
- except ImportError:
- h.print_("Error: coverage is not installed")
- return 1
- source = self.args.coverage_source
- if not source:
- source = [self.top_level_dir] + self.args.path
- self.coverage_source = source
- self.cov = coverage.coverage(source=self.coverage_source,
- data_suffix=True)
- self.cov.erase()
- return 0
-
- def find_tests(self, args):
- test_set = TestSet()
-
- orig_skip = unittest.skip
- orig_skip_if = unittest.skipIf
- if args.all:
- unittest.skip = lambda reason: lambda x: x
- unittest.skipIf = lambda condition, reason: lambda x: x
-
- try:
- names = self._name_list_from_args(args)
- classifier = self.classifier or _default_classifier(args)
-
- for name in names:
- try:
- self._add_tests_to_set(test_set, args.suffixes,
- self.top_level_dir, classifier,
- name)
- except (AttributeError, ImportError, SyntaxError) as e:
- self.print_('Failed to load "%s": %s' % (name, e))
- return 1, None
- except _AddTestsError as e:
- self.print_(str(e))
- return 1, None
-
- # TODO: Add support for discovering setupProcess/teardownProcess?
-
- test_set.parallel_tests = _sort_inputs(test_set.parallel_tests)
- test_set.isolated_tests = _sort_inputs(test_set.isolated_tests)
- test_set.tests_to_skip = _sort_inputs(test_set.tests_to_skip)
- return 0, test_set
- finally:
- unittest.skip = orig_skip
- unittest.skipIf = orig_skip_if
-
- def _name_list_from_args(self, args):
- if args.tests:
- names = args.tests
- elif args.file_list:
- if args.file_list == '-':
- s = self.host.stdin.read()
- else:
- s = self.host.read_text_file(args.file_list)
- names = [line.strip() for line in s.splitlines()]
- else:
- names = [self.top_level_dir]
- return names
-
- def _add_tests_to_set(self, test_set, suffixes, top_level_dir, classifier,
- name):
- h = self.host
- loader = self.loader
- add_tests = _test_adder(test_set, classifier)
-
- if h.isfile(name):
- rpath = h.relpath(name, top_level_dir)
- if rpath.endswith('.py'):
- rpath = rpath[:-3]
- module = rpath.replace(h.sep, '.')
- add_tests(loader.loadTestsFromName(module))
- elif h.isdir(name):
- for suffix in suffixes:
- add_tests(loader.discover(name, suffix, top_level_dir))
- else:
- possible_dir = name.replace('.', h.sep)
- if h.isdir(top_level_dir, possible_dir):
- for suffix in suffixes:
- path = h.join(top_level_dir, possible_dir)
- suite = loader.discover(path, suffix, top_level_dir)
- add_tests(suite)
- else:
- add_tests(loader.loadTestsFromName(name))
-
- def _run_tests(self, result_set, test_set):
- h = self.host
- if not test_set.parallel_tests and not test_set.isolated_tests:
- self.print_('No tests to run.')
- return 1, None
-
- all_tests = [ti.name for ti in
- _sort_inputs(test_set.parallel_tests +
- test_set.isolated_tests +
- test_set.tests_to_skip)]
-
- if self.args.list_only:
- self.print_('\n'.join(all_tests))
- return 0, None
-
- self._run_one_set(self.stats, result_set, test_set)
-
- failed_tests = sorted(json_results.failed_test_names(result_set))
- retry_limit = self.args.retry_limit
-
- while retry_limit and failed_tests:
- if retry_limit == self.args.retry_limit:
- self.flush()
- self.args.overwrite = False
- self.printer.should_overwrite = False
- self.args.verbose = min(self.args.verbose, 1)
-
- self.print_('')
- self.print_('Retrying failed tests (attempt #%d of %d)...' %
- (self.args.retry_limit - retry_limit + 1,
- self.args.retry_limit))
- self.print_('')
-
- stats = Stats(self.args.status_format, h.time, 1)
- stats.total = len(failed_tests)
- tests_to_retry = TestSet(isolated_tests=list(failed_tests))
- retry_set = ResultSet()
- self._run_one_set(stats, retry_set, tests_to_retry)
- result_set.results.extend(retry_set.results)
- failed_tests = json_results.failed_test_names(retry_set)
- retry_limit -= 1
-
- if retry_limit != self.args.retry_limit:
- self.print_('')
-
- full_results = json_results.make_full_results(self.args.metadata,
- int(h.time()),
- all_tests, result_set)
-
- return (json_results.exit_code_from_full_results(full_results),
- full_results)
-
- def _run_one_set(self, stats, result_set, test_set):
- stats.total = (len(test_set.parallel_tests) +
- len(test_set.isolated_tests) +
- len(test_set.tests_to_skip))
- self._skip_tests(stats, result_set, test_set.tests_to_skip)
- self._run_list(stats, result_set,
- test_set.parallel_tests, self.args.jobs)
- self._run_list(stats, result_set,
- test_set.isolated_tests, 1)
-
- def _skip_tests(self, stats, result_set, tests_to_skip):
- for test_input in tests_to_skip:
- last = self.host.time()
- stats.started += 1
- self._print_test_started(stats, test_input)
- now = self.host.time()
- result = Result(test_input.name, actual=ResultType.Skip,
- started=last, took=(now - last), worker=0,
- expected=[ResultType.Skip],
- out=test_input.msg)
- result_set.add(result)
- stats.finished += 1
- self._print_test_finished(stats, result)
-
- def _run_list(self, stats, result_set, test_inputs, jobs):
- h = self.host
- running_jobs = set()
-
- jobs = min(len(test_inputs), jobs)
- if not jobs:
- return
-
- child = _Child(self)
- pool = make_pool(h, jobs, _run_one_test, child,
- _setup_process, _teardown_process)
- try:
- while test_inputs or running_jobs:
- while test_inputs and (len(running_jobs) < self.args.jobs):
- test_input = test_inputs.pop(0)
- stats.started += 1
- pool.send(test_input)
- running_jobs.add(test_input.name)
- self._print_test_started(stats, test_input)
-
- result = pool.get()
- running_jobs.remove(result.name)
- result_set.add(result)
- stats.finished += 1
- self._print_test_finished(stats, result)
- pool.close()
- finally:
- self.final_responses.extend(pool.join())
-
- def _print_test_started(self, stats, test_input):
- if self.args.quiet:
- # Print nothing when --quiet was passed.
- return
-
- # If -vvv was passed, print when the test is queued to be run.
- # We don't actually know when the test picked up to run, because
- # that is handled by the child process (where we can't easily
- # print things). Otherwise, only print when the test is started
- # if we know we can overwrite the line, so that we do not
- # get multiple lines of output as noise (in -vvv, we actually want
- # the noise).
- test_start_msg = stats.format() + test_input.name
- if self.args.verbose > 2:
- self.update(test_start_msg + ' queued', elide=False)
- if self.args.overwrite:
- self.update(test_start_msg, elide=(not self.args.verbose))
-
- def _print_test_finished(self, stats, result):
- stats.add_time()
-
- assert result.actual in [ResultType.Failure, ResultType.Skip,
- ResultType.Pass]
- if result.actual == ResultType.Failure:
- result_str = ' failed'
- elif result.actual == ResultType.Skip:
- result_str = ' was skipped'
- elif result.actual == ResultType.Pass:
- result_str = ' passed'
-
- if result.unexpected:
- result_str += ' unexpectedly'
- if self.args.timing:
- timing_str = ' %.4fs' % result.took
- else:
- timing_str = ''
- suffix = '%s%s' % (result_str, timing_str)
- out = result.out
- err = result.err
- if result.code:
- if out or err:
- suffix += ':\n'
- self.update(stats.format() + result.name + suffix, elide=False)
- for l in out.splitlines():
- self.print_(' %s' % l)
- for l in err.splitlines():
- self.print_(' %s' % l)
- elif not self.args.quiet:
- if self.args.verbose > 1 and (out or err):
- suffix += ':\n'
- self.update(stats.format() + result.name + suffix,
- elide=(not self.args.verbose))
- if self.args.verbose > 1:
- for l in out.splitlines():
- self.print_(' %s' % l)
- for l in err.splitlines():
- self.print_(' %s' % l)
- if self.args.verbose:
- self.flush()
-
- def update(self, msg, elide):
- self.printer.update(msg, elide)
-
- def flush(self):
- self.printer.flush()
-
- def _summarize(self, full_results):
- num_tests = self.stats.finished
- num_failures = json_results.num_failures(full_results)
-
- if self.args.quiet and num_failures == 0:
- return
-
- if self.args.timing:
- timing_clause = ' in %.1fs' % (self.host.time() -
- self.stats.started_time)
- else:
- timing_clause = ''
- self.update('%d test%s run%s, %d failure%s.' %
- (num_tests,
- '' if num_tests == 1 else 's',
- timing_clause,
- num_failures,
- '' if num_failures == 1 else 's'), elide=False)
- self.print_()
-
- def _read_and_delete(self, path, delete):
- h = self.host
- obj = None
- if h.exists(path):
- contents = h.read_text_file(path)
- if contents:
- obj = json.loads(contents)
- if delete:
- h.remove(path)
- return obj
-
- def _write(self, path, obj):
- if path:
- self.host.write_text_file(path, json.dumps(obj, indent=2) + '\n')
-
- def _upload(self, full_results):
- h = self.host
- if not self.args.test_results_server:
- return 0
-
- url, content_type, data = json_results.make_upload_request(
- self.args.test_results_server, self.args.builder_name,
- self.args.master_name, self.args.test_type,
- full_results)
-
- try:
- h.fetch(url, data, {'Content-Type': content_type})
- return 0
- except Exception as e:
- h.print_('Uploading the JSON results raised "%s"' % str(e))
- return 1
-
- def report_coverage(self):
- if self.args.coverage: # pragma: no cover
- self.host.print_()
- import coverage
- cov = coverage.coverage(data_suffix=True)
- cov.combine()
- cov.report(show_missing=self.args.coverage_show_missing,
- omit=self.args.coverage_omit)
- if self.args.coverage_annotate:
- cov.annotate(omit=self.args.coverage_omit)
-
- def _add_trace_event(self, trace, name, start, end):
- event = {
- 'name': name,
- 'ts': int((start - self.stats.started_time) * 1000000),
- 'dur': int((end - start) * 1000000),
- 'ph': 'X',
- 'pid': self.host.getpid(),
- 'tid': 0,
- }
- trace['traceEvents'].append(event)
-
- def _trace_from_results(self, result_set):
- trace = OrderedDict()
- trace['traceEvents'] = []
- trace['otherData'] = {}
- for m in self.args.metadata:
- k, v = m.split('=')
- trace['otherData'][k] = v
-
- for result in result_set.results:
- started = int((result.started - self.stats.started_time) * 1000000)
- took = int(result.took * 1000000)
- event = OrderedDict()
- event['name'] = result.name
- event['dur'] = took
- event['ts'] = started
- event['ph'] = 'X' # "Complete" events
- event['pid'] = result.pid
- event['tid'] = result.worker
-
- args = OrderedDict()
- args['expected'] = sorted(str(r) for r in result.expected)
- args['actual'] = str(result.actual)
- args['out'] = result.out
- args['err'] = result.err
- args['code'] = result.code
- args['unexpected'] = result.unexpected
- args['flaky'] = result.flaky
- event['args'] = args
-
- trace['traceEvents'].append(event)
- return trace
-
-
-def _matches(name, globs):
- return any(fnmatch.fnmatch(name, glob) for glob in globs)
-
-
-def _default_classifier(args):
- def default_classifier(test_set, test):
- name = test.id()
- if not args.all and _matches(name, args.skip):
- test_set.tests_to_skip.append(TestInput(name,
- 'skipped by request'))
- elif _matches(name, args.isolate):
- test_set.isolated_tests.append(TestInput(name))
- else:
- test_set.parallel_tests.append(TestInput(name))
- return default_classifier
-
-
-def _test_adder(test_set, classifier):
- def add_tests(obj):
- if isinstance(obj, unittest.suite.TestSuite):
- for el in obj:
- add_tests(el)
- elif (obj.id().startswith('unittest.loader.LoadTestsFailure') or
- obj.id().startswith('unittest.loader.ModuleImportFailure')):
- # Access to protected member pylint: disable=W0212
- module_name = obj._testMethodName
- try:
- method = getattr(obj, obj._testMethodName)
- method()
- except Exception as e:
- if 'LoadTests' in obj.id():
- raise _AddTestsError('%s.load_tests() failed: %s'
- % (module_name, str(e)))
- else:
- raise _AddTestsError(str(e))
- else:
- assert isinstance(obj, unittest.TestCase)
- classifier(test_set, obj)
- return add_tests
-
-
-class _Child(object):
-
- def __init__(self, parent):
- self.host = None
- self.worker_num = None
- self.all = parent.args.all
- self.debugger = parent.args.debugger
- self.coverage = parent.args.coverage and parent.args.jobs > 1
- self.coverage_source = parent.coverage_source
- self.dry_run = parent.args.dry_run
- self.loader = parent.loader
- self.passthrough = parent.args.passthrough
- self.context = parent.context
- self.setup_fn = parent.setup_fn
- self.teardown_fn = parent.teardown_fn
- self.context_after_setup = None
- self.top_level_dir = parent.top_level_dir
- self.loaded_suites = {}
- self.cov = None
-
-
-def _setup_process(host, worker_num, child):
- child.host = host
- child.worker_num = worker_num
- # pylint: disable=protected-access
-
- if child.coverage: # pragma: no cover
- import coverage
- child.cov = coverage.coverage(source=child.coverage_source,
- data_suffix=True)
- child.cov._warn_no_data = False
- child.cov.start()
-
- if child.setup_fn:
- child.context_after_setup = child.setup_fn(child, child.context)
- else:
- child.context_after_setup = child.context
- return child
-
-
-def _teardown_process(child):
- res = None
- e = None
- if child.teardown_fn:
- try:
- res = child.teardown_fn(child, child.context_after_setup)
- except Exception as e:
- pass
-
- if child.cov: # pragma: no cover
- child.cov.stop()
- child.cov.save()
-
- return (child.worker_num, res, e)
-
-
-def _run_one_test(child, test_input):
- h = child.host
- pid = h.getpid()
- test_name = test_input.name
-
- start = h.time()
-
- # It is important to capture the output before loading the test
- # to ensure that
- # 1) the loader doesn't logs something we don't captured
- # 2) neither the loader nor the test case grab a reference to the
- # uncaptured stdout or stderr that later is used when the test is run.
- # This comes up when using the FakeTestLoader and testing typ itself,
- # but could come up when testing non-typ code as well.
- h.capture_output(divert=not child.passthrough)
-
- tb_str = ''
- try:
- orig_skip = unittest.skip
- orig_skip_if = unittest.skipIf
- if child.all:
- unittest.skip = lambda reason: lambda x: x
- unittest.skipIf = lambda condition, reason: lambda x: x
-
- try:
- suite = child.loader.loadTestsFromName(test_name)
- except Exception as e:
- try:
- suite = _load_via_load_tests(child, test_name)
- except Exception as e: # pragma: untested
- suite = []
- tb_str = traceback.format_exc(e)
- finally:
- unittest.skip = orig_skip
- unittest.skipIf = orig_skip_if
-
- tests = list(suite)
- if len(tests) != 1:
- err = 'Failed to load %s'
- if tb_str: # pragma: untested
- err += (' (traceback follows):\n %s' %
- ' \n'.join(tb_str.splitlines()))
-
- h.restore_output()
- return Result(test_name, ResultType.Failure, start, 0,
- child.worker_num, unexpected=True, code=1,
- err=err, pid=pid)
-
- test_case = tests[0]
- if isinstance(test_case, TypTestCase):
- test_case.child = child
- test_case.context = child.context_after_setup
-
- test_result = unittest.TestResult()
- out = ''
- err = ''
- try:
- if child.dry_run:
- pass
- elif child.debugger: # pragma: no cover
- _run_under_debugger(h, test_case, suite, test_result)
- else:
- suite.run(test_result)
- finally:
- out, err = h.restore_output()
-
- took = h.time() - start
- return _result_from_test_result(test_result, test_name, start, took, out,
- err, child.worker_num, pid)
-
-
-def _run_under_debugger(host, test_case, suite,
- test_result): # pragma: no cover
- # Access to protected member pylint: disable=W0212
- test_func = getattr(test_case, test_case._testMethodName)
- fname = inspect.getsourcefile(test_func)
- lineno = inspect.getsourcelines(test_func)[1] + 1
- dbg = pdb.Pdb(stdout=host.stdout.stream)
- dbg.set_break(fname, lineno)
- dbg.runcall(suite.run, test_result)
-
-
-def _result_from_test_result(test_result, test_name, start, took, out, err,
- worker_num, pid):
- flaky = False
- if test_result.failures:
- expected = [ResultType.Pass]
- actual = ResultType.Failure
- code = 1
- unexpected = True
- err = err + test_result.failures[0][1]
- elif test_result.errors:
- expected = [ResultType.Pass]
- actual = ResultType.Failure
- code = 1
- unexpected = True
- err = err + test_result.errors[0][1]
- elif test_result.skipped:
- expected = [ResultType.Skip]
- actual = ResultType.Skip
- err = err + test_result.skipped[0][1]
- code = 0
- unexpected = False
- elif test_result.expectedFailures:
- expected = [ResultType.Failure]
- actual = ResultType.Failure
- code = 1
- err = err + test_result.expectedFailures[0][1]
- unexpected = False
- elif test_result.unexpectedSuccesses:
- expected = [ResultType.Failure]
- actual = ResultType.Pass
- code = 0
- unexpected = True
- else:
- expected = [ResultType.Pass]
- actual = ResultType.Pass
- code = 0
- unexpected = False
-
- return Result(test_name, actual, start, took, worker_num,
- expected, unexpected, flaky, code, out, err, pid)
-
-
-def _load_via_load_tests(child, test_name):
- # If we couldn't import a test directly, the test may be only loadable
- # via unittest's load_tests protocol. See if we can find a load_tests
- # entry point that will work for this test.
- loader = child.loader
- comps = test_name.split('.')
- new_suite = unittest.TestSuite()
-
- while comps:
- name = '.'.join(comps)
- module = None
- suite = None
- if name not in child.loaded_suites:
- try:
- module = importlib.import_module(name)
- except ImportError:
- pass
- if module:
- suite = loader.loadTestsFromModule(module)
- child.loaded_suites[name] = suite
- suite = child.loaded_suites[name]
- if suite:
- for test_case in suite:
- assert isinstance(test_case, unittest.TestCase)
- if test_case.id() == test_name:
- new_suite.addTest(test_case)
- break
- comps.pop()
- return new_suite
-
-
-def _sort_inputs(inps):
- return sorted(inps, key=lambda inp: inp.name)
-
-
-if __name__ == '__main__': # pragma: no cover
- sys.modules['__main__'].__file__ = path_to_file
- sys.exit(main(win_multiprocessing=WinMultiprocessing.importable))
« no previous file with comments | « tools/telemetry/third_party/typ/typ/printer.py ('k') | tools/telemetry/third_party/typ/typ/stats.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698