OLD | NEW |
(Empty) | |
| 1 # Copyright 2016 Google Inc. All Rights Reserved. |
| 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at |
| 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # |
| 9 # Unless required by applicable law or agreed to in writing, software |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 # See the License for the specific language governing permissions and |
| 13 # limitations under the License. |
| 14 |
| 15 """caches provides functions and classes used to support caching. |
| 16 |
| 17 Caching is provided by extensions of the cache classes from the |
| 18 cachetools open-source library. |
| 19 |
| 20 :func:`create` creates a cache instance specified by either a |
| 21 :class:`google.api.control.CheckAggregationOptions` or a |
| 22 :class:`google.api.control.ReportAggregationOptions` instance. |
| 23 |
| 24 """ |
| 25 |
| 26 from __future__ import absolute_import |
| 27 |
| 28 # pylint: disable=too-many-ancestors |
| 29 # |
| 30 # It affects the DequeOutTTLCache and DequeOutLRUCache which extend |
| 31 # cachetools.TTLCache and cachetools.LRUCache respectively. Within cachetools, |
| 32 # those classes each extend Cache, which itself extends DefaultMapping. It does |
| 33 # make sense to have this chain of ancestors, so it's right to disable the |
| 34 # warning here. |
| 35 |
| 36 import collections |
| 37 import logging |
| 38 import threading |
| 39 from datetime import datetime, timedelta |
| 40 |
| 41 import cachetools |
| 42 |
| 43 logger = logging.getLogger(__name__) |
| 44 |
| 45 |
| 46 class CheckOptions( |
| 47 collections.namedtuple( |
| 48 'CheckOptions', |
| 49 ['num_entries', |
| 50 'flush_interval', |
| 51 'expiration'])): |
| 52 """Holds values used to control check aggregation behavior. |
| 53 |
| 54 Attributes: |
| 55 |
| 56 num_entries: the maximum number of cache entries that can be kept in |
| 57 the aggregation cache |
| 58 flush_interval (:class:`datetime.timedelta`): the maximum delta before |
| 59 aggregated check requests are flushed to the server. The cache |
| 60 entry is deleted after the flush. |
| 61 expiration (:class:`datetime.timedelta`): elapsed time before a cached |
| 62 check response should be deleted. This value should be larger than |
| 63 ``flush_interval``, otherwise it will be ignored, and instead a value |
| 64 equivalent to ``flush_interval`` + 1ms will be used. |
| 65 """ |
| 66 # pylint: disable=too-few-public-methods |
| 67 DEFAULT_NUM_ENTRIES = 200 |
| 68 DEFAULT_FLUSH_INTERVAL = timedelta(milliseconds=500) |
| 69 DEFAULT_EXPIRATION = timedelta(seconds=1) |
| 70 |
| 71 def __new__(cls, |
| 72 num_entries=DEFAULT_NUM_ENTRIES, |
| 73 flush_interval=DEFAULT_FLUSH_INTERVAL, |
| 74 expiration=DEFAULT_EXPIRATION): |
| 75 """Invokes the base constructor with default values.""" |
| 76 assert isinstance(num_entries, int), 'should be an int' |
| 77 assert isinstance(flush_interval, timedelta), 'should be a timedelta' |
| 78 assert isinstance(expiration, timedelta), 'should be a timedelta' |
| 79 if expiration <= flush_interval: |
| 80 expiration = flush_interval + timedelta(milliseconds=1) |
| 81 return super(CheckOptions, cls).__new__( |
| 82 cls, |
| 83 num_entries, |
| 84 flush_interval, |
| 85 expiration) |
| 86 |
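|  # A doctest-style sketch of the expiration floor above (illustrative only): |
|  # when expiration is not larger than flush_interval, __new__ bumps it to |
|  # flush_interval + 1ms. |
|  # |
|  #   >>> opts = CheckOptions(flush_interval=timedelta(seconds=2), |
|  #   ...                     expiration=timedelta(seconds=1)) |
|  #   >>> opts.expiration > opts.flush_interval |
|  #   True |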
| 87 |
| 88 class ReportOptions( |
| 89 collections.namedtuple( |
| 90 'ReportOptions', |
| 91 ['num_entries', |
| 92 'flush_interval'])): |
| 93 """Holds values used to control report aggregation behavior. |
| 94 |
| 95 Attributes: |
| 96 |
| 97 num_entries: the maximum number of cache entries that can be kept in |
| 98 the aggregation cache |
| 99 |
| 100 flush_interval (:class:`datetime.timedelta`): the maximum delta before |
| 101 aggregated report requests are flushed to the server. The cache |
| 102 entry is deleted after the flush |
| 103 """ |
| 104 # pylint: disable=too-few-public-methods |
| 105 DEFAULT_NUM_ENTRIES = 200 |
| 106 DEFAULT_FLUSH_INTERVAL = timedelta(seconds=1) |
| 107 |
| 108 def __new__(cls, |
| 109 num_entries=DEFAULT_NUM_ENTRIES, |
| 110 flush_interval=DEFAULT_FLUSH_INTERVAL): |
| 111 """Invokes the base constructor with default values.""" |
| 112 assert isinstance(num_entries, int), 'should be an int' |
| 113 assert isinstance(flush_interval, timedelta), 'should be a timedelta' |
| 114 |
| 115 return super(ReportOptions, cls).__new__( |
| 116 cls, |
| 117 num_entries, |
| 118 flush_interval) |
| 119 |
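|  # A small illustrative sketch: unlike CheckOptions, ReportOptions has no |
|  # expiration field, so create() below falls back to flush_interval when it |
|  # picks the cache ttl. |
|  # |
|  #   >>> r_opts = ReportOptions(num_entries=50, |
|  #   ...                        flush_interval=timedelta(seconds=5)) |
|  #   >>> getattr(r_opts, 'expiration', r_opts.flush_interval).total_seconds() |
|  #   5.0 |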
| 120 |
| 121 ZERO_INTERVAL = timedelta() |
| 122 |
| 123 |
| 124 def create(options, timer=None): |
| 125 """Create a cache specified by ``options`` |
| 126 |
| 127 ``options`` is an instance of either |
| 128 :class:`google.api.control.caches.CheckOptions` or |
| 129 :class:`google.api.control.caches.ReportOptions` |
| 130 |
| 131 The returned cache is wrapped in a :class:`LockedObject`, requiring it to |
| 132 be accessed in a with statement that gives synchronized access |
| 133 |
| 134 Example: |
| 135 >>> options = CheckOptions() |
| 136 >>> synced_cache = create(options) |
| 137 >>> with synced_cache as cache: # acquire the lock |
| 138 ... cache['a_key'] = 'a_value' |
| 139 |
| 140 Args: |
| 141 options (object): an instance of either of the options classes |
| 142 |
| 143 Returns: |
| 144 :class:`cachetools.Cache`: the cache implementation specified by options |
| 145 or ``None`` if options is ``None`` or if ``options.num_entries`` <= 0 |
| 146 |
| 147 Raises: |
| 148 ValueError: if options is not a supported type |
| 149 |
| 150 """ |
| 151 if options is None: # no options, don't create cache |
| 152 return None |
| 153 |
| 154 if not (isinstance(options, CheckOptions) or |
| 155 isinstance(options, ReportOptions)): |
| 156 logger.error('create(): bad options %s', options) |
| 157 raise ValueError('Invalid options') |
| 158 |
| 159 if options.num_entries <= 0: |
| 160 logger.info("did not create cache, options was %s", options) |
| 161 return None |
| 162 |
| 163 logger.info("creating a cache from %s", options) |
| 164 if options.flush_interval > ZERO_INTERVAL: |
| 165 # options always has a flush_interval, but may have an expiration |
| 166 # field. If the expiration is present, use that instead of the |
| 167 # flush_interval for the ttl |
| 168 ttl = getattr(options, 'expiration', options.flush_interval) |
| 169 return LockedObject( |
| 170 DequeOutTTLCache( |
| 171 options.num_entries, |
| 172 ttl=ttl.total_seconds(), |
| 173 timer=to_cache_timer(timer) |
| 174 )) |
| 175 |
| 176 return LockedObject(DequeOutLRUCache(options.num_entries)) |
| 177 |
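|  # A usage sketch of create(): a positive flush_interval yields a TTL-backed |
|  # cache, a zero interval yields a plain LRU cache; either way the result is |
|  # wrapped in a LockedObject and used inside a with statement. |
|  # |
|  #   >>> synced = create(ReportOptions(flush_interval=ZERO_INTERVAL)) |
|  #   >>> with synced as cache:          # LRU-backed, no positive interval |
|  #   ...     cache['a_key'] = 'a_value' |
|  #   >>> synced = create(CheckOptions())   # TTL-backed, ttl = expiration |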
| 178 |
| 179 class DequeOutTTLCache(cachetools.TTLCache): |
| 180 """Extends ``TTLCache`` so that expired items are placed in a ``deque``.""" |
| 181 |
| 182 def __init__(self, maxsize, ttl, out_deque=None, **kw): |
| 183 """Constructor. |
| 184 |
| 185 Args: |
| 186 maxsize (int): the maximum number of entries in the cache |
| 187 ttl (int): the ttl for entries added to the cache |
| 188 out_deque (:class:`collections.deque`): a deque in which to add items |
| 189 that expire from the cache |
| 190 **kw: the other keyword args supported by the constructor to |
| 191 :class:`cachetools.TTLCache` |
| 192 |
| 193 Raises: |
| 194 ValueError: if out_deque is not a collections.deque |
| 195 |
| 196 """ |
| 197 super(DequeOutTTLCache, self).__init__(maxsize, ttl, **kw) |
| 198 if out_deque is None: |
| 199 out_deque = collections.deque() |
| 200 elif not isinstance(out_deque, collections.deque): |
| 201 raise ValueError('out_deque should be a collections.deque') |
| 202 self._out_deque = out_deque |
| 203 self._tracking = {} |
| 204 |
| 205 def __setitem__(self, key, value, **kw): |
| 206 super(DequeOutTTLCache, self).__setitem__(key, value, **kw) |
| 207 self._tracking[key] = value |
| 208 |
| 209 @property |
| 210 def out_deque(self): |
| 211 """The :class:`collections.deque` to which expired items are added.""" |
| 212 self.expire() |
| 213 expired = dict((k, v) for (k, v) in self._tracking.items() |
| 214 if self.get(k) is None) |
| 215 for k, v in expired.items(): |
| 216 del self._tracking[k] |
| 217 self._out_deque.append(v) |
| 218 return self._out_deque |
| 219 |
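|  # A doctest-style sketch of the expiry behaviour, using an artificial timer |
|  # (the names clock and c are illustrative) so the example need not sleep; |
|  # entries whose ttl has passed show up on out_deque instead of vanishing. |
|  # |
|  #   >>> clock = [0] |
|  #   >>> c = DequeOutTTLCache(3, ttl=2, timer=lambda: clock[0]) |
|  #   >>> c['a'] = 'apple' |
|  #   >>> clock[0] = 5                 # move past the ttl |
|  #   >>> list(c.out_deque) |
|  #   ['apple'] |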
| 220 |
| 221 class DequeOutLRUCache(cachetools.LRUCache): |
| 222 """Extends ``LRUCache`` so that expired items are placed in a ``deque``.""" |
| 223 |
| 224 def __init__(self, maxsize, out_deque=None, **kw): |
| 225 """Constructor. |
| 226 |
| 227 Args: |
| 228 maxsize (int): the maximum number of entries in the cache |
| 229 out_deque (:class:`collections.deque`): a deque in which to add items |
| 230 that expire from the cache |
| 231 **kw: the other keyword args supported by the constructor to |
| 232 :class:`cachetools.LRUCache` |
| 233 |
| 234 Raises: |
| 235 ValueError: if out_deque is not a collections.deque |
| 236 |
| 237 """ |
| 238 super(DequeOutLRUCache, self).__init__(maxsize, **kw) |
| 239 if out_deque is None: |
| 240 out_deque = collections.deque() |
| 241 elif not isinstance(out_deque, collections.deque): |
| 242 raise ValueError('out_deque should be collections.deque') |
| 243 self._out_deque = out_deque |
| 244 self._tracking = {} |
| 245 |
| 246 def __setitem__(self, key, value, **kw): |
| 247 super(DequeOutLRUCache, self).__setitem__(key, value, **kw) |
| 248 self._tracking[key] = value |
| 249 |
| 250 @property |
| 251 def out_deque(self): |
| 252 """The :class:`collections.deque` to which expired items are added.""" |
| 253 expired = dict((k, v) for (k, v) in iter(self._tracking.items()) |
| 254 if self.get(k) is None) |
| 255 for k, v in iter(expired.items()): |
| 256 del self._tracking[k] |
| 257 self._out_deque.append(v) |
| 258 return self._out_deque |
| 259 |
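|  # A doctest-style sketch of LRU eviction feeding out_deque: once maxsize is |
|  # exceeded, the least recently used value is evicted and becomes visible on |
|  # the deque rather than disappearing. |
|  # |
|  #   >>> c = DequeOutLRUCache(2) |
|  #   >>> c[1] = 'one' |
|  #   >>> c[2] = 'two' |
|  #   >>> c[3] = 'three'               # evicts key 1 |
|  #   >>> list(c.out_deque) |
|  #   ['one'] |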
| 260 |
| 261 class LockedObject(object): |
| 262 """LockedObject protects an object with a re-entrant lock. |
| 263 |
| 264 The lock is acquired and released via the context manager protocol. |
| 265 """ |
| 266 # pylint: disable=too-few-public-methods |
| 267 |
| 268 def __init__(self, obj): |
| 269 self._lock = threading.RLock() |
| 270 self._obj = obj |
| 271 |
| 272 def __enter__(self): |
| 273 self._lock.acquire() |
| 274 return self._obj |
| 275 |
| 276 def __exit__(self, _exc_type, _exc_val, _exc_tb): |
| 277 self._lock.release() |
| 278 |
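|  # A small sketch of LockedObject: any object can be wrapped, and the with |
|  # statement gives exclusive access while the re-entrant lock is held. |
|  # |
|  #   >>> locked = LockedObject({}) |
|  #   >>> with locked as data:         # acquires the lock |
|  #   ...     data['key'] = 'value' |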
| 279 |
| 280 def to_cache_timer(datetime_func): |
| 281 """Converts a datetime_func to a timestamp_func. |
| 282 |
| 283 Args: |
| 284 datetime_func (callable[[datetime]]): a func that returns the current |
| 285 time |
| 286 |
| 287 Returns: |
| 288 time_func (callable[[timestamp]]): a func that returns the number of |
| 289 seconds since the epoch |
| 290 """ |
| 291 if datetime_func is None: |
| 292 datetime_func = datetime.utcnow |
| 293 |
| 294 def _timer(): |
| 295 """Return the number of seconds since the epoch.""" |
| 296 return (datetime_func() - datetime(1970, 1, 1)).total_seconds() |
| 297 |
| 298 return _timer |
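|  # A doctest-style sketch of to_cache_timer with a hypothetical fixed clock |
|  # (fixed_now is an illustrative stand-in); the returned timer reports |
|  # seconds since the epoch, and passing None falls back to datetime.utcnow. |
|  # |
|  #   >>> fixed_now = lambda: datetime(1970, 1, 1, 0, 0, 1) |
|  #   >>> to_cache_timer(fixed_now)() |
|  #   1.0 |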