Index: owners.py |
diff --git a/owners.py b/owners.py |
index 9f840dbc65c9e59f67d018dff1eb34e9edb63b3b..1e0deb9b26d11f698ac177fb041fa92803373f0d 100644 |
--- a/owners.py |
+++ b/owners.py |
@@ -4,8 +4,15 @@ |
"""A database of OWNERS files.""" |
-class Assertion(AssertionError): |
- pass |
+import re |
+ |
+ |
+# If this is present by itself on a line, this means that everyone can review. |
+EVERYONE = '*' |
+ |
+ |
+# Recognizes 'X@Y' email addresses. Very simplistic. |
+BASIC_EMAIL_REGEXP = r'^[\w\-\+\%\.]+\@[\w\-\+\%\.]+$' |
class SyntaxErrorInOwnersFile(Exception): |
@@ -22,10 +29,6 @@ class SyntaxErrorInOwnersFile(Exception): |
return "%s:%d syntax error" % (self.path, self.line) |
-# Wildcard email-address in the OWNERS file. |
-ANYONE = '*' |
- |
- |
class Database(object): |
"""A database of OWNERS files for a repository. |
@@ -36,72 +39,113 @@ class Database(object): |
def __init__(self, root, fopen, os_path): |
"""Args: |
root: the path to the root of the Repository |
- all_owners: the list of every owner in the system |
open: function callback to open a text file for reading |
- os_path: module/object callback with fields for 'exists', |
- 'dirname', and 'join' |
+ os_path: module/object callback with fields for 'abspath', 'dirname', |
+ 'exists', and 'join' |
""" |
self.root = root |
self.fopen = fopen |
self.os_path = os_path |
- # Mapping of files to authorized owners. |
- self.files_owned_by = {} |
+ # TODO: Figure out how to share the owners email addr format w/ |
+ # tools/commit-queue/projects.py, especially for per-repo whitelists. |
+ self.email_regexp = re.compile(BASIC_EMAIL_REGEXP) |
- # Mapping of owners to the files they own. |
- self.owners_for = {} |
+ # Mapping of owners to the paths they own. |
+ self.owned_by = {EVERYONE: set()} |
- # In-memory cached map of files to their OWNERS files. |
- self.owners_file_for = {} |
+ # Mapping of paths to authorized owners. |
+ self.owners_for = {} |
- # In-memory cache of OWNERS files and their contents |
- self.owners_files = {} |
+ # Set of paths that stop us from looking above them for owners. |
+ # (This is implicitly true for the root directory). |
+ self.stop_looking = set(['']) |
def ReviewersFor(self, files): |
"""Returns a suggested set of reviewers that will cover the set of files. |
- The set of files are paths relative to (and under) self.root.""" |
+ files is a set of paths relative to (and under) self.root.""" |
+ self._CheckPaths(files) |
self._LoadDataNeededFor(files) |
return self._CoveringSetOfOwnersFor(files) |
def FilesAreCoveredBy(self, files, reviewers): |
+ """Returns whether every file is owned by at least one reviewer.""" |
return not self.FilesNotCoveredBy(files, reviewers) |
def FilesNotCoveredBy(self, files, reviewers): |
- covered_files = set() |
- for reviewer in reviewers: |
- covered_files = covered_files.union(self.files_owned_by[reviewer]) |
- return files.difference(covered_files) |
+ """Returns the set of files that are not owned by at least one reviewer.""" |
+ self._CheckPaths(files) |
+ self._CheckReviewers(reviewers) |
+ if not reviewers: |
+ return files |
- def _LoadDataNeededFor(self, files): |
+ self._LoadDataNeededFor(files) |
+ files_by_dir = self._FilesByDir(files) |
+ covered_dirs = self._DirsCoveredBy(reviewers) |
+ uncovered_files = [] |
+ for d, files_in_d in files_by_dir.iteritems(): |
+ if not self._IsDirCoveredBy(d, covered_dirs): |
+ uncovered_files.extend(files_in_d) |
+ return set(uncovered_files) |
+ |
+ def _CheckPaths(self, files): |
+ def _isunder(f, pfx): |
+ return self.os_path.abspath(self.os_path.join(pfx, f)).startswith(pfx) |
+ assert all(_isunder(f, self.os_path.abspath(self.root)) for f in files) |
+ |
+ def _CheckReviewers(self, reviewers): |
+ """Verifies each reviewer is a valid email address.""" |
+ assert all(self.email_regexp.match(r) for r in reviewers) |
+ |
+ def _FilesByDir(self, files): |
+ dirs = {} |
for f in files: |
- self._LoadOwnersFor(f) |
- |
- def _LoadOwnersFor(self, f): |
- if f not in self.owners_for: |
- owner_file = self._FindOwnersFileFor(f) |
- self.owners_file_for[f] = owner_file |
- self._ReadOwnersFile(owner_file, f) |
- |
- def _FindOwnersFileFor(self, f): |
- # This is really a "do ... until dirname = ''" |
- dirname = self.os_path.dirname(f) |
- while dirname: |
- owner_path = self.os_path.join(dirname, 'OWNERS') |
- if self.os_path.exists(owner_path): |
- return owner_path |
+ dirs.setdefault(self.os_path.dirname(f), []).append(f) |
+ return dirs |
+ |
+ def _DirsCoveredBy(self, reviewers): |
+ dirs = self.owned_by[EVERYONE] |
+ for r in reviewers: |
+ dirs = dirs | self.owned_by.get(r, set()) |
+ return dirs |
+ |
+ def _StopLooking(self, dirname): |
+ return dirname in self.stop_looking |
+ |
+ def _IsDirCoveredBy(self, dirname, covered_dirs): |
+ while not dirname in covered_dirs and not self._StopLooking(dirname): |
dirname = self.os_path.dirname(dirname) |
- owner_path = self.os_path.join(dirname, 'OWNERS') |
- if self.os_path.exists(owner_path): |
- return owner_path |
- raise Assertion('No OWNERS file found for %s' % f) |
- |
- def _ReadOwnersFile(self, owner_file, affected_file): |
- owners_for = self.owners_for.setdefault(affected_file, set()) |
- for owner in self.fopen(owner_file): |
- owner = owner.strip() |
- self.files_owned_by.setdefault(owner, set()).add(affected_file) |
- owners_for.add(owner) |
+ return dirname in covered_dirs |
+ |
+ def _LoadDataNeededFor(self, files): |
+ for f in files: |
+ dirpath = self.os_path.dirname(f) |
+ while not dirpath in self.owners_for: |
+ self._ReadOwnersInDir(dirpath) |
+ if self._StopLooking(dirpath): |
+ break |
+ dirpath = self.os_path.dirname(dirpath) |
+ |
+ def _ReadOwnersInDir(self, dirpath): |
+ owners_path = self.os_path.join(self.root, dirpath, 'OWNERS') |
+ if not self.os_path.exists(owners_path): |
+ return |
+ |
+ lineno = 0 |
+ for line in self.fopen(owners_path): |
+ lineno += 1 |
+ line = line.strip() |
+ if line.startswith('#'): |
+ continue |
+ if line == 'set noparent': |
+ self.stop_looking.add(dirpath) |
+ continue |
+ if self.email_regexp.match(line) or line == EVERYONE: |
+ self.owned_by.setdefault(line, set()).add(dirpath) |
+ self.owners_for.setdefault(dirpath, set()).add(line) |
+ continue |
+ raise SyntaxErrorInOwnersFile(owners_path, lineno, line) |
def _CoveringSetOfOwnersFor(self, files): |
# TODO(dpranke): implement the greedy algorithm for covering sets, and |
@@ -109,5 +153,10 @@ class Database(object): |
# short combinations of owners. |
every_owner = set() |
for f in files: |
- every_owner = every_owner.union(self.owners_for[f]) |
+ dirname = self.os_path.dirname(f) |
+ while dirname in self.owners_for: |
+ every_owner |= self.owners_for[dirname] |
+ if self._StopLooking(dirname): |
+ break |
+ dirname = self.os_path.dirname(dirname) |
return every_owner |