OLD | NEW |
(Empty) | |
| 1 # Copyright 2016 Google Inc. All Rights Reserved. |
| 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at |
| 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # |
| 9 # Unless required by applicable law or agreed to in writing, software |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 # See the License for the specific language governing permissions and |
| 13 # limitations under the License. |
| 14 |
| 15 """client provides a complete standalone service control client. |
| 16 |
| 17 :class:`Client` is a package-level facade that encapsulates all service control |
| 18 functionality. |
| 19 |
| 20 The :class:`Loaders` simplify ``Client`` initialization. |
| 21 |
| 22 ``Client`` needs to stop and start a thread to implement its behaviour. In most |
| 23 environments, the default thread class is sufficient. However, on Google App Engine, |
| 24 it's necessary to use the App Engine-specific threading class instead. |
| 25 |
| 26 :func:`use_gae_thread` and :func:`use_default_thread` can be used to change the thread |
| 27 class used by new instances of ``Client``. |
| 28 |
| 29 Example: |
| 30 |
| 31 >>> from google.api.control import client |
| 32 >>> |
| 33 >>> # use on appengine with package-default settings |
| 34 >>> service_name = 'my-appengine-service-name' |
| 35 >>> client.use_gae_thread() |
| 36 >>> gae_client = client.Loaders.DEFAULT.load(service_name) |
| 37 >>> gae_client.start() |
| 38 |
| 39 """ |
| 40 from __future__ import absolute_import |
| 41 |
| 42 from datetime import datetime, timedelta |
| 43 from enum import Enum |
| 44 import json |
| 45 import logging |
| 46 import os |
| 47 import threading |
| 48 import time |
| 49 |
| 50 from . import api_client, check_request, report_request |
| 51 from . import USER_AGENT |
| 52 from google.api.control.caches import CheckOptions, ReportOptions, to_cache_timer |
| 53 from google.api.control.vendor.py3 import sched |
| 54 |
| 55 |
| 56 logger = logging.getLogger(__name__) |
| 57 |
| 58 |
| 59 CONFIG_VAR = 'ENDPOINTS_SERVER_CONFIG_FILE' |
| 60 |
| 61 |
| 62 def _load_from_well_known_env(): |
| 63 if CONFIG_VAR not in os.environ: |
| 64 logger.info('did not load server config; no environ var %s', CONFIG_VAR) |
| 65 return _load_default() |
| 66 json_file = os.environ[CONFIG_VAR] |
| 67 if not os.path.exists(json_file): |
| 68 logger.warn('did not load service; missing config file %s', json_file) |
| 69 return _load_default() |
| 70 try: |
| 71 with open(json_file) as f: |
| 72 json_dict = json.load(f) |
| 73 check_json = json_dict['checkAggregatorConfig'] |
| 74 report_json = json_dict['reportAggregatorConfig'] |
| 75 check_options = CheckOptions( |
| 76 num_entries=check_json['cacheEntries'], |
| 77 expiration=timedelta( |
| 78 milliseconds=check_json['responseExpirationMs']), |
| 79 flush_interval=timedelta( |
| 80 milliseconds=check_json['flushIntervalMs'])) |
| 81 report_options = ReportOptions( |
| 82 num_entries=report_json['cacheEntries'], |
| 83 flush_interval=timedelta( |
| 84 milliseconds=report_json['flushIntervalMs'])) |
| 85 return check_options, report_options |
| 86 except (KeyError, ValueError): |
| 87 logger.warn('did not load service; bad json config file %s', |
| 88 json_file, |
| 89 exc_info=True) |
| 90 return _load_default() |
| 91 |
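For reference, a sketch of the server config file that ``_load_from_well_known_env`` expects; the key names mirror the ones read above, while the numeric values are illustrative only:

    import json
    import os
    import tempfile

    example_config = {
        'checkAggregatorConfig': {
            'cacheEntries': 200,           # CheckOptions.num_entries
            'responseExpirationMs': 1000,  # CheckOptions.expiration
            'flushIntervalMs': 500,        # CheckOptions.flush_interval
        },
        'reportAggregatorConfig': {
            'cacheEntries': 200,           # ReportOptions.num_entries
            'flushIntervalMs': 1000,       # ReportOptions.flush_interval
        },
    }
    with tempfile.NamedTemporaryFile('w', suffix='.json', delete=False) as f:
        json.dump(example_config, f)
    os.environ['ENDPOINTS_SERVER_CONFIG_FILE'] = f.name
    # Loaders.ENVIRONMENT.load('my-service-name') will now pick these values up.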
| 92 |
| 93 def _load_default(): |
| 94 return CheckOptions(), ReportOptions() |
| 95 |
| 96 |
| 97 def _load_no_cache(): |
| 98 return (CheckOptions(num_entries=-1), |
| 99 ReportOptions(num_entries=-1)) |
| 100 |
| 101 |
| 102 class Loaders(Enum): |
| 103 """Enumerates the functions used to load clients from server configs.""" |
| 104 # pylint: disable=too-few-public-methods |
| 105 ENVIRONMENT = (_load_from_well_known_env,) |
| 106 DEFAULT = (_load_default,) |
| 107 NO_CACHE = (_load_no_cache,) |
| 108 |
| 109 def __init__(self, load_func): |
| 110 """Constructor. |
| 111 |
| 112 load_func loads the check and report options used to configure a ``Client`` |
| 113 """ |
| 114 self._load_func = load_func |
| 115 |
| 116 def load(self, service_name, **kw): |
| 117 check_opts, report_opts = self._load_func() |
| 118 return Client(service_name, check_opts, report_opts, **kw) |
| 119 |
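A small usage sketch of the loader indirection above: extra keyword arguments to ``load`` are forwarded unchanged to the ``Client`` constructor, so a caller can combine, say, the ``NO_CACHE`` options with an explicit timer:

    from datetime import datetime
    from google.api.control import client

    no_cache_client = client.Loaders.NO_CACHE.load(
        'my-service-name',       # placeholder service name
        timer=datetime.utcnow)   # passed through to Client(..., timer=...)
    no_cache_client.start()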
| 120 |
| 121 _THREAD_CLASS = threading.Thread |
| 122 |
| 123 |
| 124 def _create_http_transport(): |
| 125 additional_http_headers = {"user-agent": USER_AGENT} |
| 126 do_logging = logger.isEnabledFor(logging.DEBUG) |
| 127 return api_client.ServicecontrolV1( |
| 128 additional_http_headers=additional_http_headers, |
| 129 log_request=do_logging, |
| 130 log_response=do_logging) |
| 131 |
| 132 |
| 133 def _thread_local_http_transport_func(): |
| 134 local = threading.local() |
| 135 |
| 136 def create_transport(): |
| 137 if not getattr(local, "transport", None): |
| 138 local.transport = _create_http_transport() |
| 139 return local.transport |
| 140 |
| 141 return create_transport |
| 142 |
| 143 |
| 144 _CREATE_THREAD_LOCAL_TRANSPORT = _thread_local_http_transport_func() |
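The factory above memoizes one transport per thread; a short sketch of the resulting behaviour, inferred from the code rather than stated anywhere:

    transport_a = _CREATE_THREAD_LOCAL_TRANSPORT()
    transport_b = _CREATE_THREAD_LOCAL_TRANSPORT()
    assert transport_a is transport_b  # the calling thread reuses its cached transport
    # Another thread, e.g. the flushing thread started by Client.start(), would
    # build and cache its own api_client.ServicecontrolV1 instance instead.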
| 145 |
| 146 |
| 147 class Client(object): |
| 148 """Client is a package-level facade that encapsulates all service control |
| 149 functionality. |
| 150 |
| 151 Using one of the :class:`Loaders` makes it easy to initialize ``Client`` |
| 152 instances. |
| 153 |
| 154 Example: |
| 155 |
| 156 >>> from google.api.control import client |
| 157 >>> service_name = 'my-service-name' |
| 158 >>> |
| 159 >>> # create a service control client using the package default values |
| 160 >>> default_client = client.Loaders.DEFAULT.load(service_name) |
| 161 |
| 162 >>> # create a service control client by loading configuration from a |
| 163 >>> # JSON file named by an environment variable |
| 164 >>> json_conf_client = client.Loaders.ENVIRONMENT.load(service_name) |
| 165 |
| 166 ``Client`` is thread-compatible. |
| 167 |
| 168 """ |
| 169 # pylint: disable=too-many-instance-attributes, too-many-arguments |
| 170 |
| 171 def __init__(self, |
| 172 service_name, |
| 173 check_options, |
| 174 report_options, |
| 175 timer=datetime.utcnow, |
| 176 create_transport=_CREATE_THREAD_LOCAL_TRANSPORT): |
| 177 """ |
| 178 |
| 179 Args: |
| 180 service_name (str): the name of the service to be controlled |
| 181 check_options (:class:`google.api.control.caches.CheckOptions`): |
| 182 configures checking |
| 183 report_options (:class:`google.api.control.caches.ReportOptions`): |
| 184 configures reporting |
| 185 timer (callable[[], datetime.datetime]): used to obtain the current time. |
| 186 """ |
| 187 self._check_aggregator = check_request.Aggregator(service_name, |
| 188 check_options, |
| 189 timer=timer) |
| 190 self._report_aggregator = report_request.Aggregator(service_name, |
| 191 report_options, |
| 192 timer=timer) |
| 193 self._running = False |
| 194 self._scheduler = None |
| 195 self._stopped = False |
| 196 self._timer = timer |
| 197 self._thread = None |
| 198 self._create_transport = create_transport |
| 199 self._lock = threading.RLock() |
| 200 |
| 201 def start(self): |
| 202 """Starts processing. |
| 203 |
| 204 Calling this method |
| 205 |
| 206 - starts the thread that regularly flushes all enabled caches. |
| 207 - enables the other methods on the instance to be called successfully. |
| 208 |
| 209 I.e., even when the configuration disables aggregation, it is invalid to |
| 210 access the other methods of an instance until ``start`` is called; |
| 211 calls to other public methods will fail with an ``AssertionError``. |
| 212 |
| 213 """ |
| 214 with self._lock: |
| 215 if self._running: |
| 216 logger.info('%s is already started', self) |
| 217 return |
| 218 |
| 219 self._stopped = False |
| 220 self._running = True |
| 221 logger.info('starting thread of type %s to run the scheduler', |
| 222 _THREAD_CLASS) |
| 223 self._thread = _THREAD_CLASS(target=self._schedule_flushes) |
| 224 try: |
| 225 self._thread.start() |
| 226 except Exception: # pylint: disable=broad-except |
| 227 logger.warn( |
| 228 'no scheduler thread, scheduler.run() will be invoked by report(...)', |
| 229 exc_info=True) |
| 230 self._thread = None |
| 231 self._initialize_flushing() |
| 232 |
| 233 def stop(self): |
| 234 """Halts processing |
| 235 |
| 236 This will lead to the reports being flushed, the caches being cleared |
| 237 and a stop to the current processing thread. |
| 238 |
| 239 """ |
| 240 with self._lock: |
| 241 if self._stopped: |
| 242 logger.info('%s is already stopped', self) |
| 243 return |
| 244 |
| 245 self._flush_all_reports() |
| 246 self._stopped = True |
| 247 if self._run_scheduler_directly: |
| 248 self._cleanup_if_stopped() |
| 249 |
| 250 if self._scheduler and self._scheduler.empty(): |
| 251 # if there are events scheduled, then _running will subsequently |
| 252 # be set False by the scheduler thread. This handles the |
| 253 # case where there are no events, e.g. because all aggregation |
| 254 # was disabled |
| 255 self._running = False |
| 256 self._scheduler = None |
| 257 |
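A minimal lifecycle sketch tying ``start`` and ``stop`` together; the service name is a placeholder and request handling is elided:

    from google.api.control import client

    scc = client.Loaders.DEFAULT.load('my-service-name')
    scc.start()        # check()/report() assert that start() has been called
    try:
        pass           # handle traffic here, calling scc.check()/scc.report()
    finally:
        scc.stop()     # flushes buffered reports and clears both caches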
| 258 def check(self, check_req): |
| 259 """Process a check_request. |
| 260 |
| 261 The req is first passed to the check_aggregator. If there is a valid |
| 262 cached response, that is returned, otherwise a response is obtained from |
| 263 the transport. |
| 264 |
| 265 Args: |
| 266 check_req (``ServicecontrolServicesCheckRequest``): to be sent to |
| 267 the service control service |
| 268 |
| 269 Returns: |
| 270 ``CheckResponse``: either the cached response if one is applicable |
| 271 or a response from making a transport request, or None if |
| 272 the request to the transport fails |
| 273 |
| 274 """ |
| 275 |
| 276 self._assert_is_running() |
| 277 res = self._check_aggregator.check(check_req) |
| 278 if res: |
| 279 logger.debug('using cached check response for %s: %s', |
| 280 check_req, res) |
| 281 return res |
| 282 |
| 283 # Application code should not fail because check requests don't |
| 284 # complete; they should fail open, so here simply log the error and |
| 285 # return None to indicate that no response was obtained |
| 286 try: |
| 287 transport = self._create_transport() |
| 288 resp = transport.services.check(check_req) |
| 289 self._check_aggregator.add_response(check_req, resp) |
| 290 return resp |
| 291 except Exception: # pylint: disable=broad-except |
| 292 logger.error('direct send of check request failed %s', |
| 293 check_req, exc_info=True) |
| 294 return None |
| 295 |
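A sketch of the fail-open contract described in ``check``; here ``scc`` is assumed to be a started ``Client`` and ``check_req`` a ``ServicecontrolServicesCheckRequest`` built elsewhere:

    resp = scc.check(check_req)
    # None means the transport call failed; per the comment above, the caller
    # should fail open (allow the API call) rather than reject it.
    fail_open = resp is None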
| 296 def report(self, report_req): |
| 297 """Processes a report request. |
| 298 |
| 299 It will aggregate it with prior report requests to be sent later, |
| 300 or send it immediately if that's appropriate. |
| 301 """ |
| 302 self._assert_is_running() |
| 303 |
| 304 # no thread running, run the scheduler to ensure any pending |
| 305 # flush tasks are executed. |
| 306 if self._run_scheduler_directly: |
| 307 self._scheduler.run(blocking=False) |
| 308 |
| 309 if not self._report_aggregator.report(report_req): |
| 310 logger.info('need to send a report request directly') |
| 311 try: |
| 312 transport = self._create_transport() |
| 313 transport.services.report(report_req) |
| 314 except Exception: # pylint: disable=broad-except |
| 315 logger.error('direct send for report request failed', |
| 316 exc_info=True) |
| 317 |
| 318 @property |
| 319 def _run_scheduler_directly(self): |
| 320 return self._running and self._thread is None |
| 321 |
| 322 def _assert_is_running(self): |
| 323 assert self._running, '%s needs to be running' % (self,) |
| 324 |
| 325 def _initialize_flushing(self): |
| 326 with self._lock: |
| 327 logger.info('created a scheduler to control flushing') |
| 328 self._scheduler = sched.scheduler(to_cache_timer(self._timer), |
| 329 time.sleep) |
| 330 logger.info('scheduling initial check and flush') |
| 331 self._flush_schedule_check_aggregator() |
| 332 self._flush_schedule_report_aggregator() |
| 333 |
| 334 def _schedule_flushes(self): |
| 335 # the method expects to be run in the thread created in start() |
| 336 self._initialize_flushing() |
| 337 self._scheduler.run() # should block until self._stopped is set |
| 338 logger.info('scheduler.run completed, %s will exit', threading.current_thread()) |
| 339 |
| 340 def _cleanup_if_stopped(self): |
| 341 if not self._stopped: |
| 342 return False |
| 343 |
| 344 self._check_aggregator.clear() |
| 345 self._report_aggregator.clear() |
| 346 self._running = False |
| 347 return True |
| 348 |
| 349 def _flush_schedule_check_aggregator(self): |
| 350 if self._cleanup_if_stopped(): |
| 351 logger.info('did not schedule check flush: client is stopped') |
| 352 return |
| 353 |
| 354 flush_interval = self._check_aggregator.flush_interval |
| 355 if not flush_interval or flush_interval.total_seconds() < 0: |
| 356 logger.debug('did not schedule check flush: caching is disabled') |
| 357 return |
| 358 |
| 359 if self._run_scheduler_directly: |
| 360 logger.debug('did not schedule check flush: no scheduler thread') |
| 361 return |
| 362 |
| 363 logger.debug('flushing the check aggregator') |
| 364 transport = self._create_transport() |
| 365 for req in self._check_aggregator.flush(): |
| 366 try: |
| 367 resp = transport.services.check(req) |
| 368 self._check_aggregator.add_response(req, resp) |
| 369 except Exception: # pylint: disable=broad-except |
| 370 logger.error('failed to flush check_req %s', req, exc_info=True) |
| 371 |
| 372 # schedule a repeat of this method |
| 373 self._scheduler.enter( |
| 374 flush_interval.total_seconds(), |
| 375 1, # a higher priority (lower number) than report flushes |
| 376 self._flush_schedule_check_aggregator, |
| 377 () |
| 378 ) |
| 379 |
| 380 def _flush_schedule_report_aggregator(self): |
| 381 if self._cleanup_if_stopped(): |
| 382 logger.info('did not schedule report flush: client is stopped') |
| 383 return |
| 384 |
| 385 flush_interval = self._report_aggregator.flush_interval |
| 386 if not flush_interval or flush_interval.total_seconds() < 0: |
| 387 logger.debug('did not schedule report flush: caching is disabled') |
| 388 return |
| 389 |
| 390 # flush reports and schedule a repeat of this method |
| 391 transport = self._create_transport() |
| 392 reqs = self._report_aggregator.flush() |
| 393 logger.debug("will flush %d report requests", len(reqs)) |
| 394 for req in reqs: |
| 395 try: |
| 396 transport.services.report(req) |
| 397 except Exception: # pylint: disable=broad-except |
| 398 logger.error('failed to flush report_req %s', req, exc_info=True) |
| 399 |
| 400 self._scheduler.enter( |
| 401 flush_interval.total_seconds(), |
| 402 2, # a lower priority than check flushes |
| 403 self._flush_schedule_report_aggregator, |
| 404 () |
| 405 ) |
| 406 |
| 407 def _flush_all_reports(self): |
| 408 all_requests = self._report_aggregator.clear() |
| 409 logger.info('flushing all reports (count=%d)', len(all_requests)) |
| 410 transport = self._create_transport() |
| 411 for req in all_requests: |
| 412 try: |
| 413 transport.services.report(req) |
| 414 except Exception: # pylint: disable=broad-except |
| 415 logger.error('failed to flush report_req %s', req, exc_info=True) |
| 416 |
| 417 |
| 418 def use_default_thread(): |
| 419 """Makes ``Client``s started after this use the standard Thread class.""" |
| 420 global _THREAD_CLASS # pylint: disable=global-statement |
| 421 _THREAD_CLASS = threading.Thread |
| 422 |
| 423 |
| 424 def use_gae_thread(): |
| 425 """Makes ``Client``s started after this use the appengine thread class.""" |
| 426 global _THREAD_CLASS # pylint: disable=global-statement |
| 427 try: |
| 428 from google.appengine.api.background_thread import background_thread |
| 429 _THREAD_CLASS = background_thread.BackgroundThread |
| 430 except ImportError: |
| 431 logger.error( |
| 432 'Could not install appengine background threads!' |
| 433 ' Please install the python AppEngine SDK and use this from there' |
| 434 ) |