OLD | NEW |
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 """Out of band HTTP push.""" | 4 """Out of band HTTP push.""" |
5 | 5 |
6 import Queue | 6 import Queue |
7 import json | 7 import json |
8 import logging | 8 import logging |
9 import os | 9 import os |
10 import threading | 10 import threading |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
76 self.thread = threading.Thread(target=self._worker_thread) | 76 self.thread = threading.Thread(target=self._worker_thread) |
77 self.thread.daemon = True | 77 self.thread.daemon = True |
78 self.thread.start() | 78 self.thread.start() |
79 | 79 |
80 def close(self): | 80 def close(self): |
81 self.queue.put(self._TERMINATE) | 81 self.queue.put(self._TERMINATE) |
82 self.thread.join() | 82 self.thread.join() |
83 | 83 |
84 def send(self, pending, packet): | 84 def send(self, pending, packet): |
85 """Queues a packet.""" | 85 """Queues a packet.""" |
86 logging.debug('For issue %d, sending %s', pending.issue, packet) | 86 logging.debug('For issue %d, queuing for send: %s', pending.issue, packet) |
87 self.queue.put(self._package(pending, packet)) | 87 self.queue.put(self._package(pending, packet)) |
88 | 88 |
89 def _get_items(self): | 89 def _get_items(self): |
90 """Waits for an item to be queued and returns up to 10 next items if queued | 90 """Waits for an item to be queued and returns up to 10 next items if queued |
91 fast enough. | 91 fast enough. |
92 """ | 92 """ |
93 items = [self.queue.get()] | 93 items = [self.queue.get()] |
94 try: | 94 try: |
95 for _ in range(9): | 95 for _ in range(9): |
96 items.append(self.queue.get_nowait()) | 96 items.append(self.queue.get_nowait()) |
97 except Queue.Empty: | 97 except Queue.Empty: |
98 pass | 98 pass |
99 return items | 99 return items |
100 | 100 |
101 def _worker_thread(self): | 101 def _worker_thread(self): |
102 """Sends the packets in a loop through HTTP POST.""" | 102 """Sends the packets in a loop through HTTP POST.""" |
103 params = { | 103 params = { |
104 'Content-type': 'application/x-www-form-urlencoded', | 104 'Content-type': 'application/x-www-form-urlencoded', |
105 'Accept': 'text/plain' | 105 'Accept': 'text/plain' |
106 } | 106 } |
107 done = False | 107 done = False |
108 try: | 108 try: |
109 while not done: | 109 while not done: |
110 items = self._get_items() | 110 items = self._get_items() |
111 if self._TERMINATE in items: | 111 if self._TERMINATE in items: |
112 done = True | 112 done = True |
113 logging.debug('Worker thread exiting') | 113 logging.debug('Worker thread exiting') |
114 items.remove(self._TERMINATE) | 114 items.remove(self._TERMINATE) |
115 url = self.url + self.resource | 115 url = self.url + self.resource |
116 logging.debug('Sending %d items to %s' % (len(items), url)) | 116 logging.debug('Sending %d items to %s: %r', len(items), url, items) |
117 try: | 117 try: |
118 data = [('p', json.dumps(item)) for item in items] | 118 data = [('p', json.dumps(item)) for item in items] |
119 data.append(('password', self.password)) | 119 data.append(('password', self.password)) |
120 urllib.urlopen(url, urllib.urlencode(data), params).read() | 120 urllib.urlopen(url, urllib.urlencode(data), params).read() |
121 except IOError as e: | 121 except IOError as e: |
122 logging.error(e) | 122 logging.error(e) |
123 for item in items: | 123 for item in items: |
124 self.queue.put(item) | 124 self.queue.put(item) |
125 if not done: | 125 if not done: |
126 time.sleep(1) | 126 time.sleep(1) |
127 # Don't retry if done. | 127 # Don't retry if done. |
128 except Exception as e: | 128 except Exception as e: |
129 traceback.print_exc() | 129 traceback.print_exc() |
130 errors.send_stack(e) | 130 errors.send_stack(e) |
OLD | NEW |