OLD | NEW |
1 # Copyright (c) 2010 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2010 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """A database of OWNERS files.""" | 5 """A database of OWNERS files.""" |
6 | 6 |
7 class Assertion(AssertionError): | 7 import re |
8 pass | 8 |
| 9 |
| 10 # If this is present by itself on a line, this means that everyone can review. |
| 11 EVERYONE = '*' |
| 12 |
| 13 |
| 14 # Recognizes 'X@Y' email addresses. Very simplistic. |
| 15 BASIC_EMAIL_REGEXP = r'^[\w\-\+\%\.]+\@[\w\-\+\%\.]+$' |
9 | 16 |
10 | 17 |
11 class SyntaxErrorInOwnersFile(Exception): | 18 class SyntaxErrorInOwnersFile(Exception): |
12 def __init__(self, path, line, msg): | 19 def __init__(self, path, line, msg): |
13 super(SyntaxErrorInOwnersFile, self).__init__((path, line, msg)) | 20 super(SyntaxErrorInOwnersFile, self).__init__((path, line, msg)) |
14 self.path = path | 21 self.path = path |
15 self.line = line | 22 self.line = line |
16 self.msg = msg | 23 self.msg = msg |
17 | 24 |
18 def __str__(self): | 25 def __str__(self): |
19 if self.msg: | 26 if self.msg: |
20 return "%s:%d syntax error: %s" % (self.path, self.line, self.msg) | 27 return "%s:%d syntax error: %s" % (self.path, self.line, self.msg) |
21 else: | 28 else: |
22 return "%s:%d syntax error" % (self.path, self.line) | 29 return "%s:%d syntax error" % (self.path, self.line) |
23 | 30 |
24 | 31 |
25 # Wildcard email-address in the OWNERS file. | |
26 ANYONE = '*' | |
27 | |
28 | |
29 class Database(object): | 32 class Database(object): |
30 """A database of OWNERS files for a repository. | 33 """A database of OWNERS files for a repository. |
31 | 34 |
32 This class allows you to find a suggested set of reviewers for a list | 35 This class allows you to find a suggested set of reviewers for a list |
33 of changed files, and see if a list of changed files is covered by a | 36 of changed files, and see if a list of changed files is covered by a |
34 list of reviewers.""" | 37 list of reviewers.""" |
35 | 38 |
36 def __init__(self, root, fopen, os_path): | 39 def __init__(self, root, fopen, os_path): |
37 """Args: | 40 """Args: |
38 root: the path to the root of the Repository | 41 root: the path to the root of the Repository |
39 all_owners: the list of every owner in the system | |
40 open: function callback to open a text file for reading | 42 open: function callback to open a text file for reading |
41 os_path: module/object callback with fields for 'exists', | 43 os_path: module/object callback with fields for 'abspath', 'dirname', |
42 'dirname', and 'join' | 44 'exists', and 'join' |
43 """ | 45 """ |
44 self.root = root | 46 self.root = root |
45 self.fopen = fopen | 47 self.fopen = fopen |
46 self.os_path = os_path | 48 self.os_path = os_path |
47 | 49 |
48 # Mapping of files to authorized owners. | 50 # TODO: Figure out how to share the owners email addr format w/ |
49 self.files_owned_by = {} | 51 # tools/commit-queue/projects.py, especially for per-repo whitelists. |
| 52 self.email_regexp = re.compile(BASIC_EMAIL_REGEXP) |
50 | 53 |
51 # Mapping of owners to the files they own. | 54 # Mapping of owners to the paths they own. |
| 55 self.owned_by = {EVERYONE: set()} |
| 56 |
| 57 # Mapping of paths to authorized owners. |
52 self.owners_for = {} | 58 self.owners_for = {} |
53 | 59 |
54 # In-memory cached map of files to their OWNERS files. | 60 # Set of paths that stop us from looking above them for owners. |
55 self.owners_file_for = {} | 61 # (This is implicitly true for the root directory). |
56 | 62 self.stop_looking = set(['']) |
57 # In-memory cache of OWNERS files and their contents | |
58 self.owners_files = {} | |
59 | 63 |
60 def ReviewersFor(self, files): | 64 def ReviewersFor(self, files): |
61 """Returns a suggested set of reviewers that will cover the set of files. | 65 """Returns a suggested set of reviewers that will cover the set of files. |
62 | 66 |
63 The set of files are paths relative to (and under) self.root.""" | 67 files is a set of paths relative to (and under) self.root.""" |
| 68 self._CheckPaths(files) |
64 self._LoadDataNeededFor(files) | 69 self._LoadDataNeededFor(files) |
65 return self._CoveringSetOfOwnersFor(files) | 70 return self._CoveringSetOfOwnersFor(files) |
66 | 71 |
67 def FilesAreCoveredBy(self, files, reviewers): | 72 def FilesAreCoveredBy(self, files, reviewers): |
| 73 """Returns whether every file is owned by at least one reviewer.""" |
68 return not self.FilesNotCoveredBy(files, reviewers) | 74 return not self.FilesNotCoveredBy(files, reviewers) |
69 | 75 |
70 def FilesNotCoveredBy(self, files, reviewers): | 76 def FilesNotCoveredBy(self, files, reviewers): |
71 covered_files = set() | 77 """Returns the set of files that are not owned by at least one reviewer.""" |
72 for reviewer in reviewers: | 78 self._CheckPaths(files) |
73 covered_files = covered_files.union(self.files_owned_by[reviewer]) | 79 self._CheckReviewers(reviewers) |
74 return files.difference(covered_files) | 80 if not reviewers: |
| 81 return files |
| 82 |
| 83 self._LoadDataNeededFor(files) |
| 84 files_by_dir = self._FilesByDir(files) |
| 85 covered_dirs = self._DirsCoveredBy(reviewers) |
| 86 uncovered_files = [] |
| 87 for d, files_in_d in files_by_dir.iteritems(): |
| 88 if not self._IsDirCoveredBy(d, covered_dirs): |
| 89 uncovered_files.extend(files_in_d) |
| 90 return set(uncovered_files) |
| 91 |
| 92 def _CheckPaths(self, files): |
| 93 def _isunder(f, pfx): |
| 94 return self.os_path.abspath(self.os_path.join(pfx, f)).startswith(pfx) |
| 95 assert all(_isunder(f, self.os_path.abspath(self.root)) for f in files) |
| 96 |
| 97 def _CheckReviewers(self, reviewers): |
| 98 """Verifies each reviewer is a valid email address.""" |
| 99 assert all(self.email_regexp.match(r) for r in reviewers) |
| 100 |
| 101 def _FilesByDir(self, files): |
| 102 dirs = {} |
| 103 for f in files: |
| 104 dirs.setdefault(self.os_path.dirname(f), []).append(f) |
| 105 return dirs |
| 106 |
| 107 def _DirsCoveredBy(self, reviewers): |
| 108 dirs = self.owned_by[EVERYONE] |
| 109 for r in reviewers: |
| 110 dirs = dirs | self.owned_by.get(r, set()) |
| 111 return dirs |
| 112 |
| 113 def _StopLooking(self, dirname): |
| 114 return dirname in self.stop_looking |
| 115 |
| 116 def _IsDirCoveredBy(self, dirname, covered_dirs): |
| 117 while not dirname in covered_dirs and not self._StopLooking(dirname): |
| 118 dirname = self.os_path.dirname(dirname) |
| 119 return dirname in covered_dirs |
75 | 120 |
76 def _LoadDataNeededFor(self, files): | 121 def _LoadDataNeededFor(self, files): |
77 for f in files: | 122 for f in files: |
78 self._LoadOwnersFor(f) | 123 dirpath = self.os_path.dirname(f) |
| 124 while not dirpath in self.owners_for: |
| 125 self._ReadOwnersInDir(dirpath) |
| 126 if self._StopLooking(dirpath): |
| 127 break |
| 128 dirpath = self.os_path.dirname(dirpath) |
79 | 129 |
80 def _LoadOwnersFor(self, f): | 130 def _ReadOwnersInDir(self, dirpath): |
81 if f not in self.owners_for: | 131 owners_path = self.os_path.join(self.root, dirpath, 'OWNERS') |
82 owner_file = self._FindOwnersFileFor(f) | 132 if not self.os_path.exists(owners_path): |
83 self.owners_file_for[f] = owner_file | 133 return |
84 self._ReadOwnersFile(owner_file, f) | |
85 | 134 |
86 def _FindOwnersFileFor(self, f): | 135 lineno = 0 |
87 # This is really a "do ... until dirname = ''" | 136 for line in self.fopen(owners_path): |
88 dirname = self.os_path.dirname(f) | 137 lineno += 1 |
89 while dirname: | 138 line = line.strip() |
90 owner_path = self.os_path.join(dirname, 'OWNERS') | 139 if line.startswith('#'): |
91 if self.os_path.exists(owner_path): | 140 continue |
92 return owner_path | 141 if line == 'set noparent': |
93 dirname = self.os_path.dirname(dirname) | 142 self.stop_looking.add(dirpath) |
94 owner_path = self.os_path.join(dirname, 'OWNERS') | 143 continue |
95 if self.os_path.exists(owner_path): | 144 if self.email_regexp.match(line) or line == EVERYONE: |
96 return owner_path | 145 self.owned_by.setdefault(line, set()).add(dirpath) |
97 raise Assertion('No OWNERS file found for %s' % f) | 146 self.owners_for.setdefault(dirpath, set()).add(line) |
98 | 147 continue |
99 def _ReadOwnersFile(self, owner_file, affected_file): | 148 raise SyntaxErrorInOwnersFile(owners_path, lineno, line) |
100 owners_for = self.owners_for.setdefault(affected_file, set()) | |
101 for owner in self.fopen(owner_file): | |
102 owner = owner.strip() | |
103 self.files_owned_by.setdefault(owner, set()).add(affected_file) | |
104 owners_for.add(owner) | |
105 | 149 |
106 def _CoveringSetOfOwnersFor(self, files): | 150 def _CoveringSetOfOwnersFor(self, files): |
107 # TODO(dpranke): implement the greedy algorithm for covering sets, and | 151 # TODO(dpranke): implement the greedy algorithm for covering sets, and |
108 # consider returning multiple options in case there are several equally | 152 # consider returning multiple options in case there are several equally |
109 # short combinations of owners. | 153 # short combinations of owners. |
110 every_owner = set() | 154 every_owner = set() |
111 for f in files: | 155 for f in files: |
112 every_owner = every_owner.union(self.owners_for[f]) | 156 dirname = self.os_path.dirname(f) |
| 157 while dirname in self.owners_for: |
| 158 every_owner |= self.owners_for[dirname] |
| 159 if self._StopLooking(dirname): |
| 160 break |
| 161 dirname = self.os_path.dirname(dirname) |
113 return every_owner | 162 return every_owner |
OLD | NEW |