Index: tools/testrunner/network/network_execution.py
diff --git a/tools/testrunner/network/network_execution.py b/tools/testrunner/network/network_execution.py
new file mode 100644
index 0000000000000000000000000000000000000000..2f33d353b1908b68014437ce24f4f1a99eac4073
--- /dev/null
+++ b/tools/testrunner/network/network_execution.py
@@ -0,0 +1,257 @@
+# Copyright 2012 the V8 project authors. All rights reserved.
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+#       notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+#       copyright notice, this list of conditions and the following
+#       disclaimer in the documentation and/or other materials provided
+#       with the distribution.
+#     * Neither the name of Google Inc. nor the names of its
+#       contributors may be used to endorse or promote products derived
+#       from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+import os
+import socket
+import subprocess
+import threading
+import time
+
+from . import distro
+from . import perfdata
+from ..local import execution
+from ..objects import peer
+from ..objects import workpacket
+from ..server import compression
+from ..server import constants
+from ..server import local_handler
+from ..server import signatures
+
+
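+# Ask the locally running test-runner server for the peers it currently knows
+# about; an empty list is returned if the server does not answer.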
+def GetPeers():
+  data = local_handler.LocalQuery([constants.REQUEST_PEERS])
+  if not data: return []
+  return [ peer.Peer.Unpack(p) for p in data ]
+
+
+class NetworkedRunner(execution.Runner):
+  def __init__(self, suites, progress_indicator, context, peers, workspace):
+    self.suites = suites
+    num_tests = 0
+    datapath = os.path.join("out", "testrunner_data")
+    self.perf_data_manager = perfdata.PerfDataManager(datapath)
+    self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode)
+    for s in suites:
+      for t in s.tests:
+        t.duration = self.perfdata.FetchPerfData(t) or 1.0
+      num_tests += len(s.tests)
+    self._CommonInit(num_tests, progress_indicator, context)
+    self.tests = []  # Only used if we need to fall back to local execution.
+    self.tests_lock = threading.Lock()
+    self.peers = peers
+    self.pubkey_fingerprint = None  # Fetched later.
+    self.base_rev = subprocess.check_output(
+        "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace,
+        shell=True).strip()
+    self.patch = subprocess.check_output(
+        "cd %s; git diff %s" % (workspace, self.base_rev), shell=True)
+    self.binaries = {}
+    self.initialization_lock = threading.Lock()
+    self.initialization_lock.acquire()  # Released when init is done.
+    self._OpenLocalConnection()
+    self.local_receiver_thread = threading.Thread(
+        target=self._ListenLocalConnection)
+    self.local_receiver_thread.daemon = True
+    self.local_receiver_thread.start()
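+    # Wait until the listener thread has released the lock, i.e. until the
+    # public key fingerprint has arrived, then release it again immediately.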
+    self.initialization_lock.acquire()
+    self.initialization_lock.release()
+
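+  # Connects to the local server and requests the fingerprint of its public
+  # key; the reply is handled by _ListenLocalConnection().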
+  def _OpenLocalConnection(self):
+    self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT))
+    if code != 0:
+      raise RuntimeError("Failed to connect to local server")
+    compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket)
+
+  def _ListenLocalConnection(self):
+    release_lock_countdown = 1  # Pubkey.
+    self.local_receiver = compression.Receiver(self.local_socket)
+    while not self.local_receiver.IsDone():
+      data = self.local_receiver.Current()
+      if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT:
+        pubkey = data[1]
+        if not pubkey: raise RuntimeError("Received empty public key")
+        self.pubkey_fingerprint = pubkey
+        release_lock_countdown -= 1
+        if release_lock_countdown == 0:
+          self.initialization_lock.release()
+          release_lock_countdown -= 1  # Prevent repeated triggering.
+      self.local_receiver.Advance()
+
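+  # Collects the binaries to ship (plus libv8.so for shared-library builds)
+  # together with their signatures, distributes the suites across the known
+  # peers (one talker thread per peer), and runs any leftover tests locally.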
+  def Run(self, jobs):
+    self.indicator.Starting()
+    need_libv8 = False
+    for s in self.suites:
+      shell = s.shell()
+      if shell not in self.binaries:
+        path = os.path.join(self.context.shell_dir, shell)
+        # Check if this is a shared library build.
+        try:
+          ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path),
+                                        shell=True)
+          ldd = ldd.strip().split(" ")
+          assert ldd[0] == "libv8.so"
+          assert ldd[1] == "=>"
+          need_libv8 = True
+          binary_needs_libv8 = True
+          libv8 = signatures.ReadFileAndSignature(ldd[2])
+        except Exception:  # Not a shared-library build (or "ldd" failed).
+          binary_needs_libv8 = False
+        binary = signatures.ReadFileAndSignature(path)
+        if binary[0] is None:
+          print("Error: Failed to create signature.")
+          assert binary[1] != 0
+          return binary[1]
+        binary.append(binary_needs_libv8)
+        self.binaries[shell] = binary
+    if need_libv8:
+      self.binaries["libv8.so"] = libv8
+    distro.Assign(self.suites, self.peers)
+    # Spawn one thread for each peer.
+    threads = []
+    for p in self.peers:
+      thread = threading.Thread(target=self._TalkToPeer, args=[p])
+      threads.append(thread)
+      thread.start()
+    try:
+      for thread in threads:
+        # Use a timeout so that signals (Ctrl+C) will be processed.
+        thread.join(timeout=10000000)
+      self._AnalyzePeerRuntimes()
+    except KeyboardInterrupt:
+      self.terminate = True
+      raise
+    except Exception:
+      # If there's an exception we schedule an interruption for any
+      # remaining threads...
+      self.terminate = True
+      # ...and then reraise the exception to bail out.
+      raise
+    compression.Send(constants.END_OF_STREAM, self.local_socket)
+    self.local_socket.close()
+    if self.tests:
+      self._RunInternal(jobs)
+    self.indicator.Done()
+    return not self.failed
+
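+  # Sends one work packet (binaries, base revision, patch, assigned tests) to
+  # a single peer and processes the results it streams back.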
+  def _TalkToPeer(self, peer):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    sock.settimeout(self.context.timeout + 10)
+    test_map = {}  # Avoids a NameError if the peer is unreachable.
+    code = sock.connect_ex((peer.address, constants.PEER_PORT))
+    if code == 0:
+      try:
+        peer.runtime = None
+        start_time = time.time()
+        packet = workpacket.WorkPacket(peer=peer, context=self.context,
+                                       base_revision=self.base_rev,
+                                       patch=self.patch,
+                                       pubkey=self.pubkey_fingerprint)
+        data, test_map = packet.Pack(self.binaries)
+        compression.Send(data, sock)
+        compression.Send(constants.END_OF_STREAM, sock)
+        rec = compression.Receiver(sock)
+        while not rec.IsDone() and not self.terminate:
+          data_list = rec.Current()
+          for data in data_list:
+            test_id = data[0]
+            if test_id < 0:
+              # The peer is reporting an error.
+              print("Peer %s reports error: %s" % (peer.address, data[1]))
+              continue
+            test = test_map.pop(test_id)
+            test.MergeResult(data)
+            try:
+              self.perfdata.UpdatePerfData(test)
+            except Exception as e:
+              print("UpdatePerfData exception: %s" % e)
+              pass  # Just keep working.
+            with self.lock:
+              perf_key = self.perfdata.GetKey(test)
+              compression.Send(
+                  [constants.INFORM_DURATION, perf_key, test.duration,
+                   self.context.arch, self.context.mode],
+                  self.local_socket)
+              self.indicator.AboutToRun(test)
+              if test.suite.HasUnexpectedOutput(test):
+                self.failed.append(test)
+                if test.output.HasCrashed():
+                  self.crashed += 1
+              else:
+                self.succeeded += 1
+              self.remaining -= 1
+              self.indicator.HasRun(test)
+          rec.Advance()
+        peer.runtime = time.time() - start_time
+      except Exception:
+        pass  # Fall back to local execution.
+    else:
+      compression.Send([constants.UNRESPONSIVE_PEER, peer.address],
+                       self.local_socket)
+    sock.close()
+    if len(test_map) > 0:
+      # Some tests have not received any results. Run them locally.
+      print("No results for %d tests, running them locally." % len(test_map))
+      self._EnqueueLocally(test_map)
+
+  def _EnqueueLocally(self, test_map):
+    with self.tests_lock:
+      for test in test_map:
+        self.tests.append(test_map[test])
+
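+  # Compares each peer's share of the assigned work with its share of the
+  # total runtime, averages that ratio into its relative performance rating,
+  # and reports the updated rating to the local server.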
+  def _AnalyzePeerRuntimes(self):
+    total_runtime = 0.0
+    total_work = 0.0
+    for p in self.peers:
+      if p.runtime is None:
+        return
+      total_runtime += p.runtime
+      total_work += p.assigned_work
+    for p in self.peers:
+      p.assigned_work /= total_work
+      p.runtime /= total_runtime
+      perf_correction = p.assigned_work / p.runtime
+      old_perf = p.relative_performance
+      p.relative_performance = (old_perf + perf_correction) / 2.0
+      compression.Send([constants.UPDATE_PERF, p.address,
+                        p.relative_performance],
+                       self.local_socket)