Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(618)

Side by Side Diff: third_party/google-endpoints/google/api/control/client.py

Issue 2666783008: Add google-endpoints to third_party/. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 Google Inc. All Rights Reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """client provides a complete standalone service control client.
16
17 :class:`Client` is a package-level facade that encapsulates all service control
18 functionality.
19
20 The :class:`Loaders` simplify ``Client`` initialization.
21
22 ``Client`` needs to stop and start a thread to implement its behaviour. In most
23 environments, the default thread class is sufficient. However, on Google App En gine,
24 it's necessary to use the appengine specific threading class instead.
25
26 :func:`use_gae_thread` and `use_default_thread` can be used to change the thread
27 class used by new instances of `Client`.
28
29 Example:
30
31 >>> from google.api.control import client
32 >>>
33 >>> # use on appengine with package-default settings
34 >>> service_name = 'my-appengine-service-name'
35 >>> client.use_gae_thread()
36 >>> gae_client = client.Loaders.DEFAULT.load(service_name)
37 >>> gae_client.start()
38
39 """
40 from __future__ import absolute_import
41
42 from datetime import datetime, timedelta
43 from enum import Enum
44 import json
45 import logging
46 import os
47 import threading
48 import time
49
50 from . import api_client, check_request, report_request
51 from . import USER_AGENT
52 from google.api.control.caches import CheckOptions, ReportOptions, to_cache_time r
53 from google.api.control.vendor.py3 import sched
54
55
56 logger = logging.getLogger(__name__)
57
58
59 CONFIG_VAR = 'ENDPOINTS_SERVER_CONFIG_FILE'
60
61
62 def _load_from_well_known_env():
63 if CONFIG_VAR not in os.environ:
64 logger.info('did not load server config; no environ var %s', CONFIG_VAR)
65 return _load_default()
66 json_file = os.environ[CONFIG_VAR]
67 if not os.path.exists(json_file):
68 logger.warn('did not load service; missing config file %s', json_file)
69 return _load_default()
70 try:
71 with open(json_file) as f:
72 json_dict = json.load(f)
73 check_json = json_dict['checkAggregatorConfig']
74 report_json = json_dict['reportAggregatorConfig']
75 check_options = CheckOptions(
76 num_entries=check_json['cacheEntries'],
77 expiration=timedelta(
78 milliseconds=check_json['responseExpirationMs']),
79 flush_interval=timedelta(
80 milliseconds=check_json['flushIntervalMs']))
81 report_options = ReportOptions(
82 num_entries=report_json['cacheEntries'],
83 flush_interval=timedelta(
84 milliseconds=report_json['flushIntervalMs']))
85 return check_options, report_options
86 except (KeyError, ValueError):
87 logger.warn('did not load service; bad json config file %s',
88 json_file,
89 exc_info=True)
90 return _load_default()
91
92
93 def _load_default():
94 return CheckOptions(), ReportOptions()
95
96
97 def _load_no_cache():
98 return (CheckOptions(num_entries=-1),
99 ReportOptions(num_entries=-1))
100
101
102 class Loaders(Enum):
103 """Enumerates the functions used to load clients from server configs."""
104 # pylint: disable=too-few-public-methods
105 ENVIRONMENT = (_load_from_well_known_env,)
106 DEFAULT = (_load_default,)
107 NO_CACHE = (_load_no_cache,)
108
109 def __init__(self, load_func):
110 """Constructor.
111
112 load_func is used to load a client config
113 """
114 self._load_func = load_func
115
116 def load(self, service_name, **kw):
117 check_opts, report_opts = self._load_func()
118 return Client(service_name, check_opts, report_opts, **kw)
119
120
121 _THREAD_CLASS = threading.Thread
122
123
124 def _create_http_transport():
125 additional_http_headers = {"user-agent": USER_AGENT}
126 do_logging = logger.level <= logging.DEBUG
127 return api_client.ServicecontrolV1(
128 additional_http_headers=additional_http_headers,
129 log_request=do_logging,
130 log_response=do_logging)
131
132
133 def _thread_local_http_transport_func():
134 local = threading.local()
135
136 def create_transport():
137 if not getattr(local, "transport", None):
138 local.transport = _create_http_transport()
139 return local.transport
140
141 return create_transport
142
143
144 _CREATE_THREAD_LOCAL_TRANSPORT = _thread_local_http_transport_func()
145
146
147 class Client(object):
148 """Client is a package-level facade that encapsulates all service control
149 functionality.
150
151 Using one of the :class:`Loaders` makes it easy to initialize ``Client``
152 instances.
153
154 Example:
155
156 >>> from google.api.control import client
157 >>> service_name = 'my-service-name'
158 >>>
159 >>> # create an scc client using the package default values
160 >>> default_client = client.Loaders.DEFAULT.load(service_name)
161
162 >>> # create an scc client by loading configuration from the
163 >>> # a JSON file configured by an environment variable
164 >>> json_conf_client = client.Loaders.ENVIRONMENT.load(service_name)
165
166 Client is thread-compatible
167
168 """
169 # pylint: disable=too-many-instance-attributes, too-many-arguments
170
171 def __init__(self,
172 service_name,
173 check_options,
174 report_options,
175 timer=datetime.utcnow,
176 create_transport=_CREATE_THREAD_LOCAL_TRANSPORT):
177 """
178
179 Args:
180 service_name (str): the name of the service to be controlled
181 check_options (:class:`google.api.control.caches.CheckOptions`):
182 configures checking
183 report_options (:class:`google.api.control.caches.ReportOptions`):
184 configures reporting
185 timer (:func[[datetime.datetime]]: used to obtain the current time.
186 """
187 self._check_aggregator = check_request.Aggregator(service_name,
188 check_options,
189 timer=timer)
190 self._report_aggregator = report_request.Aggregator(service_name,
191 report_options,
192 timer=timer)
193 self._running = False
194 self._scheduler = None
195 self._stopped = False
196 self._timer = timer
197 self._thread = None
198 self._create_transport = create_transport
199 self._lock = threading.RLock()
200
201 def start(self):
202 """Starts processing.
203
204 Calling this method
205
206 - starts the thread that regularly flushes all enabled caches.
207 - enables the other methods on the instance to be called successfully
208
209 I.e, even when the configuration disables aggregation, it is invalid to
210 access the other methods of an instance until ``start`` is called -
211 Calls to other public methods will fail with an AssertionError.
212
213 """
214 with self._lock:
215 if self._running:
216 logger.info('%s is already started', self)
217 return
218
219 self._stopped = False
220 self._running = True
221 logger.info('starting thread of type %s to run the scheduler',
222 _THREAD_CLASS)
223 self._thread = _THREAD_CLASS(target=self._schedule_flushes)
224 try:
225 self._thread.start()
226 except Exception: # pylint: disable=broad-except
227 logger.warn(
228 'no scheduler thread, scheduler.run() will be invoked by rep ort(...)',
229 exc_info=True)
230 self._thread = None
231 self._initialize_flushing()
232
233 def stop(self):
234 """Halts processing
235
236 This will lead to the reports being flushed, the caches being cleared
237 and a stop to the current processing thread.
238
239 """
240 with self._lock:
241 if self._stopped:
242 logger.info('%s is already stopped', self)
243 return
244
245 self._flush_all_reports()
246 self._stopped = True
247 if self._run_scheduler_directly:
248 self._cleanup_if_stopped()
249
250 if self._scheduler and self._scheduler.empty():
251 # if there are events scheduled, then _running will subsequently
252 # be set False by the scheduler thread. This handles the
253 # case where there are no events, e.g because all aggreagation
254 # was disabled
255 self._running = False
256 self._scheduler = None
257
258 def check(self, check_req):
259 """Process a check_request.
260
261 The req is first passed to the check_aggregator. If there is a valid
262 cached response, that is returned, otherwise a response is obtained from
263 the transport.
264
265 Args:
266 check_req (``ServicecontrolServicesCheckRequest``): to be sent to
267 the service control service
268
269 Returns:
270 ``CheckResponse``: either the cached response if one is applicable
271 or a response from making a transport request, or None if
272 if the request to the transport fails
273
274 """
275
276 self._assert_is_running()
277 res = self._check_aggregator.check(check_req)
278 if res:
279 logger.debug('using cached check response for %s: %s',
280 check_request, res)
281 return res
282
283 # Application code should not fail because check request's don't
284 # complete, They should fail open, so here simply log the error and
285 # return None to indicate that no response was obtained
286 try:
287 transport = self._create_transport()
288 resp = transport.services.check(check_req)
289 self._check_aggregator.add_response(check_req, resp)
290 return resp
291 except Exception: # pylint: disable=broad-except
292 logger.error('direct send of check request failed %s',
293 check_request, exc_info=True)
294 return None
295
296 def report(self, report_req):
297 """Processes a report request.
298
299 It will aggregate it with prior report_requests to be send later
300 or it will send it immediately if that's appropriate.
301 """
302 self._assert_is_running()
303
304 # no thread running, run the scheduler to ensure any pending
305 # flush tasks are executed.
306 if self._run_scheduler_directly:
307 self._scheduler.run(blocking=False)
308
309 if not self._report_aggregator.report(report_req):
310 logger.info('need to send a report request directly')
311 try:
312 transport = self._create_transport()
313 transport.services.report(report_req)
314 except Exception: # pylint: disable=broad-except
315 logger.error('direct send for report request failed',
316 exc_info=True)
317
318 @property
319 def _run_scheduler_directly(self):
320 return self._running and self._thread is None
321
322 def _assert_is_running(self):
323 assert self._running, '%s needs to be running' % (self,)
324
325 def _initialize_flushing(self):
326 with self._lock:
327 logger.info('created a scheduler to control flushing')
328 self._scheduler = sched.scheduler(to_cache_timer(self._timer),
329 time.sleep)
330 logger.info('scheduling initial check and flush')
331 self._flush_schedule_check_aggregator()
332 self._flush_schedule_report_aggregator()
333
334 def _schedule_flushes(self):
335 # the method expects to be run in the thread created in start()
336 self._initialize_flushing()
337 self._scheduler.run() # should block until self._stopped is set
338 logger.info('scheduler.run completed, %s will exit', threading.current_t hread())
339
340 def _cleanup_if_stopped(self):
341 if not self._stopped:
342 return False
343
344 self._check_aggregator.clear()
345 self._report_aggregator.clear()
346 self._running = False
347 return True
348
349 def _flush_schedule_check_aggregator(self):
350 if self._cleanup_if_stopped():
351 logger.info('did not schedule check flush: client is stopped')
352 return
353
354 flush_interval = self._check_aggregator.flush_interval
355 if not flush_interval or flush_interval.total_seconds() < 0:
356 logger.debug('did not schedule check flush: caching is disabled')
357 return
358
359 if self._run_scheduler_directly:
360 logger.debug('did not schedule check flush: no scheduler thread')
361 return
362
363 logger.debug('flushing the check aggregator')
364 transport = self._create_transport()
365 for req in self._check_aggregator.flush():
366 try:
367 resp = transport.services.check(req)
368 self._check_aggregator.add_response(req, resp)
369 except Exception: # pylint: disable=broad-except
370 logger.error('failed to flush check_req %s', req, exc_info=True)
371
372 # schedule a repeat of this method
373 self._scheduler.enter(
374 flush_interval.total_seconds(),
375 2, # a higher priority than report flushes
376 self._flush_schedule_check_aggregator,
377 ()
378 )
379
380 def _flush_schedule_report_aggregator(self):
381 if self._cleanup_if_stopped():
382 logger.info('did not schedule report flush: client is stopped')
383 return
384
385 flush_interval = self._report_aggregator.flush_interval
386 if not flush_interval or flush_interval.total_seconds() < 0:
387 logger.debug('did not schedule report flush: caching is disabled')
388 return
389
390 # flush reports and schedule a repeat of this method
391 transport = self._create_transport()
392 reqs = self._report_aggregator.flush()
393 logger.debug("will flush %d report requests", len(reqs))
394 for req in reqs:
395 try:
396 transport.services.report(req)
397 except Exception: # pylint: disable=broad-except
398 logger.error('failed to flush report_req %s', req, exc_info=True )
399
400 self._scheduler.enter(
401 flush_interval.total_seconds(),
402 1, # a lower priority than check flushes
403 self._flush_schedule_report_aggregator,
404 ()
405 )
406
407 def _flush_all_reports(self):
408 all_requests = self._report_aggregator.clear()
409 logger.info('flushing all reports (count=%d)', len(all_requests))
410 transport = self._create_transport()
411 for req in all_requests:
412 try:
413 transport.services.report(req)
414 except Exception: # pylint: disable=broad-except
415 logger.error('failed to flush report_req %s', req, exc_info=True )
416
417
418 def use_default_thread():
419 """Makes ``Client``s started after this use the standard Thread class."""
420 global _THREAD_CLASS # pylint: disable=global-statement
421 _THREAD_CLASS = threading.Thread
422
423
424 def use_gae_thread():
425 """Makes ``Client``s started after this use the appengine thread class."""
426 global _THREAD_CLASS # pylint: disable=global-statement
427 try:
428 from google.appengine.api.background_thread import background_thread
429 _THREAD_CLASS = background_thread.BackgroundThread
430 except ImportError:
431 logger.error(
432 'Could not install appengine background threads!'
433 ' Please install the python AppEngine SDK and use this from there'
434 )
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698