| Index: owners.py
|
| diff --git a/owners.py b/owners.py
|
| index 563675a15bce4743b2d0d65df6da58289120789b..46cdf2216239a4e9a6d28d5a55b1e40bd0ad140d 100644
|
| --- a/owners.py
|
| +++ b/owners.py
|
| @@ -55,6 +55,7 @@ Examples for all of these combinations can be found in tests/owners_unittest.py.
|
| """
|
|
|
| import collections
|
| +import fnmatch
|
| import random
|
| import re
|
|
|
| @@ -94,35 +95,32 @@ class Database(object):
|
| of changed files, and see if a list of changed files is covered by a
|
| list of reviewers."""
|
|
|
| - def __init__(self, root, fopen, os_path, glob):
|
| + def __init__(self, root, fopen, os_path):
|
| """Args:
|
| root: the path to the root of the Repository
|
| open: function callback to open a text file for reading
|
| os_path: module/object callback with fields for 'abspath', 'dirname',
|
| 'exists', and 'join'
|
| - glob: function callback to list entries in a directory match a glob
|
| - (i.e., glob.glob)
|
| """
|
| self.root = root
|
| self.fopen = fopen
|
| self.os_path = os_path
|
| - self.glob = glob
|
|
|
| # Pick a default email regexp to use; callers can override as desired.
|
| self.email_regexp = re.compile(BASIC_EMAIL_REGEXP)
|
|
|
| - # Mapping of owners to the paths they own.
|
| - self.owned_by = {EVERYONE: set()}
|
| + # Mapping of owners to the paths or globs they own.
|
| + self._owners_to_paths = {EVERYONE: set()}
|
|
|
| # Mapping of paths to authorized owners.
|
| - self.owners_for = {}
|
| + self._paths_to_owners = {}
|
|
|
| # Mapping reviewers to the preceding comment per file in the OWNERS files.
|
| self.comments = {}
|
|
|
| # Set of paths that stop us from looking above them for owners.
|
| # (This is implicitly true for the root directory).
|
| - self.stop_looking = set([''])
|
| + self._stop_looking = set([''])
|
|
|
| # Set of files which have already been read.
|
| self.read_files = set()
|
| @@ -135,6 +133,7 @@ class Database(object):
|
| in order avoid suggesting the author as a reviewer for their own changes."""
|
| self._check_paths(files)
|
| self.load_data_needed_for(files)
|
| +
|
| suggested_owners = self._covering_set_of_owners_for(files, author)
|
| if EVERYONE in suggested_owners:
|
| if len(suggested_owners) > 1:
|
| @@ -154,11 +153,7 @@ class Database(object):
|
| self._check_reviewers(reviewers)
|
| self.load_data_needed_for(files)
|
|
|
| - covered_objs = self._objs_covered_by(reviewers)
|
| - uncovered_files = [f for f in files
|
| - if not self._is_obj_covered_by(f, covered_objs)]
|
| -
|
| - return set(uncovered_files)
|
| + return set(f for f in files if not self._is_obj_covered_by(f, reviewers))
|
|
|
| def _check_paths(self, files):
|
| def _is_under(f, pfx):
|
| @@ -171,25 +166,23 @@ class Database(object):
|
| _assert_is_collection(reviewers)
|
| assert all(self.email_regexp.match(r) for r in reviewers)
|
|
|
| - def _objs_covered_by(self, reviewers):
|
| - objs = self.owned_by[EVERYONE]
|
| - for r in reviewers:
|
| - objs = objs | self.owned_by.get(r, set())
|
| - return objs
|
| -
|
| - def _stop_looking(self, objname):
|
| - return objname in self.stop_looking
|
| -
|
| - def _is_obj_covered_by(self, objname, covered_objs):
|
| - while not objname in covered_objs and not self._stop_looking(objname):
|
| + def _is_obj_covered_by(self, objname, reviewers):
|
| + reviewers = list(reviewers) + [EVERYONE]
|
| + while True:
|
| + for reviewer in reviewers:
|
| + for owned_pattern in self._owners_to_paths.get(reviewer, set()):
|
| + if fnmatch.fnmatch(objname, owned_pattern):
|
| + return True
|
| + if self._should_stop_looking(objname):
|
| + break
|
| objname = self.os_path.dirname(objname)
|
| - return objname in covered_objs
|
| + return False
|
|
|
| def _enclosing_dir_with_owners(self, objname):
|
| """Returns the innermost enclosing directory that has an OWNERS file."""
|
| dirpath = objname
|
| - while not dirpath in self.owners_for:
|
| - if self._stop_looking(dirpath):
|
| + while not self._owners_for(dirpath):
|
| + if self._should_stop_looking(dirpath):
|
| break
|
| dirpath = self.os_path.dirname(dirpath)
|
| return dirpath
|
| @@ -197,12 +190,23 @@ class Database(object):
|
| def load_data_needed_for(self, files):
|
| for f in files:
|
| dirpath = self.os_path.dirname(f)
|
| - while not dirpath in self.owners_for:
|
| + while not self._owners_for(dirpath):
|
| self._read_owners(self.os_path.join(dirpath, 'OWNERS'))
|
| - if self._stop_looking(dirpath):
|
| + if self._should_stop_looking(dirpath):
|
| break
|
| dirpath = self.os_path.dirname(dirpath)
|
|
|
| + def _should_stop_looking(self, objname):
|
| + return any(fnmatch.fnmatch(objname, stop_looking)
|
| + for stop_looking in self._stop_looking)
|
| +
|
| + def _owners_for(self, objname):
|
| + obj_owners = set()
|
| + for owned_path, path_owners in self._paths_to_owners.iteritems():
|
| + if fnmatch.fnmatch(objname, owned_path):
|
| + obj_owners |= path_owners
|
| + return obj_owners
|
| +
|
| def _read_owners(self, path):
|
| owners_path = self.os_path.join(self.root, path)
|
| if not self.os_path.exists(owners_path):
|
| @@ -231,7 +235,7 @@ class Database(object):
|
| in_comment = False
|
|
|
| if line == 'set noparent':
|
| - self.stop_looking.add(dirpath)
|
| + self._stop_looking.add(dirpath)
|
| continue
|
|
|
| m = re.match('per-file (.+)=(.+)', line)
|
| @@ -243,10 +247,9 @@ class Database(object):
|
| raise SyntaxErrorInOwnersFile(owners_path, lineno,
|
| 'per-file globs cannot span directories or use escapes: "%s"' %
|
| line)
|
| - baselines = self.glob(full_glob_string)
|
| - for baseline in (self.os_path.relpath(b, self.root) for b in baselines):
|
| - self._add_entry(baseline, directive, 'per-file line',
|
| - owners_path, lineno, '\n'.join(comment))
|
| + relative_glob_string = self.os_path.relpath(full_glob_string, self.root)
|
| + self._add_entry(relative_glob_string, directive, 'per-file line',
|
| + owners_path, lineno, '\n'.join(comment))
|
| continue
|
|
|
| if line.startswith('set '):
|
| @@ -259,7 +262,7 @@ class Database(object):
|
| def _add_entry(self, path, directive,
|
| line_type, owners_path, lineno, comment):
|
| if directive == 'set noparent':
|
| - self.stop_looking.add(path)
|
| + self._stop_looking.add(path)
|
| elif directive.startswith('file:'):
|
| owners_file = self._resolve_include(directive[5:], owners_path)
|
| if not owners_file:
|
| @@ -269,19 +272,20 @@ class Database(object):
|
| self._read_owners(owners_file)
|
|
|
| dirpath = self.os_path.dirname(owners_file)
|
| - for key in self.owned_by:
|
| - if not dirpath in self.owned_by[key]:
|
| + for key in self._owners_to_paths:
|
| + if not dirpath in self._owners_to_paths[key]:
|
| continue
|
| - self.owned_by[key].add(path)
|
| + self._owners_to_paths[key].add(path)
|
|
|
| - if dirpath in self.owners_for:
|
| - self.owners_for.setdefault(path, set()).update(self.owners_for[dirpath])
|
| + if dirpath in self._paths_to_owners:
|
| + self._paths_to_owners.setdefault(path, set()).update(
|
| + self._paths_to_owners[dirpath])
|
|
|
| elif self.email_regexp.match(directive) or directive == EVERYONE:
|
| self.comments.setdefault(directive, {})
|
| self.comments[directive][path] = comment
|
| - self.owned_by.setdefault(directive, set()).add(path)
|
| - self.owners_for.setdefault(path, set()).add(directive)
|
| + self._owners_to_paths.setdefault(directive, set()).add(path)
|
| + self._paths_to_owners.setdefault(path, set()).add(directive)
|
| else:
|
| raise SyntaxErrorInOwnersFile(owners_path, lineno,
|
| ('%s is not a "set" directive, file include, "*", '
|
| @@ -321,7 +325,7 @@ class Database(object):
|
| dirname = current_dir
|
| distance = 1
|
| while True:
|
| - for owner in self.owners_for.get(dirname, []):
|
| + for owner in self._owners_for(dirname):
|
| if author and owner == author:
|
| continue
|
| all_possible_owners.setdefault(owner, [])
|
| @@ -329,7 +333,7 @@ class Database(object):
|
| # directory, only count the closest one.
|
| if not any(current_dir == el[0] for el in all_possible_owners[owner]):
|
| all_possible_owners[owner].append((current_dir, distance))
|
| - if self._stop_looking(dirname):
|
| + if self._should_stop_looking(dirname):
|
| break
|
| dirname = self.os_path.dirname(dirname)
|
| distance += 1
|
|
|