| OLD | NEW |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """A module for storing and getting objects from datastore. | 5 """A module for storing and getting objects from datastore. |
| 6 | 6 |
| 7 This module provides Get, Set and Delete functions for storing pickleable | 7 App Engine datastore limits entity size to less than 1 MB; this module |
| 8 objects in datastore, with support for large objects greater than 1 MB. | 8 supports storing larger objects by splitting the data and using multiple |
| 9 datastore entities. |
| 9 | 10 |
| 10 Although this module contains ndb.Model classes, these are not intended | 11 Although this module contains ndb.Model classes, these are not intended |
| 11 to be used directly by other modules. | 12 to be used directly by other modules. |
| 12 | 13 |
| 13 App Engine datastore limits entity size to less than 1 MB; this module | |
| 14 supports storing larger objects by splitting the data and using multiple | |
| 15 datastore entities and multiple memcache keys. Using ndb.get and pickle, a | |
| 16 complex data structure can be retrieved more quickly than datastore fetch. | |
| 17 | |
| 18 Example: | 14 Example: |
| 19 john = Account() | 15 john = Account() |
| 20 john.username = 'John' | 16 john.username = 'John' |
| 21 john.userid = 123 | 17 john.userid = 123 |
| 22 stored_object.Set(john.userid, john) | 18 stored_object.Set(john.userid, john) |
| 23 """ | 19 """ |
| 24 | 20 |
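A minimal round-trip sketch of the public API, for reviewers trying the CL locally. It assumes the SDK testbed and that the module is importable as `stored_object`; the key and payload are made up:

```python
# Hypothetical usage under the App Engine testbed (not part of this CL).
from google.appengine.ext import testbed

tb = testbed.Testbed()
tb.activate()
tb.init_datastore_v3_stub()
tb.init_memcache_stub()  # only needed while the memcache path exists

import stored_object

value = {'username': 'John', 'userid': 123}
stored_object.Set('account_123', value)
assert stored_object.Get('account_123') == value
stored_object.Delete('account_123')
assert stored_object.Get('account_123') is None

tb.deactivate()
```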
| 25 import cPickle as pickle | 21 import cPickle as pickle |
| 26 import logging | |
| 27 | 22 |
| 28 from google.appengine.api import memcache | |
| 29 from google.appengine.ext import ndb | 23 from google.appengine.ext import ndb |
| 30 | 24 |
| 31 _MULTIPART_ENTITY_MEMCACHE_KEY = 'multipart_entity_' | 25 # Max bytes per entity. |
| 32 | |
| 33 # Maximum number of entities and memcache to save a value. | |
| 34 # The limit for data stored in one datastore entity is 1 MB, | |
| 35 # and the limit for memcache batch operations is 32 MB. See: | |
| 36 # https://cloud.google.com/appengine/docs/python/memcache/#Python_Limits | |
| 37 _MAX_NUM_PARTS = 16 | |
| 38 | |
| 39 # Max bytes per entity or value cached with memcache. | |
| 40 _CHUNK_SIZE = 1000 * 1000 | 26 _CHUNK_SIZE = 1000 * 1000 |
| 41 | 27 |
| 42 | 28 |
| 43 @ndb.synctasklet | 29 @ndb.synctasklet |
| 44 def Get(key): | 30 def Get(key): |
| 45 """Gets the value. | 31 """Gets the value. |
| 46 | 32 |
| 47 Args: | 33 Args: |
| 48 key: String key value. | 34 key: String key value. |
| 49 | 35 |
| 50 Returns: | 36 Returns: |
| 51 A value for key. | 37 A value for key. |
| 52 """ | 38 """ |
| 53 result = yield GetAsync(key) | 39 result = yield GetAsync(key) |
| 54 raise ndb.Return(result) | 40 raise ndb.Return(result) |
| 55 | 41 |
| 56 | 42 |
| 57 @ndb.tasklet | 43 @ndb.tasklet |
| 58 def GetAsync(key): | 44 def GetAsync(key): |
| 59 results = yield MultipartCache.GetAsync(key) | 45 entity = yield ndb.Key(MultipartEntity, key).get_async() |
| 60 if not results: | 46 if not entity: |
| 61 set_future = MultipartCache.SetAsync(key, results) | 47 raise ndb.Return(None) |
| 62 get_future = _GetValueFromDatastore(key) | 48 yield entity.GetPartsAsync() |
| 63 yield set_future, get_future | 49 raise ndb.Return(entity.GetData()) |
| 64 results = get_future.get_result() | |
| 65 raise ndb.Return(results) | |
| 66 | 50 |
| 67 | 51 |
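For readers new to ndb, `Get`/`GetAsync` above follow the file's standard pairing: a tasklet does the asynchronous work, and a synctasklet wrapper blocks for callers. A self-contained sketch of the pattern (names are illustrative):

```python
from google.appengine.ext import ndb

@ndb.tasklet
def _WorkAsync(n):
  # Tasklets return their result by raising ndb.Return.
  raise ndb.Return(n * 2)

@ndb.synctasklet
def Work(n):
  # synctasklet drives the generator to completion, so callers get a
  # plain value rather than a future.
  result = yield _WorkAsync(n)
  raise ndb.Return(result)

assert Work(21) == 42
```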
| 68 @ndb.synctasklet | 52 @ndb.synctasklet |
| 69 def Set(key, value): | 53 def Set(key, value): |
| 70 """Sets the value in datastore and memcache with limit of '_MAX_NUM_PARTS' MB. | 54 """Sets the value in datastore. |
| 71 | 55 |
| 72 Args: | 56 Args: |
| 73 key: String key value. | 57 key: String key value. |
| 74 value: A pickleable value to be stored limited at '_MAX_NUM_PARTS' MB. | 58 value: A pickleable value to be stored. |
| 75 """ | 59 """ |
| 76 yield SetAsync(key, value) | 60 yield SetAsync(key, value) |
| 77 | 61 |
| 78 | 62 |
| 79 @ndb.tasklet | 63 @ndb.tasklet |
| 80 def SetAsync(key, value): | 64 def SetAsync(key, value): |
| 81 entity = yield ndb.Key(MultipartEntity, key).get_async() | 65 entity = yield ndb.Key(MultipartEntity, key).get_async() |
| 82 if not entity: | 66 if not entity: |
| 83 entity = MultipartEntity(id=key) | 67 entity = MultipartEntity(id=key) |
| 84 entity.SetData(value) | 68 entity.SetData(value) |
| 85 yield (entity.PutAsync(), | 69 yield entity.PutAsync() |
| 86 MultipartCache.SetAsync(key, value)) | |
| 87 | 70 |
| 88 | 71 |
| 89 @ndb.synctasklet | 72 @ndb.synctasklet |
| 90 def Delete(key): | 73 def Delete(key): |
| 91 """Deletes the value in datastore and memcache.""" | 74 """Deletes the value in datastore.""" |
| 92 yield DeleteAsync(key) | 75 yield DeleteAsync(key) |
| 93 | 76 |
| 94 | 77 |
| 95 @ndb.tasklet | 78 @ndb.tasklet |
| 96 def DeleteAsync(key): | 79 def DeleteAsync(key): |
| 97 multipart_entity_key = ndb.Key(MultipartEntity, key) | 80 multipart_entity_key = ndb.Key(MultipartEntity, key) |
| 98 yield (multipart_entity_key.delete_async(), | 81 yield (multipart_entity_key.delete_async(), |
| 99 MultipartEntity.DeleteAsync(multipart_entity_key), | 82 MultipartEntity.DeleteAsync(multipart_entity_key)) |
| 100 MultipartCache.DeleteAsync(key)) | |
| 101 | 83 |
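The tuple yield in `DeleteAsync` is ndb's idiom for waiting on several futures concurrently; the same idiom in isolation (the kinds here are hypothetical):

```python
from google.appengine.ext import ndb

@ndb.tasklet
def _DeleteBothAsync(name):
  # Yielding a tuple of futures runs them in parallel; results come
  # back as a tuple in the same order.
  yield (ndb.Key('KindA', name).delete_async(),
         ndb.Key('KindB', name).delete_async())
```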
| 102 | 84 |
| 103 class MultipartEntity(ndb.Model): | 85 class MultipartEntity(ndb.Model): |
| 104 """Container for PartEntity.""" | 86 """Container for PartEntity.""" |
| 105 | 87 |
| 106 # Number of entities used to store the serialized data. | 88 # Number of entities used to store the serialized data. |
| 107 size = ndb.IntegerProperty(default=0, indexed=False) | 89 size = ndb.IntegerProperty(default=0, indexed=False) |
| 108 | 90 |
| 109 @ndb.tasklet | 91 @ndb.tasklet |
| 110 def GetPartsAsync(self): | 92 def GetPartsAsync(self): |
| (...skipping 11 matching lines...) |
| 122 @classmethod | 104 @classmethod |
| 123 @ndb.tasklet | 105 @ndb.tasklet |
| 124 def DeleteAsync(cls, key): | 106 def DeleteAsync(cls, key): |
| 125 part_keys = yield PartEntity.query(ancestor=key).fetch_async(keys_only=True) | 107 part_keys = yield PartEntity.query(ancestor=key).fetch_async(keys_only=True) |
| 126 yield ndb.delete_multi_async(part_keys) | 108 yield ndb.delete_multi_async(part_keys) |
| 127 | 109 |
| 128 @ndb.tasklet | 110 @ndb.tasklet |
| 129 def PutAsync(self): | 111 def PutAsync(self): |
| 130 """Stores serialized data over multiple PartEntity.""" | 112 """Stores serialized data over multiple PartEntity.""" |
| 131 serialized_parts = _Serialize(self.GetData()) | 113 serialized_parts = _Serialize(self.GetData()) |
| 132 if len(serialized_parts) > _MAX_NUM_PARTS: | |
| 133 logging.error('Max number of parts reached.') | |
| 134 return | |
| 135 part_list = [] | 114 part_list = [] |
| 136 num_parts = len(serialized_parts) | 115 num_parts = len(serialized_parts) |
| 137 for i in xrange(num_parts): | 116 for i in xrange(num_parts): |
| 138 if serialized_parts[i] is not None: | 117 part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i]) |
| 139 part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i]) | 118 part_list.append(part) |
| 140 part_list.append(part) | |
| 141 self.size = num_parts | 119 self.size = num_parts |
| 142 yield ndb.put_multi_async(part_list + [self]) | 120 yield ndb.put_multi_async(part_list + [self]) |
| 143 | 121 |
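Concretely, after `PutAsync` on a value whose pickle is ~2.5 MB, the datastore holds four entities: the head `MultipartEntity` with `size == 3`, plus three `PartEntity` chunks with 1-based ids parented under it (the head id `'my_key'` is made up):

```python
# Keys written for a three-chunk value stored under 'my_key'.
head_key = ndb.Key(MultipartEntity, 'my_key')
part_keys = [ndb.Key(MultipartEntity, 'my_key', PartEntity, i)
             for i in (1, 2, 3)]
```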
| 144 def GetData(self): | 122 def GetData(self): |
| 145 return getattr(self, '_data', None) | 123 return getattr(self, '_data', None) |
| 146 | 124 |
| 147 def SetData(self, data): | 125 def SetData(self, data): |
| 148 setattr(self, '_data', data) | 126 setattr(self, '_data', data) |
| 149 | 127 |
| 150 | 128 |
| 151 class PartEntity(ndb.Model): | 129 class PartEntity(ndb.Model): |
| 152 """Holds a part of serialized data for MultipartEntity. | 130 """Holds a part of serialized data for MultipartEntity. |
| 153 | 131 |
| 154 This entity key has the form: | 132 This entity key has the form: |
| 155 ndb.Key('MultipartEntity', multipart_entity_id, 'PartEntity', part_index) | 133 ndb.Key('MultipartEntity', multipart_entity_id, 'PartEntity', part_index) |
| 156 """ | 134 """ |
| 157 value = ndb.BlobProperty() | 135 value = ndb.BlobProperty() |
| 158 | 136 |
| 159 | 137 |
| 160 class MultipartCache(object): | |
| 161 """Contains operations for storing values over multiple memcache keys. | |
| 162 | |
| 163 Values are serialized, split, and stored over multiple memcache keys. The | |
| 164 head cache stores the expected size. | |
| 165 """ | |
| 166 | |
| 167 @classmethod | |
| 168 @ndb.tasklet | |
| 169 def GetAsync(cls, key): | |
| 170 """Gets value in memcache.""" | |
| 171 keys = cls._GetCacheKeyList(key) | |
| 172 head_key = cls._GetCacheKey(key) | |
| 173 client = memcache.Client() | |
| 174 cache_values = yield client.get_multi_async(keys) | |
| 175 # Whether we have all the memcache values. | |
| 176 if len(keys) != len(cache_values) or head_key not in cache_values: | |
| 177 raise ndb.Return(None) | |
| 178 | |
| 179 serialized = '' | |
| 180 cache_size = cache_values[head_key] | |
| 181 keys.remove(head_key) | |
| 182 for key in keys[:cache_size]: | |
| 183 if key not in cache_values: | |
| 184 raise ndb.Return(None) | |
| 185 if cache_values[key] is not None: | |
| 186 serialized += cache_values[key] | |
| 187 raise ndb.Return(pickle.loads(serialized)) | |
| 188 | |
| 189 @classmethod | |
| 190 @ndb.tasklet | |
| 191 def SetAsync(cls, key, value): | |
| 192 """Sets a value in memcache.""" | |
| 193 serialized_parts = _Serialize(value) | |
| 194 if len(serialized_parts) > _MAX_NUM_PARTS: | |
| 195 logging.error('Max number of parts reached.') | |
| 196 raise ndb.Return(None) | |
| 197 | |
| 198 cached_values = {} | |
| 199 cached_values[cls._GetCacheKey(key)] = len(serialized_parts) | |
| 200 for i in xrange(len(serialized_parts)): | |
| 201 cached_values[cls._GetCacheKey(key, i)] = serialized_parts[i] | |
| 202 client = memcache.Client() | |
| 203 yield client.set_multi_async(cached_values) | |
| 204 | |
| 205 @classmethod | |
| 206 @ndb.synctasklet | |
| 207 def Delete(cls, key): | |
| 208 """Deletes all cached values for key.""" | |
| 209 yield cls.DeleteAsync(key) | |
| 210 | |
| 211 @classmethod | |
| 212 @ndb.tasklet | |
| 213 def DeleteAsync(cls, key): | |
| 214 client = memcache.Client() | |
| 215 yield client.delete_multi_async(cls._GetCacheKeyList(key)) | |
| 216 | |
| 217 @classmethod | |
| 218 def _GetCacheKeyList(cls, key): | |
| 219 """Gets a list of head cache key and cache key parts.""" | |
| 220 keys = [cls._GetCacheKey(key, i) for i in xrange(_MAX_NUM_PARTS)] | |
| 221 keys.append(cls._GetCacheKey(key)) | |
| 222 return keys | |
| 223 | |
| 224 @classmethod | |
| 225 def _GetCacheKey(cls, key, index=None): | |
| 226 """Returns either head cache key or cache key part.""" | |
| 227 if index is not None: | |
| 228 return _MULTIPART_ENTITY_MEMCACHE_KEY + '%s.%s' % (key, index) | |
| 229 return _MULTIPART_ENTITY_MEMCACHE_KEY + key | |
| 230 | |
| 231 | |
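For the record, the cache-key scheme being deleted laid out one head key (holding the part count) plus one key per chunk; a standalone restatement of `_GetCacheKey` with the keys it produced:

```python
_MULTIPART_ENTITY_MEMCACHE_KEY = 'multipart_entity_'

def _GetCacheKey(key, index=None):
  if index is not None:
    return _MULTIPART_ENTITY_MEMCACHE_KEY + '%s.%s' % (key, index)
  return _MULTIPART_ENTITY_MEMCACHE_KEY + key

assert _GetCacheKey('my_key') == 'multipart_entity_my_key'       # head
assert _GetCacheKey('my_key', 0) == 'multipart_entity_my_key.0'  # part 0
```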
| 232 @ndb.tasklet | |
| 233 def _GetValueFromDatastore(key): | |
| 234 entity = yield ndb.Key(MultipartEntity, key).get_async() | |
| 235 if not entity: | |
| 236 raise ndb.Return(None) | |
| 237 yield entity.GetPartsAsync() | |
| 238 raise ndb.Return(entity.GetData()) | |
| 239 | |
| 240 | |
| 241 def _Serialize(value): | 138 def _Serialize(value): |
| 242 """Serializes value and returns a list of its parts. | 139 """Serializes value and returns a list of its parts. |
| 243 | 140 |
| 244 Args: | 141 Args: |
| 245 value: A pickleable value. | 142 value: A pickleable value. |
| 246 | 143 |
| 247 Returns: | 144 Returns: |
| 248 A list of strings: the pickled value split into chunks of at most | 145 A list of strings: the pickled value split into chunks of at most |
| 249 _CHUNK_SIZE bytes each. | 146 _CHUNK_SIZE bytes each. |
| 250 """ | 147 """ |
| 251 serialized = pickle.dumps(value, 2) | 148 serialized = pickle.dumps(value, 2) |
| 252 length = len(serialized) | 149 length = len(serialized) |
| 253 values = [] | 150 values = [] |
| 254 for i in xrange(0, length, _CHUNK_SIZE): | 151 for i in xrange(0, length, _CHUNK_SIZE): |
| 255 values.append(serialized[i:i + _CHUNK_SIZE]) | 152 values.append(serialized[i:i + _CHUNK_SIZE]) |
| 256 for i in xrange(len(values), _MAX_NUM_PARTS): | |
| 257 values.append(None) | |
| 258 return values | 153 return values |
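A standalone sketch of the chunking round trip `_Serialize` performs; the reassembly at the end is presumably what `GetPartsAsync` does in the hunk elided above (concatenate the parts, then unpickle):

```python
import cPickle as pickle

_CHUNK_SIZE = 1000 * 1000
value = {'payload': 'x' * 2500000}  # pickles to roughly 2.5 MB
serialized = pickle.dumps(value, 2)
parts = [serialized[i:i + _CHUNK_SIZE]
         for i in xrange(0, len(serialized), _CHUNK_SIZE)]
assert all(len(p) <= _CHUNK_SIZE for p in parts)
assert pickle.loads(''.join(parts)) == value  # round-trips
```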