OLD | NEW |
---|---|
(Empty) | |
1 #!/usr/bin/env python | |
2 # Copyright 2013 The Chromium Authors. All rights reserved. | |
3 # Use of this source code is governed by a BSD-style license that can be | |
4 # found in the LICENSE file. | |
5 | |
6 """Triggers a ton of fake jobs to test its handling under high load. | |
7 | |
8 Generates a histogram of the latencies to process the tasks and the number of | |
9 retries. | |
10 """ | |
11 | |
12 import hashlib | |
13 import json | |
14 import logging | |
15 import optparse | |
16 import os | |
17 import Queue | |
18 import socket | |
19 import StringIO | |
20 import sys | |
21 import threading | |
22 import time | |
23 import zipfile | |
24 | |
25 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
26 | |
27 sys.path.insert(0, ROOT_DIR) | |
28 | |
29 from third_party import colorama | |
30 | |
31 from utils import graph | |
32 from utils import net | |
33 from utils import threading_utils | |
34 | |
35 # Line too long (NN/80) | |
36 # pylint: disable=C0301 | |
37 | |
38 | |
39 def print_results(results, columns, buckets): | |
40 delays = [i for i in results if isinstance(i, float)] | |
41 failures = [i for i in results if not isinstance(i, float)] | |
42 | |
43 print('%sDELAYS%s:' % (colorama.Fore.RED, colorama.Fore.RESET)) | |
44 graph.print_histogram( | |
45 graph.generate_histogram(delays, buckets), columns, ' %.3f') | |
46 print('') | |
47 print('Total items : %d' % len(results)) | |
48 average = 0 | |
49 if delays: | |
50 average = sum(delays) / len(delays) | |
51 print('Average delay: %s' % graph.to_units(average)) | |
52 print('') | |
53 | |
54 if failures: | |
55 print('%sEVENTS%s:' % (colorama.Fore.RED, colorama.Fore.RESET)) | |
56 values = {} | |
57 for f in failures: | |
58 values.setdefault(f, 0) | |
59 values[f] += 1 | |
60 graph.print_histogram(values, columns, ' %s') | |
61 print('') | |
62 | |
63 | |
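A minimal standalone illustration (hypothetical values, not part of the script): the results sequence passed to print_results() mixes float latencies with string event names, and the function splits them exactly as shown below.

    results = [0.25, 1.313, 'sleep', 0.98, 'result_url_fail']     # hypothetical sample
    delays = [i for i in results if isinstance(i, float)]         # [0.25, 1.313, 0.98]
    failures = [i for i in results if not isinstance(i, float)]   # ['sleep', 'result_url_fail']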
64 def calculate_version(url): | |
65 """Retrieves the swarm_bot code and returns the SHA-1 for it.""" | |
66 # Cannot use url_open() since zipfile requires .seek(). | |
67 archive = zipfile.ZipFile(StringIO.StringIO(net.url_read(url))) | |
68 # See | |
69 # https://code.google.com/p/swarming/source/browse/src/common/version.py?repo=swarming-server | |
70 files = ( | |
71 'slave_machine.py', | |
72 'swarm_bot/local_test_runner.py', | |
73 'common/__init__.py', | |
74 'common/swarm_constants.py', | |
75 'common/version.py', | |
76 'common/test_request_message.py', | |
77 'common/url_helper.py', | |
78 ) | |
79 d = hashlib.sha1() | |
80 for f in files: | |
81 d.update(archive.read(f)) | |
82 return d.hexdigest() | |
83 | |
84 | |
85 class FakeSwarmBot(object): | |
86 """This is a Fake swarm_bot implementation simulating it is running AIX. | |
87 | |
88 If someones fires up a real AIX slave, well, sorry. | |
89 | |
90 It polls for job, acts as if it was processing them and return the fake | |
91 result. | |
92 """ | |
93 def __init__( | |
94 self, swarming_url, swarm_bot_hash, index, progress, duration, ping, | |
95 events, kill_event): | |
96 self._lock = threading.Lock() | |
97 self._swarming = swarming_url | |
98 self._index = index | |
99 self._progress = progress | |
100 self._duration = duration | |
101 self._ping = ping | |
102 self._events = events | |
103 self._kill_event = kill_event | |
104 | |
105 # See | |
106 # https://code.google.com/p/swarming/source/browse/src/swarm_bot/slave_machine.py?repo=swarming-server | |
107 # and | |
108 # https://chromium.googlesource.com/chromium/tools/build.git/+/master/scripts/tools/swarm_bootstrap/swarm_bootstrap.py | |
109 # for more details. | |
110 self._attributes = { | |
111 'dimensions': { | |
112 # Use improbable values to reduce the chance of interfering with real | |
113 # slaves. | |
114 'bits': '36', | |
115 'machine': os.uname()[4] + '-experimental', | |
116 'os': ['AIX'], | |
117 }, | |
118 # Use an impossible hostname. | |
119 'id': '%s-%d' % (socket.getfqdn().lower(), index), | |
120 'try_count': 0, | |
121 'tag': '%s-%d' % (socket.getfqdn().lower(), index), | |
122 'version': swarm_bot_hash, | |
123 } | |
124 | |
125 self._thread = threading.Thread(target=self._run, name='bot%d' % index) | |
126 self._thread.daemon = True | |
127 self._thread.start() | |
128 | |
129 def join(self): | |
130 self._thread.join() | |
131 | |
132 def is_alive(self): | |
133 return self._thread.is_alive() | |
134 | |
135 def _run(self): | |
136 try: | |
137 self._progress.update_item('%d alive' % self._index, bots=1) | |
138 while True: | |
139 if self._kill_event.get(): | |
140 return | |
141 data = {'attributes': json.dumps(self._attributes)} | |
142 request = net.url_open(self._swarming + '/poll_for_test', data=data) | |
143 if request is None: | |
144 self._events.put('poll_for_test_empty') | |
145 continue | |
146 start = time.time() | |
147 try: | |
148 manifest = json.load(request) | |
149 except ValueError: | |
150 self._progress.update_item('Failed to poll') | |
151 self._events.put('poll_for_test_invalid') | |
152 continue | |
153 | |
154 commands = [c['function'] for c in manifest.get('commands', [])] | |
155 if not commands: | |
156 # Nothing to run. | |
157 self._events.put('sleep') | |
158 time.sleep(manifest['come_back']) | |
159 continue | |
160 | |
161 if commands == ['UpdateSlave']: | |
csharp
2013/10/03 16:08:04
I don't think this is needed anymore, right?
Marc-Antoine Ruel (Google)
2013/10/03 16:47:11
It is, see line 163.
csharp
2013/10/03 19:00:34
I think we can just mark UpdateSlave as an unexpec
Marc-Antoine Ruel (Google)
2013/10/03 19:15:29
I disagree, this is something we should test, so t
csharp
2013/10/04 13:43:13
Ok.
| |
162 # Calculate the proper SHA-1 and loop again. | |
163 # This could happen if the Swarming server is upgraded while this | |
164 # script runs. | |
165 self._attributes['version'] = calculate_version( | |
166 manifest['commands'][0]['args']) | |
167 self._events.put('update_slave') | |
168 continue | |
169 | |
170 if commands != ['StoreFiles', 'RunCommands']: | |
171 self._progress.update_item( | |
172 'Unexpected RPC call %s\n%s' % (commands, manifest)) | |
173 self._events.put('unknown_rpc') | |
174 break | |
175 | |
176 # The normal way Swarming works is that it 'stores' a test_run.swarm | |
177 # file and then defers control to swarm_bot/local_test_runner.py. | |
178 store_cmd = manifest['commands'][0] | |
179 assert len(store_cmd['args']) == 1, store_cmd['args'] | |
180 filepath, filename, test_run_content = store_cmd['args'][0] | |
181 assert filepath == '' | |
182 assert filename == 'test_run.swarm' | |
183 assert manifest['commands'][1] == { | |
184 u'function': u'RunCommands', | |
185 u'args': [ | |
186 u'swarm_bot/local_test_runner.py', u'-f', | |
187 u'test_run.swarm', u'--restart_on_failure', | |
188 ], | |
189 }, manifest['commands'][1] | |
190 result_url = manifest['result_url'] | |
191 test_run = json.loads(test_run_content) | |
192 assert result_url == test_run['result_url'] | |
193 ping_url = test_run['ping_url'] | |
194 self._progress.update_item('%d processing' % self._index, processing=1) | |
195 | |
196 # Fake activity and send a ping request every 0.5 seconds. | |
197 while True: | |
198 remaining = max(0, self._duration - (time.time() - start)) | |
199 if remaining > self._ping: | |
200 # In theory, we should use test_run['ping_delay'] but this is a load | |
201 # test. Make sure the server melts down. | |
202 result = net.url_read(ping_url) | |
203 assert result == 'OK' | |
204 remaining = max(0, self._duration - (time.time() - start)) | |
205 if not remaining: | |
206 break | |
207 time.sleep(min(remaining, self._ping)) | |
208 | |
209 data = { | |
210 'c': test_run['configuration']['config_name'], | |
211 'n': test_run['test_run_name'], | |
212 'o': False, | |
213 'result_output': 'This task ran with great success', | |
214 's': True, | |
215 'x': '0', | |
216 } | |
217 result = net.url_read(manifest['result_url'], data=data) | |
218 self._progress.update_item( | |
219 '%d processed' % self._index, processing=-1, processed=1) | |
220 if not result: | |
221 self._events.put('result_url_fail') | |
222 else: | |
223 assert result == 'Successfully update the runner results.', result | |
224 self._events.put(time.time() - start) | |
225 finally: | |
226 self._progress.update_item('%d quit' % self._index, bots=-1) | |
227 | |
228 | |
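As a hedged sketch, reconstructed only from the assertions in FakeSwarmBot._run() above (not from server documentation; the URL and values are illustrative), the /poll_for_test response this fake bot accepts looks roughly like:

    manifest = {
      'commands': [
        {'function': 'StoreFiles',
         'args': [['', 'test_run.swarm', '<serialized test_run JSON>']]},
        {'function': 'RunCommands',
         'args': ['swarm_bot/local_test_runner.py', '-f', 'test_run.swarm',
                  '--restart_on_failure']},
      ],
      'result_url': 'https://example.com/result/123',  # illustrative only
    }
    # When there is nothing to run, the manifest instead carries only
    # {'come_back': <seconds to sleep>}, and a lone 'UpdateSlave' command
    # signals that the bot code itself must be refreshed.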
229 def main(): | |
230 colorama.init() | |
231 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) | |
232 parser.add_option( | |
233 '-S', '--swarming', | |
234 metavar='URL', default='', | |
235 help='Swarming server to use') | |
236 | |
237 group = optparse.OptionGroup(parser, 'Load generation') | |
238 group.add_option( | |
239 '--slaves', type='int', default=300, metavar='N', | |
240 help='Number of swarm bot slaves, default: %default') | |
241 group.add_option( | |
242 '-c', '--consume', type='float', default=60., metavar='N', | |
243 help='Duration (s) for consuming a request, default: %default') | |
244 group.add_option( | |
csharp
2013/10/03 16:08:04
As mentioned before, probably remove this since pi
Marc-Antoine Ruel (Google)
2013/10/03 16:47:11
Done.
| |
245 '-p', '--ping', type='float', default=0.5, metavar='N', | |
246 help='Ping delay (s) while consuming a request; normally it would be in ' | |
247 'the range of 30s but this is a load test, default: %default') | |
248 parser.add_option_group(group) | |
249 | |
250 group = optparse.OptionGroup(parser, 'Display options') | |
251 group.add_option( | |
252 '--columns', type='int', default=graph.get_console_width(), metavar='N', | |
253 help='Number of columns for histogram display, default: %default') | |
254 group.add_option( | |
255 '--buckets', type='int', default=20, metavar='N', | |
256 help='Number of buckets for histogram display, default: %default') | |
257 parser.add_option_group(group) | |
258 | |
259 parser.add_option( | |
260 '--dump', metavar='FOO.JSON', help='Dumps to json file') | |
261 parser.add_option( | |
262 '-v', '--verbose', action='store_true', help='Enables logging') | |
263 | |
264 options, args = parser.parse_args() | |
265 logging.basicConfig(level=logging.INFO if options.verbose else logging.FATAL) | |
266 if args: | |
267 parser.error('Unsupported args: %s' % args) | |
268 options.swarming = options.swarming.rstrip('/') | |
269 if not options.swarming: | |
270 parser.error('--swarming is required.') | |
271 if options.consume <= 0: | |
272 parser.error('Needs --consume > 0. 0.01 is a valid value.') | |
273 | |
274 print( | |
275 'Running %d slaves, each task lasting %.1fs' % ( | |
276 options.slaves, options.consume)) | |
277 # Calculate the SHA-1 of the swarm_bot code. | |
278 swarm_bot_hash = calculate_version(options.swarming + '/get_slave_code') | |
csharp
2013/10/03 16:08:04
Nit: Move to just above line 287
Marc-Antoine Ruel (Google)
2013/10/03 16:47:11
Done.
| |
279 | |
280 print('Ctrl-C to exit.') | |
281 print('[processing/processed/bots]') | |
282 columns = [('processing', 0), ('processed', 0), ('bots', 0)] | |
283 progress = threading_utils.Progress(columns) | |
284 events = Queue.Queue() | |
285 start = time.time() | |
286 kill_event = threading_utils.Bit() | |
287 slaves = [ | |
288 FakeSwarmBot( | |
289 options.swarming, swarm_bot_hash, i, progress, options.consume, | |
290 options.ping, events, kill_event) | |
291 for i in range(options.slaves) | |
292 ] | |
293 try: | |
294 # Wait for all the slaves to come alive. | |
295 while not all(s.is_alive() for s in slaves): | |
296 time.sleep(0.01) | |
297 progress.update_item('Ready to run') | |
298 while slaves: | |
299 progress.print_update() | |
300 time.sleep(0.01) | |
301 # The slaves could be told to suicide. | |
csharp
2013/10/03 16:08:04
Nit: suicide-> die ("be told to suicide" just soun
Marc-Antoine Ruel (Google)
2013/10/03 16:47:11
Done.
| |
302 slaves = [s for s in slaves if s.is_alive()] | |
303 except KeyboardInterrupt: | |
304 kill_event.set() | |
305 | |
306 progress.update_item('Waiting for slaves to quit.', raw=True) | |
csharp
2013/10/03 16:08:04
It would be nice it when the fake bots went down t
Marc-Antoine Ruel (Google)
2013/10/03 16:47:11
Yep. I'd want this functionality for the Android s
csharp
2013/10/03 19:00:34
The machine just needs to call: "/delete_machine_s
Marc-Antoine Ruel (Google)
2013/10/03 19:15:29
Done.
| |
307 progress.update_item('') | |
308 while slaves: | |
309 progress.print_update() | |
310 slaves = [s for s in slaves if s.is_alive()] | |
311 # At this point, progress is not used anymore. | |
312 print('') | |
313 print('Ran for %.1fs.' % (time.time() - start)) | |
314 print('') | |
315 results = events.queue | |
316 print_results(results, options.columns, options.buckets) | |
317 if options.dump: | |
318 with open(options.dump, 'w') as f: | |
319 json.dump(list(results), f, separators=(',', ':')) | |
320 return 0 | |
321 | |
322 | |
323 if __name__ == '__main__': | |
324 sys.exit(main()) | |
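As a follow-up sketch (not part of the reviewed patch; the file name and the column/bucket counts below are assumed), a file written via --dump can be reloaded later and the histograms regenerated with the print_results() helper defined above:

    import json

    with open('load_test_results.json') as f:   # assumed name, produced by --dump
      results = json.load(f)
    # Delays survive the JSON round-trip as floats and event names as strings,
    # so the delay/event split in print_results() still applies.
    print_results(results, columns=80, buckets=20)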