Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(867)

Side by Side Diff: filter/txnBuf/ds_txn.go

Issue 1434873003: Fix races in txnBuf (Closed) Base URL: https://github.com/luci/gae.git@race_tests
Patch Set: fix stuff Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « filter/txnBuf/ds.go ('k') | filter/txnBuf/state.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package txnBuf 5 package txnBuf
6 6
7 import ( 7 import (
8 ds "github.com/luci/gae/service/datastore" 8 ds "github.com/luci/gae/service/datastore"
9 "github.com/luci/luci-go/common/errors" 9 "github.com/luci/luci-go/common/errors"
10 "golang.org/x/net/context" 10 "golang.org/x/net/context"
11 ) 11 )
12 12
13 // ErrTransactionTooLarge is returned when applying an inner transaction would 13 // ErrTransactionTooLarge is returned when applying an inner transaction would
14 // cause an outer transaction to become too large. 14 // cause an outer transaction to become too large.
15 var ErrTransactionTooLarge = errors.New( 15 var ErrTransactionTooLarge = errors.New(
16 "applying the transaction would make the parent transaction too large") 16 "applying the transaction would make the parent transaction too large")
17 17
18 // ErrTooManyRoots is returned when executing an operation which would cause 18 // ErrTooManyRoots is returned when executing an operation which would cause
19 // the transaction to exceed it's allotted number of entity groups. 19 // the transaction to exceed it's allotted number of entity groups.
20 var ErrTooManyRoots = errors.New( 20 var ErrTooManyRoots = errors.New(
21 "operating on too many entity groups in nested transaction") 21 "operating on too many entity groups in nested transaction")
22 22
23 type dsTxnBuf struct { 23 type dsTxnBuf struct {
24 » ic context.Context 24 » ic context.Context
25 » state *txnBufState 25 » state *txnBufState
26 » haveLock bool
26 } 27 }
27 28
28 var _ ds.RawInterface = (*dsTxnBuf)(nil) 29 var _ ds.RawInterface = (*dsTxnBuf)(nil)
29 30
30 func (d *dsTxnBuf) DecodeCursor(s string) (ds.Cursor, error) { 31 func (d *dsTxnBuf) DecodeCursor(s string) (ds.Cursor, error) {
31 return d.state.parentDS.DecodeCursor(s) 32 return d.state.parentDS.DecodeCursor(s)
32 } 33 }
33 34
34 func (d *dsTxnBuf) AllocateIDs(incomplete *ds.Key, n int) (start int64, err erro r) { 35 func (d *dsTxnBuf) AllocateIDs(incomplete *ds.Key, n int) (start int64, err erro r) {
35 return d.state.parentDS.AllocateIDs(incomplete, n) 36 return d.state.parentDS.AllocateIDs(incomplete, n)
36 } 37 }
37 38
38 func (d *dsTxnBuf) GetMulti(keys []*ds.Key, metas ds.MultiMetaGetter, cb ds.GetM ultiCB) error { 39 func (d *dsTxnBuf) GetMulti(keys []*ds.Key, metas ds.MultiMetaGetter, cb ds.GetM ultiCB) error {
39 » data, err := d.state.getMulti(keys) 40 » return d.state.getMulti(keys, metas, cb, d.haveLock)
40 » if err != nil {
41 » » return err
42 » }
43
44 » idxMap := []int(nil)
45 » getKeys := []*ds.Key(nil)
46 » getMetas := ds.MultiMetaGetter(nil)
47 » lme := errors.NewLazyMultiError(len(keys))
48
49 » for i, itm := range data {
50 » » if !itm.buffered {
51 » » » idxMap = append(idxMap, i)
52 » » » getKeys = append(getKeys, itm.key)
53 » » » getMetas = append(getMetas, metas.GetSingle(i))
54 » » }
55 » }
56
57 » if len(idxMap) > 0 {
58 » » j := 0
59 » » err := d.state.parentDS.GetMulti(getKeys, getMetas, func(pm ds.P ropertyMap, err error) {
60 » » » if err != ds.ErrNoSuchEntity {
61 » » » » i := idxMap[j]
62 » » » » if !lme.Assign(i, err) {
63 » » » » » data[i].data = pm
64 » » » » }
65 » » » }
66 » » » j++
67 » » })
68 » » if err != nil {
69 » » » return err
70 » » }
71 » }
72
73 » for i, itm := range data {
74 » » err := lme.GetOne(i)
75 » » if err != nil {
76 » » » cb(nil, err)
77 » » } else if itm.data == nil {
78 » » » cb(nil, ds.ErrNoSuchEntity)
79 » » } else {
80 » » » cb(itm.data, nil)
81 » » }
82 » }
83 » return nil
84 } 41 }
85 42
86 func (d *dsTxnBuf) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMult iCB) error { 43 func (d *dsTxnBuf) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMult iCB) error {
87 » lme := errors.NewLazyMultiError(len(keys)) 44 » return d.state.putMulti(keys, vals, cb, d.haveLock)
88 » realKeys := []*ds.Key(nil)
89 » for i, key := range keys {
90 » » if key.Incomplete() {
91 » » » start, err := d.AllocateIDs(key, 1)
92 » » » if !lme.Assign(i, err) {
93 » » » » if realKeys == nil {
94 » » » » » realKeys = make([]*ds.Key, len(keys))
95 » » » » » copy(realKeys, keys)
96 » » » » }
97
98 » » » » aid, ns, toks := key.Split()
99 » » » » toks[len(toks)-1].IntID = start
100 » » » » realKeys[i] = ds.NewKeyToks(aid, ns, toks)
101 » » » }
102 » » }
103 » }
104 » if err := lme.Get(); err != nil {
105 » » for _, e := range err.(errors.MultiError) {
106 » » » if e == nil {
107 » » » » e = errors.New("putMulti failed because some key s were unable to AllocateIDs")
108 » » » }
109 » » » cb(nil, e)
110 » » }
111 » » return nil
112 » }
113
114 » if realKeys == nil {
115 » » realKeys = keys
116 » }
117
118 » err := d.state.putMulti(realKeys, vals)
119 » if err != nil {
120 » » return err
121 » }
122
123 » for _, k := range realKeys {
124 » » cb(k, nil)
125 » }
126 » return nil
127 } 45 }
128 46
129 func (d *dsTxnBuf) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error { 47 func (d *dsTxnBuf) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
130 » if err := d.state.deleteMulti(keys); err != nil { 48 » return d.state.deleteMulti(keys, cb, d.haveLock)
131 » » return err
132 » }
133
134 » for range keys {
135 » » cb(nil)
136 » }
137 » return nil
138 } 49 }
139 50
140 func (d *dsTxnBuf) Count(fq *ds.FinalizedQuery) (count int64, err error) { 51 func (d *dsTxnBuf) Count(fq *ds.FinalizedQuery) (count int64, err error) {
141 // Unfortunately there's no fast-path here. We literally have to run the 52 // Unfortunately there's no fast-path here. We literally have to run the
142 // query and count. Fortunately we can optimize to count keys if it's no t 53 // query and count. Fortunately we can optimize to count keys if it's no t
143 // a projection query. This will save on bandwidth a bit. 54 // a projection query. This will save on bandwidth a bit.
144 if len(fq.Project()) == 0 && !fq.KeysOnly() { 55 if len(fq.Project()) == 0 && !fq.KeysOnly() {
145 fq, err = fq.Original().KeysOnly(true).Finalize() 56 fq, err = fq.Original().KeysOnly(true).Finalize()
146 if err != nil { 57 if err != nil {
147 return 58 return
(...skipping 10 matching lines...) Expand all
158 if start, end := fq.Bounds(); start != nil || end != nil { 69 if start, end := fq.Bounds(); start != nil || end != nil {
159 return errors.New("txnBuf filter does not support query cursors" ) 70 return errors.New("txnBuf filter does not support query cursors" )
160 } 71 }
161 72
162 limit, limitSet := fq.Limit() 73 limit, limitSet := fq.Limit()
163 offset, _ := fq.Offset() 74 offset, _ := fq.Offset()
164 keysOnly := fq.KeysOnly() 75 keysOnly := fq.KeysOnly()
165 76
166 project := fq.Project() 77 project := fq.Project()
167 78
168 » d.state.Lock() 79 » bufDS, parentDS, sizes := func() (ds.RawInterface, ds.RawInterface, *siz eTracker) {
169 » memDS := d.state.memDS 80 » » if !d.haveLock {
170 » parentDS := d.state.parentDS 81 » » » d.state.Lock()
171 » sizes := d.state.entState.dup() 82 » » » defer d.state.Unlock()
172 » d.state.Unlock() 83 » » }
84 » » return d.state.bufDS, d.state.parentDS, d.state.entState.dup()
85 » }()
173 86
174 » return runMergedQueries(fq, sizes, memDS, parentDS, func(key *ds.Key, da ta ds.PropertyMap) bool { 87 » return runMergedQueries(fq, sizes, bufDS, parentDS, func(key *ds.Key, da ta ds.PropertyMap) bool {
175 if offset > 0 { 88 if offset > 0 {
176 offset-- 89 offset--
177 return true 90 return true
178 } 91 }
179 if limitSet { 92 if limitSet {
180 if limit == 0 { 93 if limit == 0 {
181 return false 94 return false
182 } 95 }
183 limit-- 96 limit--
184 } 97 }
185 if keysOnly { 98 if keysOnly {
186 data = nil 99 data = nil
187 } else if len(project) > 0 { 100 } else if len(project) > 0 {
188 newData := make(ds.PropertyMap, len(project)) 101 newData := make(ds.PropertyMap, len(project))
189 for _, p := range project { 102 for _, p := range project {
190 newData[p] = data[p] 103 newData[p] = data[p]
191 } 104 }
192 data = newData 105 data = newData
193 } 106 }
194 return cb(key, data, nil) 107 return cb(key, data, nil)
195 }) 108 })
196 } 109 }
197 110
198 func (d *dsTxnBuf) RunInTransaction(cb func(context.Context) error, opts *ds.Tra nsactionOptions) error { 111 func (d *dsTxnBuf) RunInTransaction(cb func(context.Context) error, opts *ds.Tra nsactionOptions) error {
112 if !d.haveLock {
113 d.state.Lock()
114 defer d.state.Unlock()
115 }
199 return withTxnBuf(d.ic, cb, opts) 116 return withTxnBuf(d.ic, cb, opts)
200 } 117 }
201 118
202 func (d *dsTxnBuf) Testable() ds.Testable { 119 func (d *dsTxnBuf) Testable() ds.Testable {
203 return d.state.parentDS.Testable() 120 return d.state.parentDS.Testable()
204 } 121 }
OLDNEW
« no previous file with comments | « filter/txnBuf/ds.go ('k') | filter/txnBuf/state.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698