Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(28)

Side by Side Diff: appengine/swarming/swarming_bot/bot_code/file_refresher.py

Issue 2024313003: Send authorization headers when calling Swarming backend. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: rebase Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file.
4
5 import json
6 import logging
7 import Queue
8 import threading
9 import time
10
11 from utils import file_path
12
13
14 class FileRefresher(object):
15 """Represents a thread that periodically dumps result of a callback to a file.
16
17 Used by bot_main to send authnetication headers to task_runner. task_runner
18 reads them from the file when making HTTP calls.
19
20 Uses JSON for serialization. Doesn't delete the file when stopped.
21 """
22
23 def __init__(self, path, producer_callback, interval_sec=60):
24 self._path = path
25 self._producer_callback = producer_callback
26 self._interval_sec = interval_sec
27 self._thread = None
28 self._signal = None
29 self._last_dumped_blob = None
30
31 def start(self):
32 """Starts a thread that dumps value to the file."""
33 assert self._thread is None
34 self._dump() # initial dump
35 self._signal = Queue.Queue()
36 self._thread = threading.Thread(
37 target=self._run,
M-A Ruel 2016/06/03 20:00:49 normally +4
Vadim Sh. 2016/06/03 23:37:55 Done.
38 args=(self._signal,),
39 name='FileRefresher %s' % self._path)
40 self._thread.daemon = True
41 self._thread.start()
42
43 def stop(self):
44 """Stops the dumping thread (if it is running)."""
45 if not self._thread:
46 return
47 self._signal.put(None)
48 self._thread.join(60) # don't wait forever
49 if self._thread.is_alive():
50 logging.error('FileRefresher thread failed to terminate in time')
51 self._signal = None
52 self._thread = None
53
54 def _dump(self):
55 try:
56 blob = json.dumps(
57 self._producer_callback(),
58 sort_keys=True,
59 indent=2,
60 separators=(',', ': '))
61 except Exception:
62 logging.exception('Unexpected exception in the callback')
63 return
64 if blob == self._last_dumped_blob:
65 return # already have it on disk
66 logging.info('Updating %s', self._path)
67
68 # On Windows the file may be locked by reading process. Don't freak out,
69 # just retry a bit later.
70 attempts = 100
71 while True:
72 try:
73 file_path.atomic_replace(self._path, blob)
74 self._last_dumped_blob = blob
75 return # success!
76 except (IOError, OSError) as e:
77 logging.error('Failed to update the file: %s', e)
78 if not attempts:
79 logging.error(
80 'Failed to update the file %s after many attempts, giving up',
81 self._path)
82 return
83 attempts -= 1
84 time.sleep(0.1)
85
86 def _run(self, signal):
M-A Ruel 2016/06/03 20:00:49 why have you made it reusable, e.g. we can start()
Vadim Sh. 2016/06/03 23:37:55 I didn't intend it to be reusable, I just happened
87 while True:
88 try:
89 signal.get(timeout=self._interval_sec)
90 return # the stop signal received, quit the thread
91 except Queue.Empty:
92 self._dump()
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698