Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(869)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/datastore_data.go

Issue 1230303003: Revert "Refactor current GAE abstraction library to be free of the SDK*" (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes"
9 "errors" 8 "errors"
10 » "fmt" 9 » "infra/gae/libs/wrapper"
11 » "golang.org/x/net/context" 10 » goon_internal "infra/gae/libs/wrapper/memory/internal/goon"
12 "sync" 11 "sync"
13 "sync/atomic" 12 "sync/atomic"
14 13
15 » "infra/gae/libs/gae" 14 » "github.com/mjibson/goon"
16 » "infra/gae/libs/gae/helper" 15
16 » "appengine/datastore"
17 » pb "appengine_internal/datastore"
18 » "golang.org/x/net/context"
17 ) 19 )
18 20
21 ////////////////////////////////// knrKeeper ///////////////////////////////////
22
23 type knrKeeper struct {
24 knrLock sync.Mutex
25 knrFunc goon.KindNameResolver
26 }
27
28 var _ = wrapper.DSKindSetter((*knrKeeper)(nil))
29
30 func (k *knrKeeper) KindNameResolver() goon.KindNameResolver {
31 k.knrLock.Lock()
32 defer k.knrLock.Unlock()
33 if k.knrFunc == nil {
34 k.knrFunc = goon.DefaultKindName
35 }
36 return k.knrFunc
37 }
38
39 func (k *knrKeeper) SetKindNameResolver(knr goon.KindNameResolver) {
40 k.knrLock.Lock()
41 defer k.knrLock.Unlock()
42 if knr == nil {
43 knr = goon.DefaultKindName
44 }
45 k.knrFunc = knr
46 }
47
19 //////////////////////////////// dataStoreData ///////////////////////////////// 48 //////////////////////////////// dataStoreData /////////////////////////////////
20 49
21 type dataStoreData struct { 50 type dataStoreData struct {
22 » gae.BrokenFeatures 51 » wrapper.BrokenFeatures
52 » knrKeeper
23 53
24 rwlock sync.RWMutex 54 rwlock sync.RWMutex
25 // See README.md for store schema. 55 // See README.md for store schema.
26 store *memStore 56 store *memStore
27 snap *memStore 57 snap *memStore
28 } 58 }
29 59
30 var ( 60 var (
31 _ = memContextObj((*dataStoreData)(nil)) 61 _ = memContextObj((*dataStoreData)(nil))
32 _ = sync.Locker((*dataStoreData)(nil)) 62 _ = sync.Locker((*dataStoreData)(nil))
33 » _ = gae.Testable((*dataStoreData)(nil)) 63 » _ = wrapper.Testable((*dataStoreData)(nil))
64 » _ = wrapper.DSKindSetter((*dataStoreData)(nil))
34 ) 65 )
35 66
36 func newDataStoreData() *dataStoreData { 67 func newDataStoreData() *dataStoreData {
37 store := newMemStore() 68 store := newMemStore()
38 return &dataStoreData{ 69 return &dataStoreData{
39 » » BrokenFeatures: gae.BrokenFeatures{DefaultError: errors.New("INT ERNAL_ERROR")}, 70 » » BrokenFeatures: wrapper.BrokenFeatures{DefaultError: newDSError( pb.Error_INTERNAL_ERROR)},
40 store: store, 71 store: store,
41 snap: store.Snapshot(), // empty but better than a nil pointer. 72 snap: store.Snapshot(), // empty but better than a nil pointer.
42 } 73 }
43 } 74 }
44 75
45 func (d *dataStoreData) Lock() { 76 func (d *dataStoreData) Lock() {
46 d.rwlock.Lock() 77 d.rwlock.Lock()
47 } 78 }
48 79
49 func (d *dataStoreData) Unlock() { 80 func (d *dataStoreData) Unlock() {
50 d.rwlock.Unlock() 81 d.rwlock.Unlock()
51 } 82 }
52 83
53 /////////////////////////// indicies(dataStoreData) //////////////////////////// 84 /////////////////////////// indicies(dataStoreData) ////////////////////////////
54 85
55 func groupMetaKey(key gae.DSKey) []byte { 86 func groupMetaKey(key *datastore.Key) []byte {
56 » return keyBytes(helper.WithoutContext, 87 » return keyBytes(noNS, newKey("", "__entity_group__", "", 1, rootKey(key) ))
57 » » helper.NewDSKey("", "", "__entity_group__", "", 1, helper.DSKeyR oot(key)))
58 } 88 }
59 89
60 func groupIDsKey(key gae.DSKey) []byte { 90 func groupIDsKey(key *datastore.Key) []byte {
61 » return keyBytes(helper.WithoutContext, 91 » return keyBytes(noNS, newKey("", "__entity_group_ids__", "", 1, rootKey( key)))
62 » » helper.NewDSKey("", "", "__entity_group_ids__", "", 1, helper.DS KeyRoot(key)))
63 } 92 }
64 93
65 func rootIDsKey(kind string) []byte { 94 func rootIDsKey(kind string) []byte {
66 » return keyBytes(helper.WithoutContext, 95 » return keyBytes(noNS, newKey("", "__entity_root_ids__", kind, 0, nil))
67 » » helper.NewDSKey("", "", "__entity_root_ids__", kind, 0, nil))
68 } 96 }
69 97
70 func curVersion(ents *memCollection, key []byte) (int64, error) { 98 func curVersion(ents *memCollection, key []byte) (int64, error) {
71 if v := ents.Get(key); v != nil { 99 if v := ents.Get(key); v != nil {
72 » » pm, err := rpm(v) 100 » » numData := &propertyList{}
73 » » if err != nil { 101 » » if err := numData.UnmarshalBinary(v); err != nil {
74 return 0, err 102 return 0, err
75 } 103 }
76 » » pl, ok := pm["__version__"] 104 » » return (*numData)[0].Value.(int64), nil
77 » » if ok && len(pl) > 0 && pl[0].Type() == gae.DSPTInt {
78 » » » return pl[0].Value().(int64), nil
79 » » }
80 » » return 0, fmt.Errorf("__version__ property missing or wrong: %v" , pm)
81 } 105 }
82 return 0, nil 106 return 0, nil
83 } 107 }
84 108
85 func incrementLocked(ents *memCollection, key []byte) (ret int64, err error) { 109 func incrementLocked(ents *memCollection, key []byte) (int64, error) {
86 » if ret, err = curVersion(ents, key); err != nil { 110 » num := int64(0)
87 » » ret = 0 111 » numData := &propertyList{}
112 » if v := ents.Get(key); v != nil {
113 » » if err := numData.UnmarshalBinary(v); err != nil {
114 » » » return 0, err
115 » » }
116 » » num = (*numData)[0].Value.(int64)
117 » } else {
118 » » *numData = append(*numData, datastore.Property{Name: "__version_ _"})
88 } 119 }
89 » ret++ 120 » num++
90 » p := gae.DSProperty{} 121 » (*numData)[0].Value = num
91 » if err = p.SetValue(ret, true); err != nil { 122 » incData, err := numData.MarshalBinary()
92 » » return 123 » if err != nil {
124 » » return 0, err
93 } 125 }
94 » buf := &bytes.Buffer{} 126 » ents.Set(key, incData)
95 » helper.WriteDSPropertyMap( 127
96 » » buf, gae.DSPropertyMap{"__version__": {p}}, helper.WithContext) 128 » return num, nil
97 » ents.Set(key, buf.Bytes())
98 » return
99 } 129 }
100 130
101 func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey, error) { 131 func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*memCollection, *data store.Key, error) {
102 coll := "ents:" + key.Namespace() 132 coll := "ents:" + key.Namespace()
103 ents := d.store.GetCollection(coll) 133 ents := d.store.GetCollection(coll)
104 if ents == nil { 134 if ents == nil {
105 ents = d.store.SetCollection(coll, nil) 135 ents = d.store.SetCollection(coll, nil)
106 } 136 }
107 137
108 » if helper.DSKeyIncomplete(key) { 138 » if key.Incomplete() {
109 idKey := []byte(nil) 139 idKey := []byte(nil)
110 if key.Parent() == nil { 140 if key.Parent() == nil {
111 idKey = rootIDsKey(key.Kind()) 141 idKey = rootIDsKey(key.Kind())
112 } else { 142 } else {
113 idKey = groupIDsKey(key) 143 idKey = groupIDsKey(key)
114 } 144 }
115 id, err := incrementLocked(ents, idKey) 145 id, err := incrementLocked(ents, idKey)
116 if err != nil { 146 if err != nil {
117 return nil, nil, err 147 return nil, nil, err
118 } 148 }
119 » » key = helper.NewDSKey(key.AppID(), key.Namespace(), key.Kind(), "", id, key.Parent()) 149 » » key = newKey(key.Namespace(), key.Kind(), "", id, key.Parent())
120 } 150 }
121 151
122 return ents, key, nil 152 return ents, key, nil
123 } 153 }
124 154
125 func putPrelim(ns string, key gae.DSKey, src interface{}) (gae.DSPropertyMap, er ror) { 155 func putPrelim(ns string, knr goon.KindNameResolver, src interface{}) (*datastor e.Key, *propertyList, error) {
126 » if !keyCouldBeValid(key, ns, false) { 156 » key := newKeyObj(ns, knr, src)
157 » if !keyCouldBeValid(ns, key, userKeyOnly) {
127 // TODO(riannucci): different error for Put-ing to reserved Keys ? 158 // TODO(riannucci): different error for Put-ing to reserved Keys ?
128 » » return nil, gae.ErrDSInvalidKey 159 » » return nil, nil, datastore.ErrInvalidKey
129 } 160 }
130 161
131 » pls, err := helper.GetPLS(src) 162 » data, err := toPL(src)
163 » return key, data, err
164 }
165
166 func (d *dataStoreData) put(ns string, src interface{}) (*datastore.Key, error) {
167 » key, plData, err := putPrelim(ns, d.KindNameResolver(), src)
132 if err != nil { 168 if err != nil {
133 return nil, err 169 return nil, err
134 } 170 }
135 » return pls.Save() 171 » if key, err = d.putInner(key, plData); err != nil {
172 » » return nil, err
173 » }
174 » return key, goon_internal.SetStructKey(src, key, d.KindNameResolver())
136 } 175 }
137 176
138 func (d *dataStoreData) put(ns string, key gae.DSKey, src interface{}) (gae.DSKe y, error) { 177 func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datas tore.Key, error) {
139 » pmData, err := putPrelim(ns, key, src) 178 » dataBytes, err := data.MarshalBinary()
140 if err != nil { 179 if err != nil {
141 return nil, err 180 return nil, err
142 } 181 }
143 if key, err = d.putInner(key, pmData); err != nil {
144 return nil, err
145 }
146 return key, nil
147 }
148
149 func (d *dataStoreData) putInner(key gae.DSKey, data gae.DSPropertyMap) (gae.DSK ey, error) {
150 buf := &bytes.Buffer{}
151 helper.WriteDSPropertyMap(buf, data, helper.WithoutContext)
152 dataBytes := buf.Bytes()
153 182
154 d.rwlock.Lock() 183 d.rwlock.Lock()
155 defer d.rwlock.Unlock() 184 defer d.rwlock.Unlock()
156 185
157 ents, key, err := d.entsKeyLocked(key) 186 ents, key, err := d.entsKeyLocked(key)
158 if err != nil { 187 if err != nil {
159 return nil, err 188 return nil, err
160 } 189 }
161 if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil { 190 if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil {
162 return nil, err 191 return nil, err
163 } 192 }
164 193
165 » old := ents.Get(keyBytes(helper.WithoutContext, key)) 194 » old := ents.Get(keyBytes(noNS, key))
166 » oldPM := gae.DSPropertyMap(nil) 195 » oldPl := (*propertyList)(nil)
167 if old != nil { 196 if old != nil {
168 » » if oldPM, err = rpmWoCtx(old, key.Namespace()); err != nil { 197 » » oldPl = &propertyList{}
198 » » if err = oldPl.UnmarshalBinary(old); err != nil {
169 return nil, err 199 return nil, err
170 } 200 }
171 } 201 }
172 » if err = updateIndicies(d.store, key, oldPM, data); err != nil { 202 » if err = updateIndicies(d.store, key, oldPl, data); err != nil {
173 return nil, err 203 return nil, err
174 } 204 }
175 205
176 » ents.Set(keyBytes(helper.WithoutContext, key), dataBytes) 206 » ents.Set(keyBytes(noNS, key), dataBytes)
177 207
178 return key, nil 208 return key, nil
179 } 209 }
180 210
181 func getInner(ns string, key gae.DSKey, dst interface{}, getColl func() (*memCol lection, error)) error { 211 func getInner(ns string, knr goon.KindNameResolver, dst interface{}, getColl fun c(*datastore.Key) (*memCollection, error)) error {
182 » if helper.DSKeyIncomplete(key) || !helper.DSKeyValid(key, ns, true) { 212 » key := newKeyObj(ns, knr, dst)
183 » » return gae.ErrDSInvalidKey 213 » if !keyValid(ns, key, allowSpecialKeys) {
214 » » return datastore.ErrInvalidKey
184 } 215 }
185 216
186 » ents, err := getColl() 217 » ents, err := getColl(key)
187 if err != nil { 218 if err != nil {
188 return err 219 return err
189 } 220 }
190 if ents == nil { 221 if ents == nil {
191 » » return gae.ErrDSNoSuchEntity 222 » » return datastore.ErrNoSuchEntity
192 } 223 }
193 » pdata := ents.Get(keyBytes(helper.WithoutContext, key)) 224 » pdata := ents.Get(keyBytes(noNS, key))
194 if pdata == nil { 225 if pdata == nil {
195 » » return gae.ErrDSNoSuchEntity 226 » » return datastore.ErrNoSuchEntity
196 } 227 }
197 228 » pl := &propertyList{}
198 » pm, err := rpmWoCtx(pdata, ns) 229 » if err = pl.UnmarshalBinary(pdata); err != nil {
199 » if err != nil {
200 return err 230 return err
201 } 231 }
202 232 » return fromPL(pl, dst)
203 » pls, err := helper.GetPLS(dst)
204 » if err != nil {
205 » » return err
206 » }
207
208 » // TODO(riannucci): should the Get API reveal conversion errors instead of
209 » // swallowing them?
210 » _, err = pls.Load(pm)
211 » return err
212 } 233 }
213 234
214 func (d *dataStoreData) get(ns string, key gae.DSKey, dst interface{}) error { 235 func (d *dataStoreData) get(ns string, dst interface{}) error {
215 » return getInner(ns, key, dst, func() (*memCollection, error) { 236 » return getInner(ns, d.KindNameResolver(), dst, func(*datastore.Key) (*me mCollection, error) {
216 d.rwlock.RLock() 237 d.rwlock.RLock()
217 s := d.store.Snapshot() 238 s := d.store.Snapshot()
218 d.rwlock.RUnlock() 239 d.rwlock.RUnlock()
219 240
220 return s.GetCollection("ents:" + ns), nil 241 return s.GetCollection("ents:" + ns), nil
221 }) 242 })
222 } 243 }
223 244
224 func (d *dataStoreData) del(ns string, key gae.DSKey) (err error) { 245 func (d *dataStoreData) del(ns string, key *datastore.Key) error {
225 » if !helper.DSKeyValid(key, ns, false) { 246 » if !keyValid(ns, key, userKeyOnly) {
226 » » return gae.ErrDSInvalidKey 247 » » return datastore.ErrInvalidKey
227 } 248 }
228 249
229 » keyBuf := keyBytes(helper.WithoutContext, key) 250 » keyBuf := keyBytes(noNS, key)
230 251
231 d.rwlock.Lock() 252 d.rwlock.Lock()
232 defer d.rwlock.Unlock() 253 defer d.rwlock.Unlock()
233 254
234 ents := d.store.GetCollection("ents:" + ns) 255 ents := d.store.GetCollection("ents:" + ns)
235 if ents == nil { 256 if ents == nil {
236 return nil 257 return nil
237 } 258 }
238 » if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil { 259 » if _, err := incrementLocked(ents, groupMetaKey(key)); err != nil {
239 » » return 260 » » return err
240 } 261 }
241 262
242 old := ents.Get(keyBuf) 263 old := ents.Get(keyBuf)
243 » oldPM := gae.DSPropertyMap(nil) 264 » oldPl := (*propertyList)(nil)
244 if old != nil { 265 if old != nil {
245 » » if oldPM, err = rpmWoCtx(old, ns); err != nil { 266 » » oldPl = &propertyList{}
246 » » » return 267 » » if err := oldPl.UnmarshalBinary(old); err != nil {
268 » » » return err
247 } 269 }
248 } 270 }
249 » if err := updateIndicies(d.store, key, oldPM, nil); err != nil { 271 » if err := updateIndicies(d.store, key, oldPl, nil); err != nil {
250 return err 272 return err
251 } 273 }
252 274
253 ents.Delete(keyBuf) 275 ents.Delete(keyBuf)
254 return nil 276 return nil
255 } 277 }
256 278
257 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { 279 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
258 // TODO(riannucci): implement with Flush/FlushRevert for persistance. 280 // TODO(riannucci): implement with Flush/FlushRevert for persistance.
259 281
260 txn := obj.(*txnDataStoreData) 282 txn := obj.(*txnDataStoreData)
261 for rk, muts := range txn.muts { 283 for rk, muts := range txn.muts {
262 if len(muts) == 0 { // read-only 284 if len(muts) == 0 { // read-only
263 continue 285 continue
264 } 286 }
265 » » k, err := helper.ReadDSKey(bytes.NewBufferString(rk), helper.Wit hContext, "", "") 287 » » k, err := keyFromByteString(withNS, rk, "")
266 if err != nil { 288 if err != nil {
267 panic(err) 289 panic(err)
268 } 290 }
269
270 entKey := "ents:" + k.Namespace() 291 entKey := "ents:" + k.Namespace()
271 mkey := groupMetaKey(k) 292 mkey := groupMetaKey(k)
272 entsHead := d.store.GetCollection(entKey) 293 entsHead := d.store.GetCollection(entKey)
273 entsSnap := txn.snap.GetCollection(entKey) 294 entsSnap := txn.snap.GetCollection(entKey)
274 vHead, err := curVersion(entsHead, mkey) 295 vHead, err := curVersion(entsHead, mkey)
275 if err != nil { 296 if err != nil {
276 panic(err) 297 panic(err)
277 } 298 }
278 vSnap, err := curVersion(entsSnap, mkey) 299 vSnap, err := curVersion(entsSnap, mkey)
279 if err != nil { 300 if err != nil {
(...skipping 19 matching lines...) Expand all
299 } else { 320 } else {
300 _, err = d.putInner(m.key, m.data) 321 _, err = d.putInner(m.key, m.data)
301 } 322 }
302 if err != nil { 323 if err != nil {
303 panic(err) 324 panic(err)
304 } 325 }
305 } 326 }
306 } 327 }
307 } 328 }
308 329
309 func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) (memContextObj, error ) { 330 func (d *dataStoreData) mkTxn(o *datastore.TransactionOptions) (memContextObj, e rror) {
310 return &txnDataStoreData{ 331 return &txnDataStoreData{
311 // alias to the main datastore's so that testing code can have p rimitive 332 // alias to the main datastore's so that testing code can have p rimitive
312 // access to break features inside of transactions. 333 // access to break features inside of transactions.
313 BrokenFeatures: &d.BrokenFeatures, 334 BrokenFeatures: &d.BrokenFeatures,
314 parent: d, 335 parent: d,
336 knrKeeper: knrKeeper{knrFunc: d.knrFunc},
315 isXG: o != nil && o.XG, 337 isXG: o != nil && o.XG,
316 snap: d.store.Snapshot(), 338 snap: d.store.Snapshot(),
317 muts: map[string][]txnMutation{}, 339 muts: map[string][]txnMutation{},
318 }, nil 340 }, nil
319 } 341 }
320 342
321 func (d *dataStoreData) endTxn() {} 343 func (d *dataStoreData) endTxn() {}
322 344
323 /////////////////////////////// txnDataStoreData /////////////////////////////// 345 /////////////////////////////// txnDataStoreData ///////////////////////////////
324 346
325 type txnMutation struct { 347 type txnMutation struct {
326 » key gae.DSKey 348 » key *datastore.Key
327 » data gae.DSPropertyMap 349 » data *propertyList
328 } 350 }
329 351
330 type txnDataStoreData struct { 352 type txnDataStoreData struct {
331 » *gae.BrokenFeatures 353 » *wrapper.BrokenFeatures
354 » knrKeeper
332 sync.Mutex 355 sync.Mutex
333 356
334 parent *dataStoreData 357 parent *dataStoreData
335 358
336 // boolean 0 or 1, use atomic.*Int32 to access. 359 // boolean 0 or 1, use atomic.*Int32 to access.
337 closed int32 360 closed int32
338 isXG bool 361 isXG bool
339 362
340 snap *memStore 363 snap *memStore
341 364
342 // string is the raw-bytes encoding of the entity root incl. namespace 365 // string is the raw-bytes encoding of the entity root incl. namespace
343 muts map[string][]txnMutation 366 muts map[string][]txnMutation
344 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ ing 367 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ ing
345 // length of encoded keys + values. 368 // length of encoded keys + values.
346 } 369 }
347 370
348 var ( 371 var (
349 _ = memContextObj((*txnDataStoreData)(nil)) 372 _ = memContextObj((*txnDataStoreData)(nil))
350 _ = sync.Locker((*txnDataStoreData)(nil)) 373 _ = sync.Locker((*txnDataStoreData)(nil))
351 » _ = gae.Testable((*txnDataStoreData)(nil)) 374 » _ = wrapper.Testable((*txnDataStoreData)(nil))
375 » _ = wrapper.DSKindSetter((*txnDataStoreData)(nil))
352 ) 376 )
353 377
354 const xgEGLimit = 25 378 const xgEGLimit = 25
355 379
356 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } 380 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false }
357 func (td *txnDataStoreData) endTxn() { 381 func (td *txnDataStoreData) endTxn() {
358 if atomic.LoadInt32(&td.closed) == 1 { 382 if atomic.LoadInt32(&td.closed) == 1 {
359 panic("cannot end transaction twice") 383 panic("cannot end transaction twice")
360 } 384 }
361 atomic.StoreInt32(&td.closed, 1) 385 atomic.StoreInt32(&td.closed, 1)
362 } 386 }
363 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { 387 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) {
364 panic("txnDataStoreData cannot apply transactions") 388 panic("txnDataStoreData cannot apply transactions")
365 } 389 }
366 func (*txnDataStoreData) mkTxn(*gae.DSTransactionOptions) (memContextObj, error) { 390 func (*txnDataStoreData) mkTxn(*datastore.TransactionOptions) (memContextObj, er ror) {
367 return nil, errors.New("datastore: nested transactions are not supported ") 391 return nil, errors.New("datastore: nested transactions are not supported ")
368 } 392 }
369 393
370 func (td *txnDataStoreData) RunIfNotBroken(f func() error) error { 394 func (td *txnDataStoreData) IsBroken() error {
371 // Slightly different from the SDK... datastore and taskqueue each imple ment 395 // Slightly different from the SDK... datastore and taskqueue each imple ment
372 // this here, where in the SDK only datastore.transaction.Call does. 396 // this here, where in the SDK only datastore.transaction.Call does.
373 if atomic.LoadInt32(&td.closed) == 1 { 397 if atomic.LoadInt32(&td.closed) == 1 {
374 return errors.New("datastore: transaction context has expired") 398 return errors.New("datastore: transaction context has expired")
375 } 399 }
376 » return td.BrokenFeatures.RunIfNotBroken(f) 400 » return td.BrokenFeatures.IsBroken()
377 } 401 }
378 402
379 // writeMutation ensures that this transaction can support the given key/value 403 // writeMutation ensures that this transaction can support the given key/value
380 // mutation. 404 // mutation.
381 // 405 //
382 // if getOnly is true, don't record the actual mutation data, just ensure that 406 // if getOnly is true, don't record the actual mutation data, just ensure that
383 // the key is in an included entity group (or add an empty entry for tha t 407 // the key is in an included entity group (or add an empty entry for tha t
384 // group). 408 // group).
385 // 409 //
386 // if !getOnly && data == nil, this counts as a deletion instead of a Put. 410 // if !getOnly && data == nil, this counts as a deletion instead of a Put.
387 // 411 //
388 // Returns an error if this key causes the transaction to cross too many entity 412 // Returns an error if this key causes the transaction to cross too many entity
389 // groups. 413 // groups.
390 func (td *txnDataStoreData) writeMutation(getOnly bool, key gae.DSKey, data gae. DSPropertyMap) error { 414 func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data *propertyList) error {
391 » rk := string(keyBytes(helper.WithContext, helper.DSKeyRoot(key))) 415 » rk := string(keyBytes(withNS, rootKey(key)))
392 416
393 td.Lock() 417 td.Lock()
394 defer td.Unlock() 418 defer td.Unlock()
395 419
396 if _, ok := td.muts[rk]; !ok { 420 if _, ok := td.muts[rk]; !ok {
397 limit := 1 421 limit := 1
398 if td.isXG { 422 if td.isXG {
399 limit = xgEGLimit 423 limit = xgEGLimit
400 } 424 }
401 if len(td.muts)+1 > limit { 425 if len(td.muts)+1 > limit {
402 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)" 426 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)"
403 if td.isXG { 427 if td.isXG {
404 msg = "operating on too many entity groups in a single transaction" 428 msg = "operating on too many entity groups in a single transaction"
405 } 429 }
406 » » » return errors.New(msg) 430 » » » return newDSError(pb.Error_BAD_REQUEST, msg)
407 } 431 }
408 td.muts[rk] = []txnMutation{} 432 td.muts[rk] = []txnMutation{}
409 } 433 }
410 if !getOnly { 434 if !getOnly {
411 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) 435 td.muts[rk] = append(td.muts[rk], txnMutation{key, data})
412 } 436 }
413 437
414 return nil 438 return nil
415 } 439 }
416 440
417 func (td *txnDataStoreData) put(ns string, key gae.DSKey, src interface{}) (gae. DSKey, error) { 441 func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, err or) {
418 » pMap, err := putPrelim(ns, key, src) 442 » key, plData, err := putPrelim(ns, td.KindNameResolver(), src)
419 if err != nil { 443 if err != nil {
420 return nil, err 444 return nil, err
421 } 445 }
422 446
423 func() { 447 func() {
424 td.parent.Lock() 448 td.parent.Lock()
425 defer td.parent.Unlock() 449 defer td.parent.Unlock()
426 _, key, err = td.parent.entsKeyLocked(key) 450 _, key, err = td.parent.entsKeyLocked(key)
427 }() 451 }()
428 if err != nil { 452 if err != nil {
429 return nil, err 453 return nil, err
430 } 454 }
431 455
432 » if err = td.writeMutation(false, key, pMap); err != nil { 456 » if err = td.writeMutation(false, key, plData); err != nil {
433 return nil, err 457 return nil, err
434 } 458 }
435 459
436 » return key, nil 460 » return key, goon_internal.SetStructKey(src, key, td.KindNameResolver())
437 } 461 }
438 462
439 func (td *txnDataStoreData) get(ns string, key gae.DSKey, dst interface{}) error { 463 func (td *txnDataStoreData) get(ns string, dst interface{}) error {
440 » return getInner(ns, key, dst, func() (*memCollection, error) { 464 » return getInner(ns, td.KindNameResolver(), dst, func(key *datastore.Key) (*memCollection, error) {
441 if err := td.writeMutation(true, key, nil); err != nil { 465 if err := td.writeMutation(true, key, nil); err != nil {
442 return nil, err 466 return nil, err
443 } 467 }
444 return td.snap.GetCollection("ents:" + ns), nil 468 return td.snap.GetCollection("ents:" + ns), nil
445 }) 469 })
446 } 470 }
447 471
448 func (td *txnDataStoreData) del(ns string, key gae.DSKey) error { 472 func (td *txnDataStoreData) del(ns string, key *datastore.Key) error {
449 » if !helper.DSKeyValid(key, ns, false) { 473 » if !keyValid(ns, key, userKeyOnly) {
450 » » return gae.ErrDSInvalidKey 474 » » return datastore.ErrInvalidKey
451 } 475 }
452 return td.writeMutation(false, key, nil) 476 return td.writeMutation(false, key, nil)
453 } 477 }
454
455 func keyCouldBeValid(k gae.DSKey, ns string, allowSpecial bool) bool {
456 // adds an id to k if it's incomplete.
457 if helper.DSKeyIncomplete(k) {
458 k = helper.NewDSKey(k.AppID(), k.Namespace(), k.Kind(), "", 1, k .Parent())
459 }
460 return helper.DSKeyValid(k, ns, allowSpecial)
461 }
462
463 func keyBytes(ctx helper.DSKeyContext, key gae.DSKey) []byte {
464 buf := &bytes.Buffer{}
465 helper.WriteDSKey(buf, ctx, key)
466 return buf.Bytes()
467 }
468
469 func rpmWoCtx(data []byte, ns string) (gae.DSPropertyMap, error) {
470 return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithoutCon text, globalAppID, ns)
471 }
472
473 func rpm(data []byte) (gae.DSPropertyMap, error) {
474 return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithContex t, "", "")
475 }
OLDNEW
« no previous file with comments | « go/src/infra/gae/libs/wrapper/memory/datastore.go ('k') | go/src/infra/gae/libs/wrapper/memory/datastore_query.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698