| OLD | NEW |
| 1 # Copyright 2014 The Chromium Authors. All rights reserved. | 1 # Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """deferred_resource converts blocking apiclient resource to deferred.""" | 5 """deferred_resource converts blocking apiclient resource to deferred.""" |
| 6 | 6 |
| 7 import collections | 7 import collections |
| 8 import datetime | 8 import datetime |
| 9 import functools | 9 import functools |
| 10 import httplib | 10 import httplib |
| 11 import ssl | 11 import ssl |
| 12 import threading | 12 import threading |
| 13 import traceback | 13 import traceback |
| 14 | 14 |
| 15 from twisted.internet import defer, reactor, threads | 15 from twisted.internet import defer, reactor, threads |
| 16 from twisted.python import log as twistedLog | 16 from twisted.python import log as twistedLog |
| 17 from twisted.python.threadpool import ThreadPool | 17 from twisted.python.threadpool import ThreadPool |
| 18 import apiclient | 18 import apiclient |
| 19 import apiclient.discovery | 19 import apiclient.discovery |
| 20 import httplib2 | |
| 21 import oauth2client | 20 import oauth2client |
| 22 | 21 |
| 22 from infra_libs import InstrumentedHttp |
| 23 |
| 23 | 24 |
| 24 DEFAULT_RETRY_ATTEMPT_COUNT = 5 | 25 DEFAULT_RETRY_ATTEMPT_COUNT = 5 |
| 25 DEFAULT_RETRY_WAIT_SECONDS = 1 | 26 DEFAULT_RETRY_WAIT_SECONDS = 1 |
| 26 | 27 |
| 27 | 28 |
| 28 if httplib.FORBIDDEN not in oauth2client.client.REFRESH_STATUS_CODES: | 29 if httplib.FORBIDDEN not in oauth2client.client.REFRESH_STATUS_CODES: |
| 29 oauth2client.client.REFRESH_STATUS_CODES.append(httplib.FORBIDDEN) | 30 oauth2client.client.REFRESH_STATUS_CODES.append(httplib.FORBIDDEN) |
| 30 | 31 |
| 31 | 32 |
| 32 class NotStartedError(Exception): | 33 class NotStartedError(Exception): |
| (...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 114 new_prefix = self._path + (name,) | 115 new_prefix = self._path + (name,) |
| 115 sub_api = self._api_cache.get(new_prefix) | 116 sub_api = self._api_cache.get(new_prefix) |
| 116 if not sub_api: | 117 if not sub_api: |
| 117 sub_api = self._owner.Api(self._owner, path=self._path + (name,)) | 118 sub_api = self._owner.Api(self._owner, path=self._path + (name,)) |
| 118 self._api_cache[new_prefix] = sub_api | 119 self._api_cache[new_prefix] = sub_api |
| 119 return sub_api | 120 return sub_api |
| 120 | 121 |
| 121 def __init__( | 122 def __init__( |
| 122 self, resource, credentials=None, max_concurrent_requests=1, | 123 self, resource, credentials=None, max_concurrent_requests=1, |
| 123 retry_wait_seconds=None, retry_attempt_count=None, verbose=False, | 124 retry_wait_seconds=None, retry_attempt_count=None, verbose=False, |
| 124 log_prefix='', timeout=None, _pool=None): | 125 log_prefix='', timeout=None, _pool=None, http_client_name=None): |
| 125 """Creates a DeferredResource. | 126 """Creates a DeferredResource. |
| 126 | 127 |
| 127 Args: | 128 Args: |
| 128 resource (apiclient.Resource): a resource, such as one generated by | 129 resource (apiclient.Resource): a resource, such as one generated by |
| 129 apiclient.discovery.build. | 130 apiclient.discovery.build. |
| 130 credentials (oauth2client.client.Credentials or CredentialFactory): | 131 credentials (oauth2client.client.Credentials or CredentialFactory): |
| 131 credentials to use to make API requests. | 132 credentials to use to make API requests. |
| 132 max_concurrent_requests (int): maximum number of concurrent requests. | 133 max_concurrent_requests (int): maximum number of concurrent requests. |
| 133 Defaults to 1. | 134 Defaults to 1. |
| 134 retry_wait_seconds (int, float): initial wait interval for request | 135 retry_wait_seconds (int, float): initial wait interval for request |
| 135 retrial. In seconds, defaults to 1. | 136 retrial. In seconds, defaults to 1. |
| 136 retry_attempt_count (int): number of attempts before giving up. | 137 retry_attempt_count (int): number of attempts before giving up. |
| 137 Defaults to 5. | 138 Defaults to 5. |
| 138 verbose (bool): if True, log each request/response. | 139 verbose (bool): if True, log each request/response. |
| 139 log_prefix (str): prefix for log messages. | 140 log_prefix (str): prefix for log messages. |
| 140 timeout (int): request timeout in seconds. If None is passed | 141 timeout (int): request timeout in seconds. If None is passed |
| 141 then Python's default timeout for sockets will be used. See | 142 then Python's default timeout for sockets will be used. See |
| 142 for example the docs of socket.setdefaulttimeout(): | 143 for example the docs of socket.setdefaulttimeout(): |
| 143 http://docs.python.org/library/socket.html#socket.setdefaulttimeout | 144 http://docs.python.org/library/socket.html#socket.setdefaulttimeout |
| 145 http_client_name (str): an identifier for the HTTP requests made by this |
| 146 resource. Included with monitoring metrics. |
| 144 """ | 147 """ |
| 145 max_concurrent_requests = max_concurrent_requests or 1 | 148 max_concurrent_requests = max_concurrent_requests or 1 |
| 146 assert resource, 'resource not specified' | 149 assert resource, 'resource not specified' |
| 147 if retry_wait_seconds is None: | 150 if retry_wait_seconds is None: |
| 148 retry_wait_seconds = DEFAULT_RETRY_WAIT_SECONDS | 151 retry_wait_seconds = DEFAULT_RETRY_WAIT_SECONDS |
| 149 assert isinstance(retry_wait_seconds, (int, float)) | 152 assert isinstance(retry_wait_seconds, (int, float)) |
| 150 if retry_attempt_count is None: | 153 if retry_attempt_count is None: |
| 151 retry_attempt_count = DEFAULT_RETRY_ATTEMPT_COUNT | 154 retry_attempt_count = DEFAULT_RETRY_ATTEMPT_COUNT |
| 152 assert isinstance(retry_attempt_count, int) | 155 assert isinstance(retry_attempt_count, int) |
| 156 if http_client_name is None: |
| 157 http_client_name = 'deferred_resource' |
| 153 | 158 |
| 154 self._pool = _pool or self._create_thread_pool(max_concurrent_requests) | 159 self._pool = _pool or self._create_thread_pool(max_concurrent_requests) |
| 155 self._resource = resource | 160 self._resource = resource |
| 156 self.credentials = credentials | 161 self.credentials = credentials |
| 157 self.retry_wait_seconds = retry_wait_seconds | 162 self.retry_wait_seconds = retry_wait_seconds |
| 158 self.retry_attempt_count = retry_attempt_count | 163 self.retry_attempt_count = retry_attempt_count |
| 159 self.verbose = verbose | 164 self.verbose = verbose |
| 160 self.log_prefix = log_prefix | 165 self.log_prefix = log_prefix |
| 161 self.api = self.Api(self) | 166 self.api = self.Api(self) |
| 162 self._th_local = threading.local() | 167 self._th_local = threading.local() |
| 163 self.started = False | 168 self.started = False |
| 164 self.timeout = timeout | 169 self.timeout = timeout |
| 170 self.http_client_name = http_client_name |
| 165 | 171 |
| 166 @classmethod | 172 @classmethod |
| 167 def _create_thread_pool(cls, max_concurrent_requests): | 173 def _create_thread_pool(cls, max_concurrent_requests): |
| 168 return DaemonThreadPool(minthreads=1, maxthreads=max_concurrent_requests) | 174 return DaemonThreadPool(minthreads=1, maxthreads=max_concurrent_requests) |
| 169 | 175 |
| 170 @classmethod | 176 @classmethod |
| 171 def _create_async( | 177 def _create_async( |
| 172 cls, resource_factory, max_concurrent_requests=1, _pool=None, **kwargs): | 178 cls, resource_factory, max_concurrent_requests=1, _pool=None, **kwargs): |
| 173 _pool = _pool or cls._create_thread_pool(max_concurrent_requests) | 179 _pool = _pool or cls._create_thread_pool(max_concurrent_requests) |
| 174 result = defer.Deferred() | 180 result = defer.Deferred() |
| (...skipping 153 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 328 create_creds = False | 334 create_creds = False |
| 329 if getattr(self._th_local, 'http', None) is None: | 335 if getattr(self._th_local, 'http', None) is None: |
| 330 create_creds = True | 336 create_creds = True |
| 331 elif (self._th_local.credentials_expiry is not None and | 337 elif (self._th_local.credentials_expiry is not None and |
| 332 self._th_local.credentials_expiry <= now): | 338 self._th_local.credentials_expiry <= now): |
| 333 create_creds = True | 339 create_creds = True |
| 334 | 340 |
| 335 if create_creds: | 341 if create_creds: |
| 336 self._th_local.credentials = None | 342 self._th_local.credentials = None |
| 337 self._th_local.credentials_expiry = None | 343 self._th_local.credentials_expiry = None |
| 338 self._th_local.http = httplib2.Http(timeout=self.timeout) | 344 self._th_local.http = InstrumentedHttp( |
| 345 self.http_client_name, timeout=self.timeout) |
| 339 if self.credentials is not None: | 346 if self.credentials is not None: |
| 340 creds = self.credentials | 347 creds = self.credentials |
| 341 ttl = None | 348 ttl = None |
| 342 if isinstance(creds, CredentialFactory): | 349 if isinstance(creds, CredentialFactory): |
| 343 ttl = creds.ttl | 350 ttl = creds.ttl |
| 344 creds = creds() | 351 creds = creds() |
| 345 self._th_local.credentials = creds.from_json(creds.to_json()) | 352 self._th_local.credentials = creds.from_json(creds.to_json()) |
| 346 if ttl is not None: | 353 if ttl is not None: |
| 347 self._th_local.credentials_expiry = now + ttl | 354 self._th_local.credentials_expiry = now + ttl |
| 348 self._th_local.http = self._th_local.credentials.authorize( | 355 self._th_local.http = self._th_local.credentials.authorize( |
| (...skipping 23 matching lines...) Expand all Loading... |
| 372 return d | 379 return d |
| 373 | 380 |
| 374 | 381 |
| 375 def is_transient(ex): | 382 def is_transient(ex): |
| 376 if isinstance(ex, apiclient.errors.HttpError) and ex.resp: | 383 if isinstance(ex, apiclient.errors.HttpError) and ex.resp: |
| 377 return ex.resp.status >= 500; | 384 return ex.resp.status >= 500; |
| 378 if isinstance(ex, ssl.SSLError): | 385 if isinstance(ex, ssl.SSLError): |
| 379 # No reason, no errcode. | 386 # No reason, no errcode. |
| 380 return "timed out" in str(ex) | 387 return "timed out" in str(ex) |
| 381 return False | 388 return False |
| OLD | NEW |