Chromium Code Reviews| Index: appengine/swarming/swarming_bot/bot_code/file_refresher.py |
| diff --git a/appengine/swarming/swarming_bot/bot_code/file_refresher.py b/appengine/swarming/swarming_bot/bot_code/file_refresher.py |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..25d4dffecc0b81d2f4a2b2df710b152a1e83ebf7 |
| --- /dev/null |
| +++ b/appengine/swarming/swarming_bot/bot_code/file_refresher.py |
| @@ -0,0 +1,92 @@ |
| +# Copyright 2016 The LUCI Authors. All rights reserved. |
| +# Use of this source code is governed under the Apache License, Version 2.0 |
| +# that can be found in the LICENSE file. |
| + |
| +import json |
| +import logging |
| +import Queue |
| +import threading |
| +import time |
| + |
| +from utils import file_path |
| + |
| + |
| +class FileRefresher(object): |
| + """Represents a thread that periodically dumps result of a callback to a file. |
| + |
| + Used by bot_main to send authnetication headers to task_runner. task_runner |
| + reads them from the file when making HTTP calls. |
| + |
| + Uses JSON for serialization. Doesn't delete the file when stopped. |
| + """ |
| + |
| + def __init__(self, path, producer_callback, interval_sec=60): |
| + self._path = path |
| + self._producer_callback = producer_callback |
| + self._interval_sec = interval_sec |
| + self._thread = None |
| + self._signal = None |
| + self._last_dumped_blob = None |
| + |
| + def start(self): |
| + """Starts a thread that dumps value to the file.""" |
| + assert self._thread is None |
| + self._dump() # initial dump |
| + self._signal = Queue.Queue() |
| + self._thread = threading.Thread( |
| + target=self._run, |
|
M-A Ruel
2016/06/03 20:00:49
normally +4
Vadim Sh.
2016/06/03 23:37:55
Done.
|
| + args=(self._signal,), |
| + name='FileRefresher %s' % self._path) |
| + self._thread.daemon = True |
| + self._thread.start() |
| + |
| + def stop(self): |
| + """Stops the dumping thread (if it is running).""" |
| + if not self._thread: |
| + return |
| + self._signal.put(None) |
| + self._thread.join(60) # don't wait forever |
| + if self._thread.is_alive(): |
| + logging.error('FileRefresher thread failed to terminate in time') |
| + self._signal = None |
| + self._thread = None |
| + |
| + def _dump(self): |
| + try: |
| + blob = json.dumps( |
| + self._producer_callback(), |
| + sort_keys=True, |
| + indent=2, |
| + separators=(',', ': ')) |
| + except Exception: |
| + logging.exception('Unexpected exception in the callback') |
| + return |
| + if blob == self._last_dumped_blob: |
| + return # already have it on disk |
| + logging.info('Updating %s', self._path) |
| + |
| + # On Windows the file may be locked by reading process. Don't freak out, |
| + # just retry a bit later. |
| + attempts = 100 |
| + while True: |
| + try: |
| + file_path.atomic_replace(self._path, blob) |
| + self._last_dumped_blob = blob |
| + return # success! |
| + except (IOError, OSError) as e: |
| + logging.error('Failed to update the file: %s', e) |
| + if not attempts: |
| + logging.error( |
| + 'Failed to update the file %s after many attempts, giving up', |
| + self._path) |
| + return |
| + attempts -= 1 |
| + time.sleep(0.1) |
| + |
| + def _run(self, signal): |
|
M-A Ruel
2016/06/03 20:00:49
why have you made it reusable, e.g. we can start()
Vadim Sh.
2016/06/03 23:37:55
I didn't intend it to be reusable, I just happened
|
| + while True: |
| + try: |
| + signal.get(timeout=self._interval_sec) |
| + return # the stop signal received, quit the thread |
| + except Queue.Empty: |
| + self._dump() |