| OLD | NEW |
| (Empty) | |
| 1 #!/usr/bin/env python |
| 2 # Copyright 2014 The Chromium Authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. |
| 5 |
| 6 """Drives tests on Swarming. Both trigger and collect results. |
| 7 |
| 8 This is the shim that is called through buildbot. |
| 9 """ |
| 10 |
| 11 import logging |
| 12 import optparse |
| 13 import os |
| 14 import subprocess |
| 15 import sys |
| 16 import threading |
| 17 import Queue |
| 18 |
| 19 from common import chromium_utils |
| 20 from common import find_depot_tools # pylint: disable=W0611 |
| 21 |
| 22 from common import annotator |
| 23 from slave.swarming import swarming_utils |
| 24 |
| 25 # From depot tools/ |
| 26 import fix_encoding |
| 27 |
| 28 |
| 29 def v0_3( |
| 30 client, swarming_server, isolate_server, priority, dimensions, |
| 31 task_name, isolated_hash, env, shards): |
| 32 """Handles swarm_client/swarming.py starting 7c543276f08. |
| 33 |
| 34 It was rolled in src on r237619 on 2013-11-27. |
| 35 """ |
| 36 cmd = [ |
| 37 sys.executable, |
| 38 os.path.join(client, 'swarming.py'), |
| 39 'run', |
| 40 '--swarming', swarming_server, |
| 41 '--isolate-server', isolate_server, |
| 42 '--priority', str(priority), |
| 43 '--shards', str(shards), |
| 44 '--task-name', task_name, |
| 45 isolated_hash, |
| 46 ] |
| 47 for name, value in dimensions.iteritems(): |
| 48 if name != 'os': |
| 49 cmd.extend(('--dimension', name, value)) |
| 50 else: |
| 51 # Sadly, older version of swarming.py need special handling of os. |
| 52 old_value = [ |
| 53 k for k, v in swarming_utils.OS_MAPPING.iteritems() if v == value |
| 54 ] |
| 55 assert len(old_value) == 1 |
| 56 cmd.extend(('--os', old_value[0])) |
| 57 |
| 58 # Enable profiling on the -dev server. |
| 59 if '-dev' in swarming_server: |
| 60 cmd.append('--profile') |
| 61 for name, value in env.iteritems(): |
| 62 cmd.extend(('--env', name, value)) |
| 63 return cmd |
| 64 |
| 65 |
| 66 def v0_4( |
| 67 client, swarming_server, isolate_server, priority, dimensions, |
| 68 task_name, isolated_hash, env, shards): |
| 69 """Handles swarm_client/swarming.py starting b39e8cf08c. |
| 70 |
| 71 It was rolled in src on r246113 on 2014-01-21. |
| 72 """ |
| 73 cmd = [ |
| 74 sys.executable, |
| 75 os.path.join(client, 'swarming.py'), |
| 76 'run', |
| 77 '--swarming', swarming_server, |
| 78 '--isolate-server', isolate_server, |
| 79 '--priority', str(priority), |
| 80 '--shards', str(shards), |
| 81 '--task-name', task_name, |
| 82 isolated_hash, |
| 83 ] |
| 84 for name, value in dimensions.iteritems(): |
| 85 cmd.extend(('--dimension', name, value)) |
| 86 # Enable profiling on the -dev server. |
| 87 if '-dev' in swarming_server: |
| 88 cmd.append('--profile') |
| 89 for name, value in env.iteritems(): |
| 90 cmd.extend(('--env', name, value)) |
| 91 return cmd |
| 92 |
| 93 |
| 94 def stream_process(cmd): |
| 95 """Calls process cmd and yields its output. |
| 96 |
| 97 This is not the most efficient nor safe way to do it but it is only meant to |
| 98 be run on linux so it should be fine. Fix if necessary. |
| 99 """ |
| 100 p = subprocess.Popen( |
| 101 cmd, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) |
| 102 try: |
| 103 while True: |
| 104 try: |
| 105 i = p.stdout.readline() |
| 106 if i: |
| 107 yield i |
| 108 continue |
| 109 except OSError: |
| 110 pass |
| 111 if p.poll() is not None: |
| 112 break |
| 113 yield p.returncode |
| 114 finally: |
| 115 if p.poll() is None: |
| 116 p.kill() |
| 117 |
| 118 |
| 119 def drive_one( |
| 120 client, version, swarming_server, isolate_server, priority, dimensions, |
| 121 task_name, isolated_hash, env, shards, out): |
| 122 """Executes the proper handler based on the code layout and --version support. |
| 123 """ |
| 124 def send_back(l): |
| 125 out.put((task_name, l)) |
| 126 if version < (0, 4): |
| 127 cmd = v0_3( |
| 128 client, swarming_server, isolate_server, priority, dimensions, |
| 129 task_name, isolated_hash, env, shards) |
| 130 else: |
| 131 cmd = v0_4( |
| 132 client, swarming_server, isolate_server, priority, dimensions, |
| 133 task_name, isolated_hash, env, shards) |
| 134 try: |
| 135 for i in stream_process(cmd): |
| 136 send_back(i) |
| 137 except Exception as e: |
| 138 send_back(e) |
| 139 |
| 140 |
| 141 def drive_many( |
| 142 client, version, swarming_server, isolate_server, priority, dimensions, |
| 143 steps): |
| 144 logging.info( |
| 145 'drive_many(%s, %s, %s, %s, %s, %s, %s)', |
| 146 client, version, swarming_server, isolate_server, priority, dimensions, |
| 147 steps) |
| 148 return _drive_many( |
| 149 client, version, swarming_server, isolate_server, priority, dimensions, |
| 150 steps, Queue.Queue()) |
| 151 |
| 152 |
| 153 def step_name_to_cursor(x): |
| 154 """The cursor is buildbot's step name. It is only the base test name for |
| 155 simplicity. |
| 156 |
| 157 But the swarming task name is longer, it is |
| 158 "<name>/<dimensions>/<isolated hash>". |
| 159 """ |
| 160 return x.split('/', 1)[0] |
| 161 |
| 162 |
| 163 def _drive_many( |
| 164 client, version, swarming_server, isolate_server, priority, dimensions, |
| 165 steps, out): |
| 166 """Internal version, exposed so it can be hooked in test.""" |
| 167 stream = annotator.AdvancedAnnotationStream(sys.stdout, False) |
| 168 for step_name in sorted(steps): |
| 169 # Seeds the step first before doing the cursors otherwise it is interleaved |
| 170 # in the logs of other steps. |
| 171 stream.seed_step(step_name) |
| 172 |
| 173 threads = [] |
| 174 # Create the boxes in buildbot in order for consistency. |
| 175 steps_annotations = {} |
| 176 for step_name, isolated_hash in sorted(steps.iteritems()): |
| 177 env = {} |
| 178 # TODO(maruel): Propagate GTEST_FILTER. |
| 179 #if gtest_filter not in (None, '', '.', '*'): |
| 180 # env['GTEST_FILTER'] = gtest_filter |
| 181 shards = swarming_utils.TESTS_SHARDS.get(step_name, 1) |
| 182 # This will be the key in steps_annotations. |
| 183 task_name = '%s/%s/%s' % (step_name, dimensions['os'], isolated_hash) |
| 184 t = threading.Thread( |
| 185 target=drive_one, |
| 186 args=(client, version, swarming_server, isolate_server, priority, |
| 187 dimensions, task_name, isolated_hash, env, shards, out)) |
| 188 t.daemon = True |
| 189 t.start() |
| 190 threads.append(t) |
| 191 steps_annotations[task_name] = annotator.AdvancedAnnotationStep( |
| 192 sys.stdout, False) |
| 193 items = task_name.split('/', 2) |
| 194 assert step_name == items[0] |
| 195 assert step_name == step_name_to_cursor(task_name) |
| 196 # It is important data to surface through buildbot. |
| 197 stream.step_cursor(step_name) |
| 198 steps_annotations[task_name].step_text(items[1]) |
| 199 steps_annotations[task_name].step_text(items[2]) |
| 200 collect(stream, steps_annotations, out) |
| 201 return 0 |
| 202 |
| 203 |
| 204 def collect(stream, steps_annotations, out): |
| 205 last_cursor = None |
| 206 while steps_annotations: |
| 207 try: |
| 208 # Polling FTW. |
| 209 packet = out.get(timeout=1) |
| 210 except Queue.Empty: |
| 211 continue |
| 212 task_name, item = packet |
| 213 if last_cursor != task_name: |
| 214 stream.step_cursor(step_name_to_cursor(task_name)) |
| 215 last_cursor = task_name |
| 216 if isinstance(item, int): |
| 217 # Signals it's completed. |
| 218 if item: |
| 219 steps_annotations[task_name].step_failure() |
| 220 steps_annotations[task_name].step_closed() |
| 221 del steps_annotations[task_name] |
| 222 last_cursor = None |
| 223 elif isinstance(item, Exception): |
| 224 print >> sys.stderr, item |
| 225 steps_annotations[task_name].step_failure() |
| 226 steps_annotations[task_name].step_close() |
| 227 del steps_annotations[task_name] |
| 228 last_cursor = None |
| 229 else: |
| 230 assert isinstance(item, str), item |
| 231 sys.stdout.write(item) |
| 232 out.task_done() |
| 233 |
| 234 |
| 235 def determine_steps_to_run(isolated_hashes, default_swarming_tests, testfilter): |
| 236 """Returns a dict of test:hash for the test that should be run through |
| 237 Swarming. |
| 238 |
| 239 This is done by looking at the build properties to figure out what should be |
| 240 run. |
| 241 """ |
| 242 logging.info( |
| 243 'determine_steps_to_run(%s, %s, %s)', |
| 244 isolated_hashes, default_swarming_tests, testfilter) |
| 245 # TODO(maruel): Support gtest filter. |
| 246 def should_run(name): |
| 247 return ( |
| 248 ((name in default_swarming_tests or not default_swarming_tests) and |
| 249 'defaulttests' in testfilter) or |
| 250 (name + '_swarm' in testfilter)) |
| 251 |
| 252 return dict( |
| 253 (name, isolated_hash) |
| 254 for name, isolated_hash in isolated_hashes.iteritems() |
| 255 if should_run(name)) |
| 256 |
| 257 |
| 258 def process_build_properties(options): |
| 259 """Converts build properties and factory properties into expected flags.""" |
| 260 # target_os is not defined when using a normal builder, contrary to a |
| 261 # xx_swarm_triggered buildbot<->swarming builder, and it's not needed since |
| 262 # the OS match, it's defined in builder/tester configurations. |
| 263 slave_os = options.build_properties.get('target_os', sys.platform) |
| 264 priority = swarming_utils.build_to_priority(options.build_properties) |
| 265 steps = determine_steps_to_run( |
| 266 options.build_properties.get('swarm_hashes', {}), |
| 267 options.build_properties.get('run_default_swarm_tests', []), |
| 268 options.build_properties.get('testfilter', ['defaulttests'])) |
| 269 return slave_os, priority, steps |
| 270 |
| 271 |
| 272 def main(args): |
| 273 """Note: this is solely to run the current master's code and can totally |
| 274 differ from the underlying script flags. |
| 275 |
| 276 To update these flags: |
| 277 - Update the following code to support both the previous flag and the new |
| 278 flag. |
| 279 - Change scripts/master/factory/swarm_commands.py to pass the new flag. |
| 280 - Restart all the masters using swarming. |
| 281 - Remove the old flag from this code. |
| 282 """ |
| 283 client = swarming_utils.find_client(os.getcwd()) |
| 284 if not client: |
| 285 print >> sys.stderr, 'Failed to find swarm(ing)_client' |
| 286 return 1 |
| 287 version = swarming_utils.get_version(client) |
| 288 if version < (0, 3): |
| 289 print >> sys.stderr, ( |
| 290 '%s is version %s which is too old. Please run the test locally' % |
| 291 (client, '.'.join(version))) |
| 292 return 1 |
| 293 |
| 294 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) |
| 295 parser.add_option('--verbose', action='store_true') |
| 296 parser.add_option('--swarming') |
| 297 parser.add_option('--isolate-server') |
| 298 chromium_utils.AddPropertiesOptions(parser) |
| 299 options, args = parser.parse_args(args) |
| 300 if args: |
| 301 parser.error('Unsupported args: %s' % args) |
| 302 if not options.swarming or not options.isolate_server: |
| 303 parser.error('Require both --swarming and --isolate-server') |
| 304 |
| 305 logging.basicConfig(level=logging.DEBUG if options.verbose else logging.ERROR) |
| 306 # Loads the other flags implicitly. |
| 307 slave_os, priority, steps = process_build_properties(options) |
| 308 logging.info('To run: %s, %s, %s', slave_os, priority, steps) |
| 309 if not steps: |
| 310 print('Nothing to trigger') |
| 311 annotator.AdvancedAnnotationStep(sys.stdout, False).step_warnings() |
| 312 return 0 |
| 313 print('Selected tests:') |
| 314 print('\n'.join(' %s' % s for s in sorted(steps))) |
| 315 selected_os = swarming_utils.OS_MAPPING[slave_os] |
| 316 print('Selected OS: %s' % selected_os) |
| 317 return drive_many( |
| 318 client, |
| 319 version, |
| 320 options.swarming, |
| 321 options.isolate_server, |
| 322 priority, |
| 323 {'os': selected_os}, |
| 324 steps) |
| 325 |
| 326 |
| 327 if __name__ == '__main__': |
| 328 fix_encoding.fix_encoding() |
| 329 sys.exit(main(sys.argv[1:])) |
| OLD | NEW |