| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package memory | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "fmt" | |
| 10 "sync" | |
| 11 "sync/atomic" | |
| 12 | |
| 13 ds "github.com/luci/gae/service/datastore" | |
| 14 "github.com/luci/luci-go/common/errors" | |
| 15 "golang.org/x/net/context" | |
| 16 ) | |
| 17 | |
| 18 //////////////////////////////// dataStoreData ///////////////////////////////// | |
| 19 | |
| 20 type dataStoreData struct { | |
| 21 rwlock sync.RWMutex | |
| 22 // See README.md for store schema. | |
| 23 store *memStore | |
| 24 snap *memStore | |
| 25 } | |
| 26 | |
| 27 var ( | |
| 28 _ = memContextObj((*dataStoreData)(nil)) | |
| 29 _ = sync.Locker((*dataStoreData)(nil)) | |
| 30 ) | |
| 31 | |
| 32 func newDataStoreData() *dataStoreData { | |
| 33 store := newMemStore() | |
| 34 return &dataStoreData{ | |
| 35 store: store, | |
| 36 snap: store.Snapshot(), // empty but better than a nil pointer. | |
| 37 } | |
| 38 } | |
| 39 | |
| 40 func (d *dataStoreData) Lock() { | |
| 41 d.rwlock.Lock() | |
| 42 } | |
| 43 | |
| 44 func (d *dataStoreData) Unlock() { | |
| 45 d.rwlock.Unlock() | |
| 46 } | |
| 47 | |
| 48 /////////////////////////// indicies(dataStoreData) //////////////////////////// | |
| 49 | |
| 50 func groupMetaKey(key ds.Key) []byte { | |
| 51 return keyBytes(ds.WithoutContext, | |
| 52 ds.NewKey("", "", "__entity_group__", "", 1, ds.KeyRoot(key))) | |
| 53 } | |
| 54 | |
| 55 func groupIDsKey(key ds.Key) []byte { | |
| 56 return keyBytes(ds.WithoutContext, | |
| 57 ds.NewKey("", "", "__entity_group_ids__", "", 1, ds.KeyRoot(key)
)) | |
| 58 } | |
| 59 | |
| 60 func rootIDsKey(kind string) []byte { | |
| 61 return keyBytes(ds.WithoutContext, | |
| 62 ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) | |
| 63 } | |
| 64 | |
| 65 func curVersion(ents *memCollection, key []byte) int64 { | |
| 66 if ents != nil { | |
| 67 if v := ents.Get(key); v != nil { | |
| 68 pm, err := rpm(v) | |
| 69 if err != nil { | |
| 70 panic(err) // memory corruption | |
| 71 } | |
| 72 pl, ok := pm["__version__"] | |
| 73 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { | |
| 74 return pl[0].Value().(int64) | |
| 75 } | |
| 76 panic(fmt.Errorf("__version__ property missing or wrong:
%v", pm)) | |
| 77 } | |
| 78 } | |
| 79 return 0 | |
| 80 } | |
| 81 | |
| 82 func incrementLocked(ents *memCollection, key []byte) int64 { | |
| 83 ret := curVersion(ents, key) + 1 | |
| 84 buf := &bytes.Buffer{} | |
| 85 ds.PropertyMap{"__version__": {ds.MkPropertyNI(ret)}}.Write( | |
| 86 buf, ds.WithContext) | |
| 87 ents.Set(key, buf.Bytes()) | |
| 88 return ret | |
| 89 } | |
| 90 | |
| 91 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { | |
| 92 coll := "ents:" + key.Namespace() | |
| 93 ents := d.store.GetCollection(coll) | |
| 94 if ents == nil { | |
| 95 ents = d.store.SetCollection(coll, nil) | |
| 96 } | |
| 97 | |
| 98 if ds.KeyIncomplete(key) { | |
| 99 idKey := []byte(nil) | |
| 100 if key.Parent() == nil { | |
| 101 idKey = rootIDsKey(key.Kind()) | |
| 102 } else { | |
| 103 idKey = groupIDsKey(key) | |
| 104 } | |
| 105 id := incrementLocked(ents, idKey) | |
| 106 key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id
, key.Parent()) | |
| 107 } | |
| 108 | |
| 109 return ents, key | |
| 110 } | |
| 111 | |
| 112 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put
MultiCB) { | |
| 113 for i, k := range keys { | |
| 114 buf := &bytes.Buffer{} | |
| 115 pmap, _ := vals[i].Save(false) | |
| 116 pmap.Write(buf, ds.WithoutContext) | |
| 117 dataBytes := buf.Bytes() | |
| 118 | |
| 119 k, err := func() (ret ds.Key, err error) { | |
| 120 d.rwlock.Lock() | |
| 121 defer d.rwlock.Unlock() | |
| 122 | |
| 123 ents, ret := d.entsKeyLocked(k) | |
| 124 incrementLocked(ents, groupMetaKey(ret)) | |
| 125 | |
| 126 old := ents.Get(keyBytes(ds.WithoutContext, ret)) | |
| 127 oldPM := ds.PropertyMap(nil) | |
| 128 if old != nil { | |
| 129 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { | |
| 130 return | |
| 131 } | |
| 132 } | |
| 133 updateIndicies(d.store, ret, oldPM, pmap) | |
| 134 ents.Set(keyBytes(ds.WithoutContext, ret), dataBytes) | |
| 135 return | |
| 136 }() | |
| 137 if cb != nil { | |
| 138 cb(k, err) | |
| 139 } | |
| 140 } | |
| 141 } | |
| 142 | |
| 143 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti
on, error)) error { | |
| 144 ents, err := getColl() | |
| 145 if err != nil { | |
| 146 return err | |
| 147 } | |
| 148 if ents == nil { | |
| 149 for range keys { | |
| 150 cb(nil, ds.ErrNoSuchEntity) | |
| 151 } | |
| 152 return nil | |
| 153 } | |
| 154 | |
| 155 for _, k := range keys { | |
| 156 pdata := ents.Get(keyBytes(ds.WithoutContext, k)) | |
| 157 if pdata == nil { | |
| 158 cb(nil, ds.ErrNoSuchEntity) | |
| 159 continue | |
| 160 } | |
| 161 cb(rpmWoCtx(pdata, k.Namespace())) | |
| 162 } | |
| 163 return nil | |
| 164 } | |
| 165 | |
| 166 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { | |
| 167 getMultiInner(keys, cb, func() (*memCollection, error) { | |
| 168 d.rwlock.RLock() | |
| 169 s := d.store.Snapshot() | |
| 170 d.rwlock.RUnlock() | |
| 171 | |
| 172 return s.GetCollection("ents:" + keys[0].Namespace()), nil | |
| 173 }) | |
| 174 return nil | |
| 175 } | |
| 176 | |
| 177 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { | |
| 178 toDel := make([][]byte, 0, len(keys)) | |
| 179 for _, k := range keys { | |
| 180 toDel = append(toDel, keyBytes(ds.WithoutContext, k)) | |
| 181 } | |
| 182 ns := keys[0].Namespace() | |
| 183 | |
| 184 d.rwlock.Lock() | |
| 185 defer d.rwlock.Unlock() | |
| 186 | |
| 187 ents := d.store.GetCollection("ents:" + ns) | |
| 188 | |
| 189 for i, k := range keys { | |
| 190 if ents != nil { | |
| 191 incrementLocked(ents, groupMetaKey(k)) | |
| 192 kb := toDel[i] | |
| 193 if old := ents.Get(kb); old != nil { | |
| 194 oldPM, err := rpmWoCtx(old, ns) | |
| 195 if err != nil { | |
| 196 if cb != nil { | |
| 197 cb(err) | |
| 198 } | |
| 199 continue | |
| 200 } | |
| 201 updateIndicies(d.store, k, oldPM, nil) | |
| 202 ents.Delete(kb) | |
| 203 } | |
| 204 } | |
| 205 if cb != nil { | |
| 206 cb(nil) | |
| 207 } | |
| 208 } | |
| 209 } | |
| 210 | |
| 211 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | |
| 212 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | |
| 213 | |
| 214 txn := obj.(*txnDataStoreData) | |
| 215 for rk, muts := range txn.muts { | |
| 216 if len(muts) == 0 { // read-only | |
| 217 continue | |
| 218 } | |
| 219 k, err := ds.ReadKey(bytes.NewBufferString(rk), ds.WithContext,
"", "") | |
| 220 if err != nil { | |
| 221 panic(err) | |
| 222 } | |
| 223 | |
| 224 entKey := "ents:" + k.Namespace() | |
| 225 mkey := groupMetaKey(k) | |
| 226 entsHead := d.store.GetCollection(entKey) | |
| 227 entsSnap := txn.snap.GetCollection(entKey) | |
| 228 vHead := curVersion(entsHead, mkey) | |
| 229 vSnap := curVersion(entsSnap, mkey) | |
| 230 if vHead != vSnap { | |
| 231 return false | |
| 232 } | |
| 233 } | |
| 234 return true | |
| 235 } | |
| 236 | |
| 237 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { | |
| 238 txn := obj.(*txnDataStoreData) | |
| 239 for _, muts := range txn.muts { | |
| 240 if len(muts) == 0 { // read-only | |
| 241 continue | |
| 242 } | |
| 243 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul
ti | |
| 244 for _, m := range muts { | |
| 245 err := error(nil) | |
| 246 k := m.key | |
| 247 if m.data == nil { | |
| 248 d.delMulti([]ds.Key{k}, | |
| 249 func(e error) { err = e }) | |
| 250 } else { | |
| 251 d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d
ata}, | |
| 252 func(_ ds.Key, e error) { err = e }) | |
| 253 } | |
| 254 err = errors.SingleError(err) | |
| 255 if err != nil { | |
| 256 panic(err) | |
| 257 } | |
| 258 } | |
| 259 } | |
| 260 } | |
| 261 | |
| 262 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { | |
| 263 return &txnDataStoreData{ | |
| 264 // alias to the main datastore's so that testing code can have p
rimitive | |
| 265 // access to break features inside of transactions. | |
| 266 parent: d, | |
| 267 isXG: o != nil && o.XG, | |
| 268 snap: d.store.Snapshot(), | |
| 269 muts: map[string][]txnMutation{}, | |
| 270 } | |
| 271 } | |
| 272 | |
| 273 func (d *dataStoreData) endTxn() {} | |
| 274 | |
| 275 /////////////////////////////// txnDataStoreData /////////////////////////////// | |
| 276 | |
| 277 type txnMutation struct { | |
| 278 key ds.Key | |
| 279 data ds.PropertyMap | |
| 280 } | |
| 281 | |
| 282 type txnDataStoreData struct { | |
| 283 sync.Mutex | |
| 284 | |
| 285 parent *dataStoreData | |
| 286 | |
| 287 // boolean 0 or 1, use atomic.*Int32 to access. | |
| 288 closed int32 | |
| 289 isXG bool | |
| 290 | |
| 291 snap *memStore | |
| 292 | |
| 293 // string is the raw-bytes encoding of the entity root incl. namespace | |
| 294 muts map[string][]txnMutation | |
| 295 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ
ing | |
| 296 // length of encoded keys + values. | |
| 297 } | |
| 298 | |
| 299 var _ memContextObj = (*txnDataStoreData)(nil) | |
| 300 | |
| 301 const xgEGLimit = 25 | |
| 302 | |
| 303 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | |
| 304 func (td *txnDataStoreData) endTxn() { | |
| 305 if atomic.LoadInt32(&td.closed) == 1 { | |
| 306 panic("cannot end transaction twice") | |
| 307 } | |
| 308 atomic.StoreInt32(&td.closed, 1) | |
| 309 } | |
| 310 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | |
| 311 panic("txnDataStoreData cannot apply transactions") | |
| 312 } | |
| 313 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { | |
| 314 panic("impossible") | |
| 315 } | |
| 316 | |
| 317 func (td *txnDataStoreData) run(f func() error) error { | |
| 318 // Slightly different from the SDK... datastore and taskqueue each imple
ment | |
| 319 // this here, where in the SDK only datastore.transaction.Call does. | |
| 320 if atomic.LoadInt32(&td.closed) == 1 { | |
| 321 return errors.New("datastore: transaction context has expired") | |
| 322 } | |
| 323 return f() | |
| 324 } | |
| 325 | |
| 326 // writeMutation ensures that this transaction can support the given key/value | |
| 327 // mutation. | |
| 328 // | |
| 329 // if getOnly is true, don't record the actual mutation data, just ensure that | |
| 330 // the key is in an included entity group (or add an empty entry for tha
t | |
| 331 // group). | |
| 332 // | |
| 333 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | |
| 334 // | |
| 335 // Returns an error if this key causes the transaction to cross too many entity | |
| 336 // groups. | |
| 337 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop
ertyMap) error { | |
| 338 rk := string(keyBytes(ds.WithContext, ds.KeyRoot(key))) | |
| 339 | |
| 340 td.Lock() | |
| 341 defer td.Unlock() | |
| 342 | |
| 343 if _, ok := td.muts[rk]; !ok { | |
| 344 limit := 1 | |
| 345 if td.isXG { | |
| 346 limit = xgEGLimit | |
| 347 } | |
| 348 if len(td.muts)+1 > limit { | |
| 349 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" | |
| 350 if td.isXG { | |
| 351 msg = "operating on too many entity groups in a
single transaction" | |
| 352 } | |
| 353 return errors.New(msg) | |
| 354 } | |
| 355 td.muts[rk] = []txnMutation{} | |
| 356 } | |
| 357 if !getOnly { | |
| 358 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | |
| 359 } | |
| 360 | |
| 361 return nil | |
| 362 } | |
| 363 | |
| 364 func (td *txnDataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds
.PutMultiCB) { | |
| 365 for i, k := range keys { | |
| 366 func() { | |
| 367 td.parent.Lock() | |
| 368 defer td.parent.Unlock() | |
| 369 _, k = td.parent.entsKeyLocked(k) | |
| 370 }() | |
| 371 err := td.writeMutation(false, k, vals[i]) | |
| 372 if cb != nil { | |
| 373 cb(k, err) | |
| 374 } | |
| 375 } | |
| 376 } | |
| 377 | |
| 378 func (td *txnDataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { | |
| 379 return getMultiInner(keys, cb, func() (*memCollection, error) { | |
| 380 err := error(nil) | |
| 381 for _, key := range keys { | |
| 382 err = td.writeMutation(true, key, nil) | |
| 383 if err != nil { | |
| 384 return nil, err | |
| 385 } | |
| 386 } | |
| 387 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil | |
| 388 }) | |
| 389 } | |
| 390 | |
| 391 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { | |
| 392 for _, k := range keys { | |
| 393 err := td.writeMutation(false, k, nil) | |
| 394 if cb != nil { | |
| 395 cb(err) | |
| 396 } | |
| 397 } | |
| 398 return nil | |
| 399 } | |
| 400 | |
| 401 func keyBytes(ctx ds.KeyContext, key ds.Key) []byte { | |
| 402 buf := &bytes.Buffer{} | |
| 403 ds.WriteKey(buf, ctx, key) | |
| 404 return buf.Bytes() | |
| 405 } | |
| 406 | |
| 407 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { | |
| 408 ret := ds.PropertyMap{} | |
| 409 err := ret.Read(bytes.NewBuffer(data), ds.WithoutContext, globalAppID, n
s) | |
| 410 return ret, err | |
| 411 } | |
| 412 | |
| 413 func rpm(data []byte) (ds.PropertyMap, error) { | |
| 414 ret := ds.PropertyMap{} | |
| 415 err := ret.Read(bytes.NewBuffer(data), ds.WithContext, "", "") | |
| 416 return ret, err | |
| 417 } | |
| 418 | |
| 419 type keyitem interface { | |
| 420 Key() ds.Key | |
| 421 } | |
| OLD | NEW |