OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | |
9 "errors" | 8 "errors" |
10 » "fmt" | 9 » "infra/gae/libs/wrapper" |
11 » "golang.org/x/net/context" | 10 » goon_internal "infra/gae/libs/wrapper/memory/internal/goon" |
12 "sync" | 11 "sync" |
13 "sync/atomic" | 12 "sync/atomic" |
14 | 13 |
15 » "infra/gae/libs/gae" | 14 » "github.com/mjibson/goon" |
16 » "infra/gae/libs/gae/helper" | 15 |
| 16 » "appengine/datastore" |
| 17 » pb "appengine_internal/datastore" |
| 18 » "golang.org/x/net/context" |
17 ) | 19 ) |
18 | 20 |
| 21 ////////////////////////////////// knrKeeper /////////////////////////////////// |
| 22 |
| 23 type knrKeeper struct { |
| 24 knrLock sync.Mutex |
| 25 knrFunc goon.KindNameResolver |
| 26 } |
| 27 |
| 28 var _ = wrapper.DSKindSetter((*knrKeeper)(nil)) |
| 29 |
| 30 func (k *knrKeeper) KindNameResolver() goon.KindNameResolver { |
| 31 k.knrLock.Lock() |
| 32 defer k.knrLock.Unlock() |
| 33 if k.knrFunc == nil { |
| 34 k.knrFunc = goon.DefaultKindName |
| 35 } |
| 36 return k.knrFunc |
| 37 } |
| 38 |
| 39 func (k *knrKeeper) SetKindNameResolver(knr goon.KindNameResolver) { |
| 40 k.knrLock.Lock() |
| 41 defer k.knrLock.Unlock() |
| 42 if knr == nil { |
| 43 knr = goon.DefaultKindName |
| 44 } |
| 45 k.knrFunc = knr |
| 46 } |
| 47 |
19 //////////////////////////////// dataStoreData ///////////////////////////////// | 48 //////////////////////////////// dataStoreData ///////////////////////////////// |
20 | 49 |
21 type dataStoreData struct { | 50 type dataStoreData struct { |
22 » gae.BrokenFeatures | 51 » wrapper.BrokenFeatures |
| 52 » knrKeeper |
23 | 53 |
24 rwlock sync.RWMutex | 54 rwlock sync.RWMutex |
25 // See README.md for store schema. | 55 // See README.md for store schema. |
26 store *memStore | 56 store *memStore |
27 snap *memStore | 57 snap *memStore |
28 } | 58 } |
29 | 59 |
30 var ( | 60 var ( |
31 _ = memContextObj((*dataStoreData)(nil)) | 61 _ = memContextObj((*dataStoreData)(nil)) |
32 _ = sync.Locker((*dataStoreData)(nil)) | 62 _ = sync.Locker((*dataStoreData)(nil)) |
33 » _ = gae.Testable((*dataStoreData)(nil)) | 63 » _ = wrapper.Testable((*dataStoreData)(nil)) |
| 64 » _ = wrapper.DSKindSetter((*dataStoreData)(nil)) |
34 ) | 65 ) |
35 | 66 |
36 func newDataStoreData() *dataStoreData { | 67 func newDataStoreData() *dataStoreData { |
37 store := newMemStore() | 68 store := newMemStore() |
38 return &dataStoreData{ | 69 return &dataStoreData{ |
39 » » BrokenFeatures: gae.BrokenFeatures{DefaultError: errors.New("INT
ERNAL_ERROR")}, | 70 » » BrokenFeatures: wrapper.BrokenFeatures{DefaultError: newDSError(
pb.Error_INTERNAL_ERROR)}, |
40 store: store, | 71 store: store, |
41 snap: store.Snapshot(), // empty but better than a nil
pointer. | 72 snap: store.Snapshot(), // empty but better than a nil
pointer. |
42 } | 73 } |
43 } | 74 } |
44 | 75 |
45 func (d *dataStoreData) Lock() { | 76 func (d *dataStoreData) Lock() { |
46 d.rwlock.Lock() | 77 d.rwlock.Lock() |
47 } | 78 } |
48 | 79 |
49 func (d *dataStoreData) Unlock() { | 80 func (d *dataStoreData) Unlock() { |
50 d.rwlock.Unlock() | 81 d.rwlock.Unlock() |
51 } | 82 } |
52 | 83 |
53 /////////////////////////// indicies(dataStoreData) //////////////////////////// | 84 /////////////////////////// indicies(dataStoreData) //////////////////////////// |
54 | 85 |
55 func groupMetaKey(key gae.DSKey) []byte { | 86 func groupMetaKey(key *datastore.Key) []byte { |
56 » return keyBytes(helper.WithoutContext, | 87 » return keyBytes(noNS, newKey("", "__entity_group__", "", 1, rootKey(key)
)) |
57 » » helper.NewDSKey("", "", "__entity_group__", "", 1, helper.DSKeyR
oot(key))) | |
58 } | 88 } |
59 | 89 |
60 func groupIDsKey(key gae.DSKey) []byte { | 90 func groupIDsKey(key *datastore.Key) []byte { |
61 » return keyBytes(helper.WithoutContext, | 91 » return keyBytes(noNS, newKey("", "__entity_group_ids__", "", 1, rootKey(
key))) |
62 » » helper.NewDSKey("", "", "__entity_group_ids__", "", 1, helper.DS
KeyRoot(key))) | |
63 } | 92 } |
64 | 93 |
65 func rootIDsKey(kind string) []byte { | 94 func rootIDsKey(kind string) []byte { |
66 » return keyBytes(helper.WithoutContext, | 95 » return keyBytes(noNS, newKey("", "__entity_root_ids__", kind, 0, nil)) |
67 » » helper.NewDSKey("", "", "__entity_root_ids__", kind, 0, nil)) | |
68 } | 96 } |
69 | 97 |
70 func curVersion(ents *memCollection, key []byte) (int64, error) { | 98 func curVersion(ents *memCollection, key []byte) (int64, error) { |
71 if v := ents.Get(key); v != nil { | 99 if v := ents.Get(key); v != nil { |
72 » » pm, err := rpm(v) | 100 » » numData := &propertyList{} |
73 » » if err != nil { | 101 » » if err := numData.UnmarshalBinary(v); err != nil { |
74 return 0, err | 102 return 0, err |
75 } | 103 } |
76 » » pl, ok := pm["__version__"] | 104 » » return (*numData)[0].Value.(int64), nil |
77 » » if ok && len(pl) > 0 && pl[0].Type() == gae.DSPTInt { | |
78 » » » return pl[0].Value().(int64), nil | |
79 » » } | |
80 » » return 0, fmt.Errorf("__version__ property missing or wrong: %v"
, pm) | |
81 } | 105 } |
82 return 0, nil | 106 return 0, nil |
83 } | 107 } |
84 | 108 |
85 func incrementLocked(ents *memCollection, key []byte) (ret int64, err error) { | 109 func incrementLocked(ents *memCollection, key []byte) (int64, error) { |
86 » if ret, err = curVersion(ents, key); err != nil { | 110 » num := int64(0) |
87 » » ret = 0 | 111 » numData := &propertyList{} |
| 112 » if v := ents.Get(key); v != nil { |
| 113 » » if err := numData.UnmarshalBinary(v); err != nil { |
| 114 » » » return 0, err |
| 115 » » } |
| 116 » » num = (*numData)[0].Value.(int64) |
| 117 » } else { |
| 118 » » *numData = append(*numData, datastore.Property{Name: "__version_
_"}) |
88 } | 119 } |
89 » ret++ | 120 » num++ |
90 » p := gae.DSProperty{} | 121 » (*numData)[0].Value = num |
91 » if err = p.SetValue(ret, true); err != nil { | 122 » incData, err := numData.MarshalBinary() |
92 » » return | 123 » if err != nil { |
| 124 » » return 0, err |
93 } | 125 } |
94 » buf := &bytes.Buffer{} | 126 » ents.Set(key, incData) |
95 » helper.WriteDSPropertyMap( | 127 |
96 » » buf, gae.DSPropertyMap{"__version__": {p}}, helper.WithContext) | 128 » return num, nil |
97 » ents.Set(key, buf.Bytes()) | |
98 » return | |
99 } | 129 } |
100 | 130 |
101 func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey,
error) { | 131 func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*memCollection, *data
store.Key, error) { |
102 coll := "ents:" + key.Namespace() | 132 coll := "ents:" + key.Namespace() |
103 ents := d.store.GetCollection(coll) | 133 ents := d.store.GetCollection(coll) |
104 if ents == nil { | 134 if ents == nil { |
105 ents = d.store.SetCollection(coll, nil) | 135 ents = d.store.SetCollection(coll, nil) |
106 } | 136 } |
107 | 137 |
108 » if helper.DSKeyIncomplete(key) { | 138 » if key.Incomplete() { |
109 idKey := []byte(nil) | 139 idKey := []byte(nil) |
110 if key.Parent() == nil { | 140 if key.Parent() == nil { |
111 idKey = rootIDsKey(key.Kind()) | 141 idKey = rootIDsKey(key.Kind()) |
112 } else { | 142 } else { |
113 idKey = groupIDsKey(key) | 143 idKey = groupIDsKey(key) |
114 } | 144 } |
115 id, err := incrementLocked(ents, idKey) | 145 id, err := incrementLocked(ents, idKey) |
116 if err != nil { | 146 if err != nil { |
117 return nil, nil, err | 147 return nil, nil, err |
118 } | 148 } |
119 » » key = helper.NewDSKey(key.AppID(), key.Namespace(), key.Kind(),
"", id, key.Parent()) | 149 » » key = newKey(key.Namespace(), key.Kind(), "", id, key.Parent()) |
120 } | 150 } |
121 | 151 |
122 return ents, key, nil | 152 return ents, key, nil |
123 } | 153 } |
124 | 154 |
125 func putPrelim(ns string, key gae.DSKey, src interface{}) (gae.DSPropertyMap, er
ror) { | 155 func putPrelim(ns string, knr goon.KindNameResolver, src interface{}) (*datastor
e.Key, *propertyList, error) { |
126 » if !keyCouldBeValid(key, ns, false) { | 156 » key := newKeyObj(ns, knr, src) |
| 157 » if !keyCouldBeValid(ns, key, userKeyOnly) { |
127 // TODO(riannucci): different error for Put-ing to reserved Keys
? | 158 // TODO(riannucci): different error for Put-ing to reserved Keys
? |
128 » » return nil, gae.ErrDSInvalidKey | 159 » » return nil, nil, datastore.ErrInvalidKey |
129 } | 160 } |
130 | 161 |
131 » pls, err := helper.GetPLS(src) | 162 » data, err := toPL(src) |
| 163 » return key, data, err |
| 164 } |
| 165 |
| 166 func (d *dataStoreData) put(ns string, src interface{}) (*datastore.Key, error)
{ |
| 167 » key, plData, err := putPrelim(ns, d.KindNameResolver(), src) |
132 if err != nil { | 168 if err != nil { |
133 return nil, err | 169 return nil, err |
134 } | 170 } |
135 » return pls.Save() | 171 » if key, err = d.putInner(key, plData); err != nil { |
| 172 » » return nil, err |
| 173 » } |
| 174 » return key, goon_internal.SetStructKey(src, key, d.KindNameResolver()) |
136 } | 175 } |
137 | 176 |
138 func (d *dataStoreData) put(ns string, key gae.DSKey, src interface{}) (gae.DSKe
y, error) { | 177 func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datas
tore.Key, error) { |
139 » pmData, err := putPrelim(ns, key, src) | 178 » dataBytes, err := data.MarshalBinary() |
140 if err != nil { | 179 if err != nil { |
141 return nil, err | 180 return nil, err |
142 } | 181 } |
143 if key, err = d.putInner(key, pmData); err != nil { | |
144 return nil, err | |
145 } | |
146 return key, nil | |
147 } | |
148 | |
149 func (d *dataStoreData) putInner(key gae.DSKey, data gae.DSPropertyMap) (gae.DSK
ey, error) { | |
150 buf := &bytes.Buffer{} | |
151 helper.WriteDSPropertyMap(buf, data, helper.WithoutContext) | |
152 dataBytes := buf.Bytes() | |
153 | 182 |
154 d.rwlock.Lock() | 183 d.rwlock.Lock() |
155 defer d.rwlock.Unlock() | 184 defer d.rwlock.Unlock() |
156 | 185 |
157 ents, key, err := d.entsKeyLocked(key) | 186 ents, key, err := d.entsKeyLocked(key) |
158 if err != nil { | 187 if err != nil { |
159 return nil, err | 188 return nil, err |
160 } | 189 } |
161 if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil { | 190 if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil { |
162 return nil, err | 191 return nil, err |
163 } | 192 } |
164 | 193 |
165 » old := ents.Get(keyBytes(helper.WithoutContext, key)) | 194 » old := ents.Get(keyBytes(noNS, key)) |
166 » oldPM := gae.DSPropertyMap(nil) | 195 » oldPl := (*propertyList)(nil) |
167 if old != nil { | 196 if old != nil { |
168 » » if oldPM, err = rpmWoCtx(old, key.Namespace()); err != nil { | 197 » » oldPl = &propertyList{} |
| 198 » » if err = oldPl.UnmarshalBinary(old); err != nil { |
169 return nil, err | 199 return nil, err |
170 } | 200 } |
171 } | 201 } |
172 » if err = updateIndicies(d.store, key, oldPM, data); err != nil { | 202 » if err = updateIndicies(d.store, key, oldPl, data); err != nil { |
173 return nil, err | 203 return nil, err |
174 } | 204 } |
175 | 205 |
176 » ents.Set(keyBytes(helper.WithoutContext, key), dataBytes) | 206 » ents.Set(keyBytes(noNS, key), dataBytes) |
177 | 207 |
178 return key, nil | 208 return key, nil |
179 } | 209 } |
180 | 210 |
181 func getInner(ns string, key gae.DSKey, dst interface{}, getColl func() (*memCol
lection, error)) error { | 211 func getInner(ns string, knr goon.KindNameResolver, dst interface{}, getColl fun
c(*datastore.Key) (*memCollection, error)) error { |
182 » if helper.DSKeyIncomplete(key) || !helper.DSKeyValid(key, ns, true) { | 212 » key := newKeyObj(ns, knr, dst) |
183 » » return gae.ErrDSInvalidKey | 213 » if !keyValid(ns, key, allowSpecialKeys) { |
| 214 » » return datastore.ErrInvalidKey |
184 } | 215 } |
185 | 216 |
186 » ents, err := getColl() | 217 » ents, err := getColl(key) |
187 if err != nil { | 218 if err != nil { |
188 return err | 219 return err |
189 } | 220 } |
190 if ents == nil { | 221 if ents == nil { |
191 » » return gae.ErrDSNoSuchEntity | 222 » » return datastore.ErrNoSuchEntity |
192 } | 223 } |
193 » pdata := ents.Get(keyBytes(helper.WithoutContext, key)) | 224 » pdata := ents.Get(keyBytes(noNS, key)) |
194 if pdata == nil { | 225 if pdata == nil { |
195 » » return gae.ErrDSNoSuchEntity | 226 » » return datastore.ErrNoSuchEntity |
196 } | 227 } |
197 | 228 » pl := &propertyList{} |
198 » pm, err := rpmWoCtx(pdata, ns) | 229 » if err = pl.UnmarshalBinary(pdata); err != nil { |
199 » if err != nil { | |
200 return err | 230 return err |
201 } | 231 } |
202 | 232 » return fromPL(pl, dst) |
203 » pls, err := helper.GetPLS(dst) | |
204 » if err != nil { | |
205 » » return err | |
206 » } | |
207 | |
208 » // TODO(riannucci): should the Get API reveal conversion errors instead
of | |
209 » // swallowing them? | |
210 » _, err = pls.Load(pm) | |
211 » return err | |
212 } | 233 } |
213 | 234 |
214 func (d *dataStoreData) get(ns string, key gae.DSKey, dst interface{}) error { | 235 func (d *dataStoreData) get(ns string, dst interface{}) error { |
215 » return getInner(ns, key, dst, func() (*memCollection, error) { | 236 » return getInner(ns, d.KindNameResolver(), dst, func(*datastore.Key) (*me
mCollection, error) { |
216 d.rwlock.RLock() | 237 d.rwlock.RLock() |
217 s := d.store.Snapshot() | 238 s := d.store.Snapshot() |
218 d.rwlock.RUnlock() | 239 d.rwlock.RUnlock() |
219 | 240 |
220 return s.GetCollection("ents:" + ns), nil | 241 return s.GetCollection("ents:" + ns), nil |
221 }) | 242 }) |
222 } | 243 } |
223 | 244 |
224 func (d *dataStoreData) del(ns string, key gae.DSKey) (err error) { | 245 func (d *dataStoreData) del(ns string, key *datastore.Key) error { |
225 » if !helper.DSKeyValid(key, ns, false) { | 246 » if !keyValid(ns, key, userKeyOnly) { |
226 » » return gae.ErrDSInvalidKey | 247 » » return datastore.ErrInvalidKey |
227 } | 248 } |
228 | 249 |
229 » keyBuf := keyBytes(helper.WithoutContext, key) | 250 » keyBuf := keyBytes(noNS, key) |
230 | 251 |
231 d.rwlock.Lock() | 252 d.rwlock.Lock() |
232 defer d.rwlock.Unlock() | 253 defer d.rwlock.Unlock() |
233 | 254 |
234 ents := d.store.GetCollection("ents:" + ns) | 255 ents := d.store.GetCollection("ents:" + ns) |
235 if ents == nil { | 256 if ents == nil { |
236 return nil | 257 return nil |
237 } | 258 } |
238 » if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil { | 259 » if _, err := incrementLocked(ents, groupMetaKey(key)); err != nil { |
239 » » return | 260 » » return err |
240 } | 261 } |
241 | 262 |
242 old := ents.Get(keyBuf) | 263 old := ents.Get(keyBuf) |
243 » oldPM := gae.DSPropertyMap(nil) | 264 » oldPl := (*propertyList)(nil) |
244 if old != nil { | 265 if old != nil { |
245 » » if oldPM, err = rpmWoCtx(old, ns); err != nil { | 266 » » oldPl = &propertyList{} |
246 » » » return | 267 » » if err := oldPl.UnmarshalBinary(old); err != nil { |
| 268 » » » return err |
247 } | 269 } |
248 } | 270 } |
249 » if err := updateIndicies(d.store, key, oldPM, nil); err != nil { | 271 » if err := updateIndicies(d.store, key, oldPl, nil); err != nil { |
250 return err | 272 return err |
251 } | 273 } |
252 | 274 |
253 ents.Delete(keyBuf) | 275 ents.Delete(keyBuf) |
254 return nil | 276 return nil |
255 } | 277 } |
256 | 278 |
257 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 279 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
258 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 280 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
259 | 281 |
260 txn := obj.(*txnDataStoreData) | 282 txn := obj.(*txnDataStoreData) |
261 for rk, muts := range txn.muts { | 283 for rk, muts := range txn.muts { |
262 if len(muts) == 0 { // read-only | 284 if len(muts) == 0 { // read-only |
263 continue | 285 continue |
264 } | 286 } |
265 » » k, err := helper.ReadDSKey(bytes.NewBufferString(rk), helper.Wit
hContext, "", "") | 287 » » k, err := keyFromByteString(withNS, rk, "") |
266 if err != nil { | 288 if err != nil { |
267 panic(err) | 289 panic(err) |
268 } | 290 } |
269 | |
270 entKey := "ents:" + k.Namespace() | 291 entKey := "ents:" + k.Namespace() |
271 mkey := groupMetaKey(k) | 292 mkey := groupMetaKey(k) |
272 entsHead := d.store.GetCollection(entKey) | 293 entsHead := d.store.GetCollection(entKey) |
273 entsSnap := txn.snap.GetCollection(entKey) | 294 entsSnap := txn.snap.GetCollection(entKey) |
274 vHead, err := curVersion(entsHead, mkey) | 295 vHead, err := curVersion(entsHead, mkey) |
275 if err != nil { | 296 if err != nil { |
276 panic(err) | 297 panic(err) |
277 } | 298 } |
278 vSnap, err := curVersion(entsSnap, mkey) | 299 vSnap, err := curVersion(entsSnap, mkey) |
279 if err != nil { | 300 if err != nil { |
(...skipping 19 matching lines...) Expand all Loading... |
299 } else { | 320 } else { |
300 _, err = d.putInner(m.key, m.data) | 321 _, err = d.putInner(m.key, m.data) |
301 } | 322 } |
302 if err != nil { | 323 if err != nil { |
303 panic(err) | 324 panic(err) |
304 } | 325 } |
305 } | 326 } |
306 } | 327 } |
307 } | 328 } |
308 | 329 |
309 func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) (memContextObj, error
) { | 330 func (d *dataStoreData) mkTxn(o *datastore.TransactionOptions) (memContextObj, e
rror) { |
310 return &txnDataStoreData{ | 331 return &txnDataStoreData{ |
311 // alias to the main datastore's so that testing code can have p
rimitive | 332 // alias to the main datastore's so that testing code can have p
rimitive |
312 // access to break features inside of transactions. | 333 // access to break features inside of transactions. |
313 BrokenFeatures: &d.BrokenFeatures, | 334 BrokenFeatures: &d.BrokenFeatures, |
314 parent: d, | 335 parent: d, |
| 336 knrKeeper: knrKeeper{knrFunc: d.knrFunc}, |
315 isXG: o != nil && o.XG, | 337 isXG: o != nil && o.XG, |
316 snap: d.store.Snapshot(), | 338 snap: d.store.Snapshot(), |
317 muts: map[string][]txnMutation{}, | 339 muts: map[string][]txnMutation{}, |
318 }, nil | 340 }, nil |
319 } | 341 } |
320 | 342 |
321 func (d *dataStoreData) endTxn() {} | 343 func (d *dataStoreData) endTxn() {} |
322 | 344 |
323 /////////////////////////////// txnDataStoreData /////////////////////////////// | 345 /////////////////////////////// txnDataStoreData /////////////////////////////// |
324 | 346 |
325 type txnMutation struct { | 347 type txnMutation struct { |
326 » key gae.DSKey | 348 » key *datastore.Key |
327 » data gae.DSPropertyMap | 349 » data *propertyList |
328 } | 350 } |
329 | 351 |
330 type txnDataStoreData struct { | 352 type txnDataStoreData struct { |
331 » *gae.BrokenFeatures | 353 » *wrapper.BrokenFeatures |
| 354 » knrKeeper |
332 sync.Mutex | 355 sync.Mutex |
333 | 356 |
334 parent *dataStoreData | 357 parent *dataStoreData |
335 | 358 |
336 // boolean 0 or 1, use atomic.*Int32 to access. | 359 // boolean 0 or 1, use atomic.*Int32 to access. |
337 closed int32 | 360 closed int32 |
338 isXG bool | 361 isXG bool |
339 | 362 |
340 snap *memStore | 363 snap *memStore |
341 | 364 |
342 // string is the raw-bytes encoding of the entity root incl. namespace | 365 // string is the raw-bytes encoding of the entity root incl. namespace |
343 muts map[string][]txnMutation | 366 muts map[string][]txnMutation |
344 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ
ing | 367 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ
ing |
345 // length of encoded keys + values. | 368 // length of encoded keys + values. |
346 } | 369 } |
347 | 370 |
348 var ( | 371 var ( |
349 _ = memContextObj((*txnDataStoreData)(nil)) | 372 _ = memContextObj((*txnDataStoreData)(nil)) |
350 _ = sync.Locker((*txnDataStoreData)(nil)) | 373 _ = sync.Locker((*txnDataStoreData)(nil)) |
351 » _ = gae.Testable((*txnDataStoreData)(nil)) | 374 » _ = wrapper.Testable((*txnDataStoreData)(nil)) |
| 375 » _ = wrapper.DSKindSetter((*txnDataStoreData)(nil)) |
352 ) | 376 ) |
353 | 377 |
354 const xgEGLimit = 25 | 378 const xgEGLimit = 25 |
355 | 379 |
356 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | 380 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } |
357 func (td *txnDataStoreData) endTxn() { | 381 func (td *txnDataStoreData) endTxn() { |
358 if atomic.LoadInt32(&td.closed) == 1 { | 382 if atomic.LoadInt32(&td.closed) == 1 { |
359 panic("cannot end transaction twice") | 383 panic("cannot end transaction twice") |
360 } | 384 } |
361 atomic.StoreInt32(&td.closed, 1) | 385 atomic.StoreInt32(&td.closed, 1) |
362 } | 386 } |
363 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | 387 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { |
364 panic("txnDataStoreData cannot apply transactions") | 388 panic("txnDataStoreData cannot apply transactions") |
365 } | 389 } |
366 func (*txnDataStoreData) mkTxn(*gae.DSTransactionOptions) (memContextObj, error)
{ | 390 func (*txnDataStoreData) mkTxn(*datastore.TransactionOptions) (memContextObj, er
ror) { |
367 return nil, errors.New("datastore: nested transactions are not supported
") | 391 return nil, errors.New("datastore: nested transactions are not supported
") |
368 } | 392 } |
369 | 393 |
370 func (td *txnDataStoreData) RunIfNotBroken(f func() error) error { | 394 func (td *txnDataStoreData) IsBroken() error { |
371 // Slightly different from the SDK... datastore and taskqueue each imple
ment | 395 // Slightly different from the SDK... datastore and taskqueue each imple
ment |
372 // this here, where in the SDK only datastore.transaction.Call does. | 396 // this here, where in the SDK only datastore.transaction.Call does. |
373 if atomic.LoadInt32(&td.closed) == 1 { | 397 if atomic.LoadInt32(&td.closed) == 1 { |
374 return errors.New("datastore: transaction context has expired") | 398 return errors.New("datastore: transaction context has expired") |
375 } | 399 } |
376 » return td.BrokenFeatures.RunIfNotBroken(f) | 400 » return td.BrokenFeatures.IsBroken() |
377 } | 401 } |
378 | 402 |
379 // writeMutation ensures that this transaction can support the given key/value | 403 // writeMutation ensures that this transaction can support the given key/value |
380 // mutation. | 404 // mutation. |
381 // | 405 // |
382 // if getOnly is true, don't record the actual mutation data, just ensure that | 406 // if getOnly is true, don't record the actual mutation data, just ensure that |
383 // the key is in an included entity group (or add an empty entry for tha
t | 407 // the key is in an included entity group (or add an empty entry for tha
t |
384 // group). | 408 // group). |
385 // | 409 // |
386 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | 410 // if !getOnly && data == nil, this counts as a deletion instead of a Put. |
387 // | 411 // |
388 // Returns an error if this key causes the transaction to cross too many entity | 412 // Returns an error if this key causes the transaction to cross too many entity |
389 // groups. | 413 // groups. |
390 func (td *txnDataStoreData) writeMutation(getOnly bool, key gae.DSKey, data gae.
DSPropertyMap) error { | 414 func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data
*propertyList) error { |
391 » rk := string(keyBytes(helper.WithContext, helper.DSKeyRoot(key))) | 415 » rk := string(keyBytes(withNS, rootKey(key))) |
392 | 416 |
393 td.Lock() | 417 td.Lock() |
394 defer td.Unlock() | 418 defer td.Unlock() |
395 | 419 |
396 if _, ok := td.muts[rk]; !ok { | 420 if _, ok := td.muts[rk]; !ok { |
397 limit := 1 | 421 limit := 1 |
398 if td.isXG { | 422 if td.isXG { |
399 limit = xgEGLimit | 423 limit = xgEGLimit |
400 } | 424 } |
401 if len(td.muts)+1 > limit { | 425 if len(td.muts)+1 > limit { |
402 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" | 426 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" |
403 if td.isXG { | 427 if td.isXG { |
404 msg = "operating on too many entity groups in a
single transaction" | 428 msg = "operating on too many entity groups in a
single transaction" |
405 } | 429 } |
406 » » » return errors.New(msg) | 430 » » » return newDSError(pb.Error_BAD_REQUEST, msg) |
407 } | 431 } |
408 td.muts[rk] = []txnMutation{} | 432 td.muts[rk] = []txnMutation{} |
409 } | 433 } |
410 if !getOnly { | 434 if !getOnly { |
411 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | 435 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) |
412 } | 436 } |
413 | 437 |
414 return nil | 438 return nil |
415 } | 439 } |
416 | 440 |
417 func (td *txnDataStoreData) put(ns string, key gae.DSKey, src interface{}) (gae.
DSKey, error) { | 441 func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, err
or) { |
418 » pMap, err := putPrelim(ns, key, src) | 442 » key, plData, err := putPrelim(ns, td.KindNameResolver(), src) |
419 if err != nil { | 443 if err != nil { |
420 return nil, err | 444 return nil, err |
421 } | 445 } |
422 | 446 |
423 func() { | 447 func() { |
424 td.parent.Lock() | 448 td.parent.Lock() |
425 defer td.parent.Unlock() | 449 defer td.parent.Unlock() |
426 _, key, err = td.parent.entsKeyLocked(key) | 450 _, key, err = td.parent.entsKeyLocked(key) |
427 }() | 451 }() |
428 if err != nil { | 452 if err != nil { |
429 return nil, err | 453 return nil, err |
430 } | 454 } |
431 | 455 |
432 » if err = td.writeMutation(false, key, pMap); err != nil { | 456 » if err = td.writeMutation(false, key, plData); err != nil { |
433 return nil, err | 457 return nil, err |
434 } | 458 } |
435 | 459 |
436 » return key, nil | 460 » return key, goon_internal.SetStructKey(src, key, td.KindNameResolver()) |
437 } | 461 } |
438 | 462 |
439 func (td *txnDataStoreData) get(ns string, key gae.DSKey, dst interface{}) error
{ | 463 func (td *txnDataStoreData) get(ns string, dst interface{}) error { |
440 » return getInner(ns, key, dst, func() (*memCollection, error) { | 464 » return getInner(ns, td.KindNameResolver(), dst, func(key *datastore.Key)
(*memCollection, error) { |
441 if err := td.writeMutation(true, key, nil); err != nil { | 465 if err := td.writeMutation(true, key, nil); err != nil { |
442 return nil, err | 466 return nil, err |
443 } | 467 } |
444 return td.snap.GetCollection("ents:" + ns), nil | 468 return td.snap.GetCollection("ents:" + ns), nil |
445 }) | 469 }) |
446 } | 470 } |
447 | 471 |
448 func (td *txnDataStoreData) del(ns string, key gae.DSKey) error { | 472 func (td *txnDataStoreData) del(ns string, key *datastore.Key) error { |
449 » if !helper.DSKeyValid(key, ns, false) { | 473 » if !keyValid(ns, key, userKeyOnly) { |
450 » » return gae.ErrDSInvalidKey | 474 » » return datastore.ErrInvalidKey |
451 } | 475 } |
452 return td.writeMutation(false, key, nil) | 476 return td.writeMutation(false, key, nil) |
453 } | 477 } |
454 | |
455 func keyCouldBeValid(k gae.DSKey, ns string, allowSpecial bool) bool { | |
456 // adds an id to k if it's incomplete. | |
457 if helper.DSKeyIncomplete(k) { | |
458 k = helper.NewDSKey(k.AppID(), k.Namespace(), k.Kind(), "", 1, k
.Parent()) | |
459 } | |
460 return helper.DSKeyValid(k, ns, allowSpecial) | |
461 } | |
462 | |
463 func keyBytes(ctx helper.DSKeyContext, key gae.DSKey) []byte { | |
464 buf := &bytes.Buffer{} | |
465 helper.WriteDSKey(buf, ctx, key) | |
466 return buf.Bytes() | |
467 } | |
468 | |
469 func rpmWoCtx(data []byte, ns string) (gae.DSPropertyMap, error) { | |
470 return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithoutCon
text, globalAppID, ns) | |
471 } | |
472 | |
473 func rpm(data []byte) (gae.DSPropertyMap, error) { | |
474 return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithContex
t, "", "") | |
475 } | |
OLD | NEW |