OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "strings" | 10 "strings" |
11 "sync" | 11 "sync" |
12 "sync/atomic" | 12 "sync/atomic" |
13 | 13 |
14 ds "github.com/luci/gae/service/datastore" | 14 ds "github.com/luci/gae/service/datastore" |
15 "github.com/luci/gae/service/datastore/serialize" | 15 "github.com/luci/gae/service/datastore/serialize" |
16 "github.com/luci/luci-go/common/errors" | 16 "github.com/luci/luci-go/common/errors" |
17 "golang.org/x/net/context" | 17 "golang.org/x/net/context" |
18 ) | 18 ) |
19 | 19 |
20 //////////////////////////////// dataStoreData ///////////////////////////////// | 20 //////////////////////////////// dataStoreData ///////////////////////////////// |
21 | 21 |
22 type dataStoreData struct { | 22 type dataStoreData struct { |
23 rwlock sync.RWMutex | 23 rwlock sync.RWMutex |
24 | 24 |
25 // the 'appid' of this datastore | 25 // the 'appid' of this datastore |
26 aid string | 26 aid string |
27 | 27 |
28 // See README.md for head schema. | 28 // See README.md for head schema. |
29 » head *memStore | 29 » head memStore |
30 // if snap is nil, that means that this is always-consistent, and | 30 // if snap is nil, that means that this is always-consistent, and |
31 // getQuerySnaps will return (head, head) | 31 // getQuerySnaps will return (head, head) |
32 » snap *memStore | 32 » snap memStore |
33 // For testing, see SetTransactionRetryCount. | 33 // For testing, see SetTransactionRetryCount. |
34 txnFakeRetry int | 34 txnFakeRetry int |
35 // true means that queries with insufficent indexes will pause to add th
em | 35 // true means that queries with insufficent indexes will pause to add th
em |
36 // and then continue instead of failing. | 36 // and then continue instead of failing. |
37 autoIndex bool | 37 autoIndex bool |
38 // true means that all of the __...__ keys which are normally automatica
lly | 38 // true means that all of the __...__ keys which are normally automatica
lly |
39 // maintained will be omitted. This also means that Put with an incomple
te | 39 // maintained will be omitted. This also means that Put with an incomple
te |
40 // key will become an error. | 40 // key will become an error. |
41 disableSpecialEntities bool | 41 disableSpecialEntities bool |
42 } | 42 } |
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
115 defer d.Unlock() | 115 defer d.Unlock() |
116 d.disableSpecialEntities = true | 116 d.disableSpecialEntities = true |
117 } | 117 } |
118 | 118 |
119 func (d *dataStoreData) getDisableSpecialEntities() bool { | 119 func (d *dataStoreData) getDisableSpecialEntities() bool { |
120 d.rwlock.RLock() | 120 d.rwlock.RLock() |
121 defer d.rwlock.RUnlock() | 121 defer d.rwlock.RUnlock() |
122 return d.disableSpecialEntities | 122 return d.disableSpecialEntities |
123 } | 123 } |
124 | 124 |
125 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { | 125 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head memStore) { |
126 d.rwlock.RLock() | 126 d.rwlock.RLock() |
127 defer d.rwlock.RUnlock() | 127 defer d.rwlock.RUnlock() |
128 if d.snap == nil { | 128 if d.snap == nil { |
129 // we're 'always consistent' | 129 // we're 'always consistent' |
130 snap := d.head.Snapshot() | 130 snap := d.head.Snapshot() |
131 return snap, snap | 131 return snap, snap |
132 } | 132 } |
133 | 133 |
134 head = d.head.Snapshot() | 134 head = d.head.Snapshot() |
135 if consistent { | 135 if consistent { |
136 idx = head | 136 idx = head |
137 } else { | 137 } else { |
138 idx = d.snap | 138 idx = d.snap |
139 } | 139 } |
140 return | 140 return |
141 } | 141 } |
142 | 142 |
143 func (d *dataStoreData) takeSnapshot() *memStore { | 143 func (d *dataStoreData) takeSnapshot() memStore { |
144 d.rwlock.RLock() | 144 d.rwlock.RLock() |
145 defer d.rwlock.RUnlock() | 145 defer d.rwlock.RUnlock() |
146 return d.head.Snapshot() | 146 return d.head.Snapshot() |
147 } | 147 } |
148 | 148 |
149 func (d *dataStoreData) setSnapshot(snap *memStore) { | 149 func (d *dataStoreData) setSnapshot(snap memStore) { |
150 d.rwlock.Lock() | 150 d.rwlock.Lock() |
151 defer d.rwlock.Unlock() | 151 defer d.rwlock.Unlock() |
152 if d.snap == nil { | 152 if d.snap == nil { |
153 // we're 'always consistent' | 153 // we're 'always consistent' |
154 return | 154 return |
155 } | 155 } |
156 d.snap = snap | 156 d.snap = snap |
157 } | 157 } |
158 | 158 |
159 func (d *dataStoreData) catchupIndexes() { | 159 func (d *dataStoreData) catchupIndexes() { |
(...skipping 20 matching lines...) Expand all Loading... |
180 } | 180 } |
181 | 181 |
182 func groupIDsKey(key *ds.Key) []byte { | 182 func groupIDsKey(key *ds.Key) []byte { |
183 return keyBytes(ds.NewKey("", "", "__entity_group_ids__", "", 1, key.Roo
t())) | 183 return keyBytes(ds.NewKey("", "", "__entity_group_ids__", "", 1, key.Roo
t())) |
184 } | 184 } |
185 | 185 |
186 func rootIDsKey(kind string) []byte { | 186 func rootIDsKey(kind string) []byte { |
187 return keyBytes(ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) | 187 return keyBytes(ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) |
188 } | 188 } |
189 | 189 |
190 func curVersion(ents *memCollection, key []byte) int64 { | 190 func curVersion(ents memCollection, key []byte) int64 { |
191 if ents != nil { | 191 if ents != nil { |
192 if v := ents.Get(key); v != nil { | 192 if v := ents.Get(key); v != nil { |
193 pm, err := rpm(v) | 193 pm, err := rpm(v) |
194 memoryCorruption(err) | 194 memoryCorruption(err) |
195 | 195 |
196 pl, ok := pm["__version__"] | 196 pl, ok := pm["__version__"] |
197 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { | 197 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { |
198 return pl[0].Value().(int64) | 198 return pl[0].Value().(int64) |
199 } | 199 } |
200 | 200 |
201 memoryCorruption(fmt.Errorf("__version__ property missin
g or wrong: %v", pm)) | 201 memoryCorruption(fmt.Errorf("__version__ property missin
g or wrong: %v", pm)) |
202 } | 202 } |
203 } | 203 } |
204 return 0 | 204 return 0 |
205 } | 205 } |
206 | 206 |
207 func incrementLocked(ents *memCollection, key []byte, amt int) int64 { | 207 func incrementLocked(ents memCollection, key []byte, amt int) int64 { |
208 if amt <= 0 { | 208 if amt <= 0 { |
209 panic(fmt.Errorf("incrementLocked called with bad `amt`: %d", am
t)) | 209 panic(fmt.Errorf("incrementLocked called with bad `amt`: %d", am
t)) |
210 } | 210 } |
211 ret := curVersion(ents, key) + 1 | 211 ret := curVersion(ents, key) + 1 |
212 ents.Set(key, serialize.ToBytes(ds.PropertyMap{ | 212 ents.Set(key, serialize.ToBytes(ds.PropertyMap{ |
213 "__version__": {ds.MkPropertyNI(ret + int64(amt-1))}, | 213 "__version__": {ds.MkPropertyNI(ret + int64(amt-1))}, |
214 })) | 214 })) |
215 return ret | 215 return ret |
216 } | 216 } |
217 | 217 |
218 func (d *dataStoreData) mutableEntsLocked(ns string) *memCollection { | |
219 coll := "ents:" + ns | |
220 ents := d.head.GetCollection(coll) | |
221 if ents == nil { | |
222 ents = d.head.SetCollection(coll, nil) | |
223 } | |
224 return ents | |
225 } | |
226 | |
227 func (d *dataStoreData) allocateIDs(incomplete *ds.Key, n int) (int64, error) { | 218 func (d *dataStoreData) allocateIDs(incomplete *ds.Key, n int) (int64, error) { |
228 d.Lock() | 219 d.Lock() |
229 defer d.Unlock() | 220 defer d.Unlock() |
230 | 221 |
231 » ents := d.mutableEntsLocked(incomplete.Namespace()) | 222 » ents := d.head.GetOrCreateCollection("ents:" + incomplete.Namespace()) |
232 return d.allocateIDsLocked(ents, incomplete, n) | 223 return d.allocateIDsLocked(ents, incomplete, n) |
233 } | 224 } |
234 | 225 |
235 func (d *dataStoreData) allocateIDsLocked(ents *memCollection, incomplete *ds.Ke
y, n int) (int64, error) { | 226 func (d *dataStoreData) allocateIDsLocked(ents memCollection, incomplete *ds.Key
, n int) (int64, error) { |
236 if d.disableSpecialEntities { | 227 if d.disableSpecialEntities { |
237 return 0, errors.New("disableSpecialEntities is true so allocate
IDs is disabled") | 228 return 0, errors.New("disableSpecialEntities is true so allocate
IDs is disabled") |
238 } | 229 } |
239 | 230 |
240 idKey := []byte(nil) | 231 idKey := []byte(nil) |
241 if incomplete.Parent() == nil { | 232 if incomplete.Parent() == nil { |
242 idKey = rootIDsKey(incomplete.Kind()) | 233 idKey = rootIDsKey(incomplete.Kind()) |
243 } else { | 234 } else { |
244 idKey = groupIDsKey(incomplete) | 235 idKey = groupIDsKey(incomplete) |
245 } | 236 } |
246 return incrementLocked(ents, idKey, n), nil | 237 return incrementLocked(ents, idKey, n), nil |
247 } | 238 } |
248 | 239 |
249 func (d *dataStoreData) fixKeyLocked(ents *memCollection, key *ds.Key) (*ds.Key,
error) { | 240 func (d *dataStoreData) fixKeyLocked(ents memCollection, key *ds.Key) (*ds.Key,
error) { |
250 if key.Incomplete() { | 241 if key.Incomplete() { |
251 id, err := d.allocateIDsLocked(ents, key, 1) | 242 id, err := d.allocateIDsLocked(ents, key, 1) |
252 if err != nil { | 243 if err != nil { |
253 return key, err | 244 return key, err |
254 } | 245 } |
255 key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id
, key.Parent()) | 246 key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id
, key.Parent()) |
256 } | 247 } |
257 return key, nil | 248 return key, nil |
258 } | 249 } |
259 | 250 |
260 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu
tMultiCB) error { | 251 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu
tMultiCB) error { |
261 ns := keys[0].Namespace() | 252 ns := keys[0].Namespace() |
262 | 253 |
263 for i, k := range keys { | 254 for i, k := range keys { |
264 pmap, _ := vals[i].Save(false) | 255 pmap, _ := vals[i].Save(false) |
265 dataBytes := serialize.ToBytes(pmap) | 256 dataBytes := serialize.ToBytes(pmap) |
266 | 257 |
267 k, err := func() (ret *ds.Key, err error) { | 258 k, err := func() (ret *ds.Key, err error) { |
268 d.Lock() | 259 d.Lock() |
269 defer d.Unlock() | 260 defer d.Unlock() |
270 | 261 |
271 » » » ents := d.mutableEntsLocked(ns) | 262 » » » ents := d.head.GetOrCreateCollection("ents:" + ns) |
272 | 263 |
273 ret, err = d.fixKeyLocked(ents, k) | 264 ret, err = d.fixKeyLocked(ents, k) |
274 if err != nil { | 265 if err != nil { |
275 return | 266 return |
276 } | 267 } |
277 if !d.disableSpecialEntities { | 268 if !d.disableSpecialEntities { |
278 incrementLocked(ents, groupMetaKey(ret), 1) | 269 incrementLocked(ents, groupMetaKey(ret), 1) |
279 } | 270 } |
280 | 271 |
281 old := ents.Get(keyBytes(ret)) | 272 old := ents.Get(keyBytes(ret)) |
282 oldPM := ds.PropertyMap(nil) | 273 oldPM := ds.PropertyMap(nil) |
283 if old != nil { | 274 if old != nil { |
284 if oldPM, err = rpm(old); err != nil { | 275 if oldPM, err = rpm(old); err != nil { |
285 return | 276 return |
286 } | 277 } |
287 } | 278 } |
288 ents.Set(keyBytes(ret), dataBytes) | 279 ents.Set(keyBytes(ret), dataBytes) |
289 updateIndexes(d.head, ret, oldPM, pmap) | 280 updateIndexes(d.head, ret, oldPM, pmap) |
290 return | 281 return |
291 }() | 282 }() |
292 if cb != nil { | 283 if cb != nil { |
293 if err := cb(k, err); err != nil { | 284 if err := cb(k, err); err != nil { |
294 return err | 285 return err |
295 } | 286 } |
296 } | 287 } |
297 } | 288 } |
298 return nil | 289 return nil |
299 } | 290 } |
300 | 291 |
301 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect
ion, error)) error { | 292 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (memCollecti
on, error)) error { |
302 ents, err := getColl() | 293 ents, err := getColl() |
303 if err != nil { | 294 if err != nil { |
304 return err | 295 return err |
305 } | 296 } |
306 if ents == nil { | 297 if ents == nil { |
307 for range keys { | 298 for range keys { |
308 cb(nil, ds.ErrNoSuchEntity) | 299 cb(nil, ds.ErrNoSuchEntity) |
309 } | 300 } |
310 return nil | 301 return nil |
311 } | 302 } |
312 | 303 |
313 for _, k := range keys { | 304 for _, k := range keys { |
314 pdata := ents.Get(keyBytes(k)) | 305 pdata := ents.Get(keyBytes(k)) |
315 if pdata == nil { | 306 if pdata == nil { |
316 cb(nil, ds.ErrNoSuchEntity) | 307 cb(nil, ds.ErrNoSuchEntity) |
317 continue | 308 continue |
318 } | 309 } |
319 cb(rpm(pdata)) | 310 cb(rpm(pdata)) |
320 } | 311 } |
321 return nil | 312 return nil |
322 } | 313 } |
323 | 314 |
324 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { | 315 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { |
325 » return getMultiInner(keys, cb, func() (*memCollection, error) { | 316 » return getMultiInner(keys, cb, func() (memCollection, error) { |
326 s := d.takeSnapshot() | 317 s := d.takeSnapshot() |
327 | 318 |
328 return s.GetCollection("ents:" + keys[0].Namespace()), nil | 319 return s.GetCollection("ents:" + keys[0].Namespace()), nil |
329 }) | 320 }) |
330 } | 321 } |
331 | 322 |
332 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error { | 323 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error { |
333 ns := keys[0].Namespace() | 324 ns := keys[0].Namespace() |
334 | 325 |
335 hasEntsInNS := func() bool { | 326 hasEntsInNS := func() bool { |
336 d.Lock() | 327 d.Lock() |
337 defer d.Unlock() | 328 defer d.Unlock() |
338 » » return d.mutableEntsLocked(ns) != nil | 329 » » return d.head.GetOrCreateCollection("ents:"+ns) != nil |
339 }() | 330 }() |
340 | 331 |
341 if hasEntsInNS { | 332 if hasEntsInNS { |
342 for _, k := range keys { | 333 for _, k := range keys { |
343 err := func() error { | 334 err := func() error { |
344 kb := keyBytes(k) | 335 kb := keyBytes(k) |
345 | 336 |
346 d.Lock() | 337 d.Lock() |
347 defer d.Unlock() | 338 defer d.Unlock() |
348 | 339 |
349 » » » » ents := d.mutableEntsLocked(ns) | 340 » » » » ents := d.head.GetOrCreateCollection("ents:" + n
s) |
350 | 341 |
351 if !d.disableSpecialEntities { | 342 if !d.disableSpecialEntities { |
352 incrementLocked(ents, groupMetaKey(k), 1
) | 343 incrementLocked(ents, groupMetaKey(k), 1
) |
353 } | 344 } |
354 if old := ents.Get(kb); old != nil { | 345 if old := ents.Get(kb); old != nil { |
355 oldPM, err := rpm(old) | 346 oldPM, err := rpm(old) |
356 if err != nil { | 347 if err != nil { |
357 return err | 348 return err |
358 } | 349 } |
359 ents.Delete(kb) | 350 ents.Delete(kb) |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
445 | 436 |
446 type txnDataStoreData struct { | 437 type txnDataStoreData struct { |
447 sync.Mutex | 438 sync.Mutex |
448 | 439 |
449 parent *dataStoreData | 440 parent *dataStoreData |
450 | 441 |
451 // boolean 0 or 1, use atomic.*Int32 to access. | 442 // boolean 0 or 1, use atomic.*Int32 to access. |
452 closed int32 | 443 closed int32 |
453 isXG bool | 444 isXG bool |
454 | 445 |
455 » snap *memStore | 446 » snap memStore |
456 | 447 |
457 // string is the raw-bytes encoding of the entity root incl. namespace | 448 // string is the raw-bytes encoding of the entity root incl. namespace |
458 muts map[string][]txnMutation | 449 muts map[string][]txnMutation |
459 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ
ing | 450 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ
ing |
460 // length of encoded keys + values. | 451 // length of encoded keys + values. |
461 } | 452 } |
462 | 453 |
463 var _ memContextObj = (*txnDataStoreData)(nil) | 454 var _ memContextObj = (*txnDataStoreData)(nil) |
464 | 455 |
465 const xgEGLimit = 25 | 456 const xgEGLimit = 25 |
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
526 return nil | 517 return nil |
527 } | 518 } |
528 | 519 |
529 func (td *txnDataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb d
s.PutMultiCB) { | 520 func (td *txnDataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb d
s.PutMultiCB) { |
530 ns := keys[0].Namespace() | 521 ns := keys[0].Namespace() |
531 | 522 |
532 for i, k := range keys { | 523 for i, k := range keys { |
533 err := func() (err error) { | 524 err := func() (err error) { |
534 td.parent.Lock() | 525 td.parent.Lock() |
535 defer td.parent.Unlock() | 526 defer td.parent.Unlock() |
536 » » » ents := td.parent.mutableEntsLocked(ns) | 527 » » » ents := td.parent.head.GetOrCreateCollection("ents:" + n
s) |
537 | 528 |
538 k, err = td.parent.fixKeyLocked(ents, k) | 529 k, err = td.parent.fixKeyLocked(ents, k) |
539 return | 530 return |
540 }() | 531 }() |
541 if err == nil { | 532 if err == nil { |
542 err = td.writeMutation(false, k, vals[i]) | 533 err = td.writeMutation(false, k, vals[i]) |
543 } | 534 } |
544 if cb != nil { | 535 if cb != nil { |
545 cb(k, err) | 536 cb(k, err) |
546 } | 537 } |
547 } | 538 } |
548 } | 539 } |
549 | 540 |
550 func (td *txnDataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { | 541 func (td *txnDataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { |
551 » return getMultiInner(keys, cb, func() (*memCollection, error) { | 542 » return getMultiInner(keys, cb, func() (memCollection, error) { |
552 err := error(nil) | 543 err := error(nil) |
553 for _, key := range keys { | 544 for _, key := range keys { |
554 err = td.writeMutation(true, key, nil) | 545 err = td.writeMutation(true, key, nil) |
555 if err != nil { | 546 if err != nil { |
556 return nil, err | 547 return nil, err |
557 } | 548 } |
558 } | 549 } |
559 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil | 550 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil |
560 }) | 551 }) |
561 } | 552 } |
(...skipping 10 matching lines...) Expand all Loading... |
572 | 563 |
573 func keyBytes(key *ds.Key) []byte { | 564 func keyBytes(key *ds.Key) []byte { |
574 return serialize.ToBytes(ds.MkProperty(key)) | 565 return serialize.ToBytes(ds.MkProperty(key)) |
575 } | 566 } |
576 | 567 |
577 func rpm(data []byte) (ds.PropertyMap, error) { | 568 func rpm(data []byte) (ds.PropertyMap, error) { |
578 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 569 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
579 serialize.WithContext, "", "") | 570 serialize.WithContext, "", "") |
580 } | 571 } |
581 | 572 |
582 func namespaces(store *memStore) []string { | 573 func namespaces(store memStore) []string { |
583 var namespaces []string | 574 var namespaces []string |
584 for _, c := range store.GetCollectionNames() { | 575 for _, c := range store.GetCollectionNames() { |
585 ns, has := trimPrefix(c, "ents:") | 576 ns, has := trimPrefix(c, "ents:") |
586 if !has { | 577 if !has { |
587 if len(namespaces) > 0 { | 578 if len(namespaces) > 0 { |
588 break | 579 break |
589 } | 580 } |
590 continue | 581 continue |
591 } | 582 } |
592 namespaces = append(namespaces, ns) | 583 namespaces = append(namespaces, ns) |
593 } | 584 } |
594 return namespaces | 585 return namespaces |
595 } | 586 } |
596 | 587 |
597 func trimPrefix(v, p string) (string, bool) { | 588 func trimPrefix(v, p string) (string, bool) { |
598 if strings.HasPrefix(v, p) { | 589 if strings.HasPrefix(v, p) { |
599 return v[len(p):], true | 590 return v[len(p):], true |
600 } | 591 } |
601 return v, false | 592 return v, false |
602 } | 593 } |
OLD | NEW |