Chromium Code Reviews

Unified Diff: dashboard/dashboard/common/stored_object.py

Issue 2748953003: . Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Created 3 years, 9 months ago
 # Copyright 2015 The Chromium Authors. All rights reserved.
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.

 """A module for storing and getting objects from datastore.

 This module provides Get, Set and Delete functions for storing pickleable
 objects in datastore, with support for large objects greater than 1 MB.

 Although this module contains ndb.Model classes, these are not intended
(...skipping 22 matching lines...)
 # Maximum number of entities and memcache to save a value.
 # The limit for data stored in one datastore entity is 1 MB,
 # and the limit for memcache batch operations is 32 MB. See:
 # https://cloud.google.com/appengine/docs/python/memcache/#Python_Limits
 _MAX_NUM_PARTS = 16

 # Max bytes per entity or value cached with memcache.
 _CHUNK_SIZE = 1000 * 1000


+@ndb.synctasklet
 def Get(key):
   """Gets the value.

   Args:
     key: String key value.

   Returns:
     A value for key.
   """
-  results = MultipartCache.Get(key)
-  if not results:
-    results = _GetValueFromDatastore(key)
-    MultipartCache.Set(key, results)
-  return results
+  result = yield GetAsync(key)
+  raise ndb.Return(result)


+@ndb.tasklet
+def GetAsync(key):
+  """Gets the value, checking memcache first and falling back to datastore."""
+  results = yield MultipartCache.GetAsync(key)
+  if not results:
+    results = yield _GetValueFromDatastore(key)
+    yield MultipartCache.SetAsync(key, results)
+  raise ndb.Return(results)
+
+
+@ndb.synctasklet
 def Set(key, value):
   """Sets the value in datastore and memcache with limit of '_MAX_NUM_PARTS' MB.

   Args:
     key: String key value.
     value: A pickleable value to be stored limited at '_MAX_NUM_PARTS' MB.
   """
-  entity = ndb.Key(MultipartEntity, key).get()
+  yield SetAsync(key, value)
+
+
+@ndb.tasklet
+def SetAsync(key, value):
+  entity = yield ndb.Key(MultipartEntity, key).get_async()
   if not entity:
     entity = MultipartEntity(id=key)
   entity.SetData(value)
-  entity.Save()
-  MultipartCache.Set(key, value)
+  yield (entity.PutAsync(),
+         MultipartCache.SetAsync(key, value))


+@ndb.synctasklet
 def Delete(key):
   """Deletes the value in datastore and memcache."""
-  ndb.Key(MultipartEntity, key).delete()
-  MultipartCache.Delete(key)
+  yield DeleteAsync(key)
+
+
+@ndb.tasklet
+def DeleteAsync(key):
+  multipart_entity_key = ndb.Key(MultipartEntity, key)
+  yield (multipart_entity_key.delete_async(),
+         MultipartEntity.DeleteAsync(multipart_entity_key),
+         MultipartCache.DeleteAsync(key))


 class MultipartEntity(ndb.Model):
   """Container for PartEntity."""

   # Number of entities use to store serialized.
   size = ndb.IntegerProperty(default=0, indexed=False)

-  @classmethod
-  def _post_get_hook(cls, key, future):  # pylint: disable=unused-argument
+  @ndb.tasklet
+  def GetPartsAsync(self):
     """Deserializes data from multiple PartEntity."""
-    entity = future.get_result()
-    if entity is None or not entity.size:
+    if not self.size:
       return

-    string_id = entity.key.string_id()
+    string_id = self.key.string_id()
     part_keys = [ndb.Key(MultipartEntity, string_id, PartEntity, i + 1)
-                 for i in xrange(entity.size)]
-    part_entities = ndb.get_multi(part_keys)
+                 for i in xrange(self.size)]
+    part_entities = yield ndb.get_multi_async(part_keys)
     serialized = ''.join(p.value for p in part_entities if p is not None)
-    entity.SetData(pickle.loads(serialized))
+    self.SetData(pickle.loads(serialized))

   @classmethod
-  def _pre_delete_hook(cls, key):
-    """Deletes PartEntity entities."""
-    part_keys = PartEntity.query(ancestor=key).fetch(keys_only=True)
-    ndb.delete_multi(part_keys)
+  @ndb.tasklet
+  def DeleteAsync(cls, key):
+    """Deletes PartEntity entities."""
+    part_keys = yield PartEntity.query(ancestor=key).fetch_async(keys_only=True)
+    yield ndb.delete_multi_async(part_keys)

-  def Save(self):
+  @ndb.tasklet
+  def PutAsync(self):
     """Stores serialized data over multiple PartEntity."""
     serialized_parts = _Serialize(self.GetData())
     if len(serialized_parts) > _MAX_NUM_PARTS:
       logging.error('Max number of parts reached.')
       return
     part_list = []
     num_parts = len(serialized_parts)
     for i in xrange(num_parts):
       if serialized_parts[i] is not None:
         part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i])
         part_list.append(part)
     self.size = num_parts
-    ndb.put_multi(part_list + [self])
+    yield ndb.put_multi_async(part_list + [self])

   def GetData(self):
     return getattr(self, '_data', None)

   def SetData(self, data):
     setattr(self, '_data', data)


 class PartEntity(ndb.Model):
   """Holds a part of serialized data for MultipartEntity.

   This entity key has the form:
     ndb.Key('MultipartEntity', multipart_entity_id, 'PartEntity', part_index)
   """
   value = ndb.BlobProperty()


 class MultipartCache(object):
   """Contains operations for storing values over multiple memcache keys.

   Values are serialized, split, and stored over multiple memcache keys. The
   head cache stores the expected size.
   """

   @classmethod
-  def Get(cls, key):
+  @ndb.tasklet
+  def GetAsync(cls, key):
     """Gets value in memcache."""
     keys = cls._GetCacheKeyList(key)
     head_key = cls._GetCacheKey(key)
-    cache_values = memcache.get_multi(keys)
+    client = memcache.Client()
+    cache_values = yield client.get_multi_async(keys)
     # Whether we have all the memcache values.
     if len(keys) != len(cache_values) or head_key not in cache_values:
-      return None
+      raise ndb.Return(None)

     serialized = ''
     cache_size = cache_values[head_key]
     keys.remove(head_key)
     for key in keys[:cache_size]:
       if key not in cache_values:
-        return None
+        raise ndb.Return(None)
       if cache_values[key] is not None:
         serialized += cache_values[key]
-    return pickle.loads(serialized)
+    raise ndb.Return(pickle.loads(serialized))

   @classmethod
-  def Set(cls, key, value):
+  @ndb.tasklet
+  def SetAsync(cls, key, value):
     """Sets a value in memcache."""
     serialized_parts = _Serialize(value)
     if len(serialized_parts) > _MAX_NUM_PARTS:
       logging.error('Max number of parts reached.')
-      return
+      raise ndb.Return(None)

     cached_values = {}
     cached_values[cls._GetCacheKey(key)] = len(serialized_parts)
     for i in xrange(len(serialized_parts)):
       cached_values[cls._GetCacheKey(key, i)] = serialized_parts[i]
-    memcache.set_multi(cached_values)
+    client = memcache.Client()
+    yield client.set_multi_async(cached_values)

   @classmethod
+  @ndb.synctasklet
   def Delete(cls, key):
     """Deletes all cached values for key."""
-    memcache.delete_multi(cls._GetCacheKeyList(key))
+    yield cls.DeleteAsync(key)
+
+  @classmethod
+  @ndb.tasklet
+  def DeleteAsync(cls, key):
+    client = memcache.Client()
+    yield client.delete_multi_async(cls._GetCacheKeyList(key))

   @classmethod
   def _GetCacheKeyList(cls, key):
     """Gets a list of head cache key and cache key parts."""
     keys = [cls._GetCacheKey(key, i) for i in xrange(_MAX_NUM_PARTS)]
     keys.append(cls._GetCacheKey(key))
     return keys

   @classmethod
   def _GetCacheKey(cls, key, index=None):
     """Returns either head cache key or cache key part."""
     if index is not None:
       return _MULTIPART_ENTITY_MEMCACHE_KEY + '%s.%s' % (key, index)
     return _MULTIPART_ENTITY_MEMCACHE_KEY + key


+@ndb.tasklet
 def _GetValueFromDatastore(key):
-  entity = ndb.Key(MultipartEntity, key).get()
+  entity = yield ndb.Key(MultipartEntity, key).get_async()
   if not entity:
-    return None
-  return entity.GetData()
+    raise ndb.Return(None)
+  yield entity.GetPartsAsync()
+  raise ndb.Return(entity.GetData())


 def _Serialize(value):
   """Serializes value and returns a list of its parts.

   Args:
     value: A pickleable value.

   Returns:
     A list of string representation of the value that has been pickled and split
     into _CHUNK_SIZE.
   """
   serialized = pickle.dumps(value, 2)
   length = len(serialized)
   values = []
   for i in xrange(0, length, _CHUNK_SIZE):
     values.append(serialized[i:i + _CHUNK_SIZE])
   for i in xrange(len(values), _MAX_NUM_PARTS):
     values.append(None)
   return values
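
The conversion pattern throughout this patch is the ndb tasklet idiom: each public function becomes an @ndb.synctasklet wrapper around an @ndb.tasklet coroutine that other tasklets can yield. A minimal sketch of that idiom, with made-up names that are not part of this change:

from google.appengine.ext import ndb


@ndb.tasklet
def _GetSampleAsync(key_name):
  # A tasklet is a generator: 'yield' suspends on a future, and
  # 'raise ndb.Return(...)' is how a tasklet hands back its result.
  entity = yield ndb.Key('Sample', key_name).get_async()
  raise ndb.Return(entity)


@ndb.synctasklet
def GetSample(key_name):
  # Same coroutine body, but @ndb.synctasklet drives the event loop to
  # completion, so ordinary callers get a plain blocking return value.
  result = yield _GetSampleAsync(key_name)
  raise ndb.Return(result)

The synchronous wrappers keep the existing call sites working, while new code can compose the *Async variants with other futures (as SetAsync and DeleteAsync do above by yielding tuples of futures in parallel).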
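
For context, a rough usage sketch of the module's public API under this patch; the import path follows the dashboard layout and the key name 'example_key' is purely illustrative:

from dashboard.common import stored_object


def SaveAndLoad():
  # Synchronous callers keep the old-style blocking interface.
  stored_object.Set('example_key', {'rows': range(10)})
  return stored_object.Get('example_key')

# From inside another ndb tasklet, the new async variants can be yielded
# directly, e.g.:
#   value = yield stored_object.GetAsync('example_key')
#   yield stored_object.SetAsync('example_key', value)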
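
To make the size limits concrete: _Serialize pickles the value and slices it into _CHUNK_SIZE (1 MB) pieces, so anything needing more than _MAX_NUM_PARTS (16) parts is rejected, and MultipartCache writes one head key holding the part count plus one key per part. A rough sketch of that layout, using a placeholder prefix in place of _MULTIPART_ENTITY_MEMCACHE_KEY (whose value is in the lines elided above):

import math

_CHUNK_SIZE = 1000 * 1000   # bytes per part
_MAX_NUM_PARTS = 16         # maximum parts per value


def DescribeLayout(key, pickled_size):
  """Prints the memcache keys a pickled blob of pickled_size bytes would use."""
  num_parts = int(math.ceil(pickled_size / float(_CHUNK_SIZE)))
  if num_parts > _MAX_NUM_PARTS:
    print 'Too large: %d parts exceeds the %d-part limit.' % (
        num_parts, _MAX_NUM_PARTS)
    return
  head_key = 'prefix_' + key                                # stores num_parts
  part_keys = ['prefix_%s.%s' % (key, i) for i in xrange(num_parts)]
  print head_key, part_keys


# A 2.5 MB pickle for 'example_key' needs 3 parts, so the keys would be
# 'prefix_example_key' plus 'prefix_example_key.0' through 'prefix_example_key.2'.
DescribeLayout('example_key', 2500000)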
