OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "sync" | 10 "sync" |
11 "sync/atomic" | 11 "sync/atomic" |
12 | 12 |
13 ds "github.com/luci/gae/service/datastore" | 13 ds "github.com/luci/gae/service/datastore" |
14 "github.com/luci/gae/service/datastore/serialize" | 14 "github.com/luci/gae/service/datastore/serialize" |
15 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
16 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
17 ) | 17 ) |
18 | 18 |
19 //////////////////////////////// dataStoreData ///////////////////////////////// | 19 //////////////////////////////// dataStoreData ///////////////////////////////// |
20 | 20 |
21 type dataStoreData struct { | 21 type dataStoreData struct { |
22 rwlock sync.RWMutex | 22 rwlock sync.RWMutex |
| 23 |
| 24 // the 'appid' of this datastore |
| 25 aid string |
| 26 |
23 // See README.md for head schema. | 27 // See README.md for head schema. |
24 head *memStore | 28 head *memStore |
25 // if snap is nil, that means that this is always-consistent, and | 29 // if snap is nil, that means that this is always-consistent, and |
26 // getQuerySnaps will return (head, head) | 30 // getQuerySnaps will return (head, head) |
27 snap *memStore | 31 snap *memStore |
28 // For testing, see SetTransactionRetryCount. | 32 // For testing, see SetTransactionRetryCount. |
29 txnFakeRetry int | 33 txnFakeRetry int |
30 // true means that queries with insufficent indexes will pause to add th
em | 34 // true means that queries with insufficent indexes will pause to add th
em |
31 // and then continue instead of failing. | 35 // and then continue instead of failing. |
32 autoIndex bool | 36 autoIndex bool |
33 // true means that all of the __...__ keys which are normally automatica
lly | 37 // true means that all of the __...__ keys which are normally automatica
lly |
34 // maintained will be omitted. This also means that Put with an incomple
te | 38 // maintained will be omitted. This also means that Put with an incomple
te |
35 // key will become an error. | 39 // key will become an error. |
36 disableSpecialEntities bool | 40 disableSpecialEntities bool |
37 } | 41 } |
38 | 42 |
39 var ( | 43 var ( |
40 _ = memContextObj((*dataStoreData)(nil)) | 44 _ = memContextObj((*dataStoreData)(nil)) |
41 _ = sync.Locker((*dataStoreData)(nil)) | 45 _ = sync.Locker((*dataStoreData)(nil)) |
42 ) | 46 ) |
43 | 47 |
44 func newDataStoreData() *dataStoreData { | 48 func newDataStoreData(aid string) *dataStoreData { |
45 head := newMemStore() | 49 head := newMemStore() |
46 return &dataStoreData{ | 50 return &dataStoreData{ |
| 51 aid: aid, |
47 head: head, | 52 head: head, |
48 snap: head.Snapshot(), // empty but better than a nil pointer. | 53 snap: head.Snapshot(), // empty but better than a nil pointer. |
49 } | 54 } |
50 } | 55 } |
51 | 56 |
52 func (d *dataStoreData) Lock() { | 57 func (d *dataStoreData) Lock() { |
53 d.rwlock.Lock() | 58 d.rwlock.Lock() |
54 } | 59 } |
55 | 60 |
56 func (d *dataStoreData) Unlock() { | 61 func (d *dataStoreData) Unlock() { |
(...skipping 13 matching lines...) Expand all Loading... |
70 if always { | 75 if always { |
71 d.snap = nil | 76 d.snap = nil |
72 } else { | 77 } else { |
73 d.snap = d.head.Snapshot() | 78 d.snap = d.head.Snapshot() |
74 } | 79 } |
75 } | 80 } |
76 | 81 |
77 func (d *dataStoreData) addIndexes(ns string, idxs []*ds.IndexDefinition) { | 82 func (d *dataStoreData) addIndexes(ns string, idxs []*ds.IndexDefinition) { |
78 d.Lock() | 83 d.Lock() |
79 defer d.Unlock() | 84 defer d.Unlock() |
80 » addIndexes(d.head, ns, idxs) | 85 » addIndexes(d.head, d.aid, ns, idxs) |
81 } | 86 } |
82 | 87 |
83 func (d *dataStoreData) setAutoIndex(enable bool) { | 88 func (d *dataStoreData) setAutoIndex(enable bool) { |
84 d.Lock() | 89 d.Lock() |
85 defer d.Unlock() | 90 defer d.Unlock() |
86 d.autoIndex = enable | 91 d.autoIndex = enable |
87 } | 92 } |
88 | 93 |
89 func (d *dataStoreData) maybeAutoIndex(err error) bool { | 94 func (d *dataStoreData) maybeAutoIndex(err error) bool { |
90 mi, ok := err.(*ErrMissingIndex) | 95 mi, ok := err.(*ErrMissingIndex) |
(...skipping 171 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
262 if err != nil { | 267 if err != nil { |
263 return | 268 return |
264 } | 269 } |
265 if !d.disableSpecialEntities { | 270 if !d.disableSpecialEntities { |
266 incrementLocked(ents, groupMetaKey(ret), 1) | 271 incrementLocked(ents, groupMetaKey(ret), 1) |
267 } | 272 } |
268 | 273 |
269 old := ents.Get(keyBytes(ret)) | 274 old := ents.Get(keyBytes(ret)) |
270 oldPM := ds.PropertyMap(nil) | 275 oldPM := ds.PropertyMap(nil) |
271 if old != nil { | 276 if old != nil { |
272 » » » » if oldPM, err = rpmWoCtx(old, ns); err != nil { | 277 » » » » if oldPM, err = rpm(old); err != nil { |
273 return | 278 return |
274 } | 279 } |
275 } | 280 } |
276 updateIndexes(d.head, ret, oldPM, pmap) | 281 updateIndexes(d.head, ret, oldPM, pmap) |
277 ents.Set(keyBytes(ret), dataBytes) | 282 ents.Set(keyBytes(ret), dataBytes) |
278 return | 283 return |
279 }() | 284 }() |
280 if cb != nil { | 285 if cb != nil { |
281 cb(k, err) | 286 cb(k, err) |
282 } | 287 } |
(...skipping 11 matching lines...) Expand all Loading... |
294 } | 299 } |
295 return nil | 300 return nil |
296 } | 301 } |
297 | 302 |
298 for _, k := range keys { | 303 for _, k := range keys { |
299 pdata := ents.Get(keyBytes(k)) | 304 pdata := ents.Get(keyBytes(k)) |
300 if pdata == nil { | 305 if pdata == nil { |
301 cb(nil, ds.ErrNoSuchEntity) | 306 cb(nil, ds.ErrNoSuchEntity) |
302 continue | 307 continue |
303 } | 308 } |
304 » » cb(rpmWoCtx(pdata, k.Namespace())) | 309 » » cb(rpm(pdata)) |
305 } | 310 } |
306 return nil | 311 return nil |
307 } | 312 } |
308 | 313 |
309 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { | 314 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { |
310 return getMultiInner(keys, cb, func() (*memCollection, error) { | 315 return getMultiInner(keys, cb, func() (*memCollection, error) { |
311 s := d.takeSnapshot() | 316 s := d.takeSnapshot() |
312 | 317 |
313 return s.GetCollection("ents:" + keys[0].Namespace()), nil | 318 return s.GetCollection("ents:" + keys[0].Namespace()), nil |
314 }) | 319 }) |
315 } | 320 } |
316 | 321 |
317 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) { | 322 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) { |
318 ns := keys[0].Namespace() | 323 ns := keys[0].Namespace() |
319 ents := d.mutableEnts(ns) | 324 ents := d.mutableEnts(ns) |
320 | 325 |
321 if ents != nil { | 326 if ents != nil { |
322 for _, k := range keys { | 327 for _, k := range keys { |
323 err := func() error { | 328 err := func() error { |
324 kb := keyBytes(k) | 329 kb := keyBytes(k) |
325 | 330 |
326 d.Lock() | 331 d.Lock() |
327 defer d.Unlock() | 332 defer d.Unlock() |
328 | 333 |
329 if !d.disableSpecialEntities { | 334 if !d.disableSpecialEntities { |
330 incrementLocked(ents, groupMetaKey(k), 1
) | 335 incrementLocked(ents, groupMetaKey(k), 1
) |
331 } | 336 } |
332 if old := ents.Get(kb); old != nil { | 337 if old := ents.Get(kb); old != nil { |
333 » » » » » oldPM, err := rpmWoCtx(old, ns) | 338 » » » » » oldPM, err := rpm(old) |
334 if err != nil { | 339 if err != nil { |
335 return err | 340 return err |
336 } | 341 } |
337 updateIndexes(d.head, k, oldPM, nil) | 342 updateIndexes(d.head, k, oldPM, nil) |
338 ents.Delete(kb) | 343 ents.Delete(kb) |
339 } | 344 } |
340 return nil | 345 return nil |
341 }() | 346 }() |
342 if cb != nil { | 347 if cb != nil { |
343 cb(err) | 348 cb(err) |
(...skipping 196 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
540 cb(err) | 545 cb(err) |
541 } | 546 } |
542 } | 547 } |
543 return nil | 548 return nil |
544 } | 549 } |
545 | 550 |
546 func keyBytes(key *ds.Key) []byte { | 551 func keyBytes(key *ds.Key) []byte { |
547 return serialize.ToBytes(ds.MkProperty(key)) | 552 return serialize.ToBytes(ds.MkProperty(key)) |
548 } | 553 } |
549 | 554 |
550 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { | |
551 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | |
552 serialize.WithoutContext, globalAppID, ns) | |
553 } | |
554 | |
555 func rpm(data []byte) (ds.PropertyMap, error) { | 555 func rpm(data []byte) (ds.PropertyMap, error) { |
556 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 556 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
557 serialize.WithContext, "", "") | 557 serialize.WithContext, "", "") |
558 } | 558 } |
OLD | NEW |