Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Side by Side Diff: appengine/swarming/swarming_bot/bot_code/task_runner.py

Issue 2024313003: Send authorization headers when calling Swarming backend. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: rebase Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2013 The LUCI Authors. All rights reserved. 1 # Copyright 2013 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Runs a Swarming task. 5 """Runs a Swarming task.
6 6
7 Downloads all the necessary files to run the task, executes the command and 7 Downloads all the necessary files to run the task, executes the command and
8 streams results back to the Swarming server. 8 streams results back to the Swarming server.
9 9
10 The process exit code is 0 when the task was executed, even if the task itself 10 The process exit code is 0 when the task was executed, even if the task itself
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after
150 150
151 151
152 class MustExit(Exception): 152 class MustExit(Exception):
153 """Raised on signal that the process must exit immediately.""" 153 """Raised on signal that the process must exit immediately."""
154 def __init__(self, sig): 154 def __init__(self, sig):
155 super(MustExit, self).__init__() 155 super(MustExit, self).__init__()
156 self.signal = sig 156 self.signal = sig
157 157
158 158
159 def load_and_run( 159 def load_and_run(
160 in_file, swarming_server, cost_usd_hour, start, out_file, min_free_space): 160 in_file, swarming_server, auth_headers_file, cost_usd_hour, start,
161 out_file, min_free_space):
161 """Loads the task's metadata and execute it. 162 """Loads the task's metadata and execute it.
162 163
163 This may throw all sorts of exceptions in case of failure. It's up to the 164 This may throw all sorts of exceptions in case of failure. It's up to the
164 caller to trap them. These shall be considered 'internal_failure' instead of 165 caller to trap them. These shall be considered 'internal_failure' instead of
165 'failure' from a TaskRunResult standpoint. 166 'failure' from a TaskRunResult standpoint.
166 """ 167 """
167 # The work directory is guaranteed to exist since it was created by 168 # The work directory is guaranteed to exist since it was created by
168 # bot_main.py and contains the manifest. Temporary files will be downloaded 169 # bot_main.py and contains the manifest. Temporary files will be downloaded
169 # there. It's bot_main.py that will delete the directory afterward. Tests are 170 # there. It's bot_main.py that will delete the directory afterward. Tests are
170 # not run from there. 171 # not run from there.
171 task_result = None 172 task_result = None
172 def handler(sig, _): 173 def handler(sig, _):
173 logging.info('Got signal %s', sig) 174 logging.info('Got signal %s', sig)
174 raise MustExit(sig) 175 raise MustExit(sig)
175 work_dir = os.path.dirname(out_file) 176 work_dir = os.path.dirname(out_file)
176 try: 177 try:
177 with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler): 178 with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler):
178 if not os.path.isdir(work_dir): 179 if not os.path.isdir(work_dir):
179 raise ValueError('%s expected to exist' % work_dir) 180 raise ValueError('%s expected to exist' % work_dir)
180 181
181 with open(in_file, 'rb') as f: 182 with open(in_file, 'rb') as f:
182 task_details = TaskDetails(json.load(f)) 183 task_details = TaskDetails(json.load(f))
183 184
184 task_result = run_command( 185 task_result = run_command(
185 swarming_server, task_details, work_dir, cost_usd_hour, start, 186 swarming_server, task_details, work_dir, auth_headers_file,
186 min_free_space) 187 cost_usd_hour, start, min_free_space)
187 except MustExit as e: 188 except MustExit as e:
188 # This normally means run_command() didn't get the chance to run, as it 189 # This normally means run_command() didn't get the chance to run, as it
189 # itself trap MustExit and will report accordingly. In this case, we want 190 # itself trap MustExit and will report accordingly. In this case, we want
190 # the parent process to send the message instead. 191 # the parent process to send the message instead.
191 if not task_result: 192 if not task_result:
192 task_result = { 193 task_result = {
193 u'exit_code': None, 194 u'exit_code': None,
194 u'hard_timeout': False, 195 u'hard_timeout': False,
195 u'io_timeout': False, 196 u'io_timeout': False,
196 u'must_signal_internal_failure': 197 u'must_signal_internal_failure':
197 u'task_runner received signal %s' % e.signal, 198 u'task_runner received signal %s' % e.signal,
198 u'version': OUT_VERSION, 199 u'version': OUT_VERSION,
199 } 200 }
200 finally: 201 finally:
201 # We've found tests to delete 'work' when quitting, causing an exception 202 # We've found tests to delete 'work' when quitting, causing an exception
202 # here. Try to recreate the directory if necessary. 203 # here. Try to recreate the directory if necessary.
203 if not os.path.isdir(work_dir): 204 if not os.path.isdir(work_dir):
204 os.mkdir(work_dir) 205 os.mkdir(work_dir)
205 with open(out_file, 'wb') as f: 206 with open(out_file, 'wb') as f:
206 json.dump(task_result, f) 207 json.dump(task_result, f)
207 208
208 209
209 def post_update(swarming_server, params, exit_code, stdout, output_chunk_start): 210 def post_update(
211 swarming_server, auth_headers_file, params, exit_code,
212 stdout, output_chunk_start):
210 """Posts task update to task_update. 213 """Posts task update to task_update.
211 214
212 Arguments: 215 Arguments:
213 swarming_server: Base URL to Swarming server. 216 swarming_server: Base URL to Swarming server.
217 auth_headers_file: file to read HTTP authentication headers from.
214 params: Default JSON parameters for the POST. 218 params: Default JSON parameters for the POST.
215 exit_code: Process exit code, only when a command completed. 219 exit_code: Process exit code, only when a command completed.
216 stdout: Incremental output since last call, if any. 220 stdout: Incremental output since last call, if any.
217 output_chunk_start: Total number of stdout previously sent, for coherency 221 output_chunk_start: Total number of stdout previously sent, for coherency
218 with the server. 222 with the server.
219 """ 223 """
220 params = params.copy() 224 params = params.copy()
221 if exit_code is not None: 225 if exit_code is not None:
222 params['exit_code'] = exit_code 226 params['exit_code'] = exit_code
223 if stdout: 227 if stdout:
224 # The output_chunk_start is used by the server to make sure that the stdout 228 # The output_chunk_start is used by the server to make sure that the stdout
225 # chunks are processed and saved in the DB in order. 229 # chunks are processed and saved in the DB in order.
226 params['output'] = base64.b64encode(stdout) 230 params['output'] = base64.b64encode(stdout)
227 params['output_chunk_start'] = output_chunk_start 231 params['output_chunk_start'] = output_chunk_start
232 headers = read_auth_headers(auth_headers_file) if auth_headers_file else {}
M-A Ruel 2016/06/03 20:00:49 I think it'd prefer to have it update the token in
Vadim Sh. 2016/06/03 23:37:55 Done. I made it symmetrical to FileRefresherThread
228 # TODO(maruel): Support early cancellation. 233 # TODO(maruel): Support early cancellation.
229 # https://code.google.com/p/swarming/issues/detail?id=62 234 # https://code.google.com/p/swarming/issues/detail?id=62
230 resp = net.url_read_json( 235 resp = net.url_read_json(
231 swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'], 236 swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'],
232 data=params) 237 data=params,
238 headers=headers,
239 follow_redirects=False)
233 logging.debug('post_update() = %s', resp) 240 logging.debug('post_update() = %s', resp)
234 if not resp or resp.get('error'): 241 if not resp or resp.get('error'):
235 # Abandon it. This will force a process exit. 242 # Abandon it. This will force a process exit.
236 raise ValueError(resp.get('error') if resp else 'Failed to contact server') 243 raise ValueError(resp.get('error') if resp else 'Failed to contact server')
237 244
238 245
239 def should_post_update(stdout, now, last_packet): 246 def should_post_update(stdout, now, last_packet):
240 """Returns True if it's time to send a task_update packet via post_update(). 247 """Returns True if it's time to send a task_update packet via post_update().
241 248
242 Sends a packet when one of this condition is met: 249 Sends a packet when one of this condition is met:
243 - more than MAX_CHUNK_SIZE of stdout is buffered. 250 - more than MAX_CHUNK_SIZE of stdout is buffered.
244 - last packet was sent more than MIN_PACKET_INTERNAL seconds ago and there was 251 - last packet was sent more than MIN_PACKET_INTERNAL seconds ago and there was
245 stdout. 252 stdout.
246 - last packet was sent more than MAX_PACKET_INTERVAL seconds ago. 253 - last packet was sent more than MAX_PACKET_INTERVAL seconds ago.
247 """ 254 """
248 packet_interval = MIN_PACKET_INTERNAL if stdout else MAX_PACKET_INTERVAL 255 packet_interval = MIN_PACKET_INTERNAL if stdout else MAX_PACKET_INTERVAL
249 return len(stdout) >= MAX_CHUNK_SIZE or (now - last_packet) > packet_interval 256 return len(stdout) >= MAX_CHUNK_SIZE or (now - last_packet) > packet_interval
250 257
251 258
259 def read_auth_headers(path):
260 """Reads authentication headers from the given file.
261
262 The file is kept up-to-date by the main bot process (see AuthHeadersDumper in
263 bot_main.py).
264
265 Retries a bunch of times on errors to workaround Windows file locking issues.
266 If it fails to read the file even after a bunch of retries, raises ValueError
267 that eventually aborts the task (since we can't run it without
268 authentication).
269 """
270 attempts = 100
271 while True:
272 try:
273 with open(path, 'rb') as f:
274 headers = json.load(f)
275 if not isinstance(headers, dict):
276 raise ValueError('Expecting dict, got %r' % (headers,))
277 # The headers are ASCII for sure, so don't bother with picking the
278 # correct unicode encoding, default would work.
279 return {str(k): str(v) for k, v in headers.iteritems()}
280 except (OSError, IOError, ValueError) as e:
281 last_error = 'Failed to read auth headers from %s: %s' % (path, e)
282 attempts -= 1
283 if not attempts:
284 raise ValueError(last_error)
285 time.sleep(0.05)
286
287
252 def calc_yield_wait(task_details, start, last_io, timed_out, stdout): 288 def calc_yield_wait(task_details, start, last_io, timed_out, stdout):
253 """Calculates the maximum number of seconds to wait in yield_any().""" 289 """Calculates the maximum number of seconds to wait in yield_any()."""
254 now = monotonic_time() 290 now = monotonic_time()
255 if timed_out: 291 if timed_out:
256 # Give a |grace_period| seconds delay. 292 # Give a |grace_period| seconds delay.
257 if task_details.grace_period: 293 if task_details.grace_period:
258 return max(now - timed_out - task_details.grace_period, 0.) 294 return max(now - timed_out - task_details.grace_period, 0.)
259 return 0. 295 return 0.
260 296
261 out = MIN_PACKET_INTERNAL if stdout else MAX_PACKET_INTERVAL 297 out = MIN_PACKET_INTERNAL if stdout else MAX_PACKET_INTERVAL
(...skipping 13 matching lines...) Expand all
275 proc.wait(grace_period) 311 proc.wait(grace_period)
276 except subprocess42.TimeoutError: 312 except subprocess42.TimeoutError:
277 logging.warning('SIGKILL finally due to %s', reason) 313 logging.warning('SIGKILL finally due to %s', reason)
278 proc.kill() 314 proc.kill()
279 exit_code = proc.wait() 315 exit_code = proc.wait()
280 logging.info('Waiting for proces exit in finally - done') 316 logging.info('Waiting for proces exit in finally - done')
281 return exit_code 317 return exit_code
282 318
283 319
284 def run_command( 320 def run_command(
285 swarming_server, task_details, work_dir, cost_usd_hour, task_start, 321 swarming_server, task_details, work_dir, auth_headers_file, cost_usd_hour,
286 min_free_space): 322 task_start, min_free_space):
287 """Runs a command and sends packets to the server to stream results back. 323 """Runs a command and sends packets to the server to stream results back.
288 324
289 Implements both I/O and hard timeouts. Sends the packets numbered, so the 325 Implements both I/O and hard timeouts. Sends the packets numbered, so the
290 server can ensure they are processed in order. 326 server can ensure they are processed in order.
291 327
292 Returns: 328 Returns:
293 Metadata about the command. 329 Metadata about the command.
294 """ 330 """
295 # TODO(maruel): This function is incomprehensible, split and refactor. 331 # TODO(maruel): This function is incomprehensible, split and refactor.
296 # Signal the command is about to be started. 332 # Signal the command is about to be started.
297 last_packet = start = now = monotonic_time() 333 last_packet = start = now = monotonic_time()
298 params = { 334 params = {
299 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., 335 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60.,
300 'id': task_details.bot_id, 336 'id': task_details.bot_id,
301 'task_id': task_details.task_id, 337 'task_id': task_details.task_id,
302 } 338 }
303 post_update(swarming_server, params, None, '', 0) 339 post_update(swarming_server, auth_headers_file, params, None, '', 0)
304 340
305 isolated_result = os.path.join(work_dir, 'isolated_result.json') 341 isolated_result = os.path.join(work_dir, 'isolated_result.json')
306 cmd = get_isolated_cmd( 342 cmd = get_isolated_cmd(
307 work_dir, task_details, isolated_result, min_free_space) 343 work_dir, task_details, isolated_result, min_free_space)
308 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to 344 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to
309 # give one 'grace_period' slot to the child process and one slot to upload 345 # give one 'grace_period' slot to the child process and one slot to upload
310 # the results back. 346 # the results back.
311 task_details.hard_timeout = 0 347 task_details.hard_timeout = 0
312 if task_details.grace_period: 348 if task_details.grace_period:
313 task_details.grace_period *= 2 349 task_details.grace_period *= 2
(...skipping 21 matching lines...) Expand all
335 stdout=subprocess42.PIPE, 371 stdout=subprocess42.PIPE,
336 stderr=subprocess42.STDOUT, 372 stderr=subprocess42.STDOUT,
337 stdin=subprocess42.PIPE) 373 stdin=subprocess42.PIPE)
338 except OSError as e: 374 except OSError as e:
339 stdout = 'Command "%s" failed to start.\nError: %s' % (' '.join(cmd), e) 375 stdout = 'Command "%s" failed to start.\nError: %s' % (' '.join(cmd), e)
340 now = monotonic_time() 376 now = monotonic_time()
341 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. 377 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60.
342 params['duration'] = now - start 378 params['duration'] = now - start
343 params['io_timeout'] = False 379 params['io_timeout'] = False
344 params['hard_timeout'] = False 380 params['hard_timeout'] = False
345 post_update(swarming_server, params, 1, stdout, 0) 381 post_update(swarming_server, auth_headers_file, params, 1, stdout, 0)
346 return { 382 return {
347 u'exit_code': 1, 383 u'exit_code': 1,
348 u'hard_timeout': False, 384 u'hard_timeout': False,
349 u'io_timeout': False, 385 u'io_timeout': False,
350 u'must_signal_internal_failure': None, 386 u'must_signal_internal_failure': None,
351 u'version': OUT_VERSION, 387 u'version': OUT_VERSION,
352 } 388 }
353 389
354 output_chunk_start = 0 390 output_chunk_start = 0
355 stdout = '' 391 stdout = ''
(...skipping 11 matching lines...) Expand all
367 now = monotonic_time() 403 now = monotonic_time()
368 if new_data: 404 if new_data:
369 stdout += new_data 405 stdout += new_data
370 last_io = now 406 last_io = now
371 407
372 # Post update if necessary. 408 # Post update if necessary.
373 if should_post_update(stdout, now, last_packet): 409 if should_post_update(stdout, now, last_packet):
374 last_packet = monotonic_time() 410 last_packet = monotonic_time()
375 params['cost_usd'] = ( 411 params['cost_usd'] = (
376 cost_usd_hour * (last_packet - task_start) / 60. / 60.) 412 cost_usd_hour * (last_packet - task_start) / 60. / 60.)
377 post_update(swarming_server, params, None, stdout, output_chunk_start) 413 post_update(
414 swarming_server, auth_headers_file, params, None,
415 stdout, output_chunk_start)
378 output_chunk_start += len(stdout) 416 output_chunk_start += len(stdout)
379 stdout = '' 417 stdout = ''
380 418
381 # Send signal on timeout if necessary. Both are failures, not 419 # Send signal on timeout if necessary. Both are failures, not
382 # internal_failures. 420 # internal_failures.
383 # Eventually kill but return 0 so bot_main.py doesn't cancel the task. 421 # Eventually kill but return 0 so bot_main.py doesn't cancel the task.
384 if not timed_out: 422 if not timed_out:
385 if (task_details.io_timeout and 423 if (task_details.io_timeout and
386 now - last_io > task_details.io_timeout): 424 now - last_io > task_details.io_timeout):
387 had_io_timeout = True 425 had_io_timeout = True
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
473 params['isolated_stats'] = isolated_stats 511 params['isolated_stats'] = isolated_stats
474 except (IOError, OSError, ValueError) as e: 512 except (IOError, OSError, ValueError) as e:
475 logging.error('Swallowing error: %s', e) 513 logging.error('Swallowing error: %s', e)
476 if not must_signal_internal_failure: 514 if not must_signal_internal_failure:
477 must_signal_internal_failure = str(e) 515 must_signal_internal_failure = str(e)
478 # TODO(maruel): Send the internal failure here instead of sending it through 516 # TODO(maruel): Send the internal failure here instead of sending it through
479 # bot_main, this causes a race condition. 517 # bot_main, this causes a race condition.
480 if exit_code is None: 518 if exit_code is None:
481 exit_code = -1 519 exit_code = -1
482 params['hard_timeout'] = had_hard_timeout 520 params['hard_timeout'] = had_hard_timeout
483 post_update(swarming_server, params, exit_code, stdout, output_chunk_start) 521 post_update(
522 swarming_server, auth_headers_file, params, exit_code,
523 stdout, output_chunk_start)
484 return { 524 return {
485 u'exit_code': exit_code, 525 u'exit_code': exit_code,
486 u'hard_timeout': had_hard_timeout, 526 u'hard_timeout': had_hard_timeout,
487 u'io_timeout': had_io_timeout, 527 u'io_timeout': had_io_timeout,
488 u'must_signal_internal_failure': must_signal_internal_failure, 528 u'must_signal_internal_failure': must_signal_internal_failure,
489 u'version': OUT_VERSION, 529 u'version': OUT_VERSION,
490 } 530 }
491 finally: 531 finally:
492 try: 532 try:
493 os.remove(isolated_result) 533 os.remove(isolated_result)
494 except OSError: 534 except OSError:
495 pass 535 pass
496 536
497 537
498 def main(args): 538 def main(args):
499 subprocess42.inhibit_os_error_reporting() 539 subprocess42.inhibit_os_error_reporting()
500 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) 540 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
501 parser.add_option('--in-file', help='Name of the request file') 541 parser.add_option('--in-file', help='Name of the request file')
502 parser.add_option( 542 parser.add_option(
503 '--out-file', help='Name of the JSON file to write a task summary to') 543 '--out-file', help='Name of the JSON file to write a task summary to')
504 parser.add_option( 544 parser.add_option(
505 '--swarming-server', help='Swarming server to send data back') 545 '--swarming-server', help='Swarming server to send data back')
506 parser.add_option( 546 parser.add_option(
547 '--auth-headers-file',
548 help='Name of the file to read authentication headers from')
549 parser.add_option(
507 '--cost-usd-hour', type='float', help='Cost of this VM in $/h') 550 '--cost-usd-hour', type='float', help='Cost of this VM in $/h')
508 parser.add_option('--start', type='float', help='Time this task was started') 551 parser.add_option('--start', type='float', help='Time this task was started')
509 parser.add_option( 552 parser.add_option(
510 '--min-free-space', type='int', 553 '--min-free-space', type='int',
511 help='Value to send down to run_isolated') 554 help='Value to send down to run_isolated')
512 555
513 options, args = parser.parse_args(args) 556 options, args = parser.parse_args(args)
514 if not options.in_file or not options.out_file or args: 557 if not options.in_file or not options.out_file or args:
515 parser.error('task_runner is meant to be used by swarming_bot.') 558 parser.error('task_runner is meant to be used by swarming_bot.')
516 559
517 on_error.report_on_exception_exit(options.swarming_server) 560 on_error.report_on_exception_exit(options.swarming_server)
518 561
519 logging.info('starting') 562 logging.info('starting')
520 now = monotonic_time() 563 now = monotonic_time()
521 if options.start > now: 564 if options.start > now:
522 options.start = now 565 options.start = now
523 566
524 try: 567 try:
525 load_and_run( 568 load_and_run(
526 options.in_file, options.swarming_server, options.cost_usd_hour, 569 options.in_file, options.swarming_server, options.auth_headers_file,
527 options.start, options.out_file, options.min_free_space) 570 options.cost_usd_hour, options.start, options.out_file,
571 options.min_free_space)
528 return 0 572 return 0
529 finally: 573 finally:
530 logging.info('quitting') 574 logging.info('quitting')
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698