OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "errors" | |
9 "infra/gae/libs/wrapper" | |
10 goon_internal "infra/gae/libs/wrapper/memory/internal/goon" | |
11 "math/rand" | |
12 "sync" | |
13 "sync/atomic" | |
14 | |
15 "github.com/mjibson/goon" | |
16 | |
17 "appengine/datastore" | |
18 pb "appengine_internal/datastore" | |
19 ) | |
20 | |
21 ////////////////////////////////// knrKeeper /////////////////////////////////// | |
22 | |
23 type knrKeeper struct { | |
24 knrLock sync.Mutex | |
25 knrFunc goon.KindNameResolver | |
26 } | |
27 | |
28 func (k *knrKeeper) KindNameResolver() goon.KindNameResolver { | |
29 k.knrLock.Lock() | |
30 defer k.knrLock.Unlock() | |
31 if k.knrFunc == nil { | |
32 k.knrFunc = goon.DefaultKindName | |
33 } | |
34 return k.knrFunc | |
35 } | |
36 | |
37 func (k *knrKeeper) SetKindNameResolver(knr goon.KindNameResolver) { | |
38 k.knrLock.Lock() | |
39 defer k.knrLock.Unlock() | |
40 if knr == nil { | |
41 knr = goon.DefaultKindName | |
42 } | |
43 k.knrFunc = knr | |
44 } | |
45 | |
46 //////////////////////////////// dataStoreData ///////////////////////////////// | |
47 | |
48 type dataStoreData struct { | |
49 // implements memContextObj. | |
50 | |
51 wrapper.BrokenFeatures | |
52 knrKeeper | |
53 | |
54 rwlock sync.RWMutex | |
55 // See README.md for store schema. | |
56 store *memStore | |
57 snap *memStore | |
58 } | |
59 | |
60 ////////////////////////////// New(dataStoreData) ////////////////////////////// | |
M-A Ruel
2015/05/29 18:16:52
FTR, I don't see value in these kinds of separator
| |
61 | |
62 func newDataStoreData() *dataStoreData { | |
M-A Ruel
2015/05/29 18:16:52
The thing I'd really like for each object is for y
| |
63 store := newMemStore() | |
64 return &dataStoreData{ | |
65 BrokenFeatures: wrapper.BrokenFeatures{DefaultError: newDSError( pb.Error_INTERNAL_ERROR)}, | |
66 store: store, | |
67 snap: store.Snapshot(), // empty but better than a nil pointer. | |
68 } | |
69 } | |
70 | |
71 ////////////////////////////// Locker(Datastore) /////////////////////////////// | |
72 | |
73 func (d *dataStoreData) Lock() { | |
74 d.rwlock.Lock() | |
75 } | |
76 | |
77 func (d *dataStoreData) Unlock() { | |
78 d.rwlock.Unlock() | |
79 } | |
80 | |
81 func groupMetaKey(key *datastore.Key) []byte { | |
82 return keyBytes(noNS, newKey("", "__entity_group__", "", 1, rootKey(key) )) | |
83 } | |
84 | |
85 func groupIDsKey(key *datastore.Key) []byte { | |
86 return keyBytes(noNS, newKey("", "__entity_group_ids__", "", 1, rootKey( key))) | |
87 } | |
88 | |
89 func rootIDsKey(kind string) []byte { | |
90 return keyBytes(noNS, newKey("", "__entity_root_ids__", kind, 0, nil)) | |
91 } | |
92 | |
93 func curVersion(ents *memCollection, key []byte) (int64, error) { | |
94 if v := ents.Get(key); v != nil { | |
95 numData := &propertyList{} | |
96 if err := numData.UnmarshalBinary(v); err != nil { | |
97 return 0, err | |
98 } | |
99 return (*numData)[0].Value.(int64), nil | |
100 } | |
101 return 0, nil | |
102 } | |
103 | |
104 func incrementLocked(ents *memCollection, key []byte) (int64, error) { | |
105 num := int64(0) | |
106 numData := &propertyList{} | |
107 if v := ents.Get(key); v != nil { | |
108 if err := numData.UnmarshalBinary(v); err != nil { | |
109 return 0, err | |
110 } | |
111 num = (*numData)[0].Value.(int64) | |
112 } else { | |
113 *numData = append(*numData, datastore.Property{Name: "__version_ _"}) | |
114 } | |
115 num++ | |
116 (*numData)[0].Value = num | |
117 incData, err := numData.MarshalBinary() | |
118 if err != nil { | |
119 return 0, err | |
120 } | |
121 ents.Set(key, incData) | |
122 | |
123 return num, nil | |
124 } | |
125 | |
126 func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*memCollection, *data store.Key, error) { | |
127 coll := "ents:" + key.Namespace() | |
128 ents := d.store.GetCollection(coll) | |
129 if ents == nil { | |
130 ents = d.store.SetCollection(coll, nil) | |
131 } | |
132 | |
133 if key.Incomplete() { | |
134 idKey := []byte(nil) | |
135 if key.Parent() == nil { | |
136 idKey = rootIDsKey(key.Kind()) | |
137 } else { | |
138 idKey = groupIDsKey(key) | |
139 } | |
140 id, err := incrementLocked(ents, idKey) | |
141 if err != nil { | |
142 return nil, nil, err | |
143 } | |
144 key = newKey(key.Namespace(), key.Kind(), "", id, key.Parent()) | |
145 } | |
146 | |
147 return ents, key, nil | |
148 } | |
149 | |
150 func putPrelim(ns string, knr goon.KindNameResolver, src interface{}) (*datastor e.Key, *propertyList, error) { | |
151 key := newKeyObj(ns, knr, src) | |
152 if !KeyCouldBeValid(ns, key, UserKeyOnly) { | |
153 // TODO(riannucci): different error for Put-ing to reserved Keys ? | |
154 return nil, nil, datastore.ErrInvalidKey | |
155 } | |
156 | |
157 data, err := toPL(src) | |
158 return key, data, err | |
159 } | |
160 | |
161 func (d *dataStoreData) put(ns string, src interface{}) (*datastore.Key, error) { | |
162 key, plData, err := putPrelim(ns, d.KindNameResolver(), src) | |
163 if err != nil { | |
164 return nil, err | |
165 } | |
166 if key, err = d.putInner(key, plData); err != nil { | |
167 return nil, err | |
168 } | |
169 return key, goon_internal.SetStructKey(src, key, d.KindNameResolver()) | |
170 } | |
171 | |
172 func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datas tore.Key, error) { | |
173 dataBytes, err := data.MarshalBinary() | |
174 if err != nil { | |
175 return nil, err | |
176 } | |
177 | |
178 d.rwlock.Lock() | |
179 defer d.rwlock.Unlock() | |
180 | |
181 ents, key, err := d.entsKeyLocked(key) | |
182 if err != nil { | |
183 return nil, err | |
184 } | |
185 if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil { | |
186 return nil, err | |
187 } | |
188 | |
189 ents.Set(keyBytes(noNS, key), dataBytes) | |
190 | |
191 return key, nil | |
192 } | |
193 | |
194 func getInner(ns string, knr goon.KindNameResolver, dst interface{}, getColl fun c(*datastore.Key) (*memCollection, error)) error { | |
195 key := newKeyObj(ns, knr, dst) | |
196 if !KeyValid(ns, key, AllowSpecialKeys) { | |
197 return datastore.ErrInvalidKey | |
198 } | |
199 | |
200 ents, err := getColl(key) | |
201 if err != nil { | |
202 return err | |
203 } | |
204 if ents == nil { | |
205 return datastore.ErrNoSuchEntity | |
206 } | |
207 pdata := ents.Get(keyBytes(noNS, key)) | |
208 if pdata == nil { | |
209 return datastore.ErrNoSuchEntity | |
210 } | |
211 pl := &propertyList{} | |
212 if err = pl.UnmarshalBinary(pdata); err != nil { | |
213 return err | |
214 } | |
215 return fromPL(pl, dst) | |
216 } | |
217 | |
218 func (d *dataStoreData) get(ns string, dst interface{}) error { | |
219 return getInner(ns, d.KindNameResolver(), dst, func(*datastore.Key) (*me mCollection, error) { | |
220 d.rwlock.RLock() | |
221 s := d.store.Snapshot() | |
222 d.rwlock.RUnlock() | |
223 | |
224 return s.GetCollection("ents:" + ns), nil | |
225 }) | |
226 } | |
227 | |
228 func (d *dataStoreData) del(ns string, key *datastore.Key) error { | |
229 if !KeyValid(ns, key, UserKeyOnly) { | |
230 return datastore.ErrInvalidKey | |
231 } | |
232 | |
233 keyBuf := keyBytes(noNS, key) | |
234 | |
235 d.rwlock.Lock() | |
236 defer d.rwlock.Unlock() | |
237 | |
238 ents := d.store.GetCollection("ents:" + ns) | |
239 if ents == nil { | |
240 return nil | |
241 } | |
242 if _, err := incrementLocked(ents, groupMetaKey(key)); err != nil { | |
243 return err | |
244 } | |
245 | |
246 ents.Delete(keyBuf) | |
247 return nil | |
248 } | |
249 | |
250 ///////////////////////// memContextObj(dataStoreData) ///////////////////////// | |
251 | |
252 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | |
253 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | |
254 | |
255 txn := obj.(*txnDataStoreData) | |
256 for rk, muts := range txn.muts { | |
257 if len(muts) == 0 { // read-only | |
258 continue | |
259 } | |
260 k, err := keyFromByteString(withNS, rk) | |
261 if err != nil { | |
262 panic(err) | |
263 } | |
264 entKey := "ents:" + k.Namespace() | |
265 mkey := groupMetaKey(k) | |
266 entsHead := d.store.GetCollection(entKey) | |
267 entsSnap := txn.snap.GetCollection(entKey) | |
268 vHead, err := curVersion(entsHead, mkey) | |
269 if err != nil { | |
270 panic(err) | |
271 } | |
272 vSnap, err := curVersion(entsSnap, mkey) | |
273 if err != nil { | |
274 panic(err) | |
275 } | |
276 if vHead != vSnap { | |
277 return false | |
278 } | |
279 } | |
280 return true | |
281 } | |
282 | |
283 func (d *dataStoreData) applyTxn(r *rand.Rand, obj memContextObj) { | |
284 txn := obj.(*txnDataStoreData) | |
285 for _, muts := range txn.muts { | |
286 if len(muts) == 0 { // read-only | |
287 continue | |
288 } | |
289 for _, m := range muts { | |
290 err := error(nil) | |
291 if m.data == nil { | |
292 err = d.del(m.key.Namespace(), m.key) | |
293 } else { | |
294 _, err = d.putInner(m.key, m.data) | |
295 } | |
296 if err != nil { | |
297 panic(err) | |
298 } | |
299 } | |
300 } | |
301 } | |
302 | |
303 func (d *dataStoreData) mkTxn(o *datastore.TransactionOptions) (memContextObj, e rror) { | |
304 return &txnDataStoreData{ | |
305 // alias to the main datastore's so that testing code can have p rimitive | |
306 // access to break features inside of transactions. | |
307 BrokenFeatures: &d.BrokenFeatures, | |
308 parent: d, | |
309 knrKeeper: knrKeeper{knrFunc: d.knrFunc}, | |
310 isXG: o != nil && o.XG, | |
311 snap: d.store.Snapshot(), | |
312 muts: map[string][]txnMutation{}, | |
313 }, nil | |
314 } | |
315 | |
316 func (d *dataStoreData) endTxn() {} | |
317 | |
318 /////////////////////////////// txnDataStoreData /////////////////////////////// | |
M-A Ruel
2015/05/29 18:16:52
I don't know if you intended to insert a line in-b
| |
319 type txnMutation struct { | |
320 key *datastore.Key | |
321 data *propertyList | |
322 } | |
323 | |
324 type txnDataStoreData struct { | |
325 *wrapper.BrokenFeatures | |
326 knrKeeper | |
327 sync.Mutex | |
328 | |
329 parent *dataStoreData | |
330 | |
331 // boolean 0 or 1, use atomic.*Int32 to access. | |
332 closed int32 | |
333 isXG bool | |
334 | |
335 snap *memStore | |
336 | |
337 // string is the raw-bytes encoding of the entity root incl. namespace | |
338 muts map[string][]txnMutation | |
339 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ ing | |
340 // length of encoded keys + values. | |
341 } | |
342 | |
343 const xgEGLimit = 25 | |
344 | |
345 /////////////////////// memContextObj(txnDataStoreData) //////////////////////// | |
M-A Ruel
2015/05/29 18:16:52
I assume you mean that txnDataStoreData implements
| |
346 | |
347 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | |
348 func (td *txnDataStoreData) endTxn() { | |
349 if atomic.LoadInt32(&td.closed) == 1 { | |
350 panic("cannot end transaction twice") | |
351 } | |
352 atomic.StoreInt32(&td.closed, 1) | |
353 } | |
354 func (*txnDataStoreData) applyTxn(*rand.Rand, memContextObj) { | |
355 panic("txnDataStoreData cannot apply transactions") | |
356 } | |
357 func (*txnDataStoreData) mkTxn(*datastore.TransactionOptions) (memContextObj, er ror) { | |
358 return nil, errors.New("datastore: nested transactions are not supported ") | |
359 } | |
360 | |
361 /////////////////// wrapper.BrokenFeatures(txnDataStoreData) /////////////////// | |
362 | |
363 func (td *txnDataStoreData) IsBroken() error { | |
364 // Slightly different from the SDK... datastore and taskqueue each imple ment | |
365 // this here, where in the SDK only datastore.transaction.Call does. | |
366 if atomic.LoadInt32(&td.closed) == 1 { | |
367 return errors.New("datastore: transaction context has expired") | |
368 } | |
369 return td.BrokenFeatures.IsBroken() | |
370 } | |
371 | |
372 // writeMutation ensures that this transaction can support the given key/value | |
373 // mutation. | |
374 // | |
375 // if getOnly is true, don't record the actual mutation data, just ensure that | |
376 // the key is in an included entity group (or add an empty entry for tha t | |
377 // group). | |
378 // | |
379 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | |
380 // | |
381 // Returns an error if this key causes the transaction to cross too many entity | |
382 // groups. | |
383 func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data *propertyList) error { | |
384 rk := string(keyBytes(withNS, rootKey(key))) | |
385 | |
386 td.Lock() | |
387 defer td.Unlock() | |
388 | |
389 if _, ok := td.muts[rk]; !ok { | |
390 limit := 1 | |
391 if td.isXG { | |
392 limit = xgEGLimit | |
393 } | |
394 if len(td.muts)+1 > limit { | |
395 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)" | |
396 if td.isXG { | |
397 msg = "operating on too many entity groups in a single transaction" | |
398 } | |
399 return newDSError(pb.Error_BAD_REQUEST, msg) | |
400 } | |
401 td.muts[rk] = []txnMutation{} | |
402 } | |
403 if !getOnly { | |
404 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | |
405 } | |
406 | |
407 return nil | |
408 } | |
409 | |
410 func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, err or) { | |
411 key, plData, err := putPrelim(ns, td.KindNameResolver(), src) | |
412 if err != nil { | |
413 return nil, err | |
414 } | |
415 | |
416 func() { | |
417 td.parent.Lock() | |
418 defer td.parent.Unlock() | |
419 _, key, err = td.parent.entsKeyLocked(key) | |
420 }() | |
421 if err != nil { | |
422 return nil, err | |
423 } | |
424 | |
425 if err = td.writeMutation(false, key, plData); err != nil { | |
426 return nil, err | |
427 } | |
428 | |
429 return key, goon_internal.SetStructKey(src, key, td.KindNameResolver()) | |
430 } | |
431 | |
432 func (td *txnDataStoreData) get(ns string, dst interface{}) error { | |
433 return getInner(ns, td.KindNameResolver(), dst, func(key *datastore.Key) (*memCollection, error) { | |
434 if err := td.writeMutation(true, key, nil); err != nil { | |
435 return nil, err | |
436 } | |
437 return td.snap.GetCollection("ents:" + ns), nil | |
438 }) | |
439 } | |
440 | |
441 func (td *txnDataStoreData) del(ns string, key *datastore.Key) error { | |
442 if !KeyValid(ns, key, UserKeyOnly) { | |
443 return datastore.ErrInvalidKey | |
444 } | |
445 return td.writeMutation(false, key, nil) | |
446 } | |
OLD | NEW |