Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(111)

Side by Side Diff: appengine/swarming/swarming_bot/bot_code/file_refresher.py

Issue 2024313003: Send authorization headers when calling Swarming backend. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: rebase Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file.
4
5 import json
6 import logging
7 import Queue
8 import threading
9
10 from utils import file_path
11
12
13 class FileRefresherThread(object):
14 """Represents a thread that periodically dumps result of a callback to a file.
15
16 Used by bot_main to send authnetication headers to task_runner. task_runner
17 reads them from the file when making HTTP calls.
18
19 Uses JSON for serialization. Doesn't delete the file when stopped.
20
21 The instance is not reusable (i.e. once stopped, cannot be started again).
22 """
23
24 def __init__(self, path, producer_callback, interval_sec=60):
25 self._path = path
26 self._producer_callback = producer_callback
27 self._interval_sec = interval_sec
28 self._thread = None
29 self._signal = Queue.Queue()
30 self._last_dumped_blob = None
31
32 def start(self):
33 """Starts a thread that dumps value to the file."""
34 assert self._thread is None
35 self._dump() # initial dump
36 self._thread = threading.Thread(
37 target=self._run, name='FileRefresherThread %s' % self._path)
38 self._thread.daemon = True
39 self._thread.start()
40
41 def stop(self):
42 """Stops the dumping thread (if it is running)."""
43 if not self._thread:
44 return
45 self._signal.put(None)
46 self._thread.join(60) # don't wait forever
47 if self._thread.is_alive():
48 logging.error('FileRefresherThread failed to terminate in time')
49
50 def _dump(self):
51 """Attempts to rewrite the file, retrying a bunch of times.
52
53 Returns:
54 True to carry on, False to exit the thread.
55 """
56 try:
57 blob = json.dumps(
58 self._producer_callback(),
59 sort_keys=True,
60 indent=2,
61 separators=(',', ': '))
62 except Exception:
63 logging.exception('Unexpected exception in the callback')
64 return True
65 if blob == self._last_dumped_blob:
66 return True # already have it on disk
67 logging.info('Updating %s', self._path)
68
69 # On Windows the file may be locked by reading process. Don't freak out,
70 # just retry a bit later.
71 attempts = 100
72 while True:
73 try:
74 file_path.atomic_replace(self._path, blob)
75 self._last_dumped_blob = blob
76 return True # success!
77 except (IOError, OSError) as e:
78 logging.error('Failed to update the file: %s', e)
79 if not attempts:
80 logging.error(
81 'Failed to update the file %s after many attempts, giving up',
82 self._path)
83 return True
84 attempts -= 1
85 if not self._wait(0.05):
86 return False
87
88 def _wait(self, timeout):
89 """Waits for the given duration or until the stop signal.
90
91 Returns:
92 True if waited, False if received the stop signal.
93 """
94 try:
95 self._signal.get(timeout=timeout)
96 return False
97 except Queue.Empty:
98 return True
99
100 def _run(self):
101 while self._wait(self._interval_sec) and self._dump():
102 pass
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698