OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "errors" | |
9 "infra/gae/libs/wrapper" | |
10 goon_internal "infra/gae/libs/wrapper/memory/internal/goon" | |
11 "sync" | |
12 "sync/atomic" | |
13 | |
14 "github.com/mjibson/goon" | |
15 | |
16 "appengine/datastore" | |
17 pb "appengine_internal/datastore" | |
18 "golang.org/x/net/context" | |
19 ) | |
20 | |
21 ////////////////////////////////// knrKeeper /////////////////////////////////// | |
22 | |
23 type knrKeeper struct { | |
24 knrLock sync.Mutex | |
25 knrFunc goon.KindNameResolver | |
26 } | |
27 | |
28 var _ = wrapper.DSKindSetter((*knrKeeper)(nil)) | |
29 | |
30 func (k *knrKeeper) KindNameResolver() goon.KindNameResolver { | |
31 k.knrLock.Lock() | |
32 defer k.knrLock.Unlock() | |
33 if k.knrFunc == nil { | |
34 k.knrFunc = goon.DefaultKindName | |
35 } | |
36 return k.knrFunc | |
37 } | |
38 | |
39 func (k *knrKeeper) SetKindNameResolver(knr goon.KindNameResolver) { | |
40 k.knrLock.Lock() | |
41 defer k.knrLock.Unlock() | |
42 if knr == nil { | |
43 knr = goon.DefaultKindName | |
44 } | |
45 k.knrFunc = knr | |
46 } | |
47 | |
48 //////////////////////////////// dataStoreData ///////////////////////////////// | |
49 | |
50 type dataStoreData struct { | |
51 wrapper.BrokenFeatures | |
52 knrKeeper | |
53 | |
54 rwlock sync.RWMutex | |
55 // See README.md for store schema. | |
56 store *memStore | |
57 snap *memStore | |
58 } | |
59 | |
60 var ( | |
61 _ = memContextObj((*dataStoreData)(nil)) | |
62 _ = sync.Locker((*dataStoreData)(nil)) | |
63 _ = wrapper.Testable((*dataStoreData)(nil)) | |
64 _ = wrapper.DSKindSetter((*dataStoreData)(nil)) | |
65 ) | |
66 | |
67 func newDataStoreData() *dataStoreData { | |
68 store := newMemStore() | |
69 return &dataStoreData{ | |
70 BrokenFeatures: wrapper.BrokenFeatures{DefaultError: newDSError(
pb.Error_INTERNAL_ERROR)}, | |
71 store: store, | |
72 snap: store.Snapshot(), // empty but better than a nil
pointer. | |
73 } | |
74 } | |
75 | |
76 func (d *dataStoreData) Lock() { | |
77 d.rwlock.Lock() | |
78 } | |
79 | |
80 func (d *dataStoreData) Unlock() { | |
81 d.rwlock.Unlock() | |
82 } | |
83 | |
84 /////////////////////////// indicies(dataStoreData) //////////////////////////// | |
85 | |
86 func groupMetaKey(key *datastore.Key) []byte { | |
87 return keyBytes(noNS, newKey("", "__entity_group__", "", 1, rootKey(key)
)) | |
88 } | |
89 | |
90 func groupIDsKey(key *datastore.Key) []byte { | |
91 return keyBytes(noNS, newKey("", "__entity_group_ids__", "", 1, rootKey(
key))) | |
92 } | |
93 | |
94 func rootIDsKey(kind string) []byte { | |
95 return keyBytes(noNS, newKey("", "__entity_root_ids__", kind, 0, nil)) | |
96 } | |
97 | |
98 func curVersion(ents *memCollection, key []byte) (int64, error) { | |
99 if v := ents.Get(key); v != nil { | |
100 numData := &propertyList{} | |
101 if err := numData.UnmarshalBinary(v); err != nil { | |
102 return 0, err | |
103 } | |
104 return (*numData)[0].Value.(int64), nil | |
105 } | |
106 return 0, nil | |
107 } | |
108 | |
109 func incrementLocked(ents *memCollection, key []byte) (int64, error) { | |
110 num := int64(0) | |
111 numData := &propertyList{} | |
112 if v := ents.Get(key); v != nil { | |
113 if err := numData.UnmarshalBinary(v); err != nil { | |
114 return 0, err | |
115 } | |
116 num = (*numData)[0].Value.(int64) | |
117 } else { | |
118 *numData = append(*numData, datastore.Property{Name: "__version_
_"}) | |
119 } | |
120 num++ | |
121 (*numData)[0].Value = num | |
122 incData, err := numData.MarshalBinary() | |
123 if err != nil { | |
124 return 0, err | |
125 } | |
126 ents.Set(key, incData) | |
127 | |
128 return num, nil | |
129 } | |
130 | |
131 func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*memCollection, *data
store.Key, error) { | |
132 coll := "ents:" + key.Namespace() | |
133 ents := d.store.GetCollection(coll) | |
134 if ents == nil { | |
135 ents = d.store.SetCollection(coll, nil) | |
136 } | |
137 | |
138 if key.Incomplete() { | |
139 idKey := []byte(nil) | |
140 if key.Parent() == nil { | |
141 idKey = rootIDsKey(key.Kind()) | |
142 } else { | |
143 idKey = groupIDsKey(key) | |
144 } | |
145 id, err := incrementLocked(ents, idKey) | |
146 if err != nil { | |
147 return nil, nil, err | |
148 } | |
149 key = newKey(key.Namespace(), key.Kind(), "", id, key.Parent()) | |
150 } | |
151 | |
152 return ents, key, nil | |
153 } | |
154 | |
155 func putPrelim(ns string, knr goon.KindNameResolver, src interface{}) (*datastor
e.Key, *propertyList, error) { | |
156 key := newKeyObj(ns, knr, src) | |
157 if !keyCouldBeValid(ns, key, userKeyOnly) { | |
158 // TODO(riannucci): different error for Put-ing to reserved Keys
? | |
159 return nil, nil, datastore.ErrInvalidKey | |
160 } | |
161 | |
162 data, err := toPL(src) | |
163 return key, data, err | |
164 } | |
165 | |
166 func (d *dataStoreData) put(ns string, src interface{}) (*datastore.Key, error)
{ | |
167 key, plData, err := putPrelim(ns, d.KindNameResolver(), src) | |
168 if err != nil { | |
169 return nil, err | |
170 } | |
171 if key, err = d.putInner(key, plData); err != nil { | |
172 return nil, err | |
173 } | |
174 return key, goon_internal.SetStructKey(src, key, d.KindNameResolver()) | |
175 } | |
176 | |
177 func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datas
tore.Key, error) { | |
178 dataBytes, err := data.MarshalBinary() | |
179 if err != nil { | |
180 return nil, err | |
181 } | |
182 | |
183 d.rwlock.Lock() | |
184 defer d.rwlock.Unlock() | |
185 | |
186 ents, key, err := d.entsKeyLocked(key) | |
187 if err != nil { | |
188 return nil, err | |
189 } | |
190 if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil { | |
191 return nil, err | |
192 } | |
193 | |
194 old := ents.Get(keyBytes(noNS, key)) | |
195 oldPl := (*propertyList)(nil) | |
196 if old != nil { | |
197 oldPl = &propertyList{} | |
198 if err = oldPl.UnmarshalBinary(old); err != nil { | |
199 return nil, err | |
200 } | |
201 } | |
202 if err = updateIndicies(d.store, key, oldPl, data); err != nil { | |
203 return nil, err | |
204 } | |
205 | |
206 ents.Set(keyBytes(noNS, key), dataBytes) | |
207 | |
208 return key, nil | |
209 } | |
210 | |
211 func getInner(ns string, knr goon.KindNameResolver, dst interface{}, getColl fun
c(*datastore.Key) (*memCollection, error)) error { | |
212 key := newKeyObj(ns, knr, dst) | |
213 if !keyValid(ns, key, allowSpecialKeys) { | |
214 return datastore.ErrInvalidKey | |
215 } | |
216 | |
217 ents, err := getColl(key) | |
218 if err != nil { | |
219 return err | |
220 } | |
221 if ents == nil { | |
222 return datastore.ErrNoSuchEntity | |
223 } | |
224 pdata := ents.Get(keyBytes(noNS, key)) | |
225 if pdata == nil { | |
226 return datastore.ErrNoSuchEntity | |
227 } | |
228 pl := &propertyList{} | |
229 if err = pl.UnmarshalBinary(pdata); err != nil { | |
230 return err | |
231 } | |
232 return fromPL(pl, dst) | |
233 } | |
234 | |
235 func (d *dataStoreData) get(ns string, dst interface{}) error { | |
236 return getInner(ns, d.KindNameResolver(), dst, func(*datastore.Key) (*me
mCollection, error) { | |
237 d.rwlock.RLock() | |
238 s := d.store.Snapshot() | |
239 d.rwlock.RUnlock() | |
240 | |
241 return s.GetCollection("ents:" + ns), nil | |
242 }) | |
243 } | |
244 | |
245 func (d *dataStoreData) del(ns string, key *datastore.Key) error { | |
246 if !keyValid(ns, key, userKeyOnly) { | |
247 return datastore.ErrInvalidKey | |
248 } | |
249 | |
250 keyBuf := keyBytes(noNS, key) | |
251 | |
252 d.rwlock.Lock() | |
253 defer d.rwlock.Unlock() | |
254 | |
255 ents := d.store.GetCollection("ents:" + ns) | |
256 if ents == nil { | |
257 return nil | |
258 } | |
259 if _, err := incrementLocked(ents, groupMetaKey(key)); err != nil { | |
260 return err | |
261 } | |
262 | |
263 old := ents.Get(keyBuf) | |
264 oldPl := (*propertyList)(nil) | |
265 if old != nil { | |
266 oldPl = &propertyList{} | |
267 if err := oldPl.UnmarshalBinary(old); err != nil { | |
268 return err | |
269 } | |
270 } | |
271 if err := updateIndicies(d.store, key, oldPl, nil); err != nil { | |
272 return err | |
273 } | |
274 | |
275 ents.Delete(keyBuf) | |
276 return nil | |
277 } | |
278 | |
279 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | |
280 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | |
281 | |
282 txn := obj.(*txnDataStoreData) | |
283 for rk, muts := range txn.muts { | |
284 if len(muts) == 0 { // read-only | |
285 continue | |
286 } | |
287 k, err := keyFromByteString(withNS, rk, "") | |
288 if err != nil { | |
289 panic(err) | |
290 } | |
291 entKey := "ents:" + k.Namespace() | |
292 mkey := groupMetaKey(k) | |
293 entsHead := d.store.GetCollection(entKey) | |
294 entsSnap := txn.snap.GetCollection(entKey) | |
295 vHead, err := curVersion(entsHead, mkey) | |
296 if err != nil { | |
297 panic(err) | |
298 } | |
299 vSnap, err := curVersion(entsSnap, mkey) | |
300 if err != nil { | |
301 panic(err) | |
302 } | |
303 if vHead != vSnap { | |
304 return false | |
305 } | |
306 } | |
307 return true | |
308 } | |
309 | |
310 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { | |
311 txn := obj.(*txnDataStoreData) | |
312 for _, muts := range txn.muts { | |
313 if len(muts) == 0 { // read-only | |
314 continue | |
315 } | |
316 for _, m := range muts { | |
317 err := error(nil) | |
318 if m.data == nil { | |
319 err = d.del(m.key.Namespace(), m.key) | |
320 } else { | |
321 _, err = d.putInner(m.key, m.data) | |
322 } | |
323 if err != nil { | |
324 panic(err) | |
325 } | |
326 } | |
327 } | |
328 } | |
329 | |
330 func (d *dataStoreData) mkTxn(o *datastore.TransactionOptions) (memContextObj, e
rror) { | |
331 return &txnDataStoreData{ | |
332 // alias to the main datastore's so that testing code can have p
rimitive | |
333 // access to break features inside of transactions. | |
334 BrokenFeatures: &d.BrokenFeatures, | |
335 parent: d, | |
336 knrKeeper: knrKeeper{knrFunc: d.knrFunc}, | |
337 isXG: o != nil && o.XG, | |
338 snap: d.store.Snapshot(), | |
339 muts: map[string][]txnMutation{}, | |
340 }, nil | |
341 } | |
342 | |
343 func (d *dataStoreData) endTxn() {} | |
344 | |
345 /////////////////////////////// txnDataStoreData /////////////////////////////// | |
346 | |
347 type txnMutation struct { | |
348 key *datastore.Key | |
349 data *propertyList | |
350 } | |
351 | |
352 type txnDataStoreData struct { | |
353 *wrapper.BrokenFeatures | |
354 knrKeeper | |
355 sync.Mutex | |
356 | |
357 parent *dataStoreData | |
358 | |
359 // boolean 0 or 1, use atomic.*Int32 to access. | |
360 closed int32 | |
361 isXG bool | |
362 | |
363 snap *memStore | |
364 | |
365 // string is the raw-bytes encoding of the entity root incl. namespace | |
366 muts map[string][]txnMutation | |
367 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ
ing | |
368 // length of encoded keys + values. | |
369 } | |
370 | |
371 var ( | |
372 _ = memContextObj((*txnDataStoreData)(nil)) | |
373 _ = sync.Locker((*txnDataStoreData)(nil)) | |
374 _ = wrapper.Testable((*txnDataStoreData)(nil)) | |
375 _ = wrapper.DSKindSetter((*txnDataStoreData)(nil)) | |
376 ) | |
377 | |
378 const xgEGLimit = 25 | |
379 | |
380 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | |
381 func (td *txnDataStoreData) endTxn() { | |
382 if atomic.LoadInt32(&td.closed) == 1 { | |
383 panic("cannot end transaction twice") | |
384 } | |
385 atomic.StoreInt32(&td.closed, 1) | |
386 } | |
387 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | |
388 panic("txnDataStoreData cannot apply transactions") | |
389 } | |
390 func (*txnDataStoreData) mkTxn(*datastore.TransactionOptions) (memContextObj, er
ror) { | |
391 return nil, errors.New("datastore: nested transactions are not supported
") | |
392 } | |
393 | |
394 func (td *txnDataStoreData) IsBroken() error { | |
395 // Slightly different from the SDK... datastore and taskqueue each imple
ment | |
396 // this here, where in the SDK only datastore.transaction.Call does. | |
397 if atomic.LoadInt32(&td.closed) == 1 { | |
398 return errors.New("datastore: transaction context has expired") | |
399 } | |
400 return td.BrokenFeatures.IsBroken() | |
401 } | |
402 | |
403 // writeMutation ensures that this transaction can support the given key/value | |
404 // mutation. | |
405 // | |
406 // if getOnly is true, don't record the actual mutation data, just ensure that | |
407 // the key is in an included entity group (or add an empty entry for tha
t | |
408 // group). | |
409 // | |
410 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | |
411 // | |
412 // Returns an error if this key causes the transaction to cross too many entity | |
413 // groups. | |
414 func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data
*propertyList) error { | |
415 rk := string(keyBytes(withNS, rootKey(key))) | |
416 | |
417 td.Lock() | |
418 defer td.Unlock() | |
419 | |
420 if _, ok := td.muts[rk]; !ok { | |
421 limit := 1 | |
422 if td.isXG { | |
423 limit = xgEGLimit | |
424 } | |
425 if len(td.muts)+1 > limit { | |
426 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" | |
427 if td.isXG { | |
428 msg = "operating on too many entity groups in a
single transaction" | |
429 } | |
430 return newDSError(pb.Error_BAD_REQUEST, msg) | |
431 } | |
432 td.muts[rk] = []txnMutation{} | |
433 } | |
434 if !getOnly { | |
435 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | |
436 } | |
437 | |
438 return nil | |
439 } | |
440 | |
441 func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, err
or) { | |
442 key, plData, err := putPrelim(ns, td.KindNameResolver(), src) | |
443 if err != nil { | |
444 return nil, err | |
445 } | |
446 | |
447 func() { | |
448 td.parent.Lock() | |
449 defer td.parent.Unlock() | |
450 _, key, err = td.parent.entsKeyLocked(key) | |
451 }() | |
452 if err != nil { | |
453 return nil, err | |
454 } | |
455 | |
456 if err = td.writeMutation(false, key, plData); err != nil { | |
457 return nil, err | |
458 } | |
459 | |
460 return key, goon_internal.SetStructKey(src, key, td.KindNameResolver()) | |
461 } | |
462 | |
463 func (td *txnDataStoreData) get(ns string, dst interface{}) error { | |
464 return getInner(ns, td.KindNameResolver(), dst, func(key *datastore.Key)
(*memCollection, error) { | |
465 if err := td.writeMutation(true, key, nil); err != nil { | |
466 return nil, err | |
467 } | |
468 return td.snap.GetCollection("ents:" + ns), nil | |
469 }) | |
470 } | |
471 | |
472 func (td *txnDataStoreData) del(ns string, key *datastore.Key) error { | |
473 if !keyValid(ns, key, userKeyOnly) { | |
474 return datastore.ErrInvalidKey | |
475 } | |
476 return td.writeMutation(false, key, nil) | |
477 } | |
OLD | NEW |