| Index: client/third_party/infra_libs/ts_mon/common/monitors.py
|
| diff --git a/client/third_party/infra_libs/ts_mon/common/monitors.py b/client/third_party/infra_libs/ts_mon/common/monitors.py
|
| index cb095fac2316d67b23f9c20e64c5a0ba4808d48f..e3ebc1bcb9aaf405031362e3940444d3518e3000 100644
|
| --- a/client/third_party/infra_libs/ts_mon/common/monitors.py
|
| +++ b/client/third_party/infra_libs/ts_mon/common/monitors.py
|
| @@ -32,6 +32,67 @@ APPENGINE_CREDENTIALS = ':appengine'
|
| GCE_CREDENTIALS = ':gce'
|
|
|
|
|
| +class CredentialFactory(object):
|
| + """Base class for things that can create OAuth2Credentials."""
|
| +
|
| + @classmethod
|
| + def from_string(cls, path):
|
| + """Creates an appropriate subclass from a file path or magic string."""
|
| +
|
| + if path == APPENGINE_CREDENTIALS:
|
| + return AppengineCredentials()
|
| + if path == GCE_CREDENTIALS:
|
| + return GCECredentials()
|
| + return FileCredentials(path)
|
| +
|
| + def create(self, scopes):
|
| + raise NotImplementedError
|
| +
|
| +
|
| +class GCECredentials(CredentialFactory):
|
| + def create(self, scopes):
|
| + return gce.AppAssertionCredentials(scopes)
|
| +
|
| +
|
| +class AppengineCredentials(CredentialFactory):
|
| + def create(self, scopes): # pragma: no cover
|
| + # This import doesn't work outside appengine, so delay it until it's used.
|
| + from oauth2client import appengine
|
| + from google.appengine.api import app_identity
|
| + logging.info('Initializing with service account %s',
|
| + app_identity.get_service_account_name())
|
| + return appengine.AppAssertionCredentials(scopes)
|
| +
|
| +
|
| +class FileCredentials(CredentialFactory):
|
| + def __init__(self, path):
|
| + self.path = path
|
| +
|
| + def create(self, scopes):
|
| + with open(self.path, 'r') as fh:
|
| + data = json.load(fh)
|
| + if data.get('type', None):
|
| + credentials = GoogleCredentials.from_stream(self.path)
|
| + credentials = credentials.create_scoped(scopes)
|
| + return credentials
|
| + return Storage(self.path).get()
|
| +
|
| +
|
| +class DelegateServiceAccountCredentials(CredentialFactory):
|
| + IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
|
| +
|
| + def __init__(self, service_account_email, base):
|
| + self.base = base
|
| + self.service_account_email = service_account_email
|
| +
|
| + def create(self, scopes):
|
| + logging.info('Delegating to service account %s', self.service_account_email)
|
| + http = httplib2_utils.InstrumentedHttp('actor-credentials')
|
| + http = self.base.create([self.IAM_SCOPE]).authorize(http)
|
| + return httplib2_utils.DelegateServiceAccountCredentials(
|
| + http, self.service_account_email, scopes)
|
| +
|
| +
|
| class Monitor(object):
|
| """Abstract base class encapsulating the ability to collect and send metrics.
|
|
|
| @@ -62,44 +123,23 @@ class Monitor(object):
|
| ret = metrics_pb2.MetricsCollection(data=[data])
|
| return ret
|
|
|
| - def _load_credentials(self, credentials_file_path):
|
| - if credentials_file_path == GCE_CREDENTIALS:
|
| - return gce.AppAssertionCredentials(self._SCOPES)
|
| - if credentials_file_path == APPENGINE_CREDENTIALS: # pragma: no cover
|
| - # This import doesn't work outside appengine, so delay it until it's used.
|
| - from oauth2client import appengine
|
| - from google.appengine.api import app_identity
|
| - logging.info('Initializing with service account %s',
|
| - app_identity.get_service_account_name())
|
| - return appengine.AppAssertionCredentials(self._SCOPES)
|
| -
|
| - with open(credentials_file_path, 'r') as credentials_file:
|
| - credentials_json = json.load(credentials_file)
|
| - if credentials_json.get('type', None):
|
| - credentials = GoogleCredentials.from_stream(credentials_file_path)
|
| - credentials = credentials.create_scoped(self._SCOPES)
|
| - return credentials
|
| - return Storage(credentials_file_path).get()
|
| -
|
| def send(self, metric_pb):
|
| raise NotImplementedError()
|
|
|
|
|
| class HttpsMonitor(Monitor):
|
|
|
| - _SCOPES = [
|
| - 'https://www.googleapis.com/auth/prodxmon'
|
| - ]
|
| + _SCOPES = ['https://www.googleapis.com/auth/prodxmon']
|
|
|
| - def __init__(self, endpoint, credentials_file_path, http=None, ca_certs=None):
|
| + def __init__(self, endpoint, credential_factory, http=None, ca_certs=None):
|
| self._endpoint = endpoint
|
| - credentials = self._load_credentials(credentials_file_path)
|
| + credentials = credential_factory.create(self._SCOPES)
|
| if http is None:
|
| http = httplib2_utils.RetriableHttp(
|
| httplib2_utils.InstrumentedHttp('acq-mon-api', ca_certs=ca_certs))
|
| self._http = credentials.authorize(http)
|
|
|
| - def encodeToJson(self, metric_pb):
|
| + def encode_to_json(self, metric_pb):
|
| if interface.state.use_new_proto:
|
| return json.dumps({'payload': pb_to_popo.convert(metric_pb)})
|
| else:
|
| @@ -107,13 +147,15 @@ class HttpsMonitor(Monitor):
|
|
|
| def send(self, metric_pb):
|
| if interface.state.use_new_proto:
|
| - body = self.encodeToJson(metric_pb)
|
| + body = self.encode_to_json(metric_pb)
|
| else:
|
| - body = self.encodeToJson(self._wrap_proto(metric_pb))
|
| + body = self.encode_to_json(self._wrap_proto(metric_pb))
|
|
|
| try:
|
| - resp, content = self._http.request(self._endpoint, method='POST',
|
| - body=body)
|
| + resp, content = self._http.request(self._endpoint,
|
| + method='POST',
|
| + body=body,
|
| + headers={'Content-Type': 'application/json'})
|
| if resp.status != 200:
|
| logging.warning('HttpsMonitor.send received status %d: %s', resp.status,
|
| content)
|
| @@ -127,14 +169,12 @@ class HttpsMonitor(Monitor):
|
| class PubSubMonitor(Monitor):
|
| """Class which publishes metrics to a Cloud Pub/Sub topic."""
|
|
|
| - _SCOPES = [
|
| - 'https://www.googleapis.com/auth/pubsub',
|
| - ]
|
| + _SCOPES = ['https://www.googleapis.com/auth/pubsub']
|
|
|
| TIMEOUT = 10 # seconds
|
|
|
| def _initialize(self):
|
| - creds = self._load_credentials(self._credsfile)
|
| + creds = self._credential_factory.create(self._SCOPES)
|
| creds.authorize(self._http)
|
| self._api = discovery.build('pubsub', 'v1', http=self._http)
|
|
|
| @@ -164,12 +204,13 @@ class PubSubMonitor(Monitor):
|
| self._update_init_metrics(http_metrics.STATUS_OK)
|
| return True
|
|
|
| - def __init__(self, credsfile, project, topic, use_instrumented_http=True,
|
| - ca_certs=None):
|
| + def __init__(self, credential_factory, project, topic,
|
| + use_instrumented_http=True, ca_certs=None):
|
| """Process monitoring related command line flags and initialize api.
|
|
|
| Args:
|
| - credsfile (str): path to the credentials json file
|
| + credential_factory (CredentialFactory instance): factory that creates
|
| + oauth2 credentials.
|
| project (str): the name of the Pub/Sub project to publish to.
|
| topic (str): the name of the Pub/Sub topic to publish to.
|
| use_instrumented_http (bool): whether to record monitoring metrics for
|
| @@ -188,7 +229,7 @@ class PubSubMonitor(Monitor):
|
| 'acq-mon-api-pubsub', timeout=self.TIMEOUT, ca_certs=ca_certs)
|
| else:
|
| self._http = httplib2.Http(timeout=self.TIMEOUT, ca_certs=ca_certs)
|
| - self._credsfile = credsfile
|
| + self._credential_factory = credential_factory
|
| self._topic = 'projects/%s/topics/%s' % (project, topic)
|
|
|
| def send(self, metric_pb):
|
|
|