Chromium Code Reviews| Index: appengine/swarming/swarming_bot/bot_code/file_reader.py |
| diff --git a/appengine/swarming/swarming_bot/bot_code/file_reader.py b/appengine/swarming/swarming_bot/bot_code/file_reader.py |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..9d50be430fc2928948f782931ee24f62f5fbe847 |
| --- /dev/null |
| +++ b/appengine/swarming/swarming_bot/bot_code/file_reader.py |
| @@ -0,0 +1,93 @@ |
| +# Copyright 2016 The LUCI Authors. All rights reserved. |
| +# Use of this source code is governed under the Apache License, Version 2.0 |
| +# that can be found in the LICENSE file. |
| + |
| +import json |
| +import logging |
| +import Queue |
| +import threading |
| +import time |
| + |
| + |
| +class FatalReadError(Exception): |
| + """Raised by 'FileReaderThread.start' if it can't read the file.""" |
| + |
| + |
| +class FileReaderThread(object): |
| + """Represents a thread that periodically rereads file from disk. |
| + |
| + Used by task_runner to read authentication headers generated by bot_main. |
| + |
| + Uses JSON for serialization. |
| + |
| + The instance is not reusable (i.e. once stopped, cannot be started again). |
| + """ |
| + |
| + def __init__(self, path, interval_sec=60, max_attempts=100): |
| + self._path = path |
| + self._interval_sec = interval_sec |
| + self._max_attempts = max_attempts |
| + self._thread = None |
| + self._signal = Queue.Queue() |
| + self._lock = threading.Lock() |
| + self._last_value = None |
| + |
| + def start(self): |
| + """Starts the thread that periodically rereads the value. |
| + |
| + Once 'start' returns, 'last_value' can be used to grab the read value. It |
| + will be kept in-sync with the contents of the file until 'stop' is called. |
| + |
| + Raises: |
| + FatalReadError is the file cannot be read even after many retries. |
| + """ |
| + assert self._thread is None |
| + self._read() # initial read |
| + self._thread = threading.Thread( |
| + target=self._run, name='FileReaderThread %s' % self._path) |
| + self._thread.daemon = True |
| + self._thread.start() |
| + |
| + def stop(self): |
| + """Stops the reading thread (if it is running).""" |
| + if not self._thread: |
| + return |
| + self._signal.put(None) |
| + self._thread.join(60) # don't wait forever |
| + if self._thread.is_alive(): |
| + logging.error('FileReaderThread failed to terminate in time') |
| + |
| + @property |
| + def last_value(self): |
| + """Last read value.""" |
| + with self._lock: |
| + return self._last_value |
| + |
| + def _read(self): |
| + attempts = self._max_attempts |
| + while True: |
| + try: |
| + with open(self._path, 'rb') as f: |
| + body = json.load(f) |
| + with self._lock: |
| + self._last_value = body |
| + return # success! |
| + except (OSError, IOError, ValueError) as e: |
|
M-A Ruel
2016/06/06 21:18:36
IOError, OSError, ValueError
Vadim Sh.
2016/06/06 22:17:09
Done.
|
| + last_error = 'Failed to read auth headers from %s: %s' % (self._path, e) |
| + attempts -= 1 |
| + if not attempts: |
| + raise FatalReadError(last_error) |
| + time.sleep(0.05) |
|
M-A Ruel
2016/06/06 21:18:36
I'd prefer using:
self._signal.get(timeout=0.05)
Vadim Sh.
2016/06/06 22:17:09
Done.
|
| + |
| + def _run(self): |
| + while True: |
| + try: |
| + self._signal.get(timeout=self._interval_sec) |
| + return # the stop signal received, quit the thread |
| + except Queue.Empty: |
| + try: |
| + self._read() |
| + except FatalReadError as e: |
| + # Log the error and simply keep last read value as it was. 'start' |
| + # makes sure to read it at least once. |
| + logging.error('Can\'t reread the file: %s', e) |