Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(71)

Side by Side Diff: third_party/google-endpoints/google/api/control/report_request.py

Issue 2666783008: Add google-endpoints to third_party/. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 Google Inc. All Rights Reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """report_request supports aggregation of ReportRequests.
16
17 It proves :class:`.Aggregator` that aggregates and batches together
18 ReportRequests.
19
20 """
21
22
23 from __future__ import absolute_import
24
25 import collections
26 import functools
27 import hashlib
28 import logging
29 import time
30 from datetime import datetime, timedelta
31
32 from apitools.base.py import encoding
33 from enum import Enum
34 from . import caches, label_descriptor, operation, messages
35 from . import metric_descriptor, signing, timestamp
36
37 logger = logging.getLogger(__name__)
38
39 SIZE_NOT_SET = -1
40
41
42 def _validate_int_arg(name, value):
43 if value == SIZE_NOT_SET or (isinstance(value, int) and value >= 0):
44 return
45 raise ValueError('%s should be a non-negative int/long' % (name,))
46
47
48 def _validate_timedelta_arg(name, value):
49 if value is None or isinstance(value, timedelta):
50 return
51 raise ValueError('%s should be a timedelta' % (name,))
52
53
54 class ReportingRules(collections.namedtuple('ReportingRules',
55 ['logs', 'metrics', 'labels'])):
56 """Holds information that determines how to fill a `ReportRequest`.
57
58 Attributes:
59 logs (iterable[string]): the name of logs to be included in the `ReportReq uest`
60 metrics (iterable[:class:`google.api.control.metric_descriptor.KnownMetric s`]):
61 the metrics to be added to a `ReportRequest`
62 labels (iterable[:class:`google.api.control.metric_descriptor.KnownLabels` ]):
63 the labels to be added to a `ReportRequest`
64 """
65 # pylint: disable=too-few-public-methods
66
67 def __new__(cls, logs=None, metrics=None, labels=None):
68 """Invokes the base constructor with default values."""
69 logs = set(logs) if logs else set()
70 metrics = tuple(metrics) if metrics else tuple()
71 labels = tuple(labels) if labels else tuple()
72 return super(cls, ReportingRules).__new__(cls, logs, metrics, labels)
73
74 @classmethod
75 def from_known_inputs(cls, logs=None, metric_names=None, label_names=None):
76 """An alternate constructor that assumes known metrics and labels.
77
78 This differs from the default constructor in that the metrics and labels
79 are iterables of names of 'known' metrics and labels respectively. The
80 names are used to obtain the metrics and labels from
81 :class:`google.api.control.metric_descriptor.KnownMetrics` and
82 :class:`google.api.control.label_descriptor.KnownLabels` respectively.
83
84 names that don't correspond to a known metric or label are ignored; as
85 are metrics or labels that don't yet have a way of updating the
86 `ReportRequest` operation.
87
88 Args:
89 logs (iterable[string]): the name of logs to be included in the
90 `ReportRequest`
91 metric_names (iterable[string]): the name of a known metric to be
92 added to the `ReportRequest`
93 label_names (iterable[string]): the name of a known label to be added
94 to the `ReportRequest`
95
96 """
97 if not metric_names:
98 metric_names = ()
99 if not label_names:
100 label_names = ()
101 known_labels = []
102 known_metrics = []
103 # pylint: disable=no-member
104 # pylint is not aware of the __members__ attributes
105 for l in label_descriptor.KnownLabels.__members__.values():
106 if l.update_label_func and l.label_name in label_names:
107 known_labels.append(l)
108 for m in metric_descriptor.KnownMetrics.__members__.values():
109 if m.update_op_func and m.metric_name in metric_names:
110 known_metrics.append(m)
111 return cls(logs=logs, metrics=known_metrics, labels=known_labels)
112
113
114 class ReportedProtocols(Enum):
115 """Enumerates the protocols that can be reported."""
116 # pylint: disable=too-few-public-methods
117 UNKNOWN = 0
118 HTTP = 1
119 HTTP2 = 2
120 GRPC = 3
121
122
123 class ReportedPlatforms(Enum):
124 """Enumerates the platforms that can be reported."""
125 # pylint: disable=too-few-public-methods
126 UNKNOWN = 0
127 GAE_FLEX = 1
128 GAE_STANDARD = 2
129 GCE = 3
130 GKE = 4
131 DEVELOPMENT = 5
132
133
134 class ErrorCause(Enum):
135 """Enumerates the causes of errors."""
136 # pylint: disable=too-few-public-methods
137 internal = 0 # default, error in scc library code
138 application = 1 # external application error
139 auth = 2 # authentication error
140 service_control = 3 # error in service control check
141
142
143 # alias the severity enum
144 _SEVERITY = messages.LogEntry.SeverityValueValuesEnum
145
146
147 def _struct_payload_from(a_dict):
148 return encoding.PyValueToMessage(messages.LogEntry.StructPayloadValue, a_dic t)
149
150
151 class Info(
152 collections.namedtuple(
153 'Info', (
154 'api_name',
155 'api_method',
156 'api_version',
157 'auth_issuer',
158 'auth_audience',
159 'backend_time',
160 'error_cause',
161 'location',
162 'log_message',
163 'method',
164 'overhead_time',
165 'platform',
166 'producer_project_id',
167 'protocol',
168 'request_size',
169 'request_time',
170 'response_code',
171 'response_size',
172 'url',
173 ) + operation.Info._fields),
174 operation.Info):
175 """Holds the information necessary to fill in a ReportRequest.
176
177 In the attribute descriptions below, N/A means 'not available'
178
179 Attributes:
180 api_name (string): the api name and version
181 api_method (string): the full api method name
182 api_version (string): the api version
183 auth_issuer (string): the auth issuer
184 auth_audience (string): the auth audience
185 backend_time(datetime.timedelta): the backend request time, None for N/A
186 error_cause(:class:`ErrorCause`): the cause of error if one has occurred
187 location (string): the location of the service
188 log_message (string): a message to log as an info log
189 method (string): the HTTP method used to make the request
190 overhead_time(datetime.timedelta): the overhead time, None for N/A
191 platform (:class:`ReportedPlatform`): the platform in use
192 producer_project_id (string): the producer project id
193 protocol (:class:`ReportedProtocol`): the protocol used
194 request_size(int): the request size in bytes, -1 means N/A
195 request_time(datetime.timedelta): the request time
196 response_size(int): the request size in bytes, -1 means N/A
197 response_code(int): the code of the http response
198 url (string): the request url
199
200 """
201 # pylint: disable=too-many-arguments,too-many-locals
202
203 COPYABLE_LOG_FIELDS = [
204 'api_name',
205 'api_method',
206 'api_key',
207 'producer_project_id',
208 'referer',
209 'location',
210 'log_message',
211 'url',
212 ]
213
214 def __new__(cls,
215 api_name='',
216 api_method='',
217 api_version='',
218 auth_issuer='',
219 auth_audience='',
220 backend_time=None,
221 error_cause=ErrorCause.internal,
222 location='',
223 log_message='',
224 method='',
225 overhead_time=None,
226 platform=ReportedPlatforms.UNKNOWN,
227 producer_project_id='',
228 protocol=ReportedProtocols.UNKNOWN,
229 request_size=SIZE_NOT_SET,
230 request_time=None,
231 response_size=SIZE_NOT_SET,
232 response_code=200,
233 url='',
234 **kw):
235 """Invokes the base constructor with default values."""
236 op_info = operation.Info(**kw)
237 _validate_timedelta_arg('backend_time', backend_time)
238 _validate_timedelta_arg('overhead_time', overhead_time)
239 _validate_timedelta_arg('request_time', request_time)
240 _validate_int_arg('request_size', request_size)
241 _validate_int_arg('response_size', response_size)
242 if not isinstance(protocol, ReportedProtocols):
243 raise ValueError('protocol should be a %s' % (ReportedProtocols,))
244 if not isinstance(platform, ReportedPlatforms):
245 raise ValueError('platform should be a %s' % (ReportedPlatforms,))
246 if not isinstance(error_cause, ErrorCause):
247 raise ValueError('error_cause should be a %s' % (ErrorCause,))
248 return super(cls, Info).__new__(
249 cls,
250 api_name,
251 api_method,
252 api_version,
253 auth_issuer,
254 auth_audience,
255 backend_time,
256 error_cause,
257 location,
258 log_message,
259 method,
260 overhead_time,
261 platform,
262 producer_project_id,
263 protocol,
264 request_size,
265 request_time,
266 response_code,
267 response_size,
268 url,
269 **op_info._asdict())
270
271 def _as_log_entry(self, name, now):
272 """Makes a `LogEntry` from this instance for the given log_name.
273
274 Args:
275 rules (:class:`ReportingRules`): determines what labels, metrics and
276 logs to include in the report request.
277 now (:class:`datetime.DateTime`): the current time
278
279 Return:
280 a ``LogEntry`` generated from this instance with the given name
281 and timestamp
282
283 Raises:
284 ValueError: if the fields in this instance are insufficient to
285 to create a valid ``ServicecontrolServicesReportRequest``
286
287 """
288 # initialize the struct with fields that are always present
289 d = {
290 'http_response_code': self.response_code,
291 'timestamp': time.mktime(now.timetuple())
292 }
293
294 # compute the severity
295 severity = _SEVERITY.INFO
296 if self.response_code >= 400:
297 severity = _SEVERITY.ERROR
298 d['error_cause'] = self.error_cause.name
299
300 # add 'optional' fields to the struct
301 if self.request_size > 0:
302 d['request_size'] = self.request_size
303 if self.response_size > 0:
304 d['response_size'] = self.response_size
305 if self.method:
306 d['http_method'] = self.method
307 if self.request_time:
308 d['request_latency_in_ms'] = self.request_time.total_seconds() * 100 0
309
310 # add 'copyable' fields to the struct
311 for key in self.COPYABLE_LOG_FIELDS:
312 value = getattr(self, key, None)
313 if value:
314 d[key] = value
315
316 return messages.LogEntry(
317 name=name,
318 timestamp=timestamp.to_rfc3339(now),
319 severity=severity,
320 structPayload=_struct_payload_from(d))
321
322 def as_report_request(self, rules, timer=datetime.utcnow):
323 """Makes a `ServicecontrolServicesReportRequest` from this instance
324
325 Args:
326 rules (:class:`ReportingRules`): determines what labels, metrics and
327 logs to include in the report request.
328 timer: a function that determines the current time
329
330 Return:
331 a ``ServicecontrolServicesReportRequest`` generated from this instance
332 governed by the provided ``rules``
333
334 Raises:
335 ValueError: if the fields in this instance cannot be used to create
336 a valid ``ServicecontrolServicesReportRequest``
337
338 """
339 if not self.service_name:
340 raise ValueError('the service name must be set')
341 op = super(Info, self).as_operation(timer=timer)
342
343 # Populate metrics and labels if they can be associated with a
344 # method/operation
345 if op.operationId and op.operationName:
346 labels = {}
347 for known_label in rules.labels:
348 known_label.do_labels_update(self, labels)
349 if labels:
350 op.labels = encoding.PyValueToMessage(
351 messages.Operation.LabelsValue,
352 labels)
353 for known_metric in rules.metrics:
354 known_metric.do_operation_update(self, op)
355
356 # Populate the log entries
357 now = timer()
358 op.logEntries = [self._as_log_entry(l, now) for l in rules.logs]
359
360 return messages.ServicecontrolServicesReportRequest(
361 serviceName=self.service_name,
362 reportRequest=messages.ReportRequest(operations=[op]))
363
364
365 _NO_RESULTS = tuple()
366
367
368 class Aggregator(object):
369 """Aggregates Service Control Report requests.
370
371 :func:`report` determines if a `ReportRequest` should be sent to the
372 service immediately
373
374 """
375
376 CACHED_OK = object()
377 """A sentinel returned by :func:`report` when a request is cached OK."""
378
379 MAX_OPERATION_COUNT = 1000
380 """The maximum number of operations to send in a report request."""
381
382 def __init__(self, service_name, options, kinds=None,
383 timer=datetime.utcnow):
384 """
385 Constructor
386
387 Args:
388 service_name (string): name of the service being aggregagated
389 options (:class:`google.api.caches.ReportOptions`): configures the beh avior
390 of this aggregator
391 kinds (dict[string, [:class:`.MetricKind`]]): describes the
392 type of metrics used during aggregation
393 timer (function([[datetime]]): a function that returns the current
394 as a time as a datetime instance
395
396 """
397 self._cache = caches.create(options, timer=timer)
398 self._options = options
399 self._kinds = kinds
400 self._service_name = service_name
401
402 @property
403 def flush_interval(self):
404 """The interval between calls to flush.
405
406 Returns:
407 timedelta: the period between calls to flush if, or ``None`` if no
408 cache is set
409
410 """
411 return None if self._cache is None else self._options.flush_interval
412
413 @property
414 def service_name(self):
415 """The service to which all requests being aggregated should belong."""
416 return self._service_name
417
418 def flush(self):
419 """Flushes this instance's cache.
420
421 The driver of this instance should call this method every
422 `flush_interval`.
423
424 Returns:
425 list[``ServicecontrolServicesReportRequest``]: corresponding to the
426 pending cached operations
427
428 """
429 if self._cache is None:
430 return _NO_RESULTS
431 with self._cache as c:
432 flushed_ops = [x.as_operation() for x in list(c.out_deque)]
433 c.out_deque.clear()
434 reqs = []
435 max_ops = self.MAX_OPERATION_COUNT
436 for x in range(0, len(flushed_ops), max_ops):
437 report_request = messages.ReportRequest(
438 operations=flushed_ops[x:x + max_ops])
439 reqs.append(
440 messages.ServicecontrolServicesReportRequest(
441 serviceName=self.service_name,
442 reportRequest=report_request))
443
444 return reqs
445
446 def clear(self):
447 """Clears the cache."""
448 if self._cache is None:
449 return _NO_RESULTS
450 if self._cache is not None:
451 with self._cache as k:
452 res = [x.as_operation() for x in k.values()]
453 k.clear()
454 k.out_deque.clear()
455 return res
456
457 def report(self, req):
458 """Adds a report request to the cache.
459
460 Returns ``None`` if it could not be aggregated, and callers need to
461 send the request to the server, otherwise it returns ``CACHED_OK``.
462
463 Args:
464 req (:class:`messages.ReportRequest`): the request
465 to be aggregated
466
467 Result:
468 ``None`` if the request as not cached, otherwise ``CACHED_OK``
469
470 """
471 if self._cache is None:
472 return None # no cache, send request now
473 if not isinstance(req, messages.ServicecontrolServicesReportRequest):
474 raise ValueError('Invalid request')
475 if req.serviceName != self.service_name:
476 logger.error('bad report(): service_name %s does not match ours %s',
477 req.serviceName, self.service_name)
478 raise ValueError('Service name mismatch')
479 report_req = req.reportRequest
480 if report_req is None:
481 logger.error('bad report(): no report_request in %s', req)
482 raise ValueError('Expected report_request not set')
483 if _has_high_important_operation(report_req) or self._cache is None:
484 return None
485 ops_by_signature = _key_by_signature(report_req.operations,
486 _sign_operation)
487
488 # Concurrency:
489 #
490 # This holds a lock on the cache while updating it. No i/o operations
491 # are performed, so any waiting threads see minimal delays
492 with self._cache as cache:
493 for key, op in iter(ops_by_signature.items()):
494 agg = cache.get(key)
495 if agg is None:
496 cache[key] = operation.Aggregator(op, self._kinds)
497 else:
498 agg.add(op)
499
500 return self.CACHED_OK
501
502
503 def _has_high_important_operation(req):
504 def is_important(op):
505 return (op.importance !=
506 messages.Operation.ImportanceValueValuesEnum.LOW)
507
508 return functools.reduce(lambda x, y: x and is_important(y),
509 req.operations, True)
510
511
512 def _key_by_signature(operations, signature_func):
513 """Creates a dictionary of operations keyed by signature
514
515 Args:
516 operations (iterable[Operations]): the input operations
517
518 Returns:
519 dict[string, [Operations]]: the operations keyed by signature
520 """
521 return dict((signature_func(op), op) for op in operations)
522
523
524 def _sign_operation(op):
525 """Obtains a signature for an operation in a ReportRequest.
526
527 Args:
528 op (:class:`google.api.gen.servicecontrol_v1_messages.Operation`): an
529 operation used in a `ReportRequest`
530
531 Returns:
532 string: a unique signature for that operation
533 """
534 md5 = hashlib.md5()
535 md5.update(op.consumerId)
536 md5.update('\x00')
537 md5.update(op.operationName)
538 if op.labels:
539 signing.add_dict_to_hash(md5, encoding.MessageToPyValue(op.labels))
540 return md5.digest()
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698