Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(282)

Side by Side Diff: impl/cloud/datastore.go

Issue 1957953002: Add cloud datastore implementation. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/gae@master
Patch Set: Adjust minimum coverage. Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/cloud/context.go ('k') | impl/cloud/datastore_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package cloud
6
7 import (
8 "fmt"
9 "reflect"
10 "strings"
11 "time"
12
13 "github.com/luci/luci-go/common/errors"
14
15 ds "github.com/luci/gae/service/datastore"
16 infoS "github.com/luci/gae/service/info"
17 "google.golang.org/cloud/datastore"
18
19 "golang.org/x/net/context"
20 )
21
22 type cloudDatastore struct {
23 client *datastore.Client
24 }
25
26 func (cds *cloudDatastore) use(c context.Context) context.Context {
27 return ds.SetRawFactory(c, func(ic context.Context, wantTxn bool) ds.Raw Interface {
28 inf := infoS.Get(ic)
29 if ns, ok := inf.GetNamespace(); ok {
30 ic = datastore.WithNamespace(ic, ns)
31 }
32
33 bds := boundDatastore{
34 Context: ic,
35 cloudDatastore: cds,
36 appID: inf.FullyQualifiedAppID(),
37 }
38 if wantTxn {
39 bds.transaction = datastoreTransaction(ic)
40 }
41 return &bds
42 })
43 }
44
45 // boundDatastore is a bound instance of the cloudDatastore installed in the
46 // Context.
47 type boundDatastore struct {
48 // Context is the bound user Context. It includes the datastore namespac e, if
49 // one is set.
50 context.Context
51 *cloudDatastore
52
53 appID string
54 transaction *datastore.Transaction
55 }
56
57 func (bds *boundDatastore) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error {
58 nativeKeys, err := bds.client.AllocateIDs(bds, bds.gaeKeysToNative(keys. ..))
59 if err != nil {
60 return normalizeError(err)
61 }
62
63 keys = bds.nativeKeysToGAE(nativeKeys...)
64 for _, key := range keys {
65 cb(key, nil)
66 }
67 return nil
68 }
69
70 func (bds *boundDatastore) RunInTransaction(fn func(context.Context) error, opts *ds.TransactionOptions) error {
71 if bds.transaction != nil {
72 return errors.New("nested transactions are not supported")
73 }
74
75 // The cloud datastore SDK does not expose any transaction options.
76 if opts != nil {
77 switch {
78 case opts.XG:
79 return errors.New("cross-group transactions are not supp orted")
80 }
81 }
82
83 attempts := 3
84 if opts != nil && opts.Attempts > 0 {
85 attempts = opts.Attempts
86 }
87 for i := 0; i < attempts; i++ {
88 _, err := bds.client.RunInTransaction(bds, func(tx *datastore.Tr ansaction) error {
89 return fn(withDatastoreTransaction(bds, tx))
90 })
91 if err = normalizeError(err); err != ds.ErrConcurrentTransaction {
92 return err
93 }
94 }
95 return ds.ErrConcurrentTransaction
96 }
97
98 func (bds *boundDatastore) DecodeCursor(s string) (ds.Cursor, error) {
99 cursor, err := datastore.DecodeCursor(s)
100 return cursor, normalizeError(err)
101 }
102
103 func (bds *boundDatastore) Run(q *ds.FinalizedQuery, cb ds.RawRunCB) error {
104 it := bds.client.Run(bds, bds.prepareNativeQuery(q))
105 cursorFn := func() (ds.Cursor, error) {
106 return it.Cursor()
107 }
108
109 for {
110 var npls *nativePropertyLoadSaver
111 if !q.KeysOnly() {
112 npls = bds.mkNPLS(nil)
113 }
114 nativeKey, err := it.Next(npls)
115 if err != nil {
116 if err == datastore.Done {
117 return nil
118 }
119 return normalizeError(err)
120 }
121
122 if err := cb(bds.nativeKeysToGAE(nativeKey)[0], npls.pmap, curso rFn); err != nil {
123 if err == ds.Stop {
124 return nil
125 }
126 return normalizeError(err)
127 }
128 }
129 }
130
131 func (bds *boundDatastore) Count(q *ds.FinalizedQuery) (int64, error) {
132 v, err := bds.client.Count(bds, bds.prepareNativeQuery(q))
133 if err != nil {
134 return -1, normalizeError(err)
135 }
136 return int64(v), nil
137 }
138
139 func idxCallbacker(err error, amt int, cb func(idx int, err error) error) error {
140 if err == nil {
141 for i := 0; i < amt; i++ {
142 if err := cb(i, nil); err != nil {
143 return err
144 }
145 }
146 return nil
147 }
148
149 err = errors.Fix(err)
150 if me, ok := err.(errors.MultiError); ok {
151 for i, err := range me {
152 if err := cb(i, normalizeError(err)); err != nil {
153 return err
154 }
155 }
156 return nil
157 }
158 return normalizeError(err)
159 }
160
161 func (bds *boundDatastore) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMultiCB) error {
162 nativeKeys := bds.gaeKeysToNative(keys...)
163 nativePLS := make([]*nativePropertyLoadSaver, len(nativeKeys))
164 for i := range nativePLS {
165 nativePLS[i] = bds.mkNPLS(nil)
166 }
167
168 var err error
169 if tx := bds.transaction; tx != nil {
170 // Transactional GetMulti.
171 err = tx.GetMulti(nativeKeys, nativePLS)
172 } else {
173 // Non-transactional GetMulti.
174 err = bds.client.GetMulti(bds, nativeKeys, nativePLS)
175 }
176
177 return idxCallbacker(err, len(nativePLS), func(idx int, err error) error {
178 return cb(nativePLS[idx].pmap, err)
179 })
180 }
181
182 func (bds *boundDatastore) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds .NewKeyCB) error {
183 nativeKeys := bds.gaeKeysToNative(keys...)
184 nativePLS := make([]*nativePropertyLoadSaver, len(vals))
185 for i := range nativePLS {
186 nativePLS[i] = bds.mkNPLS(vals[i])
187 }
188
189 var err error
190 if tx := bds.transaction; tx != nil {
191 // Transactional PutMulti.
192 //
193 // In order to simulate the presence of mid-transaction key allo cation, we
194 // will identify any incomplete keys and allocate IDs for them. This is
195 // potentially wasteful in the event of failed or retried transa ctions, but
196 // it is required to maintain API compatibility with the datasto re
197 // interface.
198 var incompleteKeys []*datastore.Key
199 var incompleteKeyMap map[int]int
200 for i, k := range nativeKeys {
201 if k.Incomplete() {
202 if incompleteKeyMap == nil {
203 // Optimization: if there are any incomp lete keys, allocate room for
204 // the full range.
205 incompleteKeyMap = make(map[int]int, len (nativeKeys)-i)
206 incompleteKeys = make([]*datastore.Key, 0, len(nativeKeys)-i)
207 }
208 incompleteKeyMap[len(incompleteKeys)] = i
209 incompleteKeys = append(incompleteKeys, k)
210 }
211 }
212 if len(incompleteKeys) > 0 {
213 idKeys, err := bds.client.AllocateIDs(bds, incompleteKey s)
214 if err != nil {
215 return err
216 }
217 for i, idKey := range idKeys {
218 nativeKeys[incompleteKeyMap[i]] = idKey
219 }
220 }
221
222 _, err = tx.PutMulti(nativeKeys, nativePLS)
223 } else {
224 // Non-transactional PutMulti.
225 nativeKeys, err = bds.client.PutMulti(bds, nativeKeys, nativePLS )
226 }
227
228 return idxCallbacker(err, len(nativeKeys), func(idx int, err error) erro r {
229 if err == nil {
230 return cb(bds.nativeKeysToGAE(nativeKeys[idx])[0], nil)
231 }
232 return cb(nil, err)
233 })
234 }
235
236 func (bds *boundDatastore) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) erro r {
237 nativeKeys := bds.gaeKeysToNative(keys...)
238
239 var err error
240 if tx := bds.transaction; tx != nil {
241 // Transactional DeleteMulti.
242 err = tx.DeleteMulti(nativeKeys)
243 } else {
244 // Non-transactional DeleteMulti.
245 err = bds.client.DeleteMulti(bds, nativeKeys)
246 }
247
248 return idxCallbacker(err, len(nativeKeys), func(_ int, err error) error {
249 return cb(err)
250 })
251 }
252
253 func (bds *boundDatastore) Testable() ds.Testable {
254 return nil
255 }
256
257 func (bds *boundDatastore) prepareNativeQuery(fq *ds.FinalizedQuery) *datastore. Query {
258 nq := datastore.NewQuery(fq.Kind())
259 if bds.transaction != nil {
260 nq = nq.Transaction(bds.transaction)
261 }
262
263 // nativeFilter translates a filter field. If the translation fails, we' ll
264 // pass the result through to the underlying datastore and allow it to
265 // reject it.
266 nativeFilter := func(prop ds.Property) interface{} {
267 if np, err := bds.gaePropertyToNative("", []ds.Property{prop}); err == nil {
268 return np.Value
269 }
270 return prop.Value()
271 }
272
273 // Equality filters.
274 for field, props := range fq.EqFilters() {
275 for _, prop := range props {
276 nq = nq.Filter(fmt.Sprintf("%s =", field), nativeFilter( prop))
277 }
278 }
279
280 // Inequality filters.
281 if ineq := fq.IneqFilterProp(); ineq != "" {
282 if field, op, prop := fq.IneqFilterLow(); field != "" {
283 nq = nq.Filter(fmt.Sprintf("%s %s", field, op), nativeFi lter(prop))
284 }
285
286 if field, op, prop := fq.IneqFilterHigh(); field != "" {
287 nq = nq.Filter(fmt.Sprintf("%s %s", field, op), nativeFi lter(prop))
288 }
289 }
290
291 start, end := fq.Bounds()
292 if start != nil {
293 nq = nq.Start(start.(datastore.Cursor))
294 }
295 if end != nil {
296 nq = nq.End(end.(datastore.Cursor))
297 }
298
299 if fq.Distinct() {
300 nq = nq.Distinct()
301 }
302 if fq.KeysOnly() {
303 nq = nq.KeysOnly()
304 }
305 if limit, ok := fq.Limit(); ok {
306 nq = nq.Limit(int(limit))
307 }
308 if offset, ok := fq.Offset(); ok {
309 nq = nq.Offset(int(offset))
310 }
311 if proj := fq.Project(); proj != nil {
312 nq = nq.Project(proj...)
313 }
314 if ancestor := fq.Ancestor(); ancestor != nil {
315 nq = nq.Ancestor(bds.gaeKeysToNative(ancestor)[0])
316 }
317 if fq.EventuallyConsistent() {
318 nq = nq.EventualConsistency()
319 }
320
321 for _, ic := range fq.Orders() {
322 prop := ic.Property
323 if ic.Descending {
324 prop = "-" + prop
325 }
326 nq = nq.Order(prop)
327 }
328
329 return nq
330 }
331
332 func (bds *boundDatastore) mkNPLS(base ds.PropertyMap) *nativePropertyLoadSaver {
333 return &nativePropertyLoadSaver{bds: bds, pmap: clonePropertyMap(base)}
334 }
335
336 func (bds *boundDatastore) gaePropertyToNative(name string, props []ds.Property) (nativeProp datastore.Property, err error) {
337 nativeProp.Name = name
338
339 nativeValues := make([]interface{}, len(props))
340 for i, prop := range props {
341 switch pt := prop.Type(); pt {
342 case ds.PTNull, ds.PTInt, ds.PTTime, ds.PTBool, ds.PTBytes, ds.P TString, ds.PTFloat:
343 nativeValues[i] = prop.Value()
344 break
345
346 case ds.PTKey:
347 nativeValues[i] = bds.gaeKeysToNative(prop.Value().(*ds. Key))[0]
348
349 default:
350 err = fmt.Errorf("unsupported property type at %d: %v", i, pt)
351 return
352 }
353 }
354
355 if len(nativeValues) == 1 {
356 nativeProp.Value = nativeValues[0]
357 nativeProp.NoIndex = (props[0].IndexSetting() != ds.ShouldIndex)
358 } else {
359 // We must always index list values.
360 nativeProp.Value = nativeValues
361 }
362 return
363 }
364
365 func (bds *boundDatastore) nativePropertyToGAE(nativeProp datastore.Property) (n ame string, props []ds.Property, err error) {
366 name = nativeProp.Name
367
368 var nativeValues []interface{}
369 // Slice of supported native type. Break this into a slice of datastore
370 // properties.
371 //
372 // It must be an []interface{}.
373 if rv := reflect.ValueOf(nativeProp.Value); rv.Kind() == reflect.Slice & & rv.Type().Elem().Kind() == reflect.Interface {
374 nativeValues = rv.Interface().([]interface{})
375 } else {
376 nativeValues = []interface{}{nativeProp.Value}
377 }
378
379 if len(nativeValues) == 0 {
380 return
381 }
382
383 props = make([]ds.Property, len(nativeValues))
384 for i, nv := range nativeValues {
385 switch nvt := nv.(type) {
386 case int64, bool, string, float64, []byte:
387 break
388
389 case time.Time:
390 // Cloud datastore library returns local time.
391 nv = nvt.UTC()
392
393 case *datastore.Key:
394 nv = bds.nativeKeysToGAE(nvt)[0]
395
396 default:
397 err = fmt.Errorf("element %d has unsupported datastore.V alue type %T", i, nv)
398 return
399 }
400
401 indexSetting := ds.ShouldIndex
402 if nativeProp.NoIndex {
403 indexSetting = ds.NoIndex
404 }
405 props[i].SetValue(nv, indexSetting)
406 }
407 return
408 }
409
410 func (bds *boundDatastore) gaeKeysToNative(keys ...*ds.Key) []*datastore.Key {
411 nativeKeys := make([]*datastore.Key, len(keys))
412 for i, key := range keys {
413 _, _, toks := key.Split()
414
415 var nativeKey *datastore.Key
416 for _, tok := range toks {
417 nativeKey = datastore.NewKey(bds, tok.Kind, tok.StringID , tok.IntID, nativeKey)
418 }
419 nativeKeys[i] = nativeKey
420 }
421 return nativeKeys
422 }
423
424 func (bds *boundDatastore) nativeKeysToGAE(nativeKeys ...*datastore.Key) []*ds.K ey {
425 keys := make([]*ds.Key, len(nativeKeys))
426 toks := make([]ds.KeyTok, 1)
427 for i, nativeKey := range nativeKeys {
428 toks = toks[:0]
429 cur := nativeKey
430 for {
431 toks = append(toks, ds.KeyTok{Kind: cur.Kind(), IntID: c ur.ID(), StringID: cur.Name()})
432 cur = cur.Parent()
433 if cur == nil {
434 break
435 }
436 }
437
438 // Reverse "toks" so we have ancestor-to-child lineage.
439 for i := 0; i < len(toks)/2; i++ {
440 ri := len(toks) - i - 1
441 toks[i], toks[ri] = toks[ri], toks[i]
442 }
443 keys[i] = ds.NewKeyToks(bds.appID, nativeKey.Namespace(), toks)
444 }
445 return keys
446 }
447
448 // nativePropertyLoadSaver is a ds.PropertyMap which implements
449 // datastore.PropertyLoadSaver.
450 //
451 // It naturally converts between native and GAE properties and values.
452 type nativePropertyLoadSaver struct {
453 bds *boundDatastore
454 pmap ds.PropertyMap
455 }
456
457 var _ datastore.PropertyLoadSaver = (*nativePropertyLoadSaver)(nil)
458
459 func (npls *nativePropertyLoadSaver) Load(props []datastore.Property) error {
460 if npls.pmap == nil {
461 // Allocate for common case: one property per property name.
462 npls.pmap = make(ds.PropertyMap, len(props))
463 }
464
465 for _, nativeProp := range props {
466 name, props, err := npls.bds.nativePropertyToGAE(nativeProp)
467 if err != nil {
468 return err
469 }
470 npls.pmap[name] = append(npls.pmap[name], props...)
471 }
472 return nil
473 }
474
475 func (npls *nativePropertyLoadSaver) Save() ([]datastore.Property, error) {
476 if len(npls.pmap) == 0 {
477 return nil, nil
478 }
479
480 props := make([]datastore.Property, 0, len(npls.pmap))
481 for name, plist := range npls.pmap {
482 // Strip meta.
483 if strings.HasPrefix(name, "$") {
484 continue
485 }
486
487 nativeProp, err := npls.bds.gaePropertyToNative(name, plist)
488 if err != nil {
489 return nil, err
490 }
491 props = append(props, nativeProp)
492 }
493 return props, nil
494 }
495
496 var datastoreTransactionKey = "*datastore.Transaction"
497
498 func withDatastoreTransaction(c context.Context, tx *datastore.Transaction) cont ext.Context {
499 return context.WithValue(c, &datastoreTransactionKey, tx)
500 }
501
502 func datastoreTransaction(c context.Context) *datastore.Transaction {
503 if tx, ok := c.Value(&datastoreTransactionKey).(*datastore.Transaction); ok {
504 return tx
505 }
506 return nil
507 }
508
509 func clonePropertyMap(pmap ds.PropertyMap) ds.PropertyMap {
510 if pmap == nil {
511 return nil
512 }
513
514 clone := make(ds.PropertyMap, len(pmap))
515 for k, props := range pmap {
516 clone[k] = append([]ds.Property(nil), props...)
517 }
518 return clone
519 }
520
521 func normalizeError(err error) error {
522 switch err {
523 case datastore.ErrNoSuchEntity:
524 return ds.ErrNoSuchEntity
525 case datastore.ErrConcurrentTransaction:
526 return ds.ErrConcurrentTransaction
527 case datastore.ErrInvalidKey:
528 return ds.ErrInvalidKey
529 default:
530 return err
531 }
532 }
OLDNEW
« no previous file with comments | « impl/cloud/context.go ('k') | impl/cloud/datastore_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698