Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(132)

Side by Side Diff: third_party/google-endpoints/google/api/control/check_request.py

Issue 2666783008: Add google-endpoints to third_party/. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 Google Inc. All Rights Reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """check_request supports aggregation of CheckRequests.
16
17 :func:`sign` generated a signature from CheckRequests
18 :class:`~google.api.gen.servicecontrol_v1_message.Operation` represents
19 information regarding an operation, and is a key constituent of
20 :class:`~google.api.gen.servicecontrol_v1_message.CheckRequest` and
21 :class:`~google.api.gen.servicecontrol_v1_message.ReportRequests.
22
23 The :class:`.Aggregator` implements the strategy for aggregating CheckRequests
24 and caching their responses.
25
26 """
27
28 from __future__ import absolute_import
29
30 import collections
31 import hashlib
32 import httplib
33 import logging
34 from datetime import datetime
35
36 from apitools.base.py import encoding
37
38 from . import caches, label_descriptor, messages
39 from . import metric_value, operation, signing
40
41 logger = logging.getLogger(__name__)
42
43 # alias for brevity
44 _CheckErrors = messages.CheckError.CodeValueValuesEnum
45 _IS_OK = (httplib.OK, '', True)
46 _IS_UNKNOWN = (
47 httplib.INTERNAL_SERVER_ERROR,
48 'Request blocked due to unsupported block reason {detail}',
49 False)
50 _CHECK_ERROR_CONVERSION = {
51 _CheckErrors.NOT_FOUND: (
52 httplib.BAD_REQUEST,
53 'Client project not found. Please pass a valid project',
54 False,
55 ),
56 _CheckErrors.API_KEY_NOT_FOUND: (
57 httplib.BAD_REQUEST,
58 'API key not found. Please pass a valid API key',
59 True,
60 ),
61 _CheckErrors.API_KEY_EXPIRED: (
62 httplib.BAD_REQUEST,
63 'API key expired. Please renew the API key',
64 True,
65 ),
66 _CheckErrors.API_KEY_INVALID: (
67 httplib.BAD_REQUEST,
68 'API not valid. Please pass a valid API key',
69 True,
70 ),
71 _CheckErrors.SERVICE_NOT_ACTIVATED: (
72 httplib.FORBIDDEN,
73 '{detail} Please enable the project for {project_id}',
74 False,
75 ),
76 _CheckErrors.PERMISSION_DENIED: (
77 httplib.FORBIDDEN,
78 'Permission denied: {detail}',
79 False,
80 ),
81 _CheckErrors.IP_ADDRESS_BLOCKED: (
82 httplib.FORBIDDEN,
83 '{detail}',
84 False,
85 ),
86 _CheckErrors.REFERER_BLOCKED: (
87 httplib.FORBIDDEN,
88 '{detail}',
89 False,
90 ),
91 _CheckErrors.CLIENT_APP_BLOCKED: (
92 httplib.FORBIDDEN,
93 '{detail}',
94 False,
95 ),
96 _CheckErrors.PROJECT_DELETED: (
97 httplib.FORBIDDEN,
98 'Project {project_id} has been deleted',
99 False,
100 ),
101 _CheckErrors.PROJECT_INVALID: (
102 httplib.BAD_REQUEST,
103 'Client Project is not valid. Please pass a valid project',
104 False,
105 ),
106 _CheckErrors.VISIBILITY_DENIED: (
107 httplib.FORBIDDEN,
108 'Project {project_id} has no visibility access to the service',
109 False,
110 ),
111 _CheckErrors.BILLING_DISABLED: (
112 httplib.FORBIDDEN,
113 'Project {project_id} has billing disabled. Please enable it',
114 False,
115 ),
116
117 # Fail open for internal server errors
118 _CheckErrors.NAMESPACE_LOOKUP_UNAVAILABLE: _IS_OK,
119 _CheckErrors.SERVICE_STATUS_UNAVAILABLE: _IS_OK,
120 _CheckErrors.BILLING_STATUS_UNAVAILABLE: _IS_OK,
121 _CheckErrors.QUOTA_CHECK_UNAVAILABLE: _IS_OK,
122 }
123
124
125 def convert_response(check_response, project_id):
126 """Computes a http status code and message `CheckResponse`
127
128 The return value a tuple (code, message, api_key_is_bad) where
129
130 code: is the http status code
131 message: is the message to return
132 api_key_is_bad: indicates that a given api_key is bad
133
134 Args:
135 check_response (:class:`google.api.gen.servicecontrol_v1_messages.CheckRe sponse`):
136 the response from calling an api
137
138 Returns:
139 tuple(code, message, bool)
140 """
141 if not check_response or not check_response.checkErrors:
142 return _IS_OK
143
144 # only check the first error for now, as per ESP
145 theError = check_response.checkErrors[0]
146 error_tuple = _CHECK_ERROR_CONVERSION.get(theError.code, _IS_UNKNOWN)
147 if error_tuple[1].find('{') == -1: # no replacements needed:
148 return error_tuple
149
150 updated_msg = error_tuple[1].replace('{project_id}', project_id)
151 updated_msg = updated_msg.replace('{detail}', theError.detail or '')
152 error_tuple = (error_tuple[0], updated_msg, error_tuple[2])
153 return error_tuple
154
155
156 def sign(check_request):
157 """Obtains a signature for an operation in a `CheckRequest`
158
159 Args:
160 op (:class:`google.api.gen.servicecontrol_v1_messages.Operation`): an
161 operation used in a `CheckRequest`
162
163 Returns:
164 string: a secure hash generated from the operation
165 """
166 if not isinstance(check_request, messages.CheckRequest):
167 raise ValueError('Invalid request')
168 op = check_request.operation
169 if op is None or op.operationName is None or op.consumerId is None:
170 logging.error('Bad %s: not initialized => not signed', check_request)
171 raise ValueError('check request must be initialized with an operation')
172 md5 = hashlib.md5()
173 md5.update(op.operationName)
174 md5.update('\x00')
175 md5.update(op.consumerId)
176 if op.labels:
177 signing.add_dict_to_hash(md5, encoding.MessageToPyValue(op.labels))
178 for value_set in op.metricValueSets:
179 md5.update('\x00')
180 md5.update(value_set.metricName)
181 for mv in value_set.metricValues:
182 metric_value.update_hash(md5, mv)
183
184 md5.update('\x00')
185 if op.quotaProperties:
186 # N.B: this differs form cxx implementation, which serializes the
187 # protobuf. This should be OK as the exact hash used does not need to
188 # match across implementations.
189 md5.update(repr(op.quotaProperties))
190
191 md5.update('\x00')
192 return md5.digest()
193
194
195 _KNOWN_LABELS = label_descriptor.KnownLabels
196
197
198 class Info(collections.namedtuple('Info',
199 ('client_ip',) + operation.Info._fields),
200 operation.Info):
201 """Holds the information necessary to fill in CheckRequest.
202
203 In addition the attributes in :class:`operation.Info`, this has:
204
205 Attributes:
206 client_ip: the client IP address
207
208 """
209 def __new__(cls, client_ip='', **kw):
210 """Invokes the base constructor with default values."""
211 op_info = operation.Info(**kw)
212 return super(Info, cls).__new__(cls, client_ip, **op_info._asdict())
213
214 def as_check_request(self, timer=datetime.utcnow):
215 """Makes a `ServicecontrolServicesCheckRequest` from this instance
216
217 Returns:
218 a ``ServicecontrolServicesCheckRequest``
219
220 Raises:
221 ValueError: if the fields in this instance are insufficient to
222 to create a valid ``ServicecontrolServicesCheckRequest``
223
224 """
225 if not self.service_name:
226 raise ValueError('the service name must be set')
227 if not self.operation_id:
228 raise ValueError('the operation id must be set')
229 if not self.operation_name:
230 raise ValueError('the operation name must be set')
231 op = super(Info, self).as_operation(timer=timer)
232 labels = {
233 _KNOWN_LABELS.SCC_USER_AGENT.label_name: label_descriptor.USER_AGENT
234 }
235 if self.client_ip:
236 labels[_KNOWN_LABELS.SCC_CALLER_IP.label_name] = self.client_ip
237
238 if self.referer:
239 labels[_KNOWN_LABELS.SCC_REFERER.label_name] = self.referer
240
241 op.labels = encoding.PyValueToMessage(
242 messages.Operation.LabelsValue, labels)
243 check_request = messages.CheckRequest(operation=op)
244 return messages.ServicecontrolServicesCheckRequest(
245 serviceName=self.service_name,
246 checkRequest=check_request)
247
248
249 class Aggregator(object):
250 """Caches and aggregates ``CheckRequests``.
251
252 Concurrency: Thread safe.
253
254 Usage:
255
256 Creating a new cache entry and use cached response
257
258 Example:
259 >>> options = caches.CheckOptions()
260 >>> agg = Aggregator('my_service', options)
261 >>> req = ServicecontrolServicesCheckRequest(...)
262 >>> # check returns None as the request is not cached
263 >>> if agg.check(req) is not None:
264 ... resp = service.check(req)
265 ... agg = service.add_response(req, resp)
266 >>> agg.check(req) # response now cached according as-per options
267 <CheckResponse ....>
268
269 Refreshing a cached entry after a flush interval
270
271 The flush interval is constrained to be shorter than the actual cache
272 expiration. This allows the response to potentially remain cached and be
273 aggregated with subsequent check requests for the same operation.
274
275 Example:
276 >>> # continuing from the previous example,
277 >>> # ... after the flush interval
278 >>> # - the response is still in the cache, i.e, not expired
279 >>> # - the first call after the flush interval returns None, subsequent
280 >>> # calls continue to return the cached response
281 >>> agg.check(req) # signals the caller to call service.check(req)
282 None
283 >>> agg.check(req) # next call returns the cached response
284 <CheckResponse ....>
285
286 Flushing the cache
287
288 Once a response is expired, if there is an outstanding, cached CheckRequest
289 for it, this should be sent and their responses added back to the
290 aggregator instance, as they will contain quota updates that have not been
291 sent.
292
293 Example:
294
295 >>> # continuing the previous example
296 >>> for req in agg.flush(): # an iterable of cached CheckRequests
297 ... resp = caller.send_req(req) # caller sends them
298 >>> agg.add_response(req, resp) # and caches their responses
299
300 """
301
302 def __init__(self, service_name, options, kinds=None,
303 timer=datetime.utcnow):
304 """Constructor.
305
306 Args:
307 service_name (string): names the service that all requests aggregated
308 by this instance will be sent
309 options (:class:`~google.api.caches.CheckOptions`): configures the
310 caching and flushing behavior of this instance
311 kinds (dict[string,[google.api.control.MetricKind]]): specifies the
312 kind of metric for each each metric name.
313 timer (function([[datetime]]): a function that returns the current
314 as a time as a datetime instance
315 """
316 self._service_name = service_name
317 self._options = options
318 self._cache = caches.create(options, timer=timer)
319 self._kinds = {} if kinds is None else dict(kinds)
320 self._timer = timer
321
322 @property
323 def service_name(self):
324 """The service to which all aggregated requests should belong."""
325 return self._service_name
326
327 @property
328 def flush_interval(self):
329 """The interval between calls to flush.
330
331 Returns:
332 timedelta: the period between calls to flush if, or ``None`` if no
333 cache is set
334
335 """
336 return None if self._cache is None else self._options.expiration
337
338 def flush(self):
339 """Flushes this instance's cache.
340
341 The driver of this instance should call this method every
342 `flush_interval`.
343
344 Returns:
345 list['CheckRequest']: corresponding to CheckRequests that were
346 pending
347
348 """
349 if self._cache is None:
350 return []
351 with self._cache as c:
352 flushed_items = list(c.out_deque)
353 c.out_deque.clear()
354 cached_reqs = [item.extract_request() for item in flushed_items]
355 cached_reqs = [req for req in cached_reqs if req is not None]
356 return cached_reqs
357
358 def clear(self):
359 """Clears this instance's cache."""
360 if self._cache is not None:
361 with self._cache as c:
362 c.clear()
363 c.out_deque.clear()
364
365 def add_response(self, req, resp):
366 """Adds the response from sending to `req` to this instance's cache.
367
368 Args:
369 req (`ServicecontrolServicesCheckRequest`): the request
370 resp (CheckResponse): the response from sending the request
371 """
372 if self._cache is None:
373 return
374 signature = sign(req.checkRequest)
375 with self._cache as c:
376 now = self._timer()
377 quota_scale = 0 # WIP
378 item = c.get(signature)
379 if item is None:
380 c[signature] = CachedItem(
381 resp, self.service_name, now, quota_scale)
382 else:
383 # Update the cached item to reflect that it is updated
384 item.last_check_time = now
385 item.response = resp
386 item.quota_scale = quota_scale
387 item.is_flushing = False
388 c[signature] = item
389
390 def check(self, req):
391 """Determine if ``req`` is in this instances cache.
392
393 Determine if there are cache hits for the request in this aggregator
394 instance.
395
396 Not in the cache
397
398 If req is not in the cache, it returns ``None`` to indicate that the
399 caller should send the request.
400
401 Cache Hit; response has errors
402
403 When a cached CheckResponse has errors, it's assumed that ``req`` would
404 fail as well, so the cached CheckResponse is returned. However, the
405 first CheckRequest after the flush interval has elapsed should be sent
406 to the server to refresh the CheckResponse, though until it's received,
407 subsequent CheckRequests should fail with the cached CheckResponse.
408
409 Cache behaviour - response passed
410
411 If the cached CheckResponse has no errors, it's assumed that ``req``
412 will succeed as well, so the CheckResponse is returned, with the quota
413 info updated to the same as requested. The requested tokens are
414 aggregated until flushed.
415
416 Args:
417 req (``ServicecontrolServicesCheckRequest``): to be sent to
418 the service control service
419
420 Raises:
421 ValueError: if the ``req`` service_name is not the same as
422 this instances
423
424 Returns:
425 ``CheckResponse``: if an applicable response is cached by this
426 instance is available for use or None, if there is no applicable
427 response
428
429 """
430 if self._cache is None:
431 return None # no cache, send request now
432 if not isinstance(req, messages.ServicecontrolServicesCheckRequest):
433 raise ValueError('Invalid request')
434 if req.serviceName != self.service_name:
435 logger.error('bad check(): service_name %s does not match ours %s',
436 req.serviceName, self.service_name)
437 raise ValueError('Service name mismatch')
438 check_request = req.checkRequest
439 if check_request is None:
440 logger.error('bad check(): no check_request in %s', req)
441 raise ValueError('Expected operation not set')
442 op = check_request.operation
443 if op is None:
444 logger.error('bad check(): no operation in %s', req)
445 raise ValueError('Expected operation not set')
446 if op.importance != messages.Operation.ImportanceValueValuesEnum.LOW:
447 return None # op is important, send request now
448
449 signature = sign(check_request)
450 with self._cache as cache:
451 logger.debug('checking the cache for %s\n%s', signature, cache)
452 item = cache.get(signature)
453 if item is None:
454 return None # signal to caller to send req
455 else:
456 return self._handle_cached_response(req, item)
457
458 def _handle_cached_response(self, req, item):
459 with self._cache: # defensive, this re-entrant lock should be held
460 if len(item.response.checkErrors) > 0:
461 if self._is_current(item):
462 return item.response
463
464 # There are errors, but now it's ok to send a new request
465 item.last_check_time = self._timer()
466 return None # signal caller to send req
467 else:
468 item.update_request(req, self._kinds)
469 if self._is_current(item):
470 return item.response
471
472 if (item.is_flushing):
473 logger.warn('last refresh request did not complete')
474
475 item.is_flushing = True
476 item.last_check_time = self._timer()
477 return None # signal caller to send req
478
479 def _is_current(self, item):
480 age = self._timer() - item.last_check_time
481 return age < self._options.flush_interval
482
483
484 class CachedItem(object):
485 """CachedItem holds items cached along with a ``CheckRequest``.
486
487 Thread compatible.
488
489 Attributes:
490 response (:class:`messages.CachedResponse`): the cached response
491 is_flushing (bool): indicates if it's been detected that item
492 is stale, and needs to be flushed
493 quota_scale (int): WIP, used to determine quota
494 last_check_time (datetime.datetime): the last time this instance
495 was checked
496
497 """
498
499 def __init__(self, resp, service_name, last_check_time, quota_scale):
500 self.last_check_time = last_check_time
501 self.quota_scale = quota_scale
502 self.is_flushing = False
503 self.response = resp
504 self._service_name = service_name
505 self._op_aggregator = None
506
507 def update_request(self, req, kinds):
508 agg = self._op_aggregator
509 if agg is None:
510 self._op_aggregator = operation.Aggregator(
511 req.checkRequest.operation, kinds)
512 else:
513 agg.add(req.checkRequest.operation)
514
515 def extract_request(self):
516 if self._op_aggregator is None:
517 return None
518
519 op = self._op_aggregator.as_operation()
520 self._op_aggregator = None
521 check_request = messages.CheckRequest(operation=op)
522 return messages.ServicecontrolServicesCheckRequest(
523 serviceName=self._service_name,
524 checkRequest=check_request)
OLDNEW
« no previous file with comments | « third_party/google-endpoints/google/api/control/caches.py ('k') | third_party/google-endpoints/google/api/control/client.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698