| OLD | NEW |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """Classes representing the monitoring interface for tasks or devices.""" | 5 """Classes representing the monitoring interface for tasks or devices.""" |
| 6 | 6 |
| 7 | 7 |
| 8 import base64 | 8 import base64 |
| 9 import httplib2 | 9 import httplib2 |
| 10 import json | 10 import json |
| (...skipping 14 matching lines...) Expand all Loading... |
| 25 from oauth2client.contrib import gce | 25 from oauth2client.contrib import gce |
| 26 from oauth2client.client import GoogleCredentials | 26 from oauth2client.client import GoogleCredentials |
| 27 from oauth2client.file import Storage | 27 from oauth2client.file import Storage |
| 28 | 28 |
| 29 # Special string that can be passed through as the credentials path to use the | 29 # Special string that can be passed through as the credentials path to use the |
| 30 # default Appengine or GCE service account. | 30 # default Appengine or GCE service account. |
| 31 APPENGINE_CREDENTIALS = ':appengine' | 31 APPENGINE_CREDENTIALS = ':appengine' |
| 32 GCE_CREDENTIALS = ':gce' | 32 GCE_CREDENTIALS = ':gce' |
| 33 | 33 |
| 34 | 34 |
| 35 class CredentialFactory(object): | |
| 36 """Base class for things that can create OAuth2Credentials.""" | |
| 37 | |
| 38 @classmethod | |
| 39 def from_string(cls, path): | |
| 40 """Creates an appropriate subclass from a file path or magic string.""" | |
| 41 | |
| 42 if path == APPENGINE_CREDENTIALS: | |
| 43 return AppengineCredentials() | |
| 44 if path == GCE_CREDENTIALS: | |
| 45 return GCECredentials() | |
| 46 return FileCredentials(path) | |
| 47 | |
| 48 def create(self, scopes): | |
| 49 raise NotImplementedError | |
| 50 | |
| 51 | |
| 52 class GCECredentials(CredentialFactory): | |
| 53 def create(self, scopes): | |
| 54 return gce.AppAssertionCredentials(scopes) | |
| 55 | |
| 56 | |
| 57 class AppengineCredentials(CredentialFactory): | |
| 58 def create(self, scopes): # pragma: no cover | |
| 59 # This import doesn't work outside appengine, so delay it until it's used. | |
| 60 from oauth2client import appengine | |
| 61 from google.appengine.api import app_identity | |
| 62 logging.info('Initializing with service account %s', | |
| 63 app_identity.get_service_account_name()) | |
| 64 return appengine.AppAssertionCredentials(scopes) | |
| 65 | |
| 66 | |
| 67 class FileCredentials(CredentialFactory): | |
| 68 def __init__(self, path): | |
| 69 self.path = path | |
| 70 | |
| 71 def create(self, scopes): | |
| 72 with open(self.path, 'r') as fh: | |
| 73 data = json.load(fh) | |
| 74 if data.get('type', None): | |
| 75 credentials = GoogleCredentials.from_stream(self.path) | |
| 76 credentials = credentials.create_scoped(scopes) | |
| 77 return credentials | |
| 78 return Storage(self.path).get() | |
| 79 | |
| 80 | |
| 81 class DelegateServiceAccountCredentials(CredentialFactory): | |
| 82 IAM_SCOPE = 'https://www.googleapis.com/auth/iam' | |
| 83 | |
| 84 def __init__(self, service_account_email, base): | |
| 85 self.base = base | |
| 86 self.service_account_email = service_account_email | |
| 87 | |
| 88 def create(self, scopes): | |
| 89 logging.info('Delegating to service account %s', self.service_account_email) | |
| 90 http = httplib2_utils.InstrumentedHttp('actor-credentials') | |
| 91 http = self.base.create([self.IAM_SCOPE]).authorize(http) | |
| 92 return httplib2_utils.DelegateServiceAccountCredentials( | |
| 93 http, self.service_account_email, scopes) | |
| 94 | |
| 95 | |
| 96 class Monitor(object): | 35 class Monitor(object): |
| 97 """Abstract base class encapsulating the ability to collect and send metrics. | 36 """Abstract base class encapsulating the ability to collect and send metrics. |
| 98 | 37 |
| 99 This is a singleton class. There should only be one instance of a Monitor at | 38 This is a singleton class. There should only be one instance of a Monitor at |
| 100 a time. It will be created and initialized by process_argparse_options. It | 39 a time. It will be created and initialized by process_argparse_options. It |
| 101 must exist in order for any metrics to be sent, although both Targets and | 40 must exist in order for any metrics to be sent, although both Targets and |
| 102 Metrics may be initialized before the underlying Monitor. If it does not exist | 41 Metrics may be initialized before the underlying Monitor. If it does not exist |
| 103 at the time that a Metric is sent, an exception will be raised. | 42 at the time that a Metric is sent, an exception will be raised. |
| 104 """ | 43 """ |
| 105 | 44 |
| (...skipping 10 matching lines...) Expand all Loading... |
| 116 A MetricsCollection with the appropriate data attribute set. | 55 A MetricsCollection with the appropriate data attribute set. |
| 117 """ | 56 """ |
| 118 if isinstance(data, metrics_pb2.MetricsCollection): | 57 if isinstance(data, metrics_pb2.MetricsCollection): |
| 119 ret = data | 58 ret = data |
| 120 elif isinstance(data, list): | 59 elif isinstance(data, list): |
| 121 ret = metrics_pb2.MetricsCollection(data=data) | 60 ret = metrics_pb2.MetricsCollection(data=data) |
| 122 else: | 61 else: |
| 123 ret = metrics_pb2.MetricsCollection(data=[data]) | 62 ret = metrics_pb2.MetricsCollection(data=[data]) |
| 124 return ret | 63 return ret |
| 125 | 64 |
| 65 def _load_credentials(self, credentials_file_path): |
| 66 if credentials_file_path == GCE_CREDENTIALS: |
| 67 return gce.AppAssertionCredentials(self._SCOPES) |
| 68 if credentials_file_path == APPENGINE_CREDENTIALS: # pragma: no cover |
| 69 # This import doesn't work outside appengine, so delay it until it's used. |
| 70 from oauth2client import appengine |
| 71 from google.appengine.api import app_identity |
| 72 logging.info('Initializing with service account %s', |
| 73 app_identity.get_service_account_name()) |
| 74 return appengine.AppAssertionCredentials(self._SCOPES) |
| 75 |
| 76 with open(credentials_file_path, 'r') as credentials_file: |
| 77 credentials_json = json.load(credentials_file) |
| 78 if credentials_json.get('type', None): |
| 79 credentials = GoogleCredentials.from_stream(credentials_file_path) |
| 80 credentials = credentials.create_scoped(self._SCOPES) |
| 81 return credentials |
| 82 return Storage(credentials_file_path).get() |
| 83 |
| 126 def send(self, metric_pb): | 84 def send(self, metric_pb): |
| 127 raise NotImplementedError() | 85 raise NotImplementedError() |
| 128 | 86 |
| 129 | 87 |
| 130 class HttpsMonitor(Monitor): | 88 class HttpsMonitor(Monitor): |
| 131 | 89 |
| 132 _SCOPES = ['https://www.googleapis.com/auth/prodxmon'] | 90 _SCOPES = [ |
| 91 'https://www.googleapis.com/auth/prodxmon' |
| 92 ] |
| 133 | 93 |
| 134 def __init__(self, endpoint, credential_factory, http=None, ca_certs=None): | 94 def __init__(self, endpoint, credentials_file_path, http=None, ca_certs=None): |
| 135 self._endpoint = endpoint | 95 self._endpoint = endpoint |
| 136 credentials = credential_factory.create(self._SCOPES) | 96 credentials = self._load_credentials(credentials_file_path) |
| 137 if http is None: | 97 if http is None: |
| 138 http = httplib2_utils.RetriableHttp( | 98 http = httplib2_utils.RetriableHttp( |
| 139 httplib2_utils.InstrumentedHttp('acq-mon-api', ca_certs=ca_certs)) | 99 httplib2_utils.InstrumentedHttp('acq-mon-api', ca_certs=ca_certs)) |
| 140 self._http = credentials.authorize(http) | 100 self._http = credentials.authorize(http) |
| 141 | 101 |
| 142 def encode_to_json(self, metric_pb): | 102 def encodeToJson(self, metric_pb): |
| 143 if interface.state.use_new_proto: | 103 if interface.state.use_new_proto: |
| 144 return json.dumps({'payload': pb_to_popo.convert(metric_pb)}) | 104 return json.dumps({'payload': pb_to_popo.convert(metric_pb)}) |
| 145 else: | 105 else: |
| 146 return json.dumps({'resource': pb_to_popo.convert(metric_pb)}) | 106 return json.dumps({'resource': pb_to_popo.convert(metric_pb)}) |
| 147 | 107 |
| 148 def send(self, metric_pb): | 108 def send(self, metric_pb): |
| 149 if interface.state.use_new_proto: | 109 if interface.state.use_new_proto: |
| 150 body = self.encode_to_json(metric_pb) | 110 body = self.encodeToJson(metric_pb) |
| 151 else: | 111 else: |
| 152 body = self.encode_to_json(self._wrap_proto(metric_pb)) | 112 body = self.encodeToJson(self._wrap_proto(metric_pb)) |
| 153 | 113 |
| 154 try: | 114 try: |
| 155 resp, content = self._http.request(self._endpoint, | 115 resp, content = self._http.request(self._endpoint, method='POST', |
| 156 method='POST', | 116 body=body) |
| 157 body=body, | |
| 158 headers={'Content-Type': 'application/json'}) | |
| 159 if resp.status != 200: | 117 if resp.status != 200: |
| 160 logging.warning('HttpsMonitor.send received status %d: %s', resp.status, | 118 logging.warning('HttpsMonitor.send received status %d: %s', resp.status, |
| 161 content) | 119 content) |
| 162 except (ValueError, errors.Error, | 120 except (ValueError, errors.Error, |
| 163 socket.timeout, socket.error, socket.herror, socket.gaierror, | 121 socket.timeout, socket.error, socket.herror, socket.gaierror, |
| 164 httplib2.HttpLib2Error): | 122 httplib2.HttpLib2Error): |
| 165 logging.warning('HttpsMonitor.send failed: %s\n', | 123 logging.warning('HttpsMonitor.send failed: %s\n', |
| 166 traceback.format_exc()) | 124 traceback.format_exc()) |
| 167 | 125 |
| 168 | 126 |
| 169 class PubSubMonitor(Monitor): | 127 class PubSubMonitor(Monitor): |
| 170 """Class which publishes metrics to a Cloud Pub/Sub topic.""" | 128 """Class which publishes metrics to a Cloud Pub/Sub topic.""" |
| 171 | 129 |
| 172 _SCOPES = ['https://www.googleapis.com/auth/pubsub'] | 130 _SCOPES = [ |
| 131 'https://www.googleapis.com/auth/pubsub', |
| 132 ] |
| 173 | 133 |
| 174 TIMEOUT = 10 # seconds | 134 TIMEOUT = 10 # seconds |
| 175 | 135 |
| 176 def _initialize(self): | 136 def _initialize(self): |
| 177 creds = self._credential_factory.create(self._SCOPES) | 137 creds = self._load_credentials(self._credsfile) |
| 178 creds.authorize(self._http) | 138 creds.authorize(self._http) |
| 179 self._api = discovery.build('pubsub', 'v1', http=self._http) | 139 self._api = discovery.build('pubsub', 'v1', http=self._http) |
| 180 | 140 |
| 181 def _update_init_metrics(self, status): | 141 def _update_init_metrics(self, status): |
| 182 if not self._use_instrumented_http: | 142 if not self._use_instrumented_http: |
| 183 return | 143 return |
| 184 fields = {'name': 'acq-mon-api-pubsub', | 144 fields = {'name': 'acq-mon-api-pubsub', |
| 185 'client': 'discovery', | 145 'client': 'discovery', |
| 186 'status': status} | 146 'status': status} |
| 187 http_metrics.response_status.increment(fields=fields) | 147 http_metrics.response_status.increment(fields=fields) |
| 188 | 148 |
| 189 def _check_initialize(self): | 149 def _check_initialize(self): |
| 190 if self._api: | 150 if self._api: |
| 191 return True | 151 return True |
| 192 try: | 152 try: |
| 193 self._initialize() | 153 self._initialize() |
| 194 except (ValueError, errors.Error, | 154 except (ValueError, errors.Error, |
| 195 socket.timeout, socket.error, socket.herror, socket.gaierror, | 155 socket.timeout, socket.error, socket.herror, socket.gaierror, |
| 196 httplib2.HttpLib2Error, EnvironmentError): | 156 httplib2.HttpLib2Error, EnvironmentError): |
| 197 # Log a warning, not error, to avoid false alarms in AppEngine apps. | 157 # Log a warning, not error, to avoid false alarms in AppEngine apps. |
| 198 logging.warning('PubSubMonitor._initialize failed:\n%s', | 158 logging.warning('PubSubMonitor._initialize failed:\n%s', |
| 199 traceback.format_exc()) | 159 traceback.format_exc()) |
| 200 self._api = None | 160 self._api = None |
| 201 self._update_init_metrics(http_metrics.STATUS_ERROR) | 161 self._update_init_metrics(http_metrics.STATUS_ERROR) |
| 202 return False | 162 return False |
| 203 | 163 |
| 204 self._update_init_metrics(http_metrics.STATUS_OK) | 164 self._update_init_metrics(http_metrics.STATUS_OK) |
| 205 return True | 165 return True |
| 206 | 166 |
| 207 def __init__(self, credential_factory, project, topic, | 167 def __init__(self, credsfile, project, topic, use_instrumented_http=True, |
| 208 use_instrumented_http=True, ca_certs=None): | 168 ca_certs=None): |
| 209 """Process monitoring related command line flags and initialize api. | 169 """Process monitoring related command line flags and initialize api. |
| 210 | 170 |
| 211 Args: | 171 Args: |
| 212 credential_factory (CredentialFactory instance): factory that creates | 172 credsfile (str): path to the credentials json file |
| 213 oauth2 credentials. | |
| 214 project (str): the name of the Pub/Sub project to publish to. | 173 project (str): the name of the Pub/Sub project to publish to. |
| 215 topic (str): the name of the Pub/Sub topic to publish to. | 174 topic (str): the name of the Pub/Sub topic to publish to. |
| 216 use_instrumented_http (bool): whether to record monitoring metrics for | 175 use_instrumented_http (bool): whether to record monitoring metrics for |
| 217 HTTP requests made to the pubsub API. | 176 HTTP requests made to the pubsub API. |
| 218 ca_certs (str): path to file containing root CA certificates for SSL | 177 ca_certs (str): path to file containing root CA certificates for SSL |
| 219 server certificate validation. If not set, a CA cert | 178 server certificate validation. If not set, a CA cert |
| 220 file bundled with httplib2 is used. | 179 file bundled with httplib2 is used. |
| 221 """ | 180 """ |
| 222 # Do not call self._check_initialize() in the constructor. This | 181 # Do not call self._check_initialize() in the constructor. This |
| 223 # class is constructed during app initialization on AppEngine, and | 182 # class is constructed during app initialization on AppEngine, and |
| 224 # network calls are especially flaky during that time. | 183 # network calls are especially flaky during that time. |
| 225 self._api = None | 184 self._api = None |
| 226 self._use_instrumented_http = use_instrumented_http | 185 self._use_instrumented_http = use_instrumented_http |
| 227 if use_instrumented_http: | 186 if use_instrumented_http: |
| 228 self._http = httplib2_utils.InstrumentedHttp( | 187 self._http = httplib2_utils.InstrumentedHttp( |
| 229 'acq-mon-api-pubsub', timeout=self.TIMEOUT, ca_certs=ca_certs) | 188 'acq-mon-api-pubsub', timeout=self.TIMEOUT, ca_certs=ca_certs) |
| 230 else: | 189 else: |
| 231 self._http = httplib2.Http(timeout=self.TIMEOUT, ca_certs=ca_certs) | 190 self._http = httplib2.Http(timeout=self.TIMEOUT, ca_certs=ca_certs) |
| 232 self._credential_factory = credential_factory | 191 self._credsfile = credsfile |
| 233 self._topic = 'projects/%s/topics/%s' % (project, topic) | 192 self._topic = 'projects/%s/topics/%s' % (project, topic) |
| 234 | 193 |
| 235 def send(self, metric_pb): | 194 def send(self, metric_pb): |
| 236 """Send a metric proto to the monitoring api. | 195 """Send a metric proto to the monitoring api. |
| 237 | 196 |
| 238 Args: | 197 Args: |
| 239 metric_pb (MetricsData or MetricsCollection): the metric protobuf to send | 198 metric_pb (MetricsData or MetricsCollection): the metric protobuf to send |
| 240 """ | 199 """ |
| 241 if not self._check_initialize(): | 200 if not self._check_initialize(): |
| 242 return | 201 return |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 276 logging.info('Flushing monitoring metrics:\n%s', text) | 235 logging.info('Flushing monitoring metrics:\n%s', text) |
| 277 if self._fh is not None: | 236 if self._fh is not None: |
| 278 self._fh.write(text + '\n\n') | 237 self._fh.write(text + '\n\n') |
| 279 self._fh.flush() | 238 self._fh.flush() |
| 280 | 239 |
| 281 | 240 |
| 282 class NullMonitor(Monitor): | 241 class NullMonitor(Monitor): |
| 283 """Class that doesn't send metrics anywhere.""" | 242 """Class that doesn't send metrics anywhere.""" |
| 284 def send(self, metric_pb): | 243 def send(self, metric_pb): |
| 285 pass | 244 pass |
| OLD | NEW |