Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(30)

Side by Side Diff: client/third_party/infra_libs/ts_mon/common/monitors.py

Issue 2705273003: Roll infra_libs and gae_ts_mon in luci-py, and add field_specs to all metrics (Closed)
Patch Set: Rebase Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Classes representing the monitoring interface for tasks or devices.""" 5 """Classes representing the monitoring interface for tasks or devices."""
6 6
7 7
8 import base64 8 import base64
9 import httplib2 9 import httplib2
10 import json 10 import json
(...skipping 14 matching lines...) Expand all
25 from oauth2client.contrib import gce 25 from oauth2client.contrib import gce
26 from oauth2client.client import GoogleCredentials 26 from oauth2client.client import GoogleCredentials
27 from oauth2client.file import Storage 27 from oauth2client.file import Storage
28 28
29 # Special string that can be passed through as the credentials path to use the 29 # Special string that can be passed through as the credentials path to use the
30 # default Appengine or GCE service account. 30 # default Appengine or GCE service account.
31 APPENGINE_CREDENTIALS = ':appengine' 31 APPENGINE_CREDENTIALS = ':appengine'
32 GCE_CREDENTIALS = ':gce' 32 GCE_CREDENTIALS = ':gce'
33 33
34 34
35 class CredentialFactory(object):
36 """Base class for things that can create OAuth2Credentials."""
37
38 @classmethod
39 def from_string(cls, path):
40 """Creates an appropriate subclass from a file path or magic string."""
41
42 if path == APPENGINE_CREDENTIALS:
43 return AppengineCredentials()
44 if path == GCE_CREDENTIALS:
45 return GCECredentials()
46 return FileCredentials(path)
47
48 def create(self, scopes):
49 raise NotImplementedError
50
51
52 class GCECredentials(CredentialFactory):
53 def create(self, scopes):
54 return gce.AppAssertionCredentials(scopes)
55
56
57 class AppengineCredentials(CredentialFactory):
58 def create(self, scopes): # pragma: no cover
59 # This import doesn't work outside appengine, so delay it until it's used.
60 from oauth2client import appengine
61 from google.appengine.api import app_identity
62 logging.info('Initializing with service account %s',
63 app_identity.get_service_account_name())
64 return appengine.AppAssertionCredentials(scopes)
65
66
67 class FileCredentials(CredentialFactory):
68 def __init__(self, path):
69 self.path = path
70
71 def create(self, scopes):
72 with open(self.path, 'r') as fh:
73 data = json.load(fh)
74 if data.get('type', None):
75 credentials = GoogleCredentials.from_stream(self.path)
76 credentials = credentials.create_scoped(scopes)
77 return credentials
78 return Storage(self.path).get()
79
80
81 class DelegateServiceAccountCredentials(CredentialFactory):
82 IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
83
84 def __init__(self, service_account_email, base):
85 self.base = base
86 self.service_account_email = service_account_email
87
88 def create(self, scopes):
89 logging.info('Delegating to service account %s', self.service_account_email)
90 http = httplib2_utils.InstrumentedHttp('actor-credentials')
91 http = self.base.create([self.IAM_SCOPE]).authorize(http)
92 return httplib2_utils.DelegateServiceAccountCredentials(
93 http, self.service_account_email, scopes)
94
95
35 class Monitor(object): 96 class Monitor(object):
36 """Abstract base class encapsulating the ability to collect and send metrics. 97 """Abstract base class encapsulating the ability to collect and send metrics.
37 98
38 This is a singleton class. There should only be one instance of a Monitor at 99 This is a singleton class. There should only be one instance of a Monitor at
39 a time. It will be created and initialized by process_argparse_options. It 100 a time. It will be created and initialized by process_argparse_options. It
40 must exist in order for any metrics to be sent, although both Targets and 101 must exist in order for any metrics to be sent, although both Targets and
41 Metrics may be initialized before the underlying Monitor. If it does not exist 102 Metrics may be initialized before the underlying Monitor. If it does not exist
42 at the time that a Metric is sent, an exception will be raised. 103 at the time that a Metric is sent, an exception will be raised.
43 """ 104 """
44 105
(...skipping 10 matching lines...) Expand all
55 A MetricsCollection with the appropriate data attribute set. 116 A MetricsCollection with the appropriate data attribute set.
56 """ 117 """
57 if isinstance(data, metrics_pb2.MetricsCollection): 118 if isinstance(data, metrics_pb2.MetricsCollection):
58 ret = data 119 ret = data
59 elif isinstance(data, list): 120 elif isinstance(data, list):
60 ret = metrics_pb2.MetricsCollection(data=data) 121 ret = metrics_pb2.MetricsCollection(data=data)
61 else: 122 else:
62 ret = metrics_pb2.MetricsCollection(data=[data]) 123 ret = metrics_pb2.MetricsCollection(data=[data])
63 return ret 124 return ret
64 125
65 def _load_credentials(self, credentials_file_path):
66 if credentials_file_path == GCE_CREDENTIALS:
67 return gce.AppAssertionCredentials(self._SCOPES)
68 if credentials_file_path == APPENGINE_CREDENTIALS: # pragma: no cover
69 # This import doesn't work outside appengine, so delay it until it's used.
70 from oauth2client import appengine
71 from google.appengine.api import app_identity
72 logging.info('Initializing with service account %s',
73 app_identity.get_service_account_name())
74 return appengine.AppAssertionCredentials(self._SCOPES)
75
76 with open(credentials_file_path, 'r') as credentials_file:
77 credentials_json = json.load(credentials_file)
78 if credentials_json.get('type', None):
79 credentials = GoogleCredentials.from_stream(credentials_file_path)
80 credentials = credentials.create_scoped(self._SCOPES)
81 return credentials
82 return Storage(credentials_file_path).get()
83
84 def send(self, metric_pb): 126 def send(self, metric_pb):
85 raise NotImplementedError() 127 raise NotImplementedError()
86 128
87 129
88 class HttpsMonitor(Monitor): 130 class HttpsMonitor(Monitor):
89 131
90 _SCOPES = [ 132 _SCOPES = ['https://www.googleapis.com/auth/prodxmon']
91 'https://www.googleapis.com/auth/prodxmon'
92 ]
93 133
94 def __init__(self, endpoint, credentials_file_path, http=None, ca_certs=None): 134 def __init__(self, endpoint, credential_factory, http=None, ca_certs=None):
95 self._endpoint = endpoint 135 self._endpoint = endpoint
96 credentials = self._load_credentials(credentials_file_path) 136 credentials = credential_factory.create(self._SCOPES)
97 if http is None: 137 if http is None:
98 http = httplib2_utils.RetriableHttp( 138 http = httplib2_utils.RetriableHttp(
99 httplib2_utils.InstrumentedHttp('acq-mon-api', ca_certs=ca_certs)) 139 httplib2_utils.InstrumentedHttp('acq-mon-api', ca_certs=ca_certs))
100 self._http = credentials.authorize(http) 140 self._http = credentials.authorize(http)
101 141
102 def encodeToJson(self, metric_pb): 142 def encode_to_json(self, metric_pb):
103 if interface.state.use_new_proto: 143 if interface.state.use_new_proto:
104 return json.dumps({'payload': pb_to_popo.convert(metric_pb)}) 144 return json.dumps({'payload': pb_to_popo.convert(metric_pb)})
105 else: 145 else:
106 return json.dumps({'resource': pb_to_popo.convert(metric_pb)}) 146 return json.dumps({'resource': pb_to_popo.convert(metric_pb)})
107 147
108 def send(self, metric_pb): 148 def send(self, metric_pb):
109 if interface.state.use_new_proto: 149 if interface.state.use_new_proto:
110 body = self.encodeToJson(metric_pb) 150 body = self.encode_to_json(metric_pb)
111 else: 151 else:
112 body = self.encodeToJson(self._wrap_proto(metric_pb)) 152 body = self.encode_to_json(self._wrap_proto(metric_pb))
113 153
114 try: 154 try:
115 resp, content = self._http.request(self._endpoint, method='POST', 155 resp, content = self._http.request(self._endpoint,
116 body=body) 156 method='POST',
157 body=body,
158 headers={'Content-Type': 'application/json'})
117 if resp.status != 200: 159 if resp.status != 200:
118 logging.warning('HttpsMonitor.send received status %d: %s', resp.status, 160 logging.warning('HttpsMonitor.send received status %d: %s', resp.status,
119 content) 161 content)
120 except (ValueError, errors.Error, 162 except (ValueError, errors.Error,
121 socket.timeout, socket.error, socket.herror, socket.gaierror, 163 socket.timeout, socket.error, socket.herror, socket.gaierror,
122 httplib2.HttpLib2Error): 164 httplib2.HttpLib2Error):
123 logging.warning('HttpsMonitor.send failed: %s\n', 165 logging.warning('HttpsMonitor.send failed: %s\n',
124 traceback.format_exc()) 166 traceback.format_exc())
125 167
126 168
127 class PubSubMonitor(Monitor): 169 class PubSubMonitor(Monitor):
128 """Class which publishes metrics to a Cloud Pub/Sub topic.""" 170 """Class which publishes metrics to a Cloud Pub/Sub topic."""
129 171
130 _SCOPES = [ 172 _SCOPES = ['https://www.googleapis.com/auth/pubsub']
131 'https://www.googleapis.com/auth/pubsub',
132 ]
133 173
134 TIMEOUT = 10 # seconds 174 TIMEOUT = 10 # seconds
135 175
136 def _initialize(self): 176 def _initialize(self):
137 creds = self._load_credentials(self._credsfile) 177 creds = self._credential_factory.create(self._SCOPES)
138 creds.authorize(self._http) 178 creds.authorize(self._http)
139 self._api = discovery.build('pubsub', 'v1', http=self._http) 179 self._api = discovery.build('pubsub', 'v1', http=self._http)
140 180
141 def _update_init_metrics(self, status): 181 def _update_init_metrics(self, status):
142 if not self._use_instrumented_http: 182 if not self._use_instrumented_http:
143 return 183 return
144 fields = {'name': 'acq-mon-api-pubsub', 184 fields = {'name': 'acq-mon-api-pubsub',
145 'client': 'discovery', 185 'client': 'discovery',
146 'status': status} 186 'status': status}
147 http_metrics.response_status.increment(fields=fields) 187 http_metrics.response_status.increment(fields=fields)
148 188
149 def _check_initialize(self): 189 def _check_initialize(self):
150 if self._api: 190 if self._api:
151 return True 191 return True
152 try: 192 try:
153 self._initialize() 193 self._initialize()
154 except (ValueError, errors.Error, 194 except (ValueError, errors.Error,
155 socket.timeout, socket.error, socket.herror, socket.gaierror, 195 socket.timeout, socket.error, socket.herror, socket.gaierror,
156 httplib2.HttpLib2Error, EnvironmentError): 196 httplib2.HttpLib2Error, EnvironmentError):
157 # Log a warning, not error, to avoid false alarms in AppEngine apps. 197 # Log a warning, not error, to avoid false alarms in AppEngine apps.
158 logging.warning('PubSubMonitor._initialize failed:\n%s', 198 logging.warning('PubSubMonitor._initialize failed:\n%s',
159 traceback.format_exc()) 199 traceback.format_exc())
160 self._api = None 200 self._api = None
161 self._update_init_metrics(http_metrics.STATUS_ERROR) 201 self._update_init_metrics(http_metrics.STATUS_ERROR)
162 return False 202 return False
163 203
164 self._update_init_metrics(http_metrics.STATUS_OK) 204 self._update_init_metrics(http_metrics.STATUS_OK)
165 return True 205 return True
166 206
167 def __init__(self, credsfile, project, topic, use_instrumented_http=True, 207 def __init__(self, credential_factory, project, topic,
168 ca_certs=None): 208 use_instrumented_http=True, ca_certs=None):
169 """Process monitoring related command line flags and initialize api. 209 """Process monitoring related command line flags and initialize api.
170 210
171 Args: 211 Args:
172 credsfile (str): path to the credentials json file 212 credential_factory (CredentialFactory instance): factory that creates
213 oauth2 credentials.
173 project (str): the name of the Pub/Sub project to publish to. 214 project (str): the name of the Pub/Sub project to publish to.
174 topic (str): the name of the Pub/Sub topic to publish to. 215 topic (str): the name of the Pub/Sub topic to publish to.
175 use_instrumented_http (bool): whether to record monitoring metrics for 216 use_instrumented_http (bool): whether to record monitoring metrics for
176 HTTP requests made to the pubsub API. 217 HTTP requests made to the pubsub API.
177 ca_certs (str): path to file containing root CA certificates for SSL 218 ca_certs (str): path to file containing root CA certificates for SSL
178 server certificate validation. If not set, a CA cert 219 server certificate validation. If not set, a CA cert
179 file bundled with httplib2 is used. 220 file bundled with httplib2 is used.
180 """ 221 """
181 # Do not call self._check_initialize() in the constructor. This 222 # Do not call self._check_initialize() in the constructor. This
182 # class is constructed during app initialization on AppEngine, and 223 # class is constructed during app initialization on AppEngine, and
183 # network calls are especially flaky during that time. 224 # network calls are especially flaky during that time.
184 self._api = None 225 self._api = None
185 self._use_instrumented_http = use_instrumented_http 226 self._use_instrumented_http = use_instrumented_http
186 if use_instrumented_http: 227 if use_instrumented_http:
187 self._http = httplib2_utils.InstrumentedHttp( 228 self._http = httplib2_utils.InstrumentedHttp(
188 'acq-mon-api-pubsub', timeout=self.TIMEOUT, ca_certs=ca_certs) 229 'acq-mon-api-pubsub', timeout=self.TIMEOUT, ca_certs=ca_certs)
189 else: 230 else:
190 self._http = httplib2.Http(timeout=self.TIMEOUT, ca_certs=ca_certs) 231 self._http = httplib2.Http(timeout=self.TIMEOUT, ca_certs=ca_certs)
191 self._credsfile = credsfile 232 self._credential_factory = credential_factory
192 self._topic = 'projects/%s/topics/%s' % (project, topic) 233 self._topic = 'projects/%s/topics/%s' % (project, topic)
193 234
194 def send(self, metric_pb): 235 def send(self, metric_pb):
195 """Send a metric proto to the monitoring api. 236 """Send a metric proto to the monitoring api.
196 237
197 Args: 238 Args:
198 metric_pb (MetricsData or MetricsCollection): the metric protobuf to send 239 metric_pb (MetricsData or MetricsCollection): the metric protobuf to send
199 """ 240 """
200 if not self._check_initialize(): 241 if not self._check_initialize():
201 return 242 return
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
235 logging.info('Flushing monitoring metrics:\n%s', text) 276 logging.info('Flushing monitoring metrics:\n%s', text)
236 if self._fh is not None: 277 if self._fh is not None:
237 self._fh.write(text + '\n\n') 278 self._fh.write(text + '\n\n')
238 self._fh.flush() 279 self._fh.flush()
239 280
240 281
241 class NullMonitor(Monitor): 282 class NullMonitor(Monitor):
242 """Class that doesn't send metrics anywhere.""" 283 """Class that doesn't send metrics anywhere."""
243 def send(self, metric_pb): 284 def send(self, metric_pb):
244 pass 285 pass
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698