OLD | NEW |
| (Empty) |
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 | |
6 import logging | |
7 import multiprocessing | |
8 | |
9 from pylib import android_commands | |
10 from pylib.base.test_result import TestResults | |
11 from pylib.forwarder import Forwarder | |
12 | |
13 | |
# How many attempts a sharded test run gets before giving up; used as the
# default for BaseTestSharder.retries.
NUM_RETRIES = 3
16 | |
17 | |
18 def _ShardedTestRunnable(test): | |
19 """Standalone function needed by multiprocessing.Pool.""" | |
20 log_format = '[' + test.device + '] # %(asctime)-15s: %(message)s' | |
21 if logging.getLogger().handlers: | |
22 logging.getLogger().handlers[0].setFormatter(logging.Formatter(log_format)) | |
23 else: | |
24 logging.basicConfig(format=log_format) | |
25 # Handle SystemExit here since python has a bug to exit current process | |
26 try: | |
27 return test.Run() | |
28 except SystemExit: | |
29 return TestResults() | |
30 | |
31 | |
def SetTestsContainer(tests_container):
  """Installs the tests container on BaseTestSharder for this process.

  A multiprocessing.Queue can't be pickled across processes, so the
  container is kept as a per-process 'global' (a class attribute) instead;
  multiprocessing.Pool is given this function so it can set it up.

  Args:
    tests_container: The object to store in BaseTestSharder.tests_container.
  """
  setattr(BaseTestSharder, 'tests_container', tests_container)
