Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(46)

Unified Diff: client/third_party/infra_libs/ts_mon/common/monitors.py

Issue 2705273003: Roll infra_libs and gae_ts_mon in luci-py, and add field_specs to all metrics (Closed)
Patch Set: Rebase Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/third_party/infra_libs/ts_mon/common/monitors.py
diff --git a/client/third_party/infra_libs/ts_mon/common/monitors.py b/client/third_party/infra_libs/ts_mon/common/monitors.py
index cb095fac2316d67b23f9c20e64c5a0ba4808d48f..e3ebc1bcb9aaf405031362e3940444d3518e3000 100644
--- a/client/third_party/infra_libs/ts_mon/common/monitors.py
+++ b/client/third_party/infra_libs/ts_mon/common/monitors.py
@@ -32,6 +32,67 @@ APPENGINE_CREDENTIALS = ':appengine'
GCE_CREDENTIALS = ':gce'
+class CredentialFactory(object):
+ """Base class for things that can create OAuth2Credentials."""
+
+ @classmethod
+ def from_string(cls, path):
+ """Creates an appropriate subclass from a file path or magic string."""
+
+ if path == APPENGINE_CREDENTIALS:
+ return AppengineCredentials()
+ if path == GCE_CREDENTIALS:
+ return GCECredentials()
+ return FileCredentials(path)
+
+ def create(self, scopes):
+ raise NotImplementedError
+
+
+class GCECredentials(CredentialFactory):
+ def create(self, scopes):
+ return gce.AppAssertionCredentials(scopes)
+
+
+class AppengineCredentials(CredentialFactory):
+ def create(self, scopes): # pragma: no cover
+ # This import doesn't work outside appengine, so delay it until it's used.
+ from oauth2client import appengine
+ from google.appengine.api import app_identity
+ logging.info('Initializing with service account %s',
+ app_identity.get_service_account_name())
+ return appengine.AppAssertionCredentials(scopes)
+
+
+class FileCredentials(CredentialFactory):
+ def __init__(self, path):
+ self.path = path
+
+ def create(self, scopes):
+ with open(self.path, 'r') as fh:
+ data = json.load(fh)
+ if data.get('type', None):
+ credentials = GoogleCredentials.from_stream(self.path)
+ credentials = credentials.create_scoped(scopes)
+ return credentials
+ return Storage(self.path).get()
+
+
+class DelegateServiceAccountCredentials(CredentialFactory):
+ IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
+
+ def __init__(self, service_account_email, base):
+ self.base = base
+ self.service_account_email = service_account_email
+
+ def create(self, scopes):
+ logging.info('Delegating to service account %s', self.service_account_email)
+ http = httplib2_utils.InstrumentedHttp('actor-credentials')
+ http = self.base.create([self.IAM_SCOPE]).authorize(http)
+ return httplib2_utils.DelegateServiceAccountCredentials(
+ http, self.service_account_email, scopes)
+
+
class Monitor(object):
"""Abstract base class encapsulating the ability to collect and send metrics.
@@ -62,44 +123,23 @@ class Monitor(object):
ret = metrics_pb2.MetricsCollection(data=[data])
return ret
- def _load_credentials(self, credentials_file_path):
- if credentials_file_path == GCE_CREDENTIALS:
- return gce.AppAssertionCredentials(self._SCOPES)
- if credentials_file_path == APPENGINE_CREDENTIALS: # pragma: no cover
- # This import doesn't work outside appengine, so delay it until it's used.
- from oauth2client import appengine
- from google.appengine.api import app_identity
- logging.info('Initializing with service account %s',
- app_identity.get_service_account_name())
- return appengine.AppAssertionCredentials(self._SCOPES)
-
- with open(credentials_file_path, 'r') as credentials_file:
- credentials_json = json.load(credentials_file)
- if credentials_json.get('type', None):
- credentials = GoogleCredentials.from_stream(credentials_file_path)
- credentials = credentials.create_scoped(self._SCOPES)
- return credentials
- return Storage(credentials_file_path).get()
-
def send(self, metric_pb):
raise NotImplementedError()
class HttpsMonitor(Monitor):
- _SCOPES = [
- 'https://www.googleapis.com/auth/prodxmon'
- ]
+ _SCOPES = ['https://www.googleapis.com/auth/prodxmon']
- def __init__(self, endpoint, credentials_file_path, http=None, ca_certs=None):
+ def __init__(self, endpoint, credential_factory, http=None, ca_certs=None):
self._endpoint = endpoint
- credentials = self._load_credentials(credentials_file_path)
+ credentials = credential_factory.create(self._SCOPES)
if http is None:
http = httplib2_utils.RetriableHttp(
httplib2_utils.InstrumentedHttp('acq-mon-api', ca_certs=ca_certs))
self._http = credentials.authorize(http)
- def encodeToJson(self, metric_pb):
+ def encode_to_json(self, metric_pb):
if interface.state.use_new_proto:
return json.dumps({'payload': pb_to_popo.convert(metric_pb)})
else:
@@ -107,13 +147,15 @@ class HttpsMonitor(Monitor):
def send(self, metric_pb):
if interface.state.use_new_proto:
- body = self.encodeToJson(metric_pb)
+ body = self.encode_to_json(metric_pb)
else:
- body = self.encodeToJson(self._wrap_proto(metric_pb))
+ body = self.encode_to_json(self._wrap_proto(metric_pb))
try:
- resp, content = self._http.request(self._endpoint, method='POST',
- body=body)
+ resp, content = self._http.request(self._endpoint,
+ method='POST',
+ body=body,
+ headers={'Content-Type': 'application/json'})
if resp.status != 200:
logging.warning('HttpsMonitor.send received status %d: %s', resp.status,
content)
@@ -127,14 +169,12 @@ class HttpsMonitor(Monitor):
class PubSubMonitor(Monitor):
"""Class which publishes metrics to a Cloud Pub/Sub topic."""
- _SCOPES = [
- 'https://www.googleapis.com/auth/pubsub',
- ]
+ _SCOPES = ['https://www.googleapis.com/auth/pubsub']
TIMEOUT = 10 # seconds
def _initialize(self):
- creds = self._load_credentials(self._credsfile)
+ creds = self._credential_factory.create(self._SCOPES)
creds.authorize(self._http)
self._api = discovery.build('pubsub', 'v1', http=self._http)
@@ -164,12 +204,13 @@ class PubSubMonitor(Monitor):
self._update_init_metrics(http_metrics.STATUS_OK)
return True
- def __init__(self, credsfile, project, topic, use_instrumented_http=True,
- ca_certs=None):
+ def __init__(self, credential_factory, project, topic,
+ use_instrumented_http=True, ca_certs=None):
"""Process monitoring related command line flags and initialize api.
Args:
- credsfile (str): path to the credentials json file
+ credential_factory (CredentialFactory instance): factory that creates
+ oauth2 credentials.
project (str): the name of the Pub/Sub project to publish to.
topic (str): the name of the Pub/Sub topic to publish to.
use_instrumented_http (bool): whether to record monitoring metrics for
@@ -188,7 +229,7 @@ class PubSubMonitor(Monitor):
'acq-mon-api-pubsub', timeout=self.TIMEOUT, ca_certs=ca_certs)
else:
self._http = httplib2.Http(timeout=self.TIMEOUT, ca_certs=ca_certs)
- self._credsfile = credsfile
+ self._credential_factory = credential_factory
self._topic = 'projects/%s/topics/%s' % (project, topic)
def send(self, metric_pb):

Powered by Google App Engine
This is Rietveld 408576698