Index: tools/testrunner/network/network_execution.py |
diff --git a/tools/testrunner/network/network_execution.py b/tools/testrunner/network/network_execution.py |
deleted file mode 100644 |
index b17249dfac2bd3a5fb9cc6a2bb72444b354ab739..0000000000000000000000000000000000000000 |
--- a/tools/testrunner/network/network_execution.py |
+++ /dev/null |
@@ -1,249 +0,0 @@ |
-# Copyright 2012 the V8 project authors. All rights reserved. |
-# Redistribution and use in source and binary forms, with or without |
-# modification, are permitted provided that the following conditions are |
-# met: |
-# |
-# * Redistributions of source code must retain the above copyright |
-# notice, this list of conditions and the following disclaimer. |
-# * Redistributions in binary form must reproduce the above |
-# copyright notice, this list of conditions and the following |
-# disclaimer in the documentation and/or other materials provided |
-# with the distribution. |
-# * Neither the name of Google Inc. nor the names of its |
-# contributors may be used to endorse or promote products derived |
-# from this software without specific prior written permission. |
-# |
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
- |
- |
-import os |
-import socket |
-import subprocess |
-import threading |
-import time |
- |
-from . import distro |
-from . import perfdata |
-from ..local import execution |
-from ..objects import peer |
-from ..objects import workpacket |
-from ..server import compression |
-from ..server import constants |
-from ..server import local_handler |
-from ..server import signatures |
- |
- |
-def GetPeers(): |
- data = local_handler.LocalQuery([constants.REQUEST_PEERS]) |
- if not data: return [] |
- return [ peer.Peer.Unpack(p) for p in data ] |
- |
- |
-class NetworkedRunner(execution.Runner): |
- def __init__(self, suites, progress_indicator, context, peers, workspace): |
- self.suites = suites |
- num_tests = 0 |
- datapath = os.path.join("out", "testrunner_data") |
- self.perf_data_manager = perfdata.PerfDataManager(datapath) |
- self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode) |
- for s in suites: |
- for t in s.tests: |
- t.duration = self.perfdata.FetchPerfData(t) or 1.0 |
- num_tests += len(s.tests) |
- self._CommonInit(num_tests, progress_indicator, context) |
- self.tests = [] # Only used if we need to fall back to local execution. |
- self.tests_lock = threading.Lock() |
- self.peers = peers |
- self.pubkey_fingerprint = None # Fetched later. |
- self.base_rev = subprocess.check_output( |
- "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace, |
- shell=True).strip() |
- self.base_svn_rev = subprocess.check_output( |
- "cd %s; git log -1 %s" # Get commit description. |
- " | grep -e '^\s*git-svn-id:'" # Extract "git-svn-id" line. |
- " | awk '{print $2}'" # Extract "repository@revision" part. |
- " | sed -e 's/.*@//'" % # Strip away "repository@". |
- (workspace, self.base_rev), shell=True).strip() |
- self.patch = subprocess.check_output( |
- "cd %s; git diff %s" % (workspace, self.base_rev), shell=True) |
- self.binaries = {} |
- self.initialization_lock = threading.Lock() |
- self.initialization_lock.acquire() # Released when init is done. |
- self._OpenLocalConnection() |
- self.local_receiver_thread = threading.Thread( |
- target=self._ListenLocalConnection) |
- self.local_receiver_thread.daemon = True |
- self.local_receiver_thread.start() |
- self.initialization_lock.acquire() |
- self.initialization_lock.release() |
- |
- def _OpenLocalConnection(self): |
- self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
- code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT)) |
- if code != 0: |
- raise RuntimeError("Failed to connect to local server") |
- compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket) |
- |
- def _ListenLocalConnection(self): |
- release_lock_countdown = 1 # Pubkey. |
- self.local_receiver = compression.Receiver(self.local_socket) |
- while not self.local_receiver.IsDone(): |
- data = self.local_receiver.Current() |
- if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT: |
- pubkey = data[1] |
- if not pubkey: raise RuntimeError("Received empty public key") |
- self.pubkey_fingerprint = pubkey |
- release_lock_countdown -= 1 |
- if release_lock_countdown == 0: |
- self.initialization_lock.release() |
- release_lock_countdown -= 1 # Prevent repeated triggering. |
- self.local_receiver.Advance() |
- |
- def Run(self, jobs): |
- self.indicator.Starting() |
- need_libv8 = False |
- for s in self.suites: |
- shell = s.shell() |
- if shell not in self.binaries: |
- path = os.path.join(self.context.shell_dir, shell) |
- # Check if this is a shared library build. |
- try: |
- ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path), |
- shell=True) |
- ldd = ldd.strip().split(" ") |
- assert ldd[0] == "libv8.so" |
- assert ldd[1] == "=>" |
- need_libv8 = True |
- binary_needs_libv8 = True |
- libv8 = signatures.ReadFileAndSignature(ldd[2]) |
- except: |
- binary_needs_libv8 = False |
- binary = signatures.ReadFileAndSignature(path) |
- if binary[0] is None: |
- print("Error: Failed to create signature.") |
- assert binary[1] != 0 |
- return binary[1] |
- binary.append(binary_needs_libv8) |
- self.binaries[shell] = binary |
- if need_libv8: |
- self.binaries["libv8.so"] = libv8 |
- distro.Assign(self.suites, self.peers) |
- # Spawn one thread for each peer. |
- threads = [] |
- for p in self.peers: |
- thread = threading.Thread(target=self._TalkToPeer, args=[p]) |
- threads.append(thread) |
- thread.start() |
- try: |
- for thread in threads: |
- # Use a timeout so that signals (Ctrl+C) will be processed. |
- thread.join(timeout=10000000) |
- self._AnalyzePeerRuntimes() |
- except KeyboardInterrupt: |
- self.terminate = True |
- raise |
- except Exception, _e: |
- # If there's an exception we schedule an interruption for any |
- # remaining threads... |
- self.terminate = True |
- # ...and then reraise the exception to bail out. |
- raise |
- compression.Send(constants.END_OF_STREAM, self.local_socket) |
- self.local_socket.close() |
- if self.tests: |
- self._RunInternal(jobs) |
- self.indicator.Done() |
- return not self.failed |
- |
- def _TalkToPeer(self, peer): |
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
- sock.settimeout(self.context.timeout + 10) |
- code = sock.connect_ex((peer.address, constants.PEER_PORT)) |
- if code == 0: |
- try: |
- peer.runtime = None |
- start_time = time.time() |
- packet = workpacket.WorkPacket(peer=peer, context=self.context, |
- base_revision=self.base_svn_rev, |
- patch=self.patch, |
- pubkey=self.pubkey_fingerprint) |
- data, test_map = packet.Pack(self.binaries) |
- compression.Send(data, sock) |
- compression.Send(constants.END_OF_STREAM, sock) |
- rec = compression.Receiver(sock) |
- while not rec.IsDone() and not self.terminate: |
- data_list = rec.Current() |
- for data in data_list: |
- test_id = data[0] |
- if test_id < 0: |
- # The peer is reporting an error. |
- print("Peer %s reports error: %s" % (peer.address, data[1])) |
- rec.Advance() |
- continue |
- test = test_map.pop(test_id) |
- test.MergeResult(data) |
- try: |
- self.perfdata.UpdatePerfData(test) |
- except Exception, e: |
- print("UpdatePerfData exception: %s" % e) |
- pass # Just keep working. |
- with self.lock: |
- perf_key = self.perfdata.GetKey(test) |
- compression.Send( |
- [constants.INFORM_DURATION, perf_key, test.duration, |
- self.context.arch, self.context.mode], |
- self.local_socket) |
- self.indicator.AboutToRun(test) |
- if test.suite.HasUnexpectedOutput(test): |
- self.failed.append(test) |
- if test.output.HasCrashed(): |
- self.crashed += 1 |
- else: |
- self.succeeded += 1 |
- self.remaining -= 1 |
- self.indicator.HasRun(test) |
- rec.Advance() |
- peer.runtime = time.time() - start_time |
- except Exception: |
- pass # Fall back to local execution. |
- else: |
- compression.Send([constants.UNRESPONSIVE_PEER, peer.address], |
- self.local_socket) |
- sock.close() |
- if len(test_map) > 0: |
- # Some tests have not received any results. Run them locally. |
- print("No results for %d tests, running them locally." % len(test_map)) |
- self._EnqueueLocally(test_map) |
- |
- def _EnqueueLocally(self, test_map): |
- with self.tests_lock: |
- for test in test_map: |
- self.tests.append(test_map[test]) |
- |
- def _AnalyzePeerRuntimes(self): |
- total_runtime = 0.0 |
- total_work = 0.0 |
- for p in self.peers: |
- if p.runtime is None: |
- return |
- total_runtime += p.runtime |
- total_work += p.assigned_work |
- for p in self.peers: |
- p.assigned_work /= total_work |
- p.runtime /= total_runtime |
- perf_correction = p.assigned_work / p.runtime |
- old_perf = p.relative_performance |
- p.relative_performance = (old_perf + perf_correction) / 2.0 |
- compression.Send([constants.UPDATE_PERF, p.address, |
- p.relative_performance], |
- self.local_socket) |