| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | |
| 9 "errors" | 8 "errors" |
| 10 » "fmt" | 9 » "infra/gae/libs/wrapper" |
| 11 » "golang.org/x/net/context" | 10 » goon_internal "infra/gae/libs/wrapper/memory/internal/goon" |
| 12 "sync" | 11 "sync" |
| 13 "sync/atomic" | 12 "sync/atomic" |
| 14 | 13 |
| 15 » "infra/gae/libs/gae" | 14 » "github.com/mjibson/goon" |
| 16 » "infra/gae/libs/gae/helper" | 15 |
| 16 » "appengine/datastore" |
| 17 » pb "appengine_internal/datastore" |
| 18 » "golang.org/x/net/context" |
| 17 ) | 19 ) |
| 18 | 20 |
| 21 ////////////////////////////////// knrKeeper /////////////////////////////////// |
| 22 |
| 23 type knrKeeper struct { |
| 24 knrLock sync.Mutex |
| 25 knrFunc goon.KindNameResolver |
| 26 } |
| 27 |
| 28 var _ = wrapper.DSKindSetter((*knrKeeper)(nil)) |
| 29 |
| 30 func (k *knrKeeper) KindNameResolver() goon.KindNameResolver { |
| 31 k.knrLock.Lock() |
| 32 defer k.knrLock.Unlock() |
| 33 if k.knrFunc == nil { |
| 34 k.knrFunc = goon.DefaultKindName |
| 35 } |
| 36 return k.knrFunc |
| 37 } |
| 38 |
| 39 func (k *knrKeeper) SetKindNameResolver(knr goon.KindNameResolver) { |
| 40 k.knrLock.Lock() |
| 41 defer k.knrLock.Unlock() |
| 42 if knr == nil { |
| 43 knr = goon.DefaultKindName |
| 44 } |
| 45 k.knrFunc = knr |
| 46 } |
| 47 |
| 19 //////////////////////////////// dataStoreData ///////////////////////////////// | 48 //////////////////////////////// dataStoreData ///////////////////////////////// |
| 20 | 49 |
| 21 type dataStoreData struct { | 50 type dataStoreData struct { |
| 22 » gae.BrokenFeatures | 51 » wrapper.BrokenFeatures |
| 52 » knrKeeper |
| 23 | 53 |
| 24 rwlock sync.RWMutex | 54 rwlock sync.RWMutex |
| 25 // See README.md for store schema. | 55 // See README.md for store schema. |
| 26 store *memStore | 56 store *memStore |
| 27 snap *memStore | 57 snap *memStore |
| 28 } | 58 } |
| 29 | 59 |
| 30 var ( | 60 var ( |
| 31 _ = memContextObj((*dataStoreData)(nil)) | 61 _ = memContextObj((*dataStoreData)(nil)) |
| 32 _ = sync.Locker((*dataStoreData)(nil)) | 62 _ = sync.Locker((*dataStoreData)(nil)) |
| 33 » _ = gae.Testable((*dataStoreData)(nil)) | 63 » _ = wrapper.Testable((*dataStoreData)(nil)) |
| 64 » _ = wrapper.DSKindSetter((*dataStoreData)(nil)) |
| 34 ) | 65 ) |
| 35 | 66 |
| 36 func newDataStoreData() *dataStoreData { | 67 func newDataStoreData() *dataStoreData { |
| 37 store := newMemStore() | 68 store := newMemStore() |
| 38 return &dataStoreData{ | 69 return &dataStoreData{ |
| 39 » » BrokenFeatures: gae.BrokenFeatures{DefaultError: errors.New("INT
ERNAL_ERROR")}, | 70 » » BrokenFeatures: wrapper.BrokenFeatures{DefaultError: newDSError(
pb.Error_INTERNAL_ERROR)}, |
| 40 store: store, | 71 store: store, |
| 41 snap: store.Snapshot(), // empty but better than a nil
pointer. | 72 snap: store.Snapshot(), // empty but better than a nil
pointer. |
| 42 } | 73 } |
| 43 } | 74 } |
| 44 | 75 |
| 45 func (d *dataStoreData) Lock() { | 76 func (d *dataStoreData) Lock() { |
| 46 d.rwlock.Lock() | 77 d.rwlock.Lock() |
| 47 } | 78 } |
| 48 | 79 |
| 49 func (d *dataStoreData) Unlock() { | 80 func (d *dataStoreData) Unlock() { |
| 50 d.rwlock.Unlock() | 81 d.rwlock.Unlock() |
| 51 } | 82 } |
| 52 | 83 |
| 53 /////////////////////////// indicies(dataStoreData) //////////////////////////// | 84 /////////////////////////// indicies(dataStoreData) //////////////////////////// |
| 54 | 85 |
| 55 func groupMetaKey(key gae.DSKey) []byte { | 86 func groupMetaKey(key *datastore.Key) []byte { |
| 56 » return keyBytes(helper.WithoutContext, | 87 » return keyBytes(noNS, newKey("", "__entity_group__", "", 1, rootKey(key)
)) |
| 57 » » helper.NewDSKey("", "", "__entity_group__", "", 1, helper.DSKeyR
oot(key))) | |
| 58 } | 88 } |
| 59 | 89 |
| 60 func groupIDsKey(key gae.DSKey) []byte { | 90 func groupIDsKey(key *datastore.Key) []byte { |
| 61 » return keyBytes(helper.WithoutContext, | 91 » return keyBytes(noNS, newKey("", "__entity_group_ids__", "", 1, rootKey(
key))) |
| 62 » » helper.NewDSKey("", "", "__entity_group_ids__", "", 1, helper.DS
KeyRoot(key))) | |
| 63 } | 92 } |
| 64 | 93 |
| 65 func rootIDsKey(kind string) []byte { | 94 func rootIDsKey(kind string) []byte { |
| 66 » return keyBytes(helper.WithoutContext, | 95 » return keyBytes(noNS, newKey("", "__entity_root_ids__", kind, 0, nil)) |
| 67 » » helper.NewDSKey("", "", "__entity_root_ids__", kind, 0, nil)) | |
| 68 } | 96 } |
| 69 | 97 |
| 70 func curVersion(ents *memCollection, key []byte) (int64, error) { | 98 func curVersion(ents *memCollection, key []byte) (int64, error) { |
| 71 if v := ents.Get(key); v != nil { | 99 if v := ents.Get(key); v != nil { |
| 72 » » pm, err := rpm(v) | 100 » » numData := &propertyList{} |
| 73 » » if err != nil { | 101 » » if err := numData.UnmarshalBinary(v); err != nil { |
| 74 return 0, err | 102 return 0, err |
| 75 } | 103 } |
| 76 » » pl, ok := pm["__version__"] | 104 » » return (*numData)[0].Value.(int64), nil |
| 77 » » if ok && len(pl) > 0 && pl[0].Type() == gae.DSPTInt { | |
| 78 » » » return pl[0].Value().(int64), nil | |
| 79 » » } | |
| 80 » » return 0, fmt.Errorf("__version__ property missing or wrong: %v"
, pm) | |
| 81 } | 105 } |
| 82 return 0, nil | 106 return 0, nil |
| 83 } | 107 } |
| 84 | 108 |
| 85 func incrementLocked(ents *memCollection, key []byte) (ret int64, err error) { | 109 func incrementLocked(ents *memCollection, key []byte) (int64, error) { |
| 86 » if ret, err = curVersion(ents, key); err != nil { | 110 » num := int64(0) |
| 87 » » ret = 0 | 111 » numData := &propertyList{} |
| 112 » if v := ents.Get(key); v != nil { |
| 113 » » if err := numData.UnmarshalBinary(v); err != nil { |
| 114 » » » return 0, err |
| 115 » » } |
| 116 » » num = (*numData)[0].Value.(int64) |
| 117 » } else { |
| 118 » » *numData = append(*numData, datastore.Property{Name: "__version_
_"}) |
| 88 } | 119 } |
| 89 » ret++ | 120 » num++ |
| 90 » p := gae.DSProperty{} | 121 » (*numData)[0].Value = num |
| 91 » if err = p.SetValue(ret, true); err != nil { | 122 » incData, err := numData.MarshalBinary() |
| 92 » » return | 123 » if err != nil { |
| 124 » » return 0, err |
| 93 } | 125 } |
| 94 » buf := &bytes.Buffer{} | 126 » ents.Set(key, incData) |
| 95 » helper.WriteDSPropertyMap( | 127 |
| 96 » » buf, gae.DSPropertyMap{"__version__": {p}}, helper.WithContext) | 128 » return num, nil |
| 97 » ents.Set(key, buf.Bytes()) | |
| 98 » return | |
| 99 } | 129 } |
| 100 | 130 |
| 101 func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey,
error) { | 131 func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*memCollection, *data
store.Key, error) { |
| 102 coll := "ents:" + key.Namespace() | 132 coll := "ents:" + key.Namespace() |
| 103 ents := d.store.GetCollection(coll) | 133 ents := d.store.GetCollection(coll) |
| 104 if ents == nil { | 134 if ents == nil { |
| 105 ents = d.store.SetCollection(coll, nil) | 135 ents = d.store.SetCollection(coll, nil) |
| 106 } | 136 } |
| 107 | 137 |
| 108 » if helper.DSKeyIncomplete(key) { | 138 » if key.Incomplete() { |
| 109 idKey := []byte(nil) | 139 idKey := []byte(nil) |
| 110 if key.Parent() == nil { | 140 if key.Parent() == nil { |
| 111 idKey = rootIDsKey(key.Kind()) | 141 idKey = rootIDsKey(key.Kind()) |
| 112 } else { | 142 } else { |
| 113 idKey = groupIDsKey(key) | 143 idKey = groupIDsKey(key) |
| 114 } | 144 } |
| 115 id, err := incrementLocked(ents, idKey) | 145 id, err := incrementLocked(ents, idKey) |
| 116 if err != nil { | 146 if err != nil { |
| 117 return nil, nil, err | 147 return nil, nil, err |
| 118 } | 148 } |
| 119 » » key = helper.NewDSKey(key.AppID(), key.Namespace(), key.Kind(),
"", id, key.Parent()) | 149 » » key = newKey(key.Namespace(), key.Kind(), "", id, key.Parent()) |
| 120 } | 150 } |
| 121 | 151 |
| 122 return ents, key, nil | 152 return ents, key, nil |
| 123 } | 153 } |
| 124 | 154 |
| 125 func putPrelim(ns string, key gae.DSKey, src interface{}) (gae.DSPropertyMap, er
ror) { | 155 func putPrelim(ns string, knr goon.KindNameResolver, src interface{}) (*datastor
e.Key, *propertyList, error) { |
| 126 » if !keyCouldBeValid(key, ns, false) { | 156 » key := newKeyObj(ns, knr, src) |
| 157 » if !keyCouldBeValid(ns, key, userKeyOnly) { |
| 127 // TODO(riannucci): different error for Put-ing to reserved Keys
? | 158 // TODO(riannucci): different error for Put-ing to reserved Keys
? |
| 128 » » return nil, gae.ErrDSInvalidKey | 159 » » return nil, nil, datastore.ErrInvalidKey |
| 129 } | 160 } |
| 130 | 161 |
| 131 » pls, err := helper.GetPLS(src) | 162 » data, err := toPL(src) |
| 163 » return key, data, err |
| 164 } |
| 165 |
| 166 func (d *dataStoreData) put(ns string, src interface{}) (*datastore.Key, error)
{ |
| 167 » key, plData, err := putPrelim(ns, d.KindNameResolver(), src) |
| 132 if err != nil { | 168 if err != nil { |
| 133 return nil, err | 169 return nil, err |
| 134 } | 170 } |
| 135 » return pls.Save() | 171 » if key, err = d.putInner(key, plData); err != nil { |
| 172 » » return nil, err |
| 173 » } |
| 174 » return key, goon_internal.SetStructKey(src, key, d.KindNameResolver()) |
| 136 } | 175 } |
| 137 | 176 |
| 138 func (d *dataStoreData) put(ns string, key gae.DSKey, src interface{}) (gae.DSKe
y, error) { | 177 func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datas
tore.Key, error) { |
| 139 » pmData, err := putPrelim(ns, key, src) | 178 » dataBytes, err := data.MarshalBinary() |
| 140 if err != nil { | 179 if err != nil { |
| 141 return nil, err | 180 return nil, err |
| 142 } | 181 } |
| 143 if key, err = d.putInner(key, pmData); err != nil { | |
| 144 return nil, err | |
| 145 } | |
| 146 return key, nil | |
| 147 } | |
| 148 | |
| 149 func (d *dataStoreData) putInner(key gae.DSKey, data gae.DSPropertyMap) (gae.DSK
ey, error) { | |
| 150 buf := &bytes.Buffer{} | |
| 151 helper.WriteDSPropertyMap(buf, data, helper.WithoutContext) | |
| 152 dataBytes := buf.Bytes() | |
| 153 | 182 |
| 154 d.rwlock.Lock() | 183 d.rwlock.Lock() |
| 155 defer d.rwlock.Unlock() | 184 defer d.rwlock.Unlock() |
| 156 | 185 |
| 157 ents, key, err := d.entsKeyLocked(key) | 186 ents, key, err := d.entsKeyLocked(key) |
| 158 if err != nil { | 187 if err != nil { |
| 159 return nil, err | 188 return nil, err |
| 160 } | 189 } |
| 161 if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil { | 190 if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil { |
| 162 return nil, err | 191 return nil, err |
| 163 } | 192 } |
| 164 | 193 |
| 165 » old := ents.Get(keyBytes(helper.WithoutContext, key)) | 194 » old := ents.Get(keyBytes(noNS, key)) |
| 166 » oldPM := gae.DSPropertyMap(nil) | 195 » oldPl := (*propertyList)(nil) |
| 167 if old != nil { | 196 if old != nil { |
| 168 » » if oldPM, err = rpmWoCtx(old, key.Namespace()); err != nil { | 197 » » oldPl = &propertyList{} |
| 198 » » if err = oldPl.UnmarshalBinary(old); err != nil { |
| 169 return nil, err | 199 return nil, err |
| 170 } | 200 } |
| 171 } | 201 } |
| 172 » if err = updateIndicies(d.store, key, oldPM, data); err != nil { | 202 » if err = updateIndicies(d.store, key, oldPl, data); err != nil { |
| 173 return nil, err | 203 return nil, err |
| 174 } | 204 } |
| 175 | 205 |
| 176 » ents.Set(keyBytes(helper.WithoutContext, key), dataBytes) | 206 » ents.Set(keyBytes(noNS, key), dataBytes) |
| 177 | 207 |
| 178 return key, nil | 208 return key, nil |
| 179 } | 209 } |
| 180 | 210 |
| 181 func getInner(ns string, key gae.DSKey, dst interface{}, getColl func() (*memCol
lection, error)) error { | 211 func getInner(ns string, knr goon.KindNameResolver, dst interface{}, getColl fun
c(*datastore.Key) (*memCollection, error)) error { |
| 182 » if helper.DSKeyIncomplete(key) || !helper.DSKeyValid(key, ns, true) { | 212 » key := newKeyObj(ns, knr, dst) |
| 183 » » return gae.ErrDSInvalidKey | 213 » if !keyValid(ns, key, allowSpecialKeys) { |
| 214 » » return datastore.ErrInvalidKey |
| 184 } | 215 } |
| 185 | 216 |
| 186 » ents, err := getColl() | 217 » ents, err := getColl(key) |
| 187 if err != nil { | 218 if err != nil { |
| 188 return err | 219 return err |
| 189 } | 220 } |
| 190 if ents == nil { | 221 if ents == nil { |
| 191 » » return gae.ErrDSNoSuchEntity | 222 » » return datastore.ErrNoSuchEntity |
| 192 } | 223 } |
| 193 » pdata := ents.Get(keyBytes(helper.WithoutContext, key)) | 224 » pdata := ents.Get(keyBytes(noNS, key)) |
| 194 if pdata == nil { | 225 if pdata == nil { |
| 195 » » return gae.ErrDSNoSuchEntity | 226 » » return datastore.ErrNoSuchEntity |
| 196 } | 227 } |
| 197 | 228 » pl := &propertyList{} |
| 198 » pm, err := rpmWoCtx(pdata, ns) | 229 » if err = pl.UnmarshalBinary(pdata); err != nil { |
| 199 » if err != nil { | |
| 200 return err | 230 return err |
| 201 } | 231 } |
| 202 | 232 » return fromPL(pl, dst) |
| 203 » pls, err := helper.GetPLS(dst) | |
| 204 » if err != nil { | |
| 205 » » return err | |
| 206 » } | |
| 207 | |
| 208 » // TODO(riannucci): should the Get API reveal conversion errors instead
of | |
| 209 » // swallowing them? | |
| 210 » _, err = pls.Load(pm) | |
| 211 » return err | |
| 212 } | 233 } |
| 213 | 234 |
| 214 func (d *dataStoreData) get(ns string, key gae.DSKey, dst interface{}) error { | 235 func (d *dataStoreData) get(ns string, dst interface{}) error { |
| 215 » return getInner(ns, key, dst, func() (*memCollection, error) { | 236 » return getInner(ns, d.KindNameResolver(), dst, func(*datastore.Key) (*me
mCollection, error) { |
| 216 d.rwlock.RLock() | 237 d.rwlock.RLock() |
| 217 s := d.store.Snapshot() | 238 s := d.store.Snapshot() |
| 218 d.rwlock.RUnlock() | 239 d.rwlock.RUnlock() |
| 219 | 240 |
| 220 return s.GetCollection("ents:" + ns), nil | 241 return s.GetCollection("ents:" + ns), nil |
| 221 }) | 242 }) |
| 222 } | 243 } |
| 223 | 244 |
| 224 func (d *dataStoreData) del(ns string, key gae.DSKey) (err error) { | 245 func (d *dataStoreData) del(ns string, key *datastore.Key) error { |
| 225 » if !helper.DSKeyValid(key, ns, false) { | 246 » if !keyValid(ns, key, userKeyOnly) { |
| 226 » » return gae.ErrDSInvalidKey | 247 » » return datastore.ErrInvalidKey |
| 227 } | 248 } |
| 228 | 249 |
| 229 » keyBuf := keyBytes(helper.WithoutContext, key) | 250 » keyBuf := keyBytes(noNS, key) |
| 230 | 251 |
| 231 d.rwlock.Lock() | 252 d.rwlock.Lock() |
| 232 defer d.rwlock.Unlock() | 253 defer d.rwlock.Unlock() |
| 233 | 254 |
| 234 ents := d.store.GetCollection("ents:" + ns) | 255 ents := d.store.GetCollection("ents:" + ns) |
| 235 if ents == nil { | 256 if ents == nil { |
| 236 return nil | 257 return nil |
| 237 } | 258 } |
| 238 » if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil { | 259 » if _, err := incrementLocked(ents, groupMetaKey(key)); err != nil { |
| 239 » » return | 260 » » return err |
| 240 } | 261 } |
| 241 | 262 |
| 242 old := ents.Get(keyBuf) | 263 old := ents.Get(keyBuf) |
| 243 » oldPM := gae.DSPropertyMap(nil) | 264 » oldPl := (*propertyList)(nil) |
| 244 if old != nil { | 265 if old != nil { |
| 245 » » if oldPM, err = rpmWoCtx(old, ns); err != nil { | 266 » » oldPl = &propertyList{} |
| 246 » » » return | 267 » » if err := oldPl.UnmarshalBinary(old); err != nil { |
| 268 » » » return err |
| 247 } | 269 } |
| 248 } | 270 } |
| 249 » if err := updateIndicies(d.store, key, oldPM, nil); err != nil { | 271 » if err := updateIndicies(d.store, key, oldPl, nil); err != nil { |
| 250 return err | 272 return err |
| 251 } | 273 } |
| 252 | 274 |
| 253 ents.Delete(keyBuf) | 275 ents.Delete(keyBuf) |
| 254 return nil | 276 return nil |
| 255 } | 277 } |
| 256 | 278 |
| 257 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 279 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
| 258 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 280 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
| 259 | 281 |
| 260 txn := obj.(*txnDataStoreData) | 282 txn := obj.(*txnDataStoreData) |
| 261 for rk, muts := range txn.muts { | 283 for rk, muts := range txn.muts { |
| 262 if len(muts) == 0 { // read-only | 284 if len(muts) == 0 { // read-only |
| 263 continue | 285 continue |
| 264 } | 286 } |
| 265 » » k, err := helper.ReadDSKey(bytes.NewBufferString(rk), helper.Wit
hContext, "", "") | 287 » » k, err := keyFromByteString(withNS, rk, "") |
| 266 if err != nil { | 288 if err != nil { |
| 267 panic(err) | 289 panic(err) |
| 268 } | 290 } |
| 269 | |
| 270 entKey := "ents:" + k.Namespace() | 291 entKey := "ents:" + k.Namespace() |
| 271 mkey := groupMetaKey(k) | 292 mkey := groupMetaKey(k) |
| 272 entsHead := d.store.GetCollection(entKey) | 293 entsHead := d.store.GetCollection(entKey) |
| 273 entsSnap := txn.snap.GetCollection(entKey) | 294 entsSnap := txn.snap.GetCollection(entKey) |
| 274 vHead, err := curVersion(entsHead, mkey) | 295 vHead, err := curVersion(entsHead, mkey) |
| 275 if err != nil { | 296 if err != nil { |
| 276 panic(err) | 297 panic(err) |
| 277 } | 298 } |
| 278 vSnap, err := curVersion(entsSnap, mkey) | 299 vSnap, err := curVersion(entsSnap, mkey) |
| 279 if err != nil { | 300 if err != nil { |
| (...skipping 19 matching lines...) Expand all Loading... |
| 299 } else { | 320 } else { |
| 300 _, err = d.putInner(m.key, m.data) | 321 _, err = d.putInner(m.key, m.data) |
| 301 } | 322 } |
| 302 if err != nil { | 323 if err != nil { |
| 303 panic(err) | 324 panic(err) |
| 304 } | 325 } |
| 305 } | 326 } |
| 306 } | 327 } |
| 307 } | 328 } |
| 308 | 329 |
| 309 func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) (memContextObj, error
) { | 330 func (d *dataStoreData) mkTxn(o *datastore.TransactionOptions) (memContextObj, e
rror) { |
| 310 return &txnDataStoreData{ | 331 return &txnDataStoreData{ |
| 311 // alias to the main datastore's so that testing code can have p
rimitive | 332 // alias to the main datastore's so that testing code can have p
rimitive |
| 312 // access to break features inside of transactions. | 333 // access to break features inside of transactions. |
| 313 BrokenFeatures: &d.BrokenFeatures, | 334 BrokenFeatures: &d.BrokenFeatures, |
| 314 parent: d, | 335 parent: d, |
| 336 knrKeeper: knrKeeper{knrFunc: d.knrFunc}, |
| 315 isXG: o != nil && o.XG, | 337 isXG: o != nil && o.XG, |
| 316 snap: d.store.Snapshot(), | 338 snap: d.store.Snapshot(), |
| 317 muts: map[string][]txnMutation{}, | 339 muts: map[string][]txnMutation{}, |
| 318 }, nil | 340 }, nil |
| 319 } | 341 } |
| 320 | 342 |
| 321 func (d *dataStoreData) endTxn() {} | 343 func (d *dataStoreData) endTxn() {} |
| 322 | 344 |
| 323 /////////////////////////////// txnDataStoreData /////////////////////////////// | 345 /////////////////////////////// txnDataStoreData /////////////////////////////// |
| 324 | 346 |
| 325 type txnMutation struct { | 347 type txnMutation struct { |
| 326 » key gae.DSKey | 348 » key *datastore.Key |
| 327 » data gae.DSPropertyMap | 349 » data *propertyList |
| 328 } | 350 } |
| 329 | 351 |
| 330 type txnDataStoreData struct { | 352 type txnDataStoreData struct { |
| 331 » *gae.BrokenFeatures | 353 » *wrapper.BrokenFeatures |
| 354 » knrKeeper |
| 332 sync.Mutex | 355 sync.Mutex |
| 333 | 356 |
| 334 parent *dataStoreData | 357 parent *dataStoreData |
| 335 | 358 |
| 336 // boolean 0 or 1, use atomic.*Int32 to access. | 359 // boolean 0 or 1, use atomic.*Int32 to access. |
| 337 closed int32 | 360 closed int32 |
| 338 isXG bool | 361 isXG bool |
| 339 | 362 |
| 340 snap *memStore | 363 snap *memStore |
| 341 | 364 |
| 342 // string is the raw-bytes encoding of the entity root incl. namespace | 365 // string is the raw-bytes encoding of the entity root incl. namespace |
| 343 muts map[string][]txnMutation | 366 muts map[string][]txnMutation |
| 344 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ
ing | 367 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ
ing |
| 345 // length of encoded keys + values. | 368 // length of encoded keys + values. |
| 346 } | 369 } |
| 347 | 370 |
| 348 var ( | 371 var ( |
| 349 _ = memContextObj((*txnDataStoreData)(nil)) | 372 _ = memContextObj((*txnDataStoreData)(nil)) |
| 350 _ = sync.Locker((*txnDataStoreData)(nil)) | 373 _ = sync.Locker((*txnDataStoreData)(nil)) |
| 351 » _ = gae.Testable((*txnDataStoreData)(nil)) | 374 » _ = wrapper.Testable((*txnDataStoreData)(nil)) |
| 375 » _ = wrapper.DSKindSetter((*txnDataStoreData)(nil)) |
| 352 ) | 376 ) |
| 353 | 377 |
| 354 const xgEGLimit = 25 | 378 const xgEGLimit = 25 |
| 355 | 379 |
| 356 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | 380 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } |
| 357 func (td *txnDataStoreData) endTxn() { | 381 func (td *txnDataStoreData) endTxn() { |
| 358 if atomic.LoadInt32(&td.closed) == 1 { | 382 if atomic.LoadInt32(&td.closed) == 1 { |
| 359 panic("cannot end transaction twice") | 383 panic("cannot end transaction twice") |
| 360 } | 384 } |
| 361 atomic.StoreInt32(&td.closed, 1) | 385 atomic.StoreInt32(&td.closed, 1) |
| 362 } | 386 } |
| 363 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | 387 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { |
| 364 panic("txnDataStoreData cannot apply transactions") | 388 panic("txnDataStoreData cannot apply transactions") |
| 365 } | 389 } |
| 366 func (*txnDataStoreData) mkTxn(*gae.DSTransactionOptions) (memContextObj, error)
{ | 390 func (*txnDataStoreData) mkTxn(*datastore.TransactionOptions) (memContextObj, er
ror) { |
| 367 return nil, errors.New("datastore: nested transactions are not supported
") | 391 return nil, errors.New("datastore: nested transactions are not supported
") |
| 368 } | 392 } |
| 369 | 393 |
| 370 func (td *txnDataStoreData) RunIfNotBroken(f func() error) error { | 394 func (td *txnDataStoreData) IsBroken() error { |
| 371 // Slightly different from the SDK... datastore and taskqueue each imple
ment | 395 // Slightly different from the SDK... datastore and taskqueue each imple
ment |
| 372 // this here, where in the SDK only datastore.transaction.Call does. | 396 // this here, where in the SDK only datastore.transaction.Call does. |
| 373 if atomic.LoadInt32(&td.closed) == 1 { | 397 if atomic.LoadInt32(&td.closed) == 1 { |
| 374 return errors.New("datastore: transaction context has expired") | 398 return errors.New("datastore: transaction context has expired") |
| 375 } | 399 } |
| 376 » return td.BrokenFeatures.RunIfNotBroken(f) | 400 » return td.BrokenFeatures.IsBroken() |
| 377 } | 401 } |
| 378 | 402 |
| 379 // writeMutation ensures that this transaction can support the given key/value | 403 // writeMutation ensures that this transaction can support the given key/value |
| 380 // mutation. | 404 // mutation. |
| 381 // | 405 // |
| 382 // if getOnly is true, don't record the actual mutation data, just ensure that | 406 // if getOnly is true, don't record the actual mutation data, just ensure that |
| 383 // the key is in an included entity group (or add an empty entry for tha
t | 407 // the key is in an included entity group (or add an empty entry for tha
t |
| 384 // group). | 408 // group). |
| 385 // | 409 // |
| 386 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | 410 // if !getOnly && data == nil, this counts as a deletion instead of a Put. |
| 387 // | 411 // |
| 388 // Returns an error if this key causes the transaction to cross too many entity | 412 // Returns an error if this key causes the transaction to cross too many entity |
| 389 // groups. | 413 // groups. |
| 390 func (td *txnDataStoreData) writeMutation(getOnly bool, key gae.DSKey, data gae.
DSPropertyMap) error { | 414 func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data
*propertyList) error { |
| 391 » rk := string(keyBytes(helper.WithContext, helper.DSKeyRoot(key))) | 415 » rk := string(keyBytes(withNS, rootKey(key))) |
| 392 | 416 |
| 393 td.Lock() | 417 td.Lock() |
| 394 defer td.Unlock() | 418 defer td.Unlock() |
| 395 | 419 |
| 396 if _, ok := td.muts[rk]; !ok { | 420 if _, ok := td.muts[rk]; !ok { |
| 397 limit := 1 | 421 limit := 1 |
| 398 if td.isXG { | 422 if td.isXG { |
| 399 limit = xgEGLimit | 423 limit = xgEGLimit |
| 400 } | 424 } |
| 401 if len(td.muts)+1 > limit { | 425 if len(td.muts)+1 > limit { |
| 402 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" | 426 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" |
| 403 if td.isXG { | 427 if td.isXG { |
| 404 msg = "operating on too many entity groups in a
single transaction" | 428 msg = "operating on too many entity groups in a
single transaction" |
| 405 } | 429 } |
| 406 » » » return errors.New(msg) | 430 » » » return newDSError(pb.Error_BAD_REQUEST, msg) |
| 407 } | 431 } |
| 408 td.muts[rk] = []txnMutation{} | 432 td.muts[rk] = []txnMutation{} |
| 409 } | 433 } |
| 410 if !getOnly { | 434 if !getOnly { |
| 411 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | 435 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) |
| 412 } | 436 } |
| 413 | 437 |
| 414 return nil | 438 return nil |
| 415 } | 439 } |
| 416 | 440 |
| 417 func (td *txnDataStoreData) put(ns string, key gae.DSKey, src interface{}) (gae.
DSKey, error) { | 441 func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, err
or) { |
| 418 » pMap, err := putPrelim(ns, key, src) | 442 » key, plData, err := putPrelim(ns, td.KindNameResolver(), src) |
| 419 if err != nil { | 443 if err != nil { |
| 420 return nil, err | 444 return nil, err |
| 421 } | 445 } |
| 422 | 446 |
| 423 func() { | 447 func() { |
| 424 td.parent.Lock() | 448 td.parent.Lock() |
| 425 defer td.parent.Unlock() | 449 defer td.parent.Unlock() |
| 426 _, key, err = td.parent.entsKeyLocked(key) | 450 _, key, err = td.parent.entsKeyLocked(key) |
| 427 }() | 451 }() |
| 428 if err != nil { | 452 if err != nil { |
| 429 return nil, err | 453 return nil, err |
| 430 } | 454 } |
| 431 | 455 |
| 432 » if err = td.writeMutation(false, key, pMap); err != nil { | 456 » if err = td.writeMutation(false, key, plData); err != nil { |
| 433 return nil, err | 457 return nil, err |
| 434 } | 458 } |
| 435 | 459 |
| 436 » return key, nil | 460 » return key, goon_internal.SetStructKey(src, key, td.KindNameResolver()) |
| 437 } | 461 } |
| 438 | 462 |
| 439 func (td *txnDataStoreData) get(ns string, key gae.DSKey, dst interface{}) error
{ | 463 func (td *txnDataStoreData) get(ns string, dst interface{}) error { |
| 440 » return getInner(ns, key, dst, func() (*memCollection, error) { | 464 » return getInner(ns, td.KindNameResolver(), dst, func(key *datastore.Key)
(*memCollection, error) { |
| 441 if err := td.writeMutation(true, key, nil); err != nil { | 465 if err := td.writeMutation(true, key, nil); err != nil { |
| 442 return nil, err | 466 return nil, err |
| 443 } | 467 } |
| 444 return td.snap.GetCollection("ents:" + ns), nil | 468 return td.snap.GetCollection("ents:" + ns), nil |
| 445 }) | 469 }) |
| 446 } | 470 } |
| 447 | 471 |
| 448 func (td *txnDataStoreData) del(ns string, key gae.DSKey) error { | 472 func (td *txnDataStoreData) del(ns string, key *datastore.Key) error { |
| 449 » if !helper.DSKeyValid(key, ns, false) { | 473 » if !keyValid(ns, key, userKeyOnly) { |
| 450 » » return gae.ErrDSInvalidKey | 474 » » return datastore.ErrInvalidKey |
| 451 } | 475 } |
| 452 return td.writeMutation(false, key, nil) | 476 return td.writeMutation(false, key, nil) |
| 453 } | 477 } |
| 454 | |
| 455 func keyCouldBeValid(k gae.DSKey, ns string, allowSpecial bool) bool { | |
| 456 // adds an id to k if it's incomplete. | |
| 457 if helper.DSKeyIncomplete(k) { | |
| 458 k = helper.NewDSKey(k.AppID(), k.Namespace(), k.Kind(), "", 1, k
.Parent()) | |
| 459 } | |
| 460 return helper.DSKeyValid(k, ns, allowSpecial) | |
| 461 } | |
| 462 | |
| 463 func keyBytes(ctx helper.DSKeyContext, key gae.DSKey) []byte { | |
| 464 buf := &bytes.Buffer{} | |
| 465 helper.WriteDSKey(buf, ctx, key) | |
| 466 return buf.Bytes() | |
| 467 } | |
| 468 | |
| 469 func rpmWoCtx(data []byte, ns string) (gae.DSPropertyMap, error) { | |
| 470 return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithoutCon
text, globalAppID, ns) | |
| 471 } | |
| 472 | |
| 473 func rpm(data []byte) (gae.DSPropertyMap, error) { | |
| 474 return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithContex
t, "", "") | |
| 475 } | |
| OLD | NEW |