OLD | NEW |
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 """Out of band HTTP push.""" | 4 """Out of band HTTP push.""" |
5 | 5 |
6 import Queue | 6 import Queue |
7 import json | 7 import json |
8 import logging | 8 import logging |
9 import os | 9 import os |
10 import threading | 10 import threading |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
52 with self.lock: | 52 with self.lock: |
53 with open(os.path.join('workdir', 'events.json'), 'w') as f: | 53 with open(os.path.join('workdir', 'events.json'), 'w') as f: |
54 json.dump(self.queue, f, indent=2) | 54 json.dump(self.queue, f, indent=2) |
55 | 55 |
56 def send(self, pending, packet): | 56 def send(self, pending, packet): |
57 with self.lock: | 57 with self.lock: |
58 self.queue.append(self._package(pending, packet)) | 58 self.queue.append(self._package(pending, packet)) |
59 | 59 |
60 | 60 |
61 class AsyncPush(AsyncPushNoop): | 61 class AsyncPush(AsyncPushNoop): |
62 """Sends HTTP Post in a background worker thread. | 62 """Sends HTTP Post asynchronously to the tree status application. |
63 | 63 |
64 Thread-safe. | 64 This object uses a background worker thread, and is thread-safe. |
65 """ | 65 """ |
66 _TERMINATE = object() | 66 _TERMINATE = object() |
67 | 67 |
68 def __init__(self, url, password): | 68 def __init__(self, url, password, resource='/receiver'): |
69 super(AsyncPush, self).__init__() | 69 super(AsyncPush, self).__init__() |
70 assert url | 70 assert url |
71 assert password | 71 assert password |
72 self.url = url | 72 self.url = url |
| 73 self.resource = resource |
73 self.password = password | 74 self.password = password |
74 self.queue = Queue.Queue() | 75 self.queue = Queue.Queue() |
75 self.thread = threading.Thread(target=self._worker_thread) | 76 self.thread = threading.Thread(target=self._worker_thread) |
76 self.thread.daemon = True | 77 self.thread.daemon = True |
77 self.thread.start() | 78 self.thread.start() |
78 | 79 |
79 def close(self): | 80 def close(self): |
80 self.queue.put(self._TERMINATE) | 81 self.queue.put(self._TERMINATE) |
81 self.thread.join() | 82 self.thread.join() |
82 | 83 |
(...skipping 21 matching lines...) Expand all Loading... |
104 'Accept': 'text/plain' | 105 'Accept': 'text/plain' |
105 } | 106 } |
106 done = False | 107 done = False |
107 try: | 108 try: |
108 while not done: | 109 while not done: |
109 items = self._get_items() | 110 items = self._get_items() |
110 if self._TERMINATE in items: | 111 if self._TERMINATE in items: |
111 done = True | 112 done = True |
112 logging.debug('Worker thread exiting') | 113 logging.debug('Worker thread exiting') |
113 items.remove(self._TERMINATE) | 114 items.remove(self._TERMINATE) |
114 url = self.url + '/receiver' | 115 url = self.url + self.resource |
115 logging.debug('Sending %d items to %s' % (len(items), url)) | 116 logging.debug('Sending %d items to %s' % (len(items), url)) |
116 try: | 117 try: |
117 data = [('p', json.dumps(item)) for item in items] | 118 data = [('p', json.dumps(item)) for item in items] |
118 data.append(('password', self.password)) | 119 data.append(('password', self.password)) |
119 urllib.urlopen(url, urllib.urlencode(data), params).read() | 120 urllib.urlopen(url, urllib.urlencode(data), params).read() |
120 except IOError as e: | 121 except IOError as e: |
121 logging.error(e) | 122 logging.error(e) |
122 for item in items: | 123 for item in items: |
123 self.queue.put(item) | 124 self.queue.put(item) |
124 if not done: | 125 if not done: |
125 time.sleep(1) | 126 time.sleep(1) |
126 # Don't retry if done. | 127 # Don't retry if done. |
127 except Exception as e: | 128 except Exception as e: |
128 traceback.print_exc() | 129 traceback.print_exc() |
129 errors.send_stack(e) | 130 errors.send_stack(e) |
OLD | NEW |