Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(426)

Side by Side Diff: client/third_party/infra_libs/ts_mon/common/monitors.py

Issue 2708113002: Revert of Add field_specs to all metrics in luci-py (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Classes representing the monitoring interface for tasks or devices.""" 5 """Classes representing the monitoring interface for tasks or devices."""
6 6
7 7
8 import base64 8 import base64
9 import httplib2 9 import httplib2
10 import json 10 import json
(...skipping 14 matching lines...) Expand all
25 from oauth2client.contrib import gce 25 from oauth2client.contrib import gce
26 from oauth2client.client import GoogleCredentials 26 from oauth2client.client import GoogleCredentials
27 from oauth2client.file import Storage 27 from oauth2client.file import Storage
28 28
29 # Special string that can be passed through as the credentials path to use the 29 # Special string that can be passed through as the credentials path to use the
30 # default Appengine or GCE service account. 30 # default Appengine or GCE service account.
31 APPENGINE_CREDENTIALS = ':appengine' 31 APPENGINE_CREDENTIALS = ':appengine'
32 GCE_CREDENTIALS = ':gce' 32 GCE_CREDENTIALS = ':gce'
33 33
34 34
35 class CredentialFactory(object):
36 """Base class for things that can create OAuth2Credentials."""
37
38 @classmethod
39 def from_string(cls, path):
40 """Creates an appropriate subclass from a file path or magic string."""
41
42 if path == APPENGINE_CREDENTIALS:
43 return AppengineCredentials()
44 if path == GCE_CREDENTIALS:
45 return GCECredentials()
46 return FileCredentials(path)
47
48 def create(self, scopes):
49 raise NotImplementedError
50
51
52 class GCECredentials(CredentialFactory):
53 def create(self, scopes):
54 return gce.AppAssertionCredentials(scopes)
55
56
57 class AppengineCredentials(CredentialFactory):
58 def create(self, scopes): # pragma: no cover
59 # This import doesn't work outside appengine, so delay it until it's used.
60 from oauth2client import appengine
61 from google.appengine.api import app_identity
62 logging.info('Initializing with service account %s',
63 app_identity.get_service_account_name())
64 return appengine.AppAssertionCredentials(scopes)
65
66
67 class FileCredentials(CredentialFactory):
68 def __init__(self, path):
69 self.path = path
70
71 def create(self, scopes):
72 with open(self.path, 'r') as fh:
73 data = json.load(fh)
74 if data.get('type', None):
75 credentials = GoogleCredentials.from_stream(self.path)
76 credentials = credentials.create_scoped(scopes)
77 return credentials
78 return Storage(self.path).get()
79
80
81 class DelegateServiceAccountCredentials(CredentialFactory):
82 IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
83
84 def __init__(self, service_account_email, base):
85 self.base = base
86 self.service_account_email = service_account_email
87
88 def create(self, scopes):
89 logging.info('Delegating to service account %s', self.service_account_email)
90 http = httplib2_utils.InstrumentedHttp('actor-credentials')
91 http = self.base.create([self.IAM_SCOPE]).authorize(http)
92 return httplib2_utils.DelegateServiceAccountCredentials(
93 http, self.service_account_email, scopes)
94
95
96 class Monitor(object): 35 class Monitor(object):
97 """Abstract base class encapsulating the ability to collect and send metrics. 36 """Abstract base class encapsulating the ability to collect and send metrics.
98 37
99 This is a singleton class. There should only be one instance of a Monitor at 38 This is a singleton class. There should only be one instance of a Monitor at
100 a time. It will be created and initialized by process_argparse_options. It 39 a time. It will be created and initialized by process_argparse_options. It
101 must exist in order for any metrics to be sent, although both Targets and 40 must exist in order for any metrics to be sent, although both Targets and
102 Metrics may be initialized before the underlying Monitor. If it does not exist 41 Metrics may be initialized before the underlying Monitor. If it does not exist
103 at the time that a Metric is sent, an exception will be raised. 42 at the time that a Metric is sent, an exception will be raised.
104 """ 43 """
105 44
(...skipping 10 matching lines...) Expand all
116 A MetricsCollection with the appropriate data attribute set. 55 A MetricsCollection with the appropriate data attribute set.
117 """ 56 """
118 if isinstance(data, metrics_pb2.MetricsCollection): 57 if isinstance(data, metrics_pb2.MetricsCollection):
119 ret = data 58 ret = data
120 elif isinstance(data, list): 59 elif isinstance(data, list):
121 ret = metrics_pb2.MetricsCollection(data=data) 60 ret = metrics_pb2.MetricsCollection(data=data)
122 else: 61 else:
123 ret = metrics_pb2.MetricsCollection(data=[data]) 62 ret = metrics_pb2.MetricsCollection(data=[data])
124 return ret 63 return ret
125 64
65 def _load_credentials(self, credentials_file_path):
66 if credentials_file_path == GCE_CREDENTIALS:
67 return gce.AppAssertionCredentials(self._SCOPES)
68 if credentials_file_path == APPENGINE_CREDENTIALS: # pragma: no cover
69 # This import doesn't work outside appengine, so delay it until it's used.
70 from oauth2client import appengine
71 from google.appengine.api import app_identity
72 logging.info('Initializing with service account %s',
73 app_identity.get_service_account_name())
74 return appengine.AppAssertionCredentials(self._SCOPES)
75
76 with open(credentials_file_path, 'r') as credentials_file:
77 credentials_json = json.load(credentials_file)
78 if credentials_json.get('type', None):
79 credentials = GoogleCredentials.from_stream(credentials_file_path)
80 credentials = credentials.create_scoped(self._SCOPES)
81 return credentials
82 return Storage(credentials_file_path).get()
83
126 def send(self, metric_pb): 84 def send(self, metric_pb):
127 raise NotImplementedError() 85 raise NotImplementedError()
128 86
129 87
130 class HttpsMonitor(Monitor): 88 class HttpsMonitor(Monitor):
131 89
132 _SCOPES = ['https://www.googleapis.com/auth/prodxmon'] 90 _SCOPES = [
91 'https://www.googleapis.com/auth/prodxmon'
92 ]
133 93
134 def __init__(self, endpoint, credential_factory, http=None, ca_certs=None): 94 def __init__(self, endpoint, credentials_file_path, http=None, ca_certs=None):
135 self._endpoint = endpoint 95 self._endpoint = endpoint
136 credentials = credential_factory.create(self._SCOPES) 96 credentials = self._load_credentials(credentials_file_path)
137 if http is None: 97 if http is None:
138 http = httplib2_utils.RetriableHttp( 98 http = httplib2_utils.RetriableHttp(
139 httplib2_utils.InstrumentedHttp('acq-mon-api', ca_certs=ca_certs)) 99 httplib2_utils.InstrumentedHttp('acq-mon-api', ca_certs=ca_certs))
140 self._http = credentials.authorize(http) 100 self._http = credentials.authorize(http)
141 101
142 def encode_to_json(self, metric_pb): 102 def encodeToJson(self, metric_pb):
143 if interface.state.use_new_proto: 103 if interface.state.use_new_proto:
144 return json.dumps({'payload': pb_to_popo.convert(metric_pb)}) 104 return json.dumps({'payload': pb_to_popo.convert(metric_pb)})
145 else: 105 else:
146 return json.dumps({'resource': pb_to_popo.convert(metric_pb)}) 106 return json.dumps({'resource': pb_to_popo.convert(metric_pb)})
147 107
148 def send(self, metric_pb): 108 def send(self, metric_pb):
149 if interface.state.use_new_proto: 109 if interface.state.use_new_proto:
150 body = self.encode_to_json(metric_pb) 110 body = self.encodeToJson(metric_pb)
151 else: 111 else:
152 body = self.encode_to_json(self._wrap_proto(metric_pb)) 112 body = self.encodeToJson(self._wrap_proto(metric_pb))
153 113
154 try: 114 try:
155 resp, content = self._http.request(self._endpoint, 115 resp, content = self._http.request(self._endpoint, method='POST',
156 method='POST', 116 body=body)
157 body=body,
158 headers={'Content-Type': 'application/json'})
159 if resp.status != 200: 117 if resp.status != 200:
160 logging.warning('HttpsMonitor.send received status %d: %s', resp.status, 118 logging.warning('HttpsMonitor.send received status %d: %s', resp.status,
161 content) 119 content)
162 except (ValueError, errors.Error, 120 except (ValueError, errors.Error,
163 socket.timeout, socket.error, socket.herror, socket.gaierror, 121 socket.timeout, socket.error, socket.herror, socket.gaierror,
164 httplib2.HttpLib2Error): 122 httplib2.HttpLib2Error):
165 logging.warning('HttpsMonitor.send failed: %s\n', 123 logging.warning('HttpsMonitor.send failed: %s\n',
166 traceback.format_exc()) 124 traceback.format_exc())
167 125
168 126
169 class PubSubMonitor(Monitor): 127 class PubSubMonitor(Monitor):
170 """Class which publishes metrics to a Cloud Pub/Sub topic.""" 128 """Class which publishes metrics to a Cloud Pub/Sub topic."""
171 129
172 _SCOPES = ['https://www.googleapis.com/auth/pubsub'] 130 _SCOPES = [
131 'https://www.googleapis.com/auth/pubsub',
132 ]
173 133
174 TIMEOUT = 10 # seconds 134 TIMEOUT = 10 # seconds
175 135
176 def _initialize(self): 136 def _initialize(self):
177 creds = self._credential_factory.create(self._SCOPES) 137 creds = self._load_credentials(self._credsfile)
178 creds.authorize(self._http) 138 creds.authorize(self._http)
179 self._api = discovery.build('pubsub', 'v1', http=self._http) 139 self._api = discovery.build('pubsub', 'v1', http=self._http)
180 140
181 def _update_init_metrics(self, status): 141 def _update_init_metrics(self, status):
182 if not self._use_instrumented_http: 142 if not self._use_instrumented_http:
183 return 143 return
184 fields = {'name': 'acq-mon-api-pubsub', 144 fields = {'name': 'acq-mon-api-pubsub',
185 'client': 'discovery', 145 'client': 'discovery',
186 'status': status} 146 'status': status}
187 http_metrics.response_status.increment(fields=fields) 147 http_metrics.response_status.increment(fields=fields)
188 148
189 def _check_initialize(self): 149 def _check_initialize(self):
190 if self._api: 150 if self._api:
191 return True 151 return True
192 try: 152 try:
193 self._initialize() 153 self._initialize()
194 except (ValueError, errors.Error, 154 except (ValueError, errors.Error,
195 socket.timeout, socket.error, socket.herror, socket.gaierror, 155 socket.timeout, socket.error, socket.herror, socket.gaierror,
196 httplib2.HttpLib2Error, EnvironmentError): 156 httplib2.HttpLib2Error, EnvironmentError):
197 # Log a warning, not error, to avoid false alarms in AppEngine apps. 157 # Log a warning, not error, to avoid false alarms in AppEngine apps.
198 logging.warning('PubSubMonitor._initialize failed:\n%s', 158 logging.warning('PubSubMonitor._initialize failed:\n%s',
199 traceback.format_exc()) 159 traceback.format_exc())
200 self._api = None 160 self._api = None
201 self._update_init_metrics(http_metrics.STATUS_ERROR) 161 self._update_init_metrics(http_metrics.STATUS_ERROR)
202 return False 162 return False
203 163
204 self._update_init_metrics(http_metrics.STATUS_OK) 164 self._update_init_metrics(http_metrics.STATUS_OK)
205 return True 165 return True
206 166
207 def __init__(self, credential_factory, project, topic, 167 def __init__(self, credsfile, project, topic, use_instrumented_http=True,
208 use_instrumented_http=True, ca_certs=None): 168 ca_certs=None):
209 """Process monitoring related command line flags and initialize api. 169 """Process monitoring related command line flags and initialize api.
210 170
211 Args: 171 Args:
212 credential_factory (CredentialFactory instance): factory that creates 172 credsfile (str): path to the credentials json file
213 oauth2 credentials.
214 project (str): the name of the Pub/Sub project to publish to. 173 project (str): the name of the Pub/Sub project to publish to.
215 topic (str): the name of the Pub/Sub topic to publish to. 174 topic (str): the name of the Pub/Sub topic to publish to.
216 use_instrumented_http (bool): whether to record monitoring metrics for 175 use_instrumented_http (bool): whether to record monitoring metrics for
217 HTTP requests made to the pubsub API. 176 HTTP requests made to the pubsub API.
218 ca_certs (str): path to file containing root CA certificates for SSL 177 ca_certs (str): path to file containing root CA certificates for SSL
219 server certificate validation. If not set, a CA cert 178 server certificate validation. If not set, a CA cert
220 file bundled with httplib2 is used. 179 file bundled with httplib2 is used.
221 """ 180 """
222 # Do not call self._check_initialize() in the constructor. This 181 # Do not call self._check_initialize() in the constructor. This
223 # class is constructed during app initialization on AppEngine, and 182 # class is constructed during app initialization on AppEngine, and
224 # network calls are especially flaky during that time. 183 # network calls are especially flaky during that time.
225 self._api = None 184 self._api = None
226 self._use_instrumented_http = use_instrumented_http 185 self._use_instrumented_http = use_instrumented_http
227 if use_instrumented_http: 186 if use_instrumented_http:
228 self._http = httplib2_utils.InstrumentedHttp( 187 self._http = httplib2_utils.InstrumentedHttp(
229 'acq-mon-api-pubsub', timeout=self.TIMEOUT, ca_certs=ca_certs) 188 'acq-mon-api-pubsub', timeout=self.TIMEOUT, ca_certs=ca_certs)
230 else: 189 else:
231 self._http = httplib2.Http(timeout=self.TIMEOUT, ca_certs=ca_certs) 190 self._http = httplib2.Http(timeout=self.TIMEOUT, ca_certs=ca_certs)
232 self._credential_factory = credential_factory 191 self._credsfile = credsfile
233 self._topic = 'projects/%s/topics/%s' % (project, topic) 192 self._topic = 'projects/%s/topics/%s' % (project, topic)
234 193
235 def send(self, metric_pb): 194 def send(self, metric_pb):
236 """Send a metric proto to the monitoring api. 195 """Send a metric proto to the monitoring api.
237 196
238 Args: 197 Args:
239 metric_pb (MetricsData or MetricsCollection): the metric protobuf to send 198 metric_pb (MetricsData or MetricsCollection): the metric protobuf to send
240 """ 199 """
241 if not self._check_initialize(): 200 if not self._check_initialize():
242 return 201 return
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
276 logging.info('Flushing monitoring metrics:\n%s', text) 235 logging.info('Flushing monitoring metrics:\n%s', text)
277 if self._fh is not None: 236 if self._fh is not None:
278 self._fh.write(text + '\n\n') 237 self._fh.write(text + '\n\n')
279 self._fh.flush() 238 self._fh.flush()
280 239
281 240
282 class NullMonitor(Monitor): 241 class NullMonitor(Monitor):
283 """Class that doesn't send metrics anywhere.""" 242 """Class that doesn't send metrics anywhere."""
284 def send(self, metric_pb): 243 def send(self, metric_pb):
285 pass 244 pass
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698