OLD | NEW |
---|---|
1 # Copyright 2013 The LUCI Authors. All rights reserved. | 1 # Copyright 2013 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """Runs a Swarming task. | 5 """Runs a Swarming task. |
6 | 6 |
7 Downloads all the necessary files to run the task, executes the command and | 7 Downloads all the necessary files to run the task, executes the command and |
8 streams results back to the Swarming server. | 8 streams results back to the Swarming server. |
9 | 9 |
10 The process exit code is 0 when the task was executed, even if the task itself | 10 The process exit code is 0 when the task was executed, even if the task itself |
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
150 | 150 |
151 | 151 |
152 class MustExit(Exception): | 152 class MustExit(Exception): |
153 """Raised on signal that the process must exit immediately.""" | 153 """Raised on signal that the process must exit immediately.""" |
154 def __init__(self, sig): | 154 def __init__(self, sig): |
155 super(MustExit, self).__init__() | 155 super(MustExit, self).__init__() |
156 self.signal = sig | 156 self.signal = sig |
157 | 157 |
158 | 158 |
159 def load_and_run( | 159 def load_and_run( |
160 in_file, swarming_server, cost_usd_hour, start, out_file, min_free_space): | 160 in_file, swarming_server, auth_headers_file, cost_usd_hour, start, |
161 out_file, min_free_space): | |
161 """Loads the task's metadata and execute it. | 162 """Loads the task's metadata and execute it. |
162 | 163 |
163 This may throw all sorts of exceptions in case of failure. It's up to the | 164 This may throw all sorts of exceptions in case of failure. It's up to the |
164 caller to trap them. These shall be considered 'internal_failure' instead of | 165 caller to trap them. These shall be considered 'internal_failure' instead of |
165 'failure' from a TaskRunResult standpoint. | 166 'failure' from a TaskRunResult standpoint. |
166 """ | 167 """ |
167 # The work directory is guaranteed to exist since it was created by | 168 # The work directory is guaranteed to exist since it was created by |
168 # bot_main.py and contains the manifest. Temporary files will be downloaded | 169 # bot_main.py and contains the manifest. Temporary files will be downloaded |
169 # there. It's bot_main.py that will delete the directory afterward. Tests are | 170 # there. It's bot_main.py that will delete the directory afterward. Tests are |
170 # not run from there. | 171 # not run from there. |
171 task_result = None | 172 task_result = None |
172 def handler(sig, _): | 173 def handler(sig, _): |
173 logging.info('Got signal %s', sig) | 174 logging.info('Got signal %s', sig) |
174 raise MustExit(sig) | 175 raise MustExit(sig) |
175 work_dir = os.path.dirname(out_file) | 176 work_dir = os.path.dirname(out_file) |
176 try: | 177 try: |
177 with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler): | 178 with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler): |
178 if not os.path.isdir(work_dir): | 179 if not os.path.isdir(work_dir): |
179 raise ValueError('%s expected to exist' % work_dir) | 180 raise ValueError('%s expected to exist' % work_dir) |
180 | 181 |
181 with open(in_file, 'rb') as f: | 182 with open(in_file, 'rb') as f: |
182 task_details = TaskDetails(json.load(f)) | 183 task_details = TaskDetails(json.load(f)) |
183 | 184 |
184 task_result = run_command( | 185 task_result = run_command( |
185 swarming_server, task_details, work_dir, cost_usd_hour, start, | 186 swarming_server, task_details, work_dir, auth_headers_file, |
186 min_free_space) | 187 cost_usd_hour, start, min_free_space) |
187 except MustExit as e: | 188 except MustExit as e: |
188 # This normally means run_command() didn't get the chance to run, as it | 189 # This normally means run_command() didn't get the chance to run, as it |
189 # itself trap MustExit and will report accordingly. In this case, we want | 190 # itself trap MustExit and will report accordingly. In this case, we want |
190 # the parent process to send the message instead. | 191 # the parent process to send the message instead. |
191 if not task_result: | 192 if not task_result: |
192 task_result = { | 193 task_result = { |
193 u'exit_code': None, | 194 u'exit_code': None, |
194 u'hard_timeout': False, | 195 u'hard_timeout': False, |
195 u'io_timeout': False, | 196 u'io_timeout': False, |
196 u'must_signal_internal_failure': | 197 u'must_signal_internal_failure': |
197 u'task_runner received signal %s' % e.signal, | 198 u'task_runner received signal %s' % e.signal, |
198 u'version': OUT_VERSION, | 199 u'version': OUT_VERSION, |
199 } | 200 } |
200 finally: | 201 finally: |
201 # We've found tests to delete 'work' when quitting, causing an exception | 202 # We've found tests to delete 'work' when quitting, causing an exception |
202 # here. Try to recreate the directory if necessary. | 203 # here. Try to recreate the directory if necessary. |
203 if not os.path.isdir(work_dir): | 204 if not os.path.isdir(work_dir): |
204 os.mkdir(work_dir) | 205 os.mkdir(work_dir) |
205 with open(out_file, 'wb') as f: | 206 with open(out_file, 'wb') as f: |
206 json.dump(task_result, f) | 207 json.dump(task_result, f) |
207 | 208 |
208 | 209 |
209 def post_update(swarming_server, params, exit_code, stdout, output_chunk_start): | 210 def post_update( |
211 swarming_server, auth_headers_file, params, exit_code, | |
212 stdout, output_chunk_start): | |
210 """Posts task update to task_update. | 213 """Posts task update to task_update. |
211 | 214 |
212 Arguments: | 215 Arguments: |
213 swarming_server: Base URL to Swarming server. | 216 swarming_server: Base URL to Swarming server. |
217 auth_headers_file: file to read HTTP authentication headers from. | |
214 params: Default JSON parameters for the POST. | 218 params: Default JSON parameters for the POST. |
215 exit_code: Process exit code, only when a command completed. | 219 exit_code: Process exit code, only when a command completed. |
216 stdout: Incremental output since last call, if any. | 220 stdout: Incremental output since last call, if any. |
217 output_chunk_start: Total number of stdout previously sent, for coherency | 221 output_chunk_start: Total number of stdout previously sent, for coherency |
218 with the server. | 222 with the server. |
219 """ | 223 """ |
220 params = params.copy() | 224 params = params.copy() |
221 if exit_code is not None: | 225 if exit_code is not None: |
222 params['exit_code'] = exit_code | 226 params['exit_code'] = exit_code |
223 if stdout: | 227 if stdout: |
224 # The output_chunk_start is used by the server to make sure that the stdout | 228 # The output_chunk_start is used by the server to make sure that the stdout |
225 # chunks are processed and saved in the DB in order. | 229 # chunks are processed and saved in the DB in order. |
226 params['output'] = base64.b64encode(stdout) | 230 params['output'] = base64.b64encode(stdout) |
227 params['output_chunk_start'] = output_chunk_start | 231 params['output_chunk_start'] = output_chunk_start |
232 headers = read_auth_headers(auth_headers_file) if auth_headers_file else {} | |
228 # TODO(maruel): Support early cancellation. | 233 # TODO(maruel): Support early cancellation. |
229 # https://code.google.com/p/swarming/issues/detail?id=62 | 234 # https://code.google.com/p/swarming/issues/detail?id=62 |
230 resp = net.url_read_json( | 235 resp = net.url_read_json( |
231 swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'], | 236 swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'], |
232 data=params) | 237 data=params, |
238 headers=headers, | |
239 follow_redirects=False) | |
233 logging.debug('post_update() = %s', resp) | 240 logging.debug('post_update() = %s', resp) |
234 if not resp or resp.get('error'): | 241 if not resp or resp.get('error'): |
235 # Abandon it. This will force a process exit. | 242 # Abandon it. This will force a process exit. |
236 raise ValueError(resp.get('error') if resp else 'Failed to contact server') | 243 raise ValueError(resp.get('error') if resp else 'Failed to contact server') |
237 | 244 |
238 | 245 |
239 def should_post_update(stdout, now, last_packet): | 246 def should_post_update(stdout, now, last_packet): |
240 """Returns True if it's time to send a task_update packet via post_update(). | 247 """Returns True if it's time to send a task_update packet via post_update(). |
241 | 248 |
242 Sends a packet when one of this condition is met: | 249 Sends a packet when one of this condition is met: |
243 - more than MAX_CHUNK_SIZE of stdout is buffered. | 250 - more than MAX_CHUNK_SIZE of stdout is buffered. |
244 - last packet was sent more than MIN_PACKET_INTERNAL seconds ago and there was | 251 - last packet was sent more than MIN_PACKET_INTERNAL seconds ago and there was |
245 stdout. | 252 stdout. |
246 - last packet was sent more than MAX_PACKET_INTERVAL seconds ago. | 253 - last packet was sent more than MAX_PACKET_INTERVAL seconds ago. |
247 """ | 254 """ |
248 packet_interval = MIN_PACKET_INTERNAL if stdout else MAX_PACKET_INTERVAL | 255 packet_interval = MIN_PACKET_INTERNAL if stdout else MAX_PACKET_INTERVAL |
249 return len(stdout) >= MAX_CHUNK_SIZE or (now - last_packet) > packet_interval | 256 return len(stdout) >= MAX_CHUNK_SIZE or (now - last_packet) > packet_interval |
250 | 257 |
251 | 258 |
259 def read_auth_headers(path): | |
260 """Reads authentication headers from the given file. | |
261 | |
262 The file is kept up-to-date by the main bot process (see AuthHeadersDumper in | |
263 bot_main.py). | |
264 | |
265 Retries a bunch of times on errors to workaround Windows file locking issues. | |
266 If it fails to read the file even after a bunch of retries, raises ValueError | |
267 that eventually aborts the task (since we can't run it without | |
268 authentication). | |
269 """ | |
270 attempts = 100 | |
271 while True: | |
272 try: | |
273 with open(path, 'rb') as f: | |
274 headers = json.load(f) | |
275 if not isinstance(headers, dict): | |
276 raise ValueError('Expecting dict, got %r' % (headers,)) | |
277 # The headers are ASCII for sure, so don't bother with picking the | |
278 # correct unicode encoding, default would work. | |
279 return {str(k): str(v) for k, v in headers.iteritems()} | |
280 except (OSError, IOError, ValueError) as e: | |
281 last_error = 'Failed to read auth headers from %s: %s' % (path, e) | |
282 attempts -= 1 | |
283 if not attempts: | |
284 raise ValueError(last_error) | |
Vadim Sh.
2016/06/03 01:15:32
ValueError is what's being thrown and caught by ot
| |
285 time.sleep(0.05) | |
286 | |
287 | |
252 def calc_yield_wait(task_details, start, last_io, timed_out, stdout): | 288 def calc_yield_wait(task_details, start, last_io, timed_out, stdout): |
253 """Calculates the maximum number of seconds to wait in yield_any().""" | 289 """Calculates the maximum number of seconds to wait in yield_any().""" |
254 now = monotonic_time() | 290 now = monotonic_time() |
255 if timed_out: | 291 if timed_out: |
256 # Give a |grace_period| seconds delay. | 292 # Give a |grace_period| seconds delay. |
257 if task_details.grace_period: | 293 if task_details.grace_period: |
258 return max(now - timed_out - task_details.grace_period, 0.) | 294 return max(now - timed_out - task_details.grace_period, 0.) |
259 return 0. | 295 return 0. |
260 | 296 |
261 out = MIN_PACKET_INTERNAL if stdout else MAX_PACKET_INTERVAL | 297 out = MIN_PACKET_INTERNAL if stdout else MAX_PACKET_INTERVAL |
(...skipping 13 matching lines...) Expand all Loading... | |
275 proc.wait(grace_period) | 311 proc.wait(grace_period) |
276 except subprocess42.TimeoutError: | 312 except subprocess42.TimeoutError: |
277 logging.warning('SIGKILL finally due to %s', reason) | 313 logging.warning('SIGKILL finally due to %s', reason) |
278 proc.kill() | 314 proc.kill() |
279 exit_code = proc.wait() | 315 exit_code = proc.wait() |
280 logging.info('Waiting for proces exit in finally - done') | 316 logging.info('Waiting for proces exit in finally - done') |
281 return exit_code | 317 return exit_code |
282 | 318 |
283 | 319 |
284 def run_command( | 320 def run_command( |
285 swarming_server, task_details, work_dir, cost_usd_hour, task_start, | 321 swarming_server, task_details, work_dir, auth_headers_file, cost_usd_hour, |
286 min_free_space): | 322 task_start, min_free_space): |
287 """Runs a command and sends packets to the server to stream results back. | 323 """Runs a command and sends packets to the server to stream results back. |
288 | 324 |
289 Implements both I/O and hard timeouts. Sends the packets numbered, so the | 325 Implements both I/O and hard timeouts. Sends the packets numbered, so the |
290 server can ensure they are processed in order. | 326 server can ensure they are processed in order. |
291 | 327 |
292 Returns: | 328 Returns: |
293 Metadata about the command. | 329 Metadata about the command. |
294 """ | 330 """ |
295 # TODO(maruel): This function is incomprehensible, split and refactor. | 331 # TODO(maruel): This function is incomprehensible, split and refactor. |
296 # Signal the command is about to be started. | 332 # Signal the command is about to be started. |
297 last_packet = start = now = monotonic_time() | 333 last_packet = start = now = monotonic_time() |
298 params = { | 334 params = { |
299 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., | 335 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., |
300 'id': task_details.bot_id, | 336 'id': task_details.bot_id, |
301 'task_id': task_details.task_id, | 337 'task_id': task_details.task_id, |
302 } | 338 } |
303 post_update(swarming_server, params, None, '', 0) | 339 post_update(swarming_server, auth_headers_file, params, None, '', 0) |
304 | 340 |
305 isolated_result = os.path.join(work_dir, 'isolated_result.json') | 341 isolated_result = os.path.join(work_dir, 'isolated_result.json') |
306 cmd = get_isolated_cmd( | 342 cmd = get_isolated_cmd( |
307 work_dir, task_details, isolated_result, min_free_space) | 343 work_dir, task_details, isolated_result, min_free_space) |
308 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to | 344 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to |
309 # give one 'grace_period' slot to the child process and one slot to upload | 345 # give one 'grace_period' slot to the child process and one slot to upload |
310 # the results back. | 346 # the results back. |
311 task_details.hard_timeout = 0 | 347 task_details.hard_timeout = 0 |
312 if task_details.grace_period: | 348 if task_details.grace_period: |
313 task_details.grace_period *= 2 | 349 task_details.grace_period *= 2 |
(...skipping 21 matching lines...) Expand all Loading... | |
335 stdout=subprocess42.PIPE, | 371 stdout=subprocess42.PIPE, |
336 stderr=subprocess42.STDOUT, | 372 stderr=subprocess42.STDOUT, |
337 stdin=subprocess42.PIPE) | 373 stdin=subprocess42.PIPE) |
338 except OSError as e: | 374 except OSError as e: |
339 stdout = 'Command "%s" failed to start.\nError: %s' % (' '.join(cmd), e) | 375 stdout = 'Command "%s" failed to start.\nError: %s' % (' '.join(cmd), e) |
340 now = monotonic_time() | 376 now = monotonic_time() |
341 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. | 377 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. |
342 params['duration'] = now - start | 378 params['duration'] = now - start |
343 params['io_timeout'] = False | 379 params['io_timeout'] = False |
344 params['hard_timeout'] = False | 380 params['hard_timeout'] = False |
345 post_update(swarming_server, params, 1, stdout, 0) | 381 post_update(swarming_server, auth_headers_file, params, 1, stdout, 0) |
346 return { | 382 return { |
347 u'exit_code': 1, | 383 u'exit_code': 1, |
348 u'hard_timeout': False, | 384 u'hard_timeout': False, |
349 u'io_timeout': False, | 385 u'io_timeout': False, |
350 u'must_signal_internal_failure': None, | 386 u'must_signal_internal_failure': None, |
351 u'version': OUT_VERSION, | 387 u'version': OUT_VERSION, |
352 } | 388 } |
353 | 389 |
354 output_chunk_start = 0 | 390 output_chunk_start = 0 |
355 stdout = '' | 391 stdout = '' |
(...skipping 11 matching lines...) Expand all Loading... | |
367 now = monotonic_time() | 403 now = monotonic_time() |
368 if new_data: | 404 if new_data: |
369 stdout += new_data | 405 stdout += new_data |
370 last_io = now | 406 last_io = now |
371 | 407 |
372 # Post update if necessary. | 408 # Post update if necessary. |
373 if should_post_update(stdout, now, last_packet): | 409 if should_post_update(stdout, now, last_packet): |
374 last_packet = monotonic_time() | 410 last_packet = monotonic_time() |
375 params['cost_usd'] = ( | 411 params['cost_usd'] = ( |
376 cost_usd_hour * (last_packet - task_start) / 60. / 60.) | 412 cost_usd_hour * (last_packet - task_start) / 60. / 60.) |
377 post_update(swarming_server, params, None, stdout, output_chunk_start) | 413 post_update( |
414 swarming_server, auth_headers_file, params, None, | |
415 stdout, output_chunk_start) | |
378 output_chunk_start += len(stdout) | 416 output_chunk_start += len(stdout) |
379 stdout = '' | 417 stdout = '' |
380 | 418 |
381 # Send signal on timeout if necessary. Both are failures, not | 419 # Send signal on timeout if necessary. Both are failures, not |
382 # internal_failures. | 420 # internal_failures. |
383 # Eventually kill but return 0 so bot_main.py doesn't cancel the task. | 421 # Eventually kill but return 0 so bot_main.py doesn't cancel the task. |
384 if not timed_out: | 422 if not timed_out: |
385 if (task_details.io_timeout and | 423 if (task_details.io_timeout and |
386 now - last_io > task_details.io_timeout): | 424 now - last_io > task_details.io_timeout): |
387 had_io_timeout = True | 425 had_io_timeout = True |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
473 params['isolated_stats'] = stats | 511 params['isolated_stats'] = stats |
474 except (IOError, OSError, ValueError) as e: | 512 except (IOError, OSError, ValueError) as e: |
475 logging.error('Swallowing error: %s', e) | 513 logging.error('Swallowing error: %s', e) |
476 if not must_signal_internal_failure: | 514 if not must_signal_internal_failure: |
477 must_signal_internal_failure = str(e) | 515 must_signal_internal_failure = str(e) |
478 # TODO(maruel): Send the internal failure here instead of sending it through | 516 # TODO(maruel): Send the internal failure here instead of sending it through |
479 # bot_main, this causes a race condition. | 517 # bot_main, this causes a race condition. |
480 if exit_code is None: | 518 if exit_code is None: |
481 exit_code = -1 | 519 exit_code = -1 |
482 params['hard_timeout'] = had_hard_timeout | 520 params['hard_timeout'] = had_hard_timeout |
483 post_update(swarming_server, params, exit_code, stdout, output_chunk_start) | 521 post_update( |
522 swarming_server, auth_headers_file, params, exit_code, | |
523 stdout, output_chunk_start) | |
484 return { | 524 return { |
485 u'exit_code': exit_code, | 525 u'exit_code': exit_code, |
486 u'hard_timeout': had_hard_timeout, | 526 u'hard_timeout': had_hard_timeout, |
487 u'io_timeout': had_io_timeout, | 527 u'io_timeout': had_io_timeout, |
488 u'must_signal_internal_failure': must_signal_internal_failure, | 528 u'must_signal_internal_failure': must_signal_internal_failure, |
489 u'version': OUT_VERSION, | 529 u'version': OUT_VERSION, |
490 } | 530 } |
491 finally: | 531 finally: |
492 try: | 532 try: |
493 os.remove(isolated_result) | 533 os.remove(isolated_result) |
494 except OSError: | 534 except OSError: |
495 pass | 535 pass |
496 | 536 |
497 | 537 |
498 def main(args): | 538 def main(args): |
499 subprocess42.inhibit_os_error_reporting() | 539 subprocess42.inhibit_os_error_reporting() |
500 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) | 540 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) |
501 parser.add_option('--in-file', help='Name of the request file') | 541 parser.add_option('--in-file', help='Name of the request file') |
502 parser.add_option( | 542 parser.add_option( |
503 '--out-file', help='Name of the JSON file to write a task summary to') | 543 '--out-file', help='Name of the JSON file to write a task summary to') |
504 parser.add_option( | 544 parser.add_option( |
505 '--swarming-server', help='Swarming server to send data back') | 545 '--swarming-server', help='Swarming server to send data back') |
506 parser.add_option( | 546 parser.add_option( |
547 '--auth-headers-file', | |
548 help='Name of the file to read authentication headers from') | |
549 parser.add_option( | |
507 '--cost-usd-hour', type='float', help='Cost of this VM in $/h') | 550 '--cost-usd-hour', type='float', help='Cost of this VM in $/h') |
508 parser.add_option('--start', type='float', help='Time this task was started') | 551 parser.add_option('--start', type='float', help='Time this task was started') |
509 parser.add_option( | 552 parser.add_option( |
510 '--min-free-space', type='int', | 553 '--min-free-space', type='int', |
511 help='Value to send down to run_isolated') | 554 help='Value to send down to run_isolated') |
512 | 555 |
513 options, args = parser.parse_args(args) | 556 options, args = parser.parse_args(args) |
514 if not options.in_file or not options.out_file or args: | 557 if not options.in_file or not options.out_file or args: |
515 parser.error('task_runner is meant to be used by swarming_bot.') | 558 parser.error('task_runner is meant to be used by swarming_bot.') |
516 | 559 |
517 on_error.report_on_exception_exit(options.swarming_server) | 560 on_error.report_on_exception_exit(options.swarming_server) |
518 | 561 |
519 logging.info('starting') | 562 logging.info('starting') |
520 now = monotonic_time() | 563 now = monotonic_time() |
521 if options.start > now: | 564 if options.start > now: |
522 options.start = now | 565 options.start = now |
523 | 566 |
524 try: | 567 try: |
525 load_and_run( | 568 load_and_run( |
526 options.in_file, options.swarming_server, options.cost_usd_hour, | 569 options.in_file, options.swarming_server, options.auth_headers_file, |
527 options.start, options.out_file, options.min_free_space) | 570 options.cost_usd_hour, options.start, options.out_file, |
571 options.min_free_space) | |
528 return 0 | 572 return 0 |
529 finally: | 573 finally: |
530 logging.info('quitting') | 574 logging.info('quitting') |
OLD | NEW |