Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(42)

Side by Side Diff: appengine/swarming/swarming_bot/bot_code/task_runner.py

Issue 2024313003: Send authorization headers when calling Swarming backend. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2013 The LUCI Authors. All rights reserved. 1 # Copyright 2013 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Runs a Swarming task. 5 """Runs a Swarming task.
6 6
7 Downloads all the necessary files to run the task, executes the command and 7 Downloads all the necessary files to run the task, executes the command and
8 streams results back to the Swarming server. 8 streams results back to the Swarming server.
9 9
10 The process exit code is 0 when the task was executed, even if the task itself 10 The process exit code is 0 when the task was executed, even if the task itself
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after
150 150
151 151
152 class MustExit(Exception): 152 class MustExit(Exception):
153 """Raised on signal that the process must exit immediately.""" 153 """Raised on signal that the process must exit immediately."""
154 def __init__(self, sig): 154 def __init__(self, sig):
155 super(MustExit, self).__init__() 155 super(MustExit, self).__init__()
156 self.signal = sig 156 self.signal = sig
157 157
158 158
159 def load_and_run( 159 def load_and_run(
160 in_file, swarming_server, cost_usd_hour, start, out_file, min_free_space): 160 in_file, swarming_server, auth_headers_file, cost_usd_hour, start,
161 out_file, min_free_space):
161 """Loads the task's metadata and execute it. 162 """Loads the task's metadata and execute it.
162 163
163 This may throw all sorts of exceptions in case of failure. It's up to the 164 This may throw all sorts of exceptions in case of failure. It's up to the
164 caller to trap them. These shall be considered 'internal_failure' instead of 165 caller to trap them. These shall be considered 'internal_failure' instead of
165 'failure' from a TaskRunResult standpoint. 166 'failure' from a TaskRunResult standpoint.
166 """ 167 """
167 # The work directory is guaranteed to exist since it was created by 168 # The work directory is guaranteed to exist since it was created by
168 # bot_main.py and contains the manifest. Temporary files will be downloaded 169 # bot_main.py and contains the manifest. Temporary files will be downloaded
169 # there. It's bot_main.py that will delete the directory afterward. Tests are 170 # there. It's bot_main.py that will delete the directory afterward. Tests are
170 # not run from there. 171 # not run from there.
171 task_result = None 172 task_result = None
172 def handler(sig, _): 173 def handler(sig, _):
173 logging.info('Got signal %s', sig) 174 logging.info('Got signal %s', sig)
174 raise MustExit(sig) 175 raise MustExit(sig)
175 work_dir = os.path.dirname(out_file) 176 work_dir = os.path.dirname(out_file)
176 try: 177 try:
177 with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler): 178 with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler):
178 if not os.path.isdir(work_dir): 179 if not os.path.isdir(work_dir):
179 raise ValueError('%s expected to exist' % work_dir) 180 raise ValueError('%s expected to exist' % work_dir)
180 181
181 with open(in_file, 'rb') as f: 182 with open(in_file, 'rb') as f:
182 task_details = TaskDetails(json.load(f)) 183 task_details = TaskDetails(json.load(f))
183 184
184 task_result = run_command( 185 task_result = run_command(
185 swarming_server, task_details, work_dir, cost_usd_hour, start, 186 swarming_server, task_details, work_dir, auth_headers_file,
186 min_free_space) 187 cost_usd_hour, start, min_free_space)
187 except MustExit as e: 188 except MustExit as e:
188 # This normally means run_command() didn't get the chance to run, as it 189 # This normally means run_command() didn't get the chance to run, as it
189 # itself trap MustExit and will report accordingly. In this case, we want 190 # itself trap MustExit and will report accordingly. In this case, we want
190 # the parent process to send the message instead. 191 # the parent process to send the message instead.
191 if not task_result: 192 if not task_result:
192 task_result = { 193 task_result = {
193 u'exit_code': None, 194 u'exit_code': None,
194 u'hard_timeout': False, 195 u'hard_timeout': False,
195 u'io_timeout': False, 196 u'io_timeout': False,
196 u'must_signal_internal_failure': 197 u'must_signal_internal_failure':
197 u'task_runner received signal %s' % e.signal, 198 u'task_runner received signal %s' % e.signal,
198 u'version': OUT_VERSION, 199 u'version': OUT_VERSION,
199 } 200 }
200 finally: 201 finally:
201 # We've found tests to delete 'work' when quitting, causing an exception 202 # We've found tests to delete 'work' when quitting, causing an exception
202 # here. Try to recreate the directory if necessary. 203 # here. Try to recreate the directory if necessary.
203 if not os.path.isdir(work_dir): 204 if not os.path.isdir(work_dir):
204 os.mkdir(work_dir) 205 os.mkdir(work_dir)
205 with open(out_file, 'wb') as f: 206 with open(out_file, 'wb') as f:
206 json.dump(task_result, f) 207 json.dump(task_result, f)
207 208
208 209
209 def post_update(swarming_server, params, exit_code, stdout, output_chunk_start): 210 def post_update(
211 swarming_server, auth_headers_file, params, exit_code,
212 stdout, output_chunk_start):
210 """Posts task update to task_update. 213 """Posts task update to task_update.
211 214
212 Arguments: 215 Arguments:
213 swarming_server: Base URL to Swarming server. 216 swarming_server: Base URL to Swarming server.
217 auth_headers_file: file to read HTTP authentication headers from.
214 params: Default JSON parameters for the POST. 218 params: Default JSON parameters for the POST.
215 exit_code: Process exit code, only when a command completed. 219 exit_code: Process exit code, only when a command completed.
216 stdout: Incremental output since last call, if any. 220 stdout: Incremental output since last call, if any.
217 output_chunk_start: Total number of stdout previously sent, for coherency 221 output_chunk_start: Total number of stdout previously sent, for coherency
218 with the server. 222 with the server.
219 """ 223 """
220 params = params.copy() 224 params = params.copy()
221 if exit_code is not None: 225 if exit_code is not None:
222 params['exit_code'] = exit_code 226 params['exit_code'] = exit_code
223 if stdout: 227 if stdout:
224 # The output_chunk_start is used by the server to make sure that the stdout 228 # The output_chunk_start is used by the server to make sure that the stdout
225 # chunks are processed and saved in the DB in order. 229 # chunks are processed and saved in the DB in order.
226 params['output'] = base64.b64encode(stdout) 230 params['output'] = base64.b64encode(stdout)
227 params['output_chunk_start'] = output_chunk_start 231 params['output_chunk_start'] = output_chunk_start
232 headers = read_auth_headers(auth_headers_file) if auth_headers_file else None
228 # TODO(maruel): Support early cancellation. 233 # TODO(maruel): Support early cancellation.
229 # https://code.google.com/p/swarming/issues/detail?id=62 234 # https://code.google.com/p/swarming/issues/detail?id=62
230 resp = net.url_read_json( 235 resp = net.url_read_json(
231 swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'], 236 swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'],
232 data=params) 237 data=params,
238 headers=headers,
239 follow_redirects=False)
233 logging.debug('post_update() = %s', resp) 240 logging.debug('post_update() = %s', resp)
234 if not resp or resp.get('error'): 241 if not resp or resp.get('error'):
235 # Abandon it. This will force a process exit. 242 # Abandon it. This will force a process exit.
236 raise ValueError(resp.get('error') if resp else 'Failed to contact server') 243 raise ValueError(resp.get('error') if resp else 'Failed to contact server')
237 244
238 245
239 def should_post_update(stdout, now, last_packet): 246 def should_post_update(stdout, now, last_packet):
240 """Returns True if it's time to send a task_update packet via post_update(). 247 """Returns True if it's time to send a task_update packet via post_update().
241 248
242 Sends a packet when one of this condition is met: 249 Sends a packet when one of this condition is met:
243 - more than MAX_CHUNK_SIZE of stdout is buffered. 250 - more than MAX_CHUNK_SIZE of stdout is buffered.
244 - last packet was sent more than MIN_PACKET_INTERNAL seconds ago and there was 251 - last packet was sent more than MIN_PACKET_INTERNAL seconds ago and there was
245 stdout. 252 stdout.
246 - last packet was sent more than MAX_PACKET_INTERVAL seconds ago. 253 - last packet was sent more than MAX_PACKET_INTERVAL seconds ago.
247 """ 254 """
248 packet_interval = MIN_PACKET_INTERNAL if stdout else MAX_PACKET_INTERVAL 255 packet_interval = MIN_PACKET_INTERNAL if stdout else MAX_PACKET_INTERVAL
249 return len(stdout) >= MAX_CHUNK_SIZE or (now - last_packet) > packet_interval 256 return len(stdout) >= MAX_CHUNK_SIZE or (now - last_packet) > packet_interval
250 257
251 258
259 def read_auth_headers(path):
260 """Reads authentication headers from the given file.
261
262 The file is kept up-to-date by the main bot process (see AuthHeadersDumper in
263 bot_main.py).
264 """
265 # TODO(vadimsh): Implement.
266 return {}
267
268
252 def calc_yield_wait(task_details, start, last_io, timed_out, stdout): 269 def calc_yield_wait(task_details, start, last_io, timed_out, stdout):
253 """Calculates the maximum number of seconds to wait in yield_any().""" 270 """Calculates the maximum number of seconds to wait in yield_any()."""
254 now = monotonic_time() 271 now = monotonic_time()
255 if timed_out: 272 if timed_out:
256 # Give a |grace_period| seconds delay. 273 # Give a |grace_period| seconds delay.
257 if task_details.grace_period: 274 if task_details.grace_period:
258 return max(now - timed_out - task_details.grace_period, 0.) 275 return max(now - timed_out - task_details.grace_period, 0.)
259 return 0. 276 return 0.
260 277
261 out = MIN_PACKET_INTERNAL if stdout else MAX_PACKET_INTERVAL 278 out = MIN_PACKET_INTERNAL if stdout else MAX_PACKET_INTERVAL
(...skipping 13 matching lines...) Expand all
275 proc.wait(grace_period) 292 proc.wait(grace_period)
276 except subprocess42.TimeoutError: 293 except subprocess42.TimeoutError:
277 logging.warning('SIGKILL finally due to %s', reason) 294 logging.warning('SIGKILL finally due to %s', reason)
278 proc.kill() 295 proc.kill()
279 exit_code = proc.wait() 296 exit_code = proc.wait()
280 logging.info('Waiting for proces exit in finally - done') 297 logging.info('Waiting for proces exit in finally - done')
281 return exit_code 298 return exit_code
282 299
283 300
284 def run_command( 301 def run_command(
285 swarming_server, task_details, work_dir, cost_usd_hour, task_start, 302 swarming_server, task_details, work_dir, auth_headers_file, cost_usd_hour,
286 min_free_space): 303 task_start, min_free_space):
287 """Runs a command and sends packets to the server to stream results back. 304 """Runs a command and sends packets to the server to stream results back.
288 305
289 Implements both I/O and hard timeouts. Sends the packets numbered, so the 306 Implements both I/O and hard timeouts. Sends the packets numbered, so the
290 server can ensure they are processed in order. 307 server can ensure they are processed in order.
291 308
292 Returns: 309 Returns:
293 Metadata about the command. 310 Metadata about the command.
294 """ 311 """
295 # TODO(maruel): This function is incomprehensible, split and refactor. 312 # TODO(maruel): This function is incomprehensible, split and refactor.
296 # Signal the command is about to be started. 313 # Signal the command is about to be started.
297 last_packet = start = now = monotonic_time() 314 last_packet = start = now = monotonic_time()
298 params = { 315 params = {
299 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., 316 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60.,
300 'id': task_details.bot_id, 317 'id': task_details.bot_id,
301 'task_id': task_details.task_id, 318 'task_id': task_details.task_id,
302 } 319 }
303 post_update(swarming_server, params, None, '', 0) 320 post_update(swarming_server, auth_headers_file, params, None, '', 0)
304 321
305 isolated_result = os.path.join(work_dir, 'isolated_result.json') 322 isolated_result = os.path.join(work_dir, 'isolated_result.json')
306 cmd = get_isolated_cmd( 323 cmd = get_isolated_cmd(
307 work_dir, task_details, isolated_result, min_free_space) 324 work_dir, task_details, isolated_result, min_free_space)
308 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to 325 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to
309 # give one 'grace_period' slot to the child process and one slot to upload 326 # give one 'grace_period' slot to the child process and one slot to upload
310 # the results back. 327 # the results back.
311 task_details.hard_timeout = 0 328 task_details.hard_timeout = 0
312 if task_details.grace_period: 329 if task_details.grace_period:
313 task_details.grace_period *= 2 330 task_details.grace_period *= 2
(...skipping 21 matching lines...) Expand all
335 stdout=subprocess42.PIPE, 352 stdout=subprocess42.PIPE,
336 stderr=subprocess42.STDOUT, 353 stderr=subprocess42.STDOUT,
337 stdin=subprocess42.PIPE) 354 stdin=subprocess42.PIPE)
338 except OSError as e: 355 except OSError as e:
339 stdout = 'Command "%s" failed to start.\nError: %s' % (' '.join(cmd), e) 356 stdout = 'Command "%s" failed to start.\nError: %s' % (' '.join(cmd), e)
340 now = monotonic_time() 357 now = monotonic_time()
341 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. 358 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60.
342 params['duration'] = now - start 359 params['duration'] = now - start
343 params['io_timeout'] = False 360 params['io_timeout'] = False
344 params['hard_timeout'] = False 361 params['hard_timeout'] = False
345 post_update(swarming_server, params, 1, stdout, 0) 362 post_update(swarming_server, auth_headers_file, params, 1, stdout, 0)
346 return { 363 return {
347 u'exit_code': 1, 364 u'exit_code': 1,
348 u'hard_timeout': False, 365 u'hard_timeout': False,
349 u'io_timeout': False, 366 u'io_timeout': False,
350 u'must_signal_internal_failure': None, 367 u'must_signal_internal_failure': None,
351 u'version': OUT_VERSION, 368 u'version': OUT_VERSION,
352 } 369 }
353 370
354 output_chunk_start = 0 371 output_chunk_start = 0
355 stdout = '' 372 stdout = ''
(...skipping 11 matching lines...) Expand all
367 now = monotonic_time() 384 now = monotonic_time()
368 if new_data: 385 if new_data:
369 stdout += new_data 386 stdout += new_data
370 last_io = now 387 last_io = now
371 388
372 # Post update if necessary. 389 # Post update if necessary.
373 if should_post_update(stdout, now, last_packet): 390 if should_post_update(stdout, now, last_packet):
374 last_packet = monotonic_time() 391 last_packet = monotonic_time()
375 params['cost_usd'] = ( 392 params['cost_usd'] = (
376 cost_usd_hour * (last_packet - task_start) / 60. / 60.) 393 cost_usd_hour * (last_packet - task_start) / 60. / 60.)
377 post_update(swarming_server, params, None, stdout, output_chunk_start) 394 post_update(
395 swarming_server, auth_headers_file, params, None,
396 stdout, output_chunk_start)
378 output_chunk_start += len(stdout) 397 output_chunk_start += len(stdout)
379 stdout = '' 398 stdout = ''
380 399
381 # Send signal on timeout if necessary. Both are failures, not 400 # Send signal on timeout if necessary. Both are failures, not
382 # internal_failures. 401 # internal_failures.
383 # Eventually kill but return 0 so bot_main.py doesn't cancel the task. 402 # Eventually kill but return 0 so bot_main.py doesn't cancel the task.
384 if not timed_out: 403 if not timed_out:
385 if (task_details.io_timeout and 404 if (task_details.io_timeout and
386 now - last_io > task_details.io_timeout): 405 now - last_io > task_details.io_timeout):
387 had_io_timeout = True 406 had_io_timeout = True
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
473 params['isolated_stats'] = stats 492 params['isolated_stats'] = stats
474 except (IOError, OSError, ValueError) as e: 493 except (IOError, OSError, ValueError) as e:
475 logging.error('Swallowing error: %s', e) 494 logging.error('Swallowing error: %s', e)
476 if not must_signal_internal_failure: 495 if not must_signal_internal_failure:
477 must_signal_internal_failure = str(e) 496 must_signal_internal_failure = str(e)
478 # TODO(maruel): Send the internal failure here instead of sending it through 497 # TODO(maruel): Send the internal failure here instead of sending it through
479 # bot_main, this causes a race condition. 498 # bot_main, this causes a race condition.
480 if exit_code is None: 499 if exit_code is None:
481 exit_code = -1 500 exit_code = -1
482 params['hard_timeout'] = had_hard_timeout 501 params['hard_timeout'] = had_hard_timeout
483 post_update(swarming_server, params, exit_code, stdout, output_chunk_start) 502 post_update(
503 swarming_server, auth_headers_file, params, exit_code,
504 stdout, output_chunk_start)
484 return { 505 return {
485 u'exit_code': exit_code, 506 u'exit_code': exit_code,
486 u'hard_timeout': had_hard_timeout, 507 u'hard_timeout': had_hard_timeout,
487 u'io_timeout': had_io_timeout, 508 u'io_timeout': had_io_timeout,
488 u'must_signal_internal_failure': must_signal_internal_failure, 509 u'must_signal_internal_failure': must_signal_internal_failure,
489 u'version': OUT_VERSION, 510 u'version': OUT_VERSION,
490 } 511 }
491 finally: 512 finally:
492 try: 513 try:
493 os.remove(isolated_result) 514 os.remove(isolated_result)
494 except OSError: 515 except OSError:
495 pass 516 pass
496 517
497 518
498 def main(args): 519 def main(args):
499 subprocess42.inhibit_os_error_reporting() 520 subprocess42.inhibit_os_error_reporting()
500 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) 521 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
501 parser.add_option('--in-file', help='Name of the request file') 522 parser.add_option('--in-file', help='Name of the request file')
502 parser.add_option( 523 parser.add_option(
503 '--out-file', help='Name of the JSON file to write a task summary to') 524 '--out-file', help='Name of the JSON file to write a task summary to')
504 parser.add_option( 525 parser.add_option(
505 '--swarming-server', help='Swarming server to send data back') 526 '--swarming-server', help='Swarming server to send data back')
506 parser.add_option( 527 parser.add_option(
528 '--auth-headers-file',
529 help='Name of the file to read authentication headers from')
530 parser.add_option(
507 '--cost-usd-hour', type='float', help='Cost of this VM in $/h') 531 '--cost-usd-hour', type='float', help='Cost of this VM in $/h')
508 parser.add_option('--start', type='float', help='Time this task was started') 532 parser.add_option('--start', type='float', help='Time this task was started')
509 parser.add_option( 533 parser.add_option(
510 '--min-free-space', type='int', 534 '--min-free-space', type='int',
511 help='Value to send down to run_isolated') 535 help='Value to send down to run_isolated')
512 536
513 options, args = parser.parse_args(args) 537 options, args = parser.parse_args(args)
514 if not options.in_file or not options.out_file or args: 538 if not options.in_file or not options.out_file or args:
515 parser.error('task_runner is meant to be used by swarming_bot.') 539 parser.error('task_runner is meant to be used by swarming_bot.')
516 540
517 on_error.report_on_exception_exit(options.swarming_server) 541 on_error.report_on_exception_exit(options.swarming_server)
518 542
519 logging.info('starting') 543 logging.info('starting')
520 now = monotonic_time() 544 now = monotonic_time()
521 if options.start > now: 545 if options.start > now:
522 options.start = now 546 options.start = now
523 547
524 try: 548 try:
525 load_and_run( 549 load_and_run(
526 options.in_file, options.swarming_server, options.cost_usd_hour, 550 options.in_file, options.swarming_server, options.auth_headers_file,
527 options.start, options.out_file, options.min_free_space) 551 options.cost_usd_hour, options.start, options.out_file,
552 options.min_free_space)
528 return 0 553 return 0
529 finally: 554 finally:
530 logging.info('quitting') 555 logging.info('quitting')
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698