| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| 11 "sync" | 11 "sync" |
| 12 "sync/atomic" | 12 "sync/atomic" |
| 13 | 13 |
| 14 "github.com/luci/gae" |
| 15 rds "github.com/luci/gae/service/rawdatastore" |
| 14 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
| 15 | |
| 16 "github.com/luci/gae" | |
| 17 "github.com/luci/gae/helper" | |
| 18 ) | 17 ) |
| 19 | 18 |
| 20 //////////////////////////////// dataStoreData ///////////////////////////////// | 19 //////////////////////////////// dataStoreData ///////////////////////////////// |
| 21 | 20 |
| 22 type dataStoreData struct { | 21 type dataStoreData struct { |
| 23 rwlock sync.RWMutex | 22 rwlock sync.RWMutex |
| 24 // See README.md for store schema. | 23 // See README.md for store schema. |
| 25 store *memStore | 24 store *memStore |
| 26 snap *memStore | 25 snap *memStore |
| 27 } | 26 } |
| (...skipping 14 matching lines...) Expand all Loading... |
| 42 func (d *dataStoreData) Lock() { | 41 func (d *dataStoreData) Lock() { |
| 43 d.rwlock.Lock() | 42 d.rwlock.Lock() |
| 44 } | 43 } |
| 45 | 44 |
| 46 func (d *dataStoreData) Unlock() { | 45 func (d *dataStoreData) Unlock() { |
| 47 d.rwlock.Unlock() | 46 d.rwlock.Unlock() |
| 48 } | 47 } |
| 49 | 48 |
| 50 /////////////////////////// indicies(dataStoreData) //////////////////////////// | 49 /////////////////////////// indicies(dataStoreData) //////////////////////////// |
| 51 | 50 |
| 52 func groupMetaKey(key gae.DSKey) []byte { | 51 func groupMetaKey(key rds.Key) []byte { |
| 53 » return keyBytes(helper.WithoutContext, | 52 » return keyBytes(rds.WithoutContext, |
| 54 » » helper.NewDSKey("", "", "__entity_group__", "", 1, helper.DSKeyR
oot(key))) | 53 » » rds.NewKey("", "", "__entity_group__", "", 1, rds.KeyRoot(key))) |
| 55 } | 54 } |
| 56 | 55 |
| 57 func groupIDsKey(key gae.DSKey) []byte { | 56 func groupIDsKey(key rds.Key) []byte { |
| 58 » return keyBytes(helper.WithoutContext, | 57 » return keyBytes(rds.WithoutContext, |
| 59 » » helper.NewDSKey("", "", "__entity_group_ids__", "", 1, helper.DS
KeyRoot(key))) | 58 » » rds.NewKey("", "", "__entity_group_ids__", "", 1, rds.KeyRoot(ke
y))) |
| 60 } | 59 } |
| 61 | 60 |
| 62 func rootIDsKey(kind string) []byte { | 61 func rootIDsKey(kind string) []byte { |
| 63 » return keyBytes(helper.WithoutContext, | 62 » return keyBytes(rds.WithoutContext, |
| 64 » » helper.NewDSKey("", "", "__entity_root_ids__", kind, 0, nil)) | 63 » » rds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) |
| 65 } | 64 } |
| 66 | 65 |
| 67 func curVersion(ents *memCollection, key []byte) int64 { | 66 func curVersion(ents *memCollection, key []byte) int64 { |
| 68 if v := ents.Get(key); v != nil { | 67 if v := ents.Get(key); v != nil { |
| 69 pm, err := rpm(v) | 68 pm, err := rpm(v) |
| 70 if err != nil { | 69 if err != nil { |
| 71 panic(err) // memory corruption | 70 panic(err) // memory corruption |
| 72 } | 71 } |
| 73 pl, ok := pm["__version__"] | 72 pl, ok := pm["__version__"] |
| 74 » » if ok && len(pl) > 0 && pl[0].Type() == gae.DSPTInt { | 73 » » if ok && len(pl) > 0 && pl[0].Type() == rds.PTInt { |
| 75 return pl[0].Value().(int64) | 74 return pl[0].Value().(int64) |
| 76 } | 75 } |
| 77 panic(fmt.Errorf("__version__ property missing or wrong: %v", pm
)) | 76 panic(fmt.Errorf("__version__ property missing or wrong: %v", pm
)) |
| 78 } | 77 } |
| 79 return 0 | 78 return 0 |
| 80 } | 79 } |
| 81 | 80 |
| 82 func incrementLocked(ents *memCollection, key []byte) int64 { | 81 func incrementLocked(ents *memCollection, key []byte) int64 { |
| 83 ret := curVersion(ents, key) + 1 | 82 ret := curVersion(ents, key) + 1 |
| 84 buf := &bytes.Buffer{} | 83 buf := &bytes.Buffer{} |
| 85 » helper.WriteDSPropertyMap( | 84 » rds.WritePropertyMap( |
| 86 » » buf, gae.DSPropertyMap{"__version__": {gae.MkDSPropertyNI(ret)}}
, helper.WithContext) | 85 » » buf, rds.PropertyMap{"__version__": {rds.MkPropertyNI(ret)}}, rd
s.WithContext) |
| 87 ents.Set(key, buf.Bytes()) | 86 ents.Set(key, buf.Bytes()) |
| 88 return ret | 87 return ret |
| 89 } | 88 } |
| 90 | 89 |
| 91 func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey)
{ | 90 func (d *dataStoreData) entsKeyLocked(key rds.Key) (*memCollection, rds.Key) { |
| 92 coll := "ents:" + key.Namespace() | 91 coll := "ents:" + key.Namespace() |
| 93 ents := d.store.GetCollection(coll) | 92 ents := d.store.GetCollection(coll) |
| 94 if ents == nil { | 93 if ents == nil { |
| 95 ents = d.store.SetCollection(coll, nil) | 94 ents = d.store.SetCollection(coll, nil) |
| 96 } | 95 } |
| 97 | 96 |
| 98 » if helper.DSKeyIncomplete(key) { | 97 » if rds.KeyIncomplete(key) { |
| 99 idKey := []byte(nil) | 98 idKey := []byte(nil) |
| 100 if key.Parent() == nil { | 99 if key.Parent() == nil { |
| 101 idKey = rootIDsKey(key.Kind()) | 100 idKey = rootIDsKey(key.Kind()) |
| 102 } else { | 101 } else { |
| 103 idKey = groupIDsKey(key) | 102 idKey = groupIDsKey(key) |
| 104 } | 103 } |
| 105 id := incrementLocked(ents, idKey) | 104 id := incrementLocked(ents, idKey) |
| 106 » » key = helper.NewDSKey(key.AppID(), key.Namespace(), key.Kind(),
"", id, key.Parent()) | 105 » » key = rds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", i
d, key.Parent()) |
| 107 } | 106 } |
| 108 | 107 |
| 109 return ents, key | 108 return ents, key |
| 110 } | 109 } |
| 111 | 110 |
| 112 func (d *dataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoadSave
r) (gae.DSKey, error) { | 111 func (d *dataStoreData) put(ns string, key rds.Key, pls rds.PropertyLoadSaver) (
rds.Key, error) { |
| 113 » keys, errs := d.putMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSaver
{pls}) | 112 » keys, errs := d.putMulti(ns, []rds.Key{key}, []rds.PropertyLoadSaver{pls
}) |
| 114 if errs == nil { | 113 if errs == nil { |
| 115 return keys[0], nil | 114 return keys[0], nil |
| 116 } | 115 } |
| 117 return nil, gae.SingleError(errs) | 116 return nil, gae.SingleError(errs) |
| 118 } | 117 } |
| 119 | 118 |
| 120 func (d *dataStoreData) putMulti(ns string, keys []gae.DSKey, plss []gae.DSPrope
rtyLoadSaver) ([]gae.DSKey, error) { | 119 func (d *dataStoreData) putMulti(ns string, keys []rds.Key, plss []rds.PropertyL
oadSaver) ([]rds.Key, error) { |
| 121 pmaps, err := putMultiPrelim(ns, keys, plss) | 120 pmaps, err := putMultiPrelim(ns, keys, plss) |
| 122 if err != nil { | 121 if err != nil { |
| 123 return nil, err | 122 return nil, err |
| 124 } | 123 } |
| 125 return d.putMultiInner(keys, pmaps) | 124 return d.putMultiInner(keys, pmaps) |
| 126 } | 125 } |
| 127 | 126 |
| 128 func putMultiPrelim(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver)
([]gae.DSPropertyMap, error) { | 127 func putMultiPrelim(ns string, keys []rds.Key, plss []rds.PropertyLoadSaver) ([]
rds.PropertyMap, error) { |
| 129 err := multiValid(keys, plss, ns, true, false) | 128 err := multiValid(keys, plss, ns, true, false) |
| 130 if err != nil { | 129 if err != nil { |
| 131 return nil, err | 130 return nil, err |
| 132 } | 131 } |
| 133 » pmaps := make([]gae.DSPropertyMap, len(keys)) | 132 » pmaps := make([]rds.PropertyMap, len(keys)) |
| 134 lme := gae.LazyMultiError{Size: len(keys)} | 133 lme := gae.LazyMultiError{Size: len(keys)} |
| 135 for i, pls := range plss { | 134 for i, pls := range plss { |
| 136 pm, err := pls.Save(false) | 135 pm, err := pls.Save(false) |
| 137 lme.Assign(i, err) | 136 lme.Assign(i, err) |
| 138 pmaps[i] = pm | 137 pmaps[i] = pm |
| 139 } | 138 } |
| 140 return pmaps, lme.Get() | 139 return pmaps, lme.Get() |
| 141 } | 140 } |
| 142 | 141 |
| 143 func (d *dataStoreData) putMultiInner(keys []gae.DSKey, data []gae.DSPropertyMap
) ([]gae.DSKey, error) { | 142 func (d *dataStoreData) putMultiInner(keys []rds.Key, data []rds.PropertyMap) ([
]rds.Key, error) { |
| 144 » retKeys := make([]gae.DSKey, len(keys)) | 143 » retKeys := make([]rds.Key, len(keys)) |
| 145 lme := gae.LazyMultiError{Size: len(keys)} | 144 lme := gae.LazyMultiError{Size: len(keys)} |
| 146 for i, k := range keys { | 145 for i, k := range keys { |
| 147 buf := &bytes.Buffer{} | 146 buf := &bytes.Buffer{} |
| 148 » » helper.WriteDSPropertyMap(buf, data[i], helper.WithoutContext) | 147 » » rds.WritePropertyMap(buf, data[i], rds.WithoutContext) |
| 149 dataBytes := buf.Bytes() | 148 dataBytes := buf.Bytes() |
| 150 | 149 |
| 151 » » rKey, err := func() (ret gae.DSKey, err error) { | 150 » » rKey, err := func() (ret rds.Key, err error) { |
| 152 d.rwlock.Lock() | 151 d.rwlock.Lock() |
| 153 defer d.rwlock.Unlock() | 152 defer d.rwlock.Unlock() |
| 154 | 153 |
| 155 ents, ret := d.entsKeyLocked(k) | 154 ents, ret := d.entsKeyLocked(k) |
| 156 incrementLocked(ents, groupMetaKey(ret)) | 155 incrementLocked(ents, groupMetaKey(ret)) |
| 157 | 156 |
| 158 » » » old := ents.Get(keyBytes(helper.WithoutContext, ret)) | 157 » » » old := ents.Get(keyBytes(rds.WithoutContext, ret)) |
| 159 » » » oldPM := gae.DSPropertyMap(nil) | 158 » » » oldPM := rds.PropertyMap(nil) |
| 160 if old != nil { | 159 if old != nil { |
| 161 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { | 160 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { |
| 162 return | 161 return |
| 163 } | 162 } |
| 164 } | 163 } |
| 165 updateIndicies(d.store, ret, oldPM, data[i]) | 164 updateIndicies(d.store, ret, oldPM, data[i]) |
| 166 » » » ents.Set(keyBytes(helper.WithoutContext, ret), dataBytes
) | 165 » » » ents.Set(keyBytes(rds.WithoutContext, ret), dataBytes) |
| 167 return | 166 return |
| 168 }() | 167 }() |
| 169 lme.Assign(i, err) | 168 lme.Assign(i, err) |
| 170 retKeys[i] = rKey | 169 retKeys[i] = rKey |
| 171 } | 170 } |
| 172 return retKeys, lme.Get() | 171 return retKeys, lme.Get() |
| 173 } | 172 } |
| 174 | 173 |
| 175 func getMultiInner(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver,
getColl func() (*memCollection, error)) error { | 174 func getMultiInner(ns string, keys []rds.Key, plss []rds.PropertyLoadSaver, getC
oll func() (*memCollection, error)) error { |
| 176 if err := multiValid(keys, plss, ns, false, true); err != nil { | 175 if err := multiValid(keys, plss, ns, false, true); err != nil { |
| 177 return err | 176 return err |
| 178 } | 177 } |
| 179 | 178 |
| 180 lme := gae.LazyMultiError{Size: len(keys)} | 179 lme := gae.LazyMultiError{Size: len(keys)} |
| 181 | 180 |
| 182 ents, err := getColl() | 181 ents, err := getColl() |
| 183 if err != nil { | 182 if err != nil { |
| 184 return err | 183 return err |
| 185 } | 184 } |
| 186 if ents == nil { | 185 if ents == nil { |
| 187 for i := range keys { | 186 for i := range keys { |
| 188 » » » lme.Assign(i, gae.ErrDSNoSuchEntity) | 187 » » » lme.Assign(i, rds.ErrNoSuchEntity) |
| 189 } | 188 } |
| 190 return lme.Get() | 189 return lme.Get() |
| 191 } | 190 } |
| 192 | 191 |
| 193 for i, k := range keys { | 192 for i, k := range keys { |
| 194 » » pdata := ents.Get(keyBytes(helper.WithoutContext, k)) | 193 » » pdata := ents.Get(keyBytes(rds.WithoutContext, k)) |
| 195 if pdata == nil { | 194 if pdata == nil { |
| 196 » » » lme.Assign(i, gae.ErrDSNoSuchEntity) | 195 » » » lme.Assign(i, rds.ErrNoSuchEntity) |
| 197 continue | 196 continue |
| 198 } | 197 } |
| 199 | 198 |
| 200 got, err := rpmWoCtx(pdata, ns) | 199 got, err := rpmWoCtx(pdata, ns) |
| 201 if err != nil { | 200 if err != nil { |
| 202 lme.Assign(i, err) | 201 lme.Assign(i, err) |
| 203 continue | 202 continue |
| 204 } | 203 } |
| 205 | 204 |
| 206 lme.Assign(i, plss[i].Load(got)) | 205 lme.Assign(i, plss[i].Load(got)) |
| 207 } | 206 } |
| 208 return lme.Get() | 207 return lme.Get() |
| 209 } | 208 } |
| 210 | 209 |
| 211 func (d *dataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoadSave
r) error { | 210 func (d *dataStoreData) get(ns string, key rds.Key, pls rds.PropertyLoadSaver) e
rror { |
| 212 » return gae.SingleError(d.getMulti(ns, []gae.DSKey{key}, []gae.DSProperty
LoadSaver{pls})) | 211 » return gae.SingleError(d.getMulti(ns, []rds.Key{key}, []rds.PropertyLoad
Saver{pls})) |
| 213 } | 212 } |
| 214 | 213 |
| 215 func (d *dataStoreData) getMulti(ns string, keys []gae.DSKey, plss []gae.DSPrope
rtyLoadSaver) error { | 214 func (d *dataStoreData) getMulti(ns string, keys []rds.Key, plss []rds.PropertyL
oadSaver) error { |
| 216 return getMultiInner(ns, keys, plss, func() (*memCollection, error) { | 215 return getMultiInner(ns, keys, plss, func() (*memCollection, error) { |
| 217 d.rwlock.RLock() | 216 d.rwlock.RLock() |
| 218 s := d.store.Snapshot() | 217 s := d.store.Snapshot() |
| 219 d.rwlock.RUnlock() | 218 d.rwlock.RUnlock() |
| 220 | 219 |
| 221 return s.GetCollection("ents:" + ns), nil | 220 return s.GetCollection("ents:" + ns), nil |
| 222 }) | 221 }) |
| 223 } | 222 } |
| 224 | 223 |
| 225 func (d *dataStoreData) del(ns string, key gae.DSKey) (err error) { | 224 func (d *dataStoreData) del(ns string, key rds.Key) (err error) { |
| 226 » return gae.SingleError(d.delMulti(ns, []gae.DSKey{key})) | 225 » return gae.SingleError(d.delMulti(ns, []rds.Key{key})) |
| 227 } | 226 } |
| 228 | 227 |
| 229 func (d *dataStoreData) delMulti(ns string, keys []gae.DSKey) error { | 228 func (d *dataStoreData) delMulti(ns string, keys []rds.Key) error { |
| 230 lme := gae.LazyMultiError{Size: len(keys)} | 229 lme := gae.LazyMultiError{Size: len(keys)} |
| 231 toDel := make([][]byte, 0, len(keys)) | 230 toDel := make([][]byte, 0, len(keys)) |
| 232 for i, k := range keys { | 231 for i, k := range keys { |
| 233 » » if !helper.DSKeyValid(k, ns, false) { | 232 » » if !rds.KeyValid(k, ns, false) { |
| 234 » » » lme.Assign(i, gae.ErrDSInvalidKey) | 233 » » » lme.Assign(i, rds.ErrInvalidKey) |
| 235 continue | 234 continue |
| 236 } | 235 } |
| 237 » » toDel = append(toDel, keyBytes(helper.WithoutContext, k)) | 236 » » toDel = append(toDel, keyBytes(rds.WithoutContext, k)) |
| 238 } | 237 } |
| 239 err := lme.Get() | 238 err := lme.Get() |
| 240 if err != nil { | 239 if err != nil { |
| 241 return err | 240 return err |
| 242 } | 241 } |
| 243 | 242 |
| 244 d.rwlock.Lock() | 243 d.rwlock.Lock() |
| 245 defer d.rwlock.Unlock() | 244 defer d.rwlock.Unlock() |
| 246 | 245 |
| 247 ents := d.store.GetCollection("ents:" + ns) | 246 ents := d.store.GetCollection("ents:" + ns) |
| 248 if ents == nil { | 247 if ents == nil { |
| 249 return nil | 248 return nil |
| 250 } | 249 } |
| 251 | 250 |
| 252 for i, k := range keys { | 251 for i, k := range keys { |
| 253 incrementLocked(ents, groupMetaKey(k)) | 252 incrementLocked(ents, groupMetaKey(k)) |
| 254 kb := toDel[i] | 253 kb := toDel[i] |
| 255 old := ents.Get(kb) | 254 old := ents.Get(kb) |
| 256 » » oldPM := gae.DSPropertyMap(nil) | 255 » » oldPM := rds.PropertyMap(nil) |
| 257 if old != nil { | 256 if old != nil { |
| 258 if oldPM, err = rpmWoCtx(old, ns); err != nil { | 257 if oldPM, err = rpmWoCtx(old, ns); err != nil { |
| 259 lme.Assign(i, err) | 258 lme.Assign(i, err) |
| 260 continue | 259 continue |
| 261 } | 260 } |
| 262 } | 261 } |
| 263 updateIndicies(d.store, k, oldPM, nil) | 262 updateIndicies(d.store, k, oldPM, nil) |
| 264 ents.Delete(kb) | 263 ents.Delete(kb) |
| 265 } | 264 } |
| 266 return lme.Get() | 265 return lme.Get() |
| 267 } | 266 } |
| 268 | 267 |
| 269 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 268 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
| 270 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 269 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
| 271 | 270 |
| 272 txn := obj.(*txnDataStoreData) | 271 txn := obj.(*txnDataStoreData) |
| 273 for rk, muts := range txn.muts { | 272 for rk, muts := range txn.muts { |
| 274 if len(muts) == 0 { // read-only | 273 if len(muts) == 0 { // read-only |
| 275 continue | 274 continue |
| 276 } | 275 } |
| 277 » » k, err := helper.ReadDSKey(bytes.NewBufferString(rk), helper.Wit
hContext, "", "") | 276 » » k, err := rds.ReadKey(bytes.NewBufferString(rk), rds.WithContext
, "", "") |
| 278 if err != nil { | 277 if err != nil { |
| 279 panic(err) | 278 panic(err) |
| 280 } | 279 } |
| 281 | 280 |
| 282 entKey := "ents:" + k.Namespace() | 281 entKey := "ents:" + k.Namespace() |
| 283 mkey := groupMetaKey(k) | 282 mkey := groupMetaKey(k) |
| 284 entsHead := d.store.GetCollection(entKey) | 283 entsHead := d.store.GetCollection(entKey) |
| 285 entsSnap := txn.snap.GetCollection(entKey) | 284 entsSnap := txn.snap.GetCollection(entKey) |
| 286 vHead := curVersion(entsHead, mkey) | 285 vHead := curVersion(entsHead, mkey) |
| 287 vSnap := curVersion(entsSnap, mkey) | 286 vSnap := curVersion(entsSnap, mkey) |
| (...skipping 17 matching lines...) Expand all Loading... |
| 305 } else { | 304 } else { |
| 306 _, err = d.put(m.key.Namespace(), m.key, m.data) | 305 _, err = d.put(m.key.Namespace(), m.key, m.data) |
| 307 } | 306 } |
| 308 if err != nil { | 307 if err != nil { |
| 309 panic(err) | 308 panic(err) |
| 310 } | 309 } |
| 311 } | 310 } |
| 312 } | 311 } |
| 313 } | 312 } |
| 314 | 313 |
| 315 func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) memContextObj { | 314 func (d *dataStoreData) mkTxn(o *rds.TransactionOptions) memContextObj { |
| 316 return &txnDataStoreData{ | 315 return &txnDataStoreData{ |
| 317 // alias to the main datastore's so that testing code can have p
rimitive | 316 // alias to the main datastore's so that testing code can have p
rimitive |
| 318 // access to break features inside of transactions. | 317 // access to break features inside of transactions. |
| 319 parent: d, | 318 parent: d, |
| 320 isXG: o != nil && o.XG, | 319 isXG: o != nil && o.XG, |
| 321 snap: d.store.Snapshot(), | 320 snap: d.store.Snapshot(), |
| 322 muts: map[string][]txnMutation{}, | 321 muts: map[string][]txnMutation{}, |
| 323 } | 322 } |
| 324 } | 323 } |
| 325 | 324 |
| 326 func (d *dataStoreData) endTxn() {} | 325 func (d *dataStoreData) endTxn() {} |
| 327 | 326 |
| 328 /////////////////////////////// txnDataStoreData /////////////////////////////// | 327 /////////////////////////////// txnDataStoreData /////////////////////////////// |
| 329 | 328 |
| 330 type txnMutation struct { | 329 type txnMutation struct { |
| 331 » key gae.DSKey | 330 » key rds.Key |
| 332 » data gae.DSPropertyMap | 331 » data rds.PropertyMap |
| 333 } | 332 } |
| 334 | 333 |
| 335 type txnDataStoreData struct { | 334 type txnDataStoreData struct { |
| 336 sync.Mutex | 335 sync.Mutex |
| 337 | 336 |
| 338 parent *dataStoreData | 337 parent *dataStoreData |
| 339 | 338 |
| 340 // boolean 0 or 1, use atomic.*Int32 to access. | 339 // boolean 0 or 1, use atomic.*Int32 to access. |
| 341 closed int32 | 340 closed int32 |
| 342 isXG bool | 341 isXG bool |
| (...skipping 13 matching lines...) Expand all Loading... |
| 356 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | 355 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } |
| 357 func (td *txnDataStoreData) endTxn() { | 356 func (td *txnDataStoreData) endTxn() { |
| 358 if atomic.LoadInt32(&td.closed) == 1 { | 357 if atomic.LoadInt32(&td.closed) == 1 { |
| 359 panic("cannot end transaction twice") | 358 panic("cannot end transaction twice") |
| 360 } | 359 } |
| 361 atomic.StoreInt32(&td.closed, 1) | 360 atomic.StoreInt32(&td.closed, 1) |
| 362 } | 361 } |
| 363 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | 362 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { |
| 364 panic("txnDataStoreData cannot apply transactions") | 363 panic("txnDataStoreData cannot apply transactions") |
| 365 } | 364 } |
| 366 func (*txnDataStoreData) mkTxn(*gae.DSTransactionOptions) memContextObj { | 365 func (*txnDataStoreData) mkTxn(*rds.TransactionOptions) memContextObj { |
| 367 panic("impossible") | 366 panic("impossible") |
| 368 } | 367 } |
| 369 | 368 |
| 370 func (td *txnDataStoreData) run(f func() error) error { | 369 func (td *txnDataStoreData) run(f func() error) error { |
| 371 // Slightly different from the SDK... datastore and taskqueue each imple
ment | 370 // Slightly different from the SDK... datastore and taskqueue each imple
ment |
| 372 // this here, where in the SDK only datastore.transaction.Call does. | 371 // this here, where in the SDK only datastore.transaction.Call does. |
| 373 if atomic.LoadInt32(&td.closed) == 1 { | 372 if atomic.LoadInt32(&td.closed) == 1 { |
| 374 return errors.New("datastore: transaction context has expired") | 373 return errors.New("datastore: transaction context has expired") |
| 375 } | 374 } |
| 376 return f() | 375 return f() |
| 377 } | 376 } |
| 378 | 377 |
| 379 // writeMutation ensures that this transaction can support the given key/value | 378 // writeMutation ensures that this transaction can support the given key/value |
| 380 // mutation. | 379 // mutation. |
| 381 // | 380 // |
| 382 // if getOnly is true, don't record the actual mutation data, just ensure that | 381 // if getOnly is true, don't record the actual mutation data, just ensure that |
| 383 // the key is in an included entity group (or add an empty entry for tha
t | 382 // the key is in an included entity group (or add an empty entry for tha
t |
| 384 // group). | 383 // group). |
| 385 // | 384 // |
| 386 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | 385 // if !getOnly && data == nil, this counts as a deletion instead of a Put. |
| 387 // | 386 // |
| 388 // Returns an error if this key causes the transaction to cross too many entity | 387 // Returns an error if this key causes the transaction to cross too many entity |
| 389 // groups. | 388 // groups. |
| 390 func (td *txnDataStoreData) writeMutation(getOnly bool, key gae.DSKey, data gae.
DSPropertyMap) error { | 389 func (td *txnDataStoreData) writeMutation(getOnly bool, key rds.Key, data rds.Pr
opertyMap) error { |
| 391 » rk := string(keyBytes(helper.WithContext, helper.DSKeyRoot(key))) | 390 » rk := string(keyBytes(rds.WithContext, rds.KeyRoot(key))) |
| 392 | 391 |
| 393 td.Lock() | 392 td.Lock() |
| 394 defer td.Unlock() | 393 defer td.Unlock() |
| 395 | 394 |
| 396 if _, ok := td.muts[rk]; !ok { | 395 if _, ok := td.muts[rk]; !ok { |
| 397 limit := 1 | 396 limit := 1 |
| 398 if td.isXG { | 397 if td.isXG { |
| 399 limit = xgEGLimit | 398 limit = xgEGLimit |
| 400 } | 399 } |
| 401 if len(td.muts)+1 > limit { | 400 if len(td.muts)+1 > limit { |
| 402 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" | 401 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" |
| 403 if td.isXG { | 402 if td.isXG { |
| 404 msg = "operating on too many entity groups in a
single transaction" | 403 msg = "operating on too many entity groups in a
single transaction" |
| 405 } | 404 } |
| 406 return errors.New(msg) | 405 return errors.New(msg) |
| 407 } | 406 } |
| 408 td.muts[rk] = []txnMutation{} | 407 td.muts[rk] = []txnMutation{} |
| 409 } | 408 } |
| 410 if !getOnly { | 409 if !getOnly { |
| 411 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | 410 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) |
| 412 } | 411 } |
| 413 | 412 |
| 414 return nil | 413 return nil |
| 415 } | 414 } |
| 416 | 415 |
| 417 func (td *txnDataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoad
Saver) (gae.DSKey, error) { | 416 func (td *txnDataStoreData) put(ns string, key rds.Key, pls rds.PropertyLoadSave
r) (rds.Key, error) { |
| 418 » keys, errs := td.putMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSave
r{pls}) | 417 » keys, errs := td.putMulti(ns, []rds.Key{key}, []rds.PropertyLoadSaver{pl
s}) |
| 419 if errs == nil { | 418 if errs == nil { |
| 420 return keys[0], nil | 419 return keys[0], nil |
| 421 } | 420 } |
| 422 return nil, gae.SingleError(errs) | 421 return nil, gae.SingleError(errs) |
| 423 } | 422 } |
| 424 | 423 |
| 425 func (td *txnDataStoreData) putMulti(ns string, keys []gae.DSKey, plss []gae.DSP
ropertyLoadSaver) ([]gae.DSKey, error) { | 424 func (td *txnDataStoreData) putMulti(ns string, keys []rds.Key, plss []rds.Prope
rtyLoadSaver) ([]rds.Key, error) { |
| 426 pmaps, err := putMultiPrelim(ns, keys, plss) | 425 pmaps, err := putMultiPrelim(ns, keys, plss) |
| 427 if err != nil { | 426 if err != nil { |
| 428 return nil, err | 427 return nil, err |
| 429 } | 428 } |
| 430 | 429 |
| 431 » retKeys := make([]gae.DSKey, len(keys)) | 430 » retKeys := make([]rds.Key, len(keys)) |
| 432 lme := gae.LazyMultiError{Size: len(keys)} | 431 lme := gae.LazyMultiError{Size: len(keys)} |
| 433 for i, k := range keys { | 432 for i, k := range keys { |
| 434 func() { | 433 func() { |
| 435 td.parent.Lock() | 434 td.parent.Lock() |
| 436 defer td.parent.Unlock() | 435 defer td.parent.Unlock() |
| 437 _, k = td.parent.entsKeyLocked(k) | 436 _, k = td.parent.entsKeyLocked(k) |
| 438 }() | 437 }() |
| 439 lme.Assign(i, td.writeMutation(false, k, pmaps[i])) | 438 lme.Assign(i, td.writeMutation(false, k, pmaps[i])) |
| 440 retKeys[i] = k | 439 retKeys[i] = k |
| 441 } | 440 } |
| 442 | 441 |
| 443 return retKeys, lme.Get() | 442 return retKeys, lme.Get() |
| 444 } | 443 } |
| 445 | 444 |
| 446 func (td *txnDataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoad
Saver) error { | 445 func (td *txnDataStoreData) get(ns string, key rds.Key, pls rds.PropertyLoadSave
r) error { |
| 447 » return gae.SingleError(td.getMulti(ns, []gae.DSKey{key}, []gae.DSPropert
yLoadSaver{pls})) | 446 » return gae.SingleError(td.getMulti(ns, []rds.Key{key}, []rds.PropertyLoa
dSaver{pls})) |
| 448 } | 447 } |
| 449 | 448 |
| 450 func (td *txnDataStoreData) getMulti(ns string, keys []gae.DSKey, plss []gae.DSP
ropertyLoadSaver) error { | 449 func (td *txnDataStoreData) getMulti(ns string, keys []rds.Key, plss []rds.Prope
rtyLoadSaver) error { |
| 451 return getMultiInner(ns, keys, plss, func() (*memCollection, error) { | 450 return getMultiInner(ns, keys, plss, func() (*memCollection, error) { |
| 452 lme := gae.LazyMultiError{Size: len(keys)} | 451 lme := gae.LazyMultiError{Size: len(keys)} |
| 453 for i, k := range keys { | 452 for i, k := range keys { |
| 454 lme.Assign(i, td.writeMutation(true, k, nil)) | 453 lme.Assign(i, td.writeMutation(true, k, nil)) |
| 455 } | 454 } |
| 456 return td.snap.GetCollection("ents:" + ns), lme.Get() | 455 return td.snap.GetCollection("ents:" + ns), lme.Get() |
| 457 }) | 456 }) |
| 458 } | 457 } |
| 459 | 458 |
| 460 func (td *txnDataStoreData) del(ns string, key gae.DSKey) error { | 459 func (td *txnDataStoreData) del(ns string, key rds.Key) error { |
| 461 » return gae.SingleError(td.delMulti(ns, []gae.DSKey{key})) | 460 » return gae.SingleError(td.delMulti(ns, []rds.Key{key})) |
| 462 } | 461 } |
| 463 | 462 |
| 464 func (td *txnDataStoreData) delMulti(ns string, keys []gae.DSKey) error { | 463 func (td *txnDataStoreData) delMulti(ns string, keys []rds.Key) error { |
| 465 lme := gae.LazyMultiError{Size: len(keys)} | 464 lme := gae.LazyMultiError{Size: len(keys)} |
| 466 for i, k := range keys { | 465 for i, k := range keys { |
| 467 » » if !helper.DSKeyValid(k, ns, false) { | 466 » » if !rds.KeyValid(k, ns, false) { |
| 468 » » » lme.Assign(i, gae.ErrDSInvalidKey) | 467 » » » lme.Assign(i, rds.ErrInvalidKey) |
| 469 } else { | 468 } else { |
| 470 lme.Assign(i, td.writeMutation(false, k, nil)) | 469 lme.Assign(i, td.writeMutation(false, k, nil)) |
| 471 } | 470 } |
| 472 } | 471 } |
| 473 return lme.Get() | 472 return lme.Get() |
| 474 } | 473 } |
| 475 | 474 |
| 476 func keyBytes(ctx helper.DSKeyContext, key gae.DSKey) []byte { | 475 func keyBytes(ctx rds.KeyContext, key rds.Key) []byte { |
| 477 buf := &bytes.Buffer{} | 476 buf := &bytes.Buffer{} |
| 478 » helper.WriteDSKey(buf, ctx, key) | 477 » rds.WriteKey(buf, ctx, key) |
| 479 return buf.Bytes() | 478 return buf.Bytes() |
| 480 } | 479 } |
| 481 | 480 |
| 482 func rpmWoCtx(data []byte, ns string) (gae.DSPropertyMap, error) { | 481 func rpmWoCtx(data []byte, ns string) (rds.PropertyMap, error) { |
| 483 » return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithoutCon
text, globalAppID, ns) | 482 » return rds.ReadPropertyMap(bytes.NewBuffer(data), rds.WithoutContext, gl
obalAppID, ns) |
| 484 } | 483 } |
| 485 | 484 |
| 486 func rpm(data []byte) (gae.DSPropertyMap, error) { | 485 func rpm(data []byte) (rds.PropertyMap, error) { |
| 487 » return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithContex
t, "", "") | 486 » return rds.ReadPropertyMap(bytes.NewBuffer(data), rds.WithContext, "", "
") |
| 488 } | 487 } |
| 489 | 488 |
| 490 func multiValid(keys []gae.DSKey, plss []gae.DSPropertyLoadSaver, ns string, pot
entialKey, allowSpecial bool) error { | 489 func multiValid(keys []rds.Key, plss []rds.PropertyLoadSaver, ns string, potenti
alKey, allowSpecial bool) error { |
| 491 » vfn := func(k gae.DSKey) bool { | 490 » vfn := func(k rds.Key) bool { |
| 492 » » return !helper.DSKeyIncomplete(k) && helper.DSKeyValid(k, ns, al
lowSpecial) | 491 » » return !rds.KeyIncomplete(k) && rds.KeyValid(k, ns, allowSpecial
) |
| 493 } | 492 } |
| 494 if potentialKey { | 493 if potentialKey { |
| 495 » » vfn = func(k gae.DSKey) bool { | 494 » » vfn = func(k rds.Key) bool { |
| 496 // adds an id to k if it's incomplete. | 495 // adds an id to k if it's incomplete. |
| 497 » » » if helper.DSKeyIncomplete(k) { | 496 » » » if rds.KeyIncomplete(k) { |
| 498 » » » » k = helper.NewDSKey(k.AppID(), k.Namespace(), k.
Kind(), "", 1, k.Parent()) | 497 » » » » k = rds.NewKey(k.AppID(), k.Namespace(), k.Kind(
), "", 1, k.Parent()) |
| 499 } | 498 } |
| 500 » » » return helper.DSKeyValid(k, ns, allowSpecial) | 499 » » » return rds.KeyValid(k, ns, allowSpecial) |
| 501 } | 500 } |
| 502 } | 501 } |
| 503 | 502 |
| 504 if keys == nil || plss == nil { | 503 if keys == nil || plss == nil { |
| 505 return errors.New("gae: key or plss slices were nil") | 504 return errors.New("gae: key or plss slices were nil") |
| 506 } | 505 } |
| 507 if len(keys) != len(plss) { | 506 if len(keys) != len(plss) { |
| 508 return errors.New("gae: key and dst slices have different length
") | 507 return errors.New("gae: key and dst slices have different length
") |
| 509 } | 508 } |
| 510 lme := gae.LazyMultiError{Size: len(keys)} | 509 lme := gae.LazyMultiError{Size: len(keys)} |
| 511 for i, k := range keys { | 510 for i, k := range keys { |
| 512 if !vfn(k) { | 511 if !vfn(k) { |
| 513 » » » lme.Assign(i, gae.ErrDSInvalidKey) | 512 » » » lme.Assign(i, rds.ErrInvalidKey) |
| 514 } | 513 } |
| 515 } | 514 } |
| 516 return lme.Get() | 515 return lme.Get() |
| 517 } | 516 } |
| OLD | NEW |