Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(241)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/datastore_data.go

Issue 1222903002: Refactor current GAE abstraction library to be free of the SDK* (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: more fixes Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "errors"
9 "infra/gae/libs/wrapper"
10 goon_internal "infra/gae/libs/wrapper/memory/internal/goon"
11 "sync"
12 "sync/atomic"
13
14 "github.com/mjibson/goon"
15
16 "appengine/datastore"
17 pb "appengine_internal/datastore"
18 "golang.org/x/net/context"
19 )
20
21 ////////////////////////////////// knrKeeper ///////////////////////////////////
22
23 type knrKeeper struct {
24 knrLock sync.Mutex
25 knrFunc goon.KindNameResolver
26 }
27
28 var _ = wrapper.DSKindSetter((*knrKeeper)(nil))
29
30 func (k *knrKeeper) KindNameResolver() goon.KindNameResolver {
31 k.knrLock.Lock()
32 defer k.knrLock.Unlock()
33 if k.knrFunc == nil {
34 k.knrFunc = goon.DefaultKindName
35 }
36 return k.knrFunc
37 }
38
39 func (k *knrKeeper) SetKindNameResolver(knr goon.KindNameResolver) {
40 k.knrLock.Lock()
41 defer k.knrLock.Unlock()
42 if knr == nil {
43 knr = goon.DefaultKindName
44 }
45 k.knrFunc = knr
46 }
47
48 //////////////////////////////// dataStoreData /////////////////////////////////
49
50 type dataStoreData struct {
51 wrapper.BrokenFeatures
52 knrKeeper
53
54 rwlock sync.RWMutex
55 // See README.md for store schema.
56 store *memStore
57 snap *memStore
58 }
59
60 var (
61 _ = memContextObj((*dataStoreData)(nil))
62 _ = sync.Locker((*dataStoreData)(nil))
63 _ = wrapper.Testable((*dataStoreData)(nil))
64 _ = wrapper.DSKindSetter((*dataStoreData)(nil))
65 )
66
67 func newDataStoreData() *dataStoreData {
68 store := newMemStore()
69 return &dataStoreData{
70 BrokenFeatures: wrapper.BrokenFeatures{DefaultError: newDSError( pb.Error_INTERNAL_ERROR)},
71 store: store,
72 snap: store.Snapshot(), // empty but better than a nil pointer.
73 }
74 }
75
76 func (d *dataStoreData) Lock() {
77 d.rwlock.Lock()
78 }
79
80 func (d *dataStoreData) Unlock() {
81 d.rwlock.Unlock()
82 }
83
84 /////////////////////////// indicies(dataStoreData) ////////////////////////////
85
86 func groupMetaKey(key *datastore.Key) []byte {
87 return keyBytes(noNS, newKey("", "__entity_group__", "", 1, rootKey(key) ))
88 }
89
90 func groupIDsKey(key *datastore.Key) []byte {
91 return keyBytes(noNS, newKey("", "__entity_group_ids__", "", 1, rootKey( key)))
92 }
93
94 func rootIDsKey(kind string) []byte {
95 return keyBytes(noNS, newKey("", "__entity_root_ids__", kind, 0, nil))
96 }
97
98 func curVersion(ents *memCollection, key []byte) (int64, error) {
99 if v := ents.Get(key); v != nil {
100 numData := &propertyList{}
101 if err := numData.UnmarshalBinary(v); err != nil {
102 return 0, err
103 }
104 return (*numData)[0].Value.(int64), nil
105 }
106 return 0, nil
107 }
108
109 func incrementLocked(ents *memCollection, key []byte) (int64, error) {
110 num := int64(0)
111 numData := &propertyList{}
112 if v := ents.Get(key); v != nil {
113 if err := numData.UnmarshalBinary(v); err != nil {
114 return 0, err
115 }
116 num = (*numData)[0].Value.(int64)
117 } else {
118 *numData = append(*numData, datastore.Property{Name: "__version_ _"})
119 }
120 num++
121 (*numData)[0].Value = num
122 incData, err := numData.MarshalBinary()
123 if err != nil {
124 return 0, err
125 }
126 ents.Set(key, incData)
127
128 return num, nil
129 }
130
131 func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*memCollection, *data store.Key, error) {
132 coll := "ents:" + key.Namespace()
133 ents := d.store.GetCollection(coll)
134 if ents == nil {
135 ents = d.store.SetCollection(coll, nil)
136 }
137
138 if key.Incomplete() {
139 idKey := []byte(nil)
140 if key.Parent() == nil {
141 idKey = rootIDsKey(key.Kind())
142 } else {
143 idKey = groupIDsKey(key)
144 }
145 id, err := incrementLocked(ents, idKey)
146 if err != nil {
147 return nil, nil, err
148 }
149 key = newKey(key.Namespace(), key.Kind(), "", id, key.Parent())
150 }
151
152 return ents, key, nil
153 }
154
155 func putPrelim(ns string, knr goon.KindNameResolver, src interface{}) (*datastor e.Key, *propertyList, error) {
156 key := newKeyObj(ns, knr, src)
157 if !keyCouldBeValid(ns, key, userKeyOnly) {
158 // TODO(riannucci): different error for Put-ing to reserved Keys ?
159 return nil, nil, datastore.ErrInvalidKey
160 }
161
162 data, err := toPL(src)
163 return key, data, err
164 }
165
166 func (d *dataStoreData) put(ns string, src interface{}) (*datastore.Key, error) {
167 key, plData, err := putPrelim(ns, d.KindNameResolver(), src)
168 if err != nil {
169 return nil, err
170 }
171 if key, err = d.putInner(key, plData); err != nil {
172 return nil, err
173 }
174 return key, goon_internal.SetStructKey(src, key, d.KindNameResolver())
175 }
176
177 func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datas tore.Key, error) {
178 dataBytes, err := data.MarshalBinary()
179 if err != nil {
180 return nil, err
181 }
182
183 d.rwlock.Lock()
184 defer d.rwlock.Unlock()
185
186 ents, key, err := d.entsKeyLocked(key)
187 if err != nil {
188 return nil, err
189 }
190 if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil {
191 return nil, err
192 }
193
194 old := ents.Get(keyBytes(noNS, key))
195 oldPl := (*propertyList)(nil)
196 if old != nil {
197 oldPl = &propertyList{}
198 if err = oldPl.UnmarshalBinary(old); err != nil {
199 return nil, err
200 }
201 }
202 if err = updateIndicies(d.store, key, oldPl, data); err != nil {
203 return nil, err
204 }
205
206 ents.Set(keyBytes(noNS, key), dataBytes)
207
208 return key, nil
209 }
210
211 func getInner(ns string, knr goon.KindNameResolver, dst interface{}, getColl fun c(*datastore.Key) (*memCollection, error)) error {
212 key := newKeyObj(ns, knr, dst)
213 if !keyValid(ns, key, allowSpecialKeys) {
214 return datastore.ErrInvalidKey
215 }
216
217 ents, err := getColl(key)
218 if err != nil {
219 return err
220 }
221 if ents == nil {
222 return datastore.ErrNoSuchEntity
223 }
224 pdata := ents.Get(keyBytes(noNS, key))
225 if pdata == nil {
226 return datastore.ErrNoSuchEntity
227 }
228 pl := &propertyList{}
229 if err = pl.UnmarshalBinary(pdata); err != nil {
230 return err
231 }
232 return fromPL(pl, dst)
233 }
234
235 func (d *dataStoreData) get(ns string, dst interface{}) error {
236 return getInner(ns, d.KindNameResolver(), dst, func(*datastore.Key) (*me mCollection, error) {
237 d.rwlock.RLock()
238 s := d.store.Snapshot()
239 d.rwlock.RUnlock()
240
241 return s.GetCollection("ents:" + ns), nil
242 })
243 }
244
245 func (d *dataStoreData) del(ns string, key *datastore.Key) error {
246 if !keyValid(ns, key, userKeyOnly) {
247 return datastore.ErrInvalidKey
248 }
249
250 keyBuf := keyBytes(noNS, key)
251
252 d.rwlock.Lock()
253 defer d.rwlock.Unlock()
254
255 ents := d.store.GetCollection("ents:" + ns)
256 if ents == nil {
257 return nil
258 }
259 if _, err := incrementLocked(ents, groupMetaKey(key)); err != nil {
260 return err
261 }
262
263 old := ents.Get(keyBuf)
264 oldPl := (*propertyList)(nil)
265 if old != nil {
266 oldPl = &propertyList{}
267 if err := oldPl.UnmarshalBinary(old); err != nil {
268 return err
269 }
270 }
271 if err := updateIndicies(d.store, key, oldPl, nil); err != nil {
272 return err
273 }
274
275 ents.Delete(keyBuf)
276 return nil
277 }
278
279 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
280 // TODO(riannucci): implement with Flush/FlushRevert for persistance.
281
282 txn := obj.(*txnDataStoreData)
283 for rk, muts := range txn.muts {
284 if len(muts) == 0 { // read-only
285 continue
286 }
287 k, err := keyFromByteString(withNS, rk, "")
288 if err != nil {
289 panic(err)
290 }
291 entKey := "ents:" + k.Namespace()
292 mkey := groupMetaKey(k)
293 entsHead := d.store.GetCollection(entKey)
294 entsSnap := txn.snap.GetCollection(entKey)
295 vHead, err := curVersion(entsHead, mkey)
296 if err != nil {
297 panic(err)
298 }
299 vSnap, err := curVersion(entsSnap, mkey)
300 if err != nil {
301 panic(err)
302 }
303 if vHead != vSnap {
304 return false
305 }
306 }
307 return true
308 }
309
310 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) {
311 txn := obj.(*txnDataStoreData)
312 for _, muts := range txn.muts {
313 if len(muts) == 0 { // read-only
314 continue
315 }
316 for _, m := range muts {
317 err := error(nil)
318 if m.data == nil {
319 err = d.del(m.key.Namespace(), m.key)
320 } else {
321 _, err = d.putInner(m.key, m.data)
322 }
323 if err != nil {
324 panic(err)
325 }
326 }
327 }
328 }
329
330 func (d *dataStoreData) mkTxn(o *datastore.TransactionOptions) (memContextObj, e rror) {
331 return &txnDataStoreData{
332 // alias to the main datastore's so that testing code can have p rimitive
333 // access to break features inside of transactions.
334 BrokenFeatures: &d.BrokenFeatures,
335 parent: d,
336 knrKeeper: knrKeeper{knrFunc: d.knrFunc},
337 isXG: o != nil && o.XG,
338 snap: d.store.Snapshot(),
339 muts: map[string][]txnMutation{},
340 }, nil
341 }
342
343 func (d *dataStoreData) endTxn() {}
344
345 /////////////////////////////// txnDataStoreData ///////////////////////////////
346
347 type txnMutation struct {
348 key *datastore.Key
349 data *propertyList
350 }
351
352 type txnDataStoreData struct {
353 *wrapper.BrokenFeatures
354 knrKeeper
355 sync.Mutex
356
357 parent *dataStoreData
358
359 // boolean 0 or 1, use atomic.*Int32 to access.
360 closed int32
361 isXG bool
362
363 snap *memStore
364
365 // string is the raw-bytes encoding of the entity root incl. namespace
366 muts map[string][]txnMutation
367 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ ing
368 // length of encoded keys + values.
369 }
370
371 var (
372 _ = memContextObj((*txnDataStoreData)(nil))
373 _ = sync.Locker((*txnDataStoreData)(nil))
374 _ = wrapper.Testable((*txnDataStoreData)(nil))
375 _ = wrapper.DSKindSetter((*txnDataStoreData)(nil))
376 )
377
378 const xgEGLimit = 25
379
380 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false }
381 func (td *txnDataStoreData) endTxn() {
382 if atomic.LoadInt32(&td.closed) == 1 {
383 panic("cannot end transaction twice")
384 }
385 atomic.StoreInt32(&td.closed, 1)
386 }
387 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) {
388 panic("txnDataStoreData cannot apply transactions")
389 }
390 func (*txnDataStoreData) mkTxn(*datastore.TransactionOptions) (memContextObj, er ror) {
391 return nil, errors.New("datastore: nested transactions are not supported ")
392 }
393
394 func (td *txnDataStoreData) IsBroken() error {
395 // Slightly different from the SDK... datastore and taskqueue each imple ment
396 // this here, where in the SDK only datastore.transaction.Call does.
397 if atomic.LoadInt32(&td.closed) == 1 {
398 return errors.New("datastore: transaction context has expired")
399 }
400 return td.BrokenFeatures.IsBroken()
401 }
402
403 // writeMutation ensures that this transaction can support the given key/value
404 // mutation.
405 //
406 // if getOnly is true, don't record the actual mutation data, just ensure that
407 // the key is in an included entity group (or add an empty entry for tha t
408 // group).
409 //
410 // if !getOnly && data == nil, this counts as a deletion instead of a Put.
411 //
412 // Returns an error if this key causes the transaction to cross too many entity
413 // groups.
414 func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data *propertyList) error {
415 rk := string(keyBytes(withNS, rootKey(key)))
416
417 td.Lock()
418 defer td.Unlock()
419
420 if _, ok := td.muts[rk]; !ok {
421 limit := 1
422 if td.isXG {
423 limit = xgEGLimit
424 }
425 if len(td.muts)+1 > limit {
426 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)"
427 if td.isXG {
428 msg = "operating on too many entity groups in a single transaction"
429 }
430 return newDSError(pb.Error_BAD_REQUEST, msg)
431 }
432 td.muts[rk] = []txnMutation{}
433 }
434 if !getOnly {
435 td.muts[rk] = append(td.muts[rk], txnMutation{key, data})
436 }
437
438 return nil
439 }
440
441 func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, err or) {
442 key, plData, err := putPrelim(ns, td.KindNameResolver(), src)
443 if err != nil {
444 return nil, err
445 }
446
447 func() {
448 td.parent.Lock()
449 defer td.parent.Unlock()
450 _, key, err = td.parent.entsKeyLocked(key)
451 }()
452 if err != nil {
453 return nil, err
454 }
455
456 if err = td.writeMutation(false, key, plData); err != nil {
457 return nil, err
458 }
459
460 return key, goon_internal.SetStructKey(src, key, td.KindNameResolver())
461 }
462
463 func (td *txnDataStoreData) get(ns string, dst interface{}) error {
464 return getInner(ns, td.KindNameResolver(), dst, func(key *datastore.Key) (*memCollection, error) {
465 if err := td.writeMutation(true, key, nil); err != nil {
466 return nil, err
467 }
468 return td.snap.GetCollection("ents:" + ns), nil
469 })
470 }
471
472 func (td *txnDataStoreData) del(ns string, key *datastore.Key) error {
473 if !keyValid(ns, key, userKeyOnly) {
474 return datastore.ErrInvalidKey
475 }
476 return td.writeMutation(false, key, nil)
477 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698