Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(923)

Unified Diff: tools/testrunner/network/network_execution.py

Issue 10919265: First commit of new tools/run-tests.py (Closed) Base URL: https://v8.googlecode.com/svn/branches/bleeding_edge
Patch Set: s/server.py/test-server.py/ in README Created 8 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « tools/testrunner/network/endpoint.py ('k') | tools/testrunner/network/perfdata.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: tools/testrunner/network/network_execution.py
diff --git a/tools/testrunner/network/network_execution.py b/tools/testrunner/network/network_execution.py
new file mode 100644
index 0000000000000000000000000000000000000000..2f33d353b1908b68014437ce24f4f1a99eac4073
--- /dev/null
+++ b/tools/testrunner/network/network_execution.py
@@ -0,0 +1,243 @@
+# Copyright 2012 the V8 project authors. All rights reserved.
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+import os
+import socket
+import subprocess
+import threading
+import time
+
+from . import distro
+from . import perfdata
+from ..local import execution
+from ..objects import peer
+from ..objects import workpacket
+from ..server import compression
+from ..server import constants
+from ..server import local_handler
+from ..server import signatures
+
+
+def GetPeers():
+ data = local_handler.LocalQuery([constants.REQUEST_PEERS])
+ if not data: return []
+ return [ peer.Peer.Unpack(p) for p in data ]
+
+
+class NetworkedRunner(execution.Runner):
+ def __init__(self, suites, progress_indicator, context, peers, workspace):
+ self.suites = suites
+ num_tests = 0
+ datapath = os.path.join("out", "testrunner_data")
+ self.perf_data_manager = perfdata.PerfDataManager(datapath)
+ self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode)
+ for s in suites:
+ for t in s.tests:
+ t.duration = self.perfdata.FetchPerfData(t) or 1.0
+ num_tests += len(s.tests)
+ self._CommonInit(num_tests, progress_indicator, context)
+ self.tests = [] # Only used if we need to fall back to local execution.
+ self.tests_lock = threading.Lock()
+ self.peers = peers
+ self.pubkey_fingerprint = None # Fetched later.
+ self.base_rev = subprocess.check_output(
+ "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace,
+ shell=True)
+ self.patch = subprocess.check_output(
+ "cd %s; git diff %s" % (workspace, self.base_rev), shell=True)
+ self.binaries = {}
+ self.initialization_lock = threading.Lock()
+ self.initialization_lock.acquire() # Released when init is done.
+ self._OpenLocalConnection()
+ self.local_receiver_thread = threading.Thread(
+ target=self._ListenLocalConnection)
+ self.local_receiver_thread.daemon = True
+ self.local_receiver_thread.start()
+ self.initialization_lock.acquire()
+ self.initialization_lock.release()
+
+ def _OpenLocalConnection(self):
+ self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT))
+ if code != 0:
+ raise RuntimeError("Failed to connect to local server")
+ compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket)
+
+ def _ListenLocalConnection(self):
+ release_lock_countdown = 1 # Pubkey.
+ self.local_receiver = compression.Receiver(self.local_socket)
+ while not self.local_receiver.IsDone():
+ data = self.local_receiver.Current()
+ if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT:
+ pubkey = data[1]
+ if not pubkey: raise RuntimeError("Received empty public key")
+ self.pubkey_fingerprint = pubkey
+ release_lock_countdown -= 1
+ if release_lock_countdown == 0:
+ self.initialization_lock.release()
+ release_lock_countdown -= 1 # Prevent repeated triggering.
+ self.local_receiver.Advance()
+
+ def Run(self, jobs):
+ self.indicator.Starting()
+ need_libv8 = False
+ for s in self.suites:
+ shell = s.shell()
+ if shell not in self.binaries:
+ path = os.path.join(self.context.shell_dir, shell)
+ # Check if this is a shared library build.
+ try:
+ ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path),
+ shell=True)
+ ldd = ldd.strip().split(" ")
+ assert ldd[0] == "libv8.so"
+ assert ldd[1] == "=>"
+ need_libv8 = True
+ binary_needs_libv8 = True
+ libv8 = signatures.ReadFileAndSignature(ldd[2])
+ except:
+ binary_needs_libv8 = False
+ binary = signatures.ReadFileAndSignature(path)
+ if binary[0] is None:
+ print("Error: Failed to create signature.")
+ assert binary[1] != 0
+ return binary[1]
+ binary.append(binary_needs_libv8)
+ self.binaries[shell] = binary
+ if need_libv8:
+ self.binaries["libv8.so"] = libv8
+ distro.Assign(self.suites, self.peers)
+ # Spawn one thread for each peer.
+ threads = []
+ for p in self.peers:
+ thread = threading.Thread(target=self._TalkToPeer, args=[p])
+ threads.append(thread)
+ thread.start()
+ try:
+ for thread in threads:
+ # Use a timeout so that signals (Ctrl+C) will be processed.
+ thread.join(timeout=10000000)
+ self._AnalyzePeerRuntimes()
+ except KeyboardInterrupt:
+ self.terminate = True
+ raise
+ except Exception, _e:
+ # If there's an exception we schedule an interruption for any
+ # remaining threads...
+ self.terminate = True
+ # ...and then reraise the exception to bail out.
+ raise
+ compression.Send(constants.END_OF_STREAM, self.local_socket)
+ self.local_socket.close()
+ if self.tests:
+ self._RunInternal(jobs)
+ self.indicator.Done()
+ return not self.failed
+
+ def _TalkToPeer(self, peer):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.settimeout(self.context.timeout + 10)
+ code = sock.connect_ex((peer.address, constants.PEER_PORT))
+ if code == 0:
+ try:
+ peer.runtime = None
+ start_time = time.time()
+ packet = workpacket.WorkPacket(peer=peer, context=self.context,
+ base_revision=self.base_rev,
+ patch=self.patch,
+ pubkey=self.pubkey_fingerprint)
+ data, test_map = packet.Pack(self.binaries)
+ compression.Send(data, sock)
+ compression.Send(constants.END_OF_STREAM, sock)
+ rec = compression.Receiver(sock)
+ while not rec.IsDone() and not self.terminate:
+ data_list = rec.Current()
+ for data in data_list:
+ test_id = data[0]
+ if test_id < 0:
+ # The peer is reporting an error.
+ print("Peer %s reports error: %s" % (peer.address, data[1]))
+ rec.Advance()
+ continue
+ test = test_map.pop(test_id)
+ test.MergeResult(data)
+ try:
+ self.perfdata.UpdatePerfData(test)
+ except Exception, e:
+ print("UpdatePerfData exception: %s" % e)
+ pass # Just keep working.
+ with self.lock:
+ perf_key = self.perfdata.GetKey(test)
+ compression.Send(
+ [constants.INFORM_DURATION, perf_key, test.duration,
+ self.context.arch, self.context.mode],
+ self.local_socket)
+ self.indicator.AboutToRun(test)
+ if test.suite.HasUnexpectedOutput(test):
+ self.failed.append(test)
+ if test.output.HasCrashed():
+ self.crashed += 1
+ else:
+ self.succeeded += 1
+ self.remaining -= 1
+ self.indicator.HasRun(test)
+ rec.Advance()
+ peer.runtime = time.time() - start_time
+ except Exception:
+ pass # Fall back to local execution.
+ else:
+ compression.Send([constants.UNRESPONSIVE_PEER, peer.address],
+ self.local_socket)
+ sock.close()
+ if len(test_map) > 0:
+ # Some tests have not received any results. Run them locally.
+ print("No results for %d tests, running them locally." % len(test_map))
+ self._EnqueueLocally(test_map)
+
+ def _EnqueueLocally(self, test_map):
+ with self.tests_lock:
+ for test in test_map:
+ self.tests.append(test_map[test])
+
+ def _AnalyzePeerRuntimes(self):
+ total_runtime = 0.0
+ total_work = 0.0
+ for p in self.peers:
+ if p.runtime is None:
+ return
+ total_runtime += p.runtime
+ total_work += p.assigned_work
+ for p in self.peers:
+ p.assigned_work /= total_work
+ p.runtime /= total_runtime
+ perf_correction = p.assigned_work / p.runtime
+ old_perf = p.relative_performance
+ p.relative_performance = (old_perf + perf_correction) / 2.0
+ compression.Send([constants.UPDATE_PERF, p.address,
+ p.relative_performance],
+ self.local_socket)
« no previous file with comments | « tools/testrunner/network/endpoint.py ('k') | tools/testrunner/network/perfdata.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698