OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "sync" | 10 "sync" |
11 "sync/atomic" | 11 "sync/atomic" |
12 | 12 |
13 ds "github.com/luci/gae/service/datastore" | 13 ds "github.com/luci/gae/service/datastore" |
14 "github.com/luci/gae/service/datastore/serialize" | 14 "github.com/luci/gae/service/datastore/serialize" |
15 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
16 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
17 ) | 17 ) |
18 | 18 |
19 //////////////////////////////// dataStoreData ///////////////////////////////// | 19 //////////////////////////////// dataStoreData ///////////////////////////////// |
20 | 20 |
21 type dataStoreData struct { | 21 type dataStoreData struct { |
22 rwlock sync.RWMutex | 22 rwlock sync.RWMutex |
23 // See README.md for head schema. | 23 // See README.md for head schema. |
24 head *memStore | 24 head *memStore |
25 snap *memStore | 25 snap *memStore |
26 // For testing, see SetTransactionRetryCount. | 26 // For testing, see SetTransactionRetryCount. |
27 txnFakeRetry int | 27 txnFakeRetry int |
28 // true means that head always == snap | |
29 consistent bool | |
28 } | 30 } |
29 | 31 |
30 var ( | 32 var ( |
31 _ = memContextObj((*dataStoreData)(nil)) | 33 _ = memContextObj((*dataStoreData)(nil)) |
32 _ = sync.Locker((*dataStoreData)(nil)) | 34 _ = sync.Locker((*dataStoreData)(nil)) |
33 ) | 35 ) |
34 | 36 |
35 func newDataStoreData() *dataStoreData { | 37 func newDataStoreData() *dataStoreData { |
36 head := newMemStore() | 38 head := newMemStore() |
37 return &dataStoreData{ | 39 return &dataStoreData{ |
38 head: head, | 40 head: head, |
39 snap: head.Snapshot(), // empty but better than a nil pointer. | 41 snap: head.Snapshot(), // empty but better than a nil pointer. |
40 } | 42 } |
41 } | 43 } |
42 | 44 |
43 func (d *dataStoreData) Lock() { | 45 func (d *dataStoreData) Lock() { |
44 d.rwlock.Lock() | 46 d.rwlock.Lock() |
45 } | 47 } |
46 | 48 |
47 func (d *dataStoreData) Unlock() { | 49 func (d *dataStoreData) Unlock() { |
48 d.rwlock.Unlock() | 50 d.rwlock.Unlock() |
49 } | 51 } |
50 | 52 |
53 func (d *dataStoreData) setTxnRetry(count int) { | |
54 d.Lock() | |
Vadim Sh.
2015/09/24 18:40:11
oops.. didn't know it's under lock
iannucci
2015/09/24 18:59:30
is ok :)
| |
55 defer d.Unlock() | |
56 d.txnFakeRetry = count | |
57 } | |
58 | |
59 func (d *dataStoreData) setConsistent(always bool) { | |
60 d.Lock() | |
61 defer d.Unlock() | |
62 | |
63 d.consistent = always | |
64 if d.consistent { | |
65 d.snap = d.head.Snapshot() | |
66 } | |
67 } | |
68 | |
51 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { | 69 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { |
52 d.rwlock.RLock() | 70 d.rwlock.RLock() |
53 defer d.rwlock.RUnlock() | 71 defer d.rwlock.RUnlock() |
72 if d.consistent { | |
73 // snap is already a consistent snapshot of head | |
74 return d.snap, d.snap | |
75 } | |
76 | |
54 head = d.head.Snapshot() | 77 head = d.head.Snapshot() |
55 if consistent { | 78 if consistent { |
56 idx = head | 79 idx = head |
57 } else { | 80 } else { |
58 idx = d.snap | 81 idx = d.snap |
59 } | 82 } |
60 return | 83 return |
61 } | 84 } |
62 | 85 |
63 func (d *dataStoreData) takeSnapshot() *memStore { | 86 func (d *dataStoreData) takeSnapshot() *memStore { |
64 d.rwlock.RLock() | 87 d.rwlock.RLock() |
65 defer d.rwlock.RUnlock() | 88 defer d.rwlock.RUnlock() |
89 if d.consistent { | |
90 return d.snap | |
91 } | |
66 return d.head.Snapshot() | 92 return d.head.Snapshot() |
67 } | 93 } |
68 | 94 |
69 func (d *dataStoreData) setSnapshot(snap *memStore) { | 95 func (d *dataStoreData) setSnapshot(snap *memStore) { |
70 d.rwlock.Lock() | 96 d.rwlock.Lock() |
71 defer d.rwlock.Unlock() | 97 defer d.rwlock.Unlock() |
98 if d.consistent { | |
99 return | |
100 } | |
72 d.snap = snap | 101 d.snap = snap |
73 } | 102 } |
74 | 103 |
75 func (d *dataStoreData) catchupIndexes() { | 104 func (d *dataStoreData) catchupIndexes() { |
76 d.rwlock.Lock() | 105 d.rwlock.Lock() |
77 defer d.rwlock.Unlock() | 106 defer d.rwlock.Unlock() |
107 if d.consistent { | |
108 return | |
109 } | |
78 d.snap = d.head.Snapshot() | 110 d.snap = d.head.Snapshot() |
79 } | 111 } |
80 | 112 |
81 /////////////////////////// indexes(dataStoreData) //////////////////////////// | 113 /////////////////////////// indexes(dataStoreData) //////////////////////////// |
82 | 114 |
83 func groupMetaKey(key *ds.Key) []byte { | 115 func groupMetaKey(key *ds.Key) []byte { |
84 return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root()) ) | 116 return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root()) ) |
85 } | 117 } |
86 | 118 |
87 func groupIDsKey(key *ds.Key) []byte { | 119 func groupIDsKey(key *ds.Key) []byte { |
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
152 | 184 |
153 func (d *dataStoreData) fixKeyLocked(ents *memCollection, key *ds.Key) *ds.Key { | 185 func (d *dataStoreData) fixKeyLocked(ents *memCollection, key *ds.Key) *ds.Key { |
154 if key.Incomplete() { | 186 if key.Incomplete() { |
155 id := d.allocateIDsLocked(ents, key, 1) | 187 id := d.allocateIDsLocked(ents, key, 1) |
156 key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) | 188 key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) |
157 } | 189 } |
158 return key | 190 return key |
159 } | 191 } |
160 | 192 |
161 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu tMultiCB) { | 193 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu tMultiCB) { |
162 » ents := d.mutableEnts(keys[0].Namespace()) | 194 » ns := keys[0].Namespace() |
195 » ents := d.mutableEnts(ns) | |
163 | 196 |
164 for i, k := range keys { | 197 for i, k := range keys { |
165 pmap, _ := vals[i].Save(false) | 198 pmap, _ := vals[i].Save(false) |
166 dataBytes := serialize.ToBytes(pmap) | 199 dataBytes := serialize.ToBytes(pmap) |
167 | 200 |
168 k, err := func() (ret *ds.Key, err error) { | 201 k, err := func() (ret *ds.Key, err error) { |
169 d.Lock() | 202 d.Lock() |
170 defer d.Unlock() | 203 defer d.Unlock() |
171 | 204 |
172 ret = d.fixKeyLocked(ents, k) | 205 ret = d.fixKeyLocked(ents, k) |
173 incrementLocked(ents, groupMetaKey(ret), 1) | 206 incrementLocked(ents, groupMetaKey(ret), 1) |
174 | 207 |
175 old := ents.Get(keyBytes(ret)) | 208 old := ents.Get(keyBytes(ret)) |
176 oldPM := ds.PropertyMap(nil) | 209 oldPM := ds.PropertyMap(nil) |
177 if old != nil { | 210 if old != nil { |
178 » » » » if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil { | 211 » » » » if oldPM, err = rpmWoCtx(old, ns); err != nil { |
179 return | 212 return |
180 } | 213 } |
181 } | 214 } |
182 updateIndexes(d.head, ret, oldPM, pmap) | 215 updateIndexes(d.head, ret, oldPM, pmap) |
183 ents.Set(keyBytes(ret), dataBytes) | 216 ents.Set(keyBytes(ret), dataBytes) |
217 if d.consistent { | |
218 d.snap = d.head.Snapshot() | |
219 } | |
184 return | 220 return |
185 }() | 221 }() |
186 if cb != nil { | 222 if cb != nil { |
187 cb(k, err) | 223 cb(k, err) |
188 } | 224 } |
189 } | 225 } |
190 } | 226 } |
191 | 227 |
192 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect ion, error)) error { | 228 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect ion, error)) error { |
193 ents, err := getColl() | 229 ents, err := getColl() |
(...skipping 20 matching lines...) Expand all Loading... | |
214 | 250 |
215 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { | 251 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { |
216 return getMultiInner(keys, cb, func() (*memCollection, error) { | 252 return getMultiInner(keys, cb, func() (*memCollection, error) { |
217 s := d.takeSnapshot() | 253 s := d.takeSnapshot() |
218 | 254 |
219 return s.GetCollection("ents:" + keys[0].Namespace()), nil | 255 return s.GetCollection("ents:" + keys[0].Namespace()), nil |
220 }) | 256 }) |
221 } | 257 } |
222 | 258 |
223 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) { | 259 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) { |
224 toDel := make([][]byte, 0, len(keys)) | |
225 for _, k := range keys { | |
226 toDel = append(toDel, keyBytes(k)) | |
227 } | |
228 ns := keys[0].Namespace() | 260 ns := keys[0].Namespace() |
261 ents := d.mutableEnts(ns) | |
229 | 262 |
230 » d.rwlock.Lock() | 263 » if ents != nil { |
231 » defer d.rwlock.Unlock() | 264 » » for _, k := range keys { |
265 » » » err := func() error { | |
266 » » » » kb := keyBytes(k) | |
232 | 267 |
233 » ents := d.head.GetCollection("ents:" + ns) | 268 » » » » d.Lock() |
269 » » » » defer d.Unlock() | |
234 | 270 |
235 » for i, k := range keys { | 271 » » » » incrementLocked(ents, groupMetaKey(k), 1) |
236 » » if ents != nil { | 272 » » » » if old := ents.Get(kb); old != nil { |
237 » » » incrementLocked(ents, groupMetaKey(k), 1) | 273 » » » » » oldPM, err := rpmWoCtx(old, ns) |
238 » » » kb := toDel[i] | 274 » » » » » if err != nil { |
239 » » » if old := ents.Get(kb); old != nil { | 275 » » » » » » return err |
240 » » » » oldPM, err := rpmWoCtx(old, ns) | |
241 » » » » if err != nil { | |
242 » » » » » if cb != nil { | |
243 » » » » » » cb(err) | |
244 } | 276 } |
245 » » » » » continue | 277 » » » » » updateIndexes(d.head, k, oldPM, nil) |
278 » » » » » ents.Delete(kb) | |
279 » » » » » if d.consistent { | |
280 » » » » » » d.snap = d.head.Snapshot() | |
281 » » » » » } | |
246 } | 282 } |
247 » » » » updateIndexes(d.head, k, oldPM, nil) | 283 » » » » return nil |
248 » » » » ents.Delete(kb) | 284 » » » }() |
285 » » » if cb != nil { | |
Vadim Sh.
2015/09/24 18:40:11
I think you need to call cb() len(keys) number of
iannucci
2015/09/24 18:59:30
oh, good catch.
| |
286 » » » » cb(err) | |
249 } | 287 } |
250 } | 288 } |
251 if cb != nil { | |
252 cb(nil) | |
253 } | |
254 } | 289 } |
255 } | 290 } |
256 | 291 |
257 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 292 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
258 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 293 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
259 | 294 |
260 txn := obj.(*txnDataStoreData) | 295 txn := obj.(*txnDataStoreData) |
261 for rk, muts := range txn.muts { | 296 for rk, muts := range txn.muts { |
262 if len(muts) == 0 { // read-only | 297 if len(muts) == 0 { // read-only |
263 continue | 298 continue |
(...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
450 | 485 |
451 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { | 486 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { |
452 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 487 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
453 serialize.WithoutContext, globalAppID, ns) | 488 serialize.WithoutContext, globalAppID, ns) |
454 } | 489 } |
455 | 490 |
456 func rpm(data []byte) (ds.PropertyMap, error) { | 491 func rpm(data []byte) (ds.PropertyMap, error) { |
457 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 492 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
458 serialize.WithContext, "", "") | 493 serialize.WithContext, "", "") |
459 } | 494 } |
OLD | NEW |