| OLD | NEW |
| (Empty) |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 # Use of this source code is governed by a BSD-style license that can be | |
| 3 # found in the LICENSE file. | |
| 4 | |
| 5 import logging | |
| 6 import os | |
| 7 import random | |
| 8 import sys | |
| 9 import time | |
| 10 | |
| 11 import httplib2 | |
| 12 | |
| 13 import infra_libs | |
| 14 from infra_libs.event_mon.protos.log_request_lite_pb2 import LogRequestLite | |
| 15 from infra_libs.event_mon.protos.chrome_infra_log_pb2 import ChromeInfraEvent | |
| 16 | |
| 17 | |
def time_ms():
  """Return the current time.time() value converted to integer milliseconds."""
  now_seconds = time.time()
  return int(now_seconds * 1000)
| 21 | |
| 22 | |
def backoff_time(attempt, retry_backoff=2., max_delay=30.):
  """Compute randomized exponential backoff time.

  The nominal delay is retry_backoff * 2**attempt. A random variation of up to
  +-25% is applied to it, then the result is capped at max_delay.

  Args:
    attempt (int): attempt number, starting at zero.

  Keyword Args:
    retry_backoff(float): nominal backoff time on the first attempt.
    max_delay(float): maximum returned value.

  Returns:
    delay: randomized delay, never greater than max_delay.
  """
  try:
    delay = retry_backoff * (2 ** attempt)
    # Add +-25% of variation.
    delay += delay * ((random.random() - 0.5) / 2.)
  except OverflowError:
    # 2 ** attempt is an arbitrary-precision integer; for very large attempt
    # numbers it cannot be converted to a float anymore. The nominal delay is
    # then far beyond any finite cap, so return the cap instead of crashing.
    return max_delay
  return min(delay, max_delay)
