OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package prod | 5 package prod |
6 | 6 |
7 import ( | 7 import ( |
8 ds "github.com/luci/gae/service/datastore" | 8 ds "github.com/luci/gae/service/datastore" |
9 "github.com/luci/luci-go/common/errors" | 9 "github.com/luci/luci-go/common/errors" |
10 "golang.org/x/net/context" | 10 "golang.org/x/net/context" |
11 "google.golang.org/appengine/datastore" | 11 "google.golang.org/appengine/datastore" |
12 ) | 12 ) |
13 | 13 |
14 // useRDS adds a gae.RawDatastore implementation to context, accessible | 14 // useRDS adds a gae.RawDatastore implementation to context, accessible |
15 // by gae.GetDS(c) | 15 // by gae.GetDS(c) |
16 func useRDS(c context.Context) context.Context { | 16 func useRDS(c context.Context) context.Context { |
17 » return ds.SetRawFactory(c, func(ci context.Context, wantTxn bool) ds.Raw
Interface { | 17 » return ds.SetRawFactory(c, func(ci context.Context) ds.RawInterface { |
18 » » maybeTxnCtx := AEContext(ci) | 18 » » rds := rdsImpl{ |
19 | 19 » » » userCtx: ci, |
20 » » if wantTxn { | 20 » » » ps: getProdState(ci), |
21 » » » return rdsImpl{ci, maybeTxnCtx} | |
22 } | 21 } |
23 » » aeCtx := AEContextNoTxn(ci) | 22 » » rds.aeCtx = rds.ps.context(ci) |
24 » » if maybeTxnCtx != aeCtx { | 23 » » return &rds |
25 » » » ci = context.WithValue(ci, prodContextKey, aeCtx) | |
26 » » } | |
27 » » return rdsImpl{ci, aeCtx} | |
28 }) | 24 }) |
29 } | 25 } |
30 | 26 |
31 ////////// Datastore | 27 ////////// Datastore |
32 | 28 |
33 type rdsImpl struct { | 29 type rdsImpl struct { |
34 // userCtx is the context that has the luci/gae services and user object
s in | 30 // userCtx is the context that has the luci/gae services and user object
s in |
35 // it. | 31 // it. |
36 userCtx context.Context | 32 userCtx context.Context |
37 | 33 |
38 » // aeCtx is the context with the appengine connection information in it. | 34 » // aeCtx is the AppEngine Context that will be used in method calls. Thi
s is |
| 35 » // derived from ps. |
39 aeCtx context.Context | 36 aeCtx context.Context |
| 37 |
| 38 // ps is the current production state. |
| 39 ps prodState |
40 } | 40 } |
41 | 41 |
42 func idxCallbacker(err error, amt int, cb func(idx int, err error) error) error
{ | 42 func idxCallbacker(err error, amt int, cb func(idx int, err error) error) error
{ |
43 if err == nil { | 43 if err == nil { |
44 for i := 0; i < amt; i++ { | 44 for i := 0; i < amt; i++ { |
45 if err := cb(i, nil); err != nil { | 45 if err := cb(i, nil); err != nil { |
46 return err | 46 return err |
47 } | 47 } |
48 } | 48 } |
49 return nil | 49 return nil |
50 } | 50 } |
51 err = errors.Fix(err) | 51 err = errors.Fix(err) |
52 me, ok := err.(errors.MultiError) | 52 me, ok := err.(errors.MultiError) |
53 if ok { | 53 if ok { |
54 for i, err := range me { | 54 for i, err := range me { |
55 if err := cb(i, err); err != nil { | 55 if err := cb(i, err); err != nil { |
56 return err | 56 return err |
57 } | 57 } |
58 } | 58 } |
59 return nil | 59 return nil |
60 } | 60 } |
61 return err | 61 return err |
62 } | 62 } |
63 | 63 |
64 func (d rdsImpl) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error { | 64 func (d *rdsImpl) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error { |
65 // Map keys by entity type. | 65 // Map keys by entity type. |
66 entityMap := make(map[string][]int) | 66 entityMap := make(map[string][]int) |
67 for i, key := range keys { | 67 for i, key := range keys { |
68 ks := key.String() | 68 ks := key.String() |
69 entityMap[ks] = append(entityMap[ks], i) | 69 entityMap[ks] = append(entityMap[ks], i) |
70 } | 70 } |
71 | 71 |
72 // Allocate a set of IDs for each unique entity type. | 72 // Allocate a set of IDs for each unique entity type. |
73 errors := errors.NewLazyMultiError(len(keys)) | 73 errors := errors.NewLazyMultiError(len(keys)) |
74 setErrs := func(idxs []int, err error) { | 74 setErrs := func(idxs []int, err error) { |
(...skipping 24 matching lines...) Expand all Loading... |
99 for i, key := range keys { | 99 for i, key := range keys { |
100 if err := errors.GetOne(i); err != nil { | 100 if err := errors.GetOne(i); err != nil { |
101 cb(nil, err) | 101 cb(nil, err) |
102 } else { | 102 } else { |
103 cb(key, nil) | 103 cb(key, nil) |
104 } | 104 } |
105 } | 105 } |
106 return nil | 106 return nil |
107 } | 107 } |
108 | 108 |
109 func (d rdsImpl) DeleteMulti(ks []*ds.Key, cb ds.DeleteMultiCB) error { | 109 func (d *rdsImpl) DeleteMulti(ks []*ds.Key, cb ds.DeleteMultiCB) error { |
110 keys, err := dsMF2R(d.aeCtx, ks) | 110 keys, err := dsMF2R(d.aeCtx, ks) |
111 if err == nil { | 111 if err == nil { |
112 err = datastore.DeleteMulti(d.aeCtx, keys) | 112 err = datastore.DeleteMulti(d.aeCtx, keys) |
113 } | 113 } |
114 return idxCallbacker(err, len(ks), func(_ int, err error) error { | 114 return idxCallbacker(err, len(ks), func(_ int, err error) error { |
115 return cb(err) | 115 return cb(err) |
116 }) | 116 }) |
117 } | 117 } |
118 | 118 |
119 func (d rdsImpl) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMul
tiCB) error { | 119 func (d *rdsImpl) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMu
ltiCB) error { |
120 vals := make([]datastore.PropertyLoadSaver, len(keys)) | 120 vals := make([]datastore.PropertyLoadSaver, len(keys)) |
121 rkeys, err := dsMF2R(d.aeCtx, keys) | 121 rkeys, err := dsMF2R(d.aeCtx, keys) |
122 if err == nil { | 122 if err == nil { |
123 for i := range keys { | 123 for i := range keys { |
124 vals[i] = &typeFilter{d.aeCtx, ds.PropertyMap{}} | 124 vals[i] = &typeFilter{d.aeCtx, ds.PropertyMap{}} |
125 } | 125 } |
126 err = datastore.GetMulti(d.aeCtx, rkeys, vals) | 126 err = datastore.GetMulti(d.aeCtx, rkeys, vals) |
127 } | 127 } |
128 return idxCallbacker(err, len(keys), func(idx int, err error) error { | 128 return idxCallbacker(err, len(keys), func(idx int, err error) error { |
129 if pls := vals[idx]; pls != nil { | 129 if pls := vals[idx]; pls != nil { |
130 return cb(pls.(*typeFilter).pm, err) | 130 return cb(pls.(*typeFilter).pm, err) |
131 } | 131 } |
132 return cb(nil, err) | 132 return cb(nil, err) |
133 }) | 133 }) |
134 } | 134 } |
135 | 135 |
136 func (d rdsImpl) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.NewKeyCB)
error { | 136 func (d *rdsImpl) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.NewKeyCB
) error { |
137 rkeys, err := dsMF2R(d.aeCtx, keys) | 137 rkeys, err := dsMF2R(d.aeCtx, keys) |
138 if err == nil { | 138 if err == nil { |
139 rvals := make([]datastore.PropertyLoadSaver, len(vals)) | 139 rvals := make([]datastore.PropertyLoadSaver, len(vals)) |
140 for i, val := range vals { | 140 for i, val := range vals { |
141 rvals[i] = &typeFilter{d.aeCtx, val} | 141 rvals[i] = &typeFilter{d.aeCtx, val} |
142 } | 142 } |
143 rkeys, err = datastore.PutMulti(d.aeCtx, rkeys, rvals) | 143 rkeys, err = datastore.PutMulti(d.aeCtx, rkeys, rvals) |
144 } | 144 } |
145 return idxCallbacker(err, len(keys), func(idx int, err error) error { | 145 return idxCallbacker(err, len(keys), func(idx int, err error) error { |
146 k := (*ds.Key)(nil) | 146 k := (*ds.Key)(nil) |
147 if err == nil { | 147 if err == nil { |
148 k = dsR2F(rkeys[idx]) | 148 k = dsR2F(rkeys[idx]) |
149 } | 149 } |
150 return cb(k, err) | 150 return cb(k, err) |
151 }) | 151 }) |
152 } | 152 } |
153 | 153 |
154 func (d rdsImpl) fixQuery(fq *ds.FinalizedQuery) (*datastore.Query, error) { | 154 func (d *rdsImpl) fixQuery(fq *ds.FinalizedQuery) (*datastore.Query, error) { |
155 ret := datastore.NewQuery(fq.Kind()) | 155 ret := datastore.NewQuery(fq.Kind()) |
156 | 156 |
157 start, end := fq.Bounds() | 157 start, end := fq.Bounds() |
158 if start != nil { | 158 if start != nil { |
159 ret = ret.Start(start.(datastore.Cursor)) | 159 ret = ret.Start(start.(datastore.Cursor)) |
160 } | 160 } |
161 if end != nil { | 161 if end != nil { |
162 ret = ret.End(end.(datastore.Cursor)) | 162 ret = ret.End(end.(datastore.Cursor)) |
163 } | 163 } |
164 | 164 |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
219 } | 219 } |
220 | 220 |
221 ret = ret.Project(fq.Project()...) | 221 ret = ret.Project(fq.Project()...) |
222 if fq.Distinct() { | 222 if fq.Distinct() { |
223 ret = ret.Distinct() | 223 ret = ret.Distinct() |
224 } | 224 } |
225 | 225 |
226 return ret, nil | 226 return ret, nil |
227 } | 227 } |
228 | 228 |
229 func (d rdsImpl) DecodeCursor(s string) (ds.Cursor, error) { | 229 func (d *rdsImpl) DecodeCursor(s string) (ds.Cursor, error) { |
230 return datastore.DecodeCursor(s) | 230 return datastore.DecodeCursor(s) |
231 } | 231 } |
232 | 232 |
233 func (d rdsImpl) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { | 233 func (d *rdsImpl) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { |
234 q, err := d.fixQuery(fq) | 234 q, err := d.fixQuery(fq) |
235 if err != nil { | 235 if err != nil { |
236 return err | 236 return err |
237 } | 237 } |
238 | 238 |
239 t := q.Run(d.aeCtx) | 239 t := q.Run(d.aeCtx) |
240 | 240 |
241 cfunc := func() (ds.Cursor, error) { | 241 cfunc := func() (ds.Cursor, error) { |
242 return t.Cursor() | 242 return t.Cursor() |
243 } | 243 } |
244 tf := typeFilter{} | 244 tf := typeFilter{} |
245 for { | 245 for { |
246 k, err := t.Next(&tf) | 246 k, err := t.Next(&tf) |
247 if err == datastore.Done { | 247 if err == datastore.Done { |
248 return nil | 248 return nil |
249 } | 249 } |
250 if err != nil { | 250 if err != nil { |
251 return err | 251 return err |
252 } | 252 } |
253 if err := cb(dsR2F(k), tf.pm, cfunc); err != nil { | 253 if err := cb(dsR2F(k), tf.pm, cfunc); err != nil { |
254 return err | 254 return err |
255 } | 255 } |
256 } | 256 } |
257 } | 257 } |
258 | 258 |
259 func (d rdsImpl) Count(fq *ds.FinalizedQuery) (int64, error) { | 259 func (d *rdsImpl) Count(fq *ds.FinalizedQuery) (int64, error) { |
260 q, err := d.fixQuery(fq) | 260 q, err := d.fixQuery(fq) |
261 if err != nil { | 261 if err != nil { |
262 return 0, err | 262 return 0, err |
263 } | 263 } |
264 ret, err := q.Count(d.aeCtx) | 264 ret, err := q.Count(d.aeCtx) |
265 return int64(ret), err | 265 return int64(ret), err |
266 } | 266 } |
267 | 267 |
268 func (d rdsImpl) RunInTransaction(f func(c context.Context) error, opts *ds.Tran
sactionOptions) error { | 268 func (d *rdsImpl) RunInTransaction(f func(c context.Context) error, opts *ds.Tra
nsactionOptions) error { |
269 ropts := (*datastore.TransactionOptions)(opts) | 269 ropts := (*datastore.TransactionOptions)(opts) |
270 return datastore.RunInTransaction(d.aeCtx, func(c context.Context) error
{ | 270 return datastore.RunInTransaction(d.aeCtx, func(c context.Context) error
{ |
271 » » return f(context.WithValue(d.userCtx, prodContextKey, c)) | 271 » » // Derive a prodState with this transaction Context. |
| 272 » » ps := d.ps |
| 273 » » ps.ctx = c |
| 274 » » ps.inTxn = true |
| 275 |
| 276 » » c = withProdState(d.userCtx, ps) |
| 277 » » return f(c) |
272 }, ropts) | 278 }, ropts) |
273 } | 279 } |
274 | 280 |
275 func (d rdsImpl) Testable() ds.Testable { | 281 func (d *rdsImpl) WithoutTransaction() context.Context { |
| 282 » c := d.userCtx |
| 283 » if d.ps.inTxn { |
| 284 » » // We're in a transaction. Reset to non-transactional state. |
| 285 » » ps := d.ps |
| 286 » » ps.ctx = ps.noTxnCtx |
| 287 » » ps.inTxn = false |
| 288 » » c = withProdState(c, ps) |
| 289 » } |
| 290 » return c |
| 291 } |
| 292 |
| 293 func (d *rdsImpl) CurrentTransaction() ds.Transaction { |
| 294 » if d.ps.inTxn { |
| 295 » » // Since we don't distinguish between transactions (yet), we jus
t need this |
| 296 » » // to be non-nil. |
| 297 » » return struct{}{} |
| 298 » } |
276 return nil | 299 return nil |
277 } | 300 } |
| 301 |
| 302 func (d *rdsImpl) GetTestable() ds.Testable { |
| 303 return nil |
| 304 } |
OLD | NEW |