OLD | NEW |
(Empty) | |
| 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. |
| 4 |
| 5 import json |
| 6 import logging |
| 7 import Queue |
| 8 import threading |
| 9 |
| 10 |
| 11 class FatalReadError(Exception): |
| 12 """Raised by 'FileReaderThread.start' if it can't read the file.""" |
| 13 |
| 14 |
| 15 class FileReaderThread(object): |
| 16 """Represents a thread that periodically rereads file from disk. |
| 17 |
| 18 Used by task_runner to read authentication headers generated by bot_main. |
| 19 |
| 20 Uses JSON for serialization. |
| 21 |
| 22 The instance is not reusable (i.e. once stopped, cannot be started again). |
| 23 """ |
| 24 |
| 25 def __init__(self, path, interval_sec=60, max_attempts=100): |
| 26 self._path = path |
| 27 self._interval_sec = interval_sec |
| 28 self._max_attempts = max_attempts |
| 29 self._thread = None |
| 30 self._signal = Queue.Queue() |
| 31 self._lock = threading.Lock() |
| 32 self._last_value = None |
| 33 |
| 34 def start(self): |
| 35 """Starts the thread that periodically rereads the value. |
| 36 |
| 37 Once 'start' returns, 'last_value' can be used to grab the read value. It |
| 38 will be kept in-sync with the contents of the file until 'stop' is called. |
| 39 |
| 40 Raises: |
| 41 FatalReadError is the file cannot be read even after many retries. |
| 42 """ |
| 43 assert self._thread is None |
| 44 self._read() # initial read |
| 45 self._thread = threading.Thread( |
| 46 target=self._run, name='FileReaderThread %s' % self._path) |
| 47 self._thread.daemon = True |
| 48 self._thread.start() |
| 49 |
| 50 def stop(self): |
| 51 """Stops the reading thread (if it is running).""" |
| 52 if not self._thread: |
| 53 return |
| 54 self._signal.put(None) |
| 55 self._thread.join(60) # don't wait forever |
| 56 if self._thread.is_alive(): |
| 57 logging.error('FileReaderThread failed to terminate in time') |
| 58 |
| 59 @property |
| 60 def last_value(self): |
| 61 """Last read value.""" |
| 62 with self._lock: |
| 63 return self._last_value |
| 64 |
| 65 def _read(self): |
| 66 """Attempts to read the file, retrying a bunch of times. |
| 67 |
| 68 Returns: |
| 69 True to carry on, False to exit the thread. |
| 70 """ |
| 71 attempts = self._max_attempts |
| 72 while True: |
| 73 try: |
| 74 with open(self._path, 'rb') as f: |
| 75 body = json.load(f) |
| 76 with self._lock: |
| 77 self._last_value = body |
| 78 return True # success! |
| 79 except (IOError, OSError, ValueError) as e: |
| 80 last_error = 'Failed to read auth headers from %s: %s' % (self._path, e) |
| 81 attempts -= 1 |
| 82 if not attempts: |
| 83 raise FatalReadError(last_error) |
| 84 if not self._wait(0.05): |
| 85 return False |
| 86 |
| 87 def _wait(self, timeout): |
| 88 """Waits for the given duration or until the stop signal. |
| 89 |
| 90 Returns: |
| 91 True if waited, False if received the stop signal. |
| 92 """ |
| 93 try: |
| 94 self._signal.get(timeout=timeout) |
| 95 return False |
| 96 except Queue.Empty: |
| 97 return True |
| 98 |
| 99 def _run(self): |
| 100 while self._wait(self._interval_sec): |
| 101 try: |
| 102 if not self._read(): |
| 103 return |
| 104 except FatalReadError as e: |
| 105 # Log the error and simply keep last read value as it was. 'start' |
| 106 # makes sure to read it at least once. |
| 107 logging.error('Can\'t reread the file: %s', e) |
OLD | NEW |