Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "sync" | 10 "sync" |
| 11 "sync/atomic" | 11 "sync/atomic" |
| 12 | 12 |
| 13 ds "github.com/luci/gae/service/datastore" | 13 ds "github.com/luci/gae/service/datastore" |
| 14 "github.com/luci/gae/service/datastore/serialize" | 14 "github.com/luci/gae/service/datastore/serialize" |
| 15 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
| 16 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
| 17 ) | 17 ) |
| 18 | 18 |
| 19 //////////////////////////////// dataStoreData ///////////////////////////////// | 19 //////////////////////////////// dataStoreData ///////////////////////////////// |
| 20 | 20 |
| 21 type dataStoreData struct { | 21 type dataStoreData struct { |
| 22 rwlock sync.RWMutex | 22 rwlock sync.RWMutex |
| 23 // See README.md for head schema. | 23 // See README.md for head schema. |
| 24 head *memStore | 24 head *memStore |
| 25 snap *memStore | 25 snap *memStore |
| 26 // For testing, see SetTransactionRetryCount. | 26 // For testing, see SetTransactionRetryCount. |
| 27 txnFakeRetry int | 27 txnFakeRetry int |
| 28 // true means that head always == snap | |
| 29 consistent bool | |
| 28 } | 30 } |
| 29 | 31 |
| 30 var ( | 32 var ( |
| 31 _ = memContextObj((*dataStoreData)(nil)) | 33 _ = memContextObj((*dataStoreData)(nil)) |
| 32 _ = sync.Locker((*dataStoreData)(nil)) | 34 _ = sync.Locker((*dataStoreData)(nil)) |
| 33 ) | 35 ) |
| 34 | 36 |
| 35 func newDataStoreData() *dataStoreData { | 37 func newDataStoreData() *dataStoreData { |
| 36 head := newMemStore() | 38 head := newMemStore() |
| 37 return &dataStoreData{ | 39 return &dataStoreData{ |
| 38 head: head, | 40 head: head, |
| 39 snap: head.Snapshot(), // empty but better than a nil pointer. | 41 snap: head.Snapshot(), // empty but better than a nil pointer. |
| 40 } | 42 } |
| 41 } | 43 } |
| 42 | 44 |
| 43 func (d *dataStoreData) Lock() { | 45 func (d *dataStoreData) Lock() { |
| 44 d.rwlock.Lock() | 46 d.rwlock.Lock() |
| 45 } | 47 } |
| 46 | 48 |
| 47 func (d *dataStoreData) Unlock() { | 49 func (d *dataStoreData) Unlock() { |
| 48 d.rwlock.Unlock() | 50 d.rwlock.Unlock() |
| 49 } | 51 } |
| 50 | 52 |
| 53 func (d *dataStoreData) setTxnRetry(count int) { | |
| 54 d.Lock() | |
|
Vadim Sh.
2015/09/24 18:40:11
oops.. didn't know it's under lock
iannucci
2015/09/24 18:59:30
is ok :)
| |
| 55 defer d.Unlock() | |
| 56 d.txnFakeRetry = count | |
| 57 } | |
| 58 | |
| 59 func (d *dataStoreData) setConsistent(always bool) { | |
| 60 d.Lock() | |
| 61 defer d.Unlock() | |
| 62 | |
| 63 d.consistent = always | |
| 64 if d.consistent { | |
| 65 d.snap = d.head.Snapshot() | |
| 66 } | |
| 67 } | |
| 68 | |
| 51 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { | 69 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { |
| 52 d.rwlock.RLock() | 70 d.rwlock.RLock() |
| 53 defer d.rwlock.RUnlock() | 71 defer d.rwlock.RUnlock() |
| 72 if d.consistent { | |
| 73 // snap is already a consistent snapshot of head | |
| 74 return d.snap, d.snap | |
| 75 } | |
| 76 | |
| 54 head = d.head.Snapshot() | 77 head = d.head.Snapshot() |
| 55 if consistent { | 78 if consistent { |
| 56 idx = head | 79 idx = head |
| 57 } else { | 80 } else { |
| 58 idx = d.snap | 81 idx = d.snap |
| 59 } | 82 } |
| 60 return | 83 return |
| 61 } | 84 } |
| 62 | 85 |
| 63 func (d *dataStoreData) takeSnapshot() *memStore { | 86 func (d *dataStoreData) takeSnapshot() *memStore { |
| 64 d.rwlock.RLock() | 87 d.rwlock.RLock() |
| 65 defer d.rwlock.RUnlock() | 88 defer d.rwlock.RUnlock() |
| 89 if d.consistent { | |
| 90 return d.snap | |
| 91 } | |
| 66 return d.head.Snapshot() | 92 return d.head.Snapshot() |
| 67 } | 93 } |
| 68 | 94 |
| 69 func (d *dataStoreData) setSnapshot(snap *memStore) { | 95 func (d *dataStoreData) setSnapshot(snap *memStore) { |
| 70 d.rwlock.Lock() | 96 d.rwlock.Lock() |
| 71 defer d.rwlock.Unlock() | 97 defer d.rwlock.Unlock() |
| 98 if d.consistent { | |
| 99 return | |
| 100 } | |
| 72 d.snap = snap | 101 d.snap = snap |
| 73 } | 102 } |
| 74 | 103 |
| 75 func (d *dataStoreData) catchupIndexes() { | 104 func (d *dataStoreData) catchupIndexes() { |
| 76 d.rwlock.Lock() | 105 d.rwlock.Lock() |
| 77 defer d.rwlock.Unlock() | 106 defer d.rwlock.Unlock() |
| 107 if d.consistent { | |
| 108 return | |
| 109 } | |
| 78 d.snap = d.head.Snapshot() | 110 d.snap = d.head.Snapshot() |
| 79 } | 111 } |
| 80 | 112 |
| 81 /////////////////////////// indexes(dataStoreData) //////////////////////////// | 113 /////////////////////////// indexes(dataStoreData) //////////////////////////// |
| 82 | 114 |
| 83 func groupMetaKey(key *ds.Key) []byte { | 115 func groupMetaKey(key *ds.Key) []byte { |
| 84 return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root()) ) | 116 return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root()) ) |
| 85 } | 117 } |
| 86 | 118 |
| 87 func groupIDsKey(key *ds.Key) []byte { | 119 func groupIDsKey(key *ds.Key) []byte { |
| (...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 152 | 184 |
| 153 func (d *dataStoreData) fixKeyLocked(ents *memCollection, key *ds.Key) *ds.Key { | 185 func (d *dataStoreData) fixKeyLocked(ents *memCollection, key *ds.Key) *ds.Key { |
| 154 if key.Incomplete() { | 186 if key.Incomplete() { |
| 155 id := d.allocateIDsLocked(ents, key, 1) | 187 id := d.allocateIDsLocked(ents, key, 1) |
| 156 key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) | 188 key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) |
| 157 } | 189 } |
| 158 return key | 190 return key |
| 159 } | 191 } |
| 160 | 192 |
| 161 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu tMultiCB) { | 193 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu tMultiCB) { |
| 162 » ents := d.mutableEnts(keys[0].Namespace()) | 194 » ns := keys[0].Namespace() |
| 195 » ents := d.mutableEnts(ns) | |
| 163 | 196 |
| 164 for i, k := range keys { | 197 for i, k := range keys { |
| 165 pmap, _ := vals[i].Save(false) | 198 pmap, _ := vals[i].Save(false) |
| 166 dataBytes := serialize.ToBytes(pmap) | 199 dataBytes := serialize.ToBytes(pmap) |
| 167 | 200 |
| 168 k, err := func() (ret *ds.Key, err error) { | 201 k, err := func() (ret *ds.Key, err error) { |
| 169 d.Lock() | 202 d.Lock() |
| 170 defer d.Unlock() | 203 defer d.Unlock() |
| 171 | 204 |
| 172 ret = d.fixKeyLocked(ents, k) | 205 ret = d.fixKeyLocked(ents, k) |
| 173 incrementLocked(ents, groupMetaKey(ret), 1) | 206 incrementLocked(ents, groupMetaKey(ret), 1) |
| 174 | 207 |
| 175 old := ents.Get(keyBytes(ret)) | 208 old := ents.Get(keyBytes(ret)) |
| 176 oldPM := ds.PropertyMap(nil) | 209 oldPM := ds.PropertyMap(nil) |
| 177 if old != nil { | 210 if old != nil { |
| 178 » » » » if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil { | 211 » » » » if oldPM, err = rpmWoCtx(old, ns); err != nil { |
| 179 return | 212 return |
| 180 } | 213 } |
| 181 } | 214 } |
| 182 updateIndexes(d.head, ret, oldPM, pmap) | 215 updateIndexes(d.head, ret, oldPM, pmap) |
| 183 ents.Set(keyBytes(ret), dataBytes) | 216 ents.Set(keyBytes(ret), dataBytes) |
| 217 if d.consistent { | |
| 218 d.snap = d.head.Snapshot() | |
| 219 } | |
| 184 return | 220 return |
| 185 }() | 221 }() |
| 186 if cb != nil { | 222 if cb != nil { |
| 187 cb(k, err) | 223 cb(k, err) |
| 188 } | 224 } |
| 189 } | 225 } |
| 190 } | 226 } |
| 191 | 227 |
| 192 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect ion, error)) error { | 228 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect ion, error)) error { |
| 193 ents, err := getColl() | 229 ents, err := getColl() |
| (...skipping 20 matching lines...) Expand all Loading... | |
| 214 | 250 |
| 215 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { | 251 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { |
| 216 return getMultiInner(keys, cb, func() (*memCollection, error) { | 252 return getMultiInner(keys, cb, func() (*memCollection, error) { |
| 217 s := d.takeSnapshot() | 253 s := d.takeSnapshot() |
| 218 | 254 |
| 219 return s.GetCollection("ents:" + keys[0].Namespace()), nil | 255 return s.GetCollection("ents:" + keys[0].Namespace()), nil |
| 220 }) | 256 }) |
| 221 } | 257 } |
| 222 | 258 |
| 223 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) { | 259 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) { |
| 224 toDel := make([][]byte, 0, len(keys)) | |
| 225 for _, k := range keys { | |
| 226 toDel = append(toDel, keyBytes(k)) | |
| 227 } | |
| 228 ns := keys[0].Namespace() | 260 ns := keys[0].Namespace() |
| 261 ents := d.mutableEnts(ns) | |
| 229 | 262 |
| 230 » d.rwlock.Lock() | 263 » if ents != nil { |
| 231 » defer d.rwlock.Unlock() | 264 » » for _, k := range keys { |
| 265 » » » err := func() error { | |
| 266 » » » » kb := keyBytes(k) | |
| 232 | 267 |
| 233 » ents := d.head.GetCollection("ents:" + ns) | 268 » » » » d.Lock() |
| 269 » » » » defer d.Unlock() | |
| 234 | 270 |
| 235 » for i, k := range keys { | 271 » » » » incrementLocked(ents, groupMetaKey(k), 1) |
| 236 » » if ents != nil { | 272 » » » » if old := ents.Get(kb); old != nil { |
| 237 » » » incrementLocked(ents, groupMetaKey(k), 1) | 273 » » » » » oldPM, err := rpmWoCtx(old, ns) |
| 238 » » » kb := toDel[i] | 274 » » » » » if err != nil { |
| 239 » » » if old := ents.Get(kb); old != nil { | 275 » » » » » » return err |
| 240 » » » » oldPM, err := rpmWoCtx(old, ns) | |
| 241 » » » » if err != nil { | |
| 242 » » » » » if cb != nil { | |
| 243 » » » » » » cb(err) | |
| 244 } | 276 } |
| 245 » » » » » continue | 277 » » » » » updateIndexes(d.head, k, oldPM, nil) |
| 278 » » » » » ents.Delete(kb) | |
| 279 » » » » » if d.consistent { | |
| 280 » » » » » » d.snap = d.head.Snapshot() | |
| 281 » » » » » } | |
| 246 } | 282 } |
| 247 » » » » updateIndexes(d.head, k, oldPM, nil) | 283 » » » » return nil |
| 248 » » » » ents.Delete(kb) | 284 » » » }() |
| 285 » » » if cb != nil { | |
|
Vadim Sh.
2015/09/24 18:40:11
I think you need to call cb() len(keys) number of
iannucci
2015/09/24 18:59:30
oh, good catch.
| |
| 286 » » » » cb(err) | |
| 249 } | 287 } |
| 250 } | 288 } |
| 251 if cb != nil { | |
| 252 cb(nil) | |
| 253 } | |
| 254 } | 289 } |
| 255 } | 290 } |
| 256 | 291 |
| 257 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 292 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
| 258 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 293 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
| 259 | 294 |
| 260 txn := obj.(*txnDataStoreData) | 295 txn := obj.(*txnDataStoreData) |
| 261 for rk, muts := range txn.muts { | 296 for rk, muts := range txn.muts { |
| 262 if len(muts) == 0 { // read-only | 297 if len(muts) == 0 { // read-only |
| 263 continue | 298 continue |
| (...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 450 | 485 |
| 451 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { | 486 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { |
| 452 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 487 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
| 453 serialize.WithoutContext, globalAppID, ns) | 488 serialize.WithoutContext, globalAppID, ns) |
| 454 } | 489 } |
| 455 | 490 |
| 456 func rpm(data []byte) (ds.PropertyMap, error) { | 491 func rpm(data []byte) (ds.PropertyMap, error) { |
| 457 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 492 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
| 458 serialize.WithContext, "", "") | 493 serialize.WithContext, "", "") |
| 459 } | 494 } |
| OLD | NEW |