| Index: third_party/google-endpoints/google/api/control/report_request.py
|
| diff --git a/third_party/google-endpoints/google/api/control/report_request.py b/third_party/google-endpoints/google/api/control/report_request.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..45c73295922d561b1a9ea44dbba83bcfd8354637
|
| --- /dev/null
|
| +++ b/third_party/google-endpoints/google/api/control/report_request.py
|
| @@ -0,0 +1,540 @@
|
| +# Copyright 2016 Google Inc. All Rights Reserved.
|
| +#
|
| +# Licensed under the Apache License, Version 2.0 (the "License");
|
| +# you may not use this file except in compliance with the License.
|
| +# You may obtain a copy of the License at
|
| +#
|
| +# http://www.apache.org/licenses/LICENSE-2.0
|
| +#
|
| +# Unless required by applicable law or agreed to in writing, software
|
| +# distributed under the License is distributed on an "AS IS" BASIS,
|
| +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +# See the License for the specific language governing permissions and
|
| +# limitations under the License.
|
| +
|
| +"""report_request supports aggregation of ReportRequests.
|
| +
|
| +It proves :class:`.Aggregator` that aggregates and batches together
|
| +ReportRequests.
|
| +
|
| +"""
|
| +
|
| +
|
| +from __future__ import absolute_import
|
| +
|
| +import collections
|
| +import functools
|
| +import hashlib
|
| +import logging
|
| +import time
|
| +from datetime import datetime, timedelta
|
| +
|
| +from apitools.base.py import encoding
|
| +from enum import Enum
|
| +from . import caches, label_descriptor, operation, messages
|
| +from . import metric_descriptor, signing, timestamp
|
| +
|
| +logger = logging.getLogger(__name__)
|
| +
|
| +SIZE_NOT_SET = -1
|
| +
|
| +
|
| +def _validate_int_arg(name, value):
|
| + if value == SIZE_NOT_SET or (isinstance(value, int) and value >= 0):
|
| + return
|
| + raise ValueError('%s should be a non-negative int/long' % (name,))
|
| +
|
| +
|
| +def _validate_timedelta_arg(name, value):
|
| + if value is None or isinstance(value, timedelta):
|
| + return
|
| + raise ValueError('%s should be a timedelta' % (name,))
|
| +
|
| +
|
| +class ReportingRules(collections.namedtuple('ReportingRules',
|
| + ['logs', 'metrics', 'labels'])):
|
| + """Holds information that determines how to fill a `ReportRequest`.
|
| +
|
| + Attributes:
|
| + logs (iterable[string]): the name of logs to be included in the `ReportRequest`
|
| + metrics (iterable[:class:`google.api.control.metric_descriptor.KnownMetrics`]):
|
| + the metrics to be added to a `ReportRequest`
|
| + labels (iterable[:class:`google.api.control.metric_descriptor.KnownLabels`]):
|
| + the labels to be added to a `ReportRequest`
|
| + """
|
| + # pylint: disable=too-few-public-methods
|
| +
|
| + def __new__(cls, logs=None, metrics=None, labels=None):
|
| + """Invokes the base constructor with default values."""
|
| + logs = set(logs) if logs else set()
|
| + metrics = tuple(metrics) if metrics else tuple()
|
| + labels = tuple(labels) if labels else tuple()
|
| + return super(cls, ReportingRules).__new__(cls, logs, metrics, labels)
|
| +
|
| + @classmethod
|
| + def from_known_inputs(cls, logs=None, metric_names=None, label_names=None):
|
| + """An alternate constructor that assumes known metrics and labels.
|
| +
|
| + This differs from the default constructor in that the metrics and labels
|
| + are iterables of names of 'known' metrics and labels respectively. The
|
| + names are used to obtain the metrics and labels from
|
| + :class:`google.api.control.metric_descriptor.KnownMetrics` and
|
| + :class:`google.api.control.label_descriptor.KnownLabels` respectively.
|
| +
|
| + names that don't correspond to a known metric or label are ignored; as
|
| + are metrics or labels that don't yet have a way of updating the
|
| + `ReportRequest` operation.
|
| +
|
| + Args:
|
| + logs (iterable[string]): the name of logs to be included in the
|
| + `ReportRequest`
|
| + metric_names (iterable[string]): the name of a known metric to be
|
| + added to the `ReportRequest`
|
| + label_names (iterable[string]): the name of a known label to be added
|
| + to the `ReportRequest`
|
| +
|
| + """
|
| + if not metric_names:
|
| + metric_names = ()
|
| + if not label_names:
|
| + label_names = ()
|
| + known_labels = []
|
| + known_metrics = []
|
| + # pylint: disable=no-member
|
| + # pylint is not aware of the __members__ attributes
|
| + for l in label_descriptor.KnownLabels.__members__.values():
|
| + if l.update_label_func and l.label_name in label_names:
|
| + known_labels.append(l)
|
| + for m in metric_descriptor.KnownMetrics.__members__.values():
|
| + if m.update_op_func and m.metric_name in metric_names:
|
| + known_metrics.append(m)
|
| + return cls(logs=logs, metrics=known_metrics, labels=known_labels)
|
| +
|
| +
|
| +class ReportedProtocols(Enum):
|
| + """Enumerates the protocols that can be reported."""
|
| + # pylint: disable=too-few-public-methods
|
| + UNKNOWN = 0
|
| + HTTP = 1
|
| + HTTP2 = 2
|
| + GRPC = 3
|
| +
|
| +
|
| +class ReportedPlatforms(Enum):
|
| + """Enumerates the platforms that can be reported."""
|
| + # pylint: disable=too-few-public-methods
|
| + UNKNOWN = 0
|
| + GAE_FLEX = 1
|
| + GAE_STANDARD = 2
|
| + GCE = 3
|
| + GKE = 4
|
| + DEVELOPMENT = 5
|
| +
|
| +
|
| +class ErrorCause(Enum):
|
| + """Enumerates the causes of errors."""
|
| + # pylint: disable=too-few-public-methods
|
| + internal = 0 # default, error in scc library code
|
| + application = 1 # external application error
|
| + auth = 2 # authentication error
|
| + service_control = 3 # error in service control check
|
| +
|
| +
|
| +# alias the severity enum
|
| +_SEVERITY = messages.LogEntry.SeverityValueValuesEnum
|
| +
|
| +
|
| +def _struct_payload_from(a_dict):
|
| + return encoding.PyValueToMessage(messages.LogEntry.StructPayloadValue, a_dict)
|
| +
|
| +
|
| +class Info(
|
| + collections.namedtuple(
|
| + 'Info', (
|
| + 'api_name',
|
| + 'api_method',
|
| + 'api_version',
|
| + 'auth_issuer',
|
| + 'auth_audience',
|
| + 'backend_time',
|
| + 'error_cause',
|
| + 'location',
|
| + 'log_message',
|
| + 'method',
|
| + 'overhead_time',
|
| + 'platform',
|
| + 'producer_project_id',
|
| + 'protocol',
|
| + 'request_size',
|
| + 'request_time',
|
| + 'response_code',
|
| + 'response_size',
|
| + 'url',
|
| + ) + operation.Info._fields),
|
| + operation.Info):
|
| + """Holds the information necessary to fill in a ReportRequest.
|
| +
|
| + In the attribute descriptions below, N/A means 'not available'
|
| +
|
| + Attributes:
|
| + api_name (string): the api name and version
|
| + api_method (string): the full api method name
|
| + api_version (string): the api version
|
| + auth_issuer (string): the auth issuer
|
| + auth_audience (string): the auth audience
|
| + backend_time(datetime.timedelta): the backend request time, None for N/A
|
| + error_cause(:class:`ErrorCause`): the cause of error if one has occurred
|
| + location (string): the location of the service
|
| + log_message (string): a message to log as an info log
|
| + method (string): the HTTP method used to make the request
|
| + overhead_time(datetime.timedelta): the overhead time, None for N/A
|
| + platform (:class:`ReportedPlatform`): the platform in use
|
| + producer_project_id (string): the producer project id
|
| + protocol (:class:`ReportedProtocol`): the protocol used
|
| + request_size(int): the request size in bytes, -1 means N/A
|
| + request_time(datetime.timedelta): the request time
|
| + response_size(int): the request size in bytes, -1 means N/A
|
| + response_code(int): the code of the http response
|
| + url (string): the request url
|
| +
|
| + """
|
| + # pylint: disable=too-many-arguments,too-many-locals
|
| +
|
| + COPYABLE_LOG_FIELDS = [
|
| + 'api_name',
|
| + 'api_method',
|
| + 'api_key',
|
| + 'producer_project_id',
|
| + 'referer',
|
| + 'location',
|
| + 'log_message',
|
| + 'url',
|
| + ]
|
| +
|
| + def __new__(cls,
|
| + api_name='',
|
| + api_method='',
|
| + api_version='',
|
| + auth_issuer='',
|
| + auth_audience='',
|
| + backend_time=None,
|
| + error_cause=ErrorCause.internal,
|
| + location='',
|
| + log_message='',
|
| + method='',
|
| + overhead_time=None,
|
| + platform=ReportedPlatforms.UNKNOWN,
|
| + producer_project_id='',
|
| + protocol=ReportedProtocols.UNKNOWN,
|
| + request_size=SIZE_NOT_SET,
|
| + request_time=None,
|
| + response_size=SIZE_NOT_SET,
|
| + response_code=200,
|
| + url='',
|
| + **kw):
|
| + """Invokes the base constructor with default values."""
|
| + op_info = operation.Info(**kw)
|
| + _validate_timedelta_arg('backend_time', backend_time)
|
| + _validate_timedelta_arg('overhead_time', overhead_time)
|
| + _validate_timedelta_arg('request_time', request_time)
|
| + _validate_int_arg('request_size', request_size)
|
| + _validate_int_arg('response_size', response_size)
|
| + if not isinstance(protocol, ReportedProtocols):
|
| + raise ValueError('protocol should be a %s' % (ReportedProtocols,))
|
| + if not isinstance(platform, ReportedPlatforms):
|
| + raise ValueError('platform should be a %s' % (ReportedPlatforms,))
|
| + if not isinstance(error_cause, ErrorCause):
|
| + raise ValueError('error_cause should be a %s' % (ErrorCause,))
|
| + return super(cls, Info).__new__(
|
| + cls,
|
| + api_name,
|
| + api_method,
|
| + api_version,
|
| + auth_issuer,
|
| + auth_audience,
|
| + backend_time,
|
| + error_cause,
|
| + location,
|
| + log_message,
|
| + method,
|
| + overhead_time,
|
| + platform,
|
| + producer_project_id,
|
| + protocol,
|
| + request_size,
|
| + request_time,
|
| + response_code,
|
| + response_size,
|
| + url,
|
| + **op_info._asdict())
|
| +
|
| + def _as_log_entry(self, name, now):
|
| + """Makes a `LogEntry` from this instance for the given log_name.
|
| +
|
| + Args:
|
| + rules (:class:`ReportingRules`): determines what labels, metrics and
|
| + logs to include in the report request.
|
| + now (:class:`datetime.DateTime`): the current time
|
| +
|
| + Return:
|
| + a ``LogEntry`` generated from this instance with the given name
|
| + and timestamp
|
| +
|
| + Raises:
|
| + ValueError: if the fields in this instance are insufficient to
|
| + to create a valid ``ServicecontrolServicesReportRequest``
|
| +
|
| + """
|
| + # initialize the struct with fields that are always present
|
| + d = {
|
| + 'http_response_code': self.response_code,
|
| + 'timestamp': time.mktime(now.timetuple())
|
| + }
|
| +
|
| + # compute the severity
|
| + severity = _SEVERITY.INFO
|
| + if self.response_code >= 400:
|
| + severity = _SEVERITY.ERROR
|
| + d['error_cause'] = self.error_cause.name
|
| +
|
| + # add 'optional' fields to the struct
|
| + if self.request_size > 0:
|
| + d['request_size'] = self.request_size
|
| + if self.response_size > 0:
|
| + d['response_size'] = self.response_size
|
| + if self.method:
|
| + d['http_method'] = self.method
|
| + if self.request_time:
|
| + d['request_latency_in_ms'] = self.request_time.total_seconds() * 1000
|
| +
|
| + # add 'copyable' fields to the struct
|
| + for key in self.COPYABLE_LOG_FIELDS:
|
| + value = getattr(self, key, None)
|
| + if value:
|
| + d[key] = value
|
| +
|
| + return messages.LogEntry(
|
| + name=name,
|
| + timestamp=timestamp.to_rfc3339(now),
|
| + severity=severity,
|
| + structPayload=_struct_payload_from(d))
|
| +
|
| + def as_report_request(self, rules, timer=datetime.utcnow):
|
| + """Makes a `ServicecontrolServicesReportRequest` from this instance
|
| +
|
| + Args:
|
| + rules (:class:`ReportingRules`): determines what labels, metrics and
|
| + logs to include in the report request.
|
| + timer: a function that determines the current time
|
| +
|
| + Return:
|
| + a ``ServicecontrolServicesReportRequest`` generated from this instance
|
| + governed by the provided ``rules``
|
| +
|
| + Raises:
|
| + ValueError: if the fields in this instance cannot be used to create
|
| + a valid ``ServicecontrolServicesReportRequest``
|
| +
|
| + """
|
| + if not self.service_name:
|
| + raise ValueError('the service name must be set')
|
| + op = super(Info, self).as_operation(timer=timer)
|
| +
|
| + # Populate metrics and labels if they can be associated with a
|
| + # method/operation
|
| + if op.operationId and op.operationName:
|
| + labels = {}
|
| + for known_label in rules.labels:
|
| + known_label.do_labels_update(self, labels)
|
| + if labels:
|
| + op.labels = encoding.PyValueToMessage(
|
| + messages.Operation.LabelsValue,
|
| + labels)
|
| + for known_metric in rules.metrics:
|
| + known_metric.do_operation_update(self, op)
|
| +
|
| + # Populate the log entries
|
| + now = timer()
|
| + op.logEntries = [self._as_log_entry(l, now) for l in rules.logs]
|
| +
|
| + return messages.ServicecontrolServicesReportRequest(
|
| + serviceName=self.service_name,
|
| + reportRequest=messages.ReportRequest(operations=[op]))
|
| +
|
| +
|
| +_NO_RESULTS = tuple()
|
| +
|
| +
|
| +class Aggregator(object):
|
| + """Aggregates Service Control Report requests.
|
| +
|
| + :func:`report` determines if a `ReportRequest` should be sent to the
|
| + service immediately
|
| +
|
| + """
|
| +
|
| + CACHED_OK = object()
|
| + """A sentinel returned by :func:`report` when a request is cached OK."""
|
| +
|
| + MAX_OPERATION_COUNT = 1000
|
| + """The maximum number of operations to send in a report request."""
|
| +
|
| + def __init__(self, service_name, options, kinds=None,
|
| + timer=datetime.utcnow):
|
| + """
|
| + Constructor
|
| +
|
| + Args:
|
| + service_name (string): name of the service being aggregagated
|
| + options (:class:`google.api.caches.ReportOptions`): configures the behavior
|
| + of this aggregator
|
| + kinds (dict[string, [:class:`.MetricKind`]]): describes the
|
| + type of metrics used during aggregation
|
| + timer (function([[datetime]]): a function that returns the current
|
| + as a time as a datetime instance
|
| +
|
| + """
|
| + self._cache = caches.create(options, timer=timer)
|
| + self._options = options
|
| + self._kinds = kinds
|
| + self._service_name = service_name
|
| +
|
| + @property
|
| + def flush_interval(self):
|
| + """The interval between calls to flush.
|
| +
|
| + Returns:
|
| + timedelta: the period between calls to flush if, or ``None`` if no
|
| + cache is set
|
| +
|
| + """
|
| + return None if self._cache is None else self._options.flush_interval
|
| +
|
| + @property
|
| + def service_name(self):
|
| + """The service to which all requests being aggregated should belong."""
|
| + return self._service_name
|
| +
|
| + def flush(self):
|
| + """Flushes this instance's cache.
|
| +
|
| + The driver of this instance should call this method every
|
| + `flush_interval`.
|
| +
|
| + Returns:
|
| + list[``ServicecontrolServicesReportRequest``]: corresponding to the
|
| + pending cached operations
|
| +
|
| + """
|
| + if self._cache is None:
|
| + return _NO_RESULTS
|
| + with self._cache as c:
|
| + flushed_ops = [x.as_operation() for x in list(c.out_deque)]
|
| + c.out_deque.clear()
|
| + reqs = []
|
| + max_ops = self.MAX_OPERATION_COUNT
|
| + for x in range(0, len(flushed_ops), max_ops):
|
| + report_request = messages.ReportRequest(
|
| + operations=flushed_ops[x:x + max_ops])
|
| + reqs.append(
|
| + messages.ServicecontrolServicesReportRequest(
|
| + serviceName=self.service_name,
|
| + reportRequest=report_request))
|
| +
|
| + return reqs
|
| +
|
| + def clear(self):
|
| + """Clears the cache."""
|
| + if self._cache is None:
|
| + return _NO_RESULTS
|
| + if self._cache is not None:
|
| + with self._cache as k:
|
| + res = [x.as_operation() for x in k.values()]
|
| + k.clear()
|
| + k.out_deque.clear()
|
| + return res
|
| +
|
| + def report(self, req):
|
| + """Adds a report request to the cache.
|
| +
|
| + Returns ``None`` if it could not be aggregated, and callers need to
|
| + send the request to the server, otherwise it returns ``CACHED_OK``.
|
| +
|
| + Args:
|
| + req (:class:`messages.ReportRequest`): the request
|
| + to be aggregated
|
| +
|
| + Result:
|
| + ``None`` if the request as not cached, otherwise ``CACHED_OK``
|
| +
|
| + """
|
| + if self._cache is None:
|
| + return None # no cache, send request now
|
| + if not isinstance(req, messages.ServicecontrolServicesReportRequest):
|
| + raise ValueError('Invalid request')
|
| + if req.serviceName != self.service_name:
|
| + logger.error('bad report(): service_name %s does not match ours %s',
|
| + req.serviceName, self.service_name)
|
| + raise ValueError('Service name mismatch')
|
| + report_req = req.reportRequest
|
| + if report_req is None:
|
| + logger.error('bad report(): no report_request in %s', req)
|
| + raise ValueError('Expected report_request not set')
|
| + if _has_high_important_operation(report_req) or self._cache is None:
|
| + return None
|
| + ops_by_signature = _key_by_signature(report_req.operations,
|
| + _sign_operation)
|
| +
|
| + # Concurrency:
|
| + #
|
| + # This holds a lock on the cache while updating it. No i/o operations
|
| + # are performed, so any waiting threads see minimal delays
|
| + with self._cache as cache:
|
| + for key, op in iter(ops_by_signature.items()):
|
| + agg = cache.get(key)
|
| + if agg is None:
|
| + cache[key] = operation.Aggregator(op, self._kinds)
|
| + else:
|
| + agg.add(op)
|
| +
|
| + return self.CACHED_OK
|
| +
|
| +
|
| +def _has_high_important_operation(req):
|
| + def is_important(op):
|
| + return (op.importance !=
|
| + messages.Operation.ImportanceValueValuesEnum.LOW)
|
| +
|
| + return functools.reduce(lambda x, y: x and is_important(y),
|
| + req.operations, True)
|
| +
|
| +
|
| +def _key_by_signature(operations, signature_func):
|
| + """Creates a dictionary of operations keyed by signature
|
| +
|
| + Args:
|
| + operations (iterable[Operations]): the input operations
|
| +
|
| + Returns:
|
| + dict[string, [Operations]]: the operations keyed by signature
|
| + """
|
| + return dict((signature_func(op), op) for op in operations)
|
| +
|
| +
|
| +def _sign_operation(op):
|
| + """Obtains a signature for an operation in a ReportRequest.
|
| +
|
| + Args:
|
| + op (:class:`google.api.gen.servicecontrol_v1_messages.Operation`): an
|
| + operation used in a `ReportRequest`
|
| +
|
| + Returns:
|
| + string: a unique signature for that operation
|
| + """
|
| + md5 = hashlib.md5()
|
| + md5.update(op.consumerId)
|
| + md5.update('\x00')
|
| + md5.update(op.operationName)
|
| + if op.labels:
|
| + signing.add_dict_to_hash(md5, encoding.MessageToPyValue(op.labels))
|
| + return md5.digest()
|
|
|