Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(93)

Side by Side Diff: tools/testrunner/network/network_execution.py

Issue 11035053: Rollback trunk to bleeding_edge revision 12525 (Closed) Base URL: https://v8.googlecode.com/svn/trunk
Patch Set: Created 8 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « tools/testrunner/network/endpoint.py ('k') | tools/testrunner/network/perfdata.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2012 the V8 project authors. All rights reserved.
2 # Redistribution and use in source and binary forms, with or without
3 # modification, are permitted provided that the following conditions are
4 # met:
5 #
6 # * Redistributions of source code must retain the above copyright
7 # notice, this list of conditions and the following disclaimer.
8 # * Redistributions in binary form must reproduce the above
9 # copyright notice, this list of conditions and the following
10 # disclaimer in the documentation and/or other materials provided
11 # with the distribution.
12 # * Neither the name of Google Inc. nor the names of its
13 # contributors may be used to endorse or promote products derived
14 # from this software without specific prior written permission.
15 #
16 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27
28
29 import os
30 import socket
31 import subprocess
32 import threading
33 import time
34
35 from . import distro
36 from . import perfdata
37 from ..local import execution
38 from ..objects import peer
39 from ..objects import workpacket
40 from ..server import compression
41 from ..server import constants
42 from ..server import local_handler
43 from ..server import signatures
44
45
46 def GetPeers():
47 data = local_handler.LocalQuery([constants.REQUEST_PEERS])
48 if not data: return []
49 return [ peer.Peer.Unpack(p) for p in data ]
50
51
52 class NetworkedRunner(execution.Runner):
53 def __init__(self, suites, progress_indicator, context, peers, workspace):
54 self.suites = suites
55 num_tests = 0
56 datapath = os.path.join("out", "testrunner_data")
57 self.perf_data_manager = perfdata.PerfDataManager(datapath)
58 self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode)
59 for s in suites:
60 for t in s.tests:
61 t.duration = self.perfdata.FetchPerfData(t) or 1.0
62 num_tests += len(s.tests)
63 self._CommonInit(num_tests, progress_indicator, context)
64 self.tests = [] # Only used if we need to fall back to local execution.
65 self.tests_lock = threading.Lock()
66 self.peers = peers
67 self.pubkey_fingerprint = None # Fetched later.
68 self.base_rev = subprocess.check_output(
69 "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace,
70 shell=True).strip()
71 self.base_svn_rev = subprocess.check_output(
72 "cd %s; git log -1 %s" # Get commit description.
73 " | grep -e '^\s*git-svn-id:'" # Extract "git-svn-id" line.
74 " | awk '{print $2}'" # Extract "repository@revision" part.
75 " | sed -e 's/.*@//'" % # Strip away "repository@".
76 (workspace, self.base_rev), shell=True).strip()
77 self.patch = subprocess.check_output(
78 "cd %s; git diff %s" % (workspace, self.base_rev), shell=True)
79 self.binaries = {}
80 self.initialization_lock = threading.Lock()
81 self.initialization_lock.acquire() # Released when init is done.
82 self._OpenLocalConnection()
83 self.local_receiver_thread = threading.Thread(
84 target=self._ListenLocalConnection)
85 self.local_receiver_thread.daemon = True
86 self.local_receiver_thread.start()
87 self.initialization_lock.acquire()
88 self.initialization_lock.release()
89
90 def _OpenLocalConnection(self):
91 self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
92 code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT))
93 if code != 0:
94 raise RuntimeError("Failed to connect to local server")
95 compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket)
96
97 def _ListenLocalConnection(self):
98 release_lock_countdown = 1 # Pubkey.
99 self.local_receiver = compression.Receiver(self.local_socket)
100 while not self.local_receiver.IsDone():
101 data = self.local_receiver.Current()
102 if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT:
103 pubkey = data[1]
104 if not pubkey: raise RuntimeError("Received empty public key")
105 self.pubkey_fingerprint = pubkey
106 release_lock_countdown -= 1
107 if release_lock_countdown == 0:
108 self.initialization_lock.release()
109 release_lock_countdown -= 1 # Prevent repeated triggering.
110 self.local_receiver.Advance()
111
112 def Run(self, jobs):
113 self.indicator.Starting()
114 need_libv8 = False
115 for s in self.suites:
116 shell = s.shell()
117 if shell not in self.binaries:
118 path = os.path.join(self.context.shell_dir, shell)
119 # Check if this is a shared library build.
120 try:
121 ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path),
122 shell=True)
123 ldd = ldd.strip().split(" ")
124 assert ldd[0] == "libv8.so"
125 assert ldd[1] == "=>"
126 need_libv8 = True
127 binary_needs_libv8 = True
128 libv8 = signatures.ReadFileAndSignature(ldd[2])
129 except:
130 binary_needs_libv8 = False
131 binary = signatures.ReadFileAndSignature(path)
132 if binary[0] is None:
133 print("Error: Failed to create signature.")
134 assert binary[1] != 0
135 return binary[1]
136 binary.append(binary_needs_libv8)
137 self.binaries[shell] = binary
138 if need_libv8:
139 self.binaries["libv8.so"] = libv8
140 distro.Assign(self.suites, self.peers)
141 # Spawn one thread for each peer.
142 threads = []
143 for p in self.peers:
144 thread = threading.Thread(target=self._TalkToPeer, args=[p])
145 threads.append(thread)
146 thread.start()
147 try:
148 for thread in threads:
149 # Use a timeout so that signals (Ctrl+C) will be processed.
150 thread.join(timeout=10000000)
151 self._AnalyzePeerRuntimes()
152 except KeyboardInterrupt:
153 self.terminate = True
154 raise
155 except Exception, _e:
156 # If there's an exception we schedule an interruption for any
157 # remaining threads...
158 self.terminate = True
159 # ...and then reraise the exception to bail out.
160 raise
161 compression.Send(constants.END_OF_STREAM, self.local_socket)
162 self.local_socket.close()
163 if self.tests:
164 self._RunInternal(jobs)
165 self.indicator.Done()
166 return not self.failed
167
168 def _TalkToPeer(self, peer):
169 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
170 sock.settimeout(self.context.timeout + 10)
171 code = sock.connect_ex((peer.address, constants.PEER_PORT))
172 if code == 0:
173 try:
174 peer.runtime = None
175 start_time = time.time()
176 packet = workpacket.WorkPacket(peer=peer, context=self.context,
177 base_revision=self.base_svn_rev,
178 patch=self.patch,
179 pubkey=self.pubkey_fingerprint)
180 data, test_map = packet.Pack(self.binaries)
181 compression.Send(data, sock)
182 compression.Send(constants.END_OF_STREAM, sock)
183 rec = compression.Receiver(sock)
184 while not rec.IsDone() and not self.terminate:
185 data_list = rec.Current()
186 for data in data_list:
187 test_id = data[0]
188 if test_id < 0:
189 # The peer is reporting an error.
190 print("Peer %s reports error: %s" % (peer.address, data[1]))
191 rec.Advance()
192 continue
193 test = test_map.pop(test_id)
194 test.MergeResult(data)
195 try:
196 self.perfdata.UpdatePerfData(test)
197 except Exception, e:
198 print("UpdatePerfData exception: %s" % e)
199 pass # Just keep working.
200 with self.lock:
201 perf_key = self.perfdata.GetKey(test)
202 compression.Send(
203 [constants.INFORM_DURATION, perf_key, test.duration,
204 self.context.arch, self.context.mode],
205 self.local_socket)
206 self.indicator.AboutToRun(test)
207 if test.suite.HasUnexpectedOutput(test):
208 self.failed.append(test)
209 if test.output.HasCrashed():
210 self.crashed += 1
211 else:
212 self.succeeded += 1
213 self.remaining -= 1
214 self.indicator.HasRun(test)
215 rec.Advance()
216 peer.runtime = time.time() - start_time
217 except Exception:
218 pass # Fall back to local execution.
219 else:
220 compression.Send([constants.UNRESPONSIVE_PEER, peer.address],
221 self.local_socket)
222 sock.close()
223 if len(test_map) > 0:
224 # Some tests have not received any results. Run them locally.
225 print("No results for %d tests, running them locally." % len(test_map))
226 self._EnqueueLocally(test_map)
227
228 def _EnqueueLocally(self, test_map):
229 with self.tests_lock:
230 for test in test_map:
231 self.tests.append(test_map[test])
232
233 def _AnalyzePeerRuntimes(self):
234 total_runtime = 0.0
235 total_work = 0.0
236 for p in self.peers:
237 if p.runtime is None:
238 return
239 total_runtime += p.runtime
240 total_work += p.assigned_work
241 for p in self.peers:
242 p.assigned_work /= total_work
243 p.runtime /= total_runtime
244 perf_correction = p.assigned_work / p.runtime
245 old_perf = p.relative_performance
246 p.relative_performance = (old_perf + perf_correction) / 2.0
247 compression.Send([constants.UPDATE_PERF, p.address,
248 p.relative_performance],
249 self.local_socket)
OLDNEW
« no previous file with comments | « tools/testrunner/network/endpoint.py ('k') | tools/testrunner/network/perfdata.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698