Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Unified Diff: client/bin/factory.py

Issue 3340013: Fix status reporting and auto-seq logic. (Closed) Base URL: ssh://git@gitrw.chromium.org:9222/autotest.git
Patch Set: address comment, and patch factory_Verify Created 10 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | client/bin/factory_ui » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: client/bin/factory.py
diff --git a/client/bin/factory.py b/client/bin/factory.py
index 194e60402ad0e2c103a7f440a56793db0f0b31c4..d0245ee33c455e572bf5b1eec4c5f36f029706e3 100644
--- a/client/bin/factory.py
+++ b/client/bin/factory.py
@@ -38,12 +38,6 @@ def log(s):
def log_shared_data(key, value):
print >> sys.stderr, '%s %s=%s' % (DATA_PREFIX, key, repr(value))
-def lookup_status_by_unique_name(unique_name, test_list, status_file_path):
- """ quick way to determine the status of given test """
- status_map = StatusMap(test_list, status_file_path)
- testdb = status_map.test_db
- xtest = testdb.get_test_by_unique_name(unique_name)
- return status_map.lookup_status(xtest)
class FactoryTest:
def __repr__(self):
@@ -77,17 +71,52 @@ class AutomatedSubTest(FactoryAutotestTest):
dargs={}, drop_caches=False, unique_name=None):
self.__dict__.update(vars())
-class AutomatedRebootSubTest(FactoryAutotestTest):
+class AutomatedRebootSubTest(AutomatedSubTest):
def __init__(self, label_en='', label_zw='', iterations=None,
autotest_name='factory_RebootStub', dargs={},
drop_caches=False, unique_name=None):
self.__dict__.update(vars())
+class KbdShortcutDatabase:
+ '''Track the bindings between keyboard shortcuts and tests.'''
+
+ def __init__(self, test_list, test_db):
+ self._kbd_shortcut_map = dict(
+ (test.kbd_shortcut, test) for test in test_list)
+
+ # Validate keyboard shortcut uniqueness.
+ assert(None not in self._kbd_shortcut_map)
+ delta = set(test_list) - set(self._kbd_shortcut_map.values())
+ for test in delta:
+ collision = kbd_shortcut_map[test.kbd_shortcut]
+ log('ERROR: tests %s and %s both have kbd_shortcut %s' %
+ (test_db.get_unique_id_str(test),
+ test_db.get_unique_id_str(collision),
+ test.kbd_shortcut))
+ assert not delta
+
+ def get_shortcut_keys(self):
+ return set(self._kbd_shortcut_map)
+
+ def lookup_test(self, kbd_shortcut):
+ return self._kbd_shortcut_map.get(kbd_shortcut)
+
+
class TestDatabase:
+ '''This class parses a test_list and allows searching for tests or
+ their attributes. It also generates tag_prefix values for each
+ runnable test. Autotest allows tests with the same name to be
+ uniquely identified by the "tag" parameter to the job.run_test()
+ function. The factory system generates this value as the
+ combination of a tag_prefix value and a counter that tracks how
+ many times a test has run. Tests are identified uniquely in the
+ status log by both the tag_prefix and their autotest name. These
+ values are referred to here as the unique_details for the test.'''
def __init__(self, test_list):
- self.test_queue = [t for t in reversed(test_list)]
+ self._test_list = test_list
+ self._subtest_set = set()
self._subtest_parent_map = {}
self._tag_prefix_map = {}
for test in test_list:
@@ -97,51 +126,29 @@ class TestDatabase:
step_count = 1
for subtest in test.subtest_list:
self._subtest_parent_map[subtest] = test
+ self._subtest_set.add(subtest)
if not isinstance(subtest, FactoryAutotestTest):
continue
tag_prefix = ('%s_s%d' % (test.subtest_tag_prefix, step_count))
self._tag_prefix_map[subtest] = tag_prefix
step_count += 1
- self.seq_test_set = set(test for test in test_list
- if isinstance(test, AutomatedSequence))
- self.subtest_set = set(reduce(lambda x, y: x + y,
- [test.subtest_list for test in
- self.seq_test_set], []))
- self._subtest_map = dict((self._tag_prefix_map[st], st)
- for st in self.subtest_set)
- self.all_tests = set(test_list) | self.subtest_set
- self._unique_name_map = dict((t.unique_name, t) for t in self.all_tests
- if isinstance(t, FactoryAutotestTest)
- and t.unique_name is not None)
- self._unique_details_map = dict((self.get_unique_details(t), t)
- for t in self.all_tests)
- self._kbd_shortcut_map = dict((test.kbd_shortcut, test)
- for test in test_list)
- self.kbd_shortcut_set = set(self._kbd_shortcut_map)
+ self._tag_prefix_to_subtest_map = dict(
+ (self._tag_prefix_map[st], st) for st in self._subtest_set)
+ self._unique_details_map = dict(
+ (self.get_unique_details(t), t) for t in self.get_all_tests()
+ if isinstance(t, FactoryAutotestTest))
+ self._unique_name_map = dict(
+ (t.unique_name, t) for t in self.get_all_tests()
+ if getattr(t, 'unique_name', None) is not None)
- # Validate keyboard shortcut uniqueness.
- assert(None not in self.kbd_shortcut_set)
- delta = set(test_list) - set(self._kbd_shortcut_map.values())
- for test in delta:
- collision = kbd_shortcut_map[test.kbd_shortcut]
- log('ERROR: tests %s and %s both have kbd_shortcut %s' %
- (test.label_en, collision.label_en, test.kbd_shortcut))
- assert not delta
+ def get_test_by_unique_details(self, autotest_name, tag_prefix):
+ return self._unique_details_map.get((autotest_name, tag_prefix))
def get_tag_prefix(self, test):
return self._tag_prefix_map[test]
def get_unique_details(self, test):
- if isinstance(test, AutomatedSequence):
- return test.subtest_tag_prefix
- return '%s.%s' % (test.autotest_name, self.get_tag_prefix(test))
-
- def get_test_by_unique_details(self, autotest_name, tag_prefix):
- unique_details = '%s.%s' % (autotest_name, tag_prefix)
- return self._unique_details_map.get(unique_details)
-
- def get_test_by_kbd_shortcut(self, kbd_shortcut):
- return self._kbd_shortcut_map.get(kbd_shortcut)
+ return (test.autotest_name, self.get_tag_prefix(test))
def get_test_by_unique_name(self, unique_name):
return self._unique_name_map.get(unique_name)
@@ -150,59 +157,89 @@ class TestDatabase:
return self._subtest_parent_map.get(test)
def get_subtest_by_tag_prefix(self, tag_prefix):
- return self._subtest_map.get(tag_prefix)
+ return self._tag_prefix_to_subtest_map.get(tag_prefix)
+
+ def get_automated_sequences(self):
+ return [test for test in self._test_list
+ if isinstance(test, AutomatedSequence)]
+
+ def get_all_tests(self):
+ return set(self._test_list) | self._subtest_set
+
+ def get_unique_id_str(self, test):
+ '''Intended primarily to identify tests for debugging.'''
+ if test is None:
+ return None
+ if isinstance(test, AutomatedSequence):
+ return test.subtest_tag_prefix
+ return '%s.%s' % self.get_unique_details(test)
class StatusMap:
+ '''Parse the contents of an autotest status file for factory tests
+ into a database containing status, count, and error message
+ information. On __init__ the status file is parsed once. Changes
+ to the file are dealt with by running read_new_data(). Complexity
+ is introduced here because AutomatedSequences are not directly
+ represented in the status file and their status must be derived
+ from subtest results.'''
class Entry:
def __init__(self):
self.status = UNTESTED
self.count = 0
- self.label_box = None
self.error_msg = None
- def __init__(self, test_list, status_file_path):
- self.test_db = TestDatabase(test_list)
- all_tests = self.test_db.all_tests
- self._status_map = dict((t, StatusMap.Entry()) for t in all_tests)
+ def __init__(self, test_list, status_file_path, test_db=None,
+ status_change_callback=None):
+ self._test_list = [test for test in test_list]
+ self._test_db = test_db if test_db else TestDatabase(test_list)
+ self._status_map = dict(
+ (test, StatusMap.Entry()) for test in self._test_db.get_all_tests())
self._status_file_path = status_file_path
self._status_file_pos = 0
+ self._active_automated_seq = None
+ self._status_change_callback = status_change_callback
self.read_new_data()
- def lookup_status(self, test):
- return self._status_map[test].status
+ def lookup_status(self, test, min_count=0):
+ entry = self._status_map[test]
+ return entry.status if entry.count >= min_count else UNTESTED
def lookup_count(self, test):
- return self._status_map[test].count
-
- def lookup_label_box(self, test):
- return self._status_map[test].label_box
+ if isinstance(test, AutomatedSubTest):
+ parent = self._test_db.get_subtest_parent(test)
+ return self._status_map[parent].count
+ else:
+ return self._status_map[test].count
def lookup_error_msg(self, test):
return self._status_map[test].error_msg
- def lookup_tag(self, test):
- tag_prefix = self.test_db.get_tag_prefix(test)
- count = self._status_map[test].count
- return '%s_%s' % (tag_prefix, count)
-
- def incr_count(self, test):
- self._status_map[test].count += 1
-
- def filter(self, target_status):
+ def filter_by_status(self, target_status):
comp = (isinstance(target_status, list) and
(lambda s: s in target_status) or
(lambda s: s == target_status))
- return [t for t in self.test_db.test_queue
- if comp(self.lookup_status(t))]
+ return [test for test in self._test_db.all_tests
+ if comp(self.lookup_status(test))]
def next_untested(self):
- remaining = self.filter(UNTESTED)
- unique_details = [self.test_db.get_unique_details(t) for t in remaining]
- log('remaining untested = [%s]' % ', '.join(unique_details))
- return remaining is not [] and remaining.pop() or None
+ for test in self._test_list:
+ if self.lookup_status(test) == UNTESTED:
+ return test
+ return None
+
+ def get_active_top_level_test(self):
+ if self._active_automated_seq:
+ return self._active_automated_seq
+ active_tests = [test for test in self._test_list
+ if self.lookup_status(test) == ACTIVE]
+ if len(active_tests) > 1:
+ log('ERROR -- multiple active top level tests %s' %
+ repr([self._test_db.get_unique_id_str(test)
+ for test in active_tests]))
+ return active_tests.pop() if active_tests != [] else None
def read_new_data(self):
with open(self._status_file_path) as file:
@@ -215,80 +252,68 @@ class StatusMap:
continue
status = STATUS_CODE_MAP[code]
error_msg = status == FAILED and cols[len(cols) - 2] or None
- log('reading code = %s, test_id = %s, error_msg = "%s"'
- % (code, test_id, error_msg))
autotest_name, _, tag = test_id.rpartition('.')
tag_prefix, _, count = tag.rpartition('_')
- test = self.test_db.get_test_by_unique_details(
+ test = self._test_db.get_test_by_unique_details(
autotest_name, tag_prefix)
if test is None:
- log('ignoring update (%s) for test "%s" "%s"' %
- (status, autotest_name, tag_prefix))
+ log('status map ignoring update (%s) for test %s %s' %
+ (status, repr(autotest_name), repr(tag_prefix)))
continue
self.update(test, status, int(count), error_msg)
- map(self.update_seq_test, self.test_db.seq_test_set)
self._status_file_pos = file.tell()
- def get_active_top_level_test(self):
- active_tests = set(self.filter(ACTIVE)) - self.test_db.subtest_set
- return active_tests and active_tests.pop() or None
-
- def get_active_subtest(self):
- active_subtests = set(self.filter(ACTIVE)) & self.test_db.subtest_set
- return active_subtests and active_subtests.pop() or None
-
- def register_active(self, test):
- active_tests = set(self.filter(ACTIVE))
- assert(test not in active_tests)
- if test in self.test_db.subtest_set:
- parent_seq_test = self.test_db.get_subtest_parent(test)
- active_tests -= set([parent_seq_test])
- for bad_test in active_tests:
- unique_details = self.test_db.get_unique_details(bad_test)
- log('WARNING: assuming test %s FAILED (status log has no data)' %
- unique_details)
- self.update(bad_test, FAILED, self.lookup_count(bad_test),
- 'assumed FAILED (status log has no data)')
-
def update(self, test, status, count, error_msg):
entry = self._status_map[test]
- unique_details = self.test_db.get_unique_details(test)
+ unique_id_str = self._test_db.get_unique_id_str(test)
if count < entry.count:
- log('ERROR: count regression for %s (%d -> %d)' %
- (unique_details, entry.count, count))
+ log('ignoring older data for %s (data count %d < state count %d)' %
+ (unique_id_str, count, entry.count))
+ return
if isinstance(test, InformationScreen) and status in [PASSED, FAILED]:
status = UNTESTED
+ parent_as = self._test_db.get_subtest_parent(test)
if status != entry.status:
- log('status change for %s : %s/%s -> %s/%s' %
- (unique_details, entry.status, entry.count, status, count))
- if entry.label_box is not None:
- entry.label_box.update(status)
- if status == ACTIVE:
- self.register_active(test)
+ log('status change for %s : %s/%s -> %s/%s (as = %s)' %
+ (unique_id_str, entry.status, entry.count, status,
+ count, self._test_db.get_unique_id_str(parent_as)))
+ if self._status_change_callback is not None:
+ self._status_change_callback(test, status)
entry.status = status
entry.count = count
entry.error_msg = error_msg
- log('%s new status = %s' %
- (unique_details, self._status_map[test].status))
-
- def update_seq_test(self, test):
- subtest_status_set = set(map(self.lookup_status, test.subtest_list))
- max_count = max(map(self.lookup_count, test.subtest_list))
+ if (status == ACTIVE and not isinstance(test, AutomatedSequence) and
+ self._active_automated_seq != parent_as):
+ if self._active_automated_seq is not None:
+ self.update_automated_sequence(self._active_automated_seq)
+ self._active_automated_seq = parent_as
+ if parent_as is not None:
+ self.update_automated_sequence(parent_as)
+
+ def update_automated_sequence(self, test):
+ max_count = max([self._status_map[st].count
+ for st in test.subtest_list])
+ lookup_fn = lambda x: self.lookup_status(x, min_count=max_count)
+ subtest_status_set = set(
+ self.lookup_status(subtest) for subtest in test.subtest_list)
+ log('automated sequence %s status set = %s' % (
+ self._test_db.get_unique_id_str(test),
+ repr(subtest_status_set)))
if len(subtest_status_set) == 1:
status = subtest_status_set.pop()
- elif subtest_status_set == set([PASSED, FAILED]):
- status = FAILED
- else:
+ elif (test == self._active_automated_seq and
+ FAILED not in subtest_status_set):
status = ACTIVE
+ else:
+ status = FAILED
self.update(test, status, max_count, None)
- def set_label_box(self, test, label_box):
- entry = self._status_map[test]
- entry.label_box = label_box
- label_box.update(entry.status)
-
class LogData:
+ '''Parse the factory log looking for specially formatted
+ name-value declarations and recording the last of any such
+ bindings in a dict. Data in the right format is written to the
+ log using log_shared_data().'''
def __init__(self):
self._log_file_pos = 0
@@ -312,55 +337,64 @@ class LogData:
class ControlState:
-
- def __init__(self, job, test_list, status_map, status_file_path, nuke_fn):
+ '''Track the state needed to run and terminate factory tests. The
+ shared data written to the factory log records the pid information
+ for each test as it gets run. If the factory UI sees a keyboard
+ shortcut event, it sends a SIGUSR1 event to the control process,
+ which then uses the log information to terminate the running
+ test.'''
+
+ def __init__(self, job, test_list, test_db, status_map,
+ status_file_path, nuke_fn):
self._job = job
self._status_map = status_map
+ self._kbd_shortcut_db = KbdShortcutDatabase(test_list, test_db)
+ self._test_db = test_db
self._log_data = LogData()
self._std_dargs = {
'status_file_path' : status_file_path,
'test_list': test_list}
self._nuke_fn = nuke_fn
- self.activated_kbd_shortcut_test = None
signal.signal(signal.SIGUSR1, self.kill_current_test_callback)
def kill_current_test_callback(self, signum, frame):
self._log_data.read_new_data()
active_test_data = self._log_data.get('active_test_data')
- log('KILLING active_test_data %s' % repr(active_test_data))
if active_test_data is not None:
+ log('SIGUSR1 ... KILLING active test %s' % repr(active_test_data))
self._nuke_fn(*active_test_data)
-
- def run_test(self, test):
+ else:
+ log('SIGUSR1 ... KILLING NOTHING, no active test')
+
+ def process_kbd_shortcut_activation(self):
+ kbd_shortcut = self._log_data.get('activated_kbd_shortcut')
+ if kbd_shortcut is None:
+ return None
+ target_test = self._kbd_shortcut_db.lookup_test(kbd_shortcut)
+ log('activated kbd_shortcut %s -> %s' % (
+ kbd_shortcut, self._test_db.get_unique_id_str(target_test)))
log_shared_data('activated_kbd_shortcut', None)
+ return target_test
- self._status_map.incr_count(test)
+ def run_test(self, test, count):
self._log_data.read_new_data()
- test_tag = self._status_map.lookup_tag(test)
+ test_tag = '%s_%s' % (self._test_db.get_tag_prefix(test), count)
dargs = {}
dargs.update(test.dargs)
dargs.update(self._std_dargs)
dargs.update({'tag': test_tag,
'subtest_tag': test_tag,
'shared_dict': self._log_data.shared_dict})
-
self._job.factory_shared_dict = self._log_data.shared_dict
-
- log('control shared dict = %s' % repr(self._log_data.shared_dict))
-
- if test.drop_caches:
- self._job.drop_caches_between_iterations = True
- self.activated_kbd_shortcut_test = None
-
+ self._job.drop_caches_between_iterations = test.drop_caches
self._job.run_test(test.autotest_name, **dargs)
-
self._job.drop_caches_between_iterations = False
self._log_data.read_new_data()
- kbd_shortcut = self._log_data.shared_dict.pop(
- 'activated_kbd_shortcut', None)
- if kbd_shortcut is not None:
- test_db = self._status_map.test_db
- target_test = test_db.get_test_by_kbd_shortcut(kbd_shortcut)
- self.activated_kbd_shortcut_test = target_test
- log('kbd_shortcut %s -> %s)' % (
- kbd_shortcut, test_db.get_unique_details(target_test)))
+ return self.process_kbd_shortcut_activation()
+
+
+def lookup_status_by_unique_name(unique_name, test_list, status_file_path):
+ """Determine the status of given test. Somewhat heavyweight,
+ since it parses the status file."""
+ test = TestDatabase(test_list).get_test_by_unique_name(unique_name)
+ return StatusMap(test_list, status_file_path, test_db).lookup_status(test)
« no previous file with comments | « no previous file | client/bin/factory_ui » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698