Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(54)

Side by Side Diff: filter/txnBuf/ds_txn.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: rebase Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package txnBuf
6
7 import (
8 ds "github.com/luci/gae/service/datastore"
9 "github.com/luci/luci-go/common/errors"
10 "golang.org/x/net/context"
11 )
12
13 // ErrTransactionTooLarge is returned when applying an inner transaction would
14 // cause an outer transaction to become too large.
15 var ErrTransactionTooLarge = errors.New(
16 "applying the transaction would make the parent transaction too large")
17
18 type dsTxnBuf struct {
19 ic context.Context
20 state *txnBufState
21 }
22
23 var _ ds.RawInterface = (*dsTxnBuf)(nil)
24
25 func (d *dsTxnBuf) DecodeCursor(s string) (ds.Cursor, error) {
26 return d.state.parentDS.DecodeCursor(s)
27 }
28
29 func (d *dsTxnBuf) AllocateIDs(incomplete *ds.Key, n int) (start int64, err erro r) {
30 return d.state.parentDS.AllocateIDs(incomplete, n)
31 }
32
33 func (d *dsTxnBuf) GetMulti(keys []*ds.Key, metas ds.MultiMetaGetter, cb ds.GetM ultiCB) error {
34 data, err := d.state.getMulti(keys)
35 if err != nil {
36 return err
37 }
38
39 idxMap := []int(nil)
40 getKeys := []*ds.Key(nil)
41 getMetas := ds.MultiMetaGetter(nil)
42 lme := errors.NewLazyMultiError(len(keys))
43
44 for i, itm := range data {
45 if !itm.buffered {
46 idxMap = append(idxMap, i)
47 getKeys = append(getKeys, itm.key)
48 getMetas = append(getMetas, metas.GetSingle(i))
49 }
50 }
51
52 if len(idxMap) > 0 {
53 j := 0
54 err := d.state.parentDS.GetMulti(getKeys, getMetas, func(pm ds.P ropertyMap, err error) {
55 if err != ds.ErrNoSuchEntity {
56 i := idxMap[j]
57 if !lme.Assign(i, err) {
58 data[i].key = keys[j]
Vadim Sh. 2015/09/30 01:11:28 are you sure it shoud be 'j' here and not 'i'? Or
iannucci 2015/09/30 02:00:10 er, actually this line needs to be removed.
59 data[i].data = pm
60 }
61 }
62 j++
63 })
64 if err != nil {
65 return err
66 }
67 }
68
69 for i, itm := range data {
70 err := lme.GetOne(i)
71 if err != nil {
72 cb(nil, err)
73 } else if itm.data == nil {
74 cb(nil, ds.ErrNoSuchEntity)
75 } else {
76 cb(itm.data, nil)
77 }
78 }
79 return nil
80 }
81
82 func (d *dsTxnBuf) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMult iCB) error {
83 lme := errors.NewLazyMultiError(len(keys))
84 realKeys := []*ds.Key(nil)
85 for i, key := range keys {
86 if key.Incomplete() {
87 start, err := d.AllocateIDs(key, 1)
88 if !lme.Assign(i, err) {
89 if realKeys == nil {
90 realKeys = make([]*ds.Key, len(keys))
91 copy(realKeys, keys)
92 }
93
94 aid, ns, toks := key.Split()
95 toks[len(toks)-1].IntID = start
96 realKeys[i] = ds.NewKeyToks(aid, ns, toks)
97 }
98 }
99 }
100 if err := lme.Get(); err != nil {
101 for _, e := range err.(errors.MultiError) {
102 cb(nil, e)
Vadim Sh. 2015/09/30 01:11:28 what does cb(nil, nil) mean for Put? It's how it's
iannucci 2015/09/30 02:00:10 Hm... good question. I should stub in some other e
103 }
104 return nil
105 }
106
107 if realKeys == nil {
108 realKeys = keys
109 }
110
111 err := d.state.putMulti(realKeys, vals)
112 if err != nil {
113 return err
114 }
115
116 for _, k := range realKeys {
117 cb(k, nil)
118 }
119 return nil
120 }
121
122 func (d *dsTxnBuf) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
123 err := d.state.deleteMulti(keys)
124 if err != nil {
125 return err
126 }
127
128 for range keys {
129 cb(nil)
130 }
131 return nil
132 }
133
134 func (d *dsTxnBuf) Count(fq *ds.FinalizedQuery) (count int64, err error) {
135 // Unfortunately there's no fast-path here. We literally have to run the
136 // query and count. Fortunately we can optimize to count keys if it's no t
137 // a projection query. This will save on bandwidth a bit.
138 if len(fq.Project()) == 0 && !fq.KeysOnly() {
Vadim Sh. 2015/09/30 01:11:28 what if fq uses 'limit' or 'skip' or cursor? Worth
iannucci 2015/09/30 02:00:10 There is a check for cursors in Run, and runMerged
139 fq, err = fq.Original().KeysOnly(true).Finalize()
140 if err != nil {
141 return
142 }
143 }
144 err = d.Run(fq, func(_ *ds.Key, _ ds.PropertyMap, _ ds.CursorCB) bool {
145 count++
146 return true
147 })
148 return
149 }
150
151 func (d *dsTxnBuf) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error {
152 if start, end := fq.Bounds(); start != nil || end != nil {
153 return errors.New("txnBuf filter does not support query cursors" )
154 }
155
156 limit, limitSet := fq.Limit()
157 offset, _ := fq.Offset()
158 keysOnly := fq.KeysOnly()
159
160 project := fq.Project()
161 needSlimming := len(project) > 0
Vadim Sh. 2015/09/30 01:11:28 nit: just use len(project) > 0 inline. It is using
iannucci 2015/09/30 02:00:10 I guess I was trying to make it more readable... :
Vadim Sh. 2015/09/30 02:09:00 Yes. But "slimming" is a peculiar choice of verb f
162
163 d.state.Lock()
164 defer d.state.Unlock()
165
166 return runMergedQueries(fq, d.state, func(key *ds.Key, data ds.PropertyM ap) bool {
167 if offset > 0 {
168 offset--
169 return true
170 }
171 if limitSet {
172 if limit == 0 {
173 return false
174 }
175 limit--
176 }
177 if keysOnly {
178 data = nil
179 } else if needSlimming {
180 newData := make(ds.PropertyMap, len(project))
181 for _, p := range project {
182 newData[p] = data[p]
183 }
184 data = newData
185 }
186 return cb(key, data, nil)
187 })
188 }
189
190 func (d *dsTxnBuf) RunInTransaction(cb func(context.Context) error, opts *ds.Tra nsactionOptions) error {
191 return withTxnBuf(d.ic, cb, opts)
192 }
193
194 func (d *dsTxnBuf) Testable() ds.Testable {
195 return d.state.parentDS.Testable()
196 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698