Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2552)

Unified Diff: appengine/swarming/swarming_bot/bot_code/file_refresher.py

Issue 2024313003: Send authorization headers when calling Swarming backend. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: rebase Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/swarming/swarming_bot/bot_code/file_refresher.py
diff --git a/appengine/swarming/swarming_bot/bot_code/file_refresher.py b/appengine/swarming/swarming_bot/bot_code/file_refresher.py
new file mode 100644
index 0000000000000000000000000000000000000000..8f828382ee3585a6083972023ed72fcdb2a82df1
--- /dev/null
+++ b/appengine/swarming/swarming_bot/bot_code/file_refresher.py
@@ -0,0 +1,102 @@
+# Copyright 2016 The LUCI Authors. All rights reserved.
+# Use of this source code is governed under the Apache License, Version 2.0
+# that can be found in the LICENSE file.
+
+import json
+import logging
+import Queue
+import threading
+
+from utils import file_path
+
+
+class FileRefresherThread(object):
+ """Represents a thread that periodically dumps result of a callback to a file.
+
+ Used by bot_main to send authnetication headers to task_runner. task_runner
+ reads them from the file when making HTTP calls.
+
+ Uses JSON for serialization. Doesn't delete the file when stopped.
+
+ The instance is not reusable (i.e. once stopped, cannot be started again).
+ """
+
+ def __init__(self, path, producer_callback, interval_sec=60):
+ self._path = path
+ self._producer_callback = producer_callback
+ self._interval_sec = interval_sec
+ self._thread = None
+ self._signal = Queue.Queue()
+ self._last_dumped_blob = None
+
+ def start(self):
+ """Starts a thread that dumps value to the file."""
+ assert self._thread is None
+ self._dump() # initial dump
+ self._thread = threading.Thread(
+ target=self._run, name='FileRefresherThread %s' % self._path)
+ self._thread.daemon = True
+ self._thread.start()
+
+ def stop(self):
+ """Stops the dumping thread (if it is running)."""
+ if not self._thread:
+ return
+ self._signal.put(None)
+ self._thread.join(60) # don't wait forever
+ if self._thread.is_alive():
+ logging.error('FileRefresherThread failed to terminate in time')
+
+ def _dump(self):
+ """Attempts to rewrite the file, retrying a bunch of times.
+
+ Returns:
+ True to carry on, False to exit the thread.
+ """
+ try:
+ blob = json.dumps(
+ self._producer_callback(),
+ sort_keys=True,
+ indent=2,
+ separators=(',', ': '))
+ except Exception:
+ logging.exception('Unexpected exception in the callback')
+ return True
+ if blob == self._last_dumped_blob:
+ return True # already have it on disk
+ logging.info('Updating %s', self._path)
+
+ # On Windows the file may be locked by reading process. Don't freak out,
+ # just retry a bit later.
+ attempts = 100
+ while True:
+ try:
+ file_path.atomic_replace(self._path, blob)
+ self._last_dumped_blob = blob
+ return True # success!
+ except (IOError, OSError) as e:
+ logging.error('Failed to update the file: %s', e)
+ if not attempts:
+ logging.error(
+ 'Failed to update the file %s after many attempts, giving up',
+ self._path)
+ return True
+ attempts -= 1
+ if not self._wait(0.05):
+ return False
+
+ def _wait(self, timeout):
+ """Waits for the given duration or until the stop signal.
+
+ Returns:
+ True if waited, False if received the stop signal.
+ """
+ try:
+ self._signal.get(timeout=timeout)
+ return False
+ except Queue.Empty:
+ return True
+
+ def _run(self):
+ while self._wait(self._interval_sec) and self._dump():
+ pass

Powered by Google App Engine
This is Rietveld 408576698