| Index: owners.py
|
| diff --git a/owners.py b/owners.py
|
| index 9f840dbc65c9e59f67d018dff1eb34e9edb63b3b..1e0deb9b26d11f698ac177fb041fa92803373f0d 100644
|
| --- a/owners.py
|
| +++ b/owners.py
|
| @@ -4,8 +4,15 @@
|
|
|
| """A database of OWNERS files."""
|
|
|
| -class Assertion(AssertionError):
|
| - pass
|
| +import re
|
| +
|
| +
|
| +# If this is present by itself on a line, this means that everyone can review.
|
| +EVERYONE = '*'
|
| +
|
| +
|
| +# Recognizes 'X@Y' email addresses. Very simplistic.
|
| +BASIC_EMAIL_REGEXP = r'^[\w\-\+\%\.]+\@[\w\-\+\%\.]+$'
|
|
|
|
|
| class SyntaxErrorInOwnersFile(Exception):
|
| @@ -22,10 +29,6 @@ class SyntaxErrorInOwnersFile(Exception):
|
| return "%s:%d syntax error" % (self.path, self.line)
|
|
|
|
|
| -# Wildcard email-address in the OWNERS file.
|
| -ANYONE = '*'
|
| -
|
| -
|
| class Database(object):
|
| """A database of OWNERS files for a repository.
|
|
|
| @@ -36,72 +39,113 @@ class Database(object):
|
| def __init__(self, root, fopen, os_path):
|
| """Args:
|
| root: the path to the root of the Repository
|
| - all_owners: the list of every owner in the system
|
| open: function callback to open a text file for reading
|
| - os_path: module/object callback with fields for 'exists',
|
| - 'dirname', and 'join'
|
| + os_path: module/object callback with fields for 'abspath', 'dirname',
|
| + 'exists', and 'join'
|
| """
|
| self.root = root
|
| self.fopen = fopen
|
| self.os_path = os_path
|
|
|
| - # Mapping of files to authorized owners.
|
| - self.files_owned_by = {}
|
| + # TODO: Figure out how to share the owners email addr format w/
|
| + # tools/commit-queue/projects.py, especially for per-repo whitelists.
|
| + self.email_regexp = re.compile(BASIC_EMAIL_REGEXP)
|
|
|
| - # Mapping of owners to the files they own.
|
| - self.owners_for = {}
|
| + # Mapping of owners to the paths they own.
|
| + self.owned_by = {EVERYONE: set()}
|
|
|
| - # In-memory cached map of files to their OWNERS files.
|
| - self.owners_file_for = {}
|
| + # Mapping of paths to authorized owners.
|
| + self.owners_for = {}
|
|
|
| - # In-memory cache of OWNERS files and their contents
|
| - self.owners_files = {}
|
| + # Set of paths that stop us from looking above them for owners.
|
| + # (This is implicitly true for the root directory).
|
| + self.stop_looking = set([''])
|
|
|
| def ReviewersFor(self, files):
|
| """Returns a suggested set of reviewers that will cover the set of files.
|
|
|
| - The set of files are paths relative to (and under) self.root."""
|
| + files is a set of paths relative to (and under) self.root."""
|
| + self._CheckPaths(files)
|
| self._LoadDataNeededFor(files)
|
| return self._CoveringSetOfOwnersFor(files)
|
|
|
| def FilesAreCoveredBy(self, files, reviewers):
|
| + """Returns whether every file is owned by at least one reviewer."""
|
| return not self.FilesNotCoveredBy(files, reviewers)
|
|
|
| def FilesNotCoveredBy(self, files, reviewers):
|
| - covered_files = set()
|
| - for reviewer in reviewers:
|
| - covered_files = covered_files.union(self.files_owned_by[reviewer])
|
| - return files.difference(covered_files)
|
| + """Returns the set of files that are not owned by at least one reviewer."""
|
| + self._CheckPaths(files)
|
| + self._CheckReviewers(reviewers)
|
| + if not reviewers:
|
| + return files
|
|
|
| - def _LoadDataNeededFor(self, files):
|
| + self._LoadDataNeededFor(files)
|
| + files_by_dir = self._FilesByDir(files)
|
| + covered_dirs = self._DirsCoveredBy(reviewers)
|
| + uncovered_files = []
|
| + for d, files_in_d in files_by_dir.iteritems():
|
| + if not self._IsDirCoveredBy(d, covered_dirs):
|
| + uncovered_files.extend(files_in_d)
|
| + return set(uncovered_files)
|
| +
|
| + def _CheckPaths(self, files):
|
| + def _isunder(f, pfx):
|
| + return self.os_path.abspath(self.os_path.join(pfx, f)).startswith(pfx)
|
| + assert all(_isunder(f, self.os_path.abspath(self.root)) for f in files)
|
| +
|
| + def _CheckReviewers(self, reviewers):
|
| + """Verifies each reviewer is a valid email address."""
|
| + assert all(self.email_regexp.match(r) for r in reviewers)
|
| +
|
| + def _FilesByDir(self, files):
|
| + dirs = {}
|
| for f in files:
|
| - self._LoadOwnersFor(f)
|
| -
|
| - def _LoadOwnersFor(self, f):
|
| - if f not in self.owners_for:
|
| - owner_file = self._FindOwnersFileFor(f)
|
| - self.owners_file_for[f] = owner_file
|
| - self._ReadOwnersFile(owner_file, f)
|
| -
|
| - def _FindOwnersFileFor(self, f):
|
| - # This is really a "do ... until dirname = ''"
|
| - dirname = self.os_path.dirname(f)
|
| - while dirname:
|
| - owner_path = self.os_path.join(dirname, 'OWNERS')
|
| - if self.os_path.exists(owner_path):
|
| - return owner_path
|
| + dirs.setdefault(self.os_path.dirname(f), []).append(f)
|
| + return dirs
|
| +
|
| + def _DirsCoveredBy(self, reviewers):
|
| + dirs = self.owned_by[EVERYONE]
|
| + for r in reviewers:
|
| + dirs = dirs | self.owned_by.get(r, set())
|
| + return dirs
|
| +
|
| + def _StopLooking(self, dirname):
|
| + return dirname in self.stop_looking
|
| +
|
| + def _IsDirCoveredBy(self, dirname, covered_dirs):
|
| + while not dirname in covered_dirs and not self._StopLooking(dirname):
|
| dirname = self.os_path.dirname(dirname)
|
| - owner_path = self.os_path.join(dirname, 'OWNERS')
|
| - if self.os_path.exists(owner_path):
|
| - return owner_path
|
| - raise Assertion('No OWNERS file found for %s' % f)
|
| -
|
| - def _ReadOwnersFile(self, owner_file, affected_file):
|
| - owners_for = self.owners_for.setdefault(affected_file, set())
|
| - for owner in self.fopen(owner_file):
|
| - owner = owner.strip()
|
| - self.files_owned_by.setdefault(owner, set()).add(affected_file)
|
| - owners_for.add(owner)
|
| + return dirname in covered_dirs
|
| +
|
| + def _LoadDataNeededFor(self, files):
|
| + for f in files:
|
| + dirpath = self.os_path.dirname(f)
|
| + while not dirpath in self.owners_for:
|
| + self._ReadOwnersInDir(dirpath)
|
| + if self._StopLooking(dirpath):
|
| + break
|
| + dirpath = self.os_path.dirname(dirpath)
|
| +
|
| + def _ReadOwnersInDir(self, dirpath):
|
| + owners_path = self.os_path.join(self.root, dirpath, 'OWNERS')
|
| + if not self.os_path.exists(owners_path):
|
| + return
|
| +
|
| + lineno = 0
|
| + for line in self.fopen(owners_path):
|
| + lineno += 1
|
| + line = line.strip()
|
| + if line.startswith('#'):
|
| + continue
|
| + if line == 'set noparent':
|
| + self.stop_looking.add(dirpath)
|
| + continue
|
| + if self.email_regexp.match(line) or line == EVERYONE:
|
| + self.owned_by.setdefault(line, set()).add(dirpath)
|
| + self.owners_for.setdefault(dirpath, set()).add(line)
|
| + continue
|
| + raise SyntaxErrorInOwnersFile(owners_path, lineno, line)
|
|
|
| def _CoveringSetOfOwnersFor(self, files):
|
| # TODO(dpranke): implement the greedy algorithm for covering sets, and
|
| @@ -109,5 +153,10 @@ class Database(object):
|
| # short combinations of owners.
|
| every_owner = set()
|
| for f in files:
|
| - every_owner = every_owner.union(self.owners_for[f])
|
| + dirname = self.os_path.dirname(f)
|
| + while dirname in self.owners_for:
|
| + every_owner |= self.owners_for[dirname]
|
| + if self._StopLooking(dirname):
|
| + break
|
| + dirname = self.os_path.dirname(dirname)
|
| return every_owner
|
|
|