| Index: infra_libs/event_mon/router.py
|
| diff --git a/infra_libs/event_mon/router.py b/infra_libs/event_mon/router.py
|
| deleted file mode 100644
|
| index 437e1189c6804a1b694cf867a0e417610062001a..0000000000000000000000000000000000000000
|
| --- a/infra_libs/event_mon/router.py
|
| +++ /dev/null
|
| @@ -1,271 +0,0 @@
|
| -# Copyright 2015 The Chromium Authors. All rights reserved.
|
| -# Use of this source code is governed by a BSD-style license that can be
|
| -# found in the LICENSE file.
|
| -
|
| -import logging
|
| -import os
|
| -import random
|
| -import sys
|
| -import time
|
| -
|
| -import httplib2
|
| -
|
| -import infra_libs
|
| -from infra_libs.event_mon.protos.log_request_lite_pb2 import LogRequestLite
|
| -from infra_libs.event_mon.protos.chrome_infra_log_pb2 import ChromeInfraEvent
|
| -
|
| -
|
| -def time_ms():
|
| - """Return current timestamp in milliseconds."""
|
| - return int(1000 * time.time())
|
| -
|
| -
|
| -def backoff_time(attempt, retry_backoff=2., max_delay=30.):
|
| - """Compute randomized exponential backoff time.
|
| -
|
| - Args:
|
| - attempt (int): attempt number, starting at zero.
|
| -
|
| - Keyword Args:
|
| - retry_backoff(float): backoff time on the first attempt.
|
| - max_delay(float): maximum returned value.
|
| - """
|
| - delay = retry_backoff * (2 ** attempt)
|
| - # Add +-25% of variation.
|
| - delay += delay * ((random.random() - 0.5) / 2.)
|
| - return min(delay, max_delay)
|
| -
|
| -
|
| -class _Router(object):
|
| - """Route events to the right destination. Base class.
|
| -
|
| - This object is meant to be a singleton, and is not part of the API.
|
| - Subclasses must implement _send_to_endpoint().
|
| -
|
| - Usage:
|
| - router = _Router()
|
| - event = ChromeInfraEvent.LogEventLite(...)
|
| - ... fill in event ...
|
| - router.push_event(event)
|
| - """
|
| - def push_event(self, log_events):
|
| - """Enqueue event to push to the collection service.
|
| -
|
| - This method offers no guarantee on return that the event have been pushed
|
| - externally, as some buffering can take place.
|
| -
|
| - Args:
|
| - log_events (LogRequestLite.LogEventLite or list/tuple of): events.
|
| -
|
| - Returns:
|
| - success (bool): False if an error happened. True means 'event accepted',
|
| - but NOT 'event successfully pushed to the remote'.
|
| - """
|
| - if isinstance(log_events, LogRequestLite.LogEventLite):
|
| - log_events = (log_events,)
|
| -
|
| - if not isinstance(log_events, (list, tuple)):
|
| - logging.error('Invalid type for "event", should be LogEventLite or '
|
| - 'list of. Got %s' % str(type(log_events)))
|
| - return False
|
| -
|
| - request_p = LogRequestLite()
|
| - request_p.log_source_name = 'CHROME_INFRA'
|
| - request_p.log_event.extend(log_events) # copies the protobuf
|
| - # Sets the sending time here for safety, _send_to_endpoint should change it
|
| - # if needed.
|
| - request_p.request_time_ms = time_ms()
|
| - return self._send_to_endpoint(request_p)
|
| -
|
| - def _send_to_endpoint(self, events):
|
| - """Send a protobuf to wherever it should be sent.
|
| -
|
| - This method is called by push_event.
|
| - If some computation is require, make sure to update events.request_time_ms
|
| - right before sending.
|
| -
|
| - Args:
|
| - events(LogRequestLite): protobuf to send.
|
| -
|
| - Returns:
|
| - success(bool): whether POSTing/writing succeeded or not.
|
| - """
|
| - raise NotImplementedError('Please implement _send_to_endpoint().')
|
| -
|
| -
|
| -class _LocalFileRouter(_Router):
|
| - def __init__(self, output_file, dry_run=False):
|
| - """Initialize the router.
|
| -
|
| - Writes a serialized LogRequestLite protobuf in a local file. File is
|
| - created/truncated before writing (no append).
|
| -
|
| - Args:
|
| - output_file(str): path to file where to write the protobuf.
|
| -
|
| - Keyword Args:
|
| - dry_run(bool): if True, the file is not written.
|
| - """
|
| - _Router.__init__(self)
|
| - self.output_file = output_file
|
| - self._dry_run = dry_run
|
| -
|
| - def _send_to_endpoint(self, events):
|
| - try:
|
| - if not os.path.isdir(os.path.dirname(self.output_file)):
|
| - logging.error('File cannot be written, parent directory does '
|
| - 'not exist: %s' % os.path.dirname(self.output_file))
|
| - if self._dry_run:
|
| - logging.info('Would have written in %s', self.output_file)
|
| - else:
|
| - with open(self.output_file, 'wb') as f:
|
| - f.write(events.SerializeToString()) # pragma: no branch
|
| - except Exception:
|
| - logging.exception('Failed to write in file: %s', self.output_file)
|
| - return False
|
| -
|
| - return True
|
| -
|
| -
|
| -class _TextStreamRouter(_Router):
|
| - def __init__(self, stream=sys.stdout):
|
| - """Initialize the router.
|
| -
|
| - Args:
|
| - stream(File): where to write the output.
|
| - """
|
| - _Router.__init__(self)
|
| - self.stream = stream
|
| -
|
| - def _send_to_endpoint(self, events):
|
| - # Prints individual events because it's what we're usually interested in
|
| - # in that case.
|
| - infra_events = [str(ChromeInfraEvent.FromString(
|
| - ev.source_extension)) for ev in events.log_event]
|
| - try:
|
| - self.stream.write('%s\n' % '\n'.join(infra_events))
|
| - except Exception:
|
| - logging.exception('Unable to write to provided stream')
|
| - return False
|
| - return True
|
| -
|
| -
|
| -class _LoggingStreamRouter(_Router):
|
| - def __init__(self, severity=logging.INFO):
|
| - """Initialize the router.
|
| -
|
| - Args:
|
| - severity: severity of the messages for reporting events
|
| - """
|
| - _Router.__init__(self)
|
| - self.severity = severity
|
| -
|
| - def _send_to_endpoint(self, events):
|
| - try:
|
| - for ev in events.log_event:
|
| - ev_str = str(ChromeInfraEvent.FromString(ev.source_extension))
|
| - logging.log(self.severity, 'Sending event_mon event:\n%s' % ev_str)
|
| - except Exception:
|
| - logging.exception('Unable to log the events')
|
| - return False
|
| - return True
|
| -
|
| -
|
| -class _HttpRouter(_Router):
|
| - def __init__(self, cache, endpoint, timeout=10, try_num=3, retry_backoff=2.,
|
| - dry_run=False, _sleep_fn=time.sleep):
|
| - """Initialize the router.
|
| -
|
| - Args:
|
| - cache(dict): This must be config._cache. Passed as a parameter to
|
| - avoid a circular import.
|
| - endpoint(str or None): None means 'dry run'. What would be sent is printed
|
| - on stdout. If endpoint starts with 'https://' data is POSTed there.
|
| - Otherwise it is interpreted as a file where to write serialized
|
| - LogEventLite protos.
|
| - try_num(int): max number of http requests send to the endpoint.
|
| - retry_backoff(float): time in seconds before retrying posting to the
|
| - endpoint. Randomized exponential backoff is applied on subsequent
|
| - retries.
|
| - dry_run(boolean): if True, no http request is sent. Instead a message is
|
| - printed.
|
| - _sleep_fn (function): function to wait specified number of seconds. This
|
| - argument is provided for testing purposes.
|
| - """
|
| - HTTP_IDENTIFIER = 'event_mon'
|
| - _Router.__init__(self)
|
| - self.endpoint = endpoint
|
| - self.try_num = try_num
|
| - self.retry_backoff = retry_backoff
|
| - self._cache = cache
|
| - self._http = infra_libs.InstrumentedHttp(HTTP_IDENTIFIER, timeout=timeout)
|
| - self._dry_run = dry_run
|
| - self._sleep_fn = _sleep_fn
|
| -
|
| - # TODO(pgervais) pass this as parameters instead.
|
| - if self._cache.get('service_account_creds'):
|
| - try:
|
| - logging.debug('Activating OAuth2 authentication.')
|
| - self._http = infra_libs.get_authenticated_http(
|
| - self._cache['service_account_creds'],
|
| - service_accounts_creds_root=
|
| - self._cache['service_accounts_creds_root'],
|
| - scope='https://www.googleapis.com/auth/cclog',
|
| - http_identifier=HTTP_IDENTIFIER,
|
| - timeout=timeout
|
| - )
|
| - except IOError:
|
| - logging.error('Unable to read credentials, requests will be '
|
| - 'unauthenticated. File: %s',
|
| - self._cache.get('service_account_creds'))
|
| -
|
| - def _send_to_endpoint(self, events):
|
| - """Send protobuf to endpoint
|
| -
|
| - Args:
|
| - events(LogRequestLite): the protobuf to send.
|
| -
|
| - Returns:
|
| - success(bool): whether POSTing/writing succeeded or not.
|
| - """
|
| - if not self.endpoint.startswith('https://'):
|
| - logging.error("Received invalid https endpoint: %s", self.endpoint)
|
| - return False
|
| -
|
| - logging.debug('event_mon: POSTing events to %s', self.endpoint)
|
| -
|
| - attempt = 0 # silencing pylint
|
| - for attempt in xrange(self.try_num): # pragma: no branch
|
| - # (re)set this time at the very last moment
|
| - events.request_time_ms = time_ms()
|
| - response = None
|
| - try:
|
| - if self._dry_run:
|
| - logging.info('Http requests disabled. Not sending anything')
|
| - else: # pragma: no cover
|
| - response, _ = self._http.request(
|
| - uri=self.endpoint,
|
| - method='POST',
|
| - headers={'Content-Type': 'application/octet-stream'},
|
| - body=events.SerializeToString()
|
| - )
|
| -
|
| - if self._dry_run or response.status == 200:
|
| - logging.debug('Succeeded POSTing data after %d attempts', attempt + 1)
|
| - return True
|
| -
|
| - except Exception:
|
| - logging.exception('exception when POSTing data')
|
| -
|
| - if response:
|
| - logging.error('failed to POST data to %s Status: %d (attempt %d)',
|
| - self.endpoint, response.status, attempt)
|
| -
|
| - if attempt == 0:
|
| - logging.error('data: %s', str(events)[:2000])
|
| -
|
| - self._sleep_fn(backoff_time(attempt, retry_backoff=self.retry_backoff))
|
| -
|
| - logging.error('failed to POST data after %d attempts, giving up.',
|
| - attempt+1)
|
| - return False
|
|
|