| Index: appengine/swarming/swarming_bot/bot_code/file_refresher.py | 
| diff --git a/appengine/swarming/swarming_bot/bot_code/file_refresher.py b/appengine/swarming/swarming_bot/bot_code/file_refresher.py | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..9eb8361eb44aacd1f59dad23f6ac7f52f8061a9a | 
| --- /dev/null | 
| +++ b/appengine/swarming/swarming_bot/bot_code/file_refresher.py | 
| @@ -0,0 +1,92 @@ | 
| +# Copyright 2016 The LUCI Authors. All rights reserved. | 
| +# Use of this source code is governed under the Apache License, Version 2.0 | 
| +# that can be found in the LICENSE file. | 
| + | 
| +import json | 
| +import logging | 
| +import Queue | 
| +import threading | 
| +import time | 
| + | 
| +from utils import file_path | 
| + | 
| + | 
| +class FileRefresher(object): | 
| +  """Represents a thread that periodically dumps result of a callback to a file. | 
| + | 
| +  Used by bot_main to send authnetication headers to task_runner. task_runner | 
| +  reads them from the file when making HTTP calls. | 
| + | 
| +  Uses JSON for serialization. Doesn't delete the file when stopped. | 
| +  """ | 
| + | 
| +  def __init__(self, path, producer_callback, interval_sec=60): | 
| +    self._path = path | 
| +    self._producer_callback = producer_callback | 
| +    self._interval_sec = interval_sec | 
| +    self._thread = None | 
| +    self._signal = None | 
| +    self._last_dumped_blob = None | 
| + | 
| +  def start(self): | 
| +    """Starts a thread that dumps value to the file.""" | 
| +    assert self._thread is None | 
| +    self._dump() # initial dump | 
| +    self._signal = Queue.Queue() | 
| +    self._thread = threading.Thread( | 
| +      target=self._run, | 
| +      args=(self._signal,), | 
| +      name='FileRefresher %s' % self._path) | 
| +    self._thread.daemon = True | 
| +    self._thread.start() | 
| + | 
| +  def stop(self): | 
| +    """Stops the dumping thread (if it is running).""" | 
| +    if not self._thread: | 
| +      return | 
| +    self._signal.put(None) | 
| +    self._thread.join(60) # don't wait forever | 
| +    if self._thread.is_alive(): | 
| +      logging.error('FileRefresher thread failed to terminate in time') | 
| +    self._signal = None | 
| +    self._thread = None | 
| + | 
| +  def _dump(self): | 
| +    try: | 
| +      blob = json.dumps( | 
| +          self._producer_callback(), | 
| +          sort_keys=True, | 
| +          indent=2, | 
| +          separators=(',', ': ')) | 
| +    except Exception: | 
| +      logging.exception('Unexpected exception in the callback') | 
| +      return | 
| +    if blob == self._last_dumped_blob: | 
| +      return # already have it on disk | 
| +    logging.info('Updating %s', self._path) | 
| + | 
| +    # On Windows the file may be locked by reading process. Don't freak out, | 
| +    # just retry a bit later. | 
| +    attempts = 100 | 
| +    while True: | 
| +      try: | 
| +        file_path.replace_file_content(self._path, blob) | 
| +        self._last_dumped_blob = blob | 
| +        return # success! | 
| +      except (IOError, OSError) as e: | 
| +        logging.error('Failed to update the file: %s', e) | 
| +      if not attempts: | 
| +        logging.error( | 
| +            'Failed to update the file %s after many attempts, giving up', | 
| +            self._path) | 
| +        return | 
| +      attempts -= 1 | 
| +      time.sleep(0.1) | 
| + | 
| +  def _run(self, signal): | 
| +    while True: | 
| +      try: | 
| +        signal.get(timeout=self._interval_sec) | 
| +        return # the stop signal received, quit the thread | 
| +      except Queue.Empty: | 
| +        self._dump() | 
|  |