Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(715)

Unified Diff: third_party/google-endpoints/google/api/control/report_request.py

Issue 2666783008: Add google-endpoints to third_party/. (Closed)
Patch Set: Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/google-endpoints/google/api/control/report_request.py
diff --git a/third_party/google-endpoints/google/api/control/report_request.py b/third_party/google-endpoints/google/api/control/report_request.py
new file mode 100644
index 0000000000000000000000000000000000000000..45c73295922d561b1a9ea44dbba83bcfd8354637
--- /dev/null
+++ b/third_party/google-endpoints/google/api/control/report_request.py
@@ -0,0 +1,540 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""report_request supports aggregation of ReportRequests.
+
+It proves :class:`.Aggregator` that aggregates and batches together
+ReportRequests.
+
+"""
+
+
+from __future__ import absolute_import
+
+import collections
+import functools
+import hashlib
+import logging
+import time
+from datetime import datetime, timedelta
+
+from apitools.base.py import encoding
+from enum import Enum
+from . import caches, label_descriptor, operation, messages
+from . import metric_descriptor, signing, timestamp
+
+logger = logging.getLogger(__name__)
+
+SIZE_NOT_SET = -1
+
+
+def _validate_int_arg(name, value):
+ if value == SIZE_NOT_SET or (isinstance(value, int) and value >= 0):
+ return
+ raise ValueError('%s should be a non-negative int/long' % (name,))
+
+
+def _validate_timedelta_arg(name, value):
+ if value is None or isinstance(value, timedelta):
+ return
+ raise ValueError('%s should be a timedelta' % (name,))
+
+
+class ReportingRules(collections.namedtuple('ReportingRules',
+ ['logs', 'metrics', 'labels'])):
+ """Holds information that determines how to fill a `ReportRequest`.
+
+ Attributes:
+ logs (iterable[string]): the name of logs to be included in the `ReportRequest`
+ metrics (iterable[:class:`google.api.control.metric_descriptor.KnownMetrics`]):
+ the metrics to be added to a `ReportRequest`
+ labels (iterable[:class:`google.api.control.metric_descriptor.KnownLabels`]):
+ the labels to be added to a `ReportRequest`
+ """
+ # pylint: disable=too-few-public-methods
+
+ def __new__(cls, logs=None, metrics=None, labels=None):
+ """Invokes the base constructor with default values."""
+ logs = set(logs) if logs else set()
+ metrics = tuple(metrics) if metrics else tuple()
+ labels = tuple(labels) if labels else tuple()
+ return super(cls, ReportingRules).__new__(cls, logs, metrics, labels)
+
+ @classmethod
+ def from_known_inputs(cls, logs=None, metric_names=None, label_names=None):
+ """An alternate constructor that assumes known metrics and labels.
+
+ This differs from the default constructor in that the metrics and labels
+ are iterables of names of 'known' metrics and labels respectively. The
+ names are used to obtain the metrics and labels from
+ :class:`google.api.control.metric_descriptor.KnownMetrics` and
+ :class:`google.api.control.label_descriptor.KnownLabels` respectively.
+
+ names that don't correspond to a known metric or label are ignored; as
+ are metrics or labels that don't yet have a way of updating the
+ `ReportRequest` operation.
+
+ Args:
+ logs (iterable[string]): the name of logs to be included in the
+ `ReportRequest`
+ metric_names (iterable[string]): the name of a known metric to be
+ added to the `ReportRequest`
+ label_names (iterable[string]): the name of a known label to be added
+ to the `ReportRequest`
+
+ """
+ if not metric_names:
+ metric_names = ()
+ if not label_names:
+ label_names = ()
+ known_labels = []
+ known_metrics = []
+ # pylint: disable=no-member
+ # pylint is not aware of the __members__ attributes
+ for l in label_descriptor.KnownLabels.__members__.values():
+ if l.update_label_func and l.label_name in label_names:
+ known_labels.append(l)
+ for m in metric_descriptor.KnownMetrics.__members__.values():
+ if m.update_op_func and m.metric_name in metric_names:
+ known_metrics.append(m)
+ return cls(logs=logs, metrics=known_metrics, labels=known_labels)
+
+
+class ReportedProtocols(Enum):
+ """Enumerates the protocols that can be reported."""
+ # pylint: disable=too-few-public-methods
+ UNKNOWN = 0
+ HTTP = 1
+ HTTP2 = 2
+ GRPC = 3
+
+
+class ReportedPlatforms(Enum):
+ """Enumerates the platforms that can be reported."""
+ # pylint: disable=too-few-public-methods
+ UNKNOWN = 0
+ GAE_FLEX = 1
+ GAE_STANDARD = 2
+ GCE = 3
+ GKE = 4
+ DEVELOPMENT = 5
+
+
+class ErrorCause(Enum):
+ """Enumerates the causes of errors."""
+ # pylint: disable=too-few-public-methods
+ internal = 0 # default, error in scc library code
+ application = 1 # external application error
+ auth = 2 # authentication error
+ service_control = 3 # error in service control check
+
+
+# alias the severity enum
+_SEVERITY = messages.LogEntry.SeverityValueValuesEnum
+
+
+def _struct_payload_from(a_dict):
+ return encoding.PyValueToMessage(messages.LogEntry.StructPayloadValue, a_dict)
+
+
+class Info(
+ collections.namedtuple(
+ 'Info', (
+ 'api_name',
+ 'api_method',
+ 'api_version',
+ 'auth_issuer',
+ 'auth_audience',
+ 'backend_time',
+ 'error_cause',
+ 'location',
+ 'log_message',
+ 'method',
+ 'overhead_time',
+ 'platform',
+ 'producer_project_id',
+ 'protocol',
+ 'request_size',
+ 'request_time',
+ 'response_code',
+ 'response_size',
+ 'url',
+ ) + operation.Info._fields),
+ operation.Info):
+ """Holds the information necessary to fill in a ReportRequest.
+
+ In the attribute descriptions below, N/A means 'not available'
+
+ Attributes:
+ api_name (string): the api name and version
+ api_method (string): the full api method name
+ api_version (string): the api version
+ auth_issuer (string): the auth issuer
+ auth_audience (string): the auth audience
+ backend_time(datetime.timedelta): the backend request time, None for N/A
+ error_cause(:class:`ErrorCause`): the cause of error if one has occurred
+ location (string): the location of the service
+ log_message (string): a message to log as an info log
+ method (string): the HTTP method used to make the request
+ overhead_time(datetime.timedelta): the overhead time, None for N/A
+ platform (:class:`ReportedPlatform`): the platform in use
+ producer_project_id (string): the producer project id
+ protocol (:class:`ReportedProtocol`): the protocol used
+ request_size(int): the request size in bytes, -1 means N/A
+ request_time(datetime.timedelta): the request time
+ response_size(int): the request size in bytes, -1 means N/A
+ response_code(int): the code of the http response
+ url (string): the request url
+
+ """
+ # pylint: disable=too-many-arguments,too-many-locals
+
+ COPYABLE_LOG_FIELDS = [
+ 'api_name',
+ 'api_method',
+ 'api_key',
+ 'producer_project_id',
+ 'referer',
+ 'location',
+ 'log_message',
+ 'url',
+ ]
+
+ def __new__(cls,
+ api_name='',
+ api_method='',
+ api_version='',
+ auth_issuer='',
+ auth_audience='',
+ backend_time=None,
+ error_cause=ErrorCause.internal,
+ location='',
+ log_message='',
+ method='',
+ overhead_time=None,
+ platform=ReportedPlatforms.UNKNOWN,
+ producer_project_id='',
+ protocol=ReportedProtocols.UNKNOWN,
+ request_size=SIZE_NOT_SET,
+ request_time=None,
+ response_size=SIZE_NOT_SET,
+ response_code=200,
+ url='',
+ **kw):
+ """Invokes the base constructor with default values."""
+ op_info = operation.Info(**kw)
+ _validate_timedelta_arg('backend_time', backend_time)
+ _validate_timedelta_arg('overhead_time', overhead_time)
+ _validate_timedelta_arg('request_time', request_time)
+ _validate_int_arg('request_size', request_size)
+ _validate_int_arg('response_size', response_size)
+ if not isinstance(protocol, ReportedProtocols):
+ raise ValueError('protocol should be a %s' % (ReportedProtocols,))
+ if not isinstance(platform, ReportedPlatforms):
+ raise ValueError('platform should be a %s' % (ReportedPlatforms,))
+ if not isinstance(error_cause, ErrorCause):
+ raise ValueError('error_cause should be a %s' % (ErrorCause,))
+ return super(cls, Info).__new__(
+ cls,
+ api_name,
+ api_method,
+ api_version,
+ auth_issuer,
+ auth_audience,
+ backend_time,
+ error_cause,
+ location,
+ log_message,
+ method,
+ overhead_time,
+ platform,
+ producer_project_id,
+ protocol,
+ request_size,
+ request_time,
+ response_code,
+ response_size,
+ url,
+ **op_info._asdict())
+
+ def _as_log_entry(self, name, now):
+ """Makes a `LogEntry` from this instance for the given log_name.
+
+ Args:
+ rules (:class:`ReportingRules`): determines what labels, metrics and
+ logs to include in the report request.
+ now (:class:`datetime.DateTime`): the current time
+
+ Return:
+ a ``LogEntry`` generated from this instance with the given name
+ and timestamp
+
+ Raises:
+ ValueError: if the fields in this instance are insufficient to
+ to create a valid ``ServicecontrolServicesReportRequest``
+
+ """
+ # initialize the struct with fields that are always present
+ d = {
+ 'http_response_code': self.response_code,
+ 'timestamp': time.mktime(now.timetuple())
+ }
+
+ # compute the severity
+ severity = _SEVERITY.INFO
+ if self.response_code >= 400:
+ severity = _SEVERITY.ERROR
+ d['error_cause'] = self.error_cause.name
+
+ # add 'optional' fields to the struct
+ if self.request_size > 0:
+ d['request_size'] = self.request_size
+ if self.response_size > 0:
+ d['response_size'] = self.response_size
+ if self.method:
+ d['http_method'] = self.method
+ if self.request_time:
+ d['request_latency_in_ms'] = self.request_time.total_seconds() * 1000
+
+ # add 'copyable' fields to the struct
+ for key in self.COPYABLE_LOG_FIELDS:
+ value = getattr(self, key, None)
+ if value:
+ d[key] = value
+
+ return messages.LogEntry(
+ name=name,
+ timestamp=timestamp.to_rfc3339(now),
+ severity=severity,
+ structPayload=_struct_payload_from(d))
+
+ def as_report_request(self, rules, timer=datetime.utcnow):
+ """Makes a `ServicecontrolServicesReportRequest` from this instance
+
+ Args:
+ rules (:class:`ReportingRules`): determines what labels, metrics and
+ logs to include in the report request.
+ timer: a function that determines the current time
+
+ Return:
+ a ``ServicecontrolServicesReportRequest`` generated from this instance
+ governed by the provided ``rules``
+
+ Raises:
+ ValueError: if the fields in this instance cannot be used to create
+ a valid ``ServicecontrolServicesReportRequest``
+
+ """
+ if not self.service_name:
+ raise ValueError('the service name must be set')
+ op = super(Info, self).as_operation(timer=timer)
+
+ # Populate metrics and labels if they can be associated with a
+ # method/operation
+ if op.operationId and op.operationName:
+ labels = {}
+ for known_label in rules.labels:
+ known_label.do_labels_update(self, labels)
+ if labels:
+ op.labels = encoding.PyValueToMessage(
+ messages.Operation.LabelsValue,
+ labels)
+ for known_metric in rules.metrics:
+ known_metric.do_operation_update(self, op)
+
+ # Populate the log entries
+ now = timer()
+ op.logEntries = [self._as_log_entry(l, now) for l in rules.logs]
+
+ return messages.ServicecontrolServicesReportRequest(
+ serviceName=self.service_name,
+ reportRequest=messages.ReportRequest(operations=[op]))
+
+
+_NO_RESULTS = tuple()
+
+
+class Aggregator(object):
+ """Aggregates Service Control Report requests.
+
+ :func:`report` determines if a `ReportRequest` should be sent to the
+ service immediately
+
+ """
+
+ CACHED_OK = object()
+ """A sentinel returned by :func:`report` when a request is cached OK."""
+
+ MAX_OPERATION_COUNT = 1000
+ """The maximum number of operations to send in a report request."""
+
+ def __init__(self, service_name, options, kinds=None,
+ timer=datetime.utcnow):
+ """
+ Constructor
+
+ Args:
+ service_name (string): name of the service being aggregagated
+ options (:class:`google.api.caches.ReportOptions`): configures the behavior
+ of this aggregator
+ kinds (dict[string, [:class:`.MetricKind`]]): describes the
+ type of metrics used during aggregation
+ timer (function([[datetime]]): a function that returns the current
+ as a time as a datetime instance
+
+ """
+ self._cache = caches.create(options, timer=timer)
+ self._options = options
+ self._kinds = kinds
+ self._service_name = service_name
+
+ @property
+ def flush_interval(self):
+ """The interval between calls to flush.
+
+ Returns:
+ timedelta: the period between calls to flush if, or ``None`` if no
+ cache is set
+
+ """
+ return None if self._cache is None else self._options.flush_interval
+
+ @property
+ def service_name(self):
+ """The service to which all requests being aggregated should belong."""
+ return self._service_name
+
+ def flush(self):
+ """Flushes this instance's cache.
+
+ The driver of this instance should call this method every
+ `flush_interval`.
+
+ Returns:
+ list[``ServicecontrolServicesReportRequest``]: corresponding to the
+ pending cached operations
+
+ """
+ if self._cache is None:
+ return _NO_RESULTS
+ with self._cache as c:
+ flushed_ops = [x.as_operation() for x in list(c.out_deque)]
+ c.out_deque.clear()
+ reqs = []
+ max_ops = self.MAX_OPERATION_COUNT
+ for x in range(0, len(flushed_ops), max_ops):
+ report_request = messages.ReportRequest(
+ operations=flushed_ops[x:x + max_ops])
+ reqs.append(
+ messages.ServicecontrolServicesReportRequest(
+ serviceName=self.service_name,
+ reportRequest=report_request))
+
+ return reqs
+
+ def clear(self):
+ """Clears the cache."""
+ if self._cache is None:
+ return _NO_RESULTS
+ if self._cache is not None:
+ with self._cache as k:
+ res = [x.as_operation() for x in k.values()]
+ k.clear()
+ k.out_deque.clear()
+ return res
+
+ def report(self, req):
+ """Adds a report request to the cache.
+
+ Returns ``None`` if it could not be aggregated, and callers need to
+ send the request to the server, otherwise it returns ``CACHED_OK``.
+
+ Args:
+ req (:class:`messages.ReportRequest`): the request
+ to be aggregated
+
+ Result:
+ ``None`` if the request as not cached, otherwise ``CACHED_OK``
+
+ """
+ if self._cache is None:
+ return None # no cache, send request now
+ if not isinstance(req, messages.ServicecontrolServicesReportRequest):
+ raise ValueError('Invalid request')
+ if req.serviceName != self.service_name:
+ logger.error('bad report(): service_name %s does not match ours %s',
+ req.serviceName, self.service_name)
+ raise ValueError('Service name mismatch')
+ report_req = req.reportRequest
+ if report_req is None:
+ logger.error('bad report(): no report_request in %s', req)
+ raise ValueError('Expected report_request not set')
+ if _has_high_important_operation(report_req) or self._cache is None:
+ return None
+ ops_by_signature = _key_by_signature(report_req.operations,
+ _sign_operation)
+
+ # Concurrency:
+ #
+ # This holds a lock on the cache while updating it. No i/o operations
+ # are performed, so any waiting threads see minimal delays
+ with self._cache as cache:
+ for key, op in iter(ops_by_signature.items()):
+ agg = cache.get(key)
+ if agg is None:
+ cache[key] = operation.Aggregator(op, self._kinds)
+ else:
+ agg.add(op)
+
+ return self.CACHED_OK
+
+
+def _has_high_important_operation(req):
+ def is_important(op):
+ return (op.importance !=
+ messages.Operation.ImportanceValueValuesEnum.LOW)
+
+ return functools.reduce(lambda x, y: x and is_important(y),
+ req.operations, True)
+
+
+def _key_by_signature(operations, signature_func):
+ """Creates a dictionary of operations keyed by signature
+
+ Args:
+ operations (iterable[Operations]): the input operations
+
+ Returns:
+ dict[string, [Operations]]: the operations keyed by signature
+ """
+ return dict((signature_func(op), op) for op in operations)
+
+
+def _sign_operation(op):
+ """Obtains a signature for an operation in a ReportRequest.
+
+ Args:
+ op (:class:`google.api.gen.servicecontrol_v1_messages.Operation`): an
+ operation used in a `ReportRequest`
+
+ Returns:
+ string: a unique signature for that operation
+ """
+ md5 = hashlib.md5()
+ md5.update(op.consumerId)
+ md5.update('\x00')
+ md5.update(op.operationName)
+ if op.labels:
+ signing.add_dict_to_hash(md5, encoding.MessageToPyValue(op.labels))
+ return md5.digest()

Powered by Google App Engine
This is Rietveld 408576698