OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "sync" | 10 "sync" |
11 "sync/atomic" | 11 "sync/atomic" |
12 | 12 |
13 ds "github.com/luci/gae/service/datastore" | 13 ds "github.com/luci/gae/service/datastore" |
14 "github.com/luci/gae/service/datastore/serialize" | 14 "github.com/luci/gae/service/datastore/serialize" |
15 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
16 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
17 ) | 17 ) |
18 | 18 |
19 //////////////////////////////// dataStoreData ///////////////////////////////// | 19 //////////////////////////////// dataStoreData ///////////////////////////////// |
20 | 20 |
21 type dataStoreData struct { | 21 type dataStoreData struct { |
22 rwlock sync.RWMutex | 22 rwlock sync.RWMutex |
| 23 |
| 24 // the 'appid' of this datastore |
| 25 aid string |
| 26 |
23 // See README.md for head schema. | 27 // See README.md for head schema. |
24 head *memStore | 28 head *memStore |
| 29 // if snap is nil, that means that this is always-consistent, and |
| 30 // getQuerySnaps will return (head, head) |
25 snap *memStore | 31 snap *memStore |
26 // For testing, see SetTransactionRetryCount. | 32 // For testing, see SetTransactionRetryCount. |
27 txnFakeRetry int | 33 txnFakeRetry int |
28 // true means that head always == snap | |
29 consistent bool | |
30 // true means that queries with insufficent indexes will pause to add th
em | 34 // true means that queries with insufficent indexes will pause to add th
em |
31 // and then continue instead of failing. | 35 // and then continue instead of failing. |
32 autoIndex bool | 36 autoIndex bool |
33 // true means that all of the __...__ keys which are normally automatica
lly | 37 // true means that all of the __...__ keys which are normally automatica
lly |
34 // maintained will be omitted. This also means that Put with an incomple
te | 38 // maintained will be omitted. This also means that Put with an incomple
te |
35 // key will become an error. | 39 // key will become an error. |
36 disableSpecialEntities bool | 40 disableSpecialEntities bool |
37 } | 41 } |
38 | 42 |
39 var ( | 43 var ( |
40 _ = memContextObj((*dataStoreData)(nil)) | 44 _ = memContextObj((*dataStoreData)(nil)) |
41 _ = sync.Locker((*dataStoreData)(nil)) | 45 _ = sync.Locker((*dataStoreData)(nil)) |
42 ) | 46 ) |
43 | 47 |
44 func newDataStoreData() *dataStoreData { | 48 func newDataStoreData(aid string) *dataStoreData { |
45 head := newMemStore() | 49 head := newMemStore() |
46 return &dataStoreData{ | 50 return &dataStoreData{ |
| 51 aid: aid, |
47 head: head, | 52 head: head, |
48 snap: head.Snapshot(), // empty but better than a nil pointer. | 53 snap: head.Snapshot(), // empty but better than a nil pointer. |
49 } | 54 } |
50 } | 55 } |
51 | 56 |
52 func (d *dataStoreData) Lock() { | 57 func (d *dataStoreData) Lock() { |
53 d.rwlock.Lock() | 58 d.rwlock.Lock() |
54 } | 59 } |
55 | 60 |
56 func (d *dataStoreData) Unlock() { | 61 func (d *dataStoreData) Unlock() { |
57 d.rwlock.Unlock() | 62 d.rwlock.Unlock() |
58 } | 63 } |
59 | 64 |
60 func (d *dataStoreData) setTxnRetry(count int) { | 65 func (d *dataStoreData) setTxnRetry(count int) { |
61 d.Lock() | 66 d.Lock() |
62 defer d.Unlock() | 67 defer d.Unlock() |
63 d.txnFakeRetry = count | 68 d.txnFakeRetry = count |
64 } | 69 } |
65 | 70 |
66 func (d *dataStoreData) setConsistent(always bool) { | 71 func (d *dataStoreData) setConsistent(always bool) { |
67 d.Lock() | 72 d.Lock() |
68 defer d.Unlock() | 73 defer d.Unlock() |
69 | 74 |
70 » d.consistent = always | 75 » if always { |
71 » if d.consistent { | 76 » » d.snap = nil |
| 77 » } else { |
72 d.snap = d.head.Snapshot() | 78 d.snap = d.head.Snapshot() |
73 } | 79 } |
74 } | 80 } |
75 | 81 |
76 func (d *dataStoreData) addIndexes(ns string, idxs []*ds.IndexDefinition) { | 82 func (d *dataStoreData) addIndexes(ns string, idxs []*ds.IndexDefinition) { |
77 d.Lock() | 83 d.Lock() |
78 defer d.Unlock() | 84 defer d.Unlock() |
79 » addIndexes(d.head, ns, idxs) | 85 » addIndexes(d.head, d.aid, ns, idxs) |
80 » if d.consistent { | |
81 » » d.snap = d.head.Snapshot() | |
82 » } | |
83 } | 86 } |
84 | 87 |
85 func (d *dataStoreData) setAutoIndex(enable bool) { | 88 func (d *dataStoreData) setAutoIndex(enable bool) { |
86 d.Lock() | 89 d.Lock() |
87 defer d.Unlock() | 90 defer d.Unlock() |
88 d.autoIndex = enable | 91 d.autoIndex = enable |
89 } | 92 } |
90 | 93 |
91 func (d *dataStoreData) maybeAutoIndex(err error) bool { | 94 func (d *dataStoreData) maybeAutoIndex(err error) bool { |
92 mi, ok := err.(*ErrMissingIndex) | 95 mi, ok := err.(*ErrMissingIndex) |
(...skipping 21 matching lines...) Expand all Loading... |
114 | 117 |
115 func (d *dataStoreData) getDisableSpecialEntities() bool { | 118 func (d *dataStoreData) getDisableSpecialEntities() bool { |
116 d.rwlock.RLock() | 119 d.rwlock.RLock() |
117 defer d.rwlock.RUnlock() | 120 defer d.rwlock.RUnlock() |
118 return d.disableSpecialEntities | 121 return d.disableSpecialEntities |
119 } | 122 } |
120 | 123 |
121 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { | 124 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { |
122 d.rwlock.RLock() | 125 d.rwlock.RLock() |
123 defer d.rwlock.RUnlock() | 126 defer d.rwlock.RUnlock() |
124 » if d.consistent { | 127 » if d.snap == nil { |
125 » » // snap is already a consistent snapshot of head | 128 » » // we're 'always consistent' |
126 » » return d.snap, d.snap | 129 » » return d.head, d.head |
127 } | 130 } |
128 | 131 |
129 head = d.head.Snapshot() | 132 head = d.head.Snapshot() |
130 if consistent { | 133 if consistent { |
131 idx = head | 134 idx = head |
132 } else { | 135 } else { |
133 idx = d.snap | 136 idx = d.snap |
134 } | 137 } |
135 return | 138 return |
136 } | 139 } |
137 | 140 |
138 func (d *dataStoreData) takeSnapshot() *memStore { | 141 func (d *dataStoreData) takeSnapshot() *memStore { |
139 d.rwlock.RLock() | 142 d.rwlock.RLock() |
140 defer d.rwlock.RUnlock() | 143 defer d.rwlock.RUnlock() |
141 if d.consistent { | |
142 return d.snap | |
143 } | |
144 return d.head.Snapshot() | 144 return d.head.Snapshot() |
145 } | 145 } |
146 | 146 |
147 func (d *dataStoreData) setSnapshot(snap *memStore) { | 147 func (d *dataStoreData) setSnapshot(snap *memStore) { |
148 d.rwlock.Lock() | 148 d.rwlock.Lock() |
149 defer d.rwlock.Unlock() | 149 defer d.rwlock.Unlock() |
150 » if d.consistent { | 150 » if d.snap == nil { |
| 151 » » // we're 'always consistent' |
151 return | 152 return |
152 } | 153 } |
153 d.snap = snap | 154 d.snap = snap |
154 } | 155 } |
155 | 156 |
156 func (d *dataStoreData) catchupIndexes() { | 157 func (d *dataStoreData) catchupIndexes() { |
157 d.rwlock.Lock() | 158 d.rwlock.Lock() |
158 defer d.rwlock.Unlock() | 159 defer d.rwlock.Unlock() |
159 » if d.consistent { | 160 » if d.snap == nil { |
| 161 » » // we're 'always consistent' |
160 return | 162 return |
161 } | 163 } |
162 d.snap = d.head.Snapshot() | 164 d.snap = d.head.Snapshot() |
163 } | 165 } |
164 | 166 |
165 /////////////////////////// indexes(dataStoreData) //////////////////////////// | 167 /////////////////////////// indexes(dataStoreData) //////////////////////////// |
166 | 168 |
167 func groupMetaKey(key *ds.Key) []byte { | 169 func groupMetaKey(key *ds.Key) []byte { |
168 return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root())
) | 170 return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root())
) |
169 } | 171 } |
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
265 if err != nil { | 267 if err != nil { |
266 return | 268 return |
267 } | 269 } |
268 if !d.disableSpecialEntities { | 270 if !d.disableSpecialEntities { |
269 incrementLocked(ents, groupMetaKey(ret), 1) | 271 incrementLocked(ents, groupMetaKey(ret), 1) |
270 } | 272 } |
271 | 273 |
272 old := ents.Get(keyBytes(ret)) | 274 old := ents.Get(keyBytes(ret)) |
273 oldPM := ds.PropertyMap(nil) | 275 oldPM := ds.PropertyMap(nil) |
274 if old != nil { | 276 if old != nil { |
275 » » » » if oldPM, err = rpmWoCtx(old, ns); err != nil { | 277 » » » » if oldPM, err = rpm(old); err != nil { |
276 return | 278 return |
277 } | 279 } |
278 } | 280 } |
279 updateIndexes(d.head, ret, oldPM, pmap) | 281 updateIndexes(d.head, ret, oldPM, pmap) |
280 ents.Set(keyBytes(ret), dataBytes) | 282 ents.Set(keyBytes(ret), dataBytes) |
281 if d.consistent { | |
282 d.snap = d.head.Snapshot() | |
283 } | |
284 return | 283 return |
285 }() | 284 }() |
286 if cb != nil { | 285 if cb != nil { |
287 cb(k, err) | 286 cb(k, err) |
288 } | 287 } |
289 } | 288 } |
290 } | 289 } |
291 | 290 |
292 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect
ion, error)) error { | 291 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect
ion, error)) error { |
293 ents, err := getColl() | 292 ents, err := getColl() |
294 if err != nil { | 293 if err != nil { |
295 return err | 294 return err |
296 } | 295 } |
297 if ents == nil { | 296 if ents == nil { |
298 for range keys { | 297 for range keys { |
299 cb(nil, ds.ErrNoSuchEntity) | 298 cb(nil, ds.ErrNoSuchEntity) |
300 } | 299 } |
301 return nil | 300 return nil |
302 } | 301 } |
303 | 302 |
304 for _, k := range keys { | 303 for _, k := range keys { |
305 pdata := ents.Get(keyBytes(k)) | 304 pdata := ents.Get(keyBytes(k)) |
306 if pdata == nil { | 305 if pdata == nil { |
307 cb(nil, ds.ErrNoSuchEntity) | 306 cb(nil, ds.ErrNoSuchEntity) |
308 continue | 307 continue |
309 } | 308 } |
310 » » cb(rpmWoCtx(pdata, k.Namespace())) | 309 » » cb(rpm(pdata)) |
311 } | 310 } |
312 return nil | 311 return nil |
313 } | 312 } |
314 | 313 |
315 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { | 314 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { |
316 return getMultiInner(keys, cb, func() (*memCollection, error) { | 315 return getMultiInner(keys, cb, func() (*memCollection, error) { |
317 s := d.takeSnapshot() | 316 s := d.takeSnapshot() |
318 | 317 |
319 return s.GetCollection("ents:" + keys[0].Namespace()), nil | 318 return s.GetCollection("ents:" + keys[0].Namespace()), nil |
320 }) | 319 }) |
321 } | 320 } |
322 | 321 |
323 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) { | 322 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) { |
324 ns := keys[0].Namespace() | 323 ns := keys[0].Namespace() |
325 ents := d.mutableEnts(ns) | 324 ents := d.mutableEnts(ns) |
326 | 325 |
327 if ents != nil { | 326 if ents != nil { |
328 for _, k := range keys { | 327 for _, k := range keys { |
329 err := func() error { | 328 err := func() error { |
330 kb := keyBytes(k) | 329 kb := keyBytes(k) |
331 | 330 |
332 d.Lock() | 331 d.Lock() |
333 defer d.Unlock() | 332 defer d.Unlock() |
334 | 333 |
335 if !d.disableSpecialEntities { | 334 if !d.disableSpecialEntities { |
336 incrementLocked(ents, groupMetaKey(k), 1
) | 335 incrementLocked(ents, groupMetaKey(k), 1
) |
337 } | 336 } |
338 if old := ents.Get(kb); old != nil { | 337 if old := ents.Get(kb); old != nil { |
339 » » » » » oldPM, err := rpmWoCtx(old, ns) | 338 » » » » » oldPM, err := rpm(old) |
340 if err != nil { | 339 if err != nil { |
341 return err | 340 return err |
342 } | 341 } |
343 updateIndexes(d.head, k, oldPM, nil) | 342 updateIndexes(d.head, k, oldPM, nil) |
344 ents.Delete(kb) | 343 ents.Delete(kb) |
345 if d.consistent { | |
346 d.snap = d.head.Snapshot() | |
347 } | |
348 } | 344 } |
349 return nil | 345 return nil |
350 }() | 346 }() |
351 if cb != nil { | 347 if cb != nil { |
352 cb(err) | 348 cb(err) |
353 } | 349 } |
354 } | 350 } |
355 } else if cb != nil { | 351 } else if cb != nil { |
356 for range keys { | 352 for range keys { |
357 cb(nil) | 353 cb(nil) |
(...skipping 191 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
549 cb(err) | 545 cb(err) |
550 } | 546 } |
551 } | 547 } |
552 return nil | 548 return nil |
553 } | 549 } |
554 | 550 |
555 func keyBytes(key *ds.Key) []byte { | 551 func keyBytes(key *ds.Key) []byte { |
556 return serialize.ToBytes(ds.MkProperty(key)) | 552 return serialize.ToBytes(ds.MkProperty(key)) |
557 } | 553 } |
558 | 554 |
559 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { | |
560 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | |
561 serialize.WithoutContext, globalAppID, ns) | |
562 } | |
563 | |
564 func rpm(data []byte) (ds.PropertyMap, error) { | 555 func rpm(data []byte) (ds.PropertyMap, error) { |
565 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 556 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
566 serialize.WithContext, "", "") | 557 serialize.WithContext, "", "") |
567 } | 558 } |
OLD | NEW |