Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(688)

Side by Side Diff: tools/testrunner/network/network_execution.py

Issue 10919265: First commit of new tools/run-tests.py (Closed) Base URL: https://v8.googlecode.com/svn/branches/bleeding_edge
Patch Set: s/server.py/test-server.py/ in README Created 8 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « tools/testrunner/network/endpoint.py ('k') | tools/testrunner/network/perfdata.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2012 the V8 project authors. All rights reserved.
2 # Redistribution and use in source and binary forms, with or without
3 # modification, are permitted provided that the following conditions are
4 # met:
5 #
6 # * Redistributions of source code must retain the above copyright
7 # notice, this list of conditions and the following disclaimer.
8 # * Redistributions in binary form must reproduce the above
9 # copyright notice, this list of conditions and the following
10 # disclaimer in the documentation and/or other materials provided
11 # with the distribution.
12 # * Neither the name of Google Inc. nor the names of its
13 # contributors may be used to endorse or promote products derived
14 # from this software without specific prior written permission.
15 #
16 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27
28
29 import os
30 import socket
31 import subprocess
32 import threading
33 import time
34
35 from . import distro
36 from . import perfdata
37 from ..local import execution
38 from ..objects import peer
39 from ..objects import workpacket
40 from ..server import compression
41 from ..server import constants
42 from ..server import local_handler
43 from ..server import signatures
44
45
46 def GetPeers():
47 data = local_handler.LocalQuery([constants.REQUEST_PEERS])
48 if not data: return []
49 return [ peer.Peer.Unpack(p) for p in data ]
50
51
52 class NetworkedRunner(execution.Runner):
53 def __init__(self, suites, progress_indicator, context, peers, workspace):
54 self.suites = suites
55 num_tests = 0
56 datapath = os.path.join("out", "testrunner_data")
57 self.perf_data_manager = perfdata.PerfDataManager(datapath)
58 self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode)
59 for s in suites:
60 for t in s.tests:
61 t.duration = self.perfdata.FetchPerfData(t) or 1.0
62 num_tests += len(s.tests)
63 self._CommonInit(num_tests, progress_indicator, context)
64 self.tests = [] # Only used if we need to fall back to local execution.
65 self.tests_lock = threading.Lock()
66 self.peers = peers
67 self.pubkey_fingerprint = None # Fetched later.
68 self.base_rev = subprocess.check_output(
69 "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace,
70 shell=True)
71 self.patch = subprocess.check_output(
72 "cd %s; git diff %s" % (workspace, self.base_rev), shell=True)
73 self.binaries = {}
74 self.initialization_lock = threading.Lock()
75 self.initialization_lock.acquire() # Released when init is done.
76 self._OpenLocalConnection()
77 self.local_receiver_thread = threading.Thread(
78 target=self._ListenLocalConnection)
79 self.local_receiver_thread.daemon = True
80 self.local_receiver_thread.start()
81 self.initialization_lock.acquire()
82 self.initialization_lock.release()
83
84 def _OpenLocalConnection(self):
85 self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
86 code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT))
87 if code != 0:
88 raise RuntimeError("Failed to connect to local server")
89 compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket)
90
91 def _ListenLocalConnection(self):
92 release_lock_countdown = 1 # Pubkey.
93 self.local_receiver = compression.Receiver(self.local_socket)
94 while not self.local_receiver.IsDone():
95 data = self.local_receiver.Current()
96 if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT:
97 pubkey = data[1]
98 if not pubkey: raise RuntimeError("Received empty public key")
99 self.pubkey_fingerprint = pubkey
100 release_lock_countdown -= 1
101 if release_lock_countdown == 0:
102 self.initialization_lock.release()
103 release_lock_countdown -= 1 # Prevent repeated triggering.
104 self.local_receiver.Advance()
105
106 def Run(self, jobs):
107 self.indicator.Starting()
108 need_libv8 = False
109 for s in self.suites:
110 shell = s.shell()
111 if shell not in self.binaries:
112 path = os.path.join(self.context.shell_dir, shell)
113 # Check if this is a shared library build.
114 try:
115 ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path),
116 shell=True)
117 ldd = ldd.strip().split(" ")
118 assert ldd[0] == "libv8.so"
119 assert ldd[1] == "=>"
120 need_libv8 = True
121 binary_needs_libv8 = True
122 libv8 = signatures.ReadFileAndSignature(ldd[2])
123 except:
124 binary_needs_libv8 = False
125 binary = signatures.ReadFileAndSignature(path)
126 if binary[0] is None:
127 print("Error: Failed to create signature.")
128 assert binary[1] != 0
129 return binary[1]
130 binary.append(binary_needs_libv8)
131 self.binaries[shell] = binary
132 if need_libv8:
133 self.binaries["libv8.so"] = libv8
134 distro.Assign(self.suites, self.peers)
135 # Spawn one thread for each peer.
136 threads = []
137 for p in self.peers:
138 thread = threading.Thread(target=self._TalkToPeer, args=[p])
139 threads.append(thread)
140 thread.start()
141 try:
142 for thread in threads:
143 # Use a timeout so that signals (Ctrl+C) will be processed.
144 thread.join(timeout=10000000)
145 self._AnalyzePeerRuntimes()
146 except KeyboardInterrupt:
147 self.terminate = True
148 raise
149 except Exception, _e:
150 # If there's an exception we schedule an interruption for any
151 # remaining threads...
152 self.terminate = True
153 # ...and then reraise the exception to bail out.
154 raise
155 compression.Send(constants.END_OF_STREAM, self.local_socket)
156 self.local_socket.close()
157 if self.tests:
158 self._RunInternal(jobs)
159 self.indicator.Done()
160 return not self.failed
161
162 def _TalkToPeer(self, peer):
163 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
164 sock.settimeout(self.context.timeout + 10)
165 code = sock.connect_ex((peer.address, constants.PEER_PORT))
166 if code == 0:
167 try:
168 peer.runtime = None
169 start_time = time.time()
170 packet = workpacket.WorkPacket(peer=peer, context=self.context,
171 base_revision=self.base_rev,
172 patch=self.patch,
173 pubkey=self.pubkey_fingerprint)
174 data, test_map = packet.Pack(self.binaries)
175 compression.Send(data, sock)
176 compression.Send(constants.END_OF_STREAM, sock)
177 rec = compression.Receiver(sock)
178 while not rec.IsDone() and not self.terminate:
179 data_list = rec.Current()
180 for data in data_list:
181 test_id = data[0]
182 if test_id < 0:
183 # The peer is reporting an error.
184 print("Peer %s reports error: %s" % (peer.address, data[1]))
185 rec.Advance()
186 continue
187 test = test_map.pop(test_id)
188 test.MergeResult(data)
189 try:
190 self.perfdata.UpdatePerfData(test)
191 except Exception, e:
192 print("UpdatePerfData exception: %s" % e)
193 pass # Just keep working.
194 with self.lock:
195 perf_key = self.perfdata.GetKey(test)
196 compression.Send(
197 [constants.INFORM_DURATION, perf_key, test.duration,
198 self.context.arch, self.context.mode],
199 self.local_socket)
200 self.indicator.AboutToRun(test)
201 if test.suite.HasUnexpectedOutput(test):
202 self.failed.append(test)
203 if test.output.HasCrashed():
204 self.crashed += 1
205 else:
206 self.succeeded += 1
207 self.remaining -= 1
208 self.indicator.HasRun(test)
209 rec.Advance()
210 peer.runtime = time.time() - start_time
211 except Exception:
212 pass # Fall back to local execution.
213 else:
214 compression.Send([constants.UNRESPONSIVE_PEER, peer.address],
215 self.local_socket)
216 sock.close()
217 if len(test_map) > 0:
218 # Some tests have not received any results. Run them locally.
219 print("No results for %d tests, running them locally." % len(test_map))
220 self._EnqueueLocally(test_map)
221
222 def _EnqueueLocally(self, test_map):
223 with self.tests_lock:
224 for test in test_map:
225 self.tests.append(test_map[test])
226
227 def _AnalyzePeerRuntimes(self):
228 total_runtime = 0.0
229 total_work = 0.0
230 for p in self.peers:
231 if p.runtime is None:
232 return
233 total_runtime += p.runtime
234 total_work += p.assigned_work
235 for p in self.peers:
236 p.assigned_work /= total_work
237 p.runtime /= total_runtime
238 perf_correction = p.assigned_work / p.runtime
239 old_perf = p.relative_performance
240 p.relative_performance = (old_perf + perf_correction) / 2.0
241 compression.Send([constants.UPDATE_PERF, p.address,
242 p.relative_performance],
243 self.local_socket)
OLDNEW
« no previous file with comments | « tools/testrunner/network/endpoint.py ('k') | tools/testrunner/network/perfdata.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698