Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(248)

Side by Side Diff: appengine/swarming/swarming_bot/bot_code/bot_main.py

Issue 2024313003: Send authorization headers when calling Swarming backend. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2013 The LUCI Authors. All rights reserved. 1 # Copyright 2013 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Swarming bot main process. 5 """Swarming bot main process.
6 6
7 This is the program that communicates with the Swarming server, ensures the code 7 This is the program that communicates with the Swarming server, ensures the code
8 is always up to date and executes a child process to run tasks and upload 8 is always up to date and executes a child process to run tasks and upload
9 results back. 9 results back.
10 10
11 It manages self-update and rebooting the host in case of problems. 11 It manages self-update and rebooting the host in case of problems.
12 12
13 Set the environment variable SWARMING_LOAD_TEST=1 to disable the use of 13 Set the environment variable SWARMING_LOAD_TEST=1 to disable the use of
14 server-provided bot_config.py. This permits safe load testing. 14 server-provided bot_config.py. This permits safe load testing.
15 """ 15 """
16 16
17 import contextlib 17 import contextlib
18 import functools
18 import json 19 import json
19 import logging 20 import logging
20 import optparse 21 import optparse
21 import os 22 import os
23 import Queue
22 import shutil 24 import shutil
23 import signal
24 import sys 25 import sys
25 import tempfile 26 import tempfile
26 import threading 27 import threading
27 import time 28 import time
28 import traceback 29 import traceback
29 import zipfile 30 import zipfile
30 31
31 import common 32 import common
33 import remote_client
32 import singleton 34 import singleton
33 from api import bot 35 from api import bot
34 from api import os_utilities 36 from api import os_utilities
35 from utils import file_path 37 from utils import file_path
36 from utils import net 38 from utils import net
37 from utils import on_error 39 from utils import on_error
38 from utils import subprocess42 40 from utils import subprocess42
39 from utils import zip_package 41 from utils import zip_package
40 42
41 43
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
163 should_continue = bot_config.setup_bot(botobj) 165 should_continue = bot_config.setup_bot(botobj)
164 except Exception as e: 166 except Exception as e:
165 msg = '%s\n%s' % (e, traceback.format_exc()[-2048:]) 167 msg = '%s\n%s' % (e, traceback.format_exc()[-2048:])
166 botobj.post_error('bot_config.setup_bot() threw: %s' % msg) 168 botobj.post_error('bot_config.setup_bot() threw: %s' % msg)
167 return 169 return
168 170
169 if not should_continue and not skip_reboot: 171 if not should_continue and not skip_reboot:
170 botobj.restart('Starting new swarming bot: %s' % THIS_FILE) 172 botobj.restart('Starting new swarming bot: %s' % THIS_FILE)
171 173
172 174
175 def get_authentication_headers(botobj):
176 """Calls bot_config.get_authentication_headers() if it is defined."""
177 if _in_load_test_mode():
178 return (None, None)
179 logging.info('get_authentication_headers()')
180 from config import bot_config
181 func = getattr(bot_config, 'get_authentication_headers', None)
182 return func(botobj) if func else (None, None)
M-A Ruel 2016/06/01 17:08:02 wrap that to catch all exceptions like the other f
Vadim Sh. 2016/06/03 01:15:32 Exceptions are handled in RemoteClient, where it k
183
184
173 ### end of bot_config handler part. 185 ### end of bot_config handler part.
174 186
175 187
176 def get_min_free_space(): 188 def get_min_free_space():
177 """Returns free disk space needed. 189 """Returns free disk space needed.
178 190
179 Add a "250 MiB slack space" for logs, temporary files and whatever other leak. 191 Add a "250 MiB slack space" for logs, temporary files and whatever other leak.
180 """ 192 """
181 return int((os_utilities.get_min_free_space(THIS_FILE) + 250.) * 1024 * 1024) 193 return int((os_utilities.get_min_free_space(THIS_FILE) + 250.) * 1024 * 1024)
182 194
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
221 exception handler for incoming commands from the Swarming server. If for 233 exception handler for incoming commands from the Swarming server. If for
222 any reason the local test runner script can not be run successfully, 234 any reason the local test runner script can not be run successfully,
223 this function is invoked. 235 this function is invoked.
224 """ 236 """
225 logging.error('Error: %s', error) 237 logging.error('Error: %s', error)
226 data = { 238 data = {
227 'id': botobj.id, 239 'id': botobj.id,
228 'message': error, 240 'message': error,
229 'task_id': task_id, 241 'task_id': task_id,
230 } 242 }
231 return net.url_read_json( 243 return botobj.remote.url_read_json(
232 botobj.server + '/swarming/api/v1/bot/task_error/%s' % task_id, data=data) 244 '/swarming/api/v1/bot/task_error/%s' % task_id, data=data)
233 245
234 246
235 def on_shutdown_hook(b): 247 def on_shutdown_hook(b):
236 """Called when the bot is restarting.""" 248 """Called when the bot is restarting."""
237 call_hook(b, 'on_bot_shutdown') 249 call_hook(b, 'on_bot_shutdown')
238 # Aggressively set itself up so we ensure the auto-reboot configuration is 250 # Aggressively set itself up so we ensure the auto-reboot configuration is
239 # fine before restarting the host. This is important as some tasks delete the 251 # fine before restarting the host. This is important as some tasks delete the
240 # autorestart script (!) 252 # autorestart script (!)
241 setup_bot(True) 253 setup_bot(True)
242 254
243 255
244 def get_bot(): 256 def get_bot():
245 """Returns a valid Bot instance. 257 """Returns a valid Bot instance.
246 258
247 Should only be called once in the process lifetime. 259 Should only be called once in the process lifetime.
248 """ 260 """
249 # This variable is used to bootstrap the initial bot.Bot object, which then is 261 # This variable is used to bootstrap the initial bot.Bot object, which then is
250 # used to get the dimensions and state. 262 # used to get the dimensions and state.
251 attributes = { 263 attributes = {
252 'dimensions': {u'id': ['none']}, 264 'dimensions': {u'id': ['none']},
253 'state': {}, 265 'state': {},
254 'version': generate_version(), 266 'version': generate_version(),
255 } 267 }
256 config = get_config() 268 config = get_config()
257 assert not config['server'].endswith('/'), config 269 assert not config['server'].endswith('/'), config
258 270
259 # Create a temporary object to call the hooks. 271 # Use temporary Bot object to call get_attributes. Attributes are needed to
272 # construct the "real" bot.Bot.
273 attributes = get_attributes(
274 bot.Bot(
275 remote_client.RemoteClient(config['server'], None),
276 attributes,
277 config['server'],
278 config['server_version'],
279 os.path.dirname(THIS_FILE),
280 on_shutdown_hook))
281
282 # Make remote client callback use the returned bot object. We assume here
283 # RemoteClient doesn't call its callback in the constructor (since 'botobj' is
284 # undefined during the construction).
260 botobj = bot.Bot( 285 botobj = bot.Bot(
286 remote_client.RemoteClient(
287 config['server'],
288 lambda: get_authentication_headers(botobj)),
261 attributes, 289 attributes,
262 config['server'], 290 config['server'],
263 config['server_version'], 291 config['server_version'],
264 os.path.dirname(THIS_FILE), 292 os.path.dirname(THIS_FILE),
265 on_shutdown_hook) 293 on_shutdown_hook)
266 return bot.Bot( 294 return botobj
267 get_attributes(botobj),
268 config['server'],
269 config['server_version'],
270 os.path.dirname(THIS_FILE),
271 on_shutdown_hook)
272 295
273 296
274 def clean_isolated_cache(botobj): 297 def clean_isolated_cache(botobj):
275 """Asks run_isolated to clean its cache. 298 """Asks run_isolated to clean its cache.
276 299
277 This may take a while but it ensures that in the case of a run_isolated run 300 This may take a while but it ensures that in the case of a run_isolated run
278 failed and it temporarily used more space than min_free_disk, it can cleans up 301 failed and it temporarily used more space than min_free_disk, it can cleans up
279 the mess properly. 302 the mess properly.
280 303
281 It will remove unexpected files, remove corrupted files, trim the cache size 304 It will remove unexpected files, remove corrupted files, trim the cache size
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
328 # First thing is to get an arbitrary url. This also ensures the network is 351 # First thing is to get an arbitrary url. This also ensures the network is
329 # up and running, which is necessary before trying to get the FQDN below. 352 # up and running, which is necessary before trying to get the FQDN below.
330 resp = net.url_read(config['server'] + '/swarming/api/v1/bot/server_ping') 353 resp = net.url_read(config['server'] + '/swarming/api/v1/bot/server_ping')
331 if resp is None: 354 if resp is None:
332 logging.error('No response from server_ping') 355 logging.error('No response from server_ping')
333 except Exception as e: 356 except Exception as e:
334 # url_read() already traps pretty much every exceptions. This except 357 # url_read() already traps pretty much every exceptions. This except
335 # clause is kept there "just in case". 358 # clause is kept there "just in case".
336 logging.exception('server_ping threw') 359 logging.exception('server_ping threw')
337 360
361 # Next we make sure the bot can make authenticated calls by grabbing
362 # the auth headers, retrying on errors a bunch of times. We don't give up
363 # if it fails though (maybe the bot will "fix itself" later).
364 botobj = get_bot()
365 try:
366 botobj.remote.initialize(quit_bit)
367 except remote_client.InitializationError as exc:
368 botobj.post_error('failed to grab auth headers: %s' % exc.last_error)
369 logging.error('Can\'t grab auth headers, continuing anyway...')
370
338 if quit_bit.is_set(): 371 if quit_bit.is_set():
339 logging.info('Early quit 1') 372 logging.info('Early quit 1')
340 return 0 373 return 0
341 374
342 # If this fails, there's hardly anything that can be done, the bot can't 375 # If this fails, there's hardly anything that can be done, the bot can't
343 # even get to the point to be able to self-update. 376 # even get to the point to be able to self-update.
344 botobj = get_bot() 377 resp = botobj.remote.url_read_json(
345 resp = net.url_read_json( 378 '/swarming/api/v1/bot/handshake', data=botobj._attributes)
346 botobj.server + '/swarming/api/v1/bot/handshake',
347 data=botobj._attributes)
348 if not resp: 379 if not resp:
349 logging.error('Failed to contact for handshake') 380 logging.error('Failed to contact for handshake')
350 else: 381 else:
351 logging.info('Connected to %s', resp.get('server_version')) 382 logging.info('Connected to %s', resp.get('server_version'))
352 if resp.get('bot_version') != botobj._attributes['version']: 383 if resp.get('bot_version') != botobj._attributes['version']:
353 logging.warning( 384 logging.warning(
354 'Found out we\'ll need to update: server said %s; we\'re %s', 385 'Found out we\'ll need to update: server said %s; we\'re %s',
355 resp.get('bot_version'), botobj._attributes['version']) 386 resp.get('bot_version'), botobj._attributes['version'])
356 387
357 if arg_error: 388 if arg_error:
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
404 return 0 435 return 0
405 436
406 437
407 def poll_server(botobj, quit_bit): 438 def poll_server(botobj, quit_bit):
408 """Polls the server to run one loop. 439 """Polls the server to run one loop.
409 440
410 Returns True if executed some action, False if server asked the bot to sleep. 441 Returns True if executed some action, False if server asked the bot to sleep.
411 """ 442 """
412 # Access to a protected member _XXX of a client class - pylint: disable=W0212 443 # Access to a protected member _XXX of a client class - pylint: disable=W0212
413 start = time.time() 444 start = time.time()
414 resp = net.url_read_json( 445 resp = botobj.remote.url_read_json(
415 botobj.server + '/swarming/api/v1/bot/poll', data=botobj._attributes) 446 '/swarming/api/v1/bot/poll', data=botobj._attributes)
416 if not resp: 447 if not resp:
417 return False 448 return False
418 logging.debug('Server response:\n%s', resp) 449 logging.debug('Server response:\n%s', resp)
419 450
420 cmd = resp['cmd'] 451 cmd = resp['cmd']
421 if cmd == 'sleep': 452 if cmd == 'sleep':
422 quit_bit.wait(resp['duration']) 453 quit_bit.wait(resp['duration'])
423 return False 454 return False
424 455
425 if cmd == 'terminate': 456 if cmd == 'terminate':
426 quit_bit.set() 457 quit_bit.set()
427 # This is similar to post_update() in task_runner.py. 458 # This is similar to post_update() in task_runner.py.
428 params = { 459 params = {
429 'cost_usd': 0, 460 'cost_usd': 0,
430 'duration': 0, 461 'duration': 0,
431 'exit_code': 0, 462 'exit_code': 0,
432 'hard_timeout': False, 463 'hard_timeout': False,
433 'id': botobj.id, 464 'id': botobj.id,
434 'io_timeout': False, 465 'io_timeout': False,
435 'output': '', 466 'output': '',
436 'output_chunk_start': 0, 467 'output_chunk_start': 0,
437 'task_id': resp['task_id'], 468 'task_id': resp['task_id'],
438 } 469 }
439 net.url_read_json( 470 botobj.remote.url_read_json(
440 botobj.server + '/swarming/api/v1/bot/task_update/%s' % resp['task_id'], 471 '/swarming/api/v1/bot/task_update/%s' % resp['task_id'],
441 data=params) 472 data=params)
442 return False 473 return False
443 474
444 if cmd == 'run': 475 if cmd == 'run':
445 if run_manifest(botobj, resp['manifest'], start): 476 if run_manifest(botobj, resp['manifest'], start):
446 # Completed a task successfully so update swarming_bot.zip if necessary. 477 # Completed a task successfully so update swarming_bot.zip if necessary.
447 update_lkgbc(botobj) 478 update_lkgbc(botobj)
448 # TODO(maruel): Handle the case where quit_bit.is_set() happens here. This 479 # TODO(maruel): Handle the case where quit_bit.is_set() happens here. This
449 # is concerning as this means a signal (often SIGTERM) was received while 480 # is concerning as this means a signal (often SIGTERM) was received while
450 # running the task. Make sure the host is properly restarting. 481 # running the task. Make sure the host is properly restarting.
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
487 if not manifest['command']: 518 if not manifest['command']:
488 hard_timeout += manifest['io_timeout'] or 600 519 hard_timeout += manifest['io_timeout'] or 600
489 520
490 url = manifest.get('host', botobj.server) 521 url = manifest.get('host', botobj.server)
491 task_dimensions = manifest['dimensions'] 522 task_dimensions = manifest['dimensions']
492 task_result = {} 523 task_result = {}
493 524
494 failure = False 525 failure = False
495 internal_failure = False 526 internal_failure = False
496 msg = None 527 msg = None
528 headers_dumper = None
497 work_dir = os.path.join(botobj.base_dir, 'work') 529 work_dir = os.path.join(botobj.base_dir, 'work')
498 try: 530 try:
499 try: 531 try:
500 if os.path.isdir(work_dir): 532 if os.path.isdir(work_dir):
501 file_path.rmtree(work_dir) 533 file_path.rmtree(work_dir)
502 except OSError: 534 except OSError:
503 # If a previous task created an undeleteable file/directory inside 'work', 535 # If a previous task created an undeleteable file/directory inside 'work',
504 # make sure that following tasks are not affected. This is done by working 536 # make sure that following tasks are not affected. This is done by working
505 # around the undeleteable directory by creating a temporary directory 537 # around the undeleteable directory by creating a temporary directory
506 # instead. This is not normal behavior. The bot will report a failure on 538 # instead. This is not normal behavior. The bot will report a failure on
507 # start. 539 # start.
508 work_dir = tempfile.mkdtemp(dir=botobj.base_dir, prefix='work') 540 work_dir = tempfile.mkdtemp(dir=botobj.base_dir, prefix='work')
509 else: 541 else:
510 os.makedirs(work_dir) 542 os.makedirs(work_dir)
511 543
512 env = os.environ.copy() 544 env = os.environ.copy()
513 # Windows in particular does not tolerate unicode strings in environment 545 # Windows in particular does not tolerate unicode strings in environment
514 # variables. 546 # variables.
515 env['SWARMING_TASK_ID'] = task_id.encode('ascii') 547 env['SWARMING_TASK_ID'] = task_id.encode('ascii')
516 548
517 task_in_file = os.path.join(work_dir, 'task_runner_in.json') 549 task_in_file = os.path.join(work_dir, 'task_runner_in.json')
518 with open(task_in_file, 'wb') as f: 550 with open(task_in_file, 'wb') as f:
519 f.write(json.dumps(manifest)) 551 f.write(json.dumps(manifest))
520 call_hook(botobj, 'on_before_task') 552 call_hook(botobj, 'on_before_task')
521 task_result_file = os.path.join(work_dir, 'task_runner_out.json') 553 task_result_file = os.path.join(work_dir, 'task_runner_out.json')
522 if os.path.exists(task_result_file): 554 if os.path.exists(task_result_file):
523 os.remove(task_result_file) 555 os.remove(task_result_file)
556
557 # Start a thread that periodically puts authentication header in a file on
558 # disk. task_runner reads them from there before making HTTP calls to the
559 # swarming server.
560 headers_file = os.path.join(work_dir, 'bot_auth_headers.json')
561 if botobj.remote.uses_auth:
562 headers_dumper = AuthHeadersDumper(botobj.remote, headers_file)
563 headers_dumper.start()
564
524 command = [ 565 command = [
525 sys.executable, THIS_FILE, 'task_runner', 566 sys.executable, THIS_FILE, 'task_runner',
526 '--swarming-server', url, 567 '--swarming-server', url,
527 '--in-file', task_in_file, 568 '--in-file', task_in_file,
528 '--out-file', task_result_file, 569 '--out-file', task_result_file,
529 '--cost-usd-hour', str(botobj.state.get('cost_usd_hour') or 0.), 570 '--cost-usd-hour', str(botobj.state.get('cost_usd_hour') or 0.),
530 # Include the time taken to poll the task in the cost. 571 # Include the time taken to poll the task in the cost.
531 '--start', str(start), 572 '--start', str(start),
532 '--min-free-space', str(get_min_free_space()), 573 '--min-free-space', str(get_min_free_space()),
533 ] 574 ]
575 if botobj.remote.uses_auth:
576 command.extend(['--auth-headers-file', headers_file])
534 logging.debug('Running command: %s', command) 577 logging.debug('Running command: %s', command)
578
535 # Put the output file into the current working directory, which should be 579 # Put the output file into the current working directory, which should be
536 # the one containing swarming_bot.zip. 580 # the one containing swarming_bot.zip.
537 log_path = os.path.join(botobj.base_dir, 'logs', 'task_runner_stdout.log') 581 log_path = os.path.join(botobj.base_dir, 'logs', 'task_runner_stdout.log')
538 os_utilities.roll_log(log_path) 582 os_utilities.roll_log(log_path)
539 os_utilities.trim_rolled_log(log_path) 583 os_utilities.trim_rolled_log(log_path)
540 with open(log_path, 'a+b') as f: 584 with open(log_path, 'a+b') as f:
541 proc = subprocess42.Popen( 585 proc = subprocess42.Popen(
542 command, 586 command,
543 detached=True, 587 detached=True,
544 cwd=botobj.base_dir, 588 cwd=botobj.base_dir,
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
585 failure = bool(task_result.get('exit_code')) if task_result else False 629 failure = bool(task_result.get('exit_code')) if task_result else False
586 return not internal_failure and not failure 630 return not internal_failure and not failure
587 except Exception as e: 631 except Exception as e:
588 # Failures include IOError when writing if the disk is full, OSError if 632 # Failures include IOError when writing if the disk is full, OSError if
589 # swarming_bot.zip doesn't exist anymore, etc. 633 # swarming_bot.zip doesn't exist anymore, etc.
590 logging.exception('run_manifest failed') 634 logging.exception('run_manifest failed')
591 msg = 'Internal exception occured: %s\n%s' % ( 635 msg = 'Internal exception occured: %s\n%s' % (
592 e, traceback.format_exc()[-2048:]) 636 e, traceback.format_exc()[-2048:])
593 internal_failure = True 637 internal_failure = True
594 finally: 638 finally:
639 if headers_dumper:
640 headers_dumper.stop()
595 if internal_failure: 641 if internal_failure:
596 post_error_task(botobj, msg, task_id) 642 post_error_task(botobj, msg, task_id)
597 call_hook( 643 call_hook(
598 botobj, 'on_after_task', failure, internal_failure, task_dimensions, 644 botobj, 'on_after_task', failure, internal_failure, task_dimensions,
599 task_result) 645 task_result)
600 if os.path.isdir(work_dir): 646 if os.path.isdir(work_dir):
601 try: 647 try:
602 file_path.rmtree(work_dir) 648 file_path.rmtree(work_dir)
603 except Exception as e: 649 except Exception as e:
604 botobj.post_error( 650 botobj.post_error(
(...skipping 10 matching lines...) Expand all
615 661
616 Does not return. 662 Does not return.
617 """ 663 """
618 # Alternate between .1.zip and .2.zip. 664 # Alternate between .1.zip and .2.zip.
619 new_zip = 'swarming_bot.1.zip' 665 new_zip = 'swarming_bot.1.zip'
620 if os.path.basename(THIS_FILE) == new_zip: 666 if os.path.basename(THIS_FILE) == new_zip:
621 new_zip = 'swarming_bot.2.zip' 667 new_zip = 'swarming_bot.2.zip'
622 new_zip = os.path.join(os.path.dirname(THIS_FILE), new_zip) 668 new_zip = os.path.join(os.path.dirname(THIS_FILE), new_zip)
623 669
624 # Download as a new file. 670 # Download as a new file.
625 url = botobj.server + '/swarming/api/v1/bot/bot_code/%s' % version 671 url_path = '/swarming/api/v1/bot/bot_code/%s' % version
626 if not net.url_retrieve(new_zip, url): 672 if not botobj.remote.url_retrieve(new_zip, url_path):
627 # It can happen when a server is rapidly updated multiple times in a row. 673 # It can happen when a server is rapidly updated multiple times in a row.
628 botobj.post_error( 674 botobj.post_error(
629 'Unable to download %s from %s; first tried version %s' % 675 'Unable to download %s from %s; first tried version %s' %
630 (new_zip, url, version)) 676 (new_zip, botobj.server + url_path, version))
631 # Poll again, this may work next time. To prevent busy-loop, sleep a little. 677 # Poll again, this may work next time. To prevent busy-loop, sleep a little.
632 time.sleep(2) 678 time.sleep(2)
633 return 679 return
634 680
635 s = os.stat(new_zip) 681 s = os.stat(new_zip)
636 logging.info('Restarting to %s; %d bytes.', new_zip, s.st_size) 682 logging.info('Restarting to %s; %d bytes.', new_zip, s.st_size)
637 sys.stdout.flush() 683 sys.stdout.flush()
638 sys.stderr.flush() 684 sys.stderr.flush()
639 685
640 proc = subprocess42.Popen( 686 proc = subprocess42.Popen(
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
685 cur = os.stat(THIS_FILE) 731 cur = os.stat(THIS_FILE)
686 if org.st_size == org.st_size and org.st_mtime >= cur.st_mtime: 732 if org.st_size == org.st_size and org.st_mtime >= cur.st_mtime:
687 return 733 return
688 734
689 # Copy the file back. 735 # Copy the file back.
690 shutil.copy(THIS_FILE, golden) 736 shutil.copy(THIS_FILE, golden)
691 except Exception as e: 737 except Exception as e:
692 botobj.post_error('Failed to update LKGBC: %s' % e) 738 botobj.post_error('Failed to update LKGBC: %s' % e)
693 739
694 740
741 class AuthHeadersDumper(object):
742 """Represents a thread that dumps auth headers to a file on disk each minute.
743
744 'task_runner' reads them from there when making HTTP calls.
745 """
746
747 def __init__(self, remote, path):
748 self._remote = remote
749 self._path = path
750 self._thread = None
751 self._signal = None
752
753 def start(self):
754 """Starts a thread that dumps headers to the file."""
755 assert self._thread is None
756 self._dump_headers() # initial dump
757 self._signal = Queue.Queue()
758 self._thread = threading.Thread(
759 target=self._run,
760 args=(self._signal,),
761 name='AuthHeadersDumper %s' % self._path)
762 self._thread.daemon = True
763 self._thread.start()
764
765 def stop(self):
766 """Stops the dumping thread (if it is running)."""
767 if not self._thread:
768 return
769 self._signal.put(None)
770 self._thread.join(60) # don't wait forever
771 if self._thread.is_alive():
772 logging.error('AuthHeadersDumper thread failed to terminate in time')
773 self._signal = None
774 self._thread = None
775
776 def _dump_headers(self):
777 logging.info('Dumping auth headers to %s', self._path)
778 # TODO(vadimsh): Implement.
779
780 def _run(self, signal):
781 while True:
782 try:
783 signal.get(timeout=60)
784 return # the stop signal received, quit the thread
785 except Queue.Empty:
786 self._dump_headers()
787
788
695 def get_config(): 789 def get_config():
696 """Returns the data from config.json.""" 790 """Returns the data from config.json."""
697 global _ERROR_HANDLER_WAS_REGISTERED 791 global _ERROR_HANDLER_WAS_REGISTERED
698 792
699 with contextlib.closing(zipfile.ZipFile(THIS_FILE, 'r')) as f: 793 with contextlib.closing(zipfile.ZipFile(THIS_FILE, 'r')) as f:
700 config = json.load(f.open('config/config.json', 'r')) 794 config = json.load(f.open('config/config.json', 'r'))
701 795
702 server = config.get('server', '') 796 server = config.get('server', '')
703 if not _ERROR_HANDLER_WAS_REGISTERED and server: 797 if not _ERROR_HANDLER_WAS_REGISTERED and server:
704 on_error.report_on_exception_exit(server) 798 on_error.report_on_exception_exit(server)
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
738 832
739 error = None 833 error = None
740 if len(args) != 0: 834 if len(args) != 0:
741 error = 'Unexpected arguments: %s' % args 835 error = 'Unexpected arguments: %s' % args
742 try: 836 try:
743 return run_bot(error) 837 return run_bot(error)
744 finally: 838 finally:
745 call_hook(bot.Bot(None, None, None, os.path.dirname(THIS_FILE), None), 839 call_hook(bot.Bot(None, None, None, os.path.dirname(THIS_FILE), None),
746 'on_bot_shutdown') 840 'on_bot_shutdown')
747 logging.info('main() returning') 841 logging.info('main() returning')
OLDNEW
« no previous file with comments | « appengine/swarming/swarming_bot/api/bot.py ('k') | appengine/swarming/swarming_bot/bot_code/remote_client.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698