Index: appengine/swarming/swarming_bot/bot_code/file_reader.py |
diff --git a/appengine/swarming/swarming_bot/bot_code/file_reader.py b/appengine/swarming/swarming_bot/bot_code/file_reader.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..b967d73a816bf7f45e52d7dc7f3889b5d5c295b9 |
--- /dev/null |
+++ b/appengine/swarming/swarming_bot/bot_code/file_reader.py |
@@ -0,0 +1,107 @@ |
+# Copyright 2016 The LUCI Authors. All rights reserved. |
+# Use of this source code is governed under the Apache License, Version 2.0 |
+# that can be found in the LICENSE file. |
+ |
+import json |
+import logging |
+import Queue |
+import threading |
+ |
+ |
+class FatalReadError(Exception): |
+ """Raised by 'FileReaderThread.start' if it can't read the file.""" |
+ |
+ |
+class FileReaderThread(object): |
+ """Represents a thread that periodically rereads file from disk. |
+ |
+ Used by task_runner to read authentication headers generated by bot_main. |
+ |
+ Uses JSON for serialization. |
+ |
+ The instance is not reusable (i.e. once stopped, cannot be started again). |
+ """ |
+ |
+ def __init__(self, path, interval_sec=60, max_attempts=100): |
+ self._path = path |
+ self._interval_sec = interval_sec |
+ self._max_attempts = max_attempts |
+ self._thread = None |
+ self._signal = Queue.Queue() |
+ self._lock = threading.Lock() |
+ self._last_value = None |
+ |
+ def start(self): |
+ """Starts the thread that periodically rereads the value. |
+ |
+ Once 'start' returns, 'last_value' can be used to grab the read value. It |
+ will be kept in-sync with the contents of the file until 'stop' is called. |
+ |
+ Raises: |
+ FatalReadError is the file cannot be read even after many retries. |
+ """ |
+ assert self._thread is None |
+ self._read() # initial read |
+ self._thread = threading.Thread( |
+ target=self._run, name='FileReaderThread %s' % self._path) |
+ self._thread.daemon = True |
+ self._thread.start() |
+ |
+ def stop(self): |
+ """Stops the reading thread (if it is running).""" |
+ if not self._thread: |
+ return |
+ self._signal.put(None) |
+ self._thread.join(60) # don't wait forever |
+ if self._thread.is_alive(): |
+ logging.error('FileReaderThread failed to terminate in time') |
+ |
+ @property |
+ def last_value(self): |
+ """Last read value.""" |
+ with self._lock: |
+ return self._last_value |
+ |
+ def _read(self): |
+ """Attempts to read the file, retrying a bunch of times. |
+ |
+ Returns: |
+ True to carry on, False to exit the thread. |
+ """ |
+ attempts = self._max_attempts |
+ while True: |
+ try: |
+ with open(self._path, 'rb') as f: |
+ body = json.load(f) |
+ with self._lock: |
+ self._last_value = body |
+ return True # success! |
+ except (IOError, OSError, ValueError) as e: |
+ last_error = 'Failed to read auth headers from %s: %s' % (self._path, e) |
+ attempts -= 1 |
+ if not attempts: |
+ raise FatalReadError(last_error) |
+ if not self._wait(0.05): |
+ return False |
+ |
+ def _wait(self, timeout): |
+ """Waits for the given duration or until the stop signal. |
+ |
+ Returns: |
+ True if waited, False if received the stop signal. |
+ """ |
+ try: |
+ self._signal.get(timeout=timeout) |
+ return False |
+ except Queue.Empty: |
+ return True |
+ |
+ def _run(self): |
+ while self._wait(self._interval_sec): |
+ try: |
+ if not self._read(): |
+ return |
+ except FatalReadError as e: |
+ # Log the error and simply keep last read value as it was. 'start' |
+ # makes sure to read it at least once. |
+ logging.error('Can\'t reread the file: %s', e) |