| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package memory | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "errors" | |
| 10 "fmt" | |
| 11 "sync" | |
| 12 "sync/atomic" | |
| 13 | |
| 14 "golang.org/x/net/context" | |
| 15 | |
| 16 "github.com/luci/gae" | |
| 17 "github.com/luci/gae/helper" | |
| 18 ) | |
| 19 | |
| 20 //////////////////////////////// dataStoreData ///////////////////////////////// | |
| 21 | |
| 22 type dataStoreData struct { | |
| 23 rwlock sync.RWMutex | |
| 24 // See README.md for store schema. | |
| 25 store *memStore | |
| 26 snap *memStore | |
| 27 } | |
| 28 | |
| 29 var ( | |
| 30 _ = memContextObj((*dataStoreData)(nil)) | |
| 31 _ = sync.Locker((*dataStoreData)(nil)) | |
| 32 ) | |
| 33 | |
| 34 func newDataStoreData() *dataStoreData { | |
| 35 store := newMemStore() | |
| 36 return &dataStoreData{ | |
| 37 store: store, | |
| 38 snap: store.Snapshot(), // empty but better than a nil pointer. | |
| 39 } | |
| 40 } | |
| 41 | |
| 42 func (d *dataStoreData) Lock() { | |
| 43 d.rwlock.Lock() | |
| 44 } | |
| 45 | |
| 46 func (d *dataStoreData) Unlock() { | |
| 47 d.rwlock.Unlock() | |
| 48 } | |
| 49 | |
| 50 /////////////////////////// indicies(dataStoreData) //////////////////////////// | |
| 51 | |
| 52 func groupMetaKey(key gae.DSKey) []byte { | |
| 53 return keyBytes(helper.WithoutContext, | |
| 54 helper.NewDSKey("", "", "__entity_group__", "", 1, helper.DSKeyR
oot(key))) | |
| 55 } | |
| 56 | |
| 57 func groupIDsKey(key gae.DSKey) []byte { | |
| 58 return keyBytes(helper.WithoutContext, | |
| 59 helper.NewDSKey("", "", "__entity_group_ids__", "", 1, helper.DS
KeyRoot(key))) | |
| 60 } | |
| 61 | |
| 62 func rootIDsKey(kind string) []byte { | |
| 63 return keyBytes(helper.WithoutContext, | |
| 64 helper.NewDSKey("", "", "__entity_root_ids__", kind, 0, nil)) | |
| 65 } | |
| 66 | |
| 67 func curVersion(ents *memCollection, key []byte) int64 { | |
| 68 if v := ents.Get(key); v != nil { | |
| 69 pm, err := rpm(v) | |
| 70 if err != nil { | |
| 71 panic(err) // memory corruption | |
| 72 } | |
| 73 pl, ok := pm["__version__"] | |
| 74 if ok && len(pl) > 0 && pl[0].Type() == gae.DSPTInt { | |
| 75 return pl[0].Value().(int64) | |
| 76 } | |
| 77 panic(fmt.Errorf("__version__ property missing or wrong: %v", pm
)) | |
| 78 } | |
| 79 return 0 | |
| 80 } | |
| 81 | |
| 82 func incrementLocked(ents *memCollection, key []byte) int64 { | |
| 83 ret := curVersion(ents, key) + 1 | |
| 84 buf := &bytes.Buffer{} | |
| 85 helper.WriteDSPropertyMap( | |
| 86 buf, gae.DSPropertyMap{"__version__": {gae.MkDSPropertyNI(ret)}}
, helper.WithContext) | |
| 87 ents.Set(key, buf.Bytes()) | |
| 88 return ret | |
| 89 } | |
| 90 | |
| 91 func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey)
{ | |
| 92 coll := "ents:" + key.Namespace() | |
| 93 ents := d.store.GetCollection(coll) | |
| 94 if ents == nil { | |
| 95 ents = d.store.SetCollection(coll, nil) | |
| 96 } | |
| 97 | |
| 98 if helper.DSKeyIncomplete(key) { | |
| 99 idKey := []byte(nil) | |
| 100 if key.Parent() == nil { | |
| 101 idKey = rootIDsKey(key.Kind()) | |
| 102 } else { | |
| 103 idKey = groupIDsKey(key) | |
| 104 } | |
| 105 id := incrementLocked(ents, idKey) | |
| 106 key = helper.NewDSKey(key.AppID(), key.Namespace(), key.Kind(),
"", id, key.Parent()) | |
| 107 } | |
| 108 | |
| 109 return ents, key | |
| 110 } | |
| 111 | |
| 112 func (d *dataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoadSave
r) (gae.DSKey, error) { | |
| 113 keys, errs := d.putMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSaver
{pls}) | |
| 114 if errs == nil { | |
| 115 return keys[0], nil | |
| 116 } | |
| 117 return nil, gae.SingleError(errs) | |
| 118 } | |
| 119 | |
| 120 func (d *dataStoreData) putMulti(ns string, keys []gae.DSKey, plss []gae.DSPrope
rtyLoadSaver) ([]gae.DSKey, error) { | |
| 121 pmaps, err := putMultiPrelim(ns, keys, plss) | |
| 122 if err != nil { | |
| 123 return nil, err | |
| 124 } | |
| 125 return d.putMultiInner(keys, pmaps) | |
| 126 } | |
| 127 | |
| 128 func putMultiPrelim(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver)
([]gae.DSPropertyMap, error) { | |
| 129 err := multiValid(keys, plss, ns, true, false) | |
| 130 if err != nil { | |
| 131 return nil, err | |
| 132 } | |
| 133 pmaps := make([]gae.DSPropertyMap, len(keys)) | |
| 134 lme := gae.LazyMultiError{Size: len(keys)} | |
| 135 for i, pls := range plss { | |
| 136 pm, err := pls.Save(false) | |
| 137 lme.Assign(i, err) | |
| 138 pmaps[i] = pm | |
| 139 } | |
| 140 return pmaps, lme.Get() | |
| 141 } | |
| 142 | |
| 143 func (d *dataStoreData) putMultiInner(keys []gae.DSKey, data []gae.DSPropertyMap
) ([]gae.DSKey, error) { | |
| 144 retKeys := make([]gae.DSKey, len(keys)) | |
| 145 lme := gae.LazyMultiError{Size: len(keys)} | |
| 146 for i, k := range keys { | |
| 147 buf := &bytes.Buffer{} | |
| 148 helper.WriteDSPropertyMap(buf, data[i], helper.WithoutContext) | |
| 149 dataBytes := buf.Bytes() | |
| 150 | |
| 151 rKey, err := func() (ret gae.DSKey, err error) { | |
| 152 d.rwlock.Lock() | |
| 153 defer d.rwlock.Unlock() | |
| 154 | |
| 155 ents, ret := d.entsKeyLocked(k) | |
| 156 incrementLocked(ents, groupMetaKey(ret)) | |
| 157 | |
| 158 old := ents.Get(keyBytes(helper.WithoutContext, ret)) | |
| 159 oldPM := gae.DSPropertyMap(nil) | |
| 160 if old != nil { | |
| 161 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { | |
| 162 return | |
| 163 } | |
| 164 } | |
| 165 updateIndicies(d.store, ret, oldPM, data[i]) | |
| 166 ents.Set(keyBytes(helper.WithoutContext, ret), dataBytes
) | |
| 167 return | |
| 168 }() | |
| 169 lme.Assign(i, err) | |
| 170 retKeys[i] = rKey | |
| 171 } | |
| 172 return retKeys, lme.Get() | |
| 173 } | |
| 174 | |
| 175 func getMultiInner(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver,
getColl func() (*memCollection, error)) error { | |
| 176 if err := multiValid(keys, plss, ns, false, true); err != nil { | |
| 177 return err | |
| 178 } | |
| 179 | |
| 180 lme := gae.LazyMultiError{Size: len(keys)} | |
| 181 | |
| 182 ents, err := getColl() | |
| 183 if err != nil { | |
| 184 return err | |
| 185 } | |
| 186 if ents == nil { | |
| 187 for i := range keys { | |
| 188 lme.Assign(i, gae.ErrDSNoSuchEntity) | |
| 189 } | |
| 190 return lme.Get() | |
| 191 } | |
| 192 | |
| 193 for i, k := range keys { | |
| 194 pdata := ents.Get(keyBytes(helper.WithoutContext, k)) | |
| 195 if pdata == nil { | |
| 196 lme.Assign(i, gae.ErrDSNoSuchEntity) | |
| 197 continue | |
| 198 } | |
| 199 | |
| 200 got, err := rpmWoCtx(pdata, ns) | |
| 201 if err != nil { | |
| 202 lme.Assign(i, err) | |
| 203 continue | |
| 204 } | |
| 205 | |
| 206 lme.Assign(i, plss[i].Load(got)) | |
| 207 } | |
| 208 return lme.Get() | |
| 209 } | |
| 210 | |
| 211 func (d *dataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoadSave
r) error { | |
| 212 return gae.SingleError(d.getMulti(ns, []gae.DSKey{key}, []gae.DSProperty
LoadSaver{pls})) | |
| 213 } | |
| 214 | |
| 215 func (d *dataStoreData) getMulti(ns string, keys []gae.DSKey, plss []gae.DSPrope
rtyLoadSaver) error { | |
| 216 return getMultiInner(ns, keys, plss, func() (*memCollection, error) { | |
| 217 d.rwlock.RLock() | |
| 218 s := d.store.Snapshot() | |
| 219 d.rwlock.RUnlock() | |
| 220 | |
| 221 return s.GetCollection("ents:" + ns), nil | |
| 222 }) | |
| 223 } | |
| 224 | |
| 225 func (d *dataStoreData) del(ns string, key gae.DSKey) (err error) { | |
| 226 return gae.SingleError(d.delMulti(ns, []gae.DSKey{key})) | |
| 227 } | |
| 228 | |
| 229 func (d *dataStoreData) delMulti(ns string, keys []gae.DSKey) error { | |
| 230 lme := gae.LazyMultiError{Size: len(keys)} | |
| 231 toDel := make([][]byte, 0, len(keys)) | |
| 232 for i, k := range keys { | |
| 233 if !helper.DSKeyValid(k, ns, false) { | |
| 234 lme.Assign(i, gae.ErrDSInvalidKey) | |
| 235 continue | |
| 236 } | |
| 237 toDel = append(toDel, keyBytes(helper.WithoutContext, k)) | |
| 238 } | |
| 239 err := lme.Get() | |
| 240 if err != nil { | |
| 241 return err | |
| 242 } | |
| 243 | |
| 244 d.rwlock.Lock() | |
| 245 defer d.rwlock.Unlock() | |
| 246 | |
| 247 ents := d.store.GetCollection("ents:" + ns) | |
| 248 if ents == nil { | |
| 249 return nil | |
| 250 } | |
| 251 | |
| 252 for i, k := range keys { | |
| 253 incrementLocked(ents, groupMetaKey(k)) | |
| 254 kb := toDel[i] | |
| 255 old := ents.Get(kb) | |
| 256 oldPM := gae.DSPropertyMap(nil) | |
| 257 if old != nil { | |
| 258 if oldPM, err = rpmWoCtx(old, ns); err != nil { | |
| 259 lme.Assign(i, err) | |
| 260 continue | |
| 261 } | |
| 262 } | |
| 263 updateIndicies(d.store, k, oldPM, nil) | |
| 264 ents.Delete(kb) | |
| 265 } | |
| 266 return lme.Get() | |
| 267 } | |
| 268 | |
| 269 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | |
| 270 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | |
| 271 | |
| 272 txn := obj.(*txnDataStoreData) | |
| 273 for rk, muts := range txn.muts { | |
| 274 if len(muts) == 0 { // read-only | |
| 275 continue | |
| 276 } | |
| 277 k, err := helper.ReadDSKey(bytes.NewBufferString(rk), helper.Wit
hContext, "", "") | |
| 278 if err != nil { | |
| 279 panic(err) | |
| 280 } | |
| 281 | |
| 282 entKey := "ents:" + k.Namespace() | |
| 283 mkey := groupMetaKey(k) | |
| 284 entsHead := d.store.GetCollection(entKey) | |
| 285 entsSnap := txn.snap.GetCollection(entKey) | |
| 286 vHead := curVersion(entsHead, mkey) | |
| 287 vSnap := curVersion(entsSnap, mkey) | |
| 288 if vHead != vSnap { | |
| 289 return false | |
| 290 } | |
| 291 } | |
| 292 return true | |
| 293 } | |
| 294 | |
| 295 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { | |
| 296 txn := obj.(*txnDataStoreData) | |
| 297 for _, muts := range txn.muts { | |
| 298 if len(muts) == 0 { // read-only | |
| 299 continue | |
| 300 } | |
| 301 for _, m := range muts { | |
| 302 err := error(nil) | |
| 303 if m.data == nil { | |
| 304 err = d.del(m.key.Namespace(), m.key) | |
| 305 } else { | |
| 306 _, err = d.put(m.key.Namespace(), m.key, m.data) | |
| 307 } | |
| 308 if err != nil { | |
| 309 panic(err) | |
| 310 } | |
| 311 } | |
| 312 } | |
| 313 } | |
| 314 | |
| 315 func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) memContextObj { | |
| 316 return &txnDataStoreData{ | |
| 317 // alias to the main datastore's so that testing code can have p
rimitive | |
| 318 // access to break features inside of transactions. | |
| 319 parent: d, | |
| 320 isXG: o != nil && o.XG, | |
| 321 snap: d.store.Snapshot(), | |
| 322 muts: map[string][]txnMutation{}, | |
| 323 } | |
| 324 } | |
| 325 | |
| 326 func (d *dataStoreData) endTxn() {} | |
| 327 | |
| 328 /////////////////////////////// txnDataStoreData /////////////////////////////// | |
| 329 | |
| 330 type txnMutation struct { | |
| 331 key gae.DSKey | |
| 332 data gae.DSPropertyMap | |
| 333 } | |
| 334 | |
| 335 type txnDataStoreData struct { | |
| 336 sync.Mutex | |
| 337 | |
| 338 parent *dataStoreData | |
| 339 | |
| 340 // boolean 0 or 1, use atomic.*Int32 to access. | |
| 341 closed int32 | |
| 342 isXG bool | |
| 343 | |
| 344 snap *memStore | |
| 345 | |
| 346 // string is the raw-bytes encoding of the entity root incl. namespace | |
| 347 muts map[string][]txnMutation | |
| 348 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ
ing | |
| 349 // length of encoded keys + values. | |
| 350 } | |
| 351 | |
| 352 var _ memContextObj = (*txnDataStoreData)(nil) | |
| 353 | |
| 354 const xgEGLimit = 25 | |
| 355 | |
| 356 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | |
| 357 func (td *txnDataStoreData) endTxn() { | |
| 358 if atomic.LoadInt32(&td.closed) == 1 { | |
| 359 panic("cannot end transaction twice") | |
| 360 } | |
| 361 atomic.StoreInt32(&td.closed, 1) | |
| 362 } | |
| 363 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | |
| 364 panic("txnDataStoreData cannot apply transactions") | |
| 365 } | |
| 366 func (*txnDataStoreData) mkTxn(*gae.DSTransactionOptions) memContextObj { | |
| 367 panic("impossible") | |
| 368 } | |
| 369 | |
| 370 func (td *txnDataStoreData) run(f func() error) error { | |
| 371 // Slightly different from the SDK... datastore and taskqueue each imple
ment | |
| 372 // this here, where in the SDK only datastore.transaction.Call does. | |
| 373 if atomic.LoadInt32(&td.closed) == 1 { | |
| 374 return errors.New("datastore: transaction context has expired") | |
| 375 } | |
| 376 return f() | |
| 377 } | |
| 378 | |
| 379 // writeMutation ensures that this transaction can support the given key/value | |
| 380 // mutation. | |
| 381 // | |
| 382 // if getOnly is true, don't record the actual mutation data, just ensure that | |
| 383 // the key is in an included entity group (or add an empty entry for tha
t | |
| 384 // group). | |
| 385 // | |
| 386 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | |
| 387 // | |
| 388 // Returns an error if this key causes the transaction to cross too many entity | |
| 389 // groups. | |
| 390 func (td *txnDataStoreData) writeMutation(getOnly bool, key gae.DSKey, data gae.
DSPropertyMap) error { | |
| 391 rk := string(keyBytes(helper.WithContext, helper.DSKeyRoot(key))) | |
| 392 | |
| 393 td.Lock() | |
| 394 defer td.Unlock() | |
| 395 | |
| 396 if _, ok := td.muts[rk]; !ok { | |
| 397 limit := 1 | |
| 398 if td.isXG { | |
| 399 limit = xgEGLimit | |
| 400 } | |
| 401 if len(td.muts)+1 > limit { | |
| 402 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" | |
| 403 if td.isXG { | |
| 404 msg = "operating on too many entity groups in a
single transaction" | |
| 405 } | |
| 406 return errors.New(msg) | |
| 407 } | |
| 408 td.muts[rk] = []txnMutation{} | |
| 409 } | |
| 410 if !getOnly { | |
| 411 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | |
| 412 } | |
| 413 | |
| 414 return nil | |
| 415 } | |
| 416 | |
| 417 func (td *txnDataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoad
Saver) (gae.DSKey, error) { | |
| 418 keys, errs := td.putMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSave
r{pls}) | |
| 419 if errs == nil { | |
| 420 return keys[0], nil | |
| 421 } | |
| 422 return nil, gae.SingleError(errs) | |
| 423 } | |
| 424 | |
| 425 func (td *txnDataStoreData) putMulti(ns string, keys []gae.DSKey, plss []gae.DSP
ropertyLoadSaver) ([]gae.DSKey, error) { | |
| 426 pmaps, err := putMultiPrelim(ns, keys, plss) | |
| 427 if err != nil { | |
| 428 return nil, err | |
| 429 } | |
| 430 | |
| 431 retKeys := make([]gae.DSKey, len(keys)) | |
| 432 lme := gae.LazyMultiError{Size: len(keys)} | |
| 433 for i, k := range keys { | |
| 434 func() { | |
| 435 td.parent.Lock() | |
| 436 defer td.parent.Unlock() | |
| 437 _, k = td.parent.entsKeyLocked(k) | |
| 438 }() | |
| 439 lme.Assign(i, td.writeMutation(false, k, pmaps[i])) | |
| 440 retKeys[i] = k | |
| 441 } | |
| 442 | |
| 443 return retKeys, lme.Get() | |
| 444 } | |
| 445 | |
| 446 func (td *txnDataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoad
Saver) error { | |
| 447 return gae.SingleError(td.getMulti(ns, []gae.DSKey{key}, []gae.DSPropert
yLoadSaver{pls})) | |
| 448 } | |
| 449 | |
| 450 func (td *txnDataStoreData) getMulti(ns string, keys []gae.DSKey, plss []gae.DSP
ropertyLoadSaver) error { | |
| 451 return getMultiInner(ns, keys, plss, func() (*memCollection, error) { | |
| 452 lme := gae.LazyMultiError{Size: len(keys)} | |
| 453 for i, k := range keys { | |
| 454 lme.Assign(i, td.writeMutation(true, k, nil)) | |
| 455 } | |
| 456 return td.snap.GetCollection("ents:" + ns), lme.Get() | |
| 457 }) | |
| 458 } | |
| 459 | |
| 460 func (td *txnDataStoreData) del(ns string, key gae.DSKey) error { | |
| 461 return gae.SingleError(td.delMulti(ns, []gae.DSKey{key})) | |
| 462 } | |
| 463 | |
| 464 func (td *txnDataStoreData) delMulti(ns string, keys []gae.DSKey) error { | |
| 465 lme := gae.LazyMultiError{Size: len(keys)} | |
| 466 for i, k := range keys { | |
| 467 if !helper.DSKeyValid(k, ns, false) { | |
| 468 lme.Assign(i, gae.ErrDSInvalidKey) | |
| 469 } else { | |
| 470 lme.Assign(i, td.writeMutation(false, k, nil)) | |
| 471 } | |
| 472 } | |
| 473 return lme.Get() | |
| 474 } | |
| 475 | |
| 476 func keyBytes(ctx helper.DSKeyContext, key gae.DSKey) []byte { | |
| 477 buf := &bytes.Buffer{} | |
| 478 helper.WriteDSKey(buf, ctx, key) | |
| 479 return buf.Bytes() | |
| 480 } | |
| 481 | |
| 482 func rpmWoCtx(data []byte, ns string) (gae.DSPropertyMap, error) { | |
| 483 return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithoutCon
text, globalAppID, ns) | |
| 484 } | |
| 485 | |
| 486 func rpm(data []byte) (gae.DSPropertyMap, error) { | |
| 487 return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithContex
t, "", "") | |
| 488 } | |
| 489 | |
| 490 func multiValid(keys []gae.DSKey, plss []gae.DSPropertyLoadSaver, ns string, pot
entialKey, allowSpecial bool) error { | |
| 491 vfn := func(k gae.DSKey) bool { | |
| 492 return !helper.DSKeyIncomplete(k) && helper.DSKeyValid(k, ns, al
lowSpecial) | |
| 493 } | |
| 494 if potentialKey { | |
| 495 vfn = func(k gae.DSKey) bool { | |
| 496 // adds an id to k if it's incomplete. | |
| 497 if helper.DSKeyIncomplete(k) { | |
| 498 k = helper.NewDSKey(k.AppID(), k.Namespace(), k.
Kind(), "", 1, k.Parent()) | |
| 499 } | |
| 500 return helper.DSKeyValid(k, ns, allowSpecial) | |
| 501 } | |
| 502 } | |
| 503 | |
| 504 if keys == nil || plss == nil { | |
| 505 return errors.New("gae: key or plss slices were nil") | |
| 506 } | |
| 507 if len(keys) != len(plss) { | |
| 508 return errors.New("gae: key and dst slices have different length
") | |
| 509 } | |
| 510 lme := gae.LazyMultiError{Size: len(keys)} | |
| 511 for i, k := range keys { | |
| 512 if !vfn(k) { | |
| 513 lme.Assign(i, gae.ErrDSInvalidKey) | |
| 514 } | |
| 515 } | |
| 516 return lme.Get() | |
| 517 } | |
| OLD | NEW |