| OLD | NEW |
| 1 # Copyright 2013 The LUCI Authors. All rights reserved. | 1 # Copyright 2013 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """Runs a Swarming task. | 5 """Runs a Swarming task. |
| 6 | 6 |
| 7 Downloads all the necessary files to run the task, executes the command and | 7 Downloads all the necessary files to run the task, executes the command and |
| 8 streams results back to the Swarming server. | 8 streams results back to the Swarming server. |
| 9 | 9 |
| 10 The process exit code is 0 when the task was executed, even if the task itself | 10 The process exit code is 0 when the task was executed, even if the task itself |
| (...skipping 10 matching lines...) Expand all Loading... |
| 21 import os | 21 import os |
| 22 import signal | 22 import signal |
| 23 import sys | 23 import sys |
| 24 import time | 24 import time |
| 25 | 25 |
| 26 from utils import net | 26 from utils import net |
| 27 from utils import on_error | 27 from utils import on_error |
| 28 from utils import subprocess42 | 28 from utils import subprocess42 |
| 29 from utils import zip_package | 29 from utils import zip_package |
| 30 | 30 |
| 31 import bot_auth |
| 32 import file_reader |
| 33 |
| 31 | 34 |
| 32 # Path to this file or the zip containing this file. | 35 # Path to this file or the zip containing this file. |
| 33 THIS_FILE = os.path.abspath(zip_package.get_main_script_path()) | 36 THIS_FILE = os.path.abspath(zip_package.get_main_script_path()) |
| 34 | 37 |
| 35 | 38 |
| 36 # Sends a maximum of 100kb of stdout per task_update packet. | 39 # Sends a maximum of 100kb of stdout per task_update packet. |
| 37 MAX_CHUNK_SIZE = 102400 | 40 MAX_CHUNK_SIZE = 102400 |
| 38 | 41 |
| 39 | 42 |
| 40 # Maximum wait between task_update packet when there's no output. | 43 # Maximum wait between task_update packet when there's no output. |
| (...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 175 work_dir = os.path.dirname(out_file) | 178 work_dir = os.path.dirname(out_file) |
| 176 try: | 179 try: |
| 177 with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler): | 180 with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler): |
| 178 if not os.path.isdir(work_dir): | 181 if not os.path.isdir(work_dir): |
| 179 raise ValueError('%s expected to exist' % work_dir) | 182 raise ValueError('%s expected to exist' % work_dir) |
| 180 | 183 |
| 181 with open(in_file, 'rb') as f: | 184 with open(in_file, 'rb') as f: |
| 182 task_details = TaskDetails(json.load(f)) | 185 task_details = TaskDetails(json.load(f)) |
| 183 | 186 |
| 184 task_result = run_command( | 187 task_result = run_command( |
| 185 swarming_server, task_details, work_dir, cost_usd_hour, start, | 188 swarming_server, task_details, work_dir, |
| 186 min_free_space) | 189 cost_usd_hour, start, min_free_space) |
| 187 except MustExit as e: | 190 except MustExit as e: |
| 188 # This normally means run_command() didn't get the chance to run, as it | 191 # This normally means run_command() didn't get the chance to run, as it |
| 189 # itself traps MustExit and will report accordingly. In this case, we want | 192 # itself traps MustExit and will report accordingly. In this case, we want |
| 190 # the parent process to send the message instead. | 193 # the parent process to send the message instead. |
| 191 if not task_result: | 194 if not task_result: |
| 192 task_result = { | 195 task_result = { |
| 193 u'exit_code': None, | 196 u'exit_code': None, |
| 194 u'hard_timeout': False, | 197 u'hard_timeout': False, |
| 195 u'io_timeout': False, | 198 u'io_timeout': False, |
| 196 u'must_signal_internal_failure': | 199 u'must_signal_internal_failure': |
| 197 u'task_runner received signal %s' % e.signal, | 200 u'task_runner received signal %s' % e.signal, |
| 198 u'version': OUT_VERSION, | 201 u'version': OUT_VERSION, |
| 199 } | 202 } |
| 200 finally: | 203 finally: |
| 201 # We've found tests to delete 'work' when quitting, causing an exception | 204 # We've found tests to delete 'work' when quitting, causing an exception |
| 202 # here. Try to recreate the directory if necessary. | 205 # here. Try to recreate the directory if necessary. |
| 203 if not os.path.isdir(work_dir): | 206 if not os.path.isdir(work_dir): |
| 204 os.mkdir(work_dir) | 207 os.mkdir(work_dir) |
| 205 with open(out_file, 'wb') as f: | 208 with open(out_file, 'wb') as f: |
| 206 json.dump(task_result, f) | 209 json.dump(task_result, f) |
| 207 | 210 |
| 208 | 211 |
| 209 def post_update(swarming_server, params, exit_code, stdout, output_chunk_start): | 212 def post_update( |
| 213 swarming_server, auth_headers, params, exit_code, |
| 214 stdout, output_chunk_start): |
| 210 """Posts task update to task_update. | 215 """Posts task update to task_update. |
| 211 | 216 |
| 212 Arguments: | 217 Arguments: |
| 213 swarming_server: Base URL to Swarming server. | 218 swarming_server: Base URL to Swarming server. |
| 219 auth_headers: dict with HTTP authentication headers. |
| 214 params: Default JSON parameters for the POST. | 220 params: Default JSON parameters for the POST. |
| 215 exit_code: Process exit code, only when a command completed. | 221 exit_code: Process exit code, only when a command completed. |
| 216 stdout: Incremental output since last call, if any. | 222 stdout: Incremental output since last call, if any. |
| 217 output_chunk_start: Total length of stdout previously sent, for coherency | 223 output_chunk_start: Total length of stdout previously sent, for coherency |
| 218 with the server. | 224 with the server. |
| 219 """ | 225 """ |
| 220 params = params.copy() | 226 params = params.copy() |
| 221 if exit_code is not None: | 227 if exit_code is not None: |
| 222 params['exit_code'] = exit_code | 228 params['exit_code'] = exit_code |
| 223 if stdout: | 229 if stdout: |
| 224 # The output_chunk_start is used by the server to make sure that the stdout | 230 # The output_chunk_start is used by the server to make sure that the stdout |
| 225 # chunks are processed and saved in the DB in order. | 231 # chunks are processed and saved in the DB in order. |
| 226 params['output'] = base64.b64encode(stdout) | 232 params['output'] = base64.b64encode(stdout) |
| 227 params['output_chunk_start'] = output_chunk_start | 233 params['output_chunk_start'] = output_chunk_start |
| 228 # TODO(maruel): Support early cancellation. | 234 # TODO(maruel): Support early cancellation. |
| 229 # https://code.google.com/p/swarming/issues/detail?id=62 | 235 # https://code.google.com/p/swarming/issues/detail?id=62 |
| 230 resp = net.url_read_json( | 236 resp = net.url_read_json( |
| 231 swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'], | 237 swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'], |
| 232 data=params) | 238 data=params, |
| 239 headers=auth_headers, |
| 240 follow_redirects=False) |
| 233 logging.debug('post_update() = %s', resp) | 241 logging.debug('post_update() = %s', resp) |
| 234 if not resp or resp.get('error'): | 242 if not resp or resp.get('error'): |
| 235 # Abandon it. This will force a process exit. | 243 # Abandon it. This will force a process exit. |
| 236 raise ValueError(resp.get('error') if resp else 'Failed to contact server') | 244 raise ValueError(resp.get('error') if resp else 'Failed to contact server') |
| 237 | 245 |
| 238 | 246 |
| 239 def should_post_update(stdout, now, last_packet): | 247 def should_post_update(stdout, now, last_packet): |
| 240 """Returns True if it's time to send a task_update packet via post_update(). | 248 """Returns True if it's time to send a task_update packet via post_update(). |
| 241 | 249 |
| 242 Sends a packet when one of these conditions is met: | 250 Sends a packet when one of these conditions is met: |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 274 try: | 282 try: |
| 275 proc.wait(grace_period) | 283 proc.wait(grace_period) |
| 276 except subprocess42.TimeoutError: | 284 except subprocess42.TimeoutError: |
| 277 logging.warning('SIGKILL finally due to %s', reason) | 285 logging.warning('SIGKILL finally due to %s', reason) |
| 278 proc.kill() | 286 proc.kill() |
| 279 exit_code = proc.wait() | 287 exit_code = proc.wait() |
| 280 logging.info('Waiting for proces exit in finally - done') | 288 logging.info('Waiting for proces exit in finally - done') |
| 281 return exit_code | 289 return exit_code |
| 282 | 290 |
| 283 | 291 |
| 292 def start_reading_headers(auth_params_file): |
| 293 """Spawns a thread that rereads headers from SWARMING_AUTH_PARAMS file. |
| 294 |
| 295 Returns: |
| 296 Tuple (callback that returns the last known headers, stop callback). |
| 297 |
| 298 Raises: |
| 299 file_reader.FatalReadError if headers file can't be read. |
| 300 ValueError if it can be read, but its body is invalid. |
| 301 """ |
| 302 # Read headers more often than bot_main writes them, to reduce the maximum |
| 303 # possible latency between when headers are updated and when they are read. |
| 304 reader = file_reader.FileReaderThread(auth_params_file, interval_sec=30) |
| 305 |
| 306 def read_and_validate_headers(): |
| 307 val = bot_auth.process_auth_params_json(reader.last_value or {}) |
| 308 return val.swarming_http_headers |
| 309 |
| 310 reader.start() |
| 311 read_and_validate_headers() # initial validation, may raise ValueError |
| 312 return read_and_validate_headers, reader.stop |
| 313 |
| 314 |
| 284 def run_command( | 315 def run_command( |
| 285 swarming_server, task_details, work_dir, cost_usd_hour, task_start, | 316 swarming_server, task_details, work_dir, cost_usd_hour, |
| 286 min_free_space): | 317 task_start, min_free_space): |
| 287 """Runs a command and sends packets to the server to stream results back. | 318 """Runs a command and sends packets to the server to stream results back. |
| 288 | 319 |
| 289 Implements both I/O and hard timeouts. Sends the packets numbered, so the | 320 Implements both I/O and hard timeouts. Sends the packets numbered, so the |
| 290 server can ensure they are processed in order. | 321 server can ensure they are processed in order. |
| 291 | 322 |
| 292 Returns: | 323 Returns: |
| 293 Metadata about the command. | 324 Metadata about the command. |
| 294 """ | 325 """ |
| 295 # TODO(maruel): This function is incomprehensible, split and refactor. | 326 # TODO(maruel): This function is incomprehensible, split and refactor. |
| 327 |
| 328 # Grab initial auth headers and start rereading them in a parallel thread. They |
| 329 # MUST be there already. It's a fatal internal error if they are not. |
| 330 headers_cb = lambda: {} |
| 331 stop_headers_reader = lambda: None |
| 332 auth_params_file = os.environ.get('SWARMING_AUTH_PARAMS') |
| 333 if auth_params_file: |
| 334 try: |
| 335 headers_cb, stop_headers_reader = start_reading_headers(auth_params_file) |
| 336 except (ValueError, file_reader.FatalReadError) as e: |
| 337 return { |
| 338 u'exit_code': 1, |
| 339 u'hard_timeout': False, |
| 340 u'io_timeout': False, |
| 341 u'must_signal_internal_failure': str(e), |
| 342 u'version': OUT_VERSION, |
| 343 } |
| 344 |
| 296 # Signal the command is about to be started. | 345 # Signal the command is about to be started. |
| 297 last_packet = start = now = monotonic_time() | 346 last_packet = start = now = monotonic_time() |
| 298 params = { | 347 params = { |
| 299 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., | 348 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., |
| 300 'id': task_details.bot_id, | 349 'id': task_details.bot_id, |
| 301 'task_id': task_details.task_id, | 350 'task_id': task_details.task_id, |
| 302 } | 351 } |
| 303 post_update(swarming_server, params, None, '', 0) | 352 post_update(swarming_server, headers_cb(), params, None, '', 0) |
| 304 | 353 |
| 305 isolated_result = os.path.join(work_dir, 'isolated_result.json') | 354 isolated_result = os.path.join(work_dir, 'isolated_result.json') |
| 306 cmd = get_isolated_cmd( | 355 cmd = get_isolated_cmd( |
| 307 work_dir, task_details, isolated_result, min_free_space) | 356 work_dir, task_details, isolated_result, min_free_space) |
| 308 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to | 357 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to |
| 309 # give one 'grace_period' slot to the child process and one slot to upload | 358 # give one 'grace_period' slot to the child process and one slot to upload |
| 310 # the results back. | 359 # the results back. |
| 311 task_details.hard_timeout = 0 | 360 task_details.hard_timeout = 0 |
| 312 if task_details.grace_period: | 361 if task_details.grace_period: |
| 313 task_details.grace_period *= 2 | 362 task_details.grace_period *= 2 |
| (...skipping 21 matching lines...) Expand all Loading... |
| 335 stdout=subprocess42.PIPE, | 384 stdout=subprocess42.PIPE, |
| 336 stderr=subprocess42.STDOUT, | 385 stderr=subprocess42.STDOUT, |
| 337 stdin=subprocess42.PIPE) | 386 stdin=subprocess42.PIPE) |
| 338 except OSError as e: | 387 except OSError as e: |
| 339 stdout = 'Command "%s" failed to start.\nError: %s' % (' '.join(cmd), e) | 388 stdout = 'Command "%s" failed to start.\nError: %s' % (' '.join(cmd), e) |
| 340 now = monotonic_time() | 389 now = monotonic_time() |
| 341 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. | 390 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. |
| 342 params['duration'] = now - start | 391 params['duration'] = now - start |
| 343 params['io_timeout'] = False | 392 params['io_timeout'] = False |
| 344 params['hard_timeout'] = False | 393 params['hard_timeout'] = False |
| 345 post_update(swarming_server, params, 1, stdout, 0) | 394 post_update(swarming_server, headers_cb(), params, 1, stdout, 0) |
| 346 return { | 395 return { |
| 347 u'exit_code': 1, | 396 u'exit_code': 1, |
| 348 u'hard_timeout': False, | 397 u'hard_timeout': False, |
| 349 u'io_timeout': False, | 398 u'io_timeout': False, |
| 350 u'must_signal_internal_failure': None, | 399 u'must_signal_internal_failure': None, |
| 351 u'version': OUT_VERSION, | 400 u'version': OUT_VERSION, |
| 352 } | 401 } |
| 353 | 402 |
| 354 output_chunk_start = 0 | 403 output_chunk_start = 0 |
| 355 stdout = '' | 404 stdout = '' |
| (...skipping 11 matching lines...) Expand all Loading... |
| 367 now = monotonic_time() | 416 now = monotonic_time() |
| 368 if new_data: | 417 if new_data: |
| 369 stdout += new_data | 418 stdout += new_data |
| 370 last_io = now | 419 last_io = now |
| 371 | 420 |
| 372 # Post update if necessary. | 421 # Post update if necessary. |
| 373 if should_post_update(stdout, now, last_packet): | 422 if should_post_update(stdout, now, last_packet): |
| 374 last_packet = monotonic_time() | 423 last_packet = monotonic_time() |
| 375 params['cost_usd'] = ( | 424 params['cost_usd'] = ( |
| 376 cost_usd_hour * (last_packet - task_start) / 60. / 60.) | 425 cost_usd_hour * (last_packet - task_start) / 60. / 60.) |
| 377 post_update(swarming_server, params, None, stdout, output_chunk_start) | 426 post_update( |
| 427 swarming_server, headers_cb(), params, None, |
| 428 stdout, output_chunk_start) |
| 378 output_chunk_start += len(stdout) | 429 output_chunk_start += len(stdout) |
| 379 stdout = '' | 430 stdout = '' |
| 380 | 431 |
| 381 # Send signal on timeout if necessary. Both are failures, not | 432 # Send signal on timeout if necessary. Both are failures, not |
| 382 # internal_failures. | 433 # internal_failures. |
| 383 # Eventually kill but return 0 so bot_main.py doesn't cancel the task. | 434 # Eventually kill but return 0 so bot_main.py doesn't cancel the task. |
| 384 if not timed_out: | 435 if not timed_out: |
| 385 if (task_details.io_timeout and | 436 if (task_details.io_timeout and |
| 386 now - last_io > task_details.io_timeout): | 437 now - last_io > task_details.io_timeout): |
| 387 had_io_timeout = True | 438 had_io_timeout = True |
| (...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 473 params['isolated_stats'] = isolated_stats | 524 params['isolated_stats'] = isolated_stats |
| 474 except (IOError, OSError, ValueError) as e: | 525 except (IOError, OSError, ValueError) as e: |
| 475 logging.error('Swallowing error: %s', e) | 526 logging.error('Swallowing error: %s', e) |
| 476 if not must_signal_internal_failure: | 527 if not must_signal_internal_failure: |
| 477 must_signal_internal_failure = str(e) | 528 must_signal_internal_failure = str(e) |
| 478 # TODO(maruel): Send the internal failure here instead of sending it through | 529 # TODO(maruel): Send the internal failure here instead of sending it through |
| 479 # bot_main, this causes a race condition. | 530 # bot_main, this causes a race condition. |
| 480 if exit_code is None: | 531 if exit_code is None: |
| 481 exit_code = -1 | 532 exit_code = -1 |
| 482 params['hard_timeout'] = had_hard_timeout | 533 params['hard_timeout'] = had_hard_timeout |
| 483 post_update(swarming_server, params, exit_code, stdout, output_chunk_start) | 534 post_update( |
| 535 swarming_server, headers_cb(), params, exit_code, |
| 536 stdout, output_chunk_start) |
| 484 return { | 537 return { |
| 485 u'exit_code': exit_code, | 538 u'exit_code': exit_code, |
| 486 u'hard_timeout': had_hard_timeout, | 539 u'hard_timeout': had_hard_timeout, |
| 487 u'io_timeout': had_io_timeout, | 540 u'io_timeout': had_io_timeout, |
| 488 u'must_signal_internal_failure': must_signal_internal_failure, | 541 u'must_signal_internal_failure': must_signal_internal_failure, |
| 489 u'version': OUT_VERSION, | 542 u'version': OUT_VERSION, |
| 490 } | 543 } |
| 491 finally: | 544 finally: |
| 492 try: | 545 try: |
| 493 os.remove(isolated_result) | 546 os.remove(isolated_result) |
| 494 except OSError: | 547 except OSError: |
| 495 pass | 548 pass |
| 549 stop_headers_reader() |
| 496 | 550 |
| 497 | 551 |
| 498 def main(args): | 552 def main(args): |
| 499 subprocess42.inhibit_os_error_reporting() | 553 subprocess42.inhibit_os_error_reporting() |
| 500 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) | 554 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) |
| 501 parser.add_option('--in-file', help='Name of the request file') | 555 parser.add_option('--in-file', help='Name of the request file') |
| 502 parser.add_option( | 556 parser.add_option( |
| 503 '--out-file', help='Name of the JSON file to write a task summary to') | 557 '--out-file', help='Name of the JSON file to write a task summary to') |
| 504 parser.add_option( | 558 parser.add_option( |
| 505 '--swarming-server', help='Swarming server to send data back') | 559 '--swarming-server', help='Swarming server to send data back') |
| (...skipping 15 matching lines...) Expand all Loading... |
| 521 if options.start > now: | 575 if options.start > now: |
| 522 options.start = now | 576 options.start = now |
| 523 | 577 |
| 524 try: | 578 try: |
| 525 load_and_run( | 579 load_and_run( |
| 526 options.in_file, options.swarming_server, options.cost_usd_hour, | 580 options.in_file, options.swarming_server, options.cost_usd_hour, |
| 527 options.start, options.out_file, options.min_free_space) | 581 options.start, options.out_file, options.min_free_space) |
| 528 return 0 | 582 return 0 |
| 529 finally: | 583 finally: |
| 530 logging.info('quitting') | 584 logging.info('quitting') |
| OLD | NEW |