Index: owners_finder.py |
diff --git a/owners_finder.py b/owners_finder.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..4af40f1455e52da9b7a5e6241e9b589802ae03a2 |
--- /dev/null |
+++ b/owners_finder.py |
@@ -0,0 +1,399 @@ |
+# Copyright (c) 2013 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+"""Interactive tool for finding reviewers/owners for a change.""" |
+ |
+import os |
+import copy |
+import owners as owners_module |
+ |
+ |
+def first(iterable): |
+ for element in iterable: |
+ return element |
+ |
+ |
+class OwnersFinder(object): |
+ COLOR_LINK = '\033[4m' |
+ COLOR_BOLD = '\033[1;32m' |
+ COLOR_GREY = '\033[0;37m' |
+ COLOR_RESET = '\033[0m' |
+ |
+ indentation = 0 |
+ |
+ def __init__(self, files, local_root, |
+ fopen, os_path, glob, |
+ email_postfix='@chromium.org', |
+ disable_color=False): |
+ """Load owners data for the changed files and set up the selection state. |
+ |
+ Args: |
+ files: the files in the change; handed to the owners database and then |
+ mapped to their owners. |
+ local_root, fopen, os_path, glob: forwarded unchanged to |
+ owners_module.Database; os_path is also kept for dirname() lookups. |
+ email_postfix: suffix removed from owner names when they are printed, and |
+ appended in pick_owner() when the typed name is not found as-is. |
+ disable_color: if true, the COLOR_* escape sequences are replaced with |
+ empty strings (this also happens when os.name is 'nt'). |
+ |
+ Raises: |
+ Exception: if a file has no owner (raised by _map_files_to_owners). |
+ """ |
+ self.email_postfix = email_postfix |
+ |
+ if os.name == 'nt' or disable_color: |
+ self.COLOR_LINK = '' |
+ self.COLOR_BOLD = '' |
+ self.COLOR_GREY = '' |
+ self.COLOR_RESET = '' |
+ |
+ self.db = owners_module.Database(local_root, fopen, os_path, glob) |
+ self.db.load_data_needed_for(files) |
+ |
+ self.os_path = os_path |
+ |
+ self.file_to_owners = {} |
+ self._map_files_to_owners(files) |
Dirk Pranke
2013/07/30 22:01:00
Can you use db.owned_by and db.owners_for here and
Bei Zhang
2013/08/12 22:43:12
They're for different purposes.
db.owned_by and d
|
+ |
+ self.owner_to_files = {} |
+ self._map_owners_to_files() |
+ |
+ # The score of each owner. |
+ # We calculate the score of each owner like this: |
+ # 1. Each owner has a score of zero at the beginning; each file entry in |
+ # the CL has a "base score" of 1.0; |
+ # 2. For each entry in the CL: |
+ # a. If there is an OWNER entry for that entry containing K owners, each |
+ # owner will gain ((base score of that file) / K). The base score of |
+ # that entry will be divided by 10.0. |
+ # b. If step 2 reaches the local_root or EVERYONE is in the entry, exit |
+ # step 2. |
+ # c. Go to step a, and use the parent directory of the current entry as |
+ # the new entry. |
Dirk Pranke
2013/07/30 22:01:00
I think there's a typo in this sentence.
Bei Zhang
2013/08/12 22:43:12
Done.
|
+ # |
+ # With this algorithm it is easy to find out these desirable properties: |
+ # 1. A direct owner gains more score than an indirect owner of a file. |
+ # 2. If a file can be reviewed by many people, each owner will gain less |
+ # score from that file. |
+ # NOTE(review): _calculate_score uses pow(0.1, depth), and _owners_of bumps |
+ # depth once per directory level, so the division by 10 happens per level |
+ # whether or not that level has an entry. |
Dirk Pranke
2013/07/30 22:01:00
I still don't understand how you came up with this
Bei Zhang
2013/08/12 22:43:12
There is no reason.
I will try to reuse it.
On 20
|
+ self.owners_score = {} |
+ self._calculate_score() |
+ |
+ # Snapshot of the full mapping; reset() restores file_to_owners from it |
+ # after deselect_owner() has removed owners from the working copy. |
+ self.original_files_to_owners = copy.deepcopy(self.file_to_owners) |
+ self.comments = self.db.comments |
+ |
+ # This is the queue that will be shown in the interactive questions. |
+ # It is initially sorted by the score in descending order. In the |
+ # interactive questions a user can choose to "defer" the decision; the |
+ # owner is then moved to the end of the queue and shown again later. |
+ self.owners_queue = [] |
+ |
+ self.unreviewed_files = set() |
+ self.reviewed_by = {} |
+ self.selected_owners = set() |
+ self.deselected_owners = set() |
+ self.reset() |
+ |
+ def run(self): |
+ self.reset() |
+ while len(self.owners_queue) > 0 and len(self.unreviewed_files) > 0: |
+ owner = self.owners_queue[0] |
+ |
+ if owner in self.selected_owners: |
+ continue |
+ |
+ if len(self.unreviewed_files) == 0: |
+ self.writeln('Finished.\n\n') |
+ break |
+ if owner in self.deselected_owners: |
+ # If this owner is already deselected. |
+ continue |
+ if not any((file_name in self.unreviewed_files) |
+ for file_name in self.owner_to_files[owner]): |
+ self.deselect_owner(owner) |
+ continue |
+ |
+ self.print_info(owner) |
+ |
+ while True: |
+ inp = self.input_command(owner) |
+ if inp == 'y' or inp == 'yes': |
+ self.select_owner(owner) |
+ break |
+ elif inp == 'n' or inp == 'no': |
+ self.deselect_owner(owner) |
+ break |
+ elif inp == '' or inp == 'd' or inp == 'defer': |
+ self.owners_queue.append(self.owners_queue.pop(0)) |
+ break |
+ elif inp == 'f' or inp == 'files': |
+ self.list_files() |
+ break |
+ elif inp == 'o' or inp == 'owners': |
+ self.list_owners(self.owners_queue) |
+ break |
+ elif inp == 'p' or inp == 'pick': |
+ self.pick_owner(raw_input('Pick an owner: ')) |
+ break |
+ elif inp.startswith('p ') or inp.startswith('pick '): |
+ self.pick_owner(inp.split(' ', 2)[1].strip()) |
+ break |
+ elif inp == 'r' or inp == 'restart': |
+ self.reset() |
+ break |
+ elif inp == 'q' or inp == 'quit': |
+ # Exit with error |
+ return 1 |
+ |
+ self.print_result() |
+ return 0 |
+ |
+ def _owners_of(self, file_name): |
+ """Iterate (owner, depth, entry)s for a file.""" |
+ depth = 0 |
+ db = self.db |
+ if file_name in db.owners_for: |
+ for owner in db.owners_for[file_name]: |
+ yield owner, depth, file_name |
+ while file_name != '': |
+ depth += 1 |
+ if file_name in db.stop_looking: |
+ break |
+ file_name = self.os_path.dirname(file_name) |
+ if file_name in db.owners_for: |
+ for owner in db.owners_for[file_name]: |
+ yield owner, depth, file_name |
+ |
+ def _map_files_to_owners(self, files): |
+ for file_name in files: |
+ owners_set = set() |
+ for owner, _, _ in self._owners_of(file_name): |
+ owners_set.add(owner) |
+ if owner == owners_module.EVERYONE: |
+ break |
+ # Eliminate files that EVERYONE can review |
+ if owners_module.EVERYONE in owners_set: |
+ continue |
+ # raise exception is not owner can be found |
+ if len(owners_set) == 0: |
+ raise Exception('File "%s" has no owner' % file_name) |
+ self.file_to_owners[file_name] = owners_set |
+ |
+ def _map_owners_to_files(self): |
+ for file_name in self.file_to_owners: |
+ for owner_name in self.file_to_owners[file_name]: |
+ self.owner_to_files.setdefault(owner_name, set()) |
+ self.owner_to_files[owner_name].add(file_name) |
+ |
+ def _calculate_score(self): |
+ """Fill self.owners_score with one total score per owner. |
+ |
+ Each (owner, depth, entry) that _owners_of() reports for a file adds |
+ pow(0.1, depth) / (number of owners listed for that entry) to the owner's |
+ score for that file; the per-file scores are then summed for each owner. |
+ """ |
+ # Files that EVERYONE owns are already eliminated. |
Dirk Pranke
2013/07/30 22:01:00
Nit: "are" already eliminated.
|
+ for file_name in self.file_to_owners: |
+ for owner, depth, entry_name in self._owners_of(file_name): |
+ self.owners_score.setdefault(owner, {}) |
+ self.owners_score[owner].setdefault(file_name, 0) |
+ self.owners_score[owner][file_name] += \ |
+ pow(0.1, depth) / len(self.db.owners_for[entry_name]) |
Dirk Pranke
2013/07/30 22:01:00
Same comments as above re: scoring ...
|
+ # Collapse each owner's {file: score} dict into a single number. |
+ for owner in self.owners_score: |
+ self.owners_score[owner] = sum(self.owners_score[owner].values()) |
+ |
+ def reset(self): |
+ self.file_to_owners = copy.deepcopy(self.original_files_to_owners) |
+ self.unreviewed_files = set(self.file_to_owners.keys()) |
+ self.reviewed_by = {} |
+ self.selected_owners = set() |
+ self.deselected_owners = set() |
+ |
+ # Initialize owners queue, sort it by the score |
+ self.owners_queue = list(sorted(self.owner_to_files.keys(), |
+ key=lambda owner: self.owners_score[owner], |
+ reverse=True)) |
+ self.find_mandatory_owners() |
+ |
+ def select_owner(self, owner, findMandatoryOwners=True): |
+ if owner in self.selected_owners: |
+ return |
+ if owner in self.deselected_owners: |
+ return |
+ if not (owner in self.owners_queue): |
+ return |
+ self.writeln('Selected: ' + owner) |
+ self.owners_queue.remove(owner) |
+ self.selected_owners.add(owner) |
+ for file_name in filter( |
+ lambda file_name: file_name in self.unreviewed_files, |
+ self.owner_to_files[owner]): |
+ self.unreviewed_files.remove(file_name) |
+ self.reviewed_by[file_name] = owner |
+ if findMandatoryOwners: |
+ self.find_mandatory_owners() |
+ |
+ def deselect_owner(self, owner, findMandatoryOwners=True): |
+ if owner in self.selected_owners: |
+ return |
+ if owner in self.deselected_owners: |
+ return |
+ if not (owner in self.owners_queue): |
+ return |
+ self.writeln('Deselected: ' + owner) |
+ self.owners_queue.remove(owner) |
+ self.deselected_owners.add(owner) |
+ for file_name in self.owner_to_files[owner] & self.unreviewed_files: |
+ self.file_to_owners[file_name].remove(owner) |
+ if findMandatoryOwners: |
+ self.find_mandatory_owners() |
+ |
+ def find_mandatory_owners(self): |
+ continues = True |
+ for owner in self.owners_queue: |
+ if owner in self.selected_owners: |
+ continue |
+ if owner in self.deselected_owners: |
+ continue |
+ if len(self.owner_to_files[owner] & self.unreviewed_files) == 0: |
+ self.deselect_owner(owner, False) |
+ |
+ while continues: |
+ continues = False |
+ for file_name in filter( |
+ lambda file_name: len(self.file_to_owners[file_name]) == 1, |
+ self.unreviewed_files): |
+ owner = first(self.file_to_owners[file_name]) |
+ self.select_owner(owner, False) |
+ continues = True |
+ break |
+ |
+ def print_comments(self, owner): |
+ if owner not in self.comments: |
+ self.writeln(self.bold_name(owner)) |
+ else: |
+ self.writeln(self.bold_name(owner) + ' is commented as:') |
+ self.indent() |
+ for path in self.comments[owner]: |
+ if len(self.comments[owner][path]) > 0: |
+ self.writeln(self.greyed(self.comments[owner][path]) + |
+ ' (at ' + self.bold(path or '<root>') + ')') |
+ else: |
+ self.writeln(self.greyed('[No comment] ') + ' (at ' + |
+ self.bold(path or '<root>') + ')') |
+ self.unindent() |
+ |
+ def print_file_info(self, file_name, except_owner=''): |
+ if file_name not in self.unreviewed_files: |
+ self.writeln(self.greyed(file_name + |
+ ' (by ' + |
+ self.bold_name(self.reviewed_by[file_name]) + |
+ ')')) |
+ else: |
+ if len(self.file_to_owners[file_name]) <= 3: |
+ other_owners = [] |
+ for ow in self.file_to_owners[file_name]: |
+ if ow != except_owner: |
+ other_owners.append(self.bold_name(ow)) |
+ self.writeln(file_name + |
+ ' [' + (', '.join(other_owners)) + ']') |
+ else: |
+ self.writeln(file_name + ' [' + |
+ self.bold(str(len(self.file_to_owners[file_name]))) + |
+ ']') |
+ |
+ def print_file_info_detailed(self, file_name): |
+ self.writeln(file_name) |
+ self.indent() |
+ for ow in sorted(self.file_to_owners[file_name]): |
+ if ow in self.deselected_owners: |
+ self.writeln(self.bold_name(self.greyed(ow))) |
+ elif ow in self.selected_owners: |
+ self.writeln(self.bold_name(self.greyed(ow))) |
+ else: |
+ self.writeln(self.bold_name(ow)) |
+ self.unindent() |
+ |
+ def print_owned_files_for(self, owner): |
+ # Print owned files |
+ self.print_comments(owner) |
+ self.writeln(self.bold_name(owner) + ' owns ' + |
+ str(len(self.owner_to_files[owner])) + ' file(s):') |
+ self.indent() |
+ for file_name in sorted(self.owner_to_files[owner]): |
+ self.print_file_info(file_name, owner) |
+ self.unindent() |
+ self.writeln() |
+ |
+ def list_owners(self, owners_queue): |
+ if (len(self.owner_to_files) - len(self.deselected_owners) - |
+ len(self.selected_owners)) > 3: |
+ for ow in owners_queue: |
+ if ow not in self.deselected_owners and ow not in self.selected_owners: |
+ self.print_comments(ow) |
+ else: |
+ for ow in owners_queue: |
+ if ow not in self.deselected_owners and ow not in self.selected_owners: |
+ self.writeln() |
+ self.print_owned_files_for(ow) |
+ |
+ def list_files(self): |
+ self.indent() |
+ if len(self.unreviewed_files) > 5: |
+ for file_name in sorted(self.unreviewed_files): |
+ self.print_file_info(file_name) |
+ else: |
+ for file_name in self.unreviewed_files: |
+ self.print_file_info_detailed(file_name) |
+ self.unindent() |
+ |
+ def pick_owner(self, ow): |
+ # Allowing to omit domain suffixes |
+ if ow not in self.owner_to_files: |
+ if ow + self.email_postfix in self.owner_to_files: |
+ ow += self.email_postfix |
+ |
+ if ow not in self.owner_to_files: |
+ self.writeln('You cannot pick ' + self.bold_name(ow) + ' manually. ' + |
+ 'It\'s an invalid name or not related to the change list.') |
+ return False |
+ elif ow in self.selected_owners: |
+ self.writeln('You cannot pick ' + self.bold_name(ow) + ' manually. ' + |
+ 'It\'s already selected.') |
+ return False |
+ elif ow in self.deselected_owners: |
+ self.writeln('You cannot pick ' + self.bold_name(ow) + ' manually.' + |
+ 'It\'s already unselected.') |
+ return False |
+ |
+ self.select_owner(ow) |
+ return True |
+ |
+ def print_result(self): |
+ # Print results |
+ self.writeln() |
+ self.writeln() |
+ self.writeln('** You selected these owners **') |
+ self.writeln() |
+ for owner in self.selected_owners: |
+ self.writeln(self.bold_name(owner) + ':') |
+ self.indent() |
+ for file_name in sorted(self.owner_to_files[owner]): |
+ self.writeln(file_name) |
+ self.unindent() |
+ |
+ def bold(self, text): |
+ return self.COLOR_BOLD + text + self.COLOR_RESET |
+ |
+ def bold_name(self, name): |
+ return (self.COLOR_BOLD + |
+ name.replace(self.email_postfix, '') + self.COLOR_RESET) |
+ |
+ def greyed(self, text): |
+ return self.COLOR_GREY + text + self.COLOR_RESET |
+ |
+ def indent(self): |
+ self.indentation += 1 |
+ |
+ def unindent(self): |
+ self.indentation -= 1 |
+ |
+ def print_indent(self): |
+ return ' ' * self.indentation |
+ |
+ def writeln(self, text=''): |
+ print self.print_indent() + text |
+ |
+ def hr(self): |
+ self.writeln('=====================') |
+ |
+ def print_info(self, owner): |
+ self.hr() |
+ self.writeln( |
+ self.bold(str(len(self.unreviewed_files))) + ' file(s) left.') |
+ self.print_owned_files_for(owner) |
+ |
+ def input_command(self, owner): |
+ self.writeln('Add ' + self.bold_name(owner) + ' as your reviewer? ') |
+ return raw_input( |
+ '[yes/no/Defer/pick/files/owners/quit/restart]: ').lower() |