OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "errors" | 9 "errors" |
10 "fmt" | 10 "fmt" |
11 "sync" | 11 "sync" |
12 "sync/atomic" | 12 "sync/atomic" |
13 | 13 |
| 14 "github.com/luci/gae" |
| 15 rds "github.com/luci/gae/service/rawdatastore" |
14 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
15 | |
16 "github.com/luci/gae" | |
17 "github.com/luci/gae/helper" | |
18 ) | 17 ) |
19 | 18 |
20 //////////////////////////////// dataStoreData ///////////////////////////////// | 19 //////////////////////////////// dataStoreData ///////////////////////////////// |
21 | 20 |
22 type dataStoreData struct { | 21 type dataStoreData struct { |
23 rwlock sync.RWMutex | 22 rwlock sync.RWMutex |
24 // See README.md for store schema. | 23 // See README.md for store schema. |
25 store *memStore | 24 store *memStore |
26 snap *memStore | 25 snap *memStore |
27 } | 26 } |
(...skipping 14 matching lines...) Expand all Loading... |
42 func (d *dataStoreData) Lock() { | 41 func (d *dataStoreData) Lock() { |
43 d.rwlock.Lock() | 42 d.rwlock.Lock() |
44 } | 43 } |
45 | 44 |
46 func (d *dataStoreData) Unlock() { | 45 func (d *dataStoreData) Unlock() { |
47 d.rwlock.Unlock() | 46 d.rwlock.Unlock() |
48 } | 47 } |
49 | 48 |
50 /////////////////////////// indicies(dataStoreData) //////////////////////////// | 49 /////////////////////////// indicies(dataStoreData) //////////////////////////// |
51 | 50 |
52 func groupMetaKey(key gae.DSKey) []byte { | 51 func groupMetaKey(key rds.Key) []byte { |
53 » return keyBytes(helper.WithoutContext, | 52 » return keyBytes(rds.WithoutContext, |
54 » » helper.NewDSKey("", "", "__entity_group__", "", 1, helper.DSKeyR
oot(key))) | 53 » » rds.NewKey("", "", "__entity_group__", "", 1, rds.KeyRoot(key))) |
55 } | 54 } |
56 | 55 |
57 func groupIDsKey(key gae.DSKey) []byte { | 56 func groupIDsKey(key rds.Key) []byte { |
58 » return keyBytes(helper.WithoutContext, | 57 » return keyBytes(rds.WithoutContext, |
59 » » helper.NewDSKey("", "", "__entity_group_ids__", "", 1, helper.DS
KeyRoot(key))) | 58 » » rds.NewKey("", "", "__entity_group_ids__", "", 1, rds.KeyRoot(ke
y))) |
60 } | 59 } |
61 | 60 |
62 func rootIDsKey(kind string) []byte { | 61 func rootIDsKey(kind string) []byte { |
63 » return keyBytes(helper.WithoutContext, | 62 » return keyBytes(rds.WithoutContext, |
64 » » helper.NewDSKey("", "", "__entity_root_ids__", kind, 0, nil)) | 63 » » rds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) |
65 } | 64 } |
66 | 65 |
67 func curVersion(ents *memCollection, key []byte) int64 { | 66 func curVersion(ents *memCollection, key []byte) int64 { |
68 if v := ents.Get(key); v != nil { | 67 if v := ents.Get(key); v != nil { |
69 pm, err := rpm(v) | 68 pm, err := rpm(v) |
70 if err != nil { | 69 if err != nil { |
71 panic(err) // memory corruption | 70 panic(err) // memory corruption |
72 } | 71 } |
73 pl, ok := pm["__version__"] | 72 pl, ok := pm["__version__"] |
74 » » if ok && len(pl) > 0 && pl[0].Type() == gae.DSPTInt { | 73 » » if ok && len(pl) > 0 && pl[0].Type() == rds.PTInt { |
75 return pl[0].Value().(int64) | 74 return pl[0].Value().(int64) |
76 } | 75 } |
77 panic(fmt.Errorf("__version__ property missing or wrong: %v", pm
)) | 76 panic(fmt.Errorf("__version__ property missing or wrong: %v", pm
)) |
78 } | 77 } |
79 return 0 | 78 return 0 |
80 } | 79 } |
81 | 80 |
82 func incrementLocked(ents *memCollection, key []byte) int64 { | 81 func incrementLocked(ents *memCollection, key []byte) int64 { |
83 ret := curVersion(ents, key) + 1 | 82 ret := curVersion(ents, key) + 1 |
84 buf := &bytes.Buffer{} | 83 buf := &bytes.Buffer{} |
85 » helper.WriteDSPropertyMap( | 84 » rds.WritePropertyMap( |
86 » » buf, gae.DSPropertyMap{"__version__": {gae.MkDSPropertyNI(ret)}}
, helper.WithContext) | 85 » » buf, rds.PropertyMap{"__version__": {rds.MkPropertyNI(ret)}}, rd
s.WithContext) |
87 ents.Set(key, buf.Bytes()) | 86 ents.Set(key, buf.Bytes()) |
88 return ret | 87 return ret |
89 } | 88 } |
90 | 89 |
91 func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey)
{ | 90 func (d *dataStoreData) entsKeyLocked(key rds.Key) (*memCollection, rds.Key) { |
92 coll := "ents:" + key.Namespace() | 91 coll := "ents:" + key.Namespace() |
93 ents := d.store.GetCollection(coll) | 92 ents := d.store.GetCollection(coll) |
94 if ents == nil { | 93 if ents == nil { |
95 ents = d.store.SetCollection(coll, nil) | 94 ents = d.store.SetCollection(coll, nil) |
96 } | 95 } |
97 | 96 |
98 » if helper.DSKeyIncomplete(key) { | 97 » if rds.KeyIncomplete(key) { |
99 idKey := []byte(nil) | 98 idKey := []byte(nil) |
100 if key.Parent() == nil { | 99 if key.Parent() == nil { |
101 idKey = rootIDsKey(key.Kind()) | 100 idKey = rootIDsKey(key.Kind()) |
102 } else { | 101 } else { |
103 idKey = groupIDsKey(key) | 102 idKey = groupIDsKey(key) |
104 } | 103 } |
105 id := incrementLocked(ents, idKey) | 104 id := incrementLocked(ents, idKey) |
106 » » key = helper.NewDSKey(key.AppID(), key.Namespace(), key.Kind(),
"", id, key.Parent()) | 105 » » key = rds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", i
d, key.Parent()) |
107 } | 106 } |
108 | 107 |
109 return ents, key | 108 return ents, key |
110 } | 109 } |
111 | 110 |
112 func (d *dataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoadSave
r) (gae.DSKey, error) { | 111 func (d *dataStoreData) put(ns string, key rds.Key, pls rds.PropertyLoadSaver) (
rds.Key, error) { |
113 » keys, errs := d.putMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSaver
{pls}) | 112 » keys, errs := d.putMulti(ns, []rds.Key{key}, []rds.PropertyLoadSaver{pls
}) |
114 if errs == nil { | 113 if errs == nil { |
115 return keys[0], nil | 114 return keys[0], nil |
116 } | 115 } |
117 return nil, gae.SingleError(errs) | 116 return nil, gae.SingleError(errs) |
118 } | 117 } |
119 | 118 |
120 func (d *dataStoreData) putMulti(ns string, keys []gae.DSKey, plss []gae.DSPrope
rtyLoadSaver) ([]gae.DSKey, error) { | 119 func (d *dataStoreData) putMulti(ns string, keys []rds.Key, plss []rds.PropertyL
oadSaver) ([]rds.Key, error) { |
121 pmaps, err := putMultiPrelim(ns, keys, plss) | 120 pmaps, err := putMultiPrelim(ns, keys, plss) |
122 if err != nil { | 121 if err != nil { |
123 return nil, err | 122 return nil, err |
124 } | 123 } |
125 return d.putMultiInner(keys, pmaps) | 124 return d.putMultiInner(keys, pmaps) |
126 } | 125 } |
127 | 126 |
128 func putMultiPrelim(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver)
([]gae.DSPropertyMap, error) { | 127 func putMultiPrelim(ns string, keys []rds.Key, plss []rds.PropertyLoadSaver) ([]
rds.PropertyMap, error) { |
129 err := multiValid(keys, plss, ns, true, false) | 128 err := multiValid(keys, plss, ns, true, false) |
130 if err != nil { | 129 if err != nil { |
131 return nil, err | 130 return nil, err |
132 } | 131 } |
133 » pmaps := make([]gae.DSPropertyMap, len(keys)) | 132 » pmaps := make([]rds.PropertyMap, len(keys)) |
134 lme := gae.LazyMultiError{Size: len(keys)} | 133 lme := gae.LazyMultiError{Size: len(keys)} |
135 for i, pls := range plss { | 134 for i, pls := range plss { |
136 pm, err := pls.Save(false) | 135 pm, err := pls.Save(false) |
137 lme.Assign(i, err) | 136 lme.Assign(i, err) |
138 pmaps[i] = pm | 137 pmaps[i] = pm |
139 } | 138 } |
140 return pmaps, lme.Get() | 139 return pmaps, lme.Get() |
141 } | 140 } |
142 | 141 |
143 func (d *dataStoreData) putMultiInner(keys []gae.DSKey, data []gae.DSPropertyMap
) ([]gae.DSKey, error) { | 142 func (d *dataStoreData) putMultiInner(keys []rds.Key, data []rds.PropertyMap) ([
]rds.Key, error) { |
144 » retKeys := make([]gae.DSKey, len(keys)) | 143 » retKeys := make([]rds.Key, len(keys)) |
145 lme := gae.LazyMultiError{Size: len(keys)} | 144 lme := gae.LazyMultiError{Size: len(keys)} |
146 for i, k := range keys { | 145 for i, k := range keys { |
147 buf := &bytes.Buffer{} | 146 buf := &bytes.Buffer{} |
148 » » helper.WriteDSPropertyMap(buf, data[i], helper.WithoutContext) | 147 » » rds.WritePropertyMap(buf, data[i], rds.WithoutContext) |
149 dataBytes := buf.Bytes() | 148 dataBytes := buf.Bytes() |
150 | 149 |
151 » » rKey, err := func() (ret gae.DSKey, err error) { | 150 » » rKey, err := func() (ret rds.Key, err error) { |
152 d.rwlock.Lock() | 151 d.rwlock.Lock() |
153 defer d.rwlock.Unlock() | 152 defer d.rwlock.Unlock() |
154 | 153 |
155 ents, ret := d.entsKeyLocked(k) | 154 ents, ret := d.entsKeyLocked(k) |
156 incrementLocked(ents, groupMetaKey(ret)) | 155 incrementLocked(ents, groupMetaKey(ret)) |
157 | 156 |
158 » » » old := ents.Get(keyBytes(helper.WithoutContext, ret)) | 157 » » » old := ents.Get(keyBytes(rds.WithoutContext, ret)) |
159 » » » oldPM := gae.DSPropertyMap(nil) | 158 » » » oldPM := rds.PropertyMap(nil) |
160 if old != nil { | 159 if old != nil { |
161 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { | 160 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { |
162 return | 161 return |
163 } | 162 } |
164 } | 163 } |
165 updateIndicies(d.store, ret, oldPM, data[i]) | 164 updateIndicies(d.store, ret, oldPM, data[i]) |
166 » » » ents.Set(keyBytes(helper.WithoutContext, ret), dataBytes
) | 165 » » » ents.Set(keyBytes(rds.WithoutContext, ret), dataBytes) |
167 return | 166 return |
168 }() | 167 }() |
169 lme.Assign(i, err) | 168 lme.Assign(i, err) |
170 retKeys[i] = rKey | 169 retKeys[i] = rKey |
171 } | 170 } |
172 return retKeys, lme.Get() | 171 return retKeys, lme.Get() |
173 } | 172 } |
174 | 173 |
175 func getMultiInner(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver,
getColl func() (*memCollection, error)) error { | 174 func getMultiInner(ns string, keys []rds.Key, plss []rds.PropertyLoadSaver, getC
oll func() (*memCollection, error)) error { |
176 if err := multiValid(keys, plss, ns, false, true); err != nil { | 175 if err := multiValid(keys, plss, ns, false, true); err != nil { |
177 return err | 176 return err |
178 } | 177 } |
179 | 178 |
180 lme := gae.LazyMultiError{Size: len(keys)} | 179 lme := gae.LazyMultiError{Size: len(keys)} |
181 | 180 |
182 ents, err := getColl() | 181 ents, err := getColl() |
183 if err != nil { | 182 if err != nil { |
184 return err | 183 return err |
185 } | 184 } |
186 if ents == nil { | 185 if ents == nil { |
187 for i := range keys { | 186 for i := range keys { |
188 » » » lme.Assign(i, gae.ErrDSNoSuchEntity) | 187 » » » lme.Assign(i, rds.ErrNoSuchEntity) |
189 } | 188 } |
190 return lme.Get() | 189 return lme.Get() |
191 } | 190 } |
192 | 191 |
193 for i, k := range keys { | 192 for i, k := range keys { |
194 » » pdata := ents.Get(keyBytes(helper.WithoutContext, k)) | 193 » » pdata := ents.Get(keyBytes(rds.WithoutContext, k)) |
195 if pdata == nil { | 194 if pdata == nil { |
196 » » » lme.Assign(i, gae.ErrDSNoSuchEntity) | 195 » » » lme.Assign(i, rds.ErrNoSuchEntity) |
197 continue | 196 continue |
198 } | 197 } |
199 | 198 |
200 got, err := rpmWoCtx(pdata, ns) | 199 got, err := rpmWoCtx(pdata, ns) |
201 if err != nil { | 200 if err != nil { |
202 lme.Assign(i, err) | 201 lme.Assign(i, err) |
203 continue | 202 continue |
204 } | 203 } |
205 | 204 |
206 lme.Assign(i, plss[i].Load(got)) | 205 lme.Assign(i, plss[i].Load(got)) |
207 } | 206 } |
208 return lme.Get() | 207 return lme.Get() |
209 } | 208 } |
210 | 209 |
211 func (d *dataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoadSave
r) error { | 210 func (d *dataStoreData) get(ns string, key rds.Key, pls rds.PropertyLoadSaver) e
rror { |
212 » return gae.SingleError(d.getMulti(ns, []gae.DSKey{key}, []gae.DSProperty
LoadSaver{pls})) | 211 » return gae.SingleError(d.getMulti(ns, []rds.Key{key}, []rds.PropertyLoad
Saver{pls})) |
213 } | 212 } |
214 | 213 |
215 func (d *dataStoreData) getMulti(ns string, keys []gae.DSKey, plss []gae.DSPrope
rtyLoadSaver) error { | 214 func (d *dataStoreData) getMulti(ns string, keys []rds.Key, plss []rds.PropertyL
oadSaver) error { |
216 return getMultiInner(ns, keys, plss, func() (*memCollection, error) { | 215 return getMultiInner(ns, keys, plss, func() (*memCollection, error) { |
217 d.rwlock.RLock() | 216 d.rwlock.RLock() |
218 s := d.store.Snapshot() | 217 s := d.store.Snapshot() |
219 d.rwlock.RUnlock() | 218 d.rwlock.RUnlock() |
220 | 219 |
221 return s.GetCollection("ents:" + ns), nil | 220 return s.GetCollection("ents:" + ns), nil |
222 }) | 221 }) |
223 } | 222 } |
224 | 223 |
225 func (d *dataStoreData) del(ns string, key gae.DSKey) (err error) { | 224 func (d *dataStoreData) del(ns string, key rds.Key) (err error) { |
226 » return gae.SingleError(d.delMulti(ns, []gae.DSKey{key})) | 225 » return gae.SingleError(d.delMulti(ns, []rds.Key{key})) |
227 } | 226 } |
228 | 227 |
229 func (d *dataStoreData) delMulti(ns string, keys []gae.DSKey) error { | 228 func (d *dataStoreData) delMulti(ns string, keys []rds.Key) error { |
230 lme := gae.LazyMultiError{Size: len(keys)} | 229 lme := gae.LazyMultiError{Size: len(keys)} |
231 toDel := make([][]byte, 0, len(keys)) | 230 toDel := make([][]byte, 0, len(keys)) |
232 for i, k := range keys { | 231 for i, k := range keys { |
233 » » if !helper.DSKeyValid(k, ns, false) { | 232 » » if !rds.KeyValid(k, ns, false) { |
234 » » » lme.Assign(i, gae.ErrDSInvalidKey) | 233 » » » lme.Assign(i, rds.ErrInvalidKey) |
235 continue | 234 continue |
236 } | 235 } |
237 » » toDel = append(toDel, keyBytes(helper.WithoutContext, k)) | 236 » » toDel = append(toDel, keyBytes(rds.WithoutContext, k)) |
238 } | 237 } |
239 err := lme.Get() | 238 err := lme.Get() |
240 if err != nil { | 239 if err != nil { |
241 return err | 240 return err |
242 } | 241 } |
243 | 242 |
244 d.rwlock.Lock() | 243 d.rwlock.Lock() |
245 defer d.rwlock.Unlock() | 244 defer d.rwlock.Unlock() |
246 | 245 |
247 ents := d.store.GetCollection("ents:" + ns) | 246 ents := d.store.GetCollection("ents:" + ns) |
248 if ents == nil { | 247 if ents == nil { |
249 return nil | 248 return nil |
250 } | 249 } |
251 | 250 |
252 for i, k := range keys { | 251 for i, k := range keys { |
253 incrementLocked(ents, groupMetaKey(k)) | 252 incrementLocked(ents, groupMetaKey(k)) |
254 kb := toDel[i] | 253 kb := toDel[i] |
255 old := ents.Get(kb) | 254 old := ents.Get(kb) |
256 » » oldPM := gae.DSPropertyMap(nil) | 255 » » oldPM := rds.PropertyMap(nil) |
257 if old != nil { | 256 if old != nil { |
258 if oldPM, err = rpmWoCtx(old, ns); err != nil { | 257 if oldPM, err = rpmWoCtx(old, ns); err != nil { |
259 lme.Assign(i, err) | 258 lme.Assign(i, err) |
260 continue | 259 continue |
261 } | 260 } |
262 } | 261 } |
263 updateIndicies(d.store, k, oldPM, nil) | 262 updateIndicies(d.store, k, oldPM, nil) |
264 ents.Delete(kb) | 263 ents.Delete(kb) |
265 } | 264 } |
266 return lme.Get() | 265 return lme.Get() |
267 } | 266 } |
268 | 267 |
269 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 268 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
270 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 269 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
271 | 270 |
272 txn := obj.(*txnDataStoreData) | 271 txn := obj.(*txnDataStoreData) |
273 for rk, muts := range txn.muts { | 272 for rk, muts := range txn.muts { |
274 if len(muts) == 0 { // read-only | 273 if len(muts) == 0 { // read-only |
275 continue | 274 continue |
276 } | 275 } |
277 » » k, err := helper.ReadDSKey(bytes.NewBufferString(rk), helper.Wit
hContext, "", "") | 276 » » k, err := rds.ReadKey(bytes.NewBufferString(rk), rds.WithContext
, "", "") |
278 if err != nil { | 277 if err != nil { |
279 panic(err) | 278 panic(err) |
280 } | 279 } |
281 | 280 |
282 entKey := "ents:" + k.Namespace() | 281 entKey := "ents:" + k.Namespace() |
283 mkey := groupMetaKey(k) | 282 mkey := groupMetaKey(k) |
284 entsHead := d.store.GetCollection(entKey) | 283 entsHead := d.store.GetCollection(entKey) |
285 entsSnap := txn.snap.GetCollection(entKey) | 284 entsSnap := txn.snap.GetCollection(entKey) |
286 vHead := curVersion(entsHead, mkey) | 285 vHead := curVersion(entsHead, mkey) |
287 vSnap := curVersion(entsSnap, mkey) | 286 vSnap := curVersion(entsSnap, mkey) |
(...skipping 17 matching lines...) Expand all Loading... |
305 } else { | 304 } else { |
306 _, err = d.put(m.key.Namespace(), m.key, m.data) | 305 _, err = d.put(m.key.Namespace(), m.key, m.data) |
307 } | 306 } |
308 if err != nil { | 307 if err != nil { |
309 panic(err) | 308 panic(err) |
310 } | 309 } |
311 } | 310 } |
312 } | 311 } |
313 } | 312 } |
314 | 313 |
315 func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) memContextObj { | 314 func (d *dataStoreData) mkTxn(o *rds.TransactionOptions) memContextObj { |
316 return &txnDataStoreData{ | 315 return &txnDataStoreData{ |
317 // alias to the main datastore's so that testing code can have p
rimitive | 316 // alias to the main datastore's so that testing code can have p
rimitive |
318 // access to break features inside of transactions. | 317 // access to break features inside of transactions. |
319 parent: d, | 318 parent: d, |
320 isXG: o != nil && o.XG, | 319 isXG: o != nil && o.XG, |
321 snap: d.store.Snapshot(), | 320 snap: d.store.Snapshot(), |
322 muts: map[string][]txnMutation{}, | 321 muts: map[string][]txnMutation{}, |
323 } | 322 } |
324 } | 323 } |
325 | 324 |
326 func (d *dataStoreData) endTxn() {} | 325 func (d *dataStoreData) endTxn() {} |
327 | 326 |
328 /////////////////////////////// txnDataStoreData /////////////////////////////// | 327 /////////////////////////////// txnDataStoreData /////////////////////////////// |
329 | 328 |
330 type txnMutation struct { | 329 type txnMutation struct { |
331 » key gae.DSKey | 330 » key rds.Key |
332 » data gae.DSPropertyMap | 331 » data rds.PropertyMap |
333 } | 332 } |
334 | 333 |
335 type txnDataStoreData struct { | 334 type txnDataStoreData struct { |
336 sync.Mutex | 335 sync.Mutex |
337 | 336 |
338 parent *dataStoreData | 337 parent *dataStoreData |
339 | 338 |
340 // boolean 0 or 1, use atomic.*Int32 to access. | 339 // boolean 0 or 1, use atomic.*Int32 to access. |
341 closed int32 | 340 closed int32 |
342 isXG bool | 341 isXG bool |
(...skipping 13 matching lines...) Expand all Loading... |
356 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | 355 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } |
357 func (td *txnDataStoreData) endTxn() { | 356 func (td *txnDataStoreData) endTxn() { |
358 if atomic.LoadInt32(&td.closed) == 1 { | 357 if atomic.LoadInt32(&td.closed) == 1 { |
359 panic("cannot end transaction twice") | 358 panic("cannot end transaction twice") |
360 } | 359 } |
361 atomic.StoreInt32(&td.closed, 1) | 360 atomic.StoreInt32(&td.closed, 1) |
362 } | 361 } |
363 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | 362 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { |
364 panic("txnDataStoreData cannot apply transactions") | 363 panic("txnDataStoreData cannot apply transactions") |
365 } | 364 } |
366 func (*txnDataStoreData) mkTxn(*gae.DSTransactionOptions) memContextObj { | 365 func (*txnDataStoreData) mkTxn(*rds.TransactionOptions) memContextObj { |
367 panic("impossible") | 366 panic("impossible") |
368 } | 367 } |
369 | 368 |
370 func (td *txnDataStoreData) run(f func() error) error { | 369 func (td *txnDataStoreData) run(f func() error) error { |
371 // Slightly different from the SDK... datastore and taskqueue each imple
ment | 370 // Slightly different from the SDK... datastore and taskqueue each imple
ment |
372 // this here, where in the SDK only datastore.transaction.Call does. | 371 // this here, where in the SDK only datastore.transaction.Call does. |
373 if atomic.LoadInt32(&td.closed) == 1 { | 372 if atomic.LoadInt32(&td.closed) == 1 { |
374 return errors.New("datastore: transaction context has expired") | 373 return errors.New("datastore: transaction context has expired") |
375 } | 374 } |
376 return f() | 375 return f() |
377 } | 376 } |
378 | 377 |
379 // writeMutation ensures that this transaction can support the given key/value | 378 // writeMutation ensures that this transaction can support the given key/value |
380 // mutation. | 379 // mutation. |
381 // | 380 // |
382 // if getOnly is true, don't record the actual mutation data, just ensure that | 381 // if getOnly is true, don't record the actual mutation data, just ensure that |
383 // the key is in an included entity group (or add an empty entry for tha
t | 382 // the key is in an included entity group (or add an empty entry for tha
t |
384 // group). | 383 // group). |
385 // | 384 // |
386 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | 385 // if !getOnly && data == nil, this counts as a deletion instead of a Put. |
387 // | 386 // |
388 // Returns an error if this key causes the transaction to cross too many entity | 387 // Returns an error if this key causes the transaction to cross too many entity |
389 // groups. | 388 // groups. |
390 func (td *txnDataStoreData) writeMutation(getOnly bool, key gae.DSKey, data gae.
DSPropertyMap) error { | 389 func (td *txnDataStoreData) writeMutation(getOnly bool, key rds.Key, data rds.Pr
opertyMap) error { |
391 » rk := string(keyBytes(helper.WithContext, helper.DSKeyRoot(key))) | 390 » rk := string(keyBytes(rds.WithContext, rds.KeyRoot(key))) |
392 | 391 |
393 td.Lock() | 392 td.Lock() |
394 defer td.Unlock() | 393 defer td.Unlock() |
395 | 394 |
396 if _, ok := td.muts[rk]; !ok { | 395 if _, ok := td.muts[rk]; !ok { |
397 limit := 1 | 396 limit := 1 |
398 if td.isXG { | 397 if td.isXG { |
399 limit = xgEGLimit | 398 limit = xgEGLimit |
400 } | 399 } |
401 if len(td.muts)+1 > limit { | 400 if len(td.muts)+1 > limit { |
402 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" | 401 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" |
403 if td.isXG { | 402 if td.isXG { |
404 msg = "operating on too many entity groups in a
single transaction" | 403 msg = "operating on too many entity groups in a
single transaction" |
405 } | 404 } |
406 return errors.New(msg) | 405 return errors.New(msg) |
407 } | 406 } |
408 td.muts[rk] = []txnMutation{} | 407 td.muts[rk] = []txnMutation{} |
409 } | 408 } |
410 if !getOnly { | 409 if !getOnly { |
411 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | 410 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) |
412 } | 411 } |
413 | 412 |
414 return nil | 413 return nil |
415 } | 414 } |
416 | 415 |
417 func (td *txnDataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoad
Saver) (gae.DSKey, error) { | 416 func (td *txnDataStoreData) put(ns string, key rds.Key, pls rds.PropertyLoadSave
r) (rds.Key, error) { |
418 » keys, errs := td.putMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSave
r{pls}) | 417 » keys, errs := td.putMulti(ns, []rds.Key{key}, []rds.PropertyLoadSaver{pl
s}) |
419 if errs == nil { | 418 if errs == nil { |
420 return keys[0], nil | 419 return keys[0], nil |
421 } | 420 } |
422 return nil, gae.SingleError(errs) | 421 return nil, gae.SingleError(errs) |
423 } | 422 } |
424 | 423 |
425 func (td *txnDataStoreData) putMulti(ns string, keys []gae.DSKey, plss []gae.DSP
ropertyLoadSaver) ([]gae.DSKey, error) { | 424 func (td *txnDataStoreData) putMulti(ns string, keys []rds.Key, plss []rds.Prope
rtyLoadSaver) ([]rds.Key, error) { |
426 pmaps, err := putMultiPrelim(ns, keys, plss) | 425 pmaps, err := putMultiPrelim(ns, keys, plss) |
427 if err != nil { | 426 if err != nil { |
428 return nil, err | 427 return nil, err |
429 } | 428 } |
430 | 429 |
431 » retKeys := make([]gae.DSKey, len(keys)) | 430 » retKeys := make([]rds.Key, len(keys)) |
432 lme := gae.LazyMultiError{Size: len(keys)} | 431 lme := gae.LazyMultiError{Size: len(keys)} |
433 for i, k := range keys { | 432 for i, k := range keys { |
434 func() { | 433 func() { |
435 td.parent.Lock() | 434 td.parent.Lock() |
436 defer td.parent.Unlock() | 435 defer td.parent.Unlock() |
437 _, k = td.parent.entsKeyLocked(k) | 436 _, k = td.parent.entsKeyLocked(k) |
438 }() | 437 }() |
439 lme.Assign(i, td.writeMutation(false, k, pmaps[i])) | 438 lme.Assign(i, td.writeMutation(false, k, pmaps[i])) |
440 retKeys[i] = k | 439 retKeys[i] = k |
441 } | 440 } |
442 | 441 |
443 return retKeys, lme.Get() | 442 return retKeys, lme.Get() |
444 } | 443 } |
445 | 444 |
446 func (td *txnDataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoad
Saver) error { | 445 func (td *txnDataStoreData) get(ns string, key rds.Key, pls rds.PropertyLoadSave
r) error { |
447 » return gae.SingleError(td.getMulti(ns, []gae.DSKey{key}, []gae.DSPropert
yLoadSaver{pls})) | 446 » return gae.SingleError(td.getMulti(ns, []rds.Key{key}, []rds.PropertyLoa
dSaver{pls})) |
448 } | 447 } |
449 | 448 |
450 func (td *txnDataStoreData) getMulti(ns string, keys []gae.DSKey, plss []gae.DSP
ropertyLoadSaver) error { | 449 func (td *txnDataStoreData) getMulti(ns string, keys []rds.Key, plss []rds.Prope
rtyLoadSaver) error { |
451 return getMultiInner(ns, keys, plss, func() (*memCollection, error) { | 450 return getMultiInner(ns, keys, plss, func() (*memCollection, error) { |
452 lme := gae.LazyMultiError{Size: len(keys)} | 451 lme := gae.LazyMultiError{Size: len(keys)} |
453 for i, k := range keys { | 452 for i, k := range keys { |
454 lme.Assign(i, td.writeMutation(true, k, nil)) | 453 lme.Assign(i, td.writeMutation(true, k, nil)) |
455 } | 454 } |
456 return td.snap.GetCollection("ents:" + ns), lme.Get() | 455 return td.snap.GetCollection("ents:" + ns), lme.Get() |
457 }) | 456 }) |
458 } | 457 } |
459 | 458 |
460 func (td *txnDataStoreData) del(ns string, key gae.DSKey) error { | 459 func (td *txnDataStoreData) del(ns string, key rds.Key) error { |
461 » return gae.SingleError(td.delMulti(ns, []gae.DSKey{key})) | 460 » return gae.SingleError(td.delMulti(ns, []rds.Key{key})) |
462 } | 461 } |
463 | 462 |
464 func (td *txnDataStoreData) delMulti(ns string, keys []gae.DSKey) error { | 463 func (td *txnDataStoreData) delMulti(ns string, keys []rds.Key) error { |
465 lme := gae.LazyMultiError{Size: len(keys)} | 464 lme := gae.LazyMultiError{Size: len(keys)} |
466 for i, k := range keys { | 465 for i, k := range keys { |
467 » » if !helper.DSKeyValid(k, ns, false) { | 466 » » if !rds.KeyValid(k, ns, false) { |
468 » » » lme.Assign(i, gae.ErrDSInvalidKey) | 467 » » » lme.Assign(i, rds.ErrInvalidKey) |
469 } else { | 468 } else { |
470 lme.Assign(i, td.writeMutation(false, k, nil)) | 469 lme.Assign(i, td.writeMutation(false, k, nil)) |
471 } | 470 } |
472 } | 471 } |
473 return lme.Get() | 472 return lme.Get() |
474 } | 473 } |
475 | 474 |
476 func keyBytes(ctx helper.DSKeyContext, key gae.DSKey) []byte { | 475 func keyBytes(ctx rds.KeyContext, key rds.Key) []byte { |
477 buf := &bytes.Buffer{} | 476 buf := &bytes.Buffer{} |
478 » helper.WriteDSKey(buf, ctx, key) | 477 » rds.WriteKey(buf, ctx, key) |
479 return buf.Bytes() | 478 return buf.Bytes() |
480 } | 479 } |
481 | 480 |
482 func rpmWoCtx(data []byte, ns string) (gae.DSPropertyMap, error) { | 481 func rpmWoCtx(data []byte, ns string) (rds.PropertyMap, error) { |
483 » return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithoutCon
text, globalAppID, ns) | 482 » return rds.ReadPropertyMap(bytes.NewBuffer(data), rds.WithoutContext, gl
obalAppID, ns) |
484 } | 483 } |
485 | 484 |
486 func rpm(data []byte) (gae.DSPropertyMap, error) { | 485 func rpm(data []byte) (rds.PropertyMap, error) { |
487 » return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithContex
t, "", "") | 486 » return rds.ReadPropertyMap(bytes.NewBuffer(data), rds.WithContext, "", "
") |
488 } | 487 } |
489 | 488 |
490 func multiValid(keys []gae.DSKey, plss []gae.DSPropertyLoadSaver, ns string, pot
entialKey, allowSpecial bool) error { | 489 func multiValid(keys []rds.Key, plss []rds.PropertyLoadSaver, ns string, potenti
alKey, allowSpecial bool) error { |
491 » vfn := func(k gae.DSKey) bool { | 490 » vfn := func(k rds.Key) bool { |
492 » » return !helper.DSKeyIncomplete(k) && helper.DSKeyValid(k, ns, al
lowSpecial) | 491 » » return !rds.KeyIncomplete(k) && rds.KeyValid(k, ns, allowSpecial
) |
493 } | 492 } |
494 if potentialKey { | 493 if potentialKey { |
495 » » vfn = func(k gae.DSKey) bool { | 494 » » vfn = func(k rds.Key) bool { |
496 // adds an id to k if it's incomplete. | 495 // adds an id to k if it's incomplete. |
497 » » » if helper.DSKeyIncomplete(k) { | 496 » » » if rds.KeyIncomplete(k) { |
498 » » » » k = helper.NewDSKey(k.AppID(), k.Namespace(), k.
Kind(), "", 1, k.Parent()) | 497 » » » » k = rds.NewKey(k.AppID(), k.Namespace(), k.Kind(
), "", 1, k.Parent()) |
499 } | 498 } |
500 » » » return helper.DSKeyValid(k, ns, allowSpecial) | 499 » » » return rds.KeyValid(k, ns, allowSpecial) |
501 } | 500 } |
502 } | 501 } |
503 | 502 |
504 if keys == nil || plss == nil { | 503 if keys == nil || plss == nil { |
505 return errors.New("gae: key or plss slices were nil") | 504 return errors.New("gae: key or plss slices were nil") |
506 } | 505 } |
507 if len(keys) != len(plss) { | 506 if len(keys) != len(plss) { |
508 return errors.New("gae: key and dst slices have different length
") | 507 return errors.New("gae: key and dst slices have different length
") |
509 } | 508 } |
510 lme := gae.LazyMultiError{Size: len(keys)} | 509 lme := gae.LazyMultiError{Size: len(keys)} |
511 for i, k := range keys { | 510 for i, k := range keys { |
512 if !vfn(k) { | 511 if !vfn(k) { |
513 » » » lme.Assign(i, gae.ErrDSInvalidKey) | 512 » » » lme.Assign(i, rds.ErrInvalidKey) |
514 } | 513 } |
515 } | 514 } |
516 return lme.Get() | 515 return lme.Get() |
517 } | 516 } |
OLD | NEW |