OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "bytes" | |
9 "fmt" | |
10 "sync" | |
11 "sync/atomic" | |
12 | |
13 ds "github.com/luci/gae/service/datastore" | |
14 "github.com/luci/luci-go/common/errors" | |
15 "golang.org/x/net/context" | |
16 ) | |
17 | |
18 //////////////////////////////// dataStoreData ///////////////////////////////// | |
19 | |
20 type dataStoreData struct { | |
21 rwlock sync.RWMutex | |
22 // See README.md for store schema. | |
23 store *memStore | |
24 snap *memStore | |
25 } | |
26 | |
27 var ( | |
28 _ = memContextObj((*dataStoreData)(nil)) | |
29 _ = sync.Locker((*dataStoreData)(nil)) | |
30 ) | |
31 | |
32 func newDataStoreData() *dataStoreData { | |
33 store := newMemStore() | |
34 return &dataStoreData{ | |
35 store: store, | |
36 snap: store.Snapshot(), // empty but better than a nil pointer. | |
37 } | |
38 } | |
39 | |
40 func (d *dataStoreData) Lock() { | |
41 d.rwlock.Lock() | |
42 } | |
43 | |
44 func (d *dataStoreData) Unlock() { | |
45 d.rwlock.Unlock() | |
46 } | |
47 | |
48 /////////////////////////// indicies(dataStoreData) //////////////////////////// | |
49 | |
50 func groupMetaKey(key ds.Key) []byte { | |
51 return keyBytes(ds.WithoutContext, | |
52 ds.NewKey("", "", "__entity_group__", "", 1, ds.KeyRoot(key))) | |
53 } | |
54 | |
55 func groupIDsKey(key ds.Key) []byte { | |
56 return keyBytes(ds.WithoutContext, | |
57 ds.NewKey("", "", "__entity_group_ids__", "", 1, ds.KeyRoot(key)
)) | |
58 } | |
59 | |
60 func rootIDsKey(kind string) []byte { | |
61 return keyBytes(ds.WithoutContext, | |
62 ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) | |
63 } | |
64 | |
65 func curVersion(ents *memCollection, key []byte) int64 { | |
66 if ents != nil { | |
67 if v := ents.Get(key); v != nil { | |
68 pm, err := rpm(v) | |
69 if err != nil { | |
70 panic(err) // memory corruption | |
71 } | |
72 pl, ok := pm["__version__"] | |
73 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { | |
74 return pl[0].Value().(int64) | |
75 } | |
76 panic(fmt.Errorf("__version__ property missing or wrong:
%v", pm)) | |
77 } | |
78 } | |
79 return 0 | |
80 } | |
81 | |
82 func incrementLocked(ents *memCollection, key []byte) int64 { | |
83 ret := curVersion(ents, key) + 1 | |
84 buf := &bytes.Buffer{} | |
85 ds.PropertyMap{"__version__": {ds.MkPropertyNI(ret)}}.Write( | |
86 buf, ds.WithContext) | |
87 ents.Set(key, buf.Bytes()) | |
88 return ret | |
89 } | |
90 | |
91 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { | |
92 coll := "ents:" + key.Namespace() | |
93 ents := d.store.GetCollection(coll) | |
94 if ents == nil { | |
95 ents = d.store.SetCollection(coll, nil) | |
96 } | |
97 | |
98 if ds.KeyIncomplete(key) { | |
99 idKey := []byte(nil) | |
100 if key.Parent() == nil { | |
101 idKey = rootIDsKey(key.Kind()) | |
102 } else { | |
103 idKey = groupIDsKey(key) | |
104 } | |
105 id := incrementLocked(ents, idKey) | |
106 key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id
, key.Parent()) | |
107 } | |
108 | |
109 return ents, key | |
110 } | |
111 | |
112 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put
MultiCB) { | |
113 for i, k := range keys { | |
114 buf := &bytes.Buffer{} | |
115 pmap, _ := vals[i].Save(false) | |
116 pmap.Write(buf, ds.WithoutContext) | |
117 dataBytes := buf.Bytes() | |
118 | |
119 k, err := func() (ret ds.Key, err error) { | |
120 d.rwlock.Lock() | |
121 defer d.rwlock.Unlock() | |
122 | |
123 ents, ret := d.entsKeyLocked(k) | |
124 incrementLocked(ents, groupMetaKey(ret)) | |
125 | |
126 old := ents.Get(keyBytes(ds.WithoutContext, ret)) | |
127 oldPM := ds.PropertyMap(nil) | |
128 if old != nil { | |
129 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { | |
130 return | |
131 } | |
132 } | |
133 updateIndicies(d.store, ret, oldPM, pmap) | |
134 ents.Set(keyBytes(ds.WithoutContext, ret), dataBytes) | |
135 return | |
136 }() | |
137 if cb != nil { | |
138 cb(k, err) | |
139 } | |
140 } | |
141 } | |
142 | |
143 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti
on, error)) error { | |
144 ents, err := getColl() | |
145 if err != nil { | |
146 return err | |
147 } | |
148 if ents == nil { | |
149 for range keys { | |
150 cb(nil, ds.ErrNoSuchEntity) | |
151 } | |
152 return nil | |
153 } | |
154 | |
155 for _, k := range keys { | |
156 pdata := ents.Get(keyBytes(ds.WithoutContext, k)) | |
157 if pdata == nil { | |
158 cb(nil, ds.ErrNoSuchEntity) | |
159 continue | |
160 } | |
161 cb(rpmWoCtx(pdata, k.Namespace())) | |
162 } | |
163 return nil | |
164 } | |
165 | |
166 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { | |
167 getMultiInner(keys, cb, func() (*memCollection, error) { | |
168 d.rwlock.RLock() | |
169 s := d.store.Snapshot() | |
170 d.rwlock.RUnlock() | |
171 | |
172 return s.GetCollection("ents:" + keys[0].Namespace()), nil | |
173 }) | |
174 return nil | |
175 } | |
176 | |
177 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { | |
178 toDel := make([][]byte, 0, len(keys)) | |
179 for _, k := range keys { | |
180 toDel = append(toDel, keyBytes(ds.WithoutContext, k)) | |
181 } | |
182 ns := keys[0].Namespace() | |
183 | |
184 d.rwlock.Lock() | |
185 defer d.rwlock.Unlock() | |
186 | |
187 ents := d.store.GetCollection("ents:" + ns) | |
188 | |
189 for i, k := range keys { | |
190 if ents != nil { | |
191 incrementLocked(ents, groupMetaKey(k)) | |
192 kb := toDel[i] | |
193 if old := ents.Get(kb); old != nil { | |
194 oldPM, err := rpmWoCtx(old, ns) | |
195 if err != nil { | |
196 if cb != nil { | |
197 cb(err) | |
198 } | |
199 continue | |
200 } | |
201 updateIndicies(d.store, k, oldPM, nil) | |
202 ents.Delete(kb) | |
203 } | |
204 } | |
205 if cb != nil { | |
206 cb(nil) | |
207 } | |
208 } | |
209 } | |
210 | |
211 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | |
212 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | |
213 | |
214 txn := obj.(*txnDataStoreData) | |
215 for rk, muts := range txn.muts { | |
216 if len(muts) == 0 { // read-only | |
217 continue | |
218 } | |
219 k, err := ds.ReadKey(bytes.NewBufferString(rk), ds.WithContext,
"", "") | |
220 if err != nil { | |
221 panic(err) | |
222 } | |
223 | |
224 entKey := "ents:" + k.Namespace() | |
225 mkey := groupMetaKey(k) | |
226 entsHead := d.store.GetCollection(entKey) | |
227 entsSnap := txn.snap.GetCollection(entKey) | |
228 vHead := curVersion(entsHead, mkey) | |
229 vSnap := curVersion(entsSnap, mkey) | |
230 if vHead != vSnap { | |
231 return false | |
232 } | |
233 } | |
234 return true | |
235 } | |
236 | |
237 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { | |
238 txn := obj.(*txnDataStoreData) | |
239 for _, muts := range txn.muts { | |
240 if len(muts) == 0 { // read-only | |
241 continue | |
242 } | |
243 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul
ti | |
244 for _, m := range muts { | |
245 err := error(nil) | |
246 k := m.key | |
247 if m.data == nil { | |
248 d.delMulti([]ds.Key{k}, | |
249 func(e error) { err = e }) | |
250 } else { | |
251 d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d
ata}, | |
252 func(_ ds.Key, e error) { err = e }) | |
253 } | |
254 err = errors.SingleError(err) | |
255 if err != nil { | |
256 panic(err) | |
257 } | |
258 } | |
259 } | |
260 } | |
261 | |
262 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { | |
263 return &txnDataStoreData{ | |
264 // alias to the main datastore's so that testing code can have p
rimitive | |
265 // access to break features inside of transactions. | |
266 parent: d, | |
267 isXG: o != nil && o.XG, | |
268 snap: d.store.Snapshot(), | |
269 muts: map[string][]txnMutation{}, | |
270 } | |
271 } | |
272 | |
273 func (d *dataStoreData) endTxn() {} | |
274 | |
275 /////////////////////////////// txnDataStoreData /////////////////////////////// | |
276 | |
277 type txnMutation struct { | |
278 key ds.Key | |
279 data ds.PropertyMap | |
280 } | |
281 | |
282 type txnDataStoreData struct { | |
283 sync.Mutex | |
284 | |
285 parent *dataStoreData | |
286 | |
287 // boolean 0 or 1, use atomic.*Int32 to access. | |
288 closed int32 | |
289 isXG bool | |
290 | |
291 snap *memStore | |
292 | |
293 // string is the raw-bytes encoding of the entity root incl. namespace | |
294 muts map[string][]txnMutation | |
295 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ
ing | |
296 // length of encoded keys + values. | |
297 } | |
298 | |
299 var _ memContextObj = (*txnDataStoreData)(nil) | |
300 | |
301 const xgEGLimit = 25 | |
302 | |
303 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | |
304 func (td *txnDataStoreData) endTxn() { | |
305 if atomic.LoadInt32(&td.closed) == 1 { | |
306 panic("cannot end transaction twice") | |
307 } | |
308 atomic.StoreInt32(&td.closed, 1) | |
309 } | |
310 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | |
311 panic("txnDataStoreData cannot apply transactions") | |
312 } | |
313 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { | |
314 panic("impossible") | |
315 } | |
316 | |
317 func (td *txnDataStoreData) run(f func() error) error { | |
318 // Slightly different from the SDK... datastore and taskqueue each imple
ment | |
319 // this here, where in the SDK only datastore.transaction.Call does. | |
320 if atomic.LoadInt32(&td.closed) == 1 { | |
321 return errors.New("datastore: transaction context has expired") | |
322 } | |
323 return f() | |
324 } | |
325 | |
326 // writeMutation ensures that this transaction can support the given key/value | |
327 // mutation. | |
328 // | |
329 // if getOnly is true, don't record the actual mutation data, just ensure that | |
330 // the key is in an included entity group (or add an empty entry for tha
t | |
331 // group). | |
332 // | |
333 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | |
334 // | |
335 // Returns an error if this key causes the transaction to cross too many entity | |
336 // groups. | |
337 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop
ertyMap) error { | |
338 rk := string(keyBytes(ds.WithContext, ds.KeyRoot(key))) | |
339 | |
340 td.Lock() | |
341 defer td.Unlock() | |
342 | |
343 if _, ok := td.muts[rk]; !ok { | |
344 limit := 1 | |
345 if td.isXG { | |
346 limit = xgEGLimit | |
347 } | |
348 if len(td.muts)+1 > limit { | |
349 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" | |
350 if td.isXG { | |
351 msg = "operating on too many entity groups in a
single transaction" | |
352 } | |
353 return errors.New(msg) | |
354 } | |
355 td.muts[rk] = []txnMutation{} | |
356 } | |
357 if !getOnly { | |
358 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | |
359 } | |
360 | |
361 return nil | |
362 } | |
363 | |
364 func (td *txnDataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds
.PutMultiCB) { | |
365 for i, k := range keys { | |
366 func() { | |
367 td.parent.Lock() | |
368 defer td.parent.Unlock() | |
369 _, k = td.parent.entsKeyLocked(k) | |
370 }() | |
371 err := td.writeMutation(false, k, vals[i]) | |
372 if cb != nil { | |
373 cb(k, err) | |
374 } | |
375 } | |
376 } | |
377 | |
378 func (td *txnDataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { | |
379 return getMultiInner(keys, cb, func() (*memCollection, error) { | |
380 err := error(nil) | |
381 for _, key := range keys { | |
382 err = td.writeMutation(true, key, nil) | |
383 if err != nil { | |
384 return nil, err | |
385 } | |
386 } | |
387 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil | |
388 }) | |
389 } | |
390 | |
391 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { | |
392 for _, k := range keys { | |
393 err := td.writeMutation(false, k, nil) | |
394 if cb != nil { | |
395 cb(err) | |
396 } | |
397 } | |
398 return nil | |
399 } | |
400 | |
401 func keyBytes(ctx ds.KeyContext, key ds.Key) []byte { | |
402 buf := &bytes.Buffer{} | |
403 ds.WriteKey(buf, ctx, key) | |
404 return buf.Bytes() | |
405 } | |
406 | |
407 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { | |
408 ret := ds.PropertyMap{} | |
409 err := ret.Read(bytes.NewBuffer(data), ds.WithoutContext, globalAppID, n
s) | |
410 return ret, err | |
411 } | |
412 | |
413 func rpm(data []byte) (ds.PropertyMap, error) { | |
414 ret := ds.PropertyMap{} | |
415 err := ret.Read(bytes.NewBuffer(data), ds.WithContext, "", "") | |
416 return ret, err | |
417 } | |
418 | |
419 type keyitem interface { | |
420 Key() ds.Key | |
421 } | |
OLD | NEW |