OLD | NEW |
---|---|
1 # Copyright 2013 The LUCI Authors. All rights reserved. | 1 # Copyright 2013 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """Swarming bot main process. | 5 """Swarming bot main process. |
6 | 6 |
7 This is the program that communicates with the Swarming server, ensures the code | 7 This is the program that communicates with the Swarming server, ensures the code |
8 is always up to date and executes a child process to run tasks and upload | 8 is always up to date and executes a child process to run tasks and upload |
9 results back. | 9 results back. |
10 | 10 |
11 It manages self-update and rebooting the host in case of problems. | 11 It manages self-update and rebooting the host in case of problems. |
12 | 12 |
13 Set the environment variable SWARMING_LOAD_TEST=1 to disable the use of | 13 Set the environment variable SWARMING_LOAD_TEST=1 to disable the use of |
14 server-provided bot_config.py. This permits safe load testing. | 14 server-provided bot_config.py. This permits safe load testing. |
15 """ | 15 """ |
16 | 16 |
17 import contextlib | 17 import contextlib |
18 import functools | |
18 import json | 19 import json |
19 import logging | 20 import logging |
20 import optparse | 21 import optparse |
21 import os | 22 import os |
23 import Queue | |
22 import shutil | 24 import shutil |
23 import signal | |
24 import sys | 25 import sys |
25 import tempfile | 26 import tempfile |
26 import threading | 27 import threading |
27 import time | 28 import time |
28 import traceback | 29 import traceback |
29 import zipfile | 30 import zipfile |
30 | 31 |
31 import common | 32 import common |
33 import remote_client | |
32 import singleton | 34 import singleton |
33 from api import bot | 35 from api import bot |
34 from api import os_utilities | 36 from api import os_utilities |
35 from utils import file_path | 37 from utils import file_path |
36 from utils import net | 38 from utils import net |
37 from utils import on_error | 39 from utils import on_error |
38 from utils import subprocess42 | 40 from utils import subprocess42 |
39 from utils import zip_package | 41 from utils import zip_package |
40 | 42 |
41 | 43 |
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
163 should_continue = bot_config.setup_bot(botobj) | 165 should_continue = bot_config.setup_bot(botobj) |
164 except Exception as e: | 166 except Exception as e: |
165 msg = '%s\n%s' % (e, traceback.format_exc()[-2048:]) | 167 msg = '%s\n%s' % (e, traceback.format_exc()[-2048:]) |
166 botobj.post_error('bot_config.setup_bot() threw: %s' % msg) | 168 botobj.post_error('bot_config.setup_bot() threw: %s' % msg) |
167 return | 169 return |
168 | 170 |
169 if not should_continue and not skip_reboot: | 171 if not should_continue and not skip_reboot: |
170 botobj.restart('Starting new swarming bot: %s' % THIS_FILE) | 172 botobj.restart('Starting new swarming bot: %s' % THIS_FILE) |
171 | 173 |
172 | 174 |
175 def get_authentication_headers(botobj): | |
176 """Calls bot_config.get_authentication_headers() if it is defined.""" | |
177 if _in_load_test_mode(): | |
178 return (None, None) | |
179 logging.info('get_authentication_headers()') | |
180 from config import bot_config | |
181 func = getattr(bot_config, 'get_authentication_headers', None) | |
182 return func(botobj) if func else (None, None) | |
M-A Ruel
2016/06/01 17:08:02
wrap that to catch all exceptions like the other f
Vadim Sh.
2016/06/03 01:15:32
Exceptions are handled in RemoteClient, where it k
| |
183 | |
184 | |
173 ### end of bot_config handler part. | 185 ### end of bot_config handler part. |
174 | 186 |
175 | 187 |
176 def get_min_free_space(): | 188 def get_min_free_space(): |
177 """Returns free disk space needed. | 189 """Returns free disk space needed. |
178 | 190 |
179 Add a "250 MiB slack space" for logs, temporary files and whatever other leak. | 191 Add a "250 MiB slack space" for logs, temporary files and whatever other leak. |
180 """ | 192 """ |
181 return int((os_utilities.get_min_free_space(THIS_FILE) + 250.) * 1024 * 1024) | 193 return int((os_utilities.get_min_free_space(THIS_FILE) + 250.) * 1024 * 1024) |
182 | 194 |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
221 exception handler for incoming commands from the Swarming server. If for | 233 exception handler for incoming commands from the Swarming server. If for |
222 any reason the local test runner script can not be run successfully, | 234 any reason the local test runner script can not be run successfully, |
223 this function is invoked. | 235 this function is invoked. |
224 """ | 236 """ |
225 logging.error('Error: %s', error) | 237 logging.error('Error: %s', error) |
226 data = { | 238 data = { |
227 'id': botobj.id, | 239 'id': botobj.id, |
228 'message': error, | 240 'message': error, |
229 'task_id': task_id, | 241 'task_id': task_id, |
230 } | 242 } |
231 return net.url_read_json( | 243 return botobj.remote.url_read_json( |
232 botobj.server + '/swarming/api/v1/bot/task_error/%s' % task_id, data=data) | 244 '/swarming/api/v1/bot/task_error/%s' % task_id, data=data) |
233 | 245 |
234 | 246 |
235 def on_shutdown_hook(b): | 247 def on_shutdown_hook(b): |
236 """Called when the bot is restarting.""" | 248 """Called when the bot is restarting.""" |
237 call_hook(b, 'on_bot_shutdown') | 249 call_hook(b, 'on_bot_shutdown') |
238 # Aggressively set itself up so we ensure the auto-reboot configuration is | 250 # Aggressively set itself up so we ensure the auto-reboot configuration is |
239 # fine before restarting the host. This is important as some tasks delete the | 251 # fine before restarting the host. This is important as some tasks delete the |
240 # autorestart script (!) | 252 # autorestart script (!) |
241 setup_bot(True) | 253 setup_bot(True) |
242 | 254 |
243 | 255 |
244 def get_bot(): | 256 def get_bot(): |
245 """Returns a valid Bot instance. | 257 """Returns a valid Bot instance. |
246 | 258 |
247 Should only be called once in the process lifetime. | 259 Should only be called once in the process lifetime. |
248 """ | 260 """ |
249 # This variable is used to bootstrap the initial bot.Bot object, which then is | 261 # This variable is used to bootstrap the initial bot.Bot object, which then is |
250 # used to get the dimensions and state. | 262 # used to get the dimensions and state. |
251 attributes = { | 263 attributes = { |
252 'dimensions': {u'id': ['none']}, | 264 'dimensions': {u'id': ['none']}, |
253 'state': {}, | 265 'state': {}, |
254 'version': generate_version(), | 266 'version': generate_version(), |
255 } | 267 } |
256 config = get_config() | 268 config = get_config() |
257 assert not config['server'].endswith('/'), config | 269 assert not config['server'].endswith('/'), config |
258 | 270 |
259 # Create a temporary object to call the hooks. | 271 # Use temporary Bot object to call get_attributes. Attributes are needed to |
272 # construct the "real" bot.Bot. | |
273 attributes = get_attributes( | |
274 bot.Bot( | |
275 remote_client.RemoteClient(config['server'], None), | |
276 attributes, | |
277 config['server'], | |
278 config['server_version'], | |
279 os.path.dirname(THIS_FILE), | |
280 on_shutdown_hook)) | |
281 | |
282 # Make remote client callback use the returned bot object. We assume here | |
283 # RemoteClient doesn't call its callback in the constructor (since 'botobj' is | |
284 # undefined during the construction). | |
260 botobj = bot.Bot( | 285 botobj = bot.Bot( |
286 remote_client.RemoteClient( | |
287 config['server'], | |
288 lambda: get_authentication_headers(botobj)), | |
261 attributes, | 289 attributes, |
262 config['server'], | 290 config['server'], |
263 config['server_version'], | 291 config['server_version'], |
264 os.path.dirname(THIS_FILE), | 292 os.path.dirname(THIS_FILE), |
265 on_shutdown_hook) | 293 on_shutdown_hook) |
266 return bot.Bot( | 294 return botobj |
267 get_attributes(botobj), | |
268 config['server'], | |
269 config['server_version'], | |
270 os.path.dirname(THIS_FILE), | |
271 on_shutdown_hook) | |
272 | 295 |
273 | 296 |
274 def clean_isolated_cache(botobj): | 297 def clean_isolated_cache(botobj): |
275 """Asks run_isolated to clean its cache. | 298 """Asks run_isolated to clean its cache. |
276 | 299 |
277 This may take a while but it ensures that in the case of a run_isolated run | 300 This may take a while but it ensures that in the case of a run_isolated run |
278 failed and it temporarily used more space than min_free_disk, it can cleans up | 301 failed and it temporarily used more space than min_free_disk, it can cleans up |
279 the mess properly. | 302 the mess properly. |
280 | 303 |
281 It will remove unexpected files, remove corrupted files, trim the cache size | 304 It will remove unexpected files, remove corrupted files, trim the cache size |
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
328 # First thing is to get an arbitrary url. This also ensures the network is | 351 # First thing is to get an arbitrary url. This also ensures the network is |
329 # up and running, which is necessary before trying to get the FQDN below. | 352 # up and running, which is necessary before trying to get the FQDN below. |
330 resp = net.url_read(config['server'] + '/swarming/api/v1/bot/server_ping') | 353 resp = net.url_read(config['server'] + '/swarming/api/v1/bot/server_ping') |
331 if resp is None: | 354 if resp is None: |
332 logging.error('No response from server_ping') | 355 logging.error('No response from server_ping') |
333 except Exception as e: | 356 except Exception as e: |
334 # url_read() already traps pretty much every exceptions. This except | 357 # url_read() already traps pretty much every exceptions. This except |
335 # clause is kept there "just in case". | 358 # clause is kept there "just in case". |
336 logging.exception('server_ping threw') | 359 logging.exception('server_ping threw') |
337 | 360 |
361 # Next we make sure the bot can make authenticated calls by grabbing | |
362 # the auth headers, retrying on errors a bunch of times. We don't give up | |
363 # if it fails though (maybe the bot will "fix itself" later). | |
364 botobj = get_bot() | |
365 try: | |
366 botobj.remote.initialize(quit_bit) | |
367 except remote_client.InitializationError as exc: | |
368 botobj.post_error('failed to grab auth headers: %s' % exc.last_error) | |
369 logging.error('Can\'t grab auth headers, continuing anyway...') | |
370 | |
338 if quit_bit.is_set(): | 371 if quit_bit.is_set(): |
339 logging.info('Early quit 1') | 372 logging.info('Early quit 1') |
340 return 0 | 373 return 0 |
341 | 374 |
342 # If this fails, there's hardly anything that can be done, the bot can't | 375 # If this fails, there's hardly anything that can be done, the bot can't |
343 # even get to the point to be able to self-update. | 376 # even get to the point to be able to self-update. |
344 botobj = get_bot() | 377 resp = botobj.remote.url_read_json( |
345 resp = net.url_read_json( | 378 '/swarming/api/v1/bot/handshake', data=botobj._attributes) |
346 botobj.server + '/swarming/api/v1/bot/handshake', | |
347 data=botobj._attributes) | |
348 if not resp: | 379 if not resp: |
349 logging.error('Failed to contact for handshake') | 380 logging.error('Failed to contact for handshake') |
350 else: | 381 else: |
351 logging.info('Connected to %s', resp.get('server_version')) | 382 logging.info('Connected to %s', resp.get('server_version')) |
352 if resp.get('bot_version') != botobj._attributes['version']: | 383 if resp.get('bot_version') != botobj._attributes['version']: |
353 logging.warning( | 384 logging.warning( |
354 'Found out we\'ll need to update: server said %s; we\'re %s', | 385 'Found out we\'ll need to update: server said %s; we\'re %s', |
355 resp.get('bot_version'), botobj._attributes['version']) | 386 resp.get('bot_version'), botobj._attributes['version']) |
356 | 387 |
357 if arg_error: | 388 if arg_error: |
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
404 return 0 | 435 return 0 |
405 | 436 |
406 | 437 |
407 def poll_server(botobj, quit_bit): | 438 def poll_server(botobj, quit_bit): |
408 """Polls the server to run one loop. | 439 """Polls the server to run one loop. |
409 | 440 |
410 Returns True if executed some action, False if server asked the bot to sleep. | 441 Returns True if executed some action, False if server asked the bot to sleep. |
411 """ | 442 """ |
412 # Access to a protected member _XXX of a client class - pylint: disable=W0212 | 443 # Access to a protected member _XXX of a client class - pylint: disable=W0212 |
413 start = time.time() | 444 start = time.time() |
414 resp = net.url_read_json( | 445 resp = botobj.remote.url_read_json( |
415 botobj.server + '/swarming/api/v1/bot/poll', data=botobj._attributes) | 446 '/swarming/api/v1/bot/poll', data=botobj._attributes) |
416 if not resp: | 447 if not resp: |
417 return False | 448 return False |
418 logging.debug('Server response:\n%s', resp) | 449 logging.debug('Server response:\n%s', resp) |
419 | 450 |
420 cmd = resp['cmd'] | 451 cmd = resp['cmd'] |
421 if cmd == 'sleep': | 452 if cmd == 'sleep': |
422 quit_bit.wait(resp['duration']) | 453 quit_bit.wait(resp['duration']) |
423 return False | 454 return False |
424 | 455 |
425 if cmd == 'terminate': | 456 if cmd == 'terminate': |
426 quit_bit.set() | 457 quit_bit.set() |
427 # This is similar to post_update() in task_runner.py. | 458 # This is similar to post_update() in task_runner.py. |
428 params = { | 459 params = { |
429 'cost_usd': 0, | 460 'cost_usd': 0, |
430 'duration': 0, | 461 'duration': 0, |
431 'exit_code': 0, | 462 'exit_code': 0, |
432 'hard_timeout': False, | 463 'hard_timeout': False, |
433 'id': botobj.id, | 464 'id': botobj.id, |
434 'io_timeout': False, | 465 'io_timeout': False, |
435 'output': '', | 466 'output': '', |
436 'output_chunk_start': 0, | 467 'output_chunk_start': 0, |
437 'task_id': resp['task_id'], | 468 'task_id': resp['task_id'], |
438 } | 469 } |
439 net.url_read_json( | 470 botobj.remote.url_read_json( |
440 botobj.server + '/swarming/api/v1/bot/task_update/%s' % resp['task_id'], | 471 '/swarming/api/v1/bot/task_update/%s' % resp['task_id'], |
441 data=params) | 472 data=params) |
442 return False | 473 return False |
443 | 474 |
444 if cmd == 'run': | 475 if cmd == 'run': |
445 if run_manifest(botobj, resp['manifest'], start): | 476 if run_manifest(botobj, resp['manifest'], start): |
446 # Completed a task successfully so update swarming_bot.zip if necessary. | 477 # Completed a task successfully so update swarming_bot.zip if necessary. |
447 update_lkgbc(botobj) | 478 update_lkgbc(botobj) |
448 # TODO(maruel): Handle the case where quit_bit.is_set() happens here. This | 479 # TODO(maruel): Handle the case where quit_bit.is_set() happens here. This |
449 # is concerning as this means a signal (often SIGTERM) was received while | 480 # is concerning as this means a signal (often SIGTERM) was received while |
450 # running the task. Make sure the host is properly restarting. | 481 # running the task. Make sure the host is properly restarting. |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
487 if not manifest['command']: | 518 if not manifest['command']: |
488 hard_timeout += manifest['io_timeout'] or 600 | 519 hard_timeout += manifest['io_timeout'] or 600 |
489 | 520 |
490 url = manifest.get('host', botobj.server) | 521 url = manifest.get('host', botobj.server) |
491 task_dimensions = manifest['dimensions'] | 522 task_dimensions = manifest['dimensions'] |
492 task_result = {} | 523 task_result = {} |
493 | 524 |
494 failure = False | 525 failure = False |
495 internal_failure = False | 526 internal_failure = False |
496 msg = None | 527 msg = None |
528 headers_dumper = None | |
497 work_dir = os.path.join(botobj.base_dir, 'work') | 529 work_dir = os.path.join(botobj.base_dir, 'work') |
498 try: | 530 try: |
499 try: | 531 try: |
500 if os.path.isdir(work_dir): | 532 if os.path.isdir(work_dir): |
501 file_path.rmtree(work_dir) | 533 file_path.rmtree(work_dir) |
502 except OSError: | 534 except OSError: |
503 # If a previous task created an undeleteable file/directory inside 'work', | 535 # If a previous task created an undeleteable file/directory inside 'work', |
504 # make sure that following tasks are not affected. This is done by working | 536 # make sure that following tasks are not affected. This is done by working |
505 # around the undeleteable directory by creating a temporary directory | 537 # around the undeleteable directory by creating a temporary directory |
506 # instead. This is not normal behavior. The bot will report a failure on | 538 # instead. This is not normal behavior. The bot will report a failure on |
507 # start. | 539 # start. |
508 work_dir = tempfile.mkdtemp(dir=botobj.base_dir, prefix='work') | 540 work_dir = tempfile.mkdtemp(dir=botobj.base_dir, prefix='work') |
509 else: | 541 else: |
510 os.makedirs(work_dir) | 542 os.makedirs(work_dir) |
511 | 543 |
512 env = os.environ.copy() | 544 env = os.environ.copy() |
513 # Windows in particular does not tolerate unicode strings in environment | 545 # Windows in particular does not tolerate unicode strings in environment |
514 # variables. | 546 # variables. |
515 env['SWARMING_TASK_ID'] = task_id.encode('ascii') | 547 env['SWARMING_TASK_ID'] = task_id.encode('ascii') |
516 | 548 |
517 task_in_file = os.path.join(work_dir, 'task_runner_in.json') | 549 task_in_file = os.path.join(work_dir, 'task_runner_in.json') |
518 with open(task_in_file, 'wb') as f: | 550 with open(task_in_file, 'wb') as f: |
519 f.write(json.dumps(manifest)) | 551 f.write(json.dumps(manifest)) |
520 call_hook(botobj, 'on_before_task') | 552 call_hook(botobj, 'on_before_task') |
521 task_result_file = os.path.join(work_dir, 'task_runner_out.json') | 553 task_result_file = os.path.join(work_dir, 'task_runner_out.json') |
522 if os.path.exists(task_result_file): | 554 if os.path.exists(task_result_file): |
523 os.remove(task_result_file) | 555 os.remove(task_result_file) |
556 | |
557 # Start a thread that periodically puts authentication header in a file on | |
558 # disk. task_runner reads them from there before making HTTP calls to the | |
559 # swarming server. | |
560 headers_file = os.path.join(work_dir, 'bot_auth_headers.json') | |
561 if botobj.remote.uses_auth: | |
562 headers_dumper = AuthHeadersDumper(botobj.remote, headers_file) | |
563 headers_dumper.start() | |
564 | |
524 command = [ | 565 command = [ |
525 sys.executable, THIS_FILE, 'task_runner', | 566 sys.executable, THIS_FILE, 'task_runner', |
526 '--swarming-server', url, | 567 '--swarming-server', url, |
527 '--in-file', task_in_file, | 568 '--in-file', task_in_file, |
528 '--out-file', task_result_file, | 569 '--out-file', task_result_file, |
529 '--cost-usd-hour', str(botobj.state.get('cost_usd_hour') or 0.), | 570 '--cost-usd-hour', str(botobj.state.get('cost_usd_hour') or 0.), |
530 # Include the time taken to poll the task in the cost. | 571 # Include the time taken to poll the task in the cost. |
531 '--start', str(start), | 572 '--start', str(start), |
532 '--min-free-space', str(get_min_free_space()), | 573 '--min-free-space', str(get_min_free_space()), |
533 ] | 574 ] |
575 if botobj.remote.uses_auth: | |
576 command.extend(['--auth-headers-file', headers_file]) | |
534 logging.debug('Running command: %s', command) | 577 logging.debug('Running command: %s', command) |
578 | |
535 # Put the output file into the current working directory, which should be | 579 # Put the output file into the current working directory, which should be |
536 # the one containing swarming_bot.zip. | 580 # the one containing swarming_bot.zip. |
537 log_path = os.path.join(botobj.base_dir, 'logs', 'task_runner_stdout.log') | 581 log_path = os.path.join(botobj.base_dir, 'logs', 'task_runner_stdout.log') |
538 os_utilities.roll_log(log_path) | 582 os_utilities.roll_log(log_path) |
539 os_utilities.trim_rolled_log(log_path) | 583 os_utilities.trim_rolled_log(log_path) |
540 with open(log_path, 'a+b') as f: | 584 with open(log_path, 'a+b') as f: |
541 proc = subprocess42.Popen( | 585 proc = subprocess42.Popen( |
542 command, | 586 command, |
543 detached=True, | 587 detached=True, |
544 cwd=botobj.base_dir, | 588 cwd=botobj.base_dir, |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
585 failure = bool(task_result.get('exit_code')) if task_result else False | 629 failure = bool(task_result.get('exit_code')) if task_result else False |
586 return not internal_failure and not failure | 630 return not internal_failure and not failure |
587 except Exception as e: | 631 except Exception as e: |
588 # Failures include IOError when writing if the disk is full, OSError if | 632 # Failures include IOError when writing if the disk is full, OSError if |
589 # swarming_bot.zip doesn't exist anymore, etc. | 633 # swarming_bot.zip doesn't exist anymore, etc. |
590 logging.exception('run_manifest failed') | 634 logging.exception('run_manifest failed') |
591 msg = 'Internal exception occured: %s\n%s' % ( | 635 msg = 'Internal exception occured: %s\n%s' % ( |
592 e, traceback.format_exc()[-2048:]) | 636 e, traceback.format_exc()[-2048:]) |
593 internal_failure = True | 637 internal_failure = True |
594 finally: | 638 finally: |
639 if headers_dumper: | |
640 headers_dumper.stop() | |
595 if internal_failure: | 641 if internal_failure: |
596 post_error_task(botobj, msg, task_id) | 642 post_error_task(botobj, msg, task_id) |
597 call_hook( | 643 call_hook( |
598 botobj, 'on_after_task', failure, internal_failure, task_dimensions, | 644 botobj, 'on_after_task', failure, internal_failure, task_dimensions, |
599 task_result) | 645 task_result) |
600 if os.path.isdir(work_dir): | 646 if os.path.isdir(work_dir): |
601 try: | 647 try: |
602 file_path.rmtree(work_dir) | 648 file_path.rmtree(work_dir) |
603 except Exception as e: | 649 except Exception as e: |
604 botobj.post_error( | 650 botobj.post_error( |
(...skipping 10 matching lines...) Expand all Loading... | |
615 | 661 |
616 Does not return. | 662 Does not return. |
617 """ | 663 """ |
618 # Alternate between .1.zip and .2.zip. | 664 # Alternate between .1.zip and .2.zip. |
619 new_zip = 'swarming_bot.1.zip' | 665 new_zip = 'swarming_bot.1.zip' |
620 if os.path.basename(THIS_FILE) == new_zip: | 666 if os.path.basename(THIS_FILE) == new_zip: |
621 new_zip = 'swarming_bot.2.zip' | 667 new_zip = 'swarming_bot.2.zip' |
622 new_zip = os.path.join(os.path.dirname(THIS_FILE), new_zip) | 668 new_zip = os.path.join(os.path.dirname(THIS_FILE), new_zip) |
623 | 669 |
624 # Download as a new file. | 670 # Download as a new file. |
625 url = botobj.server + '/swarming/api/v1/bot/bot_code/%s' % version | 671 url_path = '/swarming/api/v1/bot/bot_code/%s' % version |
626 if not net.url_retrieve(new_zip, url): | 672 if not botobj.remote.url_retrieve(new_zip, url_path): |
627 # It can happen when a server is rapidly updated multiple times in a row. | 673 # It can happen when a server is rapidly updated multiple times in a row. |
628 botobj.post_error( | 674 botobj.post_error( |
629 'Unable to download %s from %s; first tried version %s' % | 675 'Unable to download %s from %s; first tried version %s' % |
630 (new_zip, url, version)) | 676 (new_zip, botobj.server + url_path, version)) |
631 # Poll again, this may work next time. To prevent busy-loop, sleep a little. | 677 # Poll again, this may work next time. To prevent busy-loop, sleep a little. |
632 time.sleep(2) | 678 time.sleep(2) |
633 return | 679 return |
634 | 680 |
635 s = os.stat(new_zip) | 681 s = os.stat(new_zip) |
636 logging.info('Restarting to %s; %d bytes.', new_zip, s.st_size) | 682 logging.info('Restarting to %s; %d bytes.', new_zip, s.st_size) |
637 sys.stdout.flush() | 683 sys.stdout.flush() |
638 sys.stderr.flush() | 684 sys.stderr.flush() |
639 | 685 |
640 proc = subprocess42.Popen( | 686 proc = subprocess42.Popen( |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
685 cur = os.stat(THIS_FILE) | 731 cur = os.stat(THIS_FILE) |
686 if org.st_size == org.st_size and org.st_mtime >= cur.st_mtime: | 732 if org.st_size == org.st_size and org.st_mtime >= cur.st_mtime: |
687 return | 733 return |
688 | 734 |
689 # Copy the file back. | 735 # Copy the file back. |
690 shutil.copy(THIS_FILE, golden) | 736 shutil.copy(THIS_FILE, golden) |
691 except Exception as e: | 737 except Exception as e: |
692 botobj.post_error('Failed to update LKGBC: %s' % e) | 738 botobj.post_error('Failed to update LKGBC: %s' % e) |
693 | 739 |
694 | 740 |
741 class AuthHeadersDumper(object): | |
742 """Represents a thread that dumps auth headers to a file on disk each minute. | |
743 | |
744 'task_runner' reads them from there when making HTTP calls. | |
745 """ | |
746 | |
747 def __init__(self, remote, path): | |
748 self._remote = remote | |
749 self._path = path | |
750 self._thread = None | |
751 self._signal = None | |
752 | |
753 def start(self): | |
754 """Starts a thread that dumps headers to the file.""" | |
755 assert self._thread is None | |
756 self._dump_headers() # initial dump | |
757 self._signal = Queue.Queue() | |
758 self._thread = threading.Thread( | |
759 target=self._run, | |
760 args=(self._signal,), | |
761 name='AuthHeadersDumper %s' % self._path) | |
762 self._thread.daemon = True | |
763 self._thread.start() | |
764 | |
765 def stop(self): | |
766 """Stops the dumping thread (if it is running).""" | |
767 if not self._thread: | |
768 return | |
769 self._signal.put(None) | |
770 self._thread.join(60) # don't wait forever | |
771 if self._thread.is_alive(): | |
772 logging.error('AuthHeadersDumper thread failed to terminate in time') | |
773 self._signal = None | |
774 self._thread = None | |
775 | |
776 def _dump_headers(self): | |
777 logging.info('Dumping auth headers to %s', self._path) | |
778 # TODO(vadimsh): Implement. | |
779 | |
780 def _run(self, signal): | |
781 while True: | |
782 try: | |
783 signal.get(timeout=60) | |
784 return # the stop signal received, quit the thread | |
785 except Queue.Empty: | |
786 self._dump_headers() | |
787 | |
788 | |
695 def get_config(): | 789 def get_config(): |
696 """Returns the data from config.json.""" | 790 """Returns the data from config.json.""" |
697 global _ERROR_HANDLER_WAS_REGISTERED | 791 global _ERROR_HANDLER_WAS_REGISTERED |
698 | 792 |
699 with contextlib.closing(zipfile.ZipFile(THIS_FILE, 'r')) as f: | 793 with contextlib.closing(zipfile.ZipFile(THIS_FILE, 'r')) as f: |
700 config = json.load(f.open('config/config.json', 'r')) | 794 config = json.load(f.open('config/config.json', 'r')) |
701 | 795 |
702 server = config.get('server', '') | 796 server = config.get('server', '') |
703 if not _ERROR_HANDLER_WAS_REGISTERED and server: | 797 if not _ERROR_HANDLER_WAS_REGISTERED and server: |
704 on_error.report_on_exception_exit(server) | 798 on_error.report_on_exception_exit(server) |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
738 | 832 |
739 error = None | 833 error = None |
740 if len(args) != 0: | 834 if len(args) != 0: |
741 error = 'Unexpected arguments: %s' % args | 835 error = 'Unexpected arguments: %s' % args |
742 try: | 836 try: |
743 return run_bot(error) | 837 return run_bot(error) |
744 finally: | 838 finally: |
745 call_hook(bot.Bot(None, None, None, os.path.dirname(THIS_FILE), None), | 839 call_hook(bot.Bot(None, None, None, os.path.dirname(THIS_FILE), None), |
746 'on_bot_shutdown') | 840 'on_bot_shutdown') |
747 logging.info('main() returning') | 841 logging.info('main() returning') |
OLD | NEW |