Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(396)

Side by Side Diff: filter/txnBuf/ds_txn.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: one more test Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package txnBuf
6
7 import (
8 ds "github.com/luci/gae/service/datastore"
9 "github.com/luci/luci-go/common/errors"
10 "golang.org/x/net/context"
11 )
12
13 // ErrTransactionTooLarge is returned when applying an inner transaction would
14 // cause an outer transaction to become too large.
15 var ErrTransactionTooLarge = errors.New(
16 "applying the transaction would make the parent transaction too large")
17
18 type dsTxnBuf struct {
19 ds.RawInterface
20
21 ic context.Context
22 state *txnBufState
23 }
24
25 var _ ds.RawInterface = (*dsTxnBuf)(nil)
26
27 func (d *dsTxnBuf) GetMulti(keys []*ds.Key, metas ds.MultiMetaGetter, cb ds.GetM ultiCB) error {
28 data, err := d.state.getMulti(keys)
29 if err != nil {
30 return err
31 }
32
33 idxMap := []int(nil)
34 getKeys := []*ds.Key(nil)
35 getMetas := ds.MultiMetaGetter(nil)
36 lme := errors.NewLazyMultiError(len(keys))
37
38 for i, itm := range data {
39 if !itm.buffered {
40 idxMap = append(idxMap, i)
41 getKeys = append(getKeys, itm.key)
42 getMetas = append(getMetas, metas.GetSingle(i))
43 }
44 }
45
46 if len(idxMap) > 0 {
47 j := 0
48 err := d.state.parentDS.GetMulti(getKeys, getMetas, func(pm ds.P ropertyMap, err error) {
49 if err == ds.ErrNoSuchEntity {
50 return
51 }
52 i := idxMap[j]
53 if !lme.Assign(i, err) {
54 data[i].key = keys[j]
55 data[i].data = pm
56 }
57 j++
Vadim Sh. 2015/09/28 18:52:56 this should probably be before err == ds.ErrNoSuch
iannucci 2015/09/29 03:21:37 ah, yeah. I added a test for this too.
58 })
59 if err != nil {
60 return err
61 }
62 }
63
64 for i, itm := range data {
65 err := lme.GetOne(i)
66 if err != nil {
67 cb(nil, err)
68 } else if itm.data == nil {
69 cb(nil, ds.ErrNoSuchEntity)
70 } else {
71 cb(itm.data, nil)
72 }
73 }
74 return nil
Vadim Sh. 2015/09/28 18:52:56 is it ok to return nil here even if lme contains s
iannucci 2015/09/29 03:21:38 yeah, that's the convention. Basically either the
75 }
76
77 func (d *dsTxnBuf) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMult iCB) error {
78 lme := errors.NewLazyMultiError(len(keys))
79 realKeys := []*ds.Key(nil)
80 for i, key := range keys {
81 if key.Incomplete() {
82 start, err := d.state.parentDS.AllocateIDs(key, 1)
Vadim Sh. 2015/09/28 18:52:56 in theory it should be possible to batch\paralleli
iannucci 2015/09/29 03:21:38 Thought of this, and yeah it does. However I'm pre
83 if !lme.Assign(i, err) {
84 if realKeys == nil {
85 realKeys = make([]*ds.Key, len(keys))
86 copy(realKeys, keys)
87 }
88
89 aid, ns, toks := key.Split()
90 toks[len(toks)-1].IntID = start
91 realKeys[i] = ds.NewKeyToks(aid, ns, toks)
92 }
93 }
94 }
95 if err := lme.Get(); err != nil {
96 for _, e := range err.(errors.MultiError) {
97 cb(nil, e)
98 }
99 return nil
100 }
101
102 if realKeys == nil {
103 realKeys = keys
104 }
105
106 err := d.state.putMulti(realKeys, vals)
107 if err != nil {
108 return err
Vadim Sh. 2015/09/28 18:52:56 is it ok not to call callback here?
iannucci 2015/09/29 03:21:38 yeah per the convention.
109 }
110
111 for _, k := range realKeys {
112 cb(k, nil)
113 }
114 return nil
115 }
116
117 func (d *dsTxnBuf) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
118 err := d.state.deleteMulti(keys)
Vadim Sh. 2015/09/28 18:52:56 what about deleting them from _real_ datastore? st
iannucci 2015/09/29 03:21:38 all _real_ datastore activity happens in the apply
Vadim Sh. 2015/09/29 03:27:26 Yes, I understand that. But if I open a new buffer
119 if err != nil {
120 return err
121 }
122
123 for range keys {
124 cb(nil)
125 }
126 return nil
127 }
128
129 func (d *dsTxnBuf) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error {
130 if start, end := fq.Bounds(); start != nil || end != nil {
131 return errors.New("txnBuf filter does not support query cursors" )
132 }
133
134 limit, limitSet := fq.Limit()
135 offset, _ := fq.Offset()
136 keysOnly := fq.KeysOnly()
137
138 project := fq.Project()
139 needSlimming := len(project) > 0
140
141 d.state.Lock()
142 defer d.state.Unlock()
143
144 return runMergedQueries(fq, d.state, func(key *ds.Key, data ds.PropertyM ap) bool {
145 if offset > 0 {
146 offset--
147 return true
148 }
149 if limitSet {
150 if limit == 0 {
151 return false
152 }
153 limit--
154 }
155 if keysOnly {
156 data = nil
157 } else if needSlimming {
158 newData := make(ds.PropertyMap, len(project))
159 for _, p := range project {
160 newData[p] = data[p]
161 }
162 data = newData
163 }
164 return cb(key, data, nil)
165 })
166 }
167
168 func (d *dsTxnBuf) RunInTransaction(cb func(context.Context) error, opts *ds.Tra nsactionOptions) error {
169 return withTxnBuf(d.ic, cb, opts)
170 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698