OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "bytes" | |
9 "errors" | |
10 "fmt" | |
11 "sync" | |
12 "sync/atomic" | |
13 | |
14 "golang.org/x/net/context" | |
15 | |
16 "github.com/luci/gae" | |
17 "github.com/luci/gae/helper" | |
18 ) | |
19 | |
20 //////////////////////////////// dataStoreData ///////////////////////////////// | |
21 | |
22 type dataStoreData struct { | |
23 rwlock sync.RWMutex | |
24 // See README.md for store schema. | |
25 store *memStore | |
26 snap *memStore | |
27 } | |
28 | |
29 var ( | |
30 _ = memContextObj((*dataStoreData)(nil)) | |
31 _ = sync.Locker((*dataStoreData)(nil)) | |
32 ) | |
33 | |
34 func newDataStoreData() *dataStoreData { | |
35 store := newMemStore() | |
36 return &dataStoreData{ | |
37 store: store, | |
38 snap: store.Snapshot(), // empty but better than a nil pointer. | |
39 } | |
40 } | |
41 | |
42 func (d *dataStoreData) Lock() { | |
43 d.rwlock.Lock() | |
44 } | |
45 | |
46 func (d *dataStoreData) Unlock() { | |
47 d.rwlock.Unlock() | |
48 } | |
49 | |
50 /////////////////////////// indicies(dataStoreData) //////////////////////////// | |
51 | |
52 func groupMetaKey(key gae.DSKey) []byte { | |
53 return keyBytes(helper.WithoutContext, | |
54 helper.NewDSKey("", "", "__entity_group__", "", 1, helper.DSKeyR
oot(key))) | |
55 } | |
56 | |
57 func groupIDsKey(key gae.DSKey) []byte { | |
58 return keyBytes(helper.WithoutContext, | |
59 helper.NewDSKey("", "", "__entity_group_ids__", "", 1, helper.DS
KeyRoot(key))) | |
60 } | |
61 | |
62 func rootIDsKey(kind string) []byte { | |
63 return keyBytes(helper.WithoutContext, | |
64 helper.NewDSKey("", "", "__entity_root_ids__", kind, 0, nil)) | |
65 } | |
66 | |
67 func curVersion(ents *memCollection, key []byte) int64 { | |
68 if v := ents.Get(key); v != nil { | |
69 pm, err := rpm(v) | |
70 if err != nil { | |
71 panic(err) // memory corruption | |
72 } | |
73 pl, ok := pm["__version__"] | |
74 if ok && len(pl) > 0 && pl[0].Type() == gae.DSPTInt { | |
75 return pl[0].Value().(int64) | |
76 } | |
77 panic(fmt.Errorf("__version__ property missing or wrong: %v", pm
)) | |
78 } | |
79 return 0 | |
80 } | |
81 | |
82 func incrementLocked(ents *memCollection, key []byte) int64 { | |
83 ret := curVersion(ents, key) + 1 | |
84 buf := &bytes.Buffer{} | |
85 helper.WriteDSPropertyMap( | |
86 buf, gae.DSPropertyMap{"__version__": {gae.MkDSPropertyNI(ret)}}
, helper.WithContext) | |
87 ents.Set(key, buf.Bytes()) | |
88 return ret | |
89 } | |
90 | |
91 func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey)
{ | |
92 coll := "ents:" + key.Namespace() | |
93 ents := d.store.GetCollection(coll) | |
94 if ents == nil { | |
95 ents = d.store.SetCollection(coll, nil) | |
96 } | |
97 | |
98 if helper.DSKeyIncomplete(key) { | |
99 idKey := []byte(nil) | |
100 if key.Parent() == nil { | |
101 idKey = rootIDsKey(key.Kind()) | |
102 } else { | |
103 idKey = groupIDsKey(key) | |
104 } | |
105 id := incrementLocked(ents, idKey) | |
106 key = helper.NewDSKey(key.AppID(), key.Namespace(), key.Kind(),
"", id, key.Parent()) | |
107 } | |
108 | |
109 return ents, key | |
110 } | |
111 | |
112 func (d *dataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoadSave
r) (gae.DSKey, error) { | |
113 keys, errs := d.putMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSaver
{pls}) | |
114 if errs == nil { | |
115 return keys[0], nil | |
116 } | |
117 return nil, gae.SingleError(errs) | |
118 } | |
119 | |
120 func (d *dataStoreData) putMulti(ns string, keys []gae.DSKey, plss []gae.DSPrope
rtyLoadSaver) ([]gae.DSKey, error) { | |
121 pmaps, err := putMultiPrelim(ns, keys, plss) | |
122 if err != nil { | |
123 return nil, err | |
124 } | |
125 return d.putMultiInner(keys, pmaps) | |
126 } | |
127 | |
128 func putMultiPrelim(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver)
([]gae.DSPropertyMap, error) { | |
129 err := multiValid(keys, plss, ns, true, false) | |
130 if err != nil { | |
131 return nil, err | |
132 } | |
133 pmaps := make([]gae.DSPropertyMap, len(keys)) | |
134 lme := gae.LazyMultiError{Size: len(keys)} | |
135 for i, pls := range plss { | |
136 pm, err := pls.Save(false) | |
137 lme.Assign(i, err) | |
138 pmaps[i] = pm | |
139 } | |
140 return pmaps, lme.Get() | |
141 } | |
142 | |
143 func (d *dataStoreData) putMultiInner(keys []gae.DSKey, data []gae.DSPropertyMap
) ([]gae.DSKey, error) { | |
144 retKeys := make([]gae.DSKey, len(keys)) | |
145 lme := gae.LazyMultiError{Size: len(keys)} | |
146 for i, k := range keys { | |
147 buf := &bytes.Buffer{} | |
148 helper.WriteDSPropertyMap(buf, data[i], helper.WithoutContext) | |
149 dataBytes := buf.Bytes() | |
150 | |
151 rKey, err := func() (ret gae.DSKey, err error) { | |
152 d.rwlock.Lock() | |
153 defer d.rwlock.Unlock() | |
154 | |
155 ents, ret := d.entsKeyLocked(k) | |
156 incrementLocked(ents, groupMetaKey(ret)) | |
157 | |
158 old := ents.Get(keyBytes(helper.WithoutContext, ret)) | |
159 oldPM := gae.DSPropertyMap(nil) | |
160 if old != nil { | |
161 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { | |
162 return | |
163 } | |
164 } | |
165 updateIndicies(d.store, ret, oldPM, data[i]) | |
166 ents.Set(keyBytes(helper.WithoutContext, ret), dataBytes
) | |
167 return | |
168 }() | |
169 lme.Assign(i, err) | |
170 retKeys[i] = rKey | |
171 } | |
172 return retKeys, lme.Get() | |
173 } | |
174 | |
175 func getMultiInner(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver,
getColl func() (*memCollection, error)) error { | |
176 if err := multiValid(keys, plss, ns, false, true); err != nil { | |
177 return err | |
178 } | |
179 | |
180 lme := gae.LazyMultiError{Size: len(keys)} | |
181 | |
182 ents, err := getColl() | |
183 if err != nil { | |
184 return err | |
185 } | |
186 if ents == nil { | |
187 for i := range keys { | |
188 lme.Assign(i, gae.ErrDSNoSuchEntity) | |
189 } | |
190 return lme.Get() | |
191 } | |
192 | |
193 for i, k := range keys { | |
194 pdata := ents.Get(keyBytes(helper.WithoutContext, k)) | |
195 if pdata == nil { | |
196 lme.Assign(i, gae.ErrDSNoSuchEntity) | |
197 continue | |
198 } | |
199 | |
200 got, err := rpmWoCtx(pdata, ns) | |
201 if err != nil { | |
202 lme.Assign(i, err) | |
203 continue | |
204 } | |
205 | |
206 lme.Assign(i, plss[i].Load(got)) | |
207 } | |
208 return lme.Get() | |
209 } | |
210 | |
211 func (d *dataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoadSave
r) error { | |
212 return gae.SingleError(d.getMulti(ns, []gae.DSKey{key}, []gae.DSProperty
LoadSaver{pls})) | |
213 } | |
214 | |
215 func (d *dataStoreData) getMulti(ns string, keys []gae.DSKey, plss []gae.DSPrope
rtyLoadSaver) error { | |
216 return getMultiInner(ns, keys, plss, func() (*memCollection, error) { | |
217 d.rwlock.RLock() | |
218 s := d.store.Snapshot() | |
219 d.rwlock.RUnlock() | |
220 | |
221 return s.GetCollection("ents:" + ns), nil | |
222 }) | |
223 } | |
224 | |
225 func (d *dataStoreData) del(ns string, key gae.DSKey) (err error) { | |
226 return gae.SingleError(d.delMulti(ns, []gae.DSKey{key})) | |
227 } | |
228 | |
229 func (d *dataStoreData) delMulti(ns string, keys []gae.DSKey) error { | |
230 lme := gae.LazyMultiError{Size: len(keys)} | |
231 toDel := make([][]byte, 0, len(keys)) | |
232 for i, k := range keys { | |
233 if !helper.DSKeyValid(k, ns, false) { | |
234 lme.Assign(i, gae.ErrDSInvalidKey) | |
235 continue | |
236 } | |
237 toDel = append(toDel, keyBytes(helper.WithoutContext, k)) | |
238 } | |
239 err := lme.Get() | |
240 if err != nil { | |
241 return err | |
242 } | |
243 | |
244 d.rwlock.Lock() | |
245 defer d.rwlock.Unlock() | |
246 | |
247 ents := d.store.GetCollection("ents:" + ns) | |
248 if ents == nil { | |
249 return nil | |
250 } | |
251 | |
252 for i, k := range keys { | |
253 incrementLocked(ents, groupMetaKey(k)) | |
254 kb := toDel[i] | |
255 old := ents.Get(kb) | |
256 oldPM := gae.DSPropertyMap(nil) | |
257 if old != nil { | |
258 if oldPM, err = rpmWoCtx(old, ns); err != nil { | |
259 lme.Assign(i, err) | |
260 continue | |
261 } | |
262 } | |
263 updateIndicies(d.store, k, oldPM, nil) | |
264 ents.Delete(kb) | |
265 } | |
266 return lme.Get() | |
267 } | |
268 | |
269 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | |
270 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | |
271 | |
272 txn := obj.(*txnDataStoreData) | |
273 for rk, muts := range txn.muts { | |
274 if len(muts) == 0 { // read-only | |
275 continue | |
276 } | |
277 k, err := helper.ReadDSKey(bytes.NewBufferString(rk), helper.Wit
hContext, "", "") | |
278 if err != nil { | |
279 panic(err) | |
280 } | |
281 | |
282 entKey := "ents:" + k.Namespace() | |
283 mkey := groupMetaKey(k) | |
284 entsHead := d.store.GetCollection(entKey) | |
285 entsSnap := txn.snap.GetCollection(entKey) | |
286 vHead := curVersion(entsHead, mkey) | |
287 vSnap := curVersion(entsSnap, mkey) | |
288 if vHead != vSnap { | |
289 return false | |
290 } | |
291 } | |
292 return true | |
293 } | |
294 | |
295 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { | |
296 txn := obj.(*txnDataStoreData) | |
297 for _, muts := range txn.muts { | |
298 if len(muts) == 0 { // read-only | |
299 continue | |
300 } | |
301 for _, m := range muts { | |
302 err := error(nil) | |
303 if m.data == nil { | |
304 err = d.del(m.key.Namespace(), m.key) | |
305 } else { | |
306 _, err = d.put(m.key.Namespace(), m.key, m.data) | |
307 } | |
308 if err != nil { | |
309 panic(err) | |
310 } | |
311 } | |
312 } | |
313 } | |
314 | |
315 func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) memContextObj { | |
316 return &txnDataStoreData{ | |
317 // alias to the main datastore's so that testing code can have p
rimitive | |
318 // access to break features inside of transactions. | |
319 parent: d, | |
320 isXG: o != nil && o.XG, | |
321 snap: d.store.Snapshot(), | |
322 muts: map[string][]txnMutation{}, | |
323 } | |
324 } | |
325 | |
326 func (d *dataStoreData) endTxn() {} | |
327 | |
328 /////////////////////////////// txnDataStoreData /////////////////////////////// | |
329 | |
330 type txnMutation struct { | |
331 key gae.DSKey | |
332 data gae.DSPropertyMap | |
333 } | |
334 | |
335 type txnDataStoreData struct { | |
336 sync.Mutex | |
337 | |
338 parent *dataStoreData | |
339 | |
340 // boolean 0 or 1, use atomic.*Int32 to access. | |
341 closed int32 | |
342 isXG bool | |
343 | |
344 snap *memStore | |
345 | |
346 // string is the raw-bytes encoding of the entity root incl. namespace | |
347 muts map[string][]txnMutation | |
348 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ
ing | |
349 // length of encoded keys + values. | |
350 } | |
351 | |
352 var _ memContextObj = (*txnDataStoreData)(nil) | |
353 | |
354 const xgEGLimit = 25 | |
355 | |
356 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | |
357 func (td *txnDataStoreData) endTxn() { | |
358 if atomic.LoadInt32(&td.closed) == 1 { | |
359 panic("cannot end transaction twice") | |
360 } | |
361 atomic.StoreInt32(&td.closed, 1) | |
362 } | |
363 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | |
364 panic("txnDataStoreData cannot apply transactions") | |
365 } | |
366 func (*txnDataStoreData) mkTxn(*gae.DSTransactionOptions) memContextObj { | |
367 panic("impossible") | |
368 } | |
369 | |
370 func (td *txnDataStoreData) run(f func() error) error { | |
371 // Slightly different from the SDK... datastore and taskqueue each imple
ment | |
372 // this here, where in the SDK only datastore.transaction.Call does. | |
373 if atomic.LoadInt32(&td.closed) == 1 { | |
374 return errors.New("datastore: transaction context has expired") | |
375 } | |
376 return f() | |
377 } | |
378 | |
379 // writeMutation ensures that this transaction can support the given key/value | |
380 // mutation. | |
381 // | |
382 // if getOnly is true, don't record the actual mutation data, just ensure that | |
383 // the key is in an included entity group (or add an empty entry for tha
t | |
384 // group). | |
385 // | |
386 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | |
387 // | |
388 // Returns an error if this key causes the transaction to cross too many entity | |
389 // groups. | |
390 func (td *txnDataStoreData) writeMutation(getOnly bool, key gae.DSKey, data gae.
DSPropertyMap) error { | |
391 rk := string(keyBytes(helper.WithContext, helper.DSKeyRoot(key))) | |
392 | |
393 td.Lock() | |
394 defer td.Unlock() | |
395 | |
396 if _, ok := td.muts[rk]; !ok { | |
397 limit := 1 | |
398 if td.isXG { | |
399 limit = xgEGLimit | |
400 } | |
401 if len(td.muts)+1 > limit { | |
402 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" | |
403 if td.isXG { | |
404 msg = "operating on too many entity groups in a
single transaction" | |
405 } | |
406 return errors.New(msg) | |
407 } | |
408 td.muts[rk] = []txnMutation{} | |
409 } | |
410 if !getOnly { | |
411 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | |
412 } | |
413 | |
414 return nil | |
415 } | |
416 | |
417 func (td *txnDataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoad
Saver) (gae.DSKey, error) { | |
418 keys, errs := td.putMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSave
r{pls}) | |
419 if errs == nil { | |
420 return keys[0], nil | |
421 } | |
422 return nil, gae.SingleError(errs) | |
423 } | |
424 | |
425 func (td *txnDataStoreData) putMulti(ns string, keys []gae.DSKey, plss []gae.DSP
ropertyLoadSaver) ([]gae.DSKey, error) { | |
426 pmaps, err := putMultiPrelim(ns, keys, plss) | |
427 if err != nil { | |
428 return nil, err | |
429 } | |
430 | |
431 retKeys := make([]gae.DSKey, len(keys)) | |
432 lme := gae.LazyMultiError{Size: len(keys)} | |
433 for i, k := range keys { | |
434 func() { | |
435 td.parent.Lock() | |
436 defer td.parent.Unlock() | |
437 _, k = td.parent.entsKeyLocked(k) | |
438 }() | |
439 lme.Assign(i, td.writeMutation(false, k, pmaps[i])) | |
440 retKeys[i] = k | |
441 } | |
442 | |
443 return retKeys, lme.Get() | |
444 } | |
445 | |
446 func (td *txnDataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoad
Saver) error { | |
447 return gae.SingleError(td.getMulti(ns, []gae.DSKey{key}, []gae.DSPropert
yLoadSaver{pls})) | |
448 } | |
449 | |
450 func (td *txnDataStoreData) getMulti(ns string, keys []gae.DSKey, plss []gae.DSP
ropertyLoadSaver) error { | |
451 return getMultiInner(ns, keys, plss, func() (*memCollection, error) { | |
452 lme := gae.LazyMultiError{Size: len(keys)} | |
453 for i, k := range keys { | |
454 lme.Assign(i, td.writeMutation(true, k, nil)) | |
455 } | |
456 return td.snap.GetCollection("ents:" + ns), lme.Get() | |
457 }) | |
458 } | |
459 | |
460 func (td *txnDataStoreData) del(ns string, key gae.DSKey) error { | |
461 return gae.SingleError(td.delMulti(ns, []gae.DSKey{key})) | |
462 } | |
463 | |
464 func (td *txnDataStoreData) delMulti(ns string, keys []gae.DSKey) error { | |
465 lme := gae.LazyMultiError{Size: len(keys)} | |
466 for i, k := range keys { | |
467 if !helper.DSKeyValid(k, ns, false) { | |
468 lme.Assign(i, gae.ErrDSInvalidKey) | |
469 } else { | |
470 lme.Assign(i, td.writeMutation(false, k, nil)) | |
471 } | |
472 } | |
473 return lme.Get() | |
474 } | |
475 | |
476 func keyBytes(ctx helper.DSKeyContext, key gae.DSKey) []byte { | |
477 buf := &bytes.Buffer{} | |
478 helper.WriteDSKey(buf, ctx, key) | |
479 return buf.Bytes() | |
480 } | |
481 | |
482 func rpmWoCtx(data []byte, ns string) (gae.DSPropertyMap, error) { | |
483 return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithoutCon
text, globalAppID, ns) | |
484 } | |
485 | |
486 func rpm(data []byte) (gae.DSPropertyMap, error) { | |
487 return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithContex
t, "", "") | |
488 } | |
489 | |
490 func multiValid(keys []gae.DSKey, plss []gae.DSPropertyLoadSaver, ns string, pot
entialKey, allowSpecial bool) error { | |
491 vfn := func(k gae.DSKey) bool { | |
492 return !helper.DSKeyIncomplete(k) && helper.DSKeyValid(k, ns, al
lowSpecial) | |
493 } | |
494 if potentialKey { | |
495 vfn = func(k gae.DSKey) bool { | |
496 // adds an id to k if it's incomplete. | |
497 if helper.DSKeyIncomplete(k) { | |
498 k = helper.NewDSKey(k.AppID(), k.Namespace(), k.
Kind(), "", 1, k.Parent()) | |
499 } | |
500 return helper.DSKeyValid(k, ns, allowSpecial) | |
501 } | |
502 } | |
503 | |
504 if keys == nil || plss == nil { | |
505 return errors.New("gae: key or plss slices were nil") | |
506 } | |
507 if len(keys) != len(plss) { | |
508 return errors.New("gae: key and dst slices have different length
") | |
509 } | |
510 lme := gae.LazyMultiError{Size: len(keys)} | |
511 for i, k := range keys { | |
512 if !vfn(k) { | |
513 lme.Assign(i, gae.ErrDSInvalidKey) | |
514 } | |
515 } | |
516 return lme.Get() | |
517 } | |
OLD | NEW |