| Index: appengine/swarming/swarming_bot/bot_code/file_refresher.py
|
| diff --git a/appengine/swarming/swarming_bot/bot_code/file_refresher.py b/appengine/swarming/swarming_bot/bot_code/file_refresher.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..9eb8361eb44aacd1f59dad23f6ac7f52f8061a9a
|
| --- /dev/null
|
| +++ b/appengine/swarming/swarming_bot/bot_code/file_refresher.py
|
| @@ -0,0 +1,92 @@
|
| +# Copyright 2016 The LUCI Authors. All rights reserved.
|
| +# Use of this source code is governed under the Apache License, Version 2.0
|
| +# that can be found in the LICENSE file.
|
| +
|
| +import json
|
| +import logging
|
| +import Queue
|
| +import threading
|
| +import time
|
| +
|
| +from utils import file_path
|
| +
|
| +
|
| +class FileRefresher(object):
|
| + """Represents a thread that periodically dumps result of a callback to a file.
|
| +
|
| + Used by bot_main to send authnetication headers to task_runner. task_runner
|
| + reads them from the file when making HTTP calls.
|
| +
|
| + Uses JSON for serialization. Doesn't delete the file when stopped.
|
| + """
|
| +
|
| + def __init__(self, path, producer_callback, interval_sec=60):
|
| + self._path = path
|
| + self._producer_callback = producer_callback
|
| + self._interval_sec = interval_sec
|
| + self._thread = None
|
| + self._signal = None
|
| + self._last_dumped_blob = None
|
| +
|
| + def start(self):
|
| + """Starts a thread that dumps value to the file."""
|
| + assert self._thread is None
|
| + self._dump() # initial dump
|
| + self._signal = Queue.Queue()
|
| + self._thread = threading.Thread(
|
| + target=self._run,
|
| + args=(self._signal,),
|
| + name='FileRefresher %s' % self._path)
|
| + self._thread.daemon = True
|
| + self._thread.start()
|
| +
|
| + def stop(self):
|
| + """Stops the dumping thread (if it is running)."""
|
| + if not self._thread:
|
| + return
|
| + self._signal.put(None)
|
| + self._thread.join(60) # don't wait forever
|
| + if self._thread.is_alive():
|
| + logging.error('FileRefresher thread failed to terminate in time')
|
| + self._signal = None
|
| + self._thread = None
|
| +
|
| + def _dump(self):
|
| + try:
|
| + blob = json.dumps(
|
| + self._producer_callback(),
|
| + sort_keys=True,
|
| + indent=2,
|
| + separators=(',', ': '))
|
| + except Exception:
|
| + logging.exception('Unexpected exception in the callback')
|
| + return
|
| + if blob == self._last_dumped_blob:
|
| + return # already have it on disk
|
| + logging.info('Updating %s', self._path)
|
| +
|
| + # On Windows the file may be locked by reading process. Don't freak out,
|
| + # just retry a bit later.
|
| + attempts = 100
|
| + while True:
|
| + try:
|
| + file_path.replace_file_content(self._path, blob)
|
| + self._last_dumped_blob = blob
|
| + return # success!
|
| + except (IOError, OSError) as e:
|
| + logging.error('Failed to update the file: %s', e)
|
| + if not attempts:
|
| + logging.error(
|
| + 'Failed to update the file %s after many attempts, giving up',
|
| + self._path)
|
| + return
|
| + attempts -= 1
|
| + time.sleep(0.1)
|
| +
|
| + def _run(self, signal):
|
| + while True:
|
| + try:
|
| + signal.get(timeout=self._interval_sec)
|
| + return # the stop signal received, quit the thread
|
| + except Queue.Empty:
|
| + self._dump()
|
|
|