Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Side by Side Diff: filter/txnBuf/ds_txn.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: fix comments Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package txnBuf
6
7 import (
8 ds "github.com/luci/gae/service/datastore"
9 "github.com/luci/luci-go/common/errors"
10 "golang.org/x/net/context"
11 )
12
13 // ErrTransactionTooLarge is returned when applying an inner transaction would
14 // cause an outer transaction to become too large.
15 var ErrTransactionTooLarge = errors.New(
16 "applying the transaction would make the parent transaction too large")
17
18 type dsTxnBuf struct {
19 ds.RawInterface
20
21 ic context.Context
22 state *txnBufState
23 }
24
25 var _ ds.RawInterface = (*dsTxnBuf)(nil)
26
27 func (d *dsTxnBuf) GetMulti(keys []*ds.Key, metas ds.MultiMetaGetter, cb ds.GetM ultiCB) error {
28 data, err := d.state.getMulti(keys)
29 if err != nil {
30 return err
31 }
32
33 idxMap := []int(nil)
34 getKeys := []*ds.Key(nil)
35 getMetas := ds.MultiMetaGetter(nil)
36 lme := errors.NewLazyMultiError(len(keys))
37
38 for i, itm := range data {
39 if !itm.buffered {
40 idxMap = append(idxMap, i)
41 getKeys = append(getKeys, itm.key)
42 getMetas = append(getMetas, metas.GetSingle(i))
43 }
44 }
45
46 if len(idxMap) > 0 {
47 j := 0
48 err := d.state.parentDS.GetMulti(getKeys, getMetas, func(pm ds.P ropertyMap, err error) {
49 if err != ds.ErrNoSuchEntity {
50 i := idxMap[j]
51 if !lme.Assign(i, err) {
52 data[i].key = keys[j]
53 data[i].data = pm
54 }
55 }
56 j++
57 })
58 if err != nil {
59 return err
60 }
61 }
62
63 for i, itm := range data {
64 err := lme.GetOne(i)
65 if err != nil {
66 cb(nil, err)
67 } else if itm.data == nil {
68 cb(nil, ds.ErrNoSuchEntity)
69 } else {
70 cb(itm.data, nil)
71 }
72 }
73 return nil
74 }
75
76 func (d *dsTxnBuf) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMult iCB) error {
77 lme := errors.NewLazyMultiError(len(keys))
78 realKeys := []*ds.Key(nil)
79 for i, key := range keys {
80 if key.Incomplete() {
81 start, err := d.state.parentDS.AllocateIDs(key, 1)
82 if !lme.Assign(i, err) {
83 if realKeys == nil {
84 realKeys = make([]*ds.Key, len(keys))
85 copy(realKeys, keys)
86 }
87
88 aid, ns, toks := key.Split()
89 toks[len(toks)-1].IntID = start
90 realKeys[i] = ds.NewKeyToks(aid, ns, toks)
91 }
92 }
93 }
94 if err := lme.Get(); err != nil {
95 for _, e := range err.(errors.MultiError) {
96 cb(nil, e)
97 }
98 return nil
99 }
100
101 if realKeys == nil {
102 realKeys = keys
103 }
104
105 err := d.state.putMulti(realKeys, vals)
106 if err != nil {
107 return err
108 }
109
110 for _, k := range realKeys {
111 cb(k, nil)
112 }
113 return nil
114 }
115
116 func (d *dsTxnBuf) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
117 err := d.state.deleteMulti(keys)
118 if err != nil {
119 return err
120 }
121
122 for range keys {
123 cb(nil)
124 }
125 return nil
126 }
127
128 func (d *dsTxnBuf) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error {
129 if start, end := fq.Bounds(); start != nil || end != nil {
130 return errors.New("txnBuf filter does not support query cursors" )
131 }
132
133 limit, limitSet := fq.Limit()
134 offset, _ := fq.Offset()
135 keysOnly := fq.KeysOnly()
136
137 project := fq.Project()
138 needSlimming := len(project) > 0
139
140 d.state.Lock()
141 defer d.state.Unlock()
142
143 return runMergedQueries(fq, d.state, func(key *ds.Key, data ds.PropertyM ap) bool {
144 if offset > 0 {
145 offset--
146 return true
147 }
148 if limitSet {
149 if limit == 0 {
150 return false
151 }
152 limit--
153 }
154 if keysOnly {
155 data = nil
156 } else if needSlimming {
157 newData := make(ds.PropertyMap, len(project))
158 for _, p := range project {
159 newData[p] = data[p]
160 }
161 data = newData
162 }
163 return cb(key, data, nil)
164 })
165 }
166
167 func (d *dsTxnBuf) RunInTransaction(cb func(context.Context) error, opts *ds.Tra nsactionOptions) error {
168 return withTxnBuf(d.ic, cb, opts)
169 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698