Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(8)

Side by Side Diff: impl/memory/raw_datastore_data.go

Issue 1281173002: Refactor: Rename some files. (Closed) Base URL: https://github.com/luci/gae.git@tweak_testable
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/raw_datastore.go ('k') | impl/memory/raw_datastore_query.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "bytes"
9 "fmt"
10 "sync"
11 "sync/atomic"
12
13 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/luci-go/common/errors"
15 "golang.org/x/net/context"
16 )
17
18 //////////////////////////////// dataStoreData /////////////////////////////////
19
20 type dataStoreData struct {
21 rwlock sync.RWMutex
22 // See README.md for store schema.
23 store *memStore
24 snap *memStore
25 }
26
27 var (
28 _ = memContextObj((*dataStoreData)(nil))
29 _ = sync.Locker((*dataStoreData)(nil))
30 )
31
32 func newDataStoreData() *dataStoreData {
33 store := newMemStore()
34 return &dataStoreData{
35 store: store,
36 snap: store.Snapshot(), // empty but better than a nil pointer.
37 }
38 }
39
40 func (d *dataStoreData) Lock() {
41 d.rwlock.Lock()
42 }
43
44 func (d *dataStoreData) Unlock() {
45 d.rwlock.Unlock()
46 }
47
48 /////////////////////////// indicies(dataStoreData) ////////////////////////////
49
50 func groupMetaKey(key ds.Key) []byte {
51 return keyBytes(ds.WithoutContext,
52 ds.NewKey("", "", "__entity_group__", "", 1, ds.KeyRoot(key)))
53 }
54
55 func groupIDsKey(key ds.Key) []byte {
56 return keyBytes(ds.WithoutContext,
57 ds.NewKey("", "", "__entity_group_ids__", "", 1, ds.KeyRoot(key) ))
58 }
59
60 func rootIDsKey(kind string) []byte {
61 return keyBytes(ds.WithoutContext,
62 ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil))
63 }
64
65 func curVersion(ents *memCollection, key []byte) int64 {
66 if ents != nil {
67 if v := ents.Get(key); v != nil {
68 pm, err := rpm(v)
69 if err != nil {
70 panic(err) // memory corruption
71 }
72 pl, ok := pm["__version__"]
73 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt {
74 return pl[0].Value().(int64)
75 }
76 panic(fmt.Errorf("__version__ property missing or wrong: %v", pm))
77 }
78 }
79 return 0
80 }
81
82 func incrementLocked(ents *memCollection, key []byte) int64 {
83 ret := curVersion(ents, key) + 1
84 buf := &bytes.Buffer{}
85 ds.PropertyMap{"__version__": {ds.MkPropertyNI(ret)}}.Write(
86 buf, ds.WithContext)
87 ents.Set(key, buf.Bytes())
88 return ret
89 }
90
91 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) {
92 coll := "ents:" + key.Namespace()
93 ents := d.store.GetCollection(coll)
94 if ents == nil {
95 ents = d.store.SetCollection(coll, nil)
96 }
97
98 if ds.KeyIncomplete(key) {
99 idKey := []byte(nil)
100 if key.Parent() == nil {
101 idKey = rootIDsKey(key.Kind())
102 } else {
103 idKey = groupIDsKey(key)
104 }
105 id := incrementLocked(ents, idKey)
106 key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent())
107 }
108
109 return ents, key
110 }
111
112 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put MultiCB) {
113 for i, k := range keys {
114 buf := &bytes.Buffer{}
115 pmap, _ := vals[i].Save(false)
116 pmap.Write(buf, ds.WithoutContext)
117 dataBytes := buf.Bytes()
118
119 k, err := func() (ret ds.Key, err error) {
120 d.rwlock.Lock()
121 defer d.rwlock.Unlock()
122
123 ents, ret := d.entsKeyLocked(k)
124 incrementLocked(ents, groupMetaKey(ret))
125
126 old := ents.Get(keyBytes(ds.WithoutContext, ret))
127 oldPM := ds.PropertyMap(nil)
128 if old != nil {
129 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil {
130 return
131 }
132 }
133 updateIndicies(d.store, ret, oldPM, pmap)
134 ents.Set(keyBytes(ds.WithoutContext, ret), dataBytes)
135 return
136 }()
137 if cb != nil {
138 cb(k, err)
139 }
140 }
141 }
142
143 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti on, error)) error {
144 ents, err := getColl()
145 if err != nil {
146 return err
147 }
148 if ents == nil {
149 for range keys {
150 cb(nil, ds.ErrNoSuchEntity)
151 }
152 return nil
153 }
154
155 for _, k := range keys {
156 pdata := ents.Get(keyBytes(ds.WithoutContext, k))
157 if pdata == nil {
158 cb(nil, ds.ErrNoSuchEntity)
159 continue
160 }
161 cb(rpmWoCtx(pdata, k.Namespace()))
162 }
163 return nil
164 }
165
166 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error {
167 getMultiInner(keys, cb, func() (*memCollection, error) {
168 d.rwlock.RLock()
169 s := d.store.Snapshot()
170 d.rwlock.RUnlock()
171
172 return s.GetCollection("ents:" + keys[0].Namespace()), nil
173 })
174 return nil
175 }
176
177 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) {
178 toDel := make([][]byte, 0, len(keys))
179 for _, k := range keys {
180 toDel = append(toDel, keyBytes(ds.WithoutContext, k))
181 }
182 ns := keys[0].Namespace()
183
184 d.rwlock.Lock()
185 defer d.rwlock.Unlock()
186
187 ents := d.store.GetCollection("ents:" + ns)
188
189 for i, k := range keys {
190 if ents != nil {
191 incrementLocked(ents, groupMetaKey(k))
192 kb := toDel[i]
193 if old := ents.Get(kb); old != nil {
194 oldPM, err := rpmWoCtx(old, ns)
195 if err != nil {
196 if cb != nil {
197 cb(err)
198 }
199 continue
200 }
201 updateIndicies(d.store, k, oldPM, nil)
202 ents.Delete(kb)
203 }
204 }
205 if cb != nil {
206 cb(nil)
207 }
208 }
209 }
210
211 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
212 // TODO(riannucci): implement with Flush/FlushRevert for persistance.
213
214 txn := obj.(*txnDataStoreData)
215 for rk, muts := range txn.muts {
216 if len(muts) == 0 { // read-only
217 continue
218 }
219 k, err := ds.ReadKey(bytes.NewBufferString(rk), ds.WithContext, "", "")
220 if err != nil {
221 panic(err)
222 }
223
224 entKey := "ents:" + k.Namespace()
225 mkey := groupMetaKey(k)
226 entsHead := d.store.GetCollection(entKey)
227 entsSnap := txn.snap.GetCollection(entKey)
228 vHead := curVersion(entsHead, mkey)
229 vSnap := curVersion(entsSnap, mkey)
230 if vHead != vSnap {
231 return false
232 }
233 }
234 return true
235 }
236
237 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) {
238 txn := obj.(*txnDataStoreData)
239 for _, muts := range txn.muts {
240 if len(muts) == 0 { // read-only
241 continue
242 }
243 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul ti
244 for _, m := range muts {
245 err := error(nil)
246 k := m.key
247 if m.data == nil {
248 d.delMulti([]ds.Key{k},
249 func(e error) { err = e })
250 } else {
251 d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d ata},
252 func(_ ds.Key, e error) { err = e })
253 }
254 err = errors.SingleError(err)
255 if err != nil {
256 panic(err)
257 }
258 }
259 }
260 }
261
262 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj {
263 return &txnDataStoreData{
264 // alias to the main datastore's so that testing code can have p rimitive
265 // access to break features inside of transactions.
266 parent: d,
267 isXG: o != nil && o.XG,
268 snap: d.store.Snapshot(),
269 muts: map[string][]txnMutation{},
270 }
271 }
272
273 func (d *dataStoreData) endTxn() {}
274
275 /////////////////////////////// txnDataStoreData ///////////////////////////////
276
277 type txnMutation struct {
278 key ds.Key
279 data ds.PropertyMap
280 }
281
282 type txnDataStoreData struct {
283 sync.Mutex
284
285 parent *dataStoreData
286
287 // boolean 0 or 1, use atomic.*Int32 to access.
288 closed int32
289 isXG bool
290
291 snap *memStore
292
293 // string is the raw-bytes encoding of the entity root incl. namespace
294 muts map[string][]txnMutation
295 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ ing
296 // length of encoded keys + values.
297 }
298
299 var _ memContextObj = (*txnDataStoreData)(nil)
300
301 const xgEGLimit = 25
302
303 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false }
304 func (td *txnDataStoreData) endTxn() {
305 if atomic.LoadInt32(&td.closed) == 1 {
306 panic("cannot end transaction twice")
307 }
308 atomic.StoreInt32(&td.closed, 1)
309 }
310 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) {
311 panic("txnDataStoreData cannot apply transactions")
312 }
313 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj {
314 panic("impossible")
315 }
316
317 func (td *txnDataStoreData) run(f func() error) error {
318 // Slightly different from the SDK... datastore and taskqueue each imple ment
319 // this here, where in the SDK only datastore.transaction.Call does.
320 if atomic.LoadInt32(&td.closed) == 1 {
321 return errors.New("datastore: transaction context has expired")
322 }
323 return f()
324 }
325
326 // writeMutation ensures that this transaction can support the given key/value
327 // mutation.
328 //
329 // if getOnly is true, don't record the actual mutation data, just ensure that
330 // the key is in an included entity group (or add an empty entry for tha t
331 // group).
332 //
333 // if !getOnly && data == nil, this counts as a deletion instead of a Put.
334 //
335 // Returns an error if this key causes the transaction to cross too many entity
336 // groups.
337 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop ertyMap) error {
338 rk := string(keyBytes(ds.WithContext, ds.KeyRoot(key)))
339
340 td.Lock()
341 defer td.Unlock()
342
343 if _, ok := td.muts[rk]; !ok {
344 limit := 1
345 if td.isXG {
346 limit = xgEGLimit
347 }
348 if len(td.muts)+1 > limit {
349 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)"
350 if td.isXG {
351 msg = "operating on too many entity groups in a single transaction"
352 }
353 return errors.New(msg)
354 }
355 td.muts[rk] = []txnMutation{}
356 }
357 if !getOnly {
358 td.muts[rk] = append(td.muts[rk], txnMutation{key, data})
359 }
360
361 return nil
362 }
363
364 func (td *txnDataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds .PutMultiCB) {
365 for i, k := range keys {
366 func() {
367 td.parent.Lock()
368 defer td.parent.Unlock()
369 _, k = td.parent.entsKeyLocked(k)
370 }()
371 err := td.writeMutation(false, k, vals[i])
372 if cb != nil {
373 cb(k, err)
374 }
375 }
376 }
377
378 func (td *txnDataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error {
379 return getMultiInner(keys, cb, func() (*memCollection, error) {
380 err := error(nil)
381 for _, key := range keys {
382 err = td.writeMutation(true, key, nil)
383 if err != nil {
384 return nil, err
385 }
386 }
387 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil
388 })
389 }
390
391 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error {
392 for _, k := range keys {
393 err := td.writeMutation(false, k, nil)
394 if cb != nil {
395 cb(err)
396 }
397 }
398 return nil
399 }
400
401 func keyBytes(ctx ds.KeyContext, key ds.Key) []byte {
402 buf := &bytes.Buffer{}
403 ds.WriteKey(buf, ctx, key)
404 return buf.Bytes()
405 }
406
407 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) {
408 ret := ds.PropertyMap{}
409 err := ret.Read(bytes.NewBuffer(data), ds.WithoutContext, globalAppID, n s)
410 return ret, err
411 }
412
413 func rpm(data []byte) (ds.PropertyMap, error) {
414 ret := ds.PropertyMap{}
415 err := ret.Read(bytes.NewBuffer(data), ds.WithContext, "", "")
416 return ret, err
417 }
418
419 type keyitem interface {
420 Key() ds.Key
421 }
OLDNEW
« no previous file with comments | « impl/memory/raw_datastore.go ('k') | impl/memory/raw_datastore_query.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698