Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(358)

Unified Diff: infra_libs/ts_mon/common/monitors.py

Issue 2213143002: Add infra_libs as a bootstrap dependency. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Removed the ugly import hack Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: infra_libs/ts_mon/common/monitors.py
diff --git a/infra_libs/ts_mon/common/monitors.py b/infra_libs/ts_mon/common/monitors.py
deleted file mode 100644
index e39cdfe63503584c8622f1c96bc837f7d77fdfc3..0000000000000000000000000000000000000000
--- a/infra_libs/ts_mon/common/monitors.py
+++ /dev/null
@@ -1,229 +0,0 @@
-# Copyright 2015 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Classes representing the monitoring interface for tasks or devices."""
-
-
-import base64
-import httplib2
-import json
-import logging
-import socket
-import traceback
-
-from googleapiclient import discovery
-from googleapiclient import errors
-from infra_libs import httplib2_utils
-from infra_libs.ts_mon.common import http_metrics
-from infra_libs.ts_mon.common import pb_to_popo
-from infra_libs.ts_mon.protos import metrics_pb2
-from oauth2client import gce
-from oauth2client.client import GoogleCredentials
-from oauth2client.file import Storage
-
-# Special string that can be passed through as the credentials path to use the
-# default Appengine or GCE service account.
-APPENGINE_CREDENTIALS = ':appengine'
-GCE_CREDENTIALS = ':gce'
-
-
-class Monitor(object):
- """Abstract base class encapsulating the ability to collect and send metrics.
-
- This is a singleton class. There should only be one instance of a Monitor at
- a time. It will be created and initialized by process_argparse_options. It
- must exist in order for any metrics to be sent, although both Targets and
- Metrics may be initialized before the underlying Monitor. If it does not exist
- at the time that a Metric is sent, an exception will be raised.
- """
-
- _SCOPES = []
-
- @staticmethod
- def _wrap_proto(data):
- """Normalize MetricsData, list(MetricsData), and MetricsCollection.
-
- Args:
- input: A MetricsData, list of MetricsData, or a MetricsCollection.
-
- Returns:
- A MetricsCollection with the appropriate data attribute set.
- """
- if isinstance(data, metrics_pb2.MetricsCollection):
- ret = data
- elif isinstance(data, list):
- ret = metrics_pb2.MetricsCollection(data=data)
- else:
- ret = metrics_pb2.MetricsCollection(data=[data])
- return ret
-
- def _load_credentials(self, credentials_file_path):
- if credentials_file_path == GCE_CREDENTIALS:
- return gce.AppAssertionCredentials(self._SCOPES)
- if credentials_file_path == APPENGINE_CREDENTIALS: # pragma: no cover
- # This import doesn't work outside appengine, so delay it until it's used.
- from oauth2client import appengine
- from google.appengine.api import app_identity
- logging.info('Initializing with service account %s',
- app_identity.get_service_account_name())
- return appengine.AppAssertionCredentials(self._SCOPES)
-
- with open(credentials_file_path, 'r') as credentials_file:
- credentials_json = json.load(credentials_file)
- if credentials_json.get('type', None):
- credentials = GoogleCredentials.from_stream(credentials_file_path)
- credentials = credentials.create_scoped(self._SCOPES)
- return credentials
- return Storage(credentials_file_path).get()
-
- def send(self, metric_pb):
- raise NotImplementedError()
-
-class HttpsMonitor(Monitor):
-
- _SCOPES = [
- 'https://www.googleapis.com/auth/prodxmon'
- ]
-
- def __init__(self, endpoint, credentials_file_path, http=None):
- self._endpoint = endpoint
- credentials = self._load_credentials(credentials_file_path)
- if http is None:
- http = httplib2_utils.RetriableHttp(
- httplib2_utils.InstrumentedHttp('acq-mon-api'))
- self._http = credentials.authorize(http)
-
- def encodeToJson(self, metric_pb):
- return json.dumps({ 'resource': pb_to_popo.convert(metric_pb) })
-
- def send(self, metric_pb):
- body = self.encodeToJson(self._wrap_proto(metric_pb))
-
- try:
- resp, content = self._http.request(self._endpoint, method='POST',
- body=body)
- if resp.status != 200:
- logging.warning('HttpsMonitor.send received status %d: %s', resp.status,
- content)
- except (ValueError, errors.Error,
- socket.timeout, socket.error, socket.herror, socket.gaierror,
- httplib2.HttpLib2Error):
- logging.warning('HttpsMonitor.send failed: %s\n',
- traceback.format_exc())
-
-
-class PubSubMonitor(Monitor):
- """Class which publishes metrics to a Cloud Pub/Sub topic."""
-
- _SCOPES = [
- 'https://www.googleapis.com/auth/pubsub',
- ]
-
- TIMEOUT = 10 # seconds
-
- def _initialize(self):
- creds = self._load_credentials(self._credsfile)
- creds.authorize(self._http)
- self._api = discovery.build('pubsub', 'v1', http=self._http)
-
- def _update_init_metrics(self, status):
- if not self._use_instrumented_http:
- return
- fields = {'name': 'acq-mon-api-pubsub',
- 'client': 'discovery',
- 'status': status}
- http_metrics.response_status.increment(fields=fields)
-
- def _check_initialize(self):
- if self._api:
- return True
- try:
- self._initialize()
- except (ValueError, errors.Error,
- socket.timeout, socket.error, socket.herror, socket.gaierror,
- httplib2.HttpLib2Error, EnvironmentError):
- # Log a warning, not error, to avoid false alarms in AppEngine apps.
- logging.warning('PubSubMonitor._initialize failed:\n%s',
- traceback.format_exc())
- self._api = None
- self._update_init_metrics(http_metrics.STATUS_ERROR)
- return False
-
- self._update_init_metrics(http_metrics.STATUS_OK)
- return True
-
- def __init__(self, credsfile, project, topic, use_instrumented_http=True):
- """Process monitoring related command line flags and initialize api.
-
- Args:
- credsfile (str): path to the credentials json file
- project (str): the name of the Pub/Sub project to publish to.
- topic (str): the name of the Pub/Sub topic to publish to.
- use_instrumented_http (bool): whether to record monitoring metrics for
- HTTP requests made to the pubsub API.
- """
- # Do not call self._check_initialize() in the constructor. This
- # class is constructed during app initialization on AppEngine, and
- # network calls are especially flaky during that time.
- self._api = None
- self._use_instrumented_http = use_instrumented_http
- if use_instrumented_http:
- self._http = httplib2_utils.InstrumentedHttp(
- 'acq-mon-api-pubsub', timeout=self.TIMEOUT)
- else:
- self._http = httplib2.Http(timeout=self.TIMEOUT)
- self._credsfile = credsfile
- self._topic = 'projects/%s/topics/%s' % (project, topic)
-
- def send(self, metric_pb):
- """Send a metric proto to the monitoring api.
-
- Args:
- metric_pb (MetricsData or MetricsCollection): the metric protobuf to send
- """
- if not self._check_initialize():
- return
- proto = self._wrap_proto(metric_pb)
- logging.debug('ts_mon: sending %d metrics to PubSub', len(proto.data))
- body = {
- 'messages': [
- {'data': base64.b64encode(proto.SerializeToString())},
- ],
- }
- # Occasionally, client fails to receive a proper internal JSON
- # from the server and raises ValueError trying to parse it. Other
- # times we may fail with a network error. This is not fatal, we'll
- # resend metrics next time.
- try:
- self._api.projects().topics().publish(
- topic=self._topic,
- body=body).execute(num_retries=5)
- except (ValueError, errors.Error,
- socket.timeout, socket.error, socket.herror, socket.gaierror,
- httplib2.HttpLib2Error):
- # Log a warning, not error, to avoid false alarms in AppEngine apps.
- logging.warning('PubSubMonitor.send failed:\n%s',
- traceback.format_exc())
-
-
-class DebugMonitor(Monitor):
- """Class which writes metrics to logs or a local file for debugging."""
- def __init__(self, filepath=None):
- if filepath is None:
- self._fh = None
- else:
- self._fh = open(filepath, 'a')
-
- def send(self, metric_pb):
- text = str(self._wrap_proto(metric_pb))
- logging.info('Flushing monitoring metrics:\n%s', text)
- if self._fh is not None:
- self._fh.write(text + '\n\n')
- self._fh.flush()
-
-
-class NullMonitor(Monitor):
- """Class that doesn't send metrics anywhere."""
- def send(self, metric_pb):
- pass

Powered by Google App Engine
This is Rietveld 408576698