| Index: third_party/google-endpoints/google/api/control/client.py
|
| diff --git a/third_party/google-endpoints/google/api/control/client.py b/third_party/google-endpoints/google/api/control/client.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..06711583e5e5822f1a991295e5eee2fb21d58ef5
|
| --- /dev/null
|
| +++ b/third_party/google-endpoints/google/api/control/client.py
|
| @@ -0,0 +1,434 @@
|
| +# Copyright 2016 Google Inc. All Rights Reserved.
|
| +#
|
| +# Licensed under the Apache License, Version 2.0 (the "License");
|
| +# you may not use this file except in compliance with the License.
|
| +# You may obtain a copy of the License at
|
| +#
|
| +# http://www.apache.org/licenses/LICENSE-2.0
|
| +#
|
| +# Unless required by applicable law or agreed to in writing, software
|
| +# distributed under the License is distributed on an "AS IS" BASIS,
|
| +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +# See the License for the specific language governing permissions and
|
| +# limitations under the License.
|
| +
|
| +"""client provides a complete standalone service control client.
|
| +
|
| +:class:`Client` is a package-level facade that encapsulates all service control
|
| +functionality.
|
| +
|
| +The :class:`Loaders` simplify ``Client`` initialization.
|
| +
|
| +``Client`` needs to stop and start a thread to implement its behaviour. In most
|
| +environments, the default thread class is sufficient. However, on Google App Engine,
|
| +it's necessary to use the appengine specific threading class instead.
|
| +
|
| +:func:`use_gae_thread` and `use_default_thread` can be used to change the thread
|
| +class used by new instances of `Client`.
|
| +
|
| +Example:
|
| +
|
| + >>> from google.api.control import client
|
| + >>>
|
| + >>> # use on appengine with package-default settings
|
| + >>> service_name = 'my-appengine-service-name'
|
| + >>> client.use_gae_thread()
|
| + >>> gae_client = client.Loaders.DEFAULT.load(service_name)
|
| + >>> gae_client.start()
|
| +
|
| +"""
|
| +from __future__ import absolute_import
|
| +
|
| +from datetime import datetime, timedelta
|
| +from enum import Enum
|
| +import json
|
| +import logging
|
| +import os
|
| +import threading
|
| +import time
|
| +
|
| +from . import api_client, check_request, report_request
|
| +from . import USER_AGENT
|
| +from google.api.control.caches import CheckOptions, ReportOptions, to_cache_timer
|
| +from google.api.control.vendor.py3 import sched
|
| +
|
| +
|
| +logger = logging.getLogger(__name__)
|
| +
|
| +
|
| +CONFIG_VAR = 'ENDPOINTS_SERVER_CONFIG_FILE'
|
| +
|
| +
|
| +def _load_from_well_known_env():
|
| + if CONFIG_VAR not in os.environ:
|
| + logger.info('did not load server config; no environ var %s', CONFIG_VAR)
|
| + return _load_default()
|
| + json_file = os.environ[CONFIG_VAR]
|
| + if not os.path.exists(json_file):
|
| + logger.warn('did not load service; missing config file %s', json_file)
|
| + return _load_default()
|
| + try:
|
| + with open(json_file) as f:
|
| + json_dict = json.load(f)
|
| + check_json = json_dict['checkAggregatorConfig']
|
| + report_json = json_dict['reportAggregatorConfig']
|
| + check_options = CheckOptions(
|
| + num_entries=check_json['cacheEntries'],
|
| + expiration=timedelta(
|
| + milliseconds=check_json['responseExpirationMs']),
|
| + flush_interval=timedelta(
|
| + milliseconds=check_json['flushIntervalMs']))
|
| + report_options = ReportOptions(
|
| + num_entries=report_json['cacheEntries'],
|
| + flush_interval=timedelta(
|
| + milliseconds=report_json['flushIntervalMs']))
|
| + return check_options, report_options
|
| + except (KeyError, ValueError):
|
| + logger.warn('did not load service; bad json config file %s',
|
| + json_file,
|
| + exc_info=True)
|
| + return _load_default()
|
| +
|
| +
|
| +def _load_default():
|
| + return CheckOptions(), ReportOptions()
|
| +
|
| +
|
| +def _load_no_cache():
|
| + return (CheckOptions(num_entries=-1),
|
| + ReportOptions(num_entries=-1))
|
| +
|
| +
|
| +class Loaders(Enum):
|
| + """Enumerates the functions used to load clients from server configs."""
|
| + # pylint: disable=too-few-public-methods
|
| + ENVIRONMENT = (_load_from_well_known_env,)
|
| + DEFAULT = (_load_default,)
|
| + NO_CACHE = (_load_no_cache,)
|
| +
|
| + def __init__(self, load_func):
|
| + """Constructor.
|
| +
|
| + load_func is used to load a client config
|
| + """
|
| + self._load_func = load_func
|
| +
|
| + def load(self, service_name, **kw):
|
| + check_opts, report_opts = self._load_func()
|
| + return Client(service_name, check_opts, report_opts, **kw)
|
| +
|
| +
|
| +_THREAD_CLASS = threading.Thread
|
| +
|
| +
|
| +def _create_http_transport():
|
| + additional_http_headers = {"user-agent": USER_AGENT}
|
| + do_logging = logger.level <= logging.DEBUG
|
| + return api_client.ServicecontrolV1(
|
| + additional_http_headers=additional_http_headers,
|
| + log_request=do_logging,
|
| + log_response=do_logging)
|
| +
|
| +
|
| +def _thread_local_http_transport_func():
|
| + local = threading.local()
|
| +
|
| + def create_transport():
|
| + if not getattr(local, "transport", None):
|
| + local.transport = _create_http_transport()
|
| + return local.transport
|
| +
|
| + return create_transport
|
| +
|
| +
|
| +_CREATE_THREAD_LOCAL_TRANSPORT = _thread_local_http_transport_func()
|
| +
|
| +
|
| +class Client(object):
|
| + """Client is a package-level facade that encapsulates all service control
|
| + functionality.
|
| +
|
| + Using one of the :class:`Loaders` makes it easy to initialize ``Client``
|
| + instances.
|
| +
|
| + Example:
|
| +
|
| + >>> from google.api.control import client
|
| + >>> service_name = 'my-service-name'
|
| + >>>
|
| + >>> # create an scc client using the package default values
|
| + >>> default_client = client.Loaders.DEFAULT.load(service_name)
|
| +
|
| + >>> # create an scc client by loading configuration from the
|
| + >>> # a JSON file configured by an environment variable
|
| + >>> json_conf_client = client.Loaders.ENVIRONMENT.load(service_name)
|
| +
|
| + Client is thread-compatible
|
| +
|
| + """
|
| + # pylint: disable=too-many-instance-attributes, too-many-arguments
|
| +
|
| + def __init__(self,
|
| + service_name,
|
| + check_options,
|
| + report_options,
|
| + timer=datetime.utcnow,
|
| + create_transport=_CREATE_THREAD_LOCAL_TRANSPORT):
|
| + """
|
| +
|
| + Args:
|
| + service_name (str): the name of the service to be controlled
|
| + check_options (:class:`google.api.control.caches.CheckOptions`):
|
| + configures checking
|
| + report_options (:class:`google.api.control.caches.ReportOptions`):
|
| + configures reporting
|
| + timer (:func[[datetime.datetime]]: used to obtain the current time.
|
| + """
|
| + self._check_aggregator = check_request.Aggregator(service_name,
|
| + check_options,
|
| + timer=timer)
|
| + self._report_aggregator = report_request.Aggregator(service_name,
|
| + report_options,
|
| + timer=timer)
|
| + self._running = False
|
| + self._scheduler = None
|
| + self._stopped = False
|
| + self._timer = timer
|
| + self._thread = None
|
| + self._create_transport = create_transport
|
| + self._lock = threading.RLock()
|
| +
|
| + def start(self):
|
| + """Starts processing.
|
| +
|
| + Calling this method
|
| +
|
| + - starts the thread that regularly flushes all enabled caches.
|
| + - enables the other methods on the instance to be called successfully
|
| +
|
| + I.e, even when the configuration disables aggregation, it is invalid to
|
| + access the other methods of an instance until ``start`` is called -
|
| + Calls to other public methods will fail with an AssertionError.
|
| +
|
| + """
|
| + with self._lock:
|
| + if self._running:
|
| + logger.info('%s is already started', self)
|
| + return
|
| +
|
| + self._stopped = False
|
| + self._running = True
|
| + logger.info('starting thread of type %s to run the scheduler',
|
| + _THREAD_CLASS)
|
| + self._thread = _THREAD_CLASS(target=self._schedule_flushes)
|
| + try:
|
| + self._thread.start()
|
| + except Exception: # pylint: disable=broad-except
|
| + logger.warn(
|
| + 'no scheduler thread, scheduler.run() will be invoked by report(...)',
|
| + exc_info=True)
|
| + self._thread = None
|
| + self._initialize_flushing()
|
| +
|
| + def stop(self):
|
| + """Halts processing
|
| +
|
| + This will lead to the reports being flushed, the caches being cleared
|
| + and a stop to the current processing thread.
|
| +
|
| + """
|
| + with self._lock:
|
| + if self._stopped:
|
| + logger.info('%s is already stopped', self)
|
| + return
|
| +
|
| + self._flush_all_reports()
|
| + self._stopped = True
|
| + if self._run_scheduler_directly:
|
| + self._cleanup_if_stopped()
|
| +
|
| + if self._scheduler and self._scheduler.empty():
|
| + # if there are events scheduled, then _running will subsequently
|
| + # be set False by the scheduler thread. This handles the
|
| + # case where there are no events, e.g because all aggreagation
|
| + # was disabled
|
| + self._running = False
|
| + self._scheduler = None
|
| +
|
| + def check(self, check_req):
|
| + """Process a check_request.
|
| +
|
| + The req is first passed to the check_aggregator. If there is a valid
|
| + cached response, that is returned, otherwise a response is obtained from
|
| + the transport.
|
| +
|
| + Args:
|
| + check_req (``ServicecontrolServicesCheckRequest``): to be sent to
|
| + the service control service
|
| +
|
| + Returns:
|
| + ``CheckResponse``: either the cached response if one is applicable
|
| + or a response from making a transport request, or None if
|
| + if the request to the transport fails
|
| +
|
| + """
|
| +
|
| + self._assert_is_running()
|
| + res = self._check_aggregator.check(check_req)
|
| + if res:
|
| + logger.debug('using cached check response for %s: %s',
|
| + check_request, res)
|
| + return res
|
| +
|
| + # Application code should not fail because check request's don't
|
| + # complete, They should fail open, so here simply log the error and
|
| + # return None to indicate that no response was obtained
|
| + try:
|
| + transport = self._create_transport()
|
| + resp = transport.services.check(check_req)
|
| + self._check_aggregator.add_response(check_req, resp)
|
| + return resp
|
| + except Exception: # pylint: disable=broad-except
|
| + logger.error('direct send of check request failed %s',
|
| + check_request, exc_info=True)
|
| + return None
|
| +
|
| + def report(self, report_req):
|
| + """Processes a report request.
|
| +
|
| + It will aggregate it with prior report_requests to be send later
|
| + or it will send it immediately if that's appropriate.
|
| + """
|
| + self._assert_is_running()
|
| +
|
| + # no thread running, run the scheduler to ensure any pending
|
| + # flush tasks are executed.
|
| + if self._run_scheduler_directly:
|
| + self._scheduler.run(blocking=False)
|
| +
|
| + if not self._report_aggregator.report(report_req):
|
| + logger.info('need to send a report request directly')
|
| + try:
|
| + transport = self._create_transport()
|
| + transport.services.report(report_req)
|
| + except Exception: # pylint: disable=broad-except
|
| + logger.error('direct send for report request failed',
|
| + exc_info=True)
|
| +
|
| + @property
|
| + def _run_scheduler_directly(self):
|
| + return self._running and self._thread is None
|
| +
|
| + def _assert_is_running(self):
|
| + assert self._running, '%s needs to be running' % (self,)
|
| +
|
| + def _initialize_flushing(self):
|
| + with self._lock:
|
| + logger.info('created a scheduler to control flushing')
|
| + self._scheduler = sched.scheduler(to_cache_timer(self._timer),
|
| + time.sleep)
|
| + logger.info('scheduling initial check and flush')
|
| + self._flush_schedule_check_aggregator()
|
| + self._flush_schedule_report_aggregator()
|
| +
|
| + def _schedule_flushes(self):
|
| + # the method expects to be run in the thread created in start()
|
| + self._initialize_flushing()
|
| + self._scheduler.run() # should block until self._stopped is set
|
| + logger.info('scheduler.run completed, %s will exit', threading.current_thread())
|
| +
|
| + def _cleanup_if_stopped(self):
|
| + if not self._stopped:
|
| + return False
|
| +
|
| + self._check_aggregator.clear()
|
| + self._report_aggregator.clear()
|
| + self._running = False
|
| + return True
|
| +
|
| + def _flush_schedule_check_aggregator(self):
|
| + if self._cleanup_if_stopped():
|
| + logger.info('did not schedule check flush: client is stopped')
|
| + return
|
| +
|
| + flush_interval = self._check_aggregator.flush_interval
|
| + if not flush_interval or flush_interval.total_seconds() < 0:
|
| + logger.debug('did not schedule check flush: caching is disabled')
|
| + return
|
| +
|
| + if self._run_scheduler_directly:
|
| + logger.debug('did not schedule check flush: no scheduler thread')
|
| + return
|
| +
|
| + logger.debug('flushing the check aggregator')
|
| + transport = self._create_transport()
|
| + for req in self._check_aggregator.flush():
|
| + try:
|
| + resp = transport.services.check(req)
|
| + self._check_aggregator.add_response(req, resp)
|
| + except Exception: # pylint: disable=broad-except
|
| + logger.error('failed to flush check_req %s', req, exc_info=True)
|
| +
|
| + # schedule a repeat of this method
|
| + self._scheduler.enter(
|
| + flush_interval.total_seconds(),
|
| + 2, # a higher priority than report flushes
|
| + self._flush_schedule_check_aggregator,
|
| + ()
|
| + )
|
| +
|
| + def _flush_schedule_report_aggregator(self):
|
| + if self._cleanup_if_stopped():
|
| + logger.info('did not schedule report flush: client is stopped')
|
| + return
|
| +
|
| + flush_interval = self._report_aggregator.flush_interval
|
| + if not flush_interval or flush_interval.total_seconds() < 0:
|
| + logger.debug('did not schedule report flush: caching is disabled')
|
| + return
|
| +
|
| + # flush reports and schedule a repeat of this method
|
| + transport = self._create_transport()
|
| + reqs = self._report_aggregator.flush()
|
| + logger.debug("will flush %d report requests", len(reqs))
|
| + for req in reqs:
|
| + try:
|
| + transport.services.report(req)
|
| + except Exception: # pylint: disable=broad-except
|
| + logger.error('failed to flush report_req %s', req, exc_info=True)
|
| +
|
| + self._scheduler.enter(
|
| + flush_interval.total_seconds(),
|
| + 1, # a lower priority than check flushes
|
| + self._flush_schedule_report_aggregator,
|
| + ()
|
| + )
|
| +
|
| + def _flush_all_reports(self):
|
| + all_requests = self._report_aggregator.clear()
|
| + logger.info('flushing all reports (count=%d)', len(all_requests))
|
| + transport = self._create_transport()
|
| + for req in all_requests:
|
| + try:
|
| + transport.services.report(req)
|
| + except Exception: # pylint: disable=broad-except
|
| + logger.error('failed to flush report_req %s', req, exc_info=True)
|
| +
|
| +
|
| +def use_default_thread():
|
| + """Makes ``Client``s started after this use the standard Thread class."""
|
| + global _THREAD_CLASS # pylint: disable=global-statement
|
| + _THREAD_CLASS = threading.Thread
|
| +
|
| +
|
| +def use_gae_thread():
|
| + """Makes ``Client``s started after this use the appengine thread class."""
|
| + global _THREAD_CLASS # pylint: disable=global-statement
|
| + try:
|
| + from google.appengine.api.background_thread import background_thread
|
| + _THREAD_CLASS = background_thread.BackgroundThread
|
| + except ImportError:
|
| + logger.error(
|
| + 'Could not install appengine background threads!'
|
| + ' Please install the python AppEngine SDK and use this from there'
|
| + )
|
|
|