OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package txnBuf | |
6 | |
7 import ( | |
8 ds "github.com/luci/gae/service/datastore" | |
9 "github.com/luci/luci-go/common/errors" | |
10 "golang.org/x/net/context" | |
11 ) | |
12 | |
13 // ErrTransactionTooLarge is returned when applying an inner transaction would | |
14 // cause an outer transaction to become too large. | |
15 var ErrTransactionTooLarge = errors.New( | |
16 "applying the transaction would make the parent transaction too large") | |
17 | |
18 type dsTxnBuf struct { | |
19 ds.RawInterface | |
20 | |
21 ic context.Context | |
22 state *txnBufState | |
23 } | |
24 | |
25 var _ ds.RawInterface = (*dsTxnBuf)(nil) | |
26 | |
27 func (d *dsTxnBuf) GetMulti(keys []*ds.Key, metas ds.MultiMetaGetter, cb ds.GetM ultiCB) error { | |
28 data, err := d.state.getMulti(keys) | |
29 if err != nil { | |
30 return err | |
31 } | |
32 | |
33 idxMap := []int(nil) | |
34 getKeys := []*ds.Key(nil) | |
35 getMetas := ds.MultiMetaGetter(nil) | |
36 lme := errors.NewLazyMultiError(len(keys)) | |
37 | |
38 for i, itm := range data { | |
39 if !itm.buffered { | |
40 idxMap = append(idxMap, i) | |
41 getKeys = append(getKeys, itm.key) | |
42 getMetas = append(getMetas, metas.GetSingle(i)) | |
43 } | |
44 } | |
45 | |
46 if len(idxMap) > 0 { | |
47 j := 0 | |
48 err := d.state.parentDS.GetMulti(getKeys, getMetas, func(pm ds.P ropertyMap, err error) { | |
49 if err == ds.ErrNoSuchEntity { | |
50 return | |
51 } | |
52 i := idxMap[j] | |
53 if !lme.Assign(i, err) { | |
54 data[i].key = keys[j] | |
55 data[i].data = pm | |
56 } | |
57 j++ | |
Vadim Sh.
2015/09/28 18:52:56
this should probably be before err == ds.ErrNoSuch
iannucci
2015/09/29 03:21:37
ah, yeah. I added a test for this too.
| |
58 }) | |
59 if err != nil { | |
60 return err | |
61 } | |
62 } | |
63 | |
64 for i, itm := range data { | |
65 err := lme.GetOne(i) | |
66 if err != nil { | |
67 cb(nil, err) | |
68 } else if itm.data == nil { | |
69 cb(nil, ds.ErrNoSuchEntity) | |
70 } else { | |
71 cb(itm.data, nil) | |
72 } | |
73 } | |
74 return nil | |
Vadim Sh.
2015/09/28 18:52:56
is it ok to return nil here even if lme contains s
iannucci
2015/09/29 03:21:38
yeah, that's the convention. Basically either the
| |
75 } | |
76 | |
77 func (d *dsTxnBuf) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMult iCB) error { | |
78 lme := errors.NewLazyMultiError(len(keys)) | |
79 realKeys := []*ds.Key(nil) | |
80 for i, key := range keys { | |
81 if key.Incomplete() { | |
82 start, err := d.state.parentDS.AllocateIDs(key, 1) | |
Vadim Sh.
2015/09/28 18:52:56
in theory it should be possible to batch\paralleli
iannucci
2015/09/29 03:21:38
Thought of this, and yeah it does. However I'm pre
| |
83 if !lme.Assign(i, err) { | |
84 if realKeys == nil { | |
85 realKeys = make([]*ds.Key, len(keys)) | |
86 copy(realKeys, keys) | |
87 } | |
88 | |
89 aid, ns, toks := key.Split() | |
90 toks[len(toks)-1].IntID = start | |
91 realKeys[i] = ds.NewKeyToks(aid, ns, toks) | |
92 } | |
93 } | |
94 } | |
95 if err := lme.Get(); err != nil { | |
96 for _, e := range err.(errors.MultiError) { | |
97 cb(nil, e) | |
98 } | |
99 return nil | |
100 } | |
101 | |
102 if realKeys == nil { | |
103 realKeys = keys | |
104 } | |
105 | |
106 err := d.state.putMulti(realKeys, vals) | |
107 if err != nil { | |
108 return err | |
Vadim Sh.
2015/09/28 18:52:56
is it ok not to call callback here?
iannucci
2015/09/29 03:21:38
yeah per the convention.
| |
109 } | |
110 | |
111 for _, k := range realKeys { | |
112 cb(k, nil) | |
113 } | |
114 return nil | |
115 } | |
116 | |
117 func (d *dsTxnBuf) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error { | |
118 err := d.state.deleteMulti(keys) | |
Vadim Sh.
2015/09/28 18:52:56
what about deleting them from _real_ datastore? st
iannucci
2015/09/29 03:21:38
all _real_ datastore activity happens in the apply
Vadim Sh.
2015/09/29 03:27:26
Yes, I understand that. But if I open a new buffer
| |
119 if err != nil { | |
120 return err | |
121 } | |
122 | |
123 for range keys { | |
124 cb(nil) | |
125 } | |
126 return nil | |
127 } | |
128 | |
129 func (d *dsTxnBuf) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { | |
130 if start, end := fq.Bounds(); start != nil || end != nil { | |
131 return errors.New("txnBuf filter does not support query cursors" ) | |
132 } | |
133 | |
134 limit, limitSet := fq.Limit() | |
135 offset, _ := fq.Offset() | |
136 keysOnly := fq.KeysOnly() | |
137 | |
138 project := fq.Project() | |
139 needSlimming := len(project) > 0 | |
140 | |
141 d.state.Lock() | |
142 defer d.state.Unlock() | |
143 | |
144 return runMergedQueries(fq, d.state, func(key *ds.Key, data ds.PropertyM ap) bool { | |
145 if offset > 0 { | |
146 offset-- | |
147 return true | |
148 } | |
149 if limitSet { | |
150 if limit == 0 { | |
151 return false | |
152 } | |
153 limit-- | |
154 } | |
155 if keysOnly { | |
156 data = nil | |
157 } else if needSlimming { | |
158 newData := make(ds.PropertyMap, len(project)) | |
159 for _, p := range project { | |
160 newData[p] = data[p] | |
161 } | |
162 data = newData | |
163 } | |
164 return cb(key, data, nil) | |
165 }) | |
166 } | |
167 | |
168 func (d *dsTxnBuf) RunInTransaction(cb func(context.Context) error, opts *ds.Tra nsactionOptions) error { | |
169 return withTxnBuf(d.ic, cb, opts) | |
170 } | |
OLD | NEW |