| OLD | NEW |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """Classes representing the monitoring interface for tasks or devices.""" | 5 """Classes representing the monitoring interface for tasks or devices.""" |
| 6 | 6 |
| 7 | 7 |
| 8 import base64 | 8 import base64 |
| 9 import httplib2 | 9 import httplib2 |
| 10 import json | 10 import json |
| 11 import logging | 11 import logging |
| 12 import socket | 12 import socket |
| 13 import traceback | 13 import traceback |
| 14 | 14 |
| 15 from googleapiclient import discovery | 15 from googleapiclient import discovery |
| 16 from googleapiclient import errors | 16 from googleapiclient import errors |
| 17 from infra_libs import httplib2_utils | 17 from infra_libs import httplib2_utils |
| 18 from infra_libs.ts_mon.common import interface | 18 from infra_libs.ts_mon.common import interface |
| 19 from infra_libs.ts_mon.common import http_metrics | 19 from infra_libs.ts_mon.common import http_metrics |
| 20 from infra_libs.ts_mon.common import pb_to_popo | 20 from infra_libs.ts_mon.common import pb_to_popo |
| 21 from infra_libs.ts_mon.protos.current import metrics_pb2 | 21 from infra_libs.ts_mon.protos import metrics_pb2 |
| 22 try: # pragma: no cover | 22 try: # pragma: no cover |
| 23 from oauth2client import gce | 23 from oauth2client import gce |
| 24 except ImportError: # pragma: no cover | 24 except ImportError: # pragma: no cover |
| 25 from oauth2client.contrib import gce | 25 from oauth2client.contrib import gce |
| 26 from oauth2client.client import GoogleCredentials | 26 from oauth2client.client import GoogleCredentials |
| 27 from oauth2client.file import Storage | 27 from oauth2client.file import Storage |
| 28 | 28 |
| 29 # Special string that can be passed through as the credentials path to use the | 29 # Special string that can be passed through as the credentials path to use the |
| 30 # default Appengine or GCE service account. | 30 # default Appengine or GCE service account. |
| 31 APPENGINE_CREDENTIALS = ':appengine' | 31 APPENGINE_CREDENTIALS = ':appengine' |
| (...skipping 19 matching lines...) Expand all Loading... |
| 51 | 51 |
| 52 class GCECredentials(CredentialFactory): | 52 class GCECredentials(CredentialFactory): |
| 53 def create(self, scopes): | 53 def create(self, scopes): |
| 54 return gce.AppAssertionCredentials(scopes) | 54 return gce.AppAssertionCredentials(scopes) |
| 55 | 55 |
| 56 | 56 |
| 57 class AppengineCredentials(CredentialFactory): | 57 class AppengineCredentials(CredentialFactory): |
| 58 def create(self, scopes): # pragma: no cover | 58 def create(self, scopes): # pragma: no cover |
| 59 # This import doesn't work outside appengine, so delay it until it's used. | 59 # This import doesn't work outside appengine, so delay it until it's used. |
| 60 from oauth2client import appengine | 60 from oauth2client import appengine |
| 61 from google.appengine.api import app_identity | |
| 62 logging.info('Initializing with service account %s', | |
| 63 app_identity.get_service_account_name()) | |
| 64 return appengine.AppAssertionCredentials(scopes) | 61 return appengine.AppAssertionCredentials(scopes) |
| 65 | 62 |
| 66 | 63 |
| 67 class FileCredentials(CredentialFactory): | 64 class FileCredentials(CredentialFactory): |
| 68 def __init__(self, path): | 65 def __init__(self, path): |
| 69 self.path = path | 66 self.path = path |
| 70 | 67 |
| 71 def create(self, scopes): | 68 def create(self, scopes): |
| 72 with open(self.path, 'r') as fh: | 69 with open(self.path, 'r') as fh: |
| 73 data = json.load(fh) | 70 data = json.load(fh) |
| (...skipping 20 matching lines...) Expand all Loading... |
| 94 | 91 |
| 95 | 92 |
| 96 class Monitor(object): | 93 class Monitor(object): |
| 97 """Abstract base class encapsulating the ability to collect and send metrics. | 94 """Abstract base class encapsulating the ability to collect and send metrics. |
| 98 | 95 |
| 99 This is a singleton class. There should only be one instance of a Monitor at | 96 This is a singleton class. There should only be one instance of a Monitor at |
| 100 a time. It will be created and initialized by process_argparse_options. It | 97 a time. It will be created and initialized by process_argparse_options. It |
| 101 must exist in order for any metrics to be sent, although both Targets and | 98 must exist in order for any metrics to be sent, although both Targets and |
| 102 Metrics may be initialized before the underlying Monitor. If it does not exist | 99 Metrics may be initialized before the underlying Monitor. If it does not exist |
| 103 at the time that a Metric is sent, an exception will be raised. | 100 at the time that a Metric is sent, an exception will be raised. |
| 101 |
| 102 send() can be either synchronous or asynchronous. If synchronous, it needs to |
| 103 make the HTTP request, wait for a response and return None. |
| 104 If asynchronous, send() should start the request and immediately return some |
| 105 object which is later passed to wait() once all requests have been started. |
| 104 """ | 106 """ |
| 105 | 107 |
| 106 _SCOPES = [] | 108 _SCOPES = [] |
| 107 | 109 |
| 108 @staticmethod | |
| 109 def _wrap_proto(data): | |
| 110 """Normalize MetricsData, list(MetricsData), and MetricsCollection. | |
| 111 | |
| 112 Args: | |
| 113 input: A MetricsData, list of MetricsData, or a MetricsCollection. | |
| 114 | |
| 115 Returns: | |
| 116 A MetricsCollection with the appropriate data attribute set. | |
| 117 """ | |
| 118 if isinstance(data, metrics_pb2.MetricsCollection): | |
| 119 ret = data | |
| 120 elif isinstance(data, list): | |
| 121 ret = metrics_pb2.MetricsCollection(data=data) | |
| 122 else: | |
| 123 ret = metrics_pb2.MetricsCollection(data=[data]) | |
| 124 return ret | |
| 125 | |
| 126 def send(self, metric_pb): | 110 def send(self, metric_pb): |
| 127 raise NotImplementedError() | 111 raise NotImplementedError() |
| 128 | 112 |
| 113 def wait(self, state): # pragma: no cover |
| 114 pass |
| 115 |
| 129 | 116 |
| 130 class HttpsMonitor(Monitor): | 117 class HttpsMonitor(Monitor): |
| 131 | 118 |
| 132 _SCOPES = ['https://www.googleapis.com/auth/prodxmon'] | 119 _SCOPES = ['https://www.googleapis.com/auth/prodxmon'] |
| 133 | 120 |
| 134 def __init__(self, endpoint, credential_factory, http=None, ca_certs=None): | 121 def __init__(self, endpoint, credential_factory, http=None, ca_certs=None): |
| 135 self._endpoint = endpoint | 122 self._endpoint = endpoint |
| 136 credentials = credential_factory.create(self._SCOPES) | 123 credentials = credential_factory.create(self._SCOPES) |
| 137 if http is None: | 124 if http is None: |
| 138 http = httplib2_utils.RetriableHttp( | 125 http = httplib2_utils.RetriableHttp( |
| 139 httplib2_utils.InstrumentedHttp('acq-mon-api', ca_certs=ca_certs)) | 126 httplib2_utils.InstrumentedHttp('acq-mon-api', ca_certs=ca_certs)) |
| 140 self._http = credentials.authorize(http) | 127 self._http = credentials.authorize(http) |
| 141 | 128 |
| 142 def encode_to_json(self, metric_pb): | 129 def encode_to_json(self, metric_pb): |
| 143 if interface.state.use_new_proto: | 130 return json.dumps({'payload': pb_to_popo.convert(metric_pb)}) |
| 144 return json.dumps({'payload': pb_to_popo.convert(metric_pb)}) | |
| 145 else: | |
| 146 return json.dumps({'resource': pb_to_popo.convert(metric_pb)}) | |
| 147 | 131 |
| 148 def send(self, metric_pb): | 132 def send(self, metric_pb): |
| 149 logging.info('ts_mon: serializing metrics') | 133 body = self.encode_to_json(metric_pb) |
| 150 if interface.state.use_new_proto: | |
| 151 body = self.encode_to_json(metric_pb) | |
| 152 else: | |
| 153 body = self.encode_to_json(self._wrap_proto(metric_pb)) | |
| 154 | 134 |
| 155 try: | 135 try: |
| 156 logging.info('ts_mon: sending %d bytes', len(body)) | |
| 157 resp, content = self._http.request(self._endpoint, | 136 resp, content = self._http.request(self._endpoint, |
| 158 method='POST', | 137 method='POST', |
| 159 body=body, | 138 body=body, |
| 160 headers={'Content-Type': 'application/json'}) | 139 headers={'Content-Type': 'application/json'}) |
| 161 logging.info('ts_mon: request finished') | |
| 162 if resp.status != 200: | 140 if resp.status != 200: |
| 163 logging.warning('HttpsMonitor.send received status %d: %s', resp.status, | 141 logging.warning('HttpsMonitor.send received status %d: %s', resp.status, |
| 164 content) | 142 content) |
| 165 except (ValueError, errors.Error, | 143 except (ValueError, errors.Error, |
| 166 socket.timeout, socket.error, socket.herror, socket.gaierror, | 144 socket.timeout, socket.error, socket.herror, socket.gaierror, |
| 167 httplib2.HttpLib2Error): | 145 httplib2.HttpLib2Error): |
| 168 logging.warning('HttpsMonitor.send failed: %s\n', | 146 logging.exception('HttpsMonitor.send failed') |
| 169 traceback.format_exc()) | |
| 170 | |
| 171 | |
| 172 class PubSubMonitor(Monitor): | |
| 173 """Class which publishes metrics to a Cloud Pub/Sub topic.""" | |
| 174 | |
| 175 _SCOPES = ['https://www.googleapis.com/auth/pubsub'] | |
| 176 | |
| 177 TIMEOUT = 10 # seconds | |
| 178 | |
| 179 def _initialize(self): | |
| 180 creds = self._credential_factory.create(self._SCOPES) | |
| 181 creds.authorize(self._http) | |
| 182 self._api = discovery.build('pubsub', 'v1', http=self._http) | |
| 183 | |
| 184 def _update_init_metrics(self, status): | |
| 185 if not self._use_instrumented_http: | |
| 186 return | |
| 187 fields = {'name': 'acq-mon-api-pubsub', | |
| 188 'client': 'discovery', | |
| 189 'status': status} | |
| 190 http_metrics.response_status.increment(fields=fields) | |
| 191 | |
| 192 def _check_initialize(self): | |
| 193 if self._api: | |
| 194 return True | |
| 195 try: | |
| 196 self._initialize() | |
| 197 except (ValueError, errors.Error, | |
| 198 socket.timeout, socket.error, socket.herror, socket.gaierror, | |
| 199 httplib2.HttpLib2Error, EnvironmentError): | |
| 200 # Log a warning, not error, to avoid false alarms in AppEngine apps. | |
| 201 logging.warning('PubSubMonitor._initialize failed:\n%s', | |
| 202 traceback.format_exc()) | |
| 203 self._api = None | |
| 204 self._update_init_metrics(http_metrics.STATUS_ERROR) | |
| 205 return False | |
| 206 | |
| 207 self._update_init_metrics(http_metrics.STATUS_OK) | |
| 208 return True | |
| 209 | |
| 210 def __init__(self, credential_factory, project, topic, | |
| 211 use_instrumented_http=True, ca_certs=None): | |
| 212 """Process monitoring related command line flags and initialize api. | |
| 213 | |
| 214 Args: | |
| 215 credential_factory (CredentialFactory instance): factory that creates | |
| 216 oauth2 credentials. | |
| 217 project (str): the name of the Pub/Sub project to publish to. | |
| 218 topic (str): the name of the Pub/Sub topic to publish to. | |
| 219 use_instrumented_http (bool): whether to record monitoring metrics for | |
| 220 HTTP requests made to the pubsub API. | |
| 221 ca_certs (str): path to file containing root CA certificates for SSL | |
| 222 server certificate validation. If not set, a CA cert | |
| 223 file bundled with httplib2 is used. | |
| 224 """ | |
| 225 # Do not call self._check_initialize() in the constructor. This | |
| 226 # class is constructed during app initialization on AppEngine, and | |
| 227 # network calls are especially flaky during that time. | |
| 228 self._api = None | |
| 229 self._use_instrumented_http = use_instrumented_http | |
| 230 if use_instrumented_http: | |
| 231 self._http = httplib2_utils.InstrumentedHttp( | |
| 232 'acq-mon-api-pubsub', timeout=self.TIMEOUT, ca_certs=ca_certs) | |
| 233 else: | |
| 234 self._http = httplib2.Http(timeout=self.TIMEOUT, ca_certs=ca_certs) | |
| 235 self._credential_factory = credential_factory | |
| 236 self._topic = 'projects/%s/topics/%s' % (project, topic) | |
| 237 | |
| 238 def send(self, metric_pb): | |
| 239 """Send a metric proto to the monitoring api. | |
| 240 | |
| 241 Args: | |
| 242 metric_pb (MetricsData or MetricsCollection): the metric protobuf to send | |
| 243 """ | |
| 244 if not self._check_initialize(): | |
| 245 return | |
| 246 proto = self._wrap_proto(metric_pb) | |
| 247 logging.debug('ts_mon: sending %d metrics to PubSub', len(proto.data)) | |
| 248 body = { | |
| 249 'messages': [ | |
| 250 {'data': base64.b64encode(proto.SerializeToString())}, | |
| 251 ], | |
| 252 } | |
| 253 # Occasionally, client fails to receive a proper internal JSON | |
| 254 # from the server and raises ValueError trying to parse it. Other | |
| 255 # times we may fail with a network error. This is not fatal, we'll | |
| 256 # resend metrics next time. | |
| 257 try: | |
| 258 self._api.projects().topics().publish( | |
| 259 topic=self._topic, | |
| 260 body=body).execute(num_retries=5) | |
| 261 except (ValueError, errors.Error, | |
| 262 socket.timeout, socket.error, socket.herror, socket.gaierror, | |
| 263 httplib2.HttpLib2Error): | |
| 264 # Log a warning, not error, to avoid false alarms in AppEngine apps. | |
| 265 logging.warning('PubSubMonitor.send failed:\n%s', | |
| 266 traceback.format_exc()) | |
| 267 | 147 |
| 268 | 148 |
| 269 class DebugMonitor(Monitor): | 149 class DebugMonitor(Monitor): |
| 270 """Class which writes metrics to logs or a local file for debugging.""" | 150 """Class which writes metrics to logs or a local file for debugging.""" |
| 271 def __init__(self, filepath=None): | 151 def __init__(self, filepath=None): |
| 272 if filepath is None: | 152 if filepath is None: |
| 273 self._fh = None | 153 self._fh = None |
| 274 else: | 154 else: |
| 275 self._fh = open(filepath, 'a') | 155 self._fh = open(filepath, 'a') |
| 276 | 156 |
| 277 def send(self, metric_pb): | 157 def send(self, metric_pb): |
| 278 text = str(self._wrap_proto(metric_pb)) | 158 text = str(metric_pb) |
| 279 logging.info('Flushing monitoring metrics:\n%s', text) | 159 logging.info('Flushing monitoring metrics:\n%s', text) |
| 280 if self._fh is not None: | 160 if self._fh is not None: |
| 281 self._fh.write(text + '\n\n') | 161 self._fh.write(text + '\n\n') |
| 282 self._fh.flush() | 162 self._fh.flush() |
| 283 | 163 |
| 284 | 164 |
| 285 class NullMonitor(Monitor): | 165 class NullMonitor(Monitor): |
| 286 """Class that doesn't send metrics anywhere.""" | 166 """Class that doesn't send metrics anywhere.""" |
| 287 def send(self, metric_pb): | 167 def send(self, metric_pb): |
| 288 pass | 168 pass |
| OLD | NEW |