| 37 | |
| 38 | |
class _Router(object):
  """Base class dispatching events to their destination.

  Instances are meant to be used as singletons, and are not part of the API.
  Subclasses must implement _send_to_endpoint().

  Usage:
    router = _Router()
    event = LogRequestLite.LogEventLite(...)
    ... fill in event ...
    router.push_event(event)
  """
  def push_event(self, log_events):
    """Wrap event(s) in a LogRequestLite and hand it to _send_to_endpoint().

    When this method returns, there is no guarantee that the events have been
    pushed externally, as some buffering can take place.

    Args:
      log_events (LogRequestLite.LogEventLite or list/tuple of): events.

    Returns:
      success (bool): False if an error happened. True means 'event accepted',
        but NOT 'event successfully pushed to the remote'.
    """
    # A lone event is handled as a one-element batch.
    is_single_event = isinstance(log_events, LogRequestLite.LogEventLite)
    batch = (log_events,) if is_single_event else log_events

    if not isinstance(batch, (list, tuple)):
      logging.error('Invalid type for "event", should be LogEventLite or '
                    'list of. Got %s' % str(type(batch)))
      return False

    request = LogRequestLite()
    request.log_source_name = 'CHROME_INFRA'
    request.log_event.extend(batch)  # copies the protobuf
    # Stamp the request now for safety; _send_to_endpoint should refresh the
    # value if needed.
    request.request_time_ms = time_ms()
    return self._send_to_endpoint(request)

  def _send_to_endpoint(self, events):
    """Deliver a protobuf to its destination. Must be overridden.

    push_event() calls this method. If some computation is required before
    sending, make sure to update events.request_time_ms right before sending.

    Args:
      events(LogRequestLite): protobuf to send.

    Returns:
      success(bool): whether POSTing/writing succeeded or not.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError('Please implement _send_to_endpoint().')
| 94 | |
| 95 | |
class _LocalFileRouter(_Router):
  """Router writing the serialized LogRequestLite protobuf to a local file."""

  def __init__(self, output_file, dry_run=False):
    """Initialize the router.

    Writes a serialized LogRequestLite protobuf in a local file. File is
    created/truncated before writing (no append).

    Args:
      output_file(str): path to file where to write the protobuf.

    Keyword Args:
      dry_run(bool): if True, the file is not written.
    """
    _Router.__init__(self)
    self.output_file = output_file
    self._dry_run = dry_run

  def _send_to_endpoint(self, events):
    """Write the serialized protobuf to self.output_file.

    Args:
      events(LogRequestLite): protobuf to write.

    Returns:
      success(bool): False if an exception was raised while writing (it is
        logged, not propagated), True otherwise. In dry-run mode nothing is
        written and True is returned.
    """
    try:
      # Resolve to an absolute path first: the dirname of a bare file name
      # ('events.pb') is the empty string, which isdir() always rejects even
      # though the file can be written in the current directory.
      parent_dir = os.path.dirname(os.path.abspath(self.output_file))
      if not os.path.isdir(parent_dir):
        logging.error('File cannot be written, parent directory does '
                      'not exist: %s', parent_dir)
      if self._dry_run:
        logging.info('Would have written in %s', self.output_file)
      else:
        with open(self.output_file, 'wb') as f:
          f.write(events.SerializeToString())  # pragma: no branch
    except Exception:
      logging.exception('Failed to write in file: %s', self.output_file)
      return False

    return True
| 128 | |
| 129 | |
class _TextStreamRouter(_Router):
  """Router writing a text representation of each event to a stream."""

  def __init__(self, stream=sys.stdout):
    """Initialize the router.

    Args:
      stream(File): where to write the output.
    """
    _Router.__init__(self)
    self.stream = stream

  def _send_to_endpoint(self, events):
    """Write one text block per event to self.stream.

    Args:
      events(LogRequestLite): protobuf holding the events to print.

    Returns:
      success(bool): False if decoding the events or writing to the stream
        raised an exception (it is logged, not propagated), True otherwise.
    """
    # Prints individual events because it's what we're usually interested in
    # in that case.
    try:
      infra_events = [str(ChromeInfraEvent.FromString(ev.source_extension))
                      for ev in events.log_event]
    except Exception:
      # Report decoding failures through the return value, as the other
      # routers do, instead of letting them escape from push_event().
      logging.exception('Unable to decode the events')
      return False

    try:
      self.stream.write('%s\n' % '\n'.join(infra_events))
    except Exception:
      logging.exception('Unable to write to provided stream')
      return False
    return True
| 151 | |
| 152 | |
class _LoggingStreamRouter(_Router):
  """Router reporting each event through the logging module."""

  def __init__(self, severity=logging.INFO):
    """Initialize the router.

    Args:
      severity: severity of the messages for reporting events
    """
    _Router.__init__(self)
    self.severity = severity

  def _send_to_endpoint(self, events):
    """Log every event of the request at self.severity.

    Args:
      events(LogRequestLite): protobuf holding the events to log.

    Returns:
      success(bool): False if decoding or logging an event raised an
        exception (it is logged, not propagated), True otherwise.
    """
    try:
      for log_event in events.log_event:
        infra_event = ChromeInfraEvent.FromString(log_event.source_extension)
        message = 'Sending event_mon event:\n%s' % str(infra_event)
        logging.log(self.severity, message)
    except Exception:
      logging.exception('Unable to log the events')
      return False
    return True
| 172 | |
| 173 | |
class _HttpRouter(_Router):
  """Router POSTing the serialized LogRequestLite protobuf to an endpoint."""

  def __init__(self, cache, endpoint, timeout=10, try_num=3, retry_backoff=2.,
               dry_run=False, _sleep_fn=time.sleep):
    """Initialize the router.

    Args:
      cache(dict): This must be config._cache. Passed as a parameter to
        avoid a circular import.
      endpoint(str): URL where data is POSTed. It must start with 'https://',
        any other value makes _send_to_endpoint() fail without sending.
      timeout: timeout value handed over to the underlying http object
        (presumably in seconds - confirm against infra_libs).
      try_num(int): max number of http requests sent to the endpoint.
      retry_backoff(float): time in seconds before retrying posting to the
        endpoint. Randomized exponential backoff is applied on subsequent
        retries.
      dry_run(boolean): if True, no http request is sent. Instead a message is
        logged.
      _sleep_fn (function): function to wait specified number of seconds. This
        argument is provided for testing purposes.
    """
    HTTP_IDENTIFIER = 'event_mon'
    _Router.__init__(self)
    self.endpoint = endpoint
    self.try_num = try_num
    self.retry_backoff = retry_backoff
    self._cache = cache
    # Unauthenticated by default; replaced below when credentials are set.
    self._http = infra_libs.InstrumentedHttp(HTTP_IDENTIFIER, timeout=timeout)
    self._dry_run = dry_run
    self._sleep_fn = _sleep_fn

    # TODO(pgervais) pass this as parameters instead.
    if self._cache.get('service_account_creds'):
      try:
        logging.debug('Activating OAuth2 authentication.')
        self._http = infra_libs.get_authenticated_http(
          self._cache['service_account_creds'],
          service_accounts_creds_root=
              self._cache['service_accounts_creds_root'],
          scope='https://www.googleapis.com/auth/cclog',
          http_identifier=HTTP_IDENTIFIER,
          timeout=timeout
        )
      except IOError:
        # Keep the unauthenticated http object created above.
        logging.error('Unable to read credentials, requests will be '
                      'unauthenticated. File: %s',
                      self._cache.get('service_account_creds'))

  def _send_to_endpoint(self, events):
    """Send protobuf to endpoint

    Up to self.try_num POST requests are made, waiting for a randomized
    exponential backoff time between two consecutive attempts.

    Args:
      events(LogRequestLite): the protobuf to send.

    Returns:
      success(bool): whether POSTing/writing succeeded or not.
    """
    endpoint = self.endpoint
    # A missing endpoint (None or '') is rejected like any non-https one,
    # instead of crashing on .startswith().
    if not endpoint or not endpoint.startswith('https://'):
      logging.error("Received invalid https endpoint: %s", endpoint)
      return False

    logging.debug('event_mon: POSTing events to %s', endpoint)

    attempts_made = 0
    for attempt in range(self.try_num):  # pragma: no branch
      attempts_made = attempt + 1
      # (re)set this time at the very last moment
      events.request_time_ms = time_ms()
      response = None
      try:
        if self._dry_run:
          logging.info('Http requests disabled. Not sending anything')
        else:  # pragma: no cover
          response, _ = self._http.request(
            uri=endpoint,
            method='POST',
            headers={'Content-Type': 'application/octet-stream'},
            body=events.SerializeToString()
          )

        if self._dry_run or response.status == 200:
          logging.debug('Succeeded POSTing data after %d attempts',
                        attempts_made)
          return True

      except Exception:
        logging.exception('exception when POSTing data')

      if response is not None:
        logging.error('failed to POST data to %s Status: %d (attempt %d)',
                      endpoint, response.status, attempts_made)

      # Dump the (truncated) payload only once to keep the logs readable.
      if attempt == 0:
        logging.error('data: %s', str(events)[:2000])

      # Waiting is only useful when another attempt follows.
      if attempts_made < self.try_num:
        self._sleep_fn(backoff_time(attempt, retry_backoff=self.retry_backoff))

    logging.error('failed to POST data after %d attempts, giving up.',
                  attempts_made)
    return False
| OLD | NEW |