OLD | NEW |
(Empty) | |
| 1 # Copyright 2012 the V8 project authors. All rights reserved. |
| 2 # Redistribution and use in source and binary forms, with or without |
| 3 # modification, are permitted provided that the following conditions are |
| 4 # met: |
| 5 # |
| 6 # * Redistributions of source code must retain the above copyright |
| 7 # notice, this list of conditions and the following disclaimer. |
| 8 # * Redistributions in binary form must reproduce the above |
| 9 # copyright notice, this list of conditions and the following |
| 10 # disclaimer in the documentation and/or other materials provided |
| 11 # with the distribution. |
| 12 # * Neither the name of Google Inc. nor the names of its |
| 13 # contributors may be used to endorse or promote products derived |
| 14 # from this software without specific prior written permission. |
| 15 # |
| 16 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 17 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 18 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 19 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 20 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 23 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 24 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 25 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 26 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 27 |
| 28 |
| 29 import os |
| 30 import socket |
| 31 import subprocess |
| 32 import threading |
| 33 import time |
| 34 |
| 35 from . import distro |
| 36 from . import perfdata |
| 37 from ..local import execution |
| 38 from ..objects import peer |
| 39 from ..objects import workpacket |
| 40 from ..server import compression |
| 41 from ..server import constants |
| 42 from ..server import local_handler |
| 43 from ..server import signatures |
| 44 |
| 45 |
| 46 def GetPeers(): |
| 47 data = local_handler.LocalQuery([constants.REQUEST_PEERS]) |
| 48 if not data: return [] |
| 49 return [ peer.Peer.Unpack(p) for p in data ] |
| 50 |
| 51 |
| 52 class NetworkedRunner(execution.Runner): |
| 53 def __init__(self, suites, progress_indicator, context, peers, workspace): |
| 54 self.suites = suites |
| 55 num_tests = 0 |
| 56 datapath = os.path.join("out", "testrunner_data") |
| 57 self.perf_data_manager = perfdata.PerfDataManager(datapath) |
| 58 self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode) |
| 59 for s in suites: |
| 60 for t in s.tests: |
| 61 t.duration = self.perfdata.FetchPerfData(t) or 1.0 |
| 62 num_tests += len(s.tests) |
| 63 self._CommonInit(num_tests, progress_indicator, context) |
| 64 self.tests = [] # Only used if we need to fall back to local execution. |
| 65 self.tests_lock = threading.Lock() |
| 66 self.peers = peers |
| 67 self.pubkey_fingerprint = None # Fetched later. |
| 68 self.base_rev = subprocess.check_output( |
| 69 "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace, |
| 70 shell=True) |
| 71 self.patch = subprocess.check_output( |
| 72 "cd %s; git diff %s" % (workspace, self.base_rev), shell=True) |
| 73 self.binaries = {} |
| 74 self.initialization_lock = threading.Lock() |
| 75 self.initialization_lock.acquire() # Released when init is done. |
| 76 self._OpenLocalConnection() |
| 77 self.local_receiver_thread = threading.Thread( |
| 78 target=self._ListenLocalConnection) |
| 79 self.local_receiver_thread.daemon = True |
| 80 self.local_receiver_thread.start() |
| 81 self.initialization_lock.acquire() |
| 82 self.initialization_lock.release() |
| 83 |
| 84 def _OpenLocalConnection(self): |
| 85 self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 86 code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT)) |
| 87 if code != 0: |
| 88 raise RuntimeError("Failed to connect to local server") |
| 89 compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket) |
| 90 |
| 91 def _ListenLocalConnection(self): |
| 92 release_lock_countdown = 1 # Pubkey. |
| 93 self.local_receiver = compression.Receiver(self.local_socket) |
| 94 while not self.local_receiver.IsDone(): |
| 95 data = self.local_receiver.Current() |
| 96 if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT: |
| 97 pubkey = data[1] |
| 98 if not pubkey: raise RuntimeError("Received empty public key") |
| 99 self.pubkey_fingerprint = pubkey |
| 100 release_lock_countdown -= 1 |
| 101 if release_lock_countdown == 0: |
| 102 self.initialization_lock.release() |
| 103 release_lock_countdown -= 1 # Prevent repeated triggering. |
| 104 self.local_receiver.Advance() |
| 105 |
| 106 def Run(self, jobs): |
| 107 self.indicator.Starting() |
| 108 need_libv8 = False |
| 109 for s in self.suites: |
| 110 shell = s.shell() |
| 111 if shell not in self.binaries: |
| 112 path = os.path.join(self.context.shell_dir, shell) |
| 113 # Check if this is a shared library build. |
| 114 try: |
| 115 ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path), |
| 116 shell=True) |
| 117 ldd = ldd.strip().split(" ") |
| 118 assert ldd[0] == "libv8.so" |
| 119 assert ldd[1] == "=>" |
| 120 need_libv8 = True |
| 121 binary_needs_libv8 = True |
| 122 libv8 = signatures.ReadFileAndSignature(ldd[2]) |
| 123 except: |
| 124 binary_needs_libv8 = False |
| 125 binary = signatures.ReadFileAndSignature(path) |
| 126 if binary[0] is None: |
| 127 print("Error: Failed to create signature.") |
| 128 assert binary[1] != 0 |
| 129 return binary[1] |
| 130 binary.append(binary_needs_libv8) |
| 131 self.binaries[shell] = binary |
| 132 if need_libv8: |
| 133 self.binaries["libv8.so"] = libv8 |
| 134 distro.Assign(self.suites, self.peers) |
| 135 # Spawn one thread for each peer. |
| 136 threads = [] |
| 137 for p in self.peers: |
| 138 thread = threading.Thread(target=self._TalkToPeer, args=[p]) |
| 139 threads.append(thread) |
| 140 thread.start() |
| 141 try: |
| 142 for thread in threads: |
| 143 # Use a timeout so that signals (Ctrl+C) will be processed. |
| 144 thread.join(timeout=10000000) |
| 145 self._AnalyzePeerRuntimes() |
| 146 except KeyboardInterrupt: |
| 147 self.terminate = True |
| 148 raise |
| 149 except Exception, _e: |
| 150 # If there's an exception we schedule an interruption for any |
| 151 # remaining threads... |
| 152 self.terminate = True |
| 153 # ...and then reraise the exception to bail out. |
| 154 raise |
| 155 compression.Send(constants.END_OF_STREAM, self.local_socket) |
| 156 self.local_socket.close() |
| 157 if self.tests: |
| 158 self._RunInternal(jobs) |
| 159 self.indicator.Done() |
| 160 return not self.failed |
| 161 |
| 162 def _TalkToPeer(self, peer): |
| 163 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 164 sock.settimeout(self.context.timeout + 10) |
| 165 code = sock.connect_ex((peer.address, constants.PEER_PORT)) |
| 166 if code == 0: |
| 167 try: |
| 168 peer.runtime = None |
| 169 start_time = time.time() |
| 170 packet = workpacket.WorkPacket(peer=peer, context=self.context, |
| 171 base_revision=self.base_rev, |
| 172 patch=self.patch, |
| 173 pubkey=self.pubkey_fingerprint) |
| 174 data, test_map = packet.Pack(self.binaries) |
| 175 compression.Send(data, sock) |
| 176 compression.Send(constants.END_OF_STREAM, sock) |
| 177 rec = compression.Receiver(sock) |
| 178 while not rec.IsDone() and not self.terminate: |
| 179 data_list = rec.Current() |
| 180 for data in data_list: |
| 181 test_id = data[0] |
| 182 if test_id < 0: |
| 183 # The peer is reporting an error. |
| 184 print("Peer %s reports error: %s" % (peer.address, data[1])) |
| 185 rec.Advance() |
| 186 continue |
| 187 test = test_map.pop(test_id) |
| 188 test.MergeResult(data) |
| 189 try: |
| 190 self.perfdata.UpdatePerfData(test) |
| 191 except Exception, e: |
| 192 print("UpdatePerfData exception: %s" % e) |
| 193 pass # Just keep working. |
| 194 with self.lock: |
| 195 perf_key = self.perfdata.GetKey(test) |
| 196 compression.Send( |
| 197 [constants.INFORM_DURATION, perf_key, test.duration, |
| 198 self.context.arch, self.context.mode], |
| 199 self.local_socket) |
| 200 self.indicator.AboutToRun(test) |
| 201 if test.suite.HasUnexpectedOutput(test): |
| 202 self.failed.append(test) |
| 203 if test.output.HasCrashed(): |
| 204 self.crashed += 1 |
| 205 else: |
| 206 self.succeeded += 1 |
| 207 self.remaining -= 1 |
| 208 self.indicator.HasRun(test) |
| 209 rec.Advance() |
| 210 peer.runtime = time.time() - start_time |
| 211 except Exception: |
| 212 pass # Fall back to local execution. |
| 213 else: |
| 214 compression.Send([constants.UNRESPONSIVE_PEER, peer.address], |
| 215 self.local_socket) |
| 216 sock.close() |
| 217 if len(test_map) > 0: |
| 218 # Some tests have not received any results. Run them locally. |
| 219 print("No results for %d tests, running them locally." % len(test_map)) |
| 220 self._EnqueueLocally(test_map) |
| 221 |
| 222 def _EnqueueLocally(self, test_map): |
| 223 with self.tests_lock: |
| 224 for test in test_map: |
| 225 self.tests.append(test_map[test]) |
| 226 |
| 227 def _AnalyzePeerRuntimes(self): |
| 228 total_runtime = 0.0 |
| 229 total_work = 0.0 |
| 230 for p in self.peers: |
| 231 if p.runtime is None: |
| 232 return |
| 233 total_runtime += p.runtime |
| 234 total_work += p.assigned_work |
| 235 for p in self.peers: |
| 236 p.assigned_work /= total_work |
| 237 p.runtime /= total_runtime |
| 238 perf_correction = p.assigned_work / p.runtime |
| 239 old_perf = p.relative_performance |
| 240 p.relative_performance = (old_perf + perf_correction) / 2.0 |
| 241 compression.Send([constants.UPDATE_PERF, p.address, |
| 242 p.relative_performance], |
| 243 self.local_socket) |
OLD | NEW |