Index: expect_tests/listing.py |
diff --git a/expect_tests/listing.py b/expect_tests/listing.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..d5ff4c30424ba35ddd127528af9f448a5120ef28 |
--- /dev/null |
+++ b/expect_tests/listing.py |
@@ -0,0 +1,240 @@ |
+# Copyright 2014 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+ |
+import ConfigParser |
+import glob |
+import os |
+import re |
+ |
+ |
+CONFIG_FILE_NAME = '.expect_tests.cfg' |
+ |
+ |
+def get_python_root(path): |
+ """Get the lowest directory with no __init__.py file. |
+ |
+ When ``path`` is pointing inside a Python package, this function returns the |
+ directory directly containing this package. If ``path`` points outside of |
+ a Python package, the it returns ``path``. |
+ |
+ Args: |
+ path (str): arbitrary path |
+ Returns: |
+ root (str): ancestor directory, with no __init__.py file in it. |
+ """ |
+ if not os.path.exists(path): |
+ raise ValueError('path must exist: %s') |
+ |
+ while path != os.path.dirname(path): |
+ if not os.path.exists(os.path.join(path, '__init__.py')): |
+ return path |
+ path = os.path.dirname(path) |
+ |
+ # This is not supposed to happen, but in case somebody adds a __init__.py |
+ # at the filesystem root ... |
+ raise IOError("Unable to find a python root for %s" % path) |
+ |
+ |
+def parse_test_glob(test_glob): |
+ """A test glob is composed of a path and a glob expression like: |
+ '<path>:<glob>'. The path should point to a directory or a file inside |
+ a Python package (it can be the root directory of that package). |
+ The glob is a Python name used to filter tests. |
+ |
+ A test name is the name of the method prepended with the class name. |
+ |
+ Example: |
+ 'my/nice/package/test1/:TestA*', the package root being 'my/nice/package': |
+ this matches all tests whose class name starts with 'TestA' inside all |
+ files matching test1/*_test.py, like |
+ ``TestABunchOfStuff.testCatFoodDeployment``. |
+ |
+ Args: |
+ test_glob (str): a test glob |
+ Returns: |
+ (path, test_filter): absolute path and test filter glob. |
+ """ |
+ parts = test_glob.split(':') |
+ if len(parts) > 2: |
+ raise ValueError('A test_glob should contain at most one colon (got %s)' |
+ % test_glob) |
+ if len(parts) == 2: |
+ path, test_filter = parts |
+ if '/' in test_filter: |
+ raise ValueError('A test filter cannot contain a slash (got %s)', |
+ test_filter) |
+ |
+ if not test_filter: # empty string case |
+ test_filter = ('*',) |
+ else: |
+ path, test_filter = parts[0], ('*',) |
+ |
+ path = os.path.abspath(path) |
+ return path, test_filter |
+ |
+ |
+class PackageTestingContext(object): |
+ def __init__(self, cwd, package_name, filters): |
+ """Information to run a set of tests in a single package. |
+ |
+ See also parse_test_glob. |
+ """ |
+ # TODO(iannucci): let's scan packages too so that <path> can also be a |
+ # glob. Then expect_tests can use a default of '*:*' when no tests are |
+ # specified. |
+ |
+ self.cwd = cwd |
+ self.package_name = package_name |
+ # list of (path, filter) pairs. |
+ # The path is a relative path to a subdirectory of |
+ # os.path.join(self.cwd, self.package_name) in which to look for tests. |
+ # Only tests whose name matches the glob are kept. |
+ self.filters = filters |
+ |
+ def itermatchers(self): |
+ """Iterate over all filters, and yield matchers for each of them. |
+ |
+ Yields: |
+ path (str): restrict test listing to this subpackage. |
+ matcher (SRE_Pattern): whitelist matcher |
+ """ |
+ for filt in self.filters: |
+ # Implicitely append * to globs |
+ one_glob = '%s%s' % (filt[1], '*' if '*' not in filt[1] else '') |
+ matcher = re.compile('^(?:%s)$' % glob.fnmatch.translate(one_glob)) |
+ |
+ if matcher.pattern == '^$': |
+ matcher = re.compile('^.*$') |
+ |
+ yield filt[0], matcher |
+ |
+ @classmethod |
+ def from_path(cls, path, filters=('*',)): |
+ path = os.path.abspath(path) |
+ if not os.path.exists(path): |
+ raise ValueError('Path does not exist: %s' % path) |
+ cwd = get_python_root(path) |
+ package_name = os.path.relpath(path, cwd).split(os.path.sep)[0] |
+ # The path in which to look for tests. Only tests whose name matches the |
+ # glob are kept. |
+ relpath = os.path.relpath(path, os.path.join(cwd, package_name)) |
+ |
+ if not isinstance(filters, (list, tuple)): |
+ raise ValueError('the "filter" parameter must be a tuple or a list, ' |
+ 'got %s' % type(filters).__name__) |
+ if len(filters) == 0: |
+ filters = [(relpath, '*')] |
+ else: |
+ filters = [(relpath, filt) for filt in filters] |
+ |
+ return cls(cwd, package_name, filters) |
+ |
+ @classmethod |
+ def from_context_list(cls, contexts): |
+ """Merge several PackageTestingContext pointing to the same package.""" |
+ cwd = set(context.cwd for context in contexts) |
+ if len(cwd) > 1: |
+ raise ValueError( |
+ 'from_context_list() was given' |
+ 'process contexts containing the following cwds, ' |
+ 'but can only process contexts which all share a single cwd: ' |
+ '%s' % str(cwd)) |
+ |
+ package_name = set(context.package_name for context in contexts) |
+ if len(package_name) > 1: |
+ raise ValueError( |
+ 'from_context_list() was given' |
+ 'process contexts containing the following package_name, ' |
+ 'but can only process contexts which all share a single package_name: ' |
+ '%s' % str(package_name)) |
+ |
+ filters = [] |
+ for context in contexts: |
+ filters.extend(context.filters) |
+ |
+ return cls(cwd.pop(), package_name.pop(), filters) |
+ |
+ |
+class ProcessingContext(object): |
+ def __init__(self, testing_contexts): |
+ """Information to run a set of tasks in a given working directory. |
+ |
+ Args: |
+ testing_contexts (list): list of PackageTestingContext instances. |
+ """ |
+ self.cwd = testing_contexts[0].cwd |
+ |
+ # Merge testing_contexts by package |
+ groups = {} |
+ for context in testing_contexts: |
+ if context.cwd != self.cwd: |
+ raise ValueError('All package must have the same value for "cwd"') |
+ groups.setdefault(context.package_name, []).append(context) |
+ |
+ self.testing_contexts = [PackageTestingContext.from_context_list(contexts) |
+ for contexts in groups.itervalues()] |
+ |
+ |
+def get_config(path): |
+ """Get configuration values |
+ |
+ Reads the config file in provided path, and returns content. |
+ See Python ConfigParser for general formatting syntax. |
+ |
+ Example: |
+ [expect_tests] |
+ skip=directory1 |
+ directory2 |
+ directory3 |
+ |
+ Args: |
+ path (str): path to a directory. |
+ |
+ Returns: |
+ black_list (set): blacklisted subdirectories. |
+ """ |
+ black_list = set() |
+ |
+ config_file_name = os.path.join(path, CONFIG_FILE_NAME) |
+ parser = ConfigParser.ConfigParser() |
+ parser.read([config_file_name]) |
+ |
+ if not parser.has_section('expect_tests'): |
+ return black_list |
+ |
+ if parser.has_option('expect_tests', 'skip'): |
+ black_list.update(parser.get('expect_tests', 'skip').splitlines()) |
+ return black_list |
+ |
+ |
+def get_runtime_contexts(test_globs): |
+ """Compute the list of packages/filters to get tests from.""" |
+ # Step 1: compute list of packages + subtree |
+ testing_contexts = [] |
+ for test_glob in test_globs: |
+ path, test_filter = parse_test_glob(test_glob) |
+ if os.path.exists(os.path.join(path, '__init__.py')): |
+ testing_contexts.append( |
+ PackageTestingContext.from_path(path, test_filter)) |
+ else: |
+ # Look for all packages in path. |
+ subpaths = [] |
+ black_list = get_config(path) |
+ |
+ for filename in filter(lambda x: x not in black_list, os.listdir(path)): |
+ abs_filename = os.path.join(path, filename) |
+ if (os.path.isdir(abs_filename) |
+ and os.path.isfile(os.path.join(abs_filename, '__init__.py'))): |
+ subpaths.append(abs_filename) |
+ |
+ testing_contexts.extend( |
+ [PackageTestingContext.from_path(subpath, test_filter) |
+ for subpath in subpaths]) |
+ |
+ # Step 2: group by working directory - one process per wd. |
+ groups = {} |
+ for context in testing_contexts: |
+ groups.setdefault(context.cwd, []).append(context) |
+ return [ProcessingContext(contexts) for contexts in groups.itervalues()] |