Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(517)

Side by Side Diff: impl/cloud/datastore.go

Issue 1957953002: Add cloud datastore implementation. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/gae@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package cloud
6
7 import (
8 "fmt"
9 "reflect"
10 "strings"
11 "time"
12
13 "github.com/luci/luci-go/common/errors"
14
15 ds "github.com/luci/gae/service/datastore"
16 infoS "github.com/luci/gae/service/info"
17 "google.golang.org/cloud/datastore"
18
19 "golang.org/x/net/context"
20 )
21
22 type cloudDatastore struct {
23 client *datastore.Client
24 }
25
26 func (cds *cloudDatastore) use(c context.Context) context.Context {
27 return ds.SetRawFactory(c, func(ic context.Context, wantTxn bool) ds.Raw Interface {
28 inf := infoS.Get(ic)
29 if ns, ok := inf.GetNamespace(); ok {
30 ic = datastore.WithNamespace(ic, ns)
31 }
32
33 bds := boundDatastore{
34 Context: ic,
35 cloudDatastore: cds,
36 appID: inf.FullyQualifiedAppID(),
37 }
38 if wantTxn {
39 bds.transaction = datastoreTransaction(ic)
40 }
41 return &bds
42 })
43 }
44
45 // boundDatastore is a bound instance of the cloudDatastore installed in the
46 // Context.
47 type boundDatastore struct {
48 // Context is the bound user Context. It includes the datastore namespac e, if
49 // one is set.
50 context.Context
51 *cloudDatastore
52
53 appID string
iannucci 2016/05/23 21:53:06 namespace too?
dnj (Google) 2016/07/01 02:25:55 I'm tracking that in Info service and applying it
54 transaction *datastore.Transaction
55 }
56
57 func (bds *boundDatastore) AllocateIDs(incomplete *ds.Key, n int) (int64, error) {
58 // AllocateIDs assumes that a contiguous ID space will be returned. The cloud
59 // datastore library does not offer this guarantee.
iannucci 2016/05/23 21:53:06 IMO, I would just change the high level AllocateID
dnj (Google) 2016/07/01 02:25:55 Done.
60 //
61 // It is our *expectation* that the remote datastore API will return a
62 // contiguous set of IDs for a given entity type. We will panic if this
63 // expectation is violated.
64 keys := make([]*ds.Key, n)
65 for i := range keys {
66 keys[i] = incomplete
67 }
68
69 nativeKeys, err := bds.client.AllocateIDs(bds, bds.gaeKeysToNative(keys. ..))
70 if err != nil {
71 return -1, normalizeError(err)
72 }
73
74 keys = bds.nativeKeysToGAE(nativeKeys...)
75 start := keys[0].IntID()
76
77 // Assert that the allocated IDs are contiguous.
78 expected := start + 1
79 for _, key := range keys[1:] {
80 if id := key.IntID(); id != expected {
81 panic(fmt.Errorf("non-contiugous key IDs returned (%d != %d)", id, expected))
82 }
83 expected++
84 }
85
86 return start, nil
87 }
88
89 func (bds *boundDatastore) RunInTransaction(fn func(context.Context) error, opts *ds.TransactionOptions) error {
90 if bds.transaction != nil {
91 return errors.New("nested transactions are not supported")
92 }
93
94 // The cloud datastore SDK does not expose any transaction options.
iannucci 2016/05/23 21:53:06 well that's a bummer.
dnj (Google) 2016/07/01 02:25:55 Acknowledged.
95 if opts != nil {
96 switch {
97 case opts.XG:
98 return errors.New("cross-group transactions are not supp orted")
99 case opts.Attempts != 0 && opts.Attempts != 3:
iannucci 2016/05/23 21:53:06 This is actually a client library responsibility.
dnj (Google) 2016/07/01 02:25:55 Oh good to know. Implemented in the same manner as
100 return errors.New("setting transaction attempts is not s upported")
101 }
102 }
103
104 _, err := bds.client.RunInTransaction(bds, func(tx *datastore.Transactio n) error {
105 return fn(withDatastoreTransaction(bds, tx))
106 })
107 return normalizeError(err)
108 }
109
110 func (bds *boundDatastore) DecodeCursor(s string) (ds.Cursor, error) {
111 cursor, err := datastore.DecodeCursor(s)
112 return cursor, normalizeError(err)
113 }
114
115 func (bds *boundDatastore) Run(q *ds.FinalizedQuery, cb ds.RawRunCB) error {
116 it := bds.client.Run(bds, bds.prepareNativeQuery(q))
117 cursorFn := func() (ds.Cursor, error) {
118 return it.Cursor()
119 }
120
121 for {
122 var npls *nativePropertyLoadSaver
123 if !q.KeysOnly() {
124 npls = bds.mkNPLS(nil)
125 }
126 nativeKey, err := it.Next(npls)
127 if err != nil {
128 if err == datastore.Done {
129 return nil
130 }
131 return normalizeError(err)
132 }
133
134 if err := cb(bds.nativeKeysToGAE(nativeKey)[0], npls.pmap, curso rFn); err != nil {
135 if err == ds.Stop {
136 return nil
137 }
138 return normalizeError(err)
139 }
140 }
141 }
142
143 func (bds *boundDatastore) Count(q *ds.FinalizedQuery) (int64, error) {
144 v, err := bds.client.Count(bds, bds.prepareNativeQuery(q))
145 if err != nil {
146 return -1, normalizeError(err)
147 }
148 return int64(v), nil
149 }
150
151 func idxCallbacker(err error, amt int, cb func(idx int, err error) error) error {
152 if err == nil {
153 for i := 0; i < amt; i++ {
154 if err := cb(i, nil); err != nil {
155 return err
156 }
157 }
158 return nil
159 }
160
161 err = errors.Fix(err)
162 if me, ok := err.(errors.MultiError); ok {
163 for i, err := range me {
164 if err := cb(i, normalizeError(err)); err != nil {
165 return err
166 }
167 }
168 return nil
169 }
170 return normalizeError(err)
171 }
172
173 func (bds *boundDatastore) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMultiCB) error {
174 nativeKeys := bds.gaeKeysToNative(keys...)
175 nativePLS := make([]*nativePropertyLoadSaver, len(nativeKeys))
176 for i := range nativePLS {
177 nativePLS[i] = bds.mkNPLS(nil)
178 }
179
180 var err error
181 if tx := bds.transaction; tx != nil {
182 // Transactional GetMulti.
183 err = tx.GetMulti(nativeKeys, nativePLS)
184 } else {
185 // Non-transactional GetMulti.
186 err = bds.client.GetMulti(bds, nativeKeys, nativePLS)
187 }
188
189 return idxCallbacker(err, len(nativePLS), func(idx int, err error) error {
190 return cb(nativePLS[idx].pmap, err)
191 })
192 }
193
194 func (bds *boundDatastore) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds .PutMultiCB) error {
195 nativeKeys := bds.gaeKeysToNative(keys...)
196 nativePLS := make([]*nativePropertyLoadSaver, len(vals))
197 for i := range nativePLS {
198 nativePLS[i] = bds.mkNPLS(vals[i])
199 }
200
201 var err error
202 if tx := bds.transaction; tx != nil {
203 // Transactional PutMulti.
204 //
205 // In order to simulate the presence of mid-transaction key allo cation, we
206 // will identify any incomplete keys and allocate IDs for them. This is
207 // potentially wasteful in the event o failed or retried transac tions, but
iannucci 2016/05/23 21:53:06 s/o/of
dnj (Google) 2016/07/01 02:25:55 Done.
208 // it is required to maintain API compatibility with the datasto re
209 // interface.
210 var incompleteKeys []*datastore.Key
211 var incompleteKeyMap map[int]int
212 for i, k := range nativeKeys {
213 if k.Incomplete() {
214 if incompleteKeyMap == nil {
215 // Optimization: if there are any incomp lete keys, allocate room for
216 // the full range.
217 incompleteKeyMap = make(map[int]int, len (nativeKeys)-i)
218 incompleteKeys = make([]*datastore.Key, 0, len(nativeKeys)-i)
219 }
220 incompleteKeyMap[len(incompleteKeys)] = i
221 incompleteKeys = append(incompleteKeys, k)
222 }
223 }
224 if len(incompleteKeys) > 0 {
225 idKeys, err := bds.client.AllocateIDs(bds, incompleteKey s)
226 if err != nil {
227 return err
228 }
229 for i, idKey := range idKeys {
230 nativeKeys[incompleteKeyMap[i]] = idKey
231 }
232 }
233
234 _, err = tx.PutMulti(nativeKeys, nativePLS)
235 } else {
236 // Non-transactional PutMulti.
237 nativeKeys, err = bds.client.PutMulti(bds, nativeKeys, nativePLS )
238 }
239
240 return idxCallbacker(err, len(nativeKeys), func(idx int, err error) erro r {
241 if err == nil {
242 return cb(bds.nativeKeysToGAE(nativeKeys[idx])[0], nil)
243 }
244 return cb(nil, err)
245 })
246 }
247
248 func (bds *boundDatastore) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) erro r {
249 nativeKeys := bds.gaeKeysToNative(keys...)
250
251 var err error
252 if tx := bds.transaction; tx != nil {
253 // Transactional DeleteMulti.
254 err = tx.DeleteMulti(nativeKeys)
255 } else {
256 // Non-transactional DeleteMulti.
257 err = bds.client.DeleteMulti(bds, nativeKeys)
258 }
259
260 return idxCallbacker(err, len(nativeKeys), func(_ int, err error) error {
261 return cb(err)
262 })
263 }
264
265 func (bds *boundDatastore) Testable() ds.Testable {
266 return nil
267 }
268
269 func (bds *boundDatastore) prepareNativeQuery(fq *ds.FinalizedQuery) *datastore. Query {
270 nq := datastore.NewQuery(fq.Kind())
271 if bds.transaction != nil {
272 nq = nq.Transaction(bds.transaction)
273 }
274
275 // nativeFilter translates a filter field. If the translation fails, we' ll
276 // pass the result through to the underlying datastore and allow it to
277 // reject it.
278 nativeFilter := func(prop ds.Property) interface{} {
279 if np, err := bds.gaePropertyToNative("", []ds.Property{prop}); err == nil {
280 return np.Value
281 }
282 return prop.Value()
283 }
284
285 // Equality filters.
286 for field, props := range fq.EqFilters() {
287 for _, prop := range props {
288 nq = nq.Filter(fmt.Sprintf("%s =", field), nativeFilter( prop))
289 }
290 }
291
292 // Inequality filters.
293 if ineq := fq.IneqFilterProp(); ineq != "" {
294 if field, op, prop := fq.IneqFilterLow(); field != "" {
295 nq = nq.Filter(fmt.Sprintf("%s %s", field, op), nativeFi lter(prop))
296 }
297
298 if field, op, prop := fq.IneqFilterHigh(); field != "" {
299 nq = nq.Filter(fmt.Sprintf("%s %s", field, op), nativeFi lter(prop))
300 }
301 }
302
303 start, end := fq.Bounds()
304 if start != nil {
305 nq = nq.Start(start.(datastore.Cursor))
306 }
307 if end != nil {
308 nq = nq.End(end.(datastore.Cursor))
309 }
310
311 if fq.Distinct() {
312 nq = nq.Distinct()
313 }
314 if fq.KeysOnly() {
315 nq = nq.KeysOnly()
316 }
317 if limit, ok := fq.Limit(); ok {
318 nq = nq.Limit(int(limit))
319 }
320 if offset, ok := fq.Offset(); ok {
321 nq = nq.Offset(int(offset))
322 }
323 if proj := fq.Project(); proj != nil {
324 nq = nq.Project(proj...)
325 }
326 if ancestor := fq.Ancestor(); ancestor != nil {
327 nq = nq.Ancestor(bds.gaeKeysToNative(ancestor)[0])
328 }
329 if fq.EventuallyConsistent() {
330 nq = nq.EventualConsistency()
331 }
332
333 for _, ic := range fq.Orders() {
334 prop := ic.Property
335 if ic.Descending {
336 prop = "-" + prop
337 }
338 nq = nq.Order(prop)
339 }
340
341 return nq
342 }
343
344 func (bds *boundDatastore) mkNPLS(base ds.PropertyMap) *nativePropertyLoadSaver {
345 return &nativePropertyLoadSaver{bds: bds, pmap: clonePropertyMap(base)}
346 }
347
348 func (bds *boundDatastore) gaePropertyToNative(name string, props []ds.Property) (nativeProp datastore.Property, err error) {
349 nativeProp.Name = name
350
351 nativeValues := make([]interface{}, len(props))
352 for i, prop := range props {
353 switch pt := prop.Type(); pt {
354 case ds.PTNull, ds.PTInt, ds.PTTime, ds.PTBool, ds.PTBytes, ds.P TString, ds.PTFloat:
355 nativeValues[i] = prop.Value()
356 break
357
358 case ds.PTKey:
359 nativeValues[i] = bds.gaeKeysToNative(prop.Value().(*ds. Key))[0]
360
361 default:
362 err = fmt.Errorf("unsupported property type at %d: %v", i, pt)
363 return
364 }
365 }
366
367 if len(nativeValues) == 1 {
368 nativeProp.Value = nativeValues[0]
369 nativeProp.NoIndex = (props[0].IndexSetting() != ds.ShouldIndex)
370 } else {
371 // We must always index list values.
372 nativeProp.Value = nativeValues
373 }
374 return
375 }
376
377 func (bds *boundDatastore) nativePropertyToGAE(nativeProp datastore.Property) (n ame string, props []ds.Property, err error) {
378 name = nativeProp.Name
379
380 var nativeValues []interface{}
381 // Slice of supported native type. Break this into a slice of datastore
382 // properties.
383 //
384 // It must be an []interface{}.
385 if rv := reflect.ValueOf(nativeProp.Value); rv.Kind() == reflect.Slice & & rv.Type().Elem().Kind() == reflect.Interface {
386 nativeValues = rv.Interface().([]interface{})
387 } else {
388 nativeValues = []interface{}{nativeProp.Value}
389 }
390
391 if len(nativeValues) == 0 {
392 return
393 }
394
395 props = make([]ds.Property, len(nativeValues))
396 for i, nv := range nativeValues {
397 switch nvt := nv.(type) {
398 case int64, bool, string, float64, []byte:
399 break
400
401 case time.Time:
402 // Cloud datastore library returns local time.
403 nv = nvt.UTC()
404
405 case *datastore.Key:
406 nv = bds.nativeKeysToGAE(nvt)[0]
407
408 default:
409 err = fmt.Errorf("element %d has unsupported datastore.V alue type %T", i, nv)
410 return
411 }
412
413 indexSetting := ds.ShouldIndex
414 if nativeProp.NoIndex {
415 indexSetting = ds.NoIndex
416 }
417 props[i].SetValue(nv, indexSetting)
418 }
419 return
420 }
421
422 func (bds *boundDatastore) gaeKeysToNative(keys ...*ds.Key) []*datastore.Key {
423 nativeKeys := make([]*datastore.Key, len(keys))
424 for i, key := range keys {
425 _, _, toks := key.Split()
426
427 var nativeKey *datastore.Key
428 for _, tok := range toks {
429 nativeKey = datastore.NewKey(bds, tok.Kind, tok.StringID , tok.IntID, nativeKey)
430 }
431 nativeKeys[i] = nativeKey
432 }
433 return nativeKeys
434 }
435
436 func (bds *boundDatastore) nativeKeysToGAE(nativeKeys ...*datastore.Key) []*ds.K ey {
437 keys := make([]*ds.Key, len(nativeKeys))
438 toks := make([]ds.KeyTok, 1)
439 for i, nativeKey := range nativeKeys {
440 toks = toks[:0]
441 cur := nativeKey
442 for {
443 toks = append(toks, ds.KeyTok{Kind: cur.Kind(), IntID: c ur.ID(), StringID: cur.Name()})
444 cur = cur.Parent()
445 if cur == nil {
446 break
447 }
448 }
449
450 // Reverse "toks" so we have ancestor-to-child lineage.
451 for i := 0; i < len(toks)/2; i++ {
452 ri := len(toks) - i - 1
453 toks[i], toks[ri] = toks[ri], toks[i]
454 }
455 keys[i] = ds.NewKeyToks(bds.appID, nativeKey.Namespace(), toks)
456 }
457 return keys
458 }
459
460 // nativePropertyLoadSaver is a ds.PropertyMap which implements
461 // datastore.PropertyLoadSaver.
462 //
463 // It naturally converts between native and GAE properties and values.
464 type nativePropertyLoadSaver struct {
465 bds *boundDatastore
466 pmap ds.PropertyMap
467 }
468
469 var _ datastore.PropertyLoadSaver = (*nativePropertyLoadSaver)(nil)
470
471 func (npls *nativePropertyLoadSaver) Load(props []datastore.Property) error {
472 if npls.pmap == nil {
473 // Allocate for common case: one property per property name.
474 npls.pmap = make(ds.PropertyMap, len(props))
475 }
476
477 for _, nativeProp := range props {
478 name, props, err := npls.bds.nativePropertyToGAE(nativeProp)
479 if err != nil {
480 return err
481 }
482 npls.pmap[name] = append(npls.pmap[name], props...)
483 }
484 return nil
485 }
486
487 func (npls *nativePropertyLoadSaver) Save() ([]datastore.Property, error) {
488 if len(npls.pmap) == 0 {
489 return nil, nil
490 }
491
492 props := make([]datastore.Property, 0, len(npls.pmap))
493 for name, plist := range npls.pmap {
494 // Strip meta.
495 if strings.HasPrefix(name, "$") {
496 continue
497 }
498
499 nativeProp, err := npls.bds.gaePropertyToNative(name, plist)
500 if err != nil {
501 return nil, err
502 }
503 props = append(props, nativeProp)
504 }
505 return props, nil
506 }
507
508 var datastoreTransactionKey = "*datastore.Transaction"
509
510 func withDatastoreTransaction(c context.Context, tx *datastore.Transaction) cont ext.Context {
511 return context.WithValue(c, &datastoreTransactionKey, tx)
512 }
513
514 func datastoreTransaction(c context.Context) *datastore.Transaction {
515 if tx, ok := c.Value(&datastoreTransactionKey).(*datastore.Transaction); ok {
516 return tx
517 }
518 return nil
519 }
520
521 func clonePropertyMap(pmap ds.PropertyMap) ds.PropertyMap {
522 if pmap == nil {
523 return nil
524 }
525
526 clone := make(ds.PropertyMap, len(pmap))
527 for k, props := range pmap {
528 clone[k] = append([]ds.Property(nil), props...)
529 }
530 return clone
531 }
532
533 func normalizeError(err error) error {
534 switch err {
535 case datastore.ErrNoSuchEntity:
536 return ds.ErrNoSuchEntity
537 case datastore.ErrConcurrentTransaction:
538 return ds.ErrConcurrentTransaction
539 case datastore.ErrInvalidKey:
540 return ds.ErrInvalidKey
541 default:
542 return err
543 }
544 }
OLDNEW
« no previous file with comments | « impl/cloud/context.go ('k') | impl/cloud/datastore_test.go » ('j') | impl/cloud/info.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698