39 | |
40 | |
class BaseTestSharder(object):
  """Base class for sharding tests across multiple devices.

  Subclasses provide CreateShardedTestRunner() and may override
  SetupSharding(). RunShardedTests() drives the retry loop over self.tests.

  Args:
    attached_devices: A list of attached devices.
    build_type: Build type handed to Forwarder.KillHost ('Debug' by default).
  """
  # See more in SetTestsContainer.
  tests_container = None

  def __init__(self, attached_devices, build_type='Debug'):
    self.attached_devices = attached_devices
    # Worst case scenario: a device will drop offline per run, so we need
    # to retry until we're out of devices.

    # TODO(frankf): There are two sources of flakiness:
    #   1. Device flakiness
    #   2. Test/product flakiness
    # We should differentiate between these. Otherwise, blindly retrying tests
    # might mask test/product flakiness. For type 2, we should follow the
    # general chrome best practices.
    self.retries = NUM_RETRIES
    # Names of the tests still to run; rewritten after every try with the
    # names of the broken tests.
    self.tests = []
    self.build_type = build_type

  def CreateShardedTestRunner(self, device, index):
    """Factory function to create a suite-specific test runner.

    The base implementation does nothing and returns None; subclasses are
    expected to override it.

    Args:
      device: Device serial where this shard will run
      index: Index of this device in the pool.

    Returns:
      An object of BaseTestRunner type (that can provide a "Run()" method).
    """
    pass

  def SetupSharding(self, tests):
    """Called before starting the shards.

    Args:
      tests: The list of tests about to be run (self.tests).
    """
    pass

  def _KillHostForwarder(self):
    """Kills the host forwarder for this sharder's build type."""
    Forwarder.KillHost(self.build_type)

  def _RaiseIfNoDevices(self):
    """Raises if there is no device left to shard tests onto.

    Without this check an empty device list would reach
    multiprocessing.Pool(0, ...), which fails with an unrelated ValueError.

    Raises:
      Exception: if self.attached_devices is empty.
    """
    if not self.attached_devices:
      raise Exception('Unrecoverable error while retrying test runs: '
                      'no devices available.')

  def _CreateTestRunners(self):
    """Creates one test runner per attached device.

    Returns:
      A list with one test runner per device, or None if a device was
      unresponsive while its shard was being created. In the latter case the
      offending device is removed from self.attached_devices.
    """
    test_runners = []
    try:
      for index, device in enumerate(self.attached_devices):
        logging.warning('*' * 80)
        logging.warning('Creating shard %d for %s', index, device)
        logging.warning('*' * 80)
        test_runners.append(self.CreateShardedTestRunner(device, index))
    except android_commands.errors.DeviceUnresponsiveError as e:
      logging.critical('****Failed to create a shard: [%s]', e)
      # 'device' is still bound to the device whose shard creation failed.
      self.attached_devices.remove(device)
      return None
    return test_runners

  def _RunShards(self, test_runners):
    """Runs all shards in a process pool and returns their results.

    The pool is always terminated and joined before returning, so worker
    processes are not leaked from one retry to the next.

    Args:
      test_runners: The test runners to execute, one per worker.

    Returns:
      The list of per-shard results produced by _ShardedTestRunnable.

    Raises:
      Whatever exception a shard raised (e.g. DeviceUnresponsiveError).
    """
    pool = multiprocessing.Pool(len(self.attached_devices),
                                SetTestsContainer,
                                [BaseTestSharder.tests_container])
    try:
      # map can't handle KeyboardInterrupt exception. It's a python bug.
      # So use map_async instead (the very large timeout is presumably part
      # of the same workaround).
      async_results = pool.map_async(_ShardedTestRunnable, test_runners)
      return async_results.get(999999)
    finally:
      # On success the workers are idle; on failure this stops shards that
      # would otherwise keep running alongside the next retry.
      pool.terminate()
      pool.join()

  def RunShardedTests(self):
    """Runs the tests in all connected devices.

    Broken tests are retried up to self.retries times on the devices that
    are still attached and had no exceptions.

    Returns:
      A TestResults object.

    Raises:
      Exception: if no device is available, or if every try was consumed by
        device failures.
    """
    logging.warning('*' * 80)
    logging.warning('Sharding in ' + str(len(self.attached_devices)) +
                    ' devices.')
    logging.warning('Note that the output is not synchronized.')
    logging.warning('Look for the "Final result" banner in the end.')
    logging.warning('*' * 80)
    # Fail fast, before touching the forwarder, if there is nothing to run on.
    self._RaiseIfNoDevices()
    final_results = TestResults()
    self._KillHostForwarder()
    for retry in range(self.retries):
      logging.warning('Try %d of %d', retry + 1, self.retries)
      # A previous try may have dropped the last healthy device.
      self._RaiseIfNoDevices()
      logging.warning('Attempting to run %d tests: %s',
                      len(self.tests), self.tests)
      self.SetupSharding(self.tests)

      # Try to create N shards, and retrying on failure.
      test_runners = self._CreateTestRunners()
      if test_runners is None:
        continue

      logging.warning('Starting...')
      try:
        results_lists = self._RunShards(test_runners)
      except android_commands.errors.DeviceUnresponsiveError as e:
        logging.critical('****Failed to run test: [%s]', e)
        self.attached_devices = android_commands.GetAttachedDevices()
        continue
      test_results = TestResults.FromTestResults(results_lists)
      # Re-check the attached devices for some devices may
      # become offline
      retry_devices = set(android_commands.GetAttachedDevices())
      # Remove devices that had exceptions.
      retry_devices -= TestResults.DeviceExceptions(results_lists)
      # Retry on devices that didn't have any exception.
      self.attached_devices = list(retry_devices)

      # TODO(frankf): Do not break TestResults encapsulation.
      if retry == self.retries - 1 or not self.attached_devices:
        # Last chance: report this try's results, plus every earlier pass.
        all_passed = final_results.ok + test_results.ok
        final_results = test_results
        final_results.ok = all_passed
        break
      else:
        final_results.ok += test_results.ok

      # Only the broken tests are retried on the next try.
      self.tests = [t.name for t in test_results.GetAllBroken()]
      if not self.tests:
        break
    else:
      # We ran out retries, possibly out of healthy devices.
      # There's no recovery at this point.
      raise Exception('Unrecoverable error while retrying test runs.')
    self._KillHostForwarder()
    return final_results
OLD | NEW |