OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package txnBuf | |
6 | |
7 import ( | |
8 ds "github.com/luci/gae/service/datastore" | |
9 "github.com/luci/luci-go/common/errors" | |
10 "golang.org/x/net/context" | |
11 ) | |
12 | |
13 // ErrTransactionTooLarge is returned when applying an inner transaction would | |
14 // cause an outer transaction to become too large. | |
15 var ErrTransactionTooLarge = errors.New( | |
16 "applying the transaction would make the parent transaction too large") | |
17 | |
18 type dsTxnBuf struct { | |
19 ic context.Context | |
20 state *txnBufState | |
21 } | |
22 | |
23 var _ ds.RawInterface = (*dsTxnBuf)(nil) | |
24 | |
25 func (d *dsTxnBuf) DecodeCursor(s string) (ds.Cursor, error) { | |
26 return d.state.parentDS.DecodeCursor(s) | |
27 } | |
28 | |
29 func (d *dsTxnBuf) AllocateIDs(incomplete *ds.Key, n int) (start int64, err erro r) { | |
30 return d.state.parentDS.AllocateIDs(incomplete, n) | |
31 } | |
32 | |
33 func (d *dsTxnBuf) GetMulti(keys []*ds.Key, metas ds.MultiMetaGetter, cb ds.GetM ultiCB) error { | |
34 data, err := d.state.getMulti(keys) | |
35 if err != nil { | |
36 return err | |
37 } | |
38 | |
39 idxMap := []int(nil) | |
40 getKeys := []*ds.Key(nil) | |
41 getMetas := ds.MultiMetaGetter(nil) | |
42 lme := errors.NewLazyMultiError(len(keys)) | |
43 | |
44 for i, itm := range data { | |
45 if !itm.buffered { | |
46 idxMap = append(idxMap, i) | |
47 getKeys = append(getKeys, itm.key) | |
48 getMetas = append(getMetas, metas.GetSingle(i)) | |
49 } | |
50 } | |
51 | |
52 if len(idxMap) > 0 { | |
53 j := 0 | |
54 err := d.state.parentDS.GetMulti(getKeys, getMetas, func(pm ds.P ropertyMap, err error) { | |
55 if err != ds.ErrNoSuchEntity { | |
56 i := idxMap[j] | |
57 if !lme.Assign(i, err) { | |
58 data[i].key = keys[j] | |
Vadim Sh.
2015/09/30 01:11:28
are you sure it shoud be 'j' here and not 'i'? Or
iannucci
2015/09/30 02:00:10
er, actually this line needs to be removed.
| |
59 data[i].data = pm | |
60 } | |
61 } | |
62 j++ | |
63 }) | |
64 if err != nil { | |
65 return err | |
66 } | |
67 } | |
68 | |
69 for i, itm := range data { | |
70 err := lme.GetOne(i) | |
71 if err != nil { | |
72 cb(nil, err) | |
73 } else if itm.data == nil { | |
74 cb(nil, ds.ErrNoSuchEntity) | |
75 } else { | |
76 cb(itm.data, nil) | |
77 } | |
78 } | |
79 return nil | |
80 } | |
81 | |
82 func (d *dsTxnBuf) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMult iCB) error { | |
83 lme := errors.NewLazyMultiError(len(keys)) | |
84 realKeys := []*ds.Key(nil) | |
85 for i, key := range keys { | |
86 if key.Incomplete() { | |
87 start, err := d.AllocateIDs(key, 1) | |
88 if !lme.Assign(i, err) { | |
89 if realKeys == nil { | |
90 realKeys = make([]*ds.Key, len(keys)) | |
91 copy(realKeys, keys) | |
92 } | |
93 | |
94 aid, ns, toks := key.Split() | |
95 toks[len(toks)-1].IntID = start | |
96 realKeys[i] = ds.NewKeyToks(aid, ns, toks) | |
97 } | |
98 } | |
99 } | |
100 if err := lme.Get(); err != nil { | |
101 for _, e := range err.(errors.MultiError) { | |
102 cb(nil, e) | |
Vadim Sh.
2015/09/30 01:11:28
what does cb(nil, nil) mean for Put? It's how it's
iannucci
2015/09/30 02:00:10
Hm... good question. I should stub in some other e
| |
103 } | |
104 return nil | |
105 } | |
106 | |
107 if realKeys == nil { | |
108 realKeys = keys | |
109 } | |
110 | |
111 err := d.state.putMulti(realKeys, vals) | |
112 if err != nil { | |
113 return err | |
114 } | |
115 | |
116 for _, k := range realKeys { | |
117 cb(k, nil) | |
118 } | |
119 return nil | |
120 } | |
121 | |
122 func (d *dsTxnBuf) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error { | |
123 err := d.state.deleteMulti(keys) | |
124 if err != nil { | |
125 return err | |
126 } | |
127 | |
128 for range keys { | |
129 cb(nil) | |
130 } | |
131 return nil | |
132 } | |
133 | |
134 func (d *dsTxnBuf) Count(fq *ds.FinalizedQuery) (count int64, err error) { | |
135 // Unfortunately there's no fast-path here. We literally have to run the | |
136 // query and count. Fortunately we can optimize to count keys if it's no t | |
137 // a projection query. This will save on bandwidth a bit. | |
138 if len(fq.Project()) == 0 && !fq.KeysOnly() { | |
Vadim Sh.
2015/09/30 01:11:28
what if fq uses 'limit' or 'skip' or cursor? Worth
iannucci
2015/09/30 02:00:10
There is a check for cursors in Run, and runMerged
| |
139 fq, err = fq.Original().KeysOnly(true).Finalize() | |
140 if err != nil { | |
141 return | |
142 } | |
143 } | |
144 err = d.Run(fq, func(_ *ds.Key, _ ds.PropertyMap, _ ds.CursorCB) bool { | |
145 count++ | |
146 return true | |
147 }) | |
148 return | |
149 } | |
150 | |
151 func (d *dsTxnBuf) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { | |
152 if start, end := fq.Bounds(); start != nil || end != nil { | |
153 return errors.New("txnBuf filter does not support query cursors" ) | |
154 } | |
155 | |
156 limit, limitSet := fq.Limit() | |
157 offset, _ := fq.Offset() | |
158 keysOnly := fq.KeysOnly() | |
159 | |
160 project := fq.Project() | |
161 needSlimming := len(project) > 0 | |
Vadim Sh.
2015/09/30 01:11:28
nit: just use len(project) > 0 inline. It is using
iannucci
2015/09/30 02:00:10
I guess I was trying to make it more readable... :
Vadim Sh.
2015/09/30 02:09:00
Yes. But "slimming" is a peculiar choice of verb f
| |
162 | |
163 d.state.Lock() | |
164 defer d.state.Unlock() | |
165 | |
166 return runMergedQueries(fq, d.state, func(key *ds.Key, data ds.PropertyM ap) bool { | |
167 if offset > 0 { | |
168 offset-- | |
169 return true | |
170 } | |
171 if limitSet { | |
172 if limit == 0 { | |
173 return false | |
174 } | |
175 limit-- | |
176 } | |
177 if keysOnly { | |
178 data = nil | |
179 } else if needSlimming { | |
180 newData := make(ds.PropertyMap, len(project)) | |
181 for _, p := range project { | |
182 newData[p] = data[p] | |
183 } | |
184 data = newData | |
185 } | |
186 return cb(key, data, nil) | |
187 }) | |
188 } | |
189 | |
190 func (d *dsTxnBuf) RunInTransaction(cb func(context.Context) error, opts *ds.Tra nsactionOptions) error { | |
191 return withTxnBuf(d.ic, cb, opts) | |
192 } | |
193 | |
194 func (d *dsTxnBuf) Testable() ds.Testable { | |
195 return d.state.parentDS.Testable() | |
196 } | |
OLD | NEW |