Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(208)

Side by Side Diff: tools/testrunner/network/network_execution.py

Issue 10919265: First commit of new tools/run-tests.py (Closed) Base URL: https://v8.googlecode.com/svn/branches/bleeding_edge
Patch Set: Created 8 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright 2012 the V8 project authors. All rights reserved.
2 # Redistribution and use in source and binary forms, with or without
3 # modification, are permitted provided that the following conditions are
4 # met:
5 #
6 # * Redistributions of source code must retain the above copyright
7 # notice, this list of conditions and the following disclaimer.
8 # * Redistributions in binary form must reproduce the above
9 # copyright notice, this list of conditions and the following
10 # disclaimer in the documentation and/or other materials provided
11 # with the distribution.
12 # * Neither the name of Google Inc. nor the names of its
13 # contributors may be used to endorse or promote products derived
14 # from this software without specific prior written permission.
15 #
16 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27
28
29 import os
30 import socket
31 import subprocess
32 import threading
33 import time
34
35 from . import distro
36 from . import perfdata
37 from ..local import execution
38 from ..objects import peer
39 from ..objects import workpacket
40 from ..server import compression
41 from ..server import constants
42 from ..server import local_handler
43 from ..server import signatures
44
45
46 def GetPeers():
47 data = local_handler.LocalQuery([constants.REQUEST_PEERS])
48 if not data: return []
49 return [ peer.Peer.Unpack(p) for p in data ]
50
51
52 class NetworkedRunner(execution.Runner):
53 def __init__(self, suites, progress_indicator, context, peers, workspace):
54 self.suites = suites
55 num_tests = 0
56 datapath = os.path.join("out", "testrunner_data")
57 self.perf_data_manager = perfdata.PerfDataManager(datapath)
58 self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode)
59 for s in suites:
60 for t in s.tests:
61 t.duration = self.perfdata.FetchPerfData(t) or 1.0
62 num_tests += len(s.tests)
63 self._CommonInit(num_tests, progress_indicator, context)
64 self.tests = [] # Only used if we need to fall back to local execution.
65 self.tests_lock = threading.Lock()
66 self.peers = peers
67 self.pubkey_fingerprint = None # Fetched later.
68 self.base_rev = subprocess.check_output(
69 "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace,
70 shell=True)
71 self.patch = subprocess.check_output(
72 "cd %s; git diff %s" % (workspace, self.base_rev), shell=True)
73 self.binaries = {}
74 self.initialization_lock = threading.Lock()
75 self.initialization_lock.acquire() # Released when init is done.
76 self._OpenLocalConnection()
77 self.local_receiver_thread = threading.Thread(
78 target=self._ListenLocalConnection)
79 self.local_receiver_thread.daemon = True
80 self.local_receiver_thread.start()
81 self.initialization_lock.acquire()
82 self.initialization_lock.release()
83
84 def _OpenLocalConnection(self):
85 self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
86 code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT))
87 if code != 0:
88 raise RuntimeError("Failed to connect to local server")
89 compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket)
90
91 def _ListenLocalConnection(self):
92 release_lock_countdown = 1 # Pubkey.
93 self.local_receiver = compression.Receiver(self.local_socket)
94 while not self.local_receiver.IsDone():
95 data = self.local_receiver.Current()
96 if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT:
97 pubkey = data[1]
98 if not pubkey: raise RuntimeError("Received empty public key")
99 self.pubkey_fingerprint = pubkey
100 release_lock_countdown -= 1
101 if release_lock_countdown == 0:
102 self.initialization_lock.release()
103 release_lock_countdown -= 1 # Prevent repeated triggering.
104 self.local_receiver.Advance()
105
106 def Run(self, jobs):
107 self.indicator.Starting()
108 need_libv8 = False
109 for s in self.suites:
110 shell = s.shell()
111 if shell not in self.binaries:
112 path = os.path.join(self.context.shell_dir, shell)
113 # Check if this is a shared library build.
114 try:
115 ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path),
116 shell=True)
117 ldd = ldd.strip().split(" ")
118 assert ldd[0] == "libv8.so"
119 assert ldd[1] == "=>"
120 need_libv8 = True
121 binary_needs_libv8 = True
122 libv8 = signatures.ReadFileAndSignature(ldd[2])
123 except:
124 binary_needs_libv8 = False
125 binary = signatures.ReadFileAndSignature(path)
126 if binary[0] is None:
127 print("Error: Failed to create signature.")
128 assert binary[1] != 0
129 return binary[1]
130 binary.append(binary_needs_libv8)
131 self.binaries[shell] = binary
132 if need_libv8:
133 self.binaries["libv8.so"] = libv8
134 distro.Assign(self.suites, self.peers)
135 # Spawn one thread for each peer.
136 threads = []
137 for p in self.peers:
138 thread = threading.Thread(target=self._TalkToPeer, args=[p])
139 threads.append(thread)
140 thread.start()
141 try:
142 for thread in threads:
143 # Use a timeout so that signals (Ctrl+C) will be processed.
144 thread.join(timeout=10000000)
145 self._AnalyzePeerRuntimes()
146 except KeyboardInterrupt:
147 self.terminate = True
148 raise
149 except Exception, _e:
150 # If there's an exception we schedule an interruption for any
151 # remaining threads...
152 self.terminate = True
153 # ...and then reraise the exception to bail out.
154 raise
155 compression.Send(constants.END_OF_STREAM, self.local_socket)
156 self.local_socket.close()
157 if self.queue is not None:
158 self._RunInternal(jobs)
159 self.indicator.Done()
160 return not self.failed
161
162 def _TalkToPeer(self, peer):
163 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
164 sock.settimeout(self.context.timeout + 10)
165 code = sock.connect_ex((peer.address, constants.PEER_PORT))
166 if code == 0:
167 try:
168 peer.runtime = None
169 start_time = time.time()
170 packet = workpacket.WorkPacket(peer=peer, context=self.context,
171 base_revision=self.base_rev,
172 patch=self.patch,
173 pubkey=self.pubkey_fingerprint)
174 data, test_map = packet.Pack(self.binaries)
175 compression.Send(data, sock)
176 compression.Send(constants.END_OF_STREAM, sock)
177 rec = compression.Receiver(sock)
178 while not rec.IsDone() and not self.terminate:
179 data_list = rec.Current()
180 for data in data_list:
181 test_id = data[0]
182 if test_id < 0:
183 # The peer is reporting an error.
184 print("Peer %s reports error: %s" % (peer.address, data[1]))
185 rec.Advance()
186 continue
187 test = test_map.pop(test_id)
188 test.MergeResult(data)
189 self.perfdata.UpdatePerfData(test)
190 with self.lock:
191 perf_key = self.perfdata.GetKey(test)
192 compression.Send(
193 [constants.INFORM_DURATION, perf_key, test.duration,
194 self.context.arch, self.context.mode],
195 self.local_socket)
196 self.indicator.AboutToRun(test)
197 if test.suite.HasUnexpectedOutput(test):
198 self.failed.append(test)
199 if test.output.HasCrashed():
200 self.crashed += 1
201 else:
202 self.succeeded += 1
203 self.remaining -= 1
204 self.indicator.HasRun(test)
205 rec.Advance()
206 peer.runtime = time.time() - start_time
207 except Exception:
208 pass # Fall back to local execution.
209 else:
210 compression.Send([constants.UNRESPONSIVE_PEER, peer.address],
211 self.local_socket)
212 sock.close()
213 if len(test_map) > 0:
214 # Some tests have not received any results. Run them locally.
215 print("No results for %d tests, running them locally." % len(test_map))
216 self._EnqueueLocally(test_map)
217
218 def _EnqueueLocally(self, test_map):
219 with self.tests_lock:
220 for test in test_map:
221 self.tests.append(test_map[test])
222
223 def _AnalyzePeerRuntimes(self):
224 total_runtime = 0.0
225 total_work = 0.0
226 for p in self.peers:
227 if p.runtime is None:
228 return
229 total_runtime += p.runtime
230 total_work += p.assigned_work
231 for p in self.peers:
232 p.assigned_work /= total_work
233 p.runtime /= total_runtime
234 perf_correction = p.assigned_work / p.runtime
235 old_perf = p.relative_performance
236 p.relative_performance = (old_perf + perf_correction) / 2.0
237 compression.Send([constants.UPDATE_PERF, p.address,
238 p.relative_performance],
239 self.local_socket)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698