Index: owners.py |
diff --git a/owners.py b/owners.py |
index 56311e40e699c3d8dccb03c9723dae3aeb265ce1..78c7cca9cb7e6d9385fd954ce3512ce13ffd2da6 100644 |
--- a/owners.py |
+++ b/owners.py |
@@ -55,6 +55,7 @@ Examples for all of these combinations can be found in tests/owners_unittest.py. |
""" |
import collections |
+import fnmatch |
import random |
import re |
@@ -94,35 +95,32 @@ class Database(object): |
of changed files, and see if a list of changed files is covered by a |
list of reviewers.""" |
- def __init__(self, root, fopen, os_path, glob): |
+ def __init__(self, root, fopen, os_path): |
"""Args: |
root: the path to the root of the Repository |
open: function callback to open a text file for reading |
os_path: module/object callback with fields for 'abspath', 'dirname', |
'exists', 'join', and 'relpath' |
- glob: function callback to list entries in a directory match a glob |
- (i.e., glob.glob) |
""" |
self.root = root |
self.fopen = fopen |
self.os_path = os_path |
- self.glob = glob |
# Pick a default email regexp to use; callers can override as desired. |
self.email_regexp = re.compile(BASIC_EMAIL_REGEXP) |
- # Mapping of owners to the paths they own. |
- self.owned_by = {EVERYONE: set()} |
+ # Mapping of owners to the paths or globs they own. |
+ self._owners_to_paths = {EVERYONE: set()} |
# Mapping of paths to authorized owners. |
- self.owners_for = {} |
+ self._paths_to_owners = {} |
# Mapping reviewers to the preceding comment per file in the OWNERS files. |
self.comments = {} |
# Set of paths that stop us from looking above them for owners. |
# (This is implicitly true for the root directory). |
- self.stop_looking = set(['']) |
+ self._stop_looking = set(['']) |
# Set of files which have already been read. |
self.read_files = set() |
@@ -135,6 +133,7 @@ class Database(object): |
in order avoid suggesting the author as a reviewer for their own changes.""" |
self._check_paths(files) |
self.load_data_needed_for(files) |
+ |
suggested_owners = self._covering_set_of_owners_for(files, author) |
if EVERYONE in suggested_owners: |
if len(suggested_owners) > 1: |
@@ -154,11 +153,7 @@ class Database(object): |
self._check_reviewers(reviewers) |
self.load_data_needed_for(files) |
- covered_objs = self._objs_covered_by(reviewers) |
- uncovered_files = [f for f in files |
- if not self._is_obj_covered_by(f, covered_objs)] |
- |
- return set(uncovered_files) |
+ return set(f for f in files if not self._is_obj_covered_by(f, reviewers)) |
def _check_paths(self, files): |
def _is_under(f, pfx): |
@@ -171,25 +166,23 @@ class Database(object): |
_assert_is_collection(reviewers) |
assert all(self.email_regexp.match(r) for r in reviewers) |
- def _objs_covered_by(self, reviewers): |
- objs = self.owned_by[EVERYONE] |
- for r in reviewers: |
- objs = objs | self.owned_by.get(r, set()) |
- return objs |
- |
- def _stop_looking(self, objname): |
- return objname in self.stop_looking |
- |
- def _is_obj_covered_by(self, objname, covered_objs): |
- while not objname in covered_objs and not self._stop_looking(objname): |
+ def _is_obj_covered_by(self, objname, reviewers): |
+ reviewers = list(reviewers) + [EVERYONE] |
+ while True: |
+ for reviewer in reviewers: |
+ for owned_pattern in self._owners_to_paths.get(reviewer, set()): |
+ if fnmatch.fnmatch(objname, owned_pattern): |
+ return True |
+ if self._should_stop_looking(objname): |
+ break |
objname = self.os_path.dirname(objname) |
- return objname in covered_objs |
+ return False |
def _enclosing_dir_with_owners(self, objname): |
"""Returns the innermost enclosing directory that has an OWNERS file.""" |
dirpath = objname |
- while not dirpath in self.owners_for: |
- if self._stop_looking(dirpath): |
+ while not self._owners_for(dirpath): |
+ if self._should_stop_looking(dirpath): |
break |
dirpath = self.os_path.dirname(dirpath) |
return dirpath |
@@ -197,12 +190,23 @@ class Database(object): |
def load_data_needed_for(self, files): |
for f in files: |
dirpath = self.os_path.dirname(f) |
- while not dirpath in self.owners_for: |
+ while not self._owners_for(dirpath): |
self._read_owners(self.os_path.join(dirpath, 'OWNERS')) |
- if self._stop_looking(dirpath): |
+ if self._should_stop_looking(dirpath): |
break |
dirpath = self.os_path.dirname(dirpath) |
+ def _should_stop_looking(self, objname): |
+ return any(fnmatch.fnmatch(objname, stop_looking) |
+ for stop_looking in self._stop_looking) |
+ |
+ def _owners_for(self, objname): |
+ obj_owners = set() |
+ for owned_path, path_owners in self._paths_to_owners.iteritems(): |
+ if fnmatch.fnmatch(objname, owned_path): |
+ obj_owners |= path_owners |
+ return obj_owners |
+ |
def _read_owners(self, path): |
owners_path = self.os_path.join(self.root, path) |
if not self.os_path.exists(owners_path): |
@@ -231,7 +235,7 @@ class Database(object): |
in_comment = False |
if line == 'set noparent': |
- self.stop_looking.add(dirpath) |
+ self._stop_looking.add(dirpath) |
continue |
m = re.match('per-file (.+)=(.+)', line) |
@@ -243,10 +247,9 @@ class Database(object): |
raise SyntaxErrorInOwnersFile(owners_path, lineno, |
'per-file globs cannot span directories or use escapes: "%s"' % |
line) |
- baselines = self.glob(full_glob_string) |
- for baseline in (self.os_path.relpath(b, self.root) for b in baselines): |
- self._add_entry(baseline, directive, 'per-file line', |
- owners_path, lineno, '\n'.join(comment)) |
+ relative_glob_string = self.os_path.relpath(full_glob_string, self.root) |
+ self._add_entry(relative_glob_string, directive, 'per-file line', |
+ owners_path, lineno, '\n'.join(comment)) |
continue |
if line.startswith('set '): |
@@ -259,7 +262,7 @@ class Database(object): |
def _add_entry(self, path, directive, |
line_type, owners_path, lineno, comment): |
if directive == 'set noparent': |
- self.stop_looking.add(path) |
+ self._stop_looking.add(path) |
elif directive.startswith('file:'): |
owners_file = self._resolve_include(directive[5:], owners_path) |
if not owners_file: |
@@ -269,19 +272,20 @@ class Database(object): |
self._read_owners(owners_file) |
dirpath = self.os_path.dirname(owners_file) |
- for key in self.owned_by: |
- if not dirpath in self.owned_by[key]: |
+ for key in self._owners_to_paths: |
+ if not dirpath in self._owners_to_paths[key]: |
continue |
- self.owned_by[key].add(path) |
+ self._owners_to_paths[key].add(path) |
- if dirpath in self.owners_for: |
- self.owners_for.setdefault(path, set()).update(self.owners_for[dirpath]) |
+ if dirpath in self._paths_to_owners: |
+ self._paths_to_owners.setdefault(path, set()).update( |
+ self._paths_to_owners[dirpath]) |
elif self.email_regexp.match(directive) or directive == EVERYONE: |
self.comments.setdefault(directive, {}) |
self.comments[directive][path] = comment |
- self.owned_by.setdefault(directive, set()).add(path) |
- self.owners_for.setdefault(path, set()).add(directive) |
+ self._owners_to_paths.setdefault(directive, set()).add(path) |
+ self._paths_to_owners.setdefault(path, set()).add(directive) |
else: |
raise SyntaxErrorInOwnersFile(owners_path, lineno, |
('%s is not a "set" directive, file include, "*", ' |
@@ -321,7 +325,7 @@ class Database(object): |
dirname = current_dir |
distance = 1 |
while True: |
- for owner in self.owners_for.get(dirname, []): |
+ for owner in self._owners_for(dirname): |
if author and owner == author: |
continue |
all_possible_owners.setdefault(owner, []) |
@@ -329,7 +333,7 @@ class Database(object): |
# directory, only count the closest one. |
if not any(current_dir == el[0] for el in all_possible_owners[owner]): |
all_possible_owners[owner].append((current_dir, distance)) |
- if self._stop_looking(dirname): |
+ if self._should_stop_looking(dirname): |
break |
dirname = self.os_path.dirname(dirname) |
distance += 1 |