| OLD | NEW |
| (Empty) |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 # Use of this source code is governed by a BSD-style license that can be | |
| 3 # found in the LICENSE file. | |
| 4 | |
| 5 """Classes representing the monitoring interface for tasks or devices.""" | |
| 6 | |
| 7 | |
| 8 import base64 | |
| 9 import httplib2 | |
| 10 import json | |
| 11 import logging | |
| 12 import socket | |
| 13 import traceback | |
| 14 | |
| 15 from googleapiclient import discovery | |
| 16 from googleapiclient import errors | |
| 17 from infra_libs import httplib2_utils | |
| 18 from infra_libs.ts_mon.common import http_metrics | |
| 19 from infra_libs.ts_mon.common import pb_to_popo | |
| 20 from infra_libs.ts_mon.protos import metrics_pb2 | |
| 21 from oauth2client import gce | |
| 22 from oauth2client.client import GoogleCredentials | |
| 23 from oauth2client.file import Storage | |
| 24 | |
| 25 # Special string that can be passed through as the credentials path to use the | |
| 26 # default Appengine or GCE service account. | |
| 27 APPENGINE_CREDENTIALS = ':appengine' | |
| 28 GCE_CREDENTIALS = ':gce' | |
| 29 | |
| 30 | |
| 31 class Monitor(object): | |
| 32 """Abstract base class encapsulating the ability to collect and send metrics. | |
| 33 | |
| 34 This is a singleton class. There should only be one instance of a Monitor at | |
| 35 a time. It will be created and initialized by process_argparse_options. It | |
| 36 must exist in order for any metrics to be sent, although both Targets and | |
| 37 Metrics may be initialized before the underlying Monitor. If it does not exist | |
| 38 at the time that a Metric is sent, an exception will be raised. | |
| 39 """ | |
| 40 | |
| 41 _SCOPES = [] | |
| 42 | |
| 43 @staticmethod | |
| 44 def _wrap_proto(data): | |
| 45 """Normalize MetricsData, list(MetricsData), and MetricsCollection. | |
| 46 | |
| 47 Args: | |
| 48 input: A MetricsData, list of MetricsData, or a MetricsCollection. | |
| 49 | |
| 50 Returns: | |
| 51 A MetricsCollection with the appropriate data attribute set. | |
| 52 """ | |
| 53 if isinstance(data, metrics_pb2.MetricsCollection): | |
| 54 ret = data | |
| 55 elif isinstance(data, list): | |
| 56 ret = metrics_pb2.MetricsCollection(data=data) | |
| 57 else: | |
| 58 ret = metrics_pb2.MetricsCollection(data=[data]) | |
| 59 return ret | |
| 60 | |
| 61 def _load_credentials(self, credentials_file_path): | |
| 62 if credentials_file_path == GCE_CREDENTIALS: | |
| 63 return gce.AppAssertionCredentials(self._SCOPES) | |
| 64 if credentials_file_path == APPENGINE_CREDENTIALS: # pragma: no cover | |
| 65 # This import doesn't work outside appengine, so delay it until it's used. | |
| 66 from oauth2client import appengine | |
| 67 from google.appengine.api import app_identity | |
| 68 logging.info('Initializing with service account %s', | |
| 69 app_identity.get_service_account_name()) | |
| 70 return appengine.AppAssertionCredentials(self._SCOPES) | |
| 71 | |
| 72 with open(credentials_file_path, 'r') as credentials_file: | |
| 73 credentials_json = json.load(credentials_file) | |
| 74 if credentials_json.get('type', None): | |
| 75 credentials = GoogleCredentials.from_stream(credentials_file_path) | |
| 76 credentials = credentials.create_scoped(self._SCOPES) | |
| 77 return credentials | |
| 78 return Storage(credentials_file_path).get() | |
| 79 | |
| 80 def send(self, metric_pb): | |
| 81 raise NotImplementedError() | |
| 82 | |
| 83 class HttpsMonitor(Monitor): | |
| 84 | |
| 85 _SCOPES = [ | |
| 86 'https://www.googleapis.com/auth/prodxmon' | |
| 87 ] | |
| 88 | |
| 89 def __init__(self, endpoint, credentials_file_path, http=None): | |
| 90 self._endpoint = endpoint | |
| 91 credentials = self._load_credentials(credentials_file_path) | |
| 92 if http is None: | |
| 93 http = httplib2_utils.RetriableHttp( | |
| 94 httplib2_utils.InstrumentedHttp('acq-mon-api')) | |
| 95 self._http = credentials.authorize(http) | |
| 96 | |
| 97 def encodeToJson(self, metric_pb): | |
| 98 return json.dumps({ 'resource': pb_to_popo.convert(metric_pb) }) | |
| 99 | |
| 100 def send(self, metric_pb): | |
| 101 body = self.encodeToJson(self._wrap_proto(metric_pb)) | |
| 102 | |
| 103 try: | |
| 104 resp, content = self._http.request(self._endpoint, method='POST', | |
| 105 body=body) | |
| 106 if resp.status != 200: | |
| 107 logging.warning('HttpsMonitor.send received status %d: %s', resp.status, | |
| 108 content) | |
| 109 except (ValueError, errors.Error, | |
| 110 socket.timeout, socket.error, socket.herror, socket.gaierror, | |
| 111 httplib2.HttpLib2Error): | |
| 112 logging.warning('HttpsMonitor.send failed: %s\n', | |
| 113 traceback.format_exc()) | |
| 114 | |
| 115 | |
| 116 class PubSubMonitor(Monitor): | |
| 117 """Class which publishes metrics to a Cloud Pub/Sub topic.""" | |
| 118 | |
| 119 _SCOPES = [ | |
| 120 'https://www.googleapis.com/auth/pubsub', | |
| 121 ] | |
| 122 | |
| 123 TIMEOUT = 10 # seconds | |
| 124 | |
| 125 def _initialize(self): | |
| 126 creds = self._load_credentials(self._credsfile) | |
| 127 creds.authorize(self._http) | |
| 128 self._api = discovery.build('pubsub', 'v1', http=self._http) | |
| 129 | |
| 130 def _update_init_metrics(self, status): | |
| 131 if not self._use_instrumented_http: | |
| 132 return | |
| 133 fields = {'name': 'acq-mon-api-pubsub', | |
| 134 'client': 'discovery', | |
| 135 'status': status} | |
| 136 http_metrics.response_status.increment(fields=fields) | |
| 137 | |
| 138 def _check_initialize(self): | |
| 139 if self._api: | |
| 140 return True | |
| 141 try: | |
| 142 self._initialize() | |
| 143 except (ValueError, errors.Error, | |
| 144 socket.timeout, socket.error, socket.herror, socket.gaierror, | |
| 145 httplib2.HttpLib2Error, EnvironmentError): | |
| 146 # Log a warning, not error, to avoid false alarms in AppEngine apps. | |
| 147 logging.warning('PubSubMonitor._initialize failed:\n%s', | |
| 148 traceback.format_exc()) | |
| 149 self._api = None | |
| 150 self._update_init_metrics(http_metrics.STATUS_ERROR) | |
| 151 return False | |
| 152 | |
| 153 self._update_init_metrics(http_metrics.STATUS_OK) | |
| 154 return True | |
| 155 | |
| 156 def __init__(self, credsfile, project, topic, use_instrumented_http=True): | |
| 157 """Process monitoring related command line flags and initialize api. | |
| 158 | |
| 159 Args: | |
| 160 credsfile (str): path to the credentials json file | |
| 161 project (str): the name of the Pub/Sub project to publish to. | |
| 162 topic (str): the name of the Pub/Sub topic to publish to. | |
| 163 use_instrumented_http (bool): whether to record monitoring metrics for | |
| 164 HTTP requests made to the pubsub API. | |
| 165 """ | |
| 166 # Do not call self._check_initialize() in the constructor. This | |
| 167 # class is constructed during app initialization on AppEngine, and | |
| 168 # network calls are especially flaky during that time. | |
| 169 self._api = None | |
| 170 self._use_instrumented_http = use_instrumented_http | |
| 171 if use_instrumented_http: | |
| 172 self._http = httplib2_utils.InstrumentedHttp( | |
| 173 'acq-mon-api-pubsub', timeout=self.TIMEOUT) | |
| 174 else: | |
| 175 self._http = httplib2.Http(timeout=self.TIMEOUT) | |
| 176 self._credsfile = credsfile | |
| 177 self._topic = 'projects/%s/topics/%s' % (project, topic) | |
| 178 | |
| 179 def send(self, metric_pb): | |
| 180 """Send a metric proto to the monitoring api. | |
| 181 | |
| 182 Args: | |
| 183 metric_pb (MetricsData or MetricsCollection): the metric protobuf to send | |
| 184 """ | |
| 185 if not self._check_initialize(): | |
| 186 return | |
| 187 proto = self._wrap_proto(metric_pb) | |
| 188 logging.debug('ts_mon: sending %d metrics to PubSub', len(proto.data)) | |
| 189 body = { | |
| 190 'messages': [ | |
| 191 {'data': base64.b64encode(proto.SerializeToString())}, | |
| 192 ], | |
| 193 } | |
| 194 # Occasionally, client fails to receive a proper internal JSON | |
| 195 # from the server and raises ValueError trying to parse it. Other | |
| 196 # times we may fail with a network error. This is not fatal, we'll | |
| 197 # resend metrics next time. | |
| 198 try: | |
| 199 self._api.projects().topics().publish( | |
| 200 topic=self._topic, | |
| 201 body=body).execute(num_retries=5) | |
| 202 except (ValueError, errors.Error, | |
| 203 socket.timeout, socket.error, socket.herror, socket.gaierror, | |
| 204 httplib2.HttpLib2Error): | |
| 205 # Log a warning, not error, to avoid false alarms in AppEngine apps. | |
| 206 logging.warning('PubSubMonitor.send failed:\n%s', | |
| 207 traceback.format_exc()) | |
| 208 | |
| 209 | |
| 210 class DebugMonitor(Monitor): | |
| 211 """Class which writes metrics to logs or a local file for debugging.""" | |
| 212 def __init__(self, filepath=None): | |
| 213 if filepath is None: | |
| 214 self._fh = None | |
| 215 else: | |
| 216 self._fh = open(filepath, 'a') | |
| 217 | |
| 218 def send(self, metric_pb): | |
| 219 text = str(self._wrap_proto(metric_pb)) | |
| 220 logging.info('Flushing monitoring metrics:\n%s', text) | |
| 221 if self._fh is not None: | |
| 222 self._fh.write(text + '\n\n') | |
| 223 self._fh.flush() | |
| 224 | |
| 225 | |
| 226 class NullMonitor(Monitor): | |
| 227 """Class that doesn't send metrics anywhere.""" | |
| 228 def send(self, metric_pb): | |
| 229 pass | |
| OLD | NEW |