| OLD | NEW |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """A module for storing and getting objects from datastore. | 5 """A module for storing and getting objects from datastore. |
| 6 | 6 |
| 7 This module provides Get, Set and Delete functions for storing pickleable | 7 This module provides Get, Set and Delete functions for storing pickleable |
| 8 objects in datastore, with support for large objects greater than 1 MB. | 8 objects in datastore, with support for large objects greater than 1 MB. |
| 9 | 9 |
| 10 Although this module contains ndb.Model classes, these are not intended | 10 Although this module contains ndb.Model classes, these are not intended |
| (...skipping 22 matching lines...) |
| 33 # Maximum number of entities and memcache keys used to save a value. | 33 # Maximum number of entities and memcache keys used to save a value. |
| 34 # The limit for data stored in one datastore entity is 1 MB, | 34 # The limit for data stored in one datastore entity is 1 MB, |
| 35 # and the limit for memcache batch operations is 32 MB. See: | 35 # and the limit for memcache batch operations is 32 MB. See: |
| 36 # https://cloud.google.com/appengine/docs/python/memcache/#Python_Limits | 36 # https://cloud.google.com/appengine/docs/python/memcache/#Python_Limits |
| 37 _MAX_NUM_PARTS = 16 | 37 _MAX_NUM_PARTS = 16 |
| 38 | 38 |
| 39 # Max bytes per entity or value cached with memcache. | 39 # Max bytes per entity or value cached with memcache. |
| 40 _CHUNK_SIZE = 1000 * 1000 | 40 _CHUNK_SIZE = 1000 * 1000 |
| 41 | 41 |
| 42 | 42 |
| 43 @ndb.synctasklet |
| 43 def Get(key): | 44 def Get(key): |
| 44 """Gets the value. | 45 """Gets the value. |
| 45 | 46 |
| 46 Args: | 47 Args: |
| 47 key: String key value. | 48 key: String key value. |
| 48 | 49 |
| 49 Returns: | 50 Returns: |
| 50 The value stored for the key, or None if not found. | 51 The value stored for the key, or None if not found. |
| 51 """ | 52 """ |
| 52 results = MultipartCache.Get(key) | 53 result = yield GetAsync(key) |
| 53 if not results: | 54 raise ndb.Return(result) |
| 54 results = _GetValueFromDatastore(key) | |
| 55 MultipartCache.Set(key, results) | |
| 56 return results | |
| 57 | 55 |
| 58 | 56 |
| 57 @ndb.tasklet |
| 58 def GetAsync(key): |
| 59 results = yield MultipartCache.GetAsync(key) |
| 60 if not results: |
| 61 results = yield _GetValueFromDatastore(key) |
| 62 # Cache the freshly fetched value so later reads hit memcache. |
| 63 set_future = MultipartCache.SetAsync(key, results) |
| 64 yield set_future |
| 65 raise ndb.Return(results) |
| 66 |
| 67 |
| 68 @ndb.synctasklet |
| 59 def Set(key, value): | 69 def Set(key, value): |
| 60 """Sets the value in datastore and memcache with limit of '_MAX_NUM_PARTS' MB. | 70 """Sets the value in datastore and memcache with limit of '_MAX_NUM_PARTS' MB. |
| 61 | 71 |
| 62 Args: | 72 Args: |
| 63 key: String key value. | 73 key: String key value. |
| 64 value: A pickleable value to be stored, at most '_MAX_NUM_PARTS' MB. | 74 value: A pickleable value to be stored, at most '_MAX_NUM_PARTS' MB. |
| 65 """ | 75 """ |
| 66 entity = ndb.Key(MultipartEntity, key).get() | 76 yield SetAsync(key, value) |
| 77 |
| 78 |
| 79 @ndb.tasklet |
| 80 def SetAsync(key, value): |
| 81 entity = yield ndb.Key(MultipartEntity, key).get_async() |
| 67 if not entity: | 82 if not entity: |
| 68 entity = MultipartEntity(id=key) | 83 entity = MultipartEntity(id=key) |
| 69 entity.SetData(value) | 84 entity.SetData(value) |
| 70 entity.Save() | 85 yield (entity.PutAsync(), |
| 71 MultipartCache.Set(key, value) | 86 MultipartCache.SetAsync(key, value)) |
| 72 | 87 |
| 73 | 88 |
| 89 @ndb.synctasklet |
| 74 def Delete(key): | 90 def Delete(key): |
| 75 """Deletes the value in datastore and memcache.""" | 91 """Deletes the value in datastore and memcache.""" |
| 76 ndb.Key(MultipartEntity, key).delete() | 92 yield DeleteAsync(key) |
| 77 MultipartCache.Delete(key) | 93 |
| 94 |
| 95 @ndb.tasklet |
| 96 def DeleteAsync(key): |
| 97 multipart_entity_key = ndb.Key(MultipartEntity, key) |
| 98 yield (multipart_entity_key.delete_async(), |
| 99 MultipartEntity.DeleteAsync(multipart_entity_key), |
| 100 MultipartCache.DeleteAsync(key)) |
| 78 | 101 |
| 79 | 102 |
| 80 class MultipartEntity(ndb.Model): | 103 class MultipartEntity(ndb.Model): |
| 81 """Container for PartEntity.""" | 104 """Container for PartEntity.""" |
| 82 | 105 |
| 83 # Number of entities used to store the serialized data. | 106 # Number of entities used to store the serialized data. |
| 84 size = ndb.IntegerProperty(default=0, indexed=False) | 107 size = ndb.IntegerProperty(default=0, indexed=False) |
| 85 | 108 |
| 86 @classmethod | 109 @ndb.tasklet |
| 87 def _post_get_hook(cls, key, future): # pylint: disable=unused-argument | 110 def GetPartsAsync(self): |
| 88 """Deserializes data from multiple PartEntity.""" | 111 """Deserializes data from multiple PartEntity.""" |
| 89 entity = future.get_result() | 112 if not self.size: |
| 90 if entity is None or not entity.size: | |
| 91 return | 113 return |
| 92 | 114 |
| 93 string_id = entity.key.string_id() | 115 string_id = self.key.string_id() |
| 94 part_keys = [ndb.Key(MultipartEntity, string_id, PartEntity, i + 1) | 116 part_keys = [ndb.Key(MultipartEntity, string_id, PartEntity, i + 1) |
| 95 for i in xrange(entity.size)] | 117 for i in xrange(self.size)] |
| 96 part_entities = ndb.get_multi(part_keys) | 118 part_entities = yield ndb.get_multi_async(part_keys) |
| 97 serialized = ''.join(p.value for p in part_entities if p is not None) | 119 serialized = ''.join(p.value for p in part_entities if p is not None) |
| 98 entity.SetData(pickle.loads(serialized)) | 120 self.SetData(pickle.loads(serialized)) |
| 99 | 121 |
| 100 @classmethod | 122 @classmethod |
| 101 def _pre_delete_hook(cls, key): | 123 @ndb.tasklet |
| 102 """Deletes PartEntity entities.""" | 124 def DeleteAsync(cls, key): |
| 103 part_keys = PartEntity.query(ancestor=key).fetch(keys_only=True) | 125 part_keys = yield PartEntity.query(ancestor=key).fetch_async(keys_only=True) |
| 104 ndb.delete_multi(part_keys) | 126 yield ndb.delete_multi_async(part_keys) |
| 105 | 127 |
| 106 def Save(self): | 128 @ndb.tasklet |
| 129 def PutAsync(self): |
| 107 """Stores serialized data over multiple PartEntity.""" | 130 """Stores serialized data over multiple PartEntity.""" |
| 108 serialized_parts = _Serialize(self.GetData()) | 131 serialized_parts = _Serialize(self.GetData()) |
| 109 if len(serialized_parts) > _MAX_NUM_PARTS: | 132 if len(serialized_parts) > _MAX_NUM_PARTS: |
| 110 logging.error('Max number of parts reached.') | 133 logging.error('Max number of parts reached.') |
| 111 return | 134 return |
| 112 part_list = [] | 135 part_list = [] |
| 113 num_parts = len(serialized_parts) | 136 num_parts = len(serialized_parts) |
| 114 for i in xrange(num_parts): | 137 for i in xrange(num_parts): |
| 115 if serialized_parts[i] is not None: | 138 if serialized_parts[i] is not None: |
| 116 part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i]) | 139 part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i]) |
| 117 part_list.append(part) | 140 part_list.append(part) |
| 118 self.size = num_parts | 141 self.size = num_parts |
| 119 ndb.put_multi(part_list + [self]) | 142 yield ndb.put_multi_async(part_list + [self]) |
| 120 | 143 |
| 121 def GetData(self): | 144 def GetData(self): |
| 122 return getattr(self, '_data', None) | 145 return getattr(self, '_data', None) |
| 123 | 146 |
| 124 def SetData(self, data): | 147 def SetData(self, data): |
| 125 setattr(self, '_data', data) | 148 setattr(self, '_data', data) |
| 126 | 149 |
| 127 | 150 |
| 128 class PartEntity(ndb.Model): | 151 class PartEntity(ndb.Model): |
| 129 """Holds a part of serialized data for MultipartEntity. | 152 """Holds a part of serialized data for MultipartEntity. |
| 130 | 153 |
| 131 This entity key has the form: | 154 This entity key has the form: |
| 132 ndb.Key('MultipartEntity', multipart_entity_id, 'PartEntity', part_index) | 155 ndb.Key('MultipartEntity', multipart_entity_id, 'PartEntity', part_index) |
| 133 """ | 156 """ |
| 134 value = ndb.BlobProperty() | 157 value = ndb.BlobProperty() |
| 135 | 158 |
| 136 | 159 |
| 137 class MultipartCache(object): | 160 class MultipartCache(object): |
| 138 """Contains operations for storing values over multiple memcache keys. | 161 """Contains operations for storing values over multiple memcache keys. |
| 139 | 162 |
| 140 Values are serialized, split, and stored over multiple memcache keys. The | 163 Values are serialized, split, and stored over multiple memcache keys. The |
| 141 head cache stores the expected size. | 164 head cache stores the expected size. |
| 142 """ | 165 """ |
| 143 | 166 |
| 144 @classmethod | 167 @classmethod |
| 145 def Get(cls, key): | 168 @ndb.tasklet |
| 169 def GetAsync(cls, key): |
| 146 """Gets value in memcache.""" | 170 """Gets value in memcache.""" |
| 147 keys = cls._GetCacheKeyList(key) | 171 keys = cls._GetCacheKeyList(key) |
| 148 head_key = cls._GetCacheKey(key) | 172 head_key = cls._GetCacheKey(key) |
| 149 cache_values = memcache.get_multi(keys) | 173 client = memcache.Client() |
| 174 cache_values = yield client.get_multi_async(keys) |
| 150 # Whether we have all the memcache values. | 175 # Whether we have all the memcache values. |
| 151 if len(keys) != len(cache_values) or head_key not in cache_values: | 176 if len(keys) != len(cache_values) or head_key not in cache_values: |
| 152 return None | 177 raise ndb.Return(None) |
| 153 | 178 |
| 154 serialized = '' | 179 serialized = '' |
| 155 cache_size = cache_values[head_key] | 180 cache_size = cache_values[head_key] |
| 156 keys.remove(head_key) | 181 keys.remove(head_key) |
| 157 for key in keys[:cache_size]: | 182 for key in keys[:cache_size]: |
| 158 if key not in cache_values: | 183 if key not in cache_values: |
| 159 return None | 184 raise ndb.Return(None) |
| 160 if cache_values[key] is not None: | 185 if cache_values[key] is not None: |
| 161 serialized += cache_values[key] | 186 serialized += cache_values[key] |
| 162 return pickle.loads(serialized) | 187 raise ndb.Return(pickle.loads(serialized)) |
| 163 | 188 |
| 164 @classmethod | 189 @classmethod |
| 165 def Set(cls, key, value): | 190 @ndb.tasklet |
| 191 def SetAsync(cls, key, value): |
| 166 """Sets a value in memcache.""" | 192 """Sets a value in memcache.""" |
| 167 serialized_parts = _Serialize(value) | 193 serialized_parts = _Serialize(value) |
| 168 if len(serialized_parts) > _MAX_NUM_PARTS: | 194 if len(serialized_parts) > _MAX_NUM_PARTS: |
| 169 logging.error('Max number of parts reached.') | 195 logging.error('Max number of parts reached.') |
| 170 return | 196 raise ndb.Return(None) |
| 171 | 197 |
| 172 cached_values = {} | 198 cached_values = {} |
| 173 cached_values[cls._GetCacheKey(key)] = len(serialized_parts) | 199 cached_values[cls._GetCacheKey(key)] = len(serialized_parts) |
| 174 for i in xrange(len(serialized_parts)): | 200 for i in xrange(len(serialized_parts)): |
| 175 cached_values[cls._GetCacheKey(key, i)] = serialized_parts[i] | 201 cached_values[cls._GetCacheKey(key, i)] = serialized_parts[i] |
| 176 memcache.set_multi(cached_values) | 202 client = memcache.Client() |
| 203 yield client.set_multi_async(cached_values) |
| 177 | 204 |
| 178 @classmethod | 205 @classmethod |
| 206 @ndb.synctasklet |
| 179 def Delete(cls, key): | 207 def Delete(cls, key): |
| 180 """Deletes all cached values for key.""" | 208 """Deletes all cached values for key.""" |
| 181 memcache.delete_multi(cls._GetCacheKeyList(key)) | 209 yield cls.DeleteAsync(key) |
| 210 |
| 211 @classmethod |
| 212 @ndb.tasklet |
| 213 def DeleteAsync(cls, key): |
| 214 client = memcache.Client() |
| 215 yield client.delete_multi_async(cls._GetCacheKeyList(key)) |
| 182 | 216 |
| 183 @classmethod | 217 @classmethod |
| 184 def _GetCacheKeyList(cls, key): | 218 def _GetCacheKeyList(cls, key): |
| 185 """Gets a list of head cache key and cache key parts.""" | 219 """Gets a list of head cache key and cache key parts.""" |
| 186 keys = [cls._GetCacheKey(key, i) for i in xrange(_MAX_NUM_PARTS)] | 220 keys = [cls._GetCacheKey(key, i) for i in xrange(_MAX_NUM_PARTS)] |
| 187 keys.append(cls._GetCacheKey(key)) | 221 keys.append(cls._GetCacheKey(key)) |
| 188 return keys | 222 return keys |
| 189 | 223 |
| 190 @classmethod | 224 @classmethod |
| 191 def _GetCacheKey(cls, key, index=None): | 225 def _GetCacheKey(cls, key, index=None): |
| 192 """Returns either head cache key or cache key part.""" | 226 """Returns either head cache key or cache key part.""" |
| 193 if index is not None: | 227 if index is not None: |
| 194 return _MULTIPART_ENTITY_MEMCACHE_KEY + '%s.%s' % (key, index) | 228 return _MULTIPART_ENTITY_MEMCACHE_KEY + '%s.%s' % (key, index) |
| 195 return _MULTIPART_ENTITY_MEMCACHE_KEY + key | 229 return _MULTIPART_ENTITY_MEMCACHE_KEY + key |
| 196 | 230 |
| 197 | 231 |
| 232 @ndb.tasklet |
| 198 def _GetValueFromDatastore(key): | 233 def _GetValueFromDatastore(key): |
| 199 entity = ndb.Key(MultipartEntity, key).get() | 234 entity = yield ndb.Key(MultipartEntity, key).get_async() |
| 200 if not entity: | 235 if not entity: |
| 201 return None | 236 raise ndb.Return(None) |
| 202 return entity.GetData() | 237 yield entity.GetPartsAsync() |
| 238 raise ndb.Return(entity.GetData()) |
| 203 | 239 |
| 204 | 240 |
| 205 def _Serialize(value): | 241 def _Serialize(value): |
| 206 """Serializes value and returns a list of its parts. | 242 """Serializes value and returns a list of its parts. |
| 207 | 243 |
| 208 Args: | 244 Args: |
| 209 value: A pickleable value. | 245 value: A pickleable value. |
| 210 | 246 |
| 211 Returns: | 247 Returns: |
| 212 A list of string representation of the value that has been pickled and split | 248 A list of string representation of the value that has been pickled and split |
| 213 into _CHUNK_SIZE. | 249 into _CHUNK_SIZE. |
| 214 """ | 250 """ |
| 215 serialized = pickle.dumps(value, 2) | 251 serialized = pickle.dumps(value, 2) |
| 216 length = len(serialized) | 252 length = len(serialized) |
| 217 values = [] | 253 values = [] |
| 218 for i in xrange(0, length, _CHUNK_SIZE): | 254 for i in xrange(0, length, _CHUNK_SIZE): |
| 219 values.append(serialized[i:i + _CHUNK_SIZE]) | 255 values.append(serialized[i:i + _CHUNK_SIZE]) |
| 220 for i in xrange(len(values), _MAX_NUM_PARTS): | 256 for i in xrange(len(values), _MAX_NUM_PARTS): |
| 221 values.append(None) | 257 values.append(None) |
| 222 return values | 258 return values |
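The conversion above leans on two `ndb` decorators: `@ndb.tasklet` turns a generator into a function that returns a `Future`, and yielding a tuple of futures inside a tasklet waits on them in parallel; `@ndb.synctasklet` additionally drives the event loop to completion, which is how `Get`, `Set` and `Delete` keep their synchronous signatures while delegating to the new `*Async` variants. A minimal standalone sketch of that pattern (the `SlowEcho`/`Both` names are illustrative only, not part of the CL):

```python
from google.appengine.ext import ndb


@ndb.tasklet
def SlowEcho(value):
  # ndb.sleep returns a future; yielding it suspends this tasklet without
  # blocking the event loop, so other tasklets can make progress.
  yield ndb.sleep(0.1)
  raise ndb.Return(value)


@ndb.tasklet
def BothAsync():
  # Yielding a tuple of futures runs them concurrently and produces a tuple
  # of results -- the same pattern SetAsync and DeleteAsync use above.
  first, second = yield SlowEcho('first'), SlowEcho('second')
  raise ndb.Return((first, second))


@ndb.synctasklet
def Both():
  # synctasklet blocks until the tasklet completes and returns its result,
  # so callers see a plain synchronous function, as with Get/Set/Delete.
  result = yield BothAsync()
  raise ndb.Return(result)
```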
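For callers, the synchronous entry points behave as before, while code that already runs inside a tasklet can switch to the `*Async` variants to overlap this module's memcache and datastore I/O with other work. A hedged usage sketch, assuming the module imports as `stored_object` and using a made-up key name:

```python
from google.appengine.ext import ndb

import stored_object


@ndb.tasklet
def RefreshSettingsAsync(other_future):
  # 'example_settings' is a hypothetical key used only for illustration.
  settings = yield stored_object.GetAsync('example_settings')
  settings = settings or {}
  settings['refreshed'] = True
  # Write the updated value while waiting on an unrelated future in parallel.
  _, other_result = yield (stored_object.SetAsync('example_settings', settings),
                           other_future)
  raise ndb.Return(other_result)


def RefreshSettings():
  # Existing synchronous call sites need no changes.
  settings = stored_object.Get('example_settings') or {}
  settings['refreshed'] = True
  stored_object.Set('example_settings', settings)
```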
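The storage scheme itself is unchanged by this CL: a value is pickled, split into `_CHUNK_SIZE` slices (one `PartEntity` and one memcache key per slice, plus a head key recording the count), and re-joined before unpickling on read. A self-contained round-trip sketch of that split/join, mirroring `_Serialize` with an illustrative `_Deserialize` counterpart that is not part of the module:

```python
import pickle

_CHUNK_SIZE = 1000 * 1000  # ~1 MB per datastore entity / memcache value.
_MAX_NUM_PARTS = 16        # ~16 MB total; the memcache batch limit is 32 MB.


def _Serialize(value):
  """Pickles value and splits the result into _CHUNK_SIZE byte slices."""
  serialized = pickle.dumps(value, 2)
  parts = [serialized[i:i + _CHUNK_SIZE]
           for i in xrange(0, len(serialized), _CHUNK_SIZE)]
  # Pad with None up to _MAX_NUM_PARTS; callers reject anything longer.
  return parts + [None] * (_MAX_NUM_PARTS - len(parts))


def _Deserialize(parts):
  """Re-joins the non-empty slices and unpickles the result."""
  return pickle.loads(''.join(p for p in parts if p is not None))


value = {'rows': range(200000)}
assert _Deserialize(_Serialize(value)) == value
```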