OLD | NEW |
| (Empty) |
1 # Copyright 2015 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 import logging | |
6 import os | |
7 import random | |
8 import sys | |
9 import time | |
10 | |
11 import httplib2 | |
12 | |
13 import infra_libs | |
14 from infra_libs.event_mon.protos.log_request_lite_pb2 import LogRequestLite | |
15 from infra_libs.event_mon.protos.chrome_infra_log_pb2 import ChromeInfraEvent | |
16 | |
17 | |
18 def time_ms(): | |
19 """Return current timestamp in milliseconds.""" | |
20 return int(1000 * time.time()) | |
21 | |
22 | |
23 def backoff_time(attempt, retry_backoff=2., max_delay=30.): | |
24 """Compute randomized exponential backoff time. | |
25 | |
26 Args: | |
27 attempt (int): attempt number, starting at zero. | |
28 | |
29 Keyword Args: | |
30 retry_backoff(float): backoff time on the first attempt. | |
31 max_delay(float): maximum returned value. | |
32 """ | |
33 delay = retry_backoff * (2 ** attempt) | |
34 # Add +-25% of variation. | |
35 delay += delay * ((random.random() - 0.5) / 2.) | |
36 return min(delay, max_delay) | |
37 | |
38 | |
39 class _Router(object): | |
40 """Route events to the right destination. Base class. | |
41 | |
42 This object is meant to be a singleton, and is not part of the API. | |
43 Subclasses must implement _send_to_endpoint(). | |
44 | |
45 Usage: | |
46 router = _Router() | |
47 event = ChromeInfraEvent.LogEventLite(...) | |
48 ... fill in event ... | |
49 router.push_event(event) | |
50 """ | |
51 def push_event(self, log_events): | |
52 """Enqueue event to push to the collection service. | |
53 | |
54 This method offers no guarantee on return that the event have been pushed | |
55 externally, as some buffering can take place. | |
56 | |
57 Args: | |
58 log_events (LogRequestLite.LogEventLite or list/tuple of): events. | |
59 | |
60 Returns: | |
61 success (bool): False if an error happened. True means 'event accepted', | |
62 but NOT 'event successfully pushed to the remote'. | |
63 """ | |
64 if isinstance(log_events, LogRequestLite.LogEventLite): | |
65 log_events = (log_events,) | |
66 | |
67 if not isinstance(log_events, (list, tuple)): | |
68 logging.error('Invalid type for "event", should be LogEventLite or ' | |
69 'list of. Got %s' % str(type(log_events))) | |
70 return False | |
71 | |
72 request_p = LogRequestLite() | |
73 request_p.log_source_name = 'CHROME_INFRA' | |
74 request_p.log_event.extend(log_events) # copies the protobuf | |
75 # Sets the sending time here for safety, _send_to_endpoint should change it | |
76 # if needed. | |
77 request_p.request_time_ms = time_ms() | |
78 return self._send_to_endpoint(request_p) | |
79 | |
80 def _send_to_endpoint(self, events): | |
81 """Send a protobuf to wherever it should be sent. | |
82 | |
83 This method is called by push_event. | |
84 If some computation is require, make sure to update events.request_time_ms | |
85 right before sending. | |
86 | |
87 Args: | |
88 events(LogRequestLite): protobuf to send. | |
89 | |
90 Returns: | |
91 success(bool): whether POSTing/writing succeeded or not. | |
92 """ | |
93 raise NotImplementedError('Please implement _send_to_endpoint().') | |
94 | |
95 | |
96 class _LocalFileRouter(_Router): | |
97 def __init__(self, output_file, dry_run=False): | |
98 """Initialize the router. | |
99 | |
100 Writes a serialized LogRequestLite protobuf in a local file. File is | |
101 created/truncated before writing (no append). | |
102 | |
103 Args: | |
104 output_file(str): path to file where to write the protobuf. | |
105 | |
106 Keyword Args: | |
107 dry_run(bool): if True, the file is not written. | |
108 """ | |
109 _Router.__init__(self) | |
110 self.output_file = output_file | |
111 self._dry_run = dry_run | |
112 | |
113 def _send_to_endpoint(self, events): | |
114 try: | |
115 if not os.path.isdir(os.path.dirname(self.output_file)): | |
116 logging.error('File cannot be written, parent directory does ' | |
117 'not exist: %s' % os.path.dirname(self.output_file)) | |
118 if self._dry_run: | |
119 logging.info('Would have written in %s', self.output_file) | |
120 else: | |
121 with open(self.output_file, 'wb') as f: | |
122 f.write(events.SerializeToString()) # pragma: no branch | |
123 except Exception: | |
124 logging.exception('Failed to write in file: %s', self.output_file) | |
125 return False | |
126 | |
127 return True | |
128 | |
129 | |
130 class _TextStreamRouter(_Router): | |
131 def __init__(self, stream=sys.stdout): | |
132 """Initialize the router. | |
133 | |
134 Args: | |
135 stream(File): where to write the output. | |
136 """ | |
137 _Router.__init__(self) | |
138 self.stream = stream | |
139 | |
140 def _send_to_endpoint(self, events): | |
141 # Prints individual events because it's what we're usually interested in | |
142 # in that case. | |
143 infra_events = [str(ChromeInfraEvent.FromString( | |
144 ev.source_extension)) for ev in events.log_event] | |
145 try: | |
146 self.stream.write('%s\n' % '\n'.join(infra_events)) | |
147 except Exception: | |
148 logging.exception('Unable to write to provided stream') | |
149 return False | |
150 return True | |
151 | |
152 | |
153 class _LoggingStreamRouter(_Router): | |
154 def __init__(self, severity=logging.INFO): | |
155 """Initialize the router. | |
156 | |
157 Args: | |
158 severity: severity of the messages for reporting events | |
159 """ | |
160 _Router.__init__(self) | |
161 self.severity = severity | |
162 | |
163 def _send_to_endpoint(self, events): | |
164 try: | |
165 for ev in events.log_event: | |
166 ev_str = str(ChromeInfraEvent.FromString(ev.source_extension)) | |
167 logging.log(self.severity, 'Sending event_mon event:\n%s' % ev_str) | |
168 except Exception: | |
169 logging.exception('Unable to log the events') | |
170 return False | |
171 return True | |
172 | |
173 | |
174 class _HttpRouter(_Router): | |
175 def __init__(self, cache, endpoint, timeout=10, try_num=3, retry_backoff=2., | |
176 dry_run=False, _sleep_fn=time.sleep): | |
177 """Initialize the router. | |
178 | |
179 Args: | |
180 cache(dict): This must be config._cache. Passed as a parameter to | |
181 avoid a circular import. | |
182 endpoint(str or None): None means 'dry run'. What would be sent is printed | |
183 on stdout. If endpoint starts with 'https://' data is POSTed there. | |
184 Otherwise it is interpreted as a file where to write serialized | |
185 LogEventLite protos. | |
186 try_num(int): max number of http requests send to the endpoint. | |
187 retry_backoff(float): time in seconds before retrying posting to the | |
188 endpoint. Randomized exponential backoff is applied on subsequent | |
189 retries. | |
190 dry_run(boolean): if True, no http request is sent. Instead a message is | |
191 printed. | |
192 _sleep_fn (function): function to wait specified number of seconds. This | |
193 argument is provided for testing purposes. | |
194 """ | |
195 HTTP_IDENTIFIER = 'event_mon' | |
196 _Router.__init__(self) | |
197 self.endpoint = endpoint | |
198 self.try_num = try_num | |
199 self.retry_backoff = retry_backoff | |
200 self._cache = cache | |
201 self._http = infra_libs.InstrumentedHttp(HTTP_IDENTIFIER, timeout=timeout) | |
202 self._dry_run = dry_run | |
203 self._sleep_fn = _sleep_fn | |
204 | |
205 # TODO(pgervais) pass this as parameters instead. | |
206 if self._cache.get('service_account_creds'): | |
207 try: | |
208 logging.debug('Activating OAuth2 authentication.') | |
209 self._http = infra_libs.get_authenticated_http( | |
210 self._cache['service_account_creds'], | |
211 service_accounts_creds_root= | |
212 self._cache['service_accounts_creds_root'], | |
213 scope='https://www.googleapis.com/auth/cclog', | |
214 http_identifier=HTTP_IDENTIFIER, | |
215 timeout=timeout | |
216 ) | |
217 except IOError: | |
218 logging.error('Unable to read credentials, requests will be ' | |
219 'unauthenticated. File: %s', | |
220 self._cache.get('service_account_creds')) | |
221 | |
222 def _send_to_endpoint(self, events): | |
223 """Send protobuf to endpoint | |
224 | |
225 Args: | |
226 events(LogRequestLite): the protobuf to send. | |
227 | |
228 Returns: | |
229 success(bool): whether POSTing/writing succeeded or not. | |
230 """ | |
231 if not self.endpoint.startswith('https://'): | |
232 logging.error("Received invalid https endpoint: %s", self.endpoint) | |
233 return False | |
234 | |
235 logging.debug('event_mon: POSTing events to %s', self.endpoint) | |
236 | |
237 attempt = 0 # silencing pylint | |
238 for attempt in xrange(self.try_num): # pragma: no branch | |
239 # (re)set this time at the very last moment | |
240 events.request_time_ms = time_ms() | |
241 response = None | |
242 try: | |
243 if self._dry_run: | |
244 logging.info('Http requests disabled. Not sending anything') | |
245 else: # pragma: no cover | |
246 response, _ = self._http.request( | |
247 uri=self.endpoint, | |
248 method='POST', | |
249 headers={'Content-Type': 'application/octet-stream'}, | |
250 body=events.SerializeToString() | |
251 ) | |
252 | |
253 if self._dry_run or response.status == 200: | |
254 logging.debug('Succeeded POSTing data after %d attempts', attempt + 1) | |
255 return True | |
256 | |
257 except Exception: | |
258 logging.exception('exception when POSTing data') | |
259 | |
260 if response: | |
261 logging.error('failed to POST data to %s Status: %d (attempt %d)', | |
262 self.endpoint, response.status, attempt) | |
263 | |
264 if attempt == 0: | |
265 logging.error('data: %s', str(events)[:2000]) | |
266 | |
267 self._sleep_fn(backoff_time(attempt, retry_backoff=self.retry_backoff)) | |
268 | |
269 logging.error('failed to POST data after %d attempts, giving up.', | |
270 attempt+1) | |
271 return False | |
OLD | NEW |