OLD | NEW |
(Empty) | |
| 1 # Copyright 2016 Google Inc. All Rights Reserved. |
| 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at |
| 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # |
| 9 # Unless required by applicable law or agreed to in writing, software |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 # See the License for the specific language governing permissions and |
| 13 # limitations under the License. |
| 14 |
| 15 """check_request supports aggregation of CheckRequests. |
| 16 |
| 17 :func:`sign` generated a signature from CheckRequests |
| 18 :class:`~google.api.gen.servicecontrol_v1_message.Operation` represents |
| 19 information regarding an operation, and is a key constituent of |
| 20 :class:`~google.api.gen.servicecontrol_v1_message.CheckRequest` and |
| 21 :class:`~google.api.gen.servicecontrol_v1_message.ReportRequests. |
| 22 |
| 23 The :class:`.Aggregator` implements the strategy for aggregating CheckRequests |
| 24 and caching their responses. |
| 25 |
| 26 """ |
| 27 |
| 28 from __future__ import absolute_import |
| 29 |
| 30 import collections |
| 31 import hashlib |
| 32 import httplib |
| 33 import logging |
| 34 from datetime import datetime |
| 35 |
| 36 from apitools.base.py import encoding |
| 37 |
| 38 from . import caches, label_descriptor, messages |
| 39 from . import metric_value, operation, signing |
| 40 |
| 41 logger = logging.getLogger(__name__) |
| 42 |
| 43 # alias for brevity |
| 44 _CheckErrors = messages.CheckError.CodeValueValuesEnum |
| 45 _IS_OK = (httplib.OK, '', True) |
| 46 _IS_UNKNOWN = ( |
| 47 httplib.INTERNAL_SERVER_ERROR, |
| 48 'Request blocked due to unsupported block reason {detail}', |
| 49 False) |
| 50 _CHECK_ERROR_CONVERSION = { |
| 51 _CheckErrors.NOT_FOUND: ( |
| 52 httplib.BAD_REQUEST, |
| 53 'Client project not found. Please pass a valid project', |
| 54 False, |
| 55 ), |
| 56 _CheckErrors.API_KEY_NOT_FOUND: ( |
| 57 httplib.BAD_REQUEST, |
| 58 'API key not found. Please pass a valid API key', |
| 59 True, |
| 60 ), |
| 61 _CheckErrors.API_KEY_EXPIRED: ( |
| 62 httplib.BAD_REQUEST, |
| 63 'API key expired. Please renew the API key', |
| 64 True, |
| 65 ), |
| 66 _CheckErrors.API_KEY_INVALID: ( |
| 67 httplib.BAD_REQUEST, |
| 68 'API not valid. Please pass a valid API key', |
| 69 True, |
| 70 ), |
| 71 _CheckErrors.SERVICE_NOT_ACTIVATED: ( |
| 72 httplib.FORBIDDEN, |
| 73 '{detail} Please enable the project for {project_id}', |
| 74 False, |
| 75 ), |
| 76 _CheckErrors.PERMISSION_DENIED: ( |
| 77 httplib.FORBIDDEN, |
| 78 'Permission denied: {detail}', |
| 79 False, |
| 80 ), |
| 81 _CheckErrors.IP_ADDRESS_BLOCKED: ( |
| 82 httplib.FORBIDDEN, |
| 83 '{detail}', |
| 84 False, |
| 85 ), |
| 86 _CheckErrors.REFERER_BLOCKED: ( |
| 87 httplib.FORBIDDEN, |
| 88 '{detail}', |
| 89 False, |
| 90 ), |
| 91 _CheckErrors.CLIENT_APP_BLOCKED: ( |
| 92 httplib.FORBIDDEN, |
| 93 '{detail}', |
| 94 False, |
| 95 ), |
| 96 _CheckErrors.PROJECT_DELETED: ( |
| 97 httplib.FORBIDDEN, |
| 98 'Project {project_id} has been deleted', |
| 99 False, |
| 100 ), |
| 101 _CheckErrors.PROJECT_INVALID: ( |
| 102 httplib.BAD_REQUEST, |
| 103 'Client Project is not valid. Please pass a valid project', |
| 104 False, |
| 105 ), |
| 106 _CheckErrors.VISIBILITY_DENIED: ( |
| 107 httplib.FORBIDDEN, |
| 108 'Project {project_id} has no visibility access to the service', |
| 109 False, |
| 110 ), |
| 111 _CheckErrors.BILLING_DISABLED: ( |
| 112 httplib.FORBIDDEN, |
| 113 'Project {project_id} has billing disabled. Please enable it', |
| 114 False, |
| 115 ), |
| 116 |
| 117 # Fail open for internal server errors |
| 118 _CheckErrors.NAMESPACE_LOOKUP_UNAVAILABLE: _IS_OK, |
| 119 _CheckErrors.SERVICE_STATUS_UNAVAILABLE: _IS_OK, |
| 120 _CheckErrors.BILLING_STATUS_UNAVAILABLE: _IS_OK, |
| 121 _CheckErrors.QUOTA_CHECK_UNAVAILABLE: _IS_OK, |
| 122 } |
| 123 |
| 124 |
| 125 def convert_response(check_response, project_id): |
| 126 """Computes a http status code and message `CheckResponse` |
| 127 |
| 128 The return value a tuple (code, message, api_key_is_bad) where |
| 129 |
| 130 code: is the http status code |
| 131 message: is the message to return |
| 132 api_key_is_bad: indicates that a given api_key is bad |
| 133 |
| 134 Args: |
| 135 check_response (:class:`google.api.gen.servicecontrol_v1_messages.CheckRe
sponse`): |
| 136 the response from calling an api |
| 137 |
| 138 Returns: |
| 139 tuple(code, message, bool) |
| 140 """ |
| 141 if not check_response or not check_response.checkErrors: |
| 142 return _IS_OK |
| 143 |
| 144 # only check the first error for now, as per ESP |
| 145 theError = check_response.checkErrors[0] |
| 146 error_tuple = _CHECK_ERROR_CONVERSION.get(theError.code, _IS_UNKNOWN) |
| 147 if error_tuple[1].find('{') == -1: # no replacements needed: |
| 148 return error_tuple |
| 149 |
| 150 updated_msg = error_tuple[1].replace('{project_id}', project_id) |
| 151 updated_msg = updated_msg.replace('{detail}', theError.detail or '') |
| 152 error_tuple = (error_tuple[0], updated_msg, error_tuple[2]) |
| 153 return error_tuple |
| 154 |
| 155 |
| 156 def sign(check_request): |
| 157 """Obtains a signature for an operation in a `CheckRequest` |
| 158 |
| 159 Args: |
| 160 op (:class:`google.api.gen.servicecontrol_v1_messages.Operation`): an |
| 161 operation used in a `CheckRequest` |
| 162 |
| 163 Returns: |
| 164 string: a secure hash generated from the operation |
| 165 """ |
| 166 if not isinstance(check_request, messages.CheckRequest): |
| 167 raise ValueError('Invalid request') |
| 168 op = check_request.operation |
| 169 if op is None or op.operationName is None or op.consumerId is None: |
| 170 logging.error('Bad %s: not initialized => not signed', check_request) |
| 171 raise ValueError('check request must be initialized with an operation') |
| 172 md5 = hashlib.md5() |
| 173 md5.update(op.operationName) |
| 174 md5.update('\x00') |
| 175 md5.update(op.consumerId) |
| 176 if op.labels: |
| 177 signing.add_dict_to_hash(md5, encoding.MessageToPyValue(op.labels)) |
| 178 for value_set in op.metricValueSets: |
| 179 md5.update('\x00') |
| 180 md5.update(value_set.metricName) |
| 181 for mv in value_set.metricValues: |
| 182 metric_value.update_hash(md5, mv) |
| 183 |
| 184 md5.update('\x00') |
| 185 if op.quotaProperties: |
| 186 # N.B: this differs form cxx implementation, which serializes the |
| 187 # protobuf. This should be OK as the exact hash used does not need to |
| 188 # match across implementations. |
| 189 md5.update(repr(op.quotaProperties)) |
| 190 |
| 191 md5.update('\x00') |
| 192 return md5.digest() |
| 193 |
| 194 |
| 195 _KNOWN_LABELS = label_descriptor.KnownLabels |
| 196 |
| 197 |
| 198 class Info(collections.namedtuple('Info', |
| 199 ('client_ip',) + operation.Info._fields), |
| 200 operation.Info): |
| 201 """Holds the information necessary to fill in CheckRequest. |
| 202 |
| 203 In addition the attributes in :class:`operation.Info`, this has: |
| 204 |
| 205 Attributes: |
| 206 client_ip: the client IP address |
| 207 |
| 208 """ |
| 209 def __new__(cls, client_ip='', **kw): |
| 210 """Invokes the base constructor with default values.""" |
| 211 op_info = operation.Info(**kw) |
| 212 return super(Info, cls).__new__(cls, client_ip, **op_info._asdict()) |
| 213 |
| 214 def as_check_request(self, timer=datetime.utcnow): |
| 215 """Makes a `ServicecontrolServicesCheckRequest` from this instance |
| 216 |
| 217 Returns: |
| 218 a ``ServicecontrolServicesCheckRequest`` |
| 219 |
| 220 Raises: |
| 221 ValueError: if the fields in this instance are insufficient to |
| 222 to create a valid ``ServicecontrolServicesCheckRequest`` |
| 223 |
| 224 """ |
| 225 if not self.service_name: |
| 226 raise ValueError('the service name must be set') |
| 227 if not self.operation_id: |
| 228 raise ValueError('the operation id must be set') |
| 229 if not self.operation_name: |
| 230 raise ValueError('the operation name must be set') |
| 231 op = super(Info, self).as_operation(timer=timer) |
| 232 labels = { |
| 233 _KNOWN_LABELS.SCC_USER_AGENT.label_name: label_descriptor.USER_AGENT |
| 234 } |
| 235 if self.client_ip: |
| 236 labels[_KNOWN_LABELS.SCC_CALLER_IP.label_name] = self.client_ip |
| 237 |
| 238 if self.referer: |
| 239 labels[_KNOWN_LABELS.SCC_REFERER.label_name] = self.referer |
| 240 |
| 241 op.labels = encoding.PyValueToMessage( |
| 242 messages.Operation.LabelsValue, labels) |
| 243 check_request = messages.CheckRequest(operation=op) |
| 244 return messages.ServicecontrolServicesCheckRequest( |
| 245 serviceName=self.service_name, |
| 246 checkRequest=check_request) |
| 247 |
| 248 |
| 249 class Aggregator(object): |
| 250 """Caches and aggregates ``CheckRequests``. |
| 251 |
| 252 Concurrency: Thread safe. |
| 253 |
| 254 Usage: |
| 255 |
| 256 Creating a new cache entry and use cached response |
| 257 |
| 258 Example: |
| 259 >>> options = caches.CheckOptions() |
| 260 >>> agg = Aggregator('my_service', options) |
| 261 >>> req = ServicecontrolServicesCheckRequest(...) |
| 262 >>> # check returns None as the request is not cached |
| 263 >>> if agg.check(req) is not None: |
| 264 ... resp = service.check(req) |
| 265 ... agg = service.add_response(req, resp) |
| 266 >>> agg.check(req) # response now cached according as-per options |
| 267 <CheckResponse ....> |
| 268 |
| 269 Refreshing a cached entry after a flush interval |
| 270 |
| 271 The flush interval is constrained to be shorter than the actual cache |
| 272 expiration. This allows the response to potentially remain cached and be |
| 273 aggregated with subsequent check requests for the same operation. |
| 274 |
| 275 Example: |
| 276 >>> # continuing from the previous example, |
| 277 >>> # ... after the flush interval |
| 278 >>> # - the response is still in the cache, i.e, not expired |
| 279 >>> # - the first call after the flush interval returns None, subsequent |
| 280 >>> # calls continue to return the cached response |
| 281 >>> agg.check(req) # signals the caller to call service.check(req) |
| 282 None |
| 283 >>> agg.check(req) # next call returns the cached response |
| 284 <CheckResponse ....> |
| 285 |
| 286 Flushing the cache |
| 287 |
| 288 Once a response is expired, if there is an outstanding, cached CheckRequest |
| 289 for it, this should be sent and their responses added back to the |
| 290 aggregator instance, as they will contain quota updates that have not been |
| 291 sent. |
| 292 |
| 293 Example: |
| 294 |
| 295 >>> # continuing the previous example |
| 296 >>> for req in agg.flush(): # an iterable of cached CheckRequests |
| 297 ... resp = caller.send_req(req) # caller sends them |
| 298 >>> agg.add_response(req, resp) # and caches their responses |
| 299 |
| 300 """ |
| 301 |
| 302 def __init__(self, service_name, options, kinds=None, |
| 303 timer=datetime.utcnow): |
| 304 """Constructor. |
| 305 |
| 306 Args: |
| 307 service_name (string): names the service that all requests aggregated |
| 308 by this instance will be sent |
| 309 options (:class:`~google.api.caches.CheckOptions`): configures the |
| 310 caching and flushing behavior of this instance |
| 311 kinds (dict[string,[google.api.control.MetricKind]]): specifies the |
| 312 kind of metric for each each metric name. |
| 313 timer (function([[datetime]]): a function that returns the current |
| 314 as a time as a datetime instance |
| 315 """ |
| 316 self._service_name = service_name |
| 317 self._options = options |
| 318 self._cache = caches.create(options, timer=timer) |
| 319 self._kinds = {} if kinds is None else dict(kinds) |
| 320 self._timer = timer |
| 321 |
| 322 @property |
| 323 def service_name(self): |
| 324 """The service to which all aggregated requests should belong.""" |
| 325 return self._service_name |
| 326 |
| 327 @property |
| 328 def flush_interval(self): |
| 329 """The interval between calls to flush. |
| 330 |
| 331 Returns: |
| 332 timedelta: the period between calls to flush if, or ``None`` if no |
| 333 cache is set |
| 334 |
| 335 """ |
| 336 return None if self._cache is None else self._options.expiration |
| 337 |
| 338 def flush(self): |
| 339 """Flushes this instance's cache. |
| 340 |
| 341 The driver of this instance should call this method every |
| 342 `flush_interval`. |
| 343 |
| 344 Returns: |
| 345 list['CheckRequest']: corresponding to CheckRequests that were |
| 346 pending |
| 347 |
| 348 """ |
| 349 if self._cache is None: |
| 350 return [] |
| 351 with self._cache as c: |
| 352 flushed_items = list(c.out_deque) |
| 353 c.out_deque.clear() |
| 354 cached_reqs = [item.extract_request() for item in flushed_items] |
| 355 cached_reqs = [req for req in cached_reqs if req is not None] |
| 356 return cached_reqs |
| 357 |
| 358 def clear(self): |
| 359 """Clears this instance's cache.""" |
| 360 if self._cache is not None: |
| 361 with self._cache as c: |
| 362 c.clear() |
| 363 c.out_deque.clear() |
| 364 |
| 365 def add_response(self, req, resp): |
| 366 """Adds the response from sending to `req` to this instance's cache. |
| 367 |
| 368 Args: |
| 369 req (`ServicecontrolServicesCheckRequest`): the request |
| 370 resp (CheckResponse): the response from sending the request |
| 371 """ |
| 372 if self._cache is None: |
| 373 return |
| 374 signature = sign(req.checkRequest) |
| 375 with self._cache as c: |
| 376 now = self._timer() |
| 377 quota_scale = 0 # WIP |
| 378 item = c.get(signature) |
| 379 if item is None: |
| 380 c[signature] = CachedItem( |
| 381 resp, self.service_name, now, quota_scale) |
| 382 else: |
| 383 # Update the cached item to reflect that it is updated |
| 384 item.last_check_time = now |
| 385 item.response = resp |
| 386 item.quota_scale = quota_scale |
| 387 item.is_flushing = False |
| 388 c[signature] = item |
| 389 |
| 390 def check(self, req): |
| 391 """Determine if ``req`` is in this instances cache. |
| 392 |
| 393 Determine if there are cache hits for the request in this aggregator |
| 394 instance. |
| 395 |
| 396 Not in the cache |
| 397 |
| 398 If req is not in the cache, it returns ``None`` to indicate that the |
| 399 caller should send the request. |
| 400 |
| 401 Cache Hit; response has errors |
| 402 |
| 403 When a cached CheckResponse has errors, it's assumed that ``req`` would |
| 404 fail as well, so the cached CheckResponse is returned. However, the |
| 405 first CheckRequest after the flush interval has elapsed should be sent |
| 406 to the server to refresh the CheckResponse, though until it's received, |
| 407 subsequent CheckRequests should fail with the cached CheckResponse. |
| 408 |
| 409 Cache behaviour - response passed |
| 410 |
| 411 If the cached CheckResponse has no errors, it's assumed that ``req`` |
| 412 will succeed as well, so the CheckResponse is returned, with the quota |
| 413 info updated to the same as requested. The requested tokens are |
| 414 aggregated until flushed. |
| 415 |
| 416 Args: |
| 417 req (``ServicecontrolServicesCheckRequest``): to be sent to |
| 418 the service control service |
| 419 |
| 420 Raises: |
| 421 ValueError: if the ``req`` service_name is not the same as |
| 422 this instances |
| 423 |
| 424 Returns: |
| 425 ``CheckResponse``: if an applicable response is cached by this |
| 426 instance is available for use or None, if there is no applicable |
| 427 response |
| 428 |
| 429 """ |
| 430 if self._cache is None: |
| 431 return None # no cache, send request now |
| 432 if not isinstance(req, messages.ServicecontrolServicesCheckRequest): |
| 433 raise ValueError('Invalid request') |
| 434 if req.serviceName != self.service_name: |
| 435 logger.error('bad check(): service_name %s does not match ours %s', |
| 436 req.serviceName, self.service_name) |
| 437 raise ValueError('Service name mismatch') |
| 438 check_request = req.checkRequest |
| 439 if check_request is None: |
| 440 logger.error('bad check(): no check_request in %s', req) |
| 441 raise ValueError('Expected operation not set') |
| 442 op = check_request.operation |
| 443 if op is None: |
| 444 logger.error('bad check(): no operation in %s', req) |
| 445 raise ValueError('Expected operation not set') |
| 446 if op.importance != messages.Operation.ImportanceValueValuesEnum.LOW: |
| 447 return None # op is important, send request now |
| 448 |
| 449 signature = sign(check_request) |
| 450 with self._cache as cache: |
| 451 logger.debug('checking the cache for %s\n%s', signature, cache) |
| 452 item = cache.get(signature) |
| 453 if item is None: |
| 454 return None # signal to caller to send req |
| 455 else: |
| 456 return self._handle_cached_response(req, item) |
| 457 |
| 458 def _handle_cached_response(self, req, item): |
| 459 with self._cache: # defensive, this re-entrant lock should be held |
| 460 if len(item.response.checkErrors) > 0: |
| 461 if self._is_current(item): |
| 462 return item.response |
| 463 |
| 464 # There are errors, but now it's ok to send a new request |
| 465 item.last_check_time = self._timer() |
| 466 return None # signal caller to send req |
| 467 else: |
| 468 item.update_request(req, self._kinds) |
| 469 if self._is_current(item): |
| 470 return item.response |
| 471 |
| 472 if (item.is_flushing): |
| 473 logger.warn('last refresh request did not complete') |
| 474 |
| 475 item.is_flushing = True |
| 476 item.last_check_time = self._timer() |
| 477 return None # signal caller to send req |
| 478 |
| 479 def _is_current(self, item): |
| 480 age = self._timer() - item.last_check_time |
| 481 return age < self._options.flush_interval |
| 482 |
| 483 |
| 484 class CachedItem(object): |
| 485 """CachedItem holds items cached along with a ``CheckRequest``. |
| 486 |
| 487 Thread compatible. |
| 488 |
| 489 Attributes: |
| 490 response (:class:`messages.CachedResponse`): the cached response |
| 491 is_flushing (bool): indicates if it's been detected that item |
| 492 is stale, and needs to be flushed |
| 493 quota_scale (int): WIP, used to determine quota |
| 494 last_check_time (datetime.datetime): the last time this instance |
| 495 was checked |
| 496 |
| 497 """ |
| 498 |
| 499 def __init__(self, resp, service_name, last_check_time, quota_scale): |
| 500 self.last_check_time = last_check_time |
| 501 self.quota_scale = quota_scale |
| 502 self.is_flushing = False |
| 503 self.response = resp |
| 504 self._service_name = service_name |
| 505 self._op_aggregator = None |
| 506 |
| 507 def update_request(self, req, kinds): |
| 508 agg = self._op_aggregator |
| 509 if agg is None: |
| 510 self._op_aggregator = operation.Aggregator( |
| 511 req.checkRequest.operation, kinds) |
| 512 else: |
| 513 agg.add(req.checkRequest.operation) |
| 514 |
| 515 def extract_request(self): |
| 516 if self._op_aggregator is None: |
| 517 return None |
| 518 |
| 519 op = self._op_aggregator.as_operation() |
| 520 self._op_aggregator = None |
| 521 check_request = messages.CheckRequest(operation=op) |
| 522 return messages.ServicecontrolServicesCheckRequest( |
| 523 serviceName=self._service_name, |
| 524 checkRequest=check_request) |
OLD | NEW |