Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1778)

Unified Diff: appengine/swarming/swarming_bot/bot_code/file_refresher.py

Issue 2024313003: Send authorization headers when calling Swarming backend. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: rebase Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/swarming/swarming_bot/bot_code/file_refresher.py
diff --git a/appengine/swarming/swarming_bot/bot_code/file_refresher.py b/appengine/swarming/swarming_bot/bot_code/file_refresher.py
new file mode 100644
index 0000000000000000000000000000000000000000..25d4dffecc0b81d2f4a2b2df710b152a1e83ebf7
--- /dev/null
+++ b/appengine/swarming/swarming_bot/bot_code/file_refresher.py
@@ -0,0 +1,92 @@
+# Copyright 2016 The LUCI Authors. All rights reserved.
+# Use of this source code is governed under the Apache License, Version 2.0
+# that can be found in the LICENSE file.
+
+import json
+import logging
+import Queue
+import threading
+import time
+
+from utils import file_path
+
+
+class FileRefresher(object):
+ """Represents a thread that periodically dumps result of a callback to a file.
+
+ Used by bot_main to send authnetication headers to task_runner. task_runner
+ reads them from the file when making HTTP calls.
+
+ Uses JSON for serialization. Doesn't delete the file when stopped.
+ """
+
+ def __init__(self, path, producer_callback, interval_sec=60):
+ self._path = path
+ self._producer_callback = producer_callback
+ self._interval_sec = interval_sec
+ self._thread = None
+ self._signal = None
+ self._last_dumped_blob = None
+
+ def start(self):
+ """Starts a thread that dumps value to the file."""
+ assert self._thread is None
+ self._dump() # initial dump
+ self._signal = Queue.Queue()
+ self._thread = threading.Thread(
+ target=self._run,
M-A Ruel 2016/06/03 20:00:49 normally +4
Vadim Sh. 2016/06/03 23:37:55 Done.
+ args=(self._signal,),
+ name='FileRefresher %s' % self._path)
+ self._thread.daemon = True
+ self._thread.start()
+
+ def stop(self):
+ """Stops the dumping thread (if it is running)."""
+ if not self._thread:
+ return
+ self._signal.put(None)
+ self._thread.join(60) # don't wait forever
+ if self._thread.is_alive():
+ logging.error('FileRefresher thread failed to terminate in time')
+ self._signal = None
+ self._thread = None
+
+ def _dump(self):
+ try:
+ blob = json.dumps(
+ self._producer_callback(),
+ sort_keys=True,
+ indent=2,
+ separators=(',', ': '))
+ except Exception:
+ logging.exception('Unexpected exception in the callback')
+ return
+ if blob == self._last_dumped_blob:
+ return # already have it on disk
+ logging.info('Updating %s', self._path)
+
+ # On Windows the file may be locked by reading process. Don't freak out,
+ # just retry a bit later.
+ attempts = 100
+ while True:
+ try:
+ file_path.atomic_replace(self._path, blob)
+ self._last_dumped_blob = blob
+ return # success!
+ except (IOError, OSError) as e:
+ logging.error('Failed to update the file: %s', e)
+ if not attempts:
+ logging.error(
+ 'Failed to update the file %s after many attempts, giving up',
+ self._path)
+ return
+ attempts -= 1
+ time.sleep(0.1)
+
+ def _run(self, signal):
M-A Ruel 2016/06/03 20:00:49 why have you made it reusable, e.g. we can start()
Vadim Sh. 2016/06/03 23:37:55 I didn't intend it to be reusable, I just happened
+ while True:
+ try:
+ signal.get(timeout=self._interval_sec)
+ return # the stop signal received, quit the thread
+ except Queue.Empty:
+ self._dump()

Powered by Google App Engine
This is Rietveld 408576698