OLD | NEW |
| (Empty) |
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 | |
6 import android_commands | |
7 import logging | |
8 import multiprocessing | |
9 | |
10 from android_commands import errors | |
11 from forwarder import Forwarder | |
12 from test_result import TestResults | |
13 | |
14 | |
15 def _ShardedTestRunnable(test): | |
16 """Standalone function needed by multiprocessing.Pool.""" | |
17 log_format = '[' + test.device + '] # %(asctime)-15s: %(message)s' | |
18 if logging.getLogger().handlers: | |
19 logging.getLogger().handlers[0].setFormatter(logging.Formatter(log_format)) | |
20 else: | |
21 logging.basicConfig(format=log_format) | |
22 # Handle SystemExit here since python has a bug to exit current process | |
23 try: | |
24 return test.Run() | |
25 except SystemExit: | |
26 return TestResults() | |
27 | |
28 | |
29 def SetTestsContainer(tests_container): | |
30 """Sets tests container. | |
31 | |
32 multiprocessing.Queue can't be pickled across processes, so we need to set | |
33 this as a 'global', per process, via multiprocessing.Pool. | |
34 """ | |
35 BaseTestSharder.tests_container = tests_container | |
36 | |
37 | |
38 class BaseTestSharder(object): | |
39 """Base class for sharding tests across multiple devices. | |
40 | |
41 Args: | |
42 attached_devices: A list of attached devices. | |
43 """ | |
44 # See more in SetTestsContainer. | |
45 tests_container = None | |
46 | |
47 def __init__(self, attached_devices, build_type='Debug'): | |
48 self.attached_devices = attached_devices | |
49 # Worst case scenario: a device will drop offline per run, so we need | |
50 # to retry until we're out of devices. | |
51 self.retries = len(self.attached_devices) | |
52 self.tests = [] | |
53 self.build_type = build_type | |
54 | |
55 def CreateShardedTestRunner(self, device, index): | |
56 """Factory function to create a suite-specific test runner. | |
57 | |
58 Args: | |
59 device: Device serial where this shard will run | |
60 index: Index of this device in the pool. | |
61 | |
62 Returns: | |
63 An object of BaseTestRunner type (that can provide a "Run()" method). | |
64 """ | |
65 pass | |
66 | |
67 def SetupSharding(self, tests): | |
68 """Called before starting the shards.""" | |
69 pass | |
70 | |
71 def OnTestsCompleted(self, test_runners, test_results): | |
72 """Notifies that we completed the tests.""" | |
73 pass | |
74 | |
75 def _KillHostForwarder(self): | |
76 Forwarder.KillHost(self.build_type) | |
77 | |
78 def RunShardedTests(self): | |
79 """Runs the tests in all connected devices. | |
80 | |
81 Returns: | |
82 A TestResults object. | |
83 """ | |
84 logging.warning('*' * 80) | |
85 logging.warning('Sharding in ' + str(len(self.attached_devices)) + | |
86 ' devices.') | |
87 logging.warning('Note that the output is not synchronized.') | |
88 logging.warning('Look for the "Final result" banner in the end.') | |
89 logging.warning('*' * 80) | |
90 final_results = TestResults() | |
91 self._KillHostForwarder() | |
92 for retry in xrange(self.retries): | |
93 logging.warning('Try %d of %d', retry + 1, self.retries) | |
94 self.SetupSharding(self.tests) | |
95 test_runners = [] | |
96 | |
97 # Try to create N shards, and retrying on failure. | |
98 try: | |
99 for index, device in enumerate(self.attached_devices): | |
100 logging.warning('*' * 80) | |
101 logging.warning('Creating shard %d for %s', index, device) | |
102 logging.warning('*' * 80) | |
103 test_runner = self.CreateShardedTestRunner(device, index) | |
104 test_runners += [test_runner] | |
105 except errors.DeviceUnresponsiveError as e: | |
106 logging.critical('****Failed to create a shard: [%s]', e) | |
107 self.attached_devices.remove(device) | |
108 continue | |
109 | |
110 logging.warning('Starting...') | |
111 pool = multiprocessing.Pool(len(self.attached_devices), | |
112 SetTestsContainer, | |
113 [BaseTestSharder.tests_container]) | |
114 # map can't handle KeyboardInterrupt exception. It's a python bug. | |
115 # So use map_async instead. | |
116 async_results = pool.map_async(_ShardedTestRunnable, test_runners) | |
117 try: | |
118 results_lists = async_results.get(999999) | |
119 except errors.DeviceUnresponsiveError as e: | |
120 logging.critical('****Failed to run test: [%s]', e) | |
121 self.attached_devices = android_commands.GetAttachedDevices() | |
122 continue | |
123 test_results = TestResults.FromTestResults(results_lists) | |
124 # Re-check the attached devices for some devices may | |
125 # become offline | |
126 retry_devices = set(android_commands.GetAttachedDevices()) | |
127 # Remove devices that had exceptions. | |
128 retry_devices -= TestResults.DeviceExceptions(results_lists) | |
129 # Retry on devices that didn't have any exception. | |
130 self.attached_devices = list(retry_devices) | |
131 if (retry == self.retries - 1 or | |
132 len(self.attached_devices) == 0): | |
133 all_passed = final_results.ok + test_results.ok | |
134 final_results = test_results | |
135 final_results.ok = all_passed | |
136 break | |
137 else: | |
138 final_results.ok += test_results.ok | |
139 # Timed out tests are not reported in GetAllBroken(). | |
140 if test_results.timed_out: | |
141 final_results.timed_out = True | |
142 self.tests = [] | |
143 for t in test_results.GetAllBroken(): | |
144 self.tests += [t.name] | |
145 if not self.tests: | |
146 break | |
147 else: | |
148 # We ran out retries, possibly out of healthy devices. | |
149 # There's no recovery at this point. | |
150 raise Exception('Unrecoverable error while retrying test runs.') | |
151 self.OnTestsCompleted(test_runners, final_results) | |
152 self._KillHostForwarder() | |
153 return final_results | |
OLD | NEW |