| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "sync" | 10 "sync" |
| 11 "sync/atomic" | 11 "sync/atomic" |
| 12 | 12 |
| 13 ds "github.com/luci/gae/service/datastore" | 13 ds "github.com/luci/gae/service/datastore" |
| 14 "github.com/luci/gae/service/datastore/serialize" | 14 "github.com/luci/gae/service/datastore/serialize" |
| 15 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
| 16 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
| 17 ) | 17 ) |
| 18 | 18 |
| 19 //////////////////////////////// dataStoreData ///////////////////////////////// | 19 //////////////////////////////// dataStoreData ///////////////////////////////// |
| 20 | 20 |
| 21 type dataStoreData struct { | 21 type dataStoreData struct { |
| 22 rwlock sync.RWMutex | 22 rwlock sync.RWMutex |
| 23 |
| 24 // the 'appid' of this datastore |
| 25 aid string |
| 26 |
| 23 // See README.md for head schema. | 27 // See README.md for head schema. |
| 24 head *memStore | 28 head *memStore |
| 25 // if snap is nil, that means that this is always-consistent, and | 29 // if snap is nil, that means that this is always-consistent, and |
| 26 // getQuerySnaps will return (head, head) | 30 // getQuerySnaps will return (head, head) |
| 27 snap *memStore | 31 snap *memStore |
| 28 // For testing, see SetTransactionRetryCount. | 32 // For testing, see SetTransactionRetryCount. |
| 29 txnFakeRetry int | 33 txnFakeRetry int |
| 30 // true means that queries with insufficent indexes will pause to add th
em | 34 // true means that queries with insufficent indexes will pause to add th
em |
| 31 // and then continue instead of failing. | 35 // and then continue instead of failing. |
| 32 autoIndex bool | 36 autoIndex bool |
| 33 // true means that all of the __...__ keys which are normally automatica
lly | 37 // true means that all of the __...__ keys which are normally automatica
lly |
| 34 // maintained will be omitted. This also means that Put with an incomple
te | 38 // maintained will be omitted. This also means that Put with an incomple
te |
| 35 // key will become an error. | 39 // key will become an error. |
| 36 disableSpecialEntities bool | 40 disableSpecialEntities bool |
| 37 } | 41 } |
| 38 | 42 |
| 39 var ( | 43 var ( |
| 40 _ = memContextObj((*dataStoreData)(nil)) | 44 _ = memContextObj((*dataStoreData)(nil)) |
| 41 _ = sync.Locker((*dataStoreData)(nil)) | 45 _ = sync.Locker((*dataStoreData)(nil)) |
| 42 ) | 46 ) |
| 43 | 47 |
| 44 func newDataStoreData() *dataStoreData { | 48 func newDataStoreData(aid string) *dataStoreData { |
| 45 head := newMemStore() | 49 head := newMemStore() |
| 46 return &dataStoreData{ | 50 return &dataStoreData{ |
| 51 aid: aid, |
| 47 head: head, | 52 head: head, |
| 48 snap: head.Snapshot(), // empty but better than a nil pointer. | 53 snap: head.Snapshot(), // empty but better than a nil pointer. |
| 49 } | 54 } |
| 50 } | 55 } |
| 51 | 56 |
| 52 func (d *dataStoreData) Lock() { | 57 func (d *dataStoreData) Lock() { |
| 53 d.rwlock.Lock() | 58 d.rwlock.Lock() |
| 54 } | 59 } |
| 55 | 60 |
| 56 func (d *dataStoreData) Unlock() { | 61 func (d *dataStoreData) Unlock() { |
| (...skipping 13 matching lines...) Expand all Loading... |
| 70 if always { | 75 if always { |
| 71 d.snap = nil | 76 d.snap = nil |
| 72 } else { | 77 } else { |
| 73 d.snap = d.head.Snapshot() | 78 d.snap = d.head.Snapshot() |
| 74 } | 79 } |
| 75 } | 80 } |
| 76 | 81 |
| 77 func (d *dataStoreData) addIndexes(ns string, idxs []*ds.IndexDefinition) { | 82 func (d *dataStoreData) addIndexes(ns string, idxs []*ds.IndexDefinition) { |
| 78 d.Lock() | 83 d.Lock() |
| 79 defer d.Unlock() | 84 defer d.Unlock() |
| 80 » addIndexes(d.head, ns, idxs) | 85 » addIndexes(d.head, d.aid, ns, idxs) |
| 81 } | 86 } |
| 82 | 87 |
| 83 func (d *dataStoreData) setAutoIndex(enable bool) { | 88 func (d *dataStoreData) setAutoIndex(enable bool) { |
| 84 d.Lock() | 89 d.Lock() |
| 85 defer d.Unlock() | 90 defer d.Unlock() |
| 86 d.autoIndex = enable | 91 d.autoIndex = enable |
| 87 } | 92 } |
| 88 | 93 |
| 89 func (d *dataStoreData) maybeAutoIndex(err error) bool { | 94 func (d *dataStoreData) maybeAutoIndex(err error) bool { |
| 90 mi, ok := err.(*ErrMissingIndex) | 95 mi, ok := err.(*ErrMissingIndex) |
| (...skipping 171 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 262 if err != nil { | 267 if err != nil { |
| 263 return | 268 return |
| 264 } | 269 } |
| 265 if !d.disableSpecialEntities { | 270 if !d.disableSpecialEntities { |
| 266 incrementLocked(ents, groupMetaKey(ret), 1) | 271 incrementLocked(ents, groupMetaKey(ret), 1) |
| 267 } | 272 } |
| 268 | 273 |
| 269 old := ents.Get(keyBytes(ret)) | 274 old := ents.Get(keyBytes(ret)) |
| 270 oldPM := ds.PropertyMap(nil) | 275 oldPM := ds.PropertyMap(nil) |
| 271 if old != nil { | 276 if old != nil { |
| 272 » » » » if oldPM, err = rpmWoCtx(old, ns); err != nil { | 277 » » » » if oldPM, err = rpm(old); err != nil { |
| 273 return | 278 return |
| 274 } | 279 } |
| 275 } | 280 } |
| 276 updateIndexes(d.head, ret, oldPM, pmap) | 281 updateIndexes(d.head, ret, oldPM, pmap) |
| 277 ents.Set(keyBytes(ret), dataBytes) | 282 ents.Set(keyBytes(ret), dataBytes) |
| 278 return | 283 return |
| 279 }() | 284 }() |
| 280 if cb != nil { | 285 if cb != nil { |
| 281 cb(k, err) | 286 cb(k, err) |
| 282 } | 287 } |
| (...skipping 11 matching lines...) Expand all Loading... |
| 294 } | 299 } |
| 295 return nil | 300 return nil |
| 296 } | 301 } |
| 297 | 302 |
| 298 for _, k := range keys { | 303 for _, k := range keys { |
| 299 pdata := ents.Get(keyBytes(k)) | 304 pdata := ents.Get(keyBytes(k)) |
| 300 if pdata == nil { | 305 if pdata == nil { |
| 301 cb(nil, ds.ErrNoSuchEntity) | 306 cb(nil, ds.ErrNoSuchEntity) |
| 302 continue | 307 continue |
| 303 } | 308 } |
| 304 » » cb(rpmWoCtx(pdata, k.Namespace())) | 309 » » cb(rpm(pdata)) |
| 305 } | 310 } |
| 306 return nil | 311 return nil |
| 307 } | 312 } |
| 308 | 313 |
| 309 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { | 314 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { |
| 310 return getMultiInner(keys, cb, func() (*memCollection, error) { | 315 return getMultiInner(keys, cb, func() (*memCollection, error) { |
| 311 s := d.takeSnapshot() | 316 s := d.takeSnapshot() |
| 312 | 317 |
| 313 return s.GetCollection("ents:" + keys[0].Namespace()), nil | 318 return s.GetCollection("ents:" + keys[0].Namespace()), nil |
| 314 }) | 319 }) |
| 315 } | 320 } |
| 316 | 321 |
| 317 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) { | 322 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) { |
| 318 ns := keys[0].Namespace() | 323 ns := keys[0].Namespace() |
| 319 ents := d.mutableEnts(ns) | 324 ents := d.mutableEnts(ns) |
| 320 | 325 |
| 321 if ents != nil { | 326 if ents != nil { |
| 322 for _, k := range keys { | 327 for _, k := range keys { |
| 323 err := func() error { | 328 err := func() error { |
| 324 kb := keyBytes(k) | 329 kb := keyBytes(k) |
| 325 | 330 |
| 326 d.Lock() | 331 d.Lock() |
| 327 defer d.Unlock() | 332 defer d.Unlock() |
| 328 | 333 |
| 329 if !d.disableSpecialEntities { | 334 if !d.disableSpecialEntities { |
| 330 incrementLocked(ents, groupMetaKey(k), 1
) | 335 incrementLocked(ents, groupMetaKey(k), 1
) |
| 331 } | 336 } |
| 332 if old := ents.Get(kb); old != nil { | 337 if old := ents.Get(kb); old != nil { |
| 333 » » » » » oldPM, err := rpmWoCtx(old, ns) | 338 » » » » » oldPM, err := rpm(old) |
| 334 if err != nil { | 339 if err != nil { |
| 335 return err | 340 return err |
| 336 } | 341 } |
| 337 updateIndexes(d.head, k, oldPM, nil) | 342 updateIndexes(d.head, k, oldPM, nil) |
| 338 ents.Delete(kb) | 343 ents.Delete(kb) |
| 339 } | 344 } |
| 340 return nil | 345 return nil |
| 341 }() | 346 }() |
| 342 if cb != nil { | 347 if cb != nil { |
| 343 cb(err) | 348 cb(err) |
| (...skipping 196 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 540 cb(err) | 545 cb(err) |
| 541 } | 546 } |
| 542 } | 547 } |
| 543 return nil | 548 return nil |
| 544 } | 549 } |
| 545 | 550 |
| 546 func keyBytes(key *ds.Key) []byte { | 551 func keyBytes(key *ds.Key) []byte { |
| 547 return serialize.ToBytes(ds.MkProperty(key)) | 552 return serialize.ToBytes(ds.MkProperty(key)) |
| 548 } | 553 } |
| 549 | 554 |
| 550 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { | |
| 551 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | |
| 552 serialize.WithoutContext, globalAppID, ns) | |
| 553 } | |
| 554 | |
| 555 func rpm(data []byte) (ds.PropertyMap, error) { | 555 func rpm(data []byte) (ds.PropertyMap, error) { |
| 556 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 556 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
| 557 serialize.WithContext, "", "") | 557 serialize.WithContext, "", "") |
| 558 } | 558 } |
| OLD | NEW |