Index: tools/swarming_load_test_bot.py |
diff --git a/tools/swarming_load_test_bot.py b/tools/swarming_load_test_bot.py |
new file mode 100755 |
index 0000000000000000000000000000000000000000..2f11a2e569f7c9c022daf7a81e9a2cb77877f099 |
--- /dev/null |
+++ b/tools/swarming_load_test_bot.py |
@@ -0,0 +1,324 @@ |
+#!/usr/bin/env python |
+# Copyright 2013 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+"""Triggers a ton of fake jobs to test its handling under high load. |
+ |
+Generates an histogram with the latencies to process the tasks and number of |
+retries. |
+""" |
+ |
import collections
import hashlib
import json
import logging
import optparse
import os
import Queue
import socket
import StringIO
import sys
import threading
import time
import zipfile
+ |
+ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
+ |
+sys.path.insert(0, ROOT_DIR) |
+ |
+from third_party import colorama |
+ |
+from utils import graph |
+from utils import net |
+from utils import threading_utils |
+ |
+# Line too long (NN/80) |
+# pylint: disable=C0301 |
+ |
+ |
def print_results(results, columns, buckets):
  """Prints a latency histogram and an event tally to stdout.

  Arguments:
  - results: iterable mixing floats (per-task delay in seconds) and strings
    (event names for failures/retries), as accumulated in the events queue.
  - columns: console width used for histogram rendering.
  - buckets: number of histogram buckets.
  """
  delays = [i for i in results if isinstance(i, float)]
  failures = [i for i in results if not isinstance(i, float)]

  print('%sDELAYS%s:' % (colorama.Fore.RED, colorama.Fore.RESET))
  graph.print_histogram(
      graph.generate_histogram(delays, buckets), columns, ' %.3f')
  print('')
  print('Total items : %d' % len(results))
  # Guard against dividing by zero when every task failed.
  average = sum(delays) / len(delays) if delays else 0
  print('Average delay: %s' % graph.to_units(average))
  print('')

  if failures:
    print('%sEVENTS%s:' % (colorama.Fore.RED, colorama.Fore.RESET))
    # Tally occurrences of each distinct event name; Counter is a dict
    # subclass so it feeds straight into print_histogram().
    values = collections.Counter(failures)
    graph.print_histogram(values, columns, ' %s')
    print('')
+ |
+ |
def calculate_version(url):
  """Fetches the swarm_bot code zip from |url| and returns its SHA-1.

  The digest covers the same file list as the server-side version
  calculation, so the fake bots report a version the server accepts.
  """
  # zipfile requires a seekable file object, so url_open() can't be used;
  # read the whole archive into memory instead.
  blob = net.url_read(url)
  bot_zip = zipfile.ZipFile(StringIO.StringIO(blob))
  # See
  # https://code.google.com/p/swarming/source/browse/src/common/version.py?repo=swarming-server
  members = (
    'slave_machine.py',
    'swarm_bot/local_test_runner.py',
    'common/__init__.py',
    'common/swarm_constants.py',
    'common/version.py',
    'common/test_request_message.py',
    'common/url_helper.py',
  )
  digest = hashlib.sha1()
  for member in members:
    digest.update(bot_zip.read(member))
  return digest.hexdigest()
+ |
+ |
class FakeSwarmBot(object):
  """This is a Fake swarm_bot implementation simulating it is running AIX.

  If someones fires up a real AIX slave, well, sorry.

  It polls for a job, acts as if it was processing it for |duration| seconds
  while pinging the server, then returns a canned successful result. Each
  outcome is pushed into |events| as either a float (task latency in seconds)
  or a string (event name).
  """
  def __init__(
      self, swarming_url, swarm_bot_hash, index, progress, duration, ping,
      events, kill_event):
    self._lock = threading.Lock()
    self._swarming = swarming_url
    self._index = index
    self._progress = progress
    self._duration = duration
    self._ping = ping
    self._events = events
    self._kill_event = kill_event

    # See
    # https://code.google.com/p/swarming/source/browse/src/swarm_bot/slave_machine.py?repo=swarming-server
    # and
    # https://chromium.googlesource.com/chromium/tools/build.git/+/master/scripts/tools/swarm_bootstrap/swarm_bootstrap.py
    # for more details.
    self._attributes = {
      'dimensions': {
        # Use improbable values to reduce the chance of interferring with real
        # slaves.
        'bits': '36',
        'machine': os.uname()[4] + '-experimental',
        'os': ['AIX'],
      },
      # Use an impossible hostname.
      'id': '%s-%d' % (socket.getfqdn().lower(), index),
      'try_count': 0,
      'tag': '%s-%d' % (socket.getfqdn().lower(), index),
      'version': swarm_bot_hash,
    }

    # The bot runs entirely on its own daemon thread so main() can spawn many.
    self._thread = threading.Thread(target=self._run, name='bot%d' % index)
    self._thread.daemon = True
    self._thread.start()

  def join(self):
    """Blocks until the polling thread exits."""
    self._thread.join()

  def is_alive(self):
    """Returns True while the polling thread is still running."""
    return self._thread.is_alive()

  def _run(self):
    """Poll loop: fetches jobs, fakes their execution, posts results.

    Exits when kill_event is set or an unexpected RPC is received.
    """
    try:
      self._progress.update_item('%d alive' % self._index, bots=1)
      while True:
        if self._kill_event.get():
          return
        data = {'attributes': json.dumps(self._attributes)}
        request = net.url_open(self._swarming + '/poll_for_test', data=data)
        if request is None:
          self._events.put('poll_for_test_empty')
          continue
        start = time.time()
        try:
          manifest = json.load(request)
        except ValueError:
          self._progress.update_item('Failed to poll')
          self._events.put('poll_for_test_invalid')
          continue

        commands = [c['function'] for c in manifest.get('commands', [])]
        if not commands:
          # Nothing to run.
          self._events.put('sleep')
          time.sleep(manifest['come_back'])
          continue

        if commands == ['UpdateSlave']:
          # Calculate the proper SHA-1 and loop again.
          # This could happen if the Swarming server is upgraded while this
          # script runs.
          self._attributes['version'] = calculate_version(
              manifest['commands'][0]['args'])
          self._events.put('update_slave')
          continue

        if commands != ['StoreFiles', 'RunCommands']:
          self._progress.update_item(
              'Unexpected RPC call %s\n%s' % (commands, manifest))
          self._events.put('unknown_rpc')
          break

        # The normal way Swarming works is that it 'stores' a test_run.swarm
        # file and then defer control to swarm_bot/local_test_runner.py.
        store_cmd = manifest['commands'][0]
        assert len(store_cmd['args']) == 1, store_cmd['args']
        filepath, filename, test_run_content = store_cmd['args'][0]
        assert filepath == ''
        assert filename == 'test_run.swarm'
        assert manifest['commands'][1] == {
          u'function': u'RunCommands',
          u'args': [
            u'swarm_bot/local_test_runner.py', u'-f',
            u'test_run.swarm', u'--restart_on_failure',
          ],
        }, manifest['commands'][1]
        result_url = manifest['result_url']
        test_run = json.loads(test_run_content)
        assert result_url == test_run['result_url']
        ping_url = test_run['ping_url']
        self._progress.update_item('%d processing' % self._index, processing=1)

        # Fake activity and send a ping request every self._ping seconds.
        while True:
          # Time left before the fake task is "done". The previous code
          # computed elapsed-minus-duration, which is 0 until the duration
          # elapsed, so the loop exited immediately and never pinged.
          remaining = max(0, self._duration - (time.time() - start))
          if not remaining:
            break
          if remaining > self._ping:
            # In theory, we should use test_run['ping_delay'] but this is a
            # load test. Make sure the server melts down.
            result = net.url_read(ping_url)
            assert result == 'OK'
          # Sleep in ping-sized slices so pings keep flowing until done.
          time.sleep(min(remaining, self._ping))

        data = {
          'c': test_run['configuration']['config_name'],
          'n': test_run['test_run_name'],
          'o': False,
          'result_output': 'This task ran with great success',
          's': True,
          'x': '0',
        }
        result = net.url_read(manifest['result_url'], data=data)
        self._progress.update_item(
            '%d processed' % self._index, processing=-1, processed=1)
        if not result:
          self._events.put('result_url_fail')
        else:
          assert result == 'Successfully update the runner results.', result
          self._events.put(time.time() - start)
    finally:
      # Always decrement the bot count, even on an unexpected exception.
      self._progress.update_item('%d quit' % self._index, bots=-1)
+ |
+ |
def main():
  """Entry point: spawns fake swarm bots and reports latency histograms.

  Returns the process exit code (0 on success).
  """
  colorama.init()
  parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
  parser.add_option(
      '-S', '--swarming',
      metavar='URL', default='',
      help='Swarming server to use')

  group = optparse.OptionGroup(parser, 'Load generated')
  group.add_option(
      '--slaves', type='int', default=300, metavar='N',
      help='Number of swarm bot slaves, default: %default')
  group.add_option(
      '-c', '--consume', type='float', default=60., metavar='N',
      help='Duration (s) for consuming a request, default: %default')
  group.add_option(
      '-p', '--ping', type='float', default=0.5, metavar='N',
      help='Ping delay (s) while consuming a request, normally, it would be in '
      'the range of 30s but this is a load test, default: %default')
  parser.add_option_group(group)

  group = optparse.OptionGroup(parser, 'Display options')
  group.add_option(
      '--columns', type='int', default=graph.get_console_width(), metavar='N',
      help='For histogram display, default:%default')
  group.add_option(
      '--buckets', type='int', default=20, metavar='N',
      help='Number of buckets for histogram display, default:%default')
  parser.add_option_group(group)

  parser.add_option(
      '--dump', metavar='FOO.JSON', help='Dumps to json file')
  parser.add_option(
      '-v', '--verbose', action='store_true', help='Enables logging')

  options, args = parser.parse_args()
  logging.basicConfig(level=logging.INFO if options.verbose else logging.FATAL)
  if args:
    parser.error('Unsupported args: %s' % args)
  options.swarming = options.swarming.rstrip('/')
  if not options.swarming:
    parser.error('--swarming is required.')
  if options.consume <= 0:
    parser.error('Needs --consume > 0. 0.01 is a valid value.')

  print(
      'Running %d slaves, each task lasting %.1fs' % (
        options.slaves, options.consume))
  # Calculate the SHA-1 of the swarm_bot code so the fake bots are accepted
  # by the server as up to date.
  swarm_bot_hash = calculate_version(options.swarming + '/get_slave_code')

  print('Ctrl-C to exit.')
  print('[processing/processed/bots]')
  columns = [('processing', 0), ('processed', 0), ('bots', 0)]
  progress = threading_utils.Progress(columns)
  events = Queue.Queue()
  start = time.time()
  kill_event = threading_utils.Bit()
  slaves = [
    FakeSwarmBot(
        options.swarming, swarm_bot_hash, i, progress, options.consume,
        options.ping, events, kill_event)
    for i in range(options.slaves)
  ]
  try:
    # Wait for all the slaves to come alive.
    while not all(s.is_alive() for s in slaves):
      time.sleep(0.01)
    progress.update_item('Ready to run')
    while slaves:
      progress.print_update()
      time.sleep(0.01)
      # The slaves could be told to die, e.g. on an unexpected RPC.
      slaves = [s for s in slaves if s.is_alive()]
  except KeyboardInterrupt:
    kill_event.set()

  progress.update_item('Waiting for slaves to quit.', raw=True)
  progress.update_item('')
  while slaves:
    progress.print_update()
    slaves = [s for s in slaves if s.is_alive()]
  # At this point, progress is not used anymore.
  print('')
  print('Ran for %.1fs.' % (time.time() - start))
  print('')
  results = events.queue
  print_results(results, options.columns, options.buckets)
  if options.dump:
    with open(options.dump, 'w') as f:
      # events.queue is a collections.deque, which json.dump() cannot
      # serialize directly; convert it to a list first.
      json.dump(list(results), f, separators=(',',':'))
  return 0
+ |
+ |
if __name__ == '__main__':
  # Propagate main()'s return value as the process exit code.
  sys.exit(main())