OLD | NEW |
---|---|
1 # Copyright (c) 2010 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2010 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """A database of OWNERS files.""" | 5 """A database of OWNERS files.""" |
6 | 6 |
7 class Assertion(AssertionError): | 7 import re |
8 pass | |
9 | 8 |
10 | 9 |
11 class SyntaxErrorInOwnersFile(Exception): | 10 class SyntaxErrorInOwnersFile(Exception): |
12 def __init__(self, path, line, msg): | 11 def __init__(self, path, line, msg): |
13 super(SyntaxErrorInOwnersFile, self).__init__((path, line, msg)) | 12 super(SyntaxErrorInOwnersFile, self).__init__((path, line, msg)) |
14 self.path = path | 13 self.path = path |
15 self.line = line | 14 self.line = line |
16 self.msg = msg | 15 self.msg = msg |
17 | 16 |
18 def __str__(self): | 17 def __str__(self): |
19 if self.msg: | 18 if self.msg: |
20 return "%s:%d syntax error: %s" % (self.path, self.line, self.msg) | 19 return "%s:%d syntax error: %s" % (self.path, self.line, self.msg) |
21 else: | 20 else: |
22 return "%s:%d syntax error" % (self.path, self.line) | 21 return "%s:%d syntax error" % (self.path, self.line) |
23 | 22 |
24 | 23 |
25 # Wildcard email-address in the OWNERS file. | 24 # If this is present by itself on a line, this means that everyone can review. |
26 ANYONE = '*' | 25 EVERYONE = '*' |
27 | 26 |
27 # Recognizes 'X@Y' email addresses. Very simplistic. | |
28 BASIC_EMAIL_REGEXP = r'^[\w\-\+\%\.]+\@[\w\-\+\%\.]+$' | |
M-A Ruel
2011/03/08 20:24:03
style nit:
I prefer const at the top of the file,
Dirk Pranke
2011/03/08 22:05:27
OK.
| |
28 | 29 |
29 class Database(object): | 30 class Database(object): |
30 """A database of OWNERS files for a repository. | 31 """A database of OWNERS files for a repository. |
31 | 32 |
32 This class allows you to find a suggested set of reviewers for a list | 33 This class allows you to find a suggested set of reviewers for a list |
33 of changed files, and see if a list of changed files is covered by a | 34 of changed files, and see if a list of changed files is covered by a |
34 list of reviewers.""" | 35 list of reviewers.""" |
35 | 36 |
36 def __init__(self, root, fopen, os_path): | 37 def __init__(self, root, fopen, os_path): |
37 """Args: | 38 """Args: |
38 root: the path to the root of the Repository | 39 root: the path to the root of the Repository |
39 all_owners: the list of every owner in the system | |
40 open: function callback to open a text file for reading | 40 open: function callback to open a text file for reading |
41 os_path: module/object callback with fields for 'exists', | 41 os_path: module/object callback with fields for 'abspath', 'dirname', |
42 'dirname', and 'join' | 42 'exists', and 'join' |
43 """ | 43 """ |
44 self.root = root | 44 self.root = root |
45 self.fopen = fopen | 45 self.fopen = fopen |
46 self.os_path = os_path | 46 self.os_path = os_path |
47 | 47 |
48 # Mapping of files to authorized owners. | 48 # TODO: Figure out how to share the owners email addr format w/ |
49 self.files_owned_by = {} | 49 # tools/commit-queue/projects.py, especially for per-repo whitelists. |
50 self.email_regexp = re.compile(BASIC_EMAIL_REGEXP) | |
M-A Ruel
2011/03/08 20:24:03
FYI, if you want it to be a whitelist of valid cre
Dirk Pranke
2011/03/08 22:05:27
Yeah, this was intended to be the simplest possibl
| |
50 | 51 |
51 # Mapping of owners to the files they own. | 52 # Mapping of owners to the paths they own. |
53 self.owned_by = {EVERYONE: set()} | |
54 | |
55 # Mapping of paths to authorized owners. | |
52 self.owners_for = {} | 56 self.owners_for = {} |
53 | 57 |
54 # In-memory cached map of files to their OWNERS files. | 58 # Set of paths that stop us from looking above them for owners. |
55 self.owners_file_for = {} | 59 # (This is implicitly true for the root directory). |
56 | 60 self.stop_looking = set(['']) |
57 # In-memory cache of OWNERS files and their contents | |
58 self.owners_files = {} | |
59 | 61 |
60 def ReviewersFor(self, files): | 62 def ReviewersFor(self, files): |
61 """Returns a suggested set of reviewers that will cover the set of files. | 63 """Returns a suggested set of reviewers that will cover the set of files. |
62 | 64 |
63 The set of files are paths relative to (and under) self.root.""" | 65 files is a set of paths relative to (and under) self.root.""" |
66 self._CheckPaths(files) | |
64 self._LoadDataNeededFor(files) | 67 self._LoadDataNeededFor(files) |
65 return self._CoveringSetOfOwnersFor(files) | 68 return self._CoveringSetOfOwnersFor(files) |
66 | 69 |
67 def FilesAreCoveredBy(self, files, reviewers): | 70 def FilesAreCoveredBy(self, files, reviewers): |
71 """Returns whether every file is owned by at least one reviewer.""" | |
68 return not self.FilesNotCoveredBy(files, reviewers) | 72 return not self.FilesNotCoveredBy(files, reviewers) |
69 | 73 |
70 def FilesNotCoveredBy(self, files, reviewers): | 74 def FilesNotCoveredBy(self, files, reviewers): |
71 covered_files = set() | 75 """Returns the set of files that are not owned by at least one reviewer.""" |
72 for reviewer in reviewers: | 76 self._CheckPaths(files) |
73 covered_files = covered_files.union(self.files_owned_by[reviewer]) | 77 self._CheckReviewers(reviewers) |
74 return files.difference(covered_files) | 78 if not reviewers: |
79 return files | |
80 | |
81 self._LoadDataNeededFor(files) | |
82 files_by_dir = self._FilesByDir(files) | |
83 covered_dirs = self._DirsCoveredBy(reviewers) | |
84 uncovered_files = [] | |
85 for d, files_in_d in files_by_dir.iteritems(): | |
86 if not self._IsDirCoveredBy(d, covered_dirs): | |
87 uncovered_files.extend(files_in_d) | |
88 return set(uncovered_files) | |
89 | |
90 def _CheckPaths(self, files): | |
M-A Ruel
2011/03/08 20:24:03
BTW I'm trying to get towards PEP-8 style naming f
Dirk Pranke
2011/03/08 22:05:27
You tell me this NOW?? :) Is there anything else i
| |
91 def _isunder(f, pfx): | |
92 return self.os_path.abspath(self.os_path.join(pfx, f)).startswith(pfx) | |
93 assert all(_isunder(f, self.root) for f in files) | |
M-A Ruel
2011/03/08 20:24:03
What if self.root is not an abspath already? Like
Dirk Pranke
2011/03/08 22:05:27
Good question. I'll update it.
| |
94 | |
95 def _CheckReviewers(self, reviewers): | |
M-A Ruel
2011/03/08 20:24:03
Can you add a one line docstring saying that it "V
Dirk Pranke
2011/03/08 22:05:27
Will do.
| |
96 assert all(self.email_regexp.match(r) for r in reviewers) | |
97 | |
98 def _FilesByDir(self, files): | |
99 dirs = {} | |
100 for f in files: | |
101 dirs.setdefault(self.os_path.dirname(f), []).append(f) | |
102 return dirs | |
103 | |
104 def _DirsCoveredBy(self, reviewers): | |
105 dirs = self.owned_by[EVERYONE] | |
106 for r in reviewers: | |
107 dirs = dirs | self.owned_by.get(r, set()) | |
108 return dirs | |
109 | |
110 def _StopLooking(self, dirname): | |
111 return dirname in self.stop_looking | |
112 | |
113 def _IsDirCoveredBy(self, dirname, covered_dirs): | |
114 while not dirname in covered_dirs and not self._StopLooking(dirname): | |
115 dirname = self.os_path.dirname(dirname) | |
116 return dirname in covered_dirs | |
75 | 117 |
76 def _LoadDataNeededFor(self, files): | 118 def _LoadDataNeededFor(self, files): |
77 for f in files: | 119 for f in files: |
78 self._LoadOwnersFor(f) | 120 dirpath = self.os_path.dirname(f) |
121 while not dirpath in self.owners_for: | |
122 self._ReadOwnersInDir(dirpath) | |
123 if self._StopLooking(dirpath): | |
124 break | |
125 dirpath = self.os_path.dirname(dirpath) | |
79 | 126 |
80 def _LoadOwnersFor(self, f): | 127 def _ReadOwnersInDir(self, dirpath): |
81 if f not in self.owners_for: | 128 owners_path = self.os_path.join(self.root, dirpath, 'OWNERS') |
82 owner_file = self._FindOwnersFileFor(f) | 129 if not self.os_path.exists(owners_path): |
83 self.owners_file_for[f] = owner_file | 130 return |
84 self._ReadOwnersFile(owner_file, f) | |
85 | 131 |
86 def _FindOwnersFileFor(self, f): | 132 lineno = 0 |
87 # This is really a "do ... until dirname = ''" | 133 for line in self.fopen(owners_path): |
88 dirname = self.os_path.dirname(f) | 134 lineno += 1 |
89 while dirname: | 135 self._ReadOneLine(line, lineno, dirpath, owners_path) |
M-A Ruel
2011/03/08 20:24:03
I don't think having this code in a function makes
Dirk Pranke
2011/03/08 22:05:27
I slightly prefer it, I think, but I agree it's no
| |
90 owner_path = self.os_path.join(dirname, 'OWNERS') | |
91 if self.os_path.exists(owner_path): | |
92 return owner_path | |
93 dirname = self.os_path.dirname(dirname) | |
94 owner_path = self.os_path.join(dirname, 'OWNERS') | |
95 if self.os_path.exists(owner_path): | |
96 return owner_path | |
97 raise Assertion('No OWNERS file found for %s' % f) | |
98 | 136 |
99 def _ReadOwnersFile(self, owner_file, affected_file): | 137 def _ReadOneLine(self, line, lineno, dirpath, owners_path): |
100 owners_for = self.owners_for.setdefault(affected_file, set()) | 138 line = line.strip() |
101 for owner in self.fopen(owner_file): | 139 if line.startswith('#'): |
102 owner = owner.strip() | 140 return |
103 self.files_owned_by.setdefault(owner, set()).add(affected_file) | 141 if line == 'set noparent': |
104 owners_for.add(owner) | 142 self.stop_looking.add(dirpath) |
143 return | |
144 if self.email_regexp.match(line) or line == EVERYONE: | |
145 self.owned_by.setdefault(line, set()).add(dirpath) | |
146 self.owners_for.setdefault(dirpath, set()).add(line) | |
147 return | |
148 raise SyntaxErrorInOwnersFile(owners_path, lineno, line) | |
105 | 149 |
106 def _CoveringSetOfOwnersFor(self, files): | 150 def _CoveringSetOfOwnersFor(self, files): |
107 # TODO(dpranke): implement the greedy algorithm for covering sets, and | 151 # TODO(dpranke): implement the greedy algorithm for covering sets, and |
108 # consider returning multiple options in case there are several equally | 152 # consider returning multiple options in case there are several equally |
109 # short combinations of owners. | 153 # short combinations of owners. |
110 every_owner = set() | 154 every_owner = set() |
111 for f in files: | 155 for f in files: |
112 every_owner = every_owner.union(self.owners_for[f]) | 156 dirname = self.os_path.dirname(f) |
157 while dirname in self.owners_for: | |
158 every_owner |= self.owners_for[dirname] | |
159 if self._StopLooking(dirname): | |
160 break | |
161 dirname = self.os_path.dirname(dirname) | |
113 return every_owner | 162 return every_owner |
OLD | NEW |