Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(643)

Side by Side Diff: appengine/swarming/swarming_bot/bot_code/file_refresher.py

Issue 2024313003: Send authorization headers when calling Swarming backend. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: extract into separate function Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file.
4
5 import json
6 import logging
7 import Queue
8 import threading
9 import time
10
11 from utils import file_path
12
13
14 class FileRefresherThread(object):
15 """Represents a thread that periodically dumps result of a callback to a file.
16
17 Used by bot_main to send authnetication headers to task_runner. task_runner
18 reads them from the file when making HTTP calls.
19
20 Uses JSON for serialization. Doesn't delete the file when stopped.
21
22 The instance is not reusable (i.e. once stopped, cannot be started again).
23 """
24
25 def __init__(self, path, producer_callback, interval_sec=60):
26 self._path = path
27 self._producer_callback = producer_callback
28 self._interval_sec = interval_sec
29 self._thread = None
30 self._signal = Queue.Queue()
31 self._last_dumped_blob = None
32
33 def start(self):
M-A Ruel 2016/06/06 21:18:36 optional nit: I'm not sure start() is really neede
Vadim Sh. 2016/06/06 22:17:09 Having explicit 'start' is better. It gives a chan
34 """Starts a thread that dumps value to the file."""
35 assert self._thread is None
36 self._dump() # initial dump
37 self._thread = threading.Thread(
38 target=self._run, name='FileRefresherThread %s' % self._path)
39 self._thread.daemon = True
40 self._thread.start()
41
42 def stop(self):
43 """Stops the dumping thread (if it is running)."""
44 if not self._thread:
45 return
46 self._signal.put(None)
47 self._thread.join(60) # don't wait forever
48 if self._thread.is_alive():
49 logging.error('FileRefresherThread failed to terminate in time')
50
51 def _dump(self):
52 try:
53 blob = json.dumps(
54 self._producer_callback(),
55 sort_keys=True,
56 indent=2,
57 separators=(',', ': '))
58 except Exception:
59 logging.exception('Unexpected exception in the callback')
60 return
61 if blob == self._last_dumped_blob:
62 return # already have it on disk
63 logging.info('Updating %s', self._path)
64
65 # On Windows the file may be locked by reading process. Don't freak out,
66 # just retry a bit later.
67 attempts = 100
68 while True:
69 try:
70 file_path.atomic_replace(self._path, blob)
71 self._last_dumped_blob = blob
72 return # success!
73 except (IOError, OSError) as e:
74 logging.error('Failed to update the file: %s', e)
75 if not attempts:
76 logging.error(
77 'Failed to update the file %s after many attempts, giving up',
78 self._path)
79 return
80 attempts -= 1
81 time.sleep(0.1)
M-A Ruel 2016/06/06 21:18:36 self._signal.get(timeout=0.05)
Vadim Sh. 2016/06/06 22:17:09 Done.
82
83 def _run(self):
84 while True:
85 try:
86 self._signal.get(timeout=self._interval_sec)
87 return # the stop signal received, quit the thread
88 except Queue.Empty:
89 self._dump()
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698