Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(209)

Unified Diff: infra_libs/event_mon/router.py

Issue 2213143002: Add infra_libs as a bootstrap dependency. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Removed the ugly import hack Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: infra_libs/event_mon/router.py
diff --git a/infra_libs/event_mon/router.py b/infra_libs/event_mon/router.py
deleted file mode 100644
index 437e1189c6804a1b694cf867a0e417610062001a..0000000000000000000000000000000000000000
--- a/infra_libs/event_mon/router.py
+++ /dev/null
@@ -1,271 +0,0 @@
-# Copyright 2015 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-import logging
-import os
-import random
-import sys
-import time
-
-import httplib2
-
-import infra_libs
-from infra_libs.event_mon.protos.log_request_lite_pb2 import LogRequestLite
-from infra_libs.event_mon.protos.chrome_infra_log_pb2 import ChromeInfraEvent
-
-
-def time_ms():
- """Return current timestamp in milliseconds."""
- return int(1000 * time.time())
-
-
-def backoff_time(attempt, retry_backoff=2., max_delay=30.):
- """Compute randomized exponential backoff time.
-
- Args:
- attempt (int): attempt number, starting at zero.
-
- Keyword Args:
- retry_backoff(float): backoff time on the first attempt.
- max_delay(float): maximum returned value.
- """
- delay = retry_backoff * (2 ** attempt)
- # Add +-25% of variation.
- delay += delay * ((random.random() - 0.5) / 2.)
- return min(delay, max_delay)
-
-
-class _Router(object):
- """Route events to the right destination. Base class.
-
- This object is meant to be a singleton, and is not part of the API.
- Subclasses must implement _send_to_endpoint().
-
- Usage:
- router = _Router()
- event = ChromeInfraEvent.LogEventLite(...)
- ... fill in event ...
- router.push_event(event)
- """
- def push_event(self, log_events):
- """Enqueue event to push to the collection service.
-
- This method offers no guarantee on return that the event have been pushed
- externally, as some buffering can take place.
-
- Args:
- log_events (LogRequestLite.LogEventLite or list/tuple of): events.
-
- Returns:
- success (bool): False if an error happened. True means 'event accepted',
- but NOT 'event successfully pushed to the remote'.
- """
- if isinstance(log_events, LogRequestLite.LogEventLite):
- log_events = (log_events,)
-
- if not isinstance(log_events, (list, tuple)):
- logging.error('Invalid type for "event", should be LogEventLite or '
- 'list of. Got %s' % str(type(log_events)))
- return False
-
- request_p = LogRequestLite()
- request_p.log_source_name = 'CHROME_INFRA'
- request_p.log_event.extend(log_events) # copies the protobuf
- # Sets the sending time here for safety, _send_to_endpoint should change it
- # if needed.
- request_p.request_time_ms = time_ms()
- return self._send_to_endpoint(request_p)
-
- def _send_to_endpoint(self, events):
- """Send a protobuf to wherever it should be sent.
-
- This method is called by push_event.
- If some computation is require, make sure to update events.request_time_ms
- right before sending.
-
- Args:
- events(LogRequestLite): protobuf to send.
-
- Returns:
- success(bool): whether POSTing/writing succeeded or not.
- """
- raise NotImplementedError('Please implement _send_to_endpoint().')
-
-
-class _LocalFileRouter(_Router):
- def __init__(self, output_file, dry_run=False):
- """Initialize the router.
-
- Writes a serialized LogRequestLite protobuf in a local file. File is
- created/truncated before writing (no append).
-
- Args:
- output_file(str): path to file where to write the protobuf.
-
- Keyword Args:
- dry_run(bool): if True, the file is not written.
- """
- _Router.__init__(self)
- self.output_file = output_file
- self._dry_run = dry_run
-
- def _send_to_endpoint(self, events):
- try:
- if not os.path.isdir(os.path.dirname(self.output_file)):
- logging.error('File cannot be written, parent directory does '
- 'not exist: %s' % os.path.dirname(self.output_file))
- if self._dry_run:
- logging.info('Would have written in %s', self.output_file)
- else:
- with open(self.output_file, 'wb') as f:
- f.write(events.SerializeToString()) # pragma: no branch
- except Exception:
- logging.exception('Failed to write in file: %s', self.output_file)
- return False
-
- return True
-
-
-class _TextStreamRouter(_Router):
- def __init__(self, stream=sys.stdout):
- """Initialize the router.
-
- Args:
- stream(File): where to write the output.
- """
- _Router.__init__(self)
- self.stream = stream
-
- def _send_to_endpoint(self, events):
- # Prints individual events because it's what we're usually interested in
- # in that case.
- infra_events = [str(ChromeInfraEvent.FromString(
- ev.source_extension)) for ev in events.log_event]
- try:
- self.stream.write('%s\n' % '\n'.join(infra_events))
- except Exception:
- logging.exception('Unable to write to provided stream')
- return False
- return True
-
-
-class _LoggingStreamRouter(_Router):
- def __init__(self, severity=logging.INFO):
- """Initialize the router.
-
- Args:
- severity: severity of the messages for reporting events
- """
- _Router.__init__(self)
- self.severity = severity
-
- def _send_to_endpoint(self, events):
- try:
- for ev in events.log_event:
- ev_str = str(ChromeInfraEvent.FromString(ev.source_extension))
- logging.log(self.severity, 'Sending event_mon event:\n%s' % ev_str)
- except Exception:
- logging.exception('Unable to log the events')
- return False
- return True
-
-
-class _HttpRouter(_Router):
- def __init__(self, cache, endpoint, timeout=10, try_num=3, retry_backoff=2.,
- dry_run=False, _sleep_fn=time.sleep):
- """Initialize the router.
-
- Args:
- cache(dict): This must be config._cache. Passed as a parameter to
- avoid a circular import.
- endpoint(str or None): None means 'dry run'. What would be sent is printed
- on stdout. If endpoint starts with 'https://' data is POSTed there.
- Otherwise it is interpreted as a file where to write serialized
- LogEventLite protos.
- try_num(int): max number of http requests send to the endpoint.
- retry_backoff(float): time in seconds before retrying posting to the
- endpoint. Randomized exponential backoff is applied on subsequent
- retries.
- dry_run(boolean): if True, no http request is sent. Instead a message is
- printed.
- _sleep_fn (function): function to wait specified number of seconds. This
- argument is provided for testing purposes.
- """
- HTTP_IDENTIFIER = 'event_mon'
- _Router.__init__(self)
- self.endpoint = endpoint
- self.try_num = try_num
- self.retry_backoff = retry_backoff
- self._cache = cache
- self._http = infra_libs.InstrumentedHttp(HTTP_IDENTIFIER, timeout=timeout)
- self._dry_run = dry_run
- self._sleep_fn = _sleep_fn
-
- # TODO(pgervais) pass this as parameters instead.
- if self._cache.get('service_account_creds'):
- try:
- logging.debug('Activating OAuth2 authentication.')
- self._http = infra_libs.get_authenticated_http(
- self._cache['service_account_creds'],
- service_accounts_creds_root=
- self._cache['service_accounts_creds_root'],
- scope='https://www.googleapis.com/auth/cclog',
- http_identifier=HTTP_IDENTIFIER,
- timeout=timeout
- )
- except IOError:
- logging.error('Unable to read credentials, requests will be '
- 'unauthenticated. File: %s',
- self._cache.get('service_account_creds'))
-
- def _send_to_endpoint(self, events):
- """Send protobuf to endpoint
-
- Args:
- events(LogRequestLite): the protobuf to send.
-
- Returns:
- success(bool): whether POSTing/writing succeeded or not.
- """
- if not self.endpoint.startswith('https://'):
- logging.error("Received invalid https endpoint: %s", self.endpoint)
- return False
-
- logging.debug('event_mon: POSTing events to %s', self.endpoint)
-
- attempt = 0 # silencing pylint
- for attempt in xrange(self.try_num): # pragma: no branch
- # (re)set this time at the very last moment
- events.request_time_ms = time_ms()
- response = None
- try:
- if self._dry_run:
- logging.info('Http requests disabled. Not sending anything')
- else: # pragma: no cover
- response, _ = self._http.request(
- uri=self.endpoint,
- method='POST',
- headers={'Content-Type': 'application/octet-stream'},
- body=events.SerializeToString()
- )
-
- if self._dry_run or response.status == 200:
- logging.debug('Succeeded POSTing data after %d attempts', attempt + 1)
- return True
-
- except Exception:
- logging.exception('exception when POSTing data')
-
- if response:
- logging.error('failed to POST data to %s Status: %d (attempt %d)',
- self.endpoint, response.status, attempt)
-
- if attempt == 0:
- logging.error('data: %s', str(events)[:2000])
-
- self._sleep_fn(backoff_time(attempt, retry_backoff=self.retry_backoff))
-
- logging.error('failed to POST data after %d attempts, giving up.',
- attempt+1)
- return False

Powered by Google App Engine
This is Rietveld 408576698