Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2548)

Unified Diff: appengine/swarming/swarming_bot/bot_code/file_refresher.py

Issue 2024313003: Send authorization headers when calling Swarming backend. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: extract into separate function Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/swarming/swarming_bot/bot_code/file_refresher.py
diff --git a/appengine/swarming/swarming_bot/bot_code/file_refresher.py b/appengine/swarming/swarming_bot/bot_code/file_refresher.py
new file mode 100644
index 0000000000000000000000000000000000000000..99bcd2e3f864e3a8bd99d433dbb66b87bb6a8e0d
--- /dev/null
+++ b/appengine/swarming/swarming_bot/bot_code/file_refresher.py
@@ -0,0 +1,89 @@
+# Copyright 2016 The LUCI Authors. All rights reserved.
+# Use of this source code is governed under the Apache License, Version 2.0
+# that can be found in the LICENSE file.
+
+import json
+import logging
+import Queue
+import threading
+import time
+
+from utils import file_path
+
+
+class FileRefresherThread(object):
+ """Represents a thread that periodically dumps result of a callback to a file.
+
+ Used by bot_main to send authnetication headers to task_runner. task_runner
+ reads them from the file when making HTTP calls.
+
+ Uses JSON for serialization. Doesn't delete the file when stopped.
+
+ The instance is not reusable (i.e. once stopped, cannot be started again).
+ """
+
+ def __init__(self, path, producer_callback, interval_sec=60):
+ self._path = path
+ self._producer_callback = producer_callback
+ self._interval_sec = interval_sec
+ self._thread = None
+ self._signal = Queue.Queue()
+ self._last_dumped_blob = None
+
+ def start(self):
M-A Ruel 2016/06/06 21:18:36 optional nit: I'm not sure start() is really neede
Vadim Sh. 2016/06/06 22:17:09 Having explicit 'start' is better. It gives a chan
+ """Starts a thread that dumps value to the file."""
+ assert self._thread is None
+ self._dump() # initial dump
+ self._thread = threading.Thread(
+ target=self._run, name='FileRefresherThread %s' % self._path)
+ self._thread.daemon = True
+ self._thread.start()
+
+ def stop(self):
+ """Stops the dumping thread (if it is running)."""
+ if not self._thread:
+ return
+ self._signal.put(None)
+ self._thread.join(60) # don't wait forever
+ if self._thread.is_alive():
+ logging.error('FileRefresherThread failed to terminate in time')
+
+ def _dump(self):
+ try:
+ blob = json.dumps(
+ self._producer_callback(),
+ sort_keys=True,
+ indent=2,
+ separators=(',', ': '))
+ except Exception:
+ logging.exception('Unexpected exception in the callback')
+ return
+ if blob == self._last_dumped_blob:
+ return # already have it on disk
+ logging.info('Updating %s', self._path)
+
+ # On Windows the file may be locked by reading process. Don't freak out,
+ # just retry a bit later.
+ attempts = 100
+ while True:
+ try:
+ file_path.atomic_replace(self._path, blob)
+ self._last_dumped_blob = blob
+ return # success!
+ except (IOError, OSError) as e:
+ logging.error('Failed to update the file: %s', e)
+ if not attempts:
+ logging.error(
+ 'Failed to update the file %s after many attempts, giving up',
+ self._path)
+ return
+ attempts -= 1
+ time.sleep(0.1)
M-A Ruel 2016/06/06 21:18:36 self._signal.get(timeout=0.05)
Vadim Sh. 2016/06/06 22:17:09 Done.
+
+ def _run(self):
+ while True:
+ try:
+ self._signal.get(timeout=self._interval_sec)
+ return # the stop signal received, quit the thread
+ except Queue.Empty:
+ self._dump()

Powered by Google App Engine
This is Rietveld 408576698