Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package cloud | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "reflect" | |
| 10 "strings" | |
| 11 "time" | |
| 12 | |
| 13 "github.com/luci/luci-go/common/errors" | |
| 14 | |
| 15 ds "github.com/luci/gae/service/datastore" | |
| 16 infoS "github.com/luci/gae/service/info" | |
| 17 "google.golang.org/cloud/datastore" | |
| 18 | |
| 19 "golang.org/x/net/context" | |
| 20 ) | |
| 21 | |
| 22 type cloudDatastore struct { | |
| 23 client *datastore.Client | |
| 24 } | |
| 25 | |
| 26 func (cds *cloudDatastore) use(c context.Context) context.Context { | |
| 27 return ds.SetRawFactory(c, func(ic context.Context, wantTxn bool) ds.Raw Interface { | |
| 28 inf := infoS.Get(ic) | |
| 29 if ns, ok := inf.GetNamespace(); ok { | |
| 30 ic = datastore.WithNamespace(ic, ns) | |
| 31 } | |
| 32 | |
| 33 bds := boundDatastore{ | |
| 34 Context: ic, | |
| 35 cloudDatastore: cds, | |
| 36 appID: inf.FullyQualifiedAppID(), | |
| 37 } | |
| 38 if wantTxn { | |
| 39 bds.transaction = datastoreTransaction(ic) | |
| 40 } | |
| 41 return &bds | |
| 42 }) | |
| 43 } | |
| 44 | |
| 45 // boundDatastore is a bound instance of the cloudDatastore installed in the | |
| 46 // Context. | |
| 47 type boundDatastore struct { | |
| 48 // Context is the bound user Context. It includes the datastore namespac e, if | |
| 49 // one is set. | |
| 50 context.Context | |
| 51 *cloudDatastore | |
| 52 | |
| 53 appID string | |
|
iannucci
2016/05/23 21:53:06
namespace too?
dnj (Google)
2016/07/01 02:25:55
I'm tracking that in Info service and applying it
| |
| 54 transaction *datastore.Transaction | |
| 55 } | |
| 56 | |
| 57 func (bds *boundDatastore) AllocateIDs(incomplete *ds.Key, n int) (int64, error) { | |
| 58 // AllocateIDs assumes that a contiguous ID space will be returned. The cloud | |
| 59 // datastore library does not offer this guarantee. | |
|
iannucci
2016/05/23 21:53:06
IMO, I would just change the high level AllocateID
dnj (Google)
2016/07/01 02:25:55
Done.
| |
| 60 // | |
| 61 // It is our *expectation* that the remote datastore API will return a | |
| 62 // contiguous set of IDs for a given entity type. We will panic if this | |
| 63 // expectation is violated. | |
| 64 keys := make([]*ds.Key, n) | |
| 65 for i := range keys { | |
| 66 keys[i] = incomplete | |
| 67 } | |
| 68 | |
| 69 nativeKeys, err := bds.client.AllocateIDs(bds, bds.gaeKeysToNative(keys. ..)) | |
| 70 if err != nil { | |
| 71 return -1, normalizeError(err) | |
| 72 } | |
| 73 | |
| 74 keys = bds.nativeKeysToGAE(nativeKeys...) | |
| 75 start := keys[0].IntID() | |
| 76 | |
| 77 // Assert that the allocated IDs are contiguous. | |
| 78 expected := start + 1 | |
| 79 for _, key := range keys[1:] { | |
| 80 if id := key.IntID(); id != expected { | |
| 81 panic(fmt.Errorf("non-contiugous key IDs returned (%d != %d)", id, expected)) | |
| 82 } | |
| 83 expected++ | |
| 84 } | |
| 85 | |
| 86 return start, nil | |
| 87 } | |
| 88 | |
| 89 func (bds *boundDatastore) RunInTransaction(fn func(context.Context) error, opts *ds.TransactionOptions) error { | |
| 90 if bds.transaction != nil { | |
| 91 return errors.New("nested transactions are not supported") | |
| 92 } | |
| 93 | |
| 94 // The cloud datastore SDK does not expose any transaction options. | |
|
iannucci
2016/05/23 21:53:06
well that's a bummer.
dnj (Google)
2016/07/01 02:25:55
Acknowledged.
| |
| 95 if opts != nil { | |
| 96 switch { | |
| 97 case opts.XG: | |
| 98 return errors.New("cross-group transactions are not supp orted") | |
| 99 case opts.Attempts != 0 && opts.Attempts != 3: | |
|
iannucci
2016/05/23 21:53:06
This is actually a client library responsibility.
dnj (Google)
2016/07/01 02:25:55
Oh good to know. Implemented in the same manner as
| |
| 100 return errors.New("setting transaction attempts is not s upported") | |
| 101 } | |
| 102 } | |
| 103 | |
| 104 _, err := bds.client.RunInTransaction(bds, func(tx *datastore.Transactio n) error { | |
| 105 return fn(withDatastoreTransaction(bds, tx)) | |
| 106 }) | |
| 107 return normalizeError(err) | |
| 108 } | |
| 109 | |
| 110 func (bds *boundDatastore) DecodeCursor(s string) (ds.Cursor, error) { | |
| 111 cursor, err := datastore.DecodeCursor(s) | |
| 112 return cursor, normalizeError(err) | |
| 113 } | |
| 114 | |
| 115 func (bds *boundDatastore) Run(q *ds.FinalizedQuery, cb ds.RawRunCB) error { | |
| 116 it := bds.client.Run(bds, bds.prepareNativeQuery(q)) | |
| 117 cursorFn := func() (ds.Cursor, error) { | |
| 118 return it.Cursor() | |
| 119 } | |
| 120 | |
| 121 for { | |
| 122 var npls *nativePropertyLoadSaver | |
| 123 if !q.KeysOnly() { | |
| 124 npls = bds.mkNPLS(nil) | |
| 125 } | |
| 126 nativeKey, err := it.Next(npls) | |
| 127 if err != nil { | |
| 128 if err == datastore.Done { | |
| 129 return nil | |
| 130 } | |
| 131 return normalizeError(err) | |
| 132 } | |
| 133 | |
| 134 if err := cb(bds.nativeKeysToGAE(nativeKey)[0], npls.pmap, curso rFn); err != nil { | |
| 135 if err == ds.Stop { | |
| 136 return nil | |
| 137 } | |
| 138 return normalizeError(err) | |
| 139 } | |
| 140 } | |
| 141 } | |
| 142 | |
| 143 func (bds *boundDatastore) Count(q *ds.FinalizedQuery) (int64, error) { | |
| 144 v, err := bds.client.Count(bds, bds.prepareNativeQuery(q)) | |
| 145 if err != nil { | |
| 146 return -1, normalizeError(err) | |
| 147 } | |
| 148 return int64(v), nil | |
| 149 } | |
| 150 | |
| 151 func idxCallbacker(err error, amt int, cb func(idx int, err error) error) error { | |
| 152 if err == nil { | |
| 153 for i := 0; i < amt; i++ { | |
| 154 if err := cb(i, nil); err != nil { | |
| 155 return err | |
| 156 } | |
| 157 } | |
| 158 return nil | |
| 159 } | |
| 160 | |
| 161 err = errors.Fix(err) | |
| 162 if me, ok := err.(errors.MultiError); ok { | |
| 163 for i, err := range me { | |
| 164 if err := cb(i, normalizeError(err)); err != nil { | |
| 165 return err | |
| 166 } | |
| 167 } | |
| 168 return nil | |
| 169 } | |
| 170 return normalizeError(err) | |
| 171 } | |
| 172 | |
| 173 func (bds *boundDatastore) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMultiCB) error { | |
| 174 nativeKeys := bds.gaeKeysToNative(keys...) | |
| 175 nativePLS := make([]*nativePropertyLoadSaver, len(nativeKeys)) | |
| 176 for i := range nativePLS { | |
| 177 nativePLS[i] = bds.mkNPLS(nil) | |
| 178 } | |
| 179 | |
| 180 var err error | |
| 181 if tx := bds.transaction; tx != nil { | |
| 182 // Transactional GetMulti. | |
| 183 err = tx.GetMulti(nativeKeys, nativePLS) | |
| 184 } else { | |
| 185 // Non-transactional GetMulti. | |
| 186 err = bds.client.GetMulti(bds, nativeKeys, nativePLS) | |
| 187 } | |
| 188 | |
| 189 return idxCallbacker(err, len(nativePLS), func(idx int, err error) error { | |
| 190 return cb(nativePLS[idx].pmap, err) | |
| 191 }) | |
| 192 } | |
| 193 | |
| 194 func (bds *boundDatastore) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds .PutMultiCB) error { | |
| 195 nativeKeys := bds.gaeKeysToNative(keys...) | |
| 196 nativePLS := make([]*nativePropertyLoadSaver, len(vals)) | |
| 197 for i := range nativePLS { | |
| 198 nativePLS[i] = bds.mkNPLS(vals[i]) | |
| 199 } | |
| 200 | |
| 201 var err error | |
| 202 if tx := bds.transaction; tx != nil { | |
| 203 // Transactional PutMulti. | |
| 204 // | |
| 205 // In order to simulate the presence of mid-transaction key allo cation, we | |
| 206 // will identify any incomplete keys and allocate IDs for them. This is | |
| 207 // potentially wasteful in the event o failed or retried transac tions, but | |
|
iannucci
2016/05/23 21:53:06
s/o/of
dnj (Google)
2016/07/01 02:25:55
Done.
| |
| 208 // it is required to maintain API compatibility with the datasto re | |
| 209 // interface. | |
| 210 var incompleteKeys []*datastore.Key | |
| 211 var incompleteKeyMap map[int]int | |
| 212 for i, k := range nativeKeys { | |
| 213 if k.Incomplete() { | |
| 214 if incompleteKeyMap == nil { | |
| 215 // Optimization: if there are any incomp lete keys, allocate room for | |
| 216 // the full range. | |
| 217 incompleteKeyMap = make(map[int]int, len (nativeKeys)-i) | |
| 218 incompleteKeys = make([]*datastore.Key, 0, len(nativeKeys)-i) | |
| 219 } | |
| 220 incompleteKeyMap[len(incompleteKeys)] = i | |
| 221 incompleteKeys = append(incompleteKeys, k) | |
| 222 } | |
| 223 } | |
| 224 if len(incompleteKeys) > 0 { | |
| 225 idKeys, err := bds.client.AllocateIDs(bds, incompleteKey s) | |
| 226 if err != nil { | |
| 227 return err | |
| 228 } | |
| 229 for i, idKey := range idKeys { | |
| 230 nativeKeys[incompleteKeyMap[i]] = idKey | |
| 231 } | |
| 232 } | |
| 233 | |
| 234 _, err = tx.PutMulti(nativeKeys, nativePLS) | |
| 235 } else { | |
| 236 // Non-transactional PutMulti. | |
| 237 nativeKeys, err = bds.client.PutMulti(bds, nativeKeys, nativePLS ) | |
| 238 } | |
| 239 | |
| 240 return idxCallbacker(err, len(nativeKeys), func(idx int, err error) erro r { | |
| 241 if err == nil { | |
| 242 return cb(bds.nativeKeysToGAE(nativeKeys[idx])[0], nil) | |
| 243 } | |
| 244 return cb(nil, err) | |
| 245 }) | |
| 246 } | |
| 247 | |
| 248 func (bds *boundDatastore) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) erro r { | |
| 249 nativeKeys := bds.gaeKeysToNative(keys...) | |
| 250 | |
| 251 var err error | |
| 252 if tx := bds.transaction; tx != nil { | |
| 253 // Transactional DeleteMulti. | |
| 254 err = tx.DeleteMulti(nativeKeys) | |
| 255 } else { | |
| 256 // Non-transactional DeleteMulti. | |
| 257 err = bds.client.DeleteMulti(bds, nativeKeys) | |
| 258 } | |
| 259 | |
| 260 return idxCallbacker(err, len(nativeKeys), func(_ int, err error) error { | |
| 261 return cb(err) | |
| 262 }) | |
| 263 } | |
| 264 | |
| 265 func (bds *boundDatastore) Testable() ds.Testable { | |
| 266 return nil | |
| 267 } | |
| 268 | |
| 269 func (bds *boundDatastore) prepareNativeQuery(fq *ds.FinalizedQuery) *datastore. Query { | |
| 270 nq := datastore.NewQuery(fq.Kind()) | |
| 271 if bds.transaction != nil { | |
| 272 nq = nq.Transaction(bds.transaction) | |
| 273 } | |
| 274 | |
| 275 // nativeFilter translates a filter field. If the translation fails, we' ll | |
| 276 // pass the result through to the underlying datastore and allow it to | |
| 277 // reject it. | |
| 278 nativeFilter := func(prop ds.Property) interface{} { | |
| 279 if np, err := bds.gaePropertyToNative("", []ds.Property{prop}); err == nil { | |
| 280 return np.Value | |
| 281 } | |
| 282 return prop.Value() | |
| 283 } | |
| 284 | |
| 285 // Equality filters. | |
| 286 for field, props := range fq.EqFilters() { | |
| 287 for _, prop := range props { | |
| 288 nq = nq.Filter(fmt.Sprintf("%s =", field), nativeFilter( prop)) | |
| 289 } | |
| 290 } | |
| 291 | |
| 292 // Inequality filters. | |
| 293 if ineq := fq.IneqFilterProp(); ineq != "" { | |
| 294 if field, op, prop := fq.IneqFilterLow(); field != "" { | |
| 295 nq = nq.Filter(fmt.Sprintf("%s %s", field, op), nativeFi lter(prop)) | |
| 296 } | |
| 297 | |
| 298 if field, op, prop := fq.IneqFilterHigh(); field != "" { | |
| 299 nq = nq.Filter(fmt.Sprintf("%s %s", field, op), nativeFi lter(prop)) | |
| 300 } | |
| 301 } | |
| 302 | |
| 303 start, end := fq.Bounds() | |
| 304 if start != nil { | |
| 305 nq = nq.Start(start.(datastore.Cursor)) | |
| 306 } | |
| 307 if end != nil { | |
| 308 nq = nq.End(end.(datastore.Cursor)) | |
| 309 } | |
| 310 | |
| 311 if fq.Distinct() { | |
| 312 nq = nq.Distinct() | |
| 313 } | |
| 314 if fq.KeysOnly() { | |
| 315 nq = nq.KeysOnly() | |
| 316 } | |
| 317 if limit, ok := fq.Limit(); ok { | |
| 318 nq = nq.Limit(int(limit)) | |
| 319 } | |
| 320 if offset, ok := fq.Offset(); ok { | |
| 321 nq = nq.Offset(int(offset)) | |
| 322 } | |
| 323 if proj := fq.Project(); proj != nil { | |
| 324 nq = nq.Project(proj...) | |
| 325 } | |
| 326 if ancestor := fq.Ancestor(); ancestor != nil { | |
| 327 nq = nq.Ancestor(bds.gaeKeysToNative(ancestor)[0]) | |
| 328 } | |
| 329 if fq.EventuallyConsistent() { | |
| 330 nq = nq.EventualConsistency() | |
| 331 } | |
| 332 | |
| 333 for _, ic := range fq.Orders() { | |
| 334 prop := ic.Property | |
| 335 if ic.Descending { | |
| 336 prop = "-" + prop | |
| 337 } | |
| 338 nq = nq.Order(prop) | |
| 339 } | |
| 340 | |
| 341 return nq | |
| 342 } | |
| 343 | |
| 344 func (bds *boundDatastore) mkNPLS(base ds.PropertyMap) *nativePropertyLoadSaver { | |
| 345 return &nativePropertyLoadSaver{bds: bds, pmap: clonePropertyMap(base)} | |
| 346 } | |
| 347 | |
| 348 func (bds *boundDatastore) gaePropertyToNative(name string, props []ds.Property) (nativeProp datastore.Property, err error) { | |
| 349 nativeProp.Name = name | |
| 350 | |
| 351 nativeValues := make([]interface{}, len(props)) | |
| 352 for i, prop := range props { | |
| 353 switch pt := prop.Type(); pt { | |
| 354 case ds.PTNull, ds.PTInt, ds.PTTime, ds.PTBool, ds.PTBytes, ds.P TString, ds.PTFloat: | |
| 355 nativeValues[i] = prop.Value() | |
| 356 break | |
| 357 | |
| 358 case ds.PTKey: | |
| 359 nativeValues[i] = bds.gaeKeysToNative(prop.Value().(*ds. Key))[0] | |
| 360 | |
| 361 default: | |
| 362 err = fmt.Errorf("unsupported property type at %d: %v", i, pt) | |
| 363 return | |
| 364 } | |
| 365 } | |
| 366 | |
| 367 if len(nativeValues) == 1 { | |
| 368 nativeProp.Value = nativeValues[0] | |
| 369 nativeProp.NoIndex = (props[0].IndexSetting() != ds.ShouldIndex) | |
| 370 } else { | |
| 371 // We must always index list values. | |
| 372 nativeProp.Value = nativeValues | |
| 373 } | |
| 374 return | |
| 375 } | |
| 376 | |
| 377 func (bds *boundDatastore) nativePropertyToGAE(nativeProp datastore.Property) (n ame string, props []ds.Property, err error) { | |
| 378 name = nativeProp.Name | |
| 379 | |
| 380 var nativeValues []interface{} | |
| 381 // Slice of supported native type. Break this into a slice of datastore | |
| 382 // properties. | |
| 383 // | |
| 384 // It must be an []interface{}. | |
| 385 if rv := reflect.ValueOf(nativeProp.Value); rv.Kind() == reflect.Slice & & rv.Type().Elem().Kind() == reflect.Interface { | |
| 386 nativeValues = rv.Interface().([]interface{}) | |
| 387 } else { | |
| 388 nativeValues = []interface{}{nativeProp.Value} | |
| 389 } | |
| 390 | |
| 391 if len(nativeValues) == 0 { | |
| 392 return | |
| 393 } | |
| 394 | |
| 395 props = make([]ds.Property, len(nativeValues)) | |
| 396 for i, nv := range nativeValues { | |
| 397 switch nvt := nv.(type) { | |
| 398 case int64, bool, string, float64, []byte: | |
| 399 break | |
| 400 | |
| 401 case time.Time: | |
| 402 // Cloud datastore library returns local time. | |
| 403 nv = nvt.UTC() | |
| 404 | |
| 405 case *datastore.Key: | |
| 406 nv = bds.nativeKeysToGAE(nvt)[0] | |
| 407 | |
| 408 default: | |
| 409 err = fmt.Errorf("element %d has unsupported datastore.V alue type %T", i, nv) | |
| 410 return | |
| 411 } | |
| 412 | |
| 413 indexSetting := ds.ShouldIndex | |
| 414 if nativeProp.NoIndex { | |
| 415 indexSetting = ds.NoIndex | |
| 416 } | |
| 417 props[i].SetValue(nv, indexSetting) | |
| 418 } | |
| 419 return | |
| 420 } | |
| 421 | |
| 422 func (bds *boundDatastore) gaeKeysToNative(keys ...*ds.Key) []*datastore.Key { | |
| 423 nativeKeys := make([]*datastore.Key, len(keys)) | |
| 424 for i, key := range keys { | |
| 425 _, _, toks := key.Split() | |
| 426 | |
| 427 var nativeKey *datastore.Key | |
| 428 for _, tok := range toks { | |
| 429 nativeKey = datastore.NewKey(bds, tok.Kind, tok.StringID , tok.IntID, nativeKey) | |
| 430 } | |
| 431 nativeKeys[i] = nativeKey | |
| 432 } | |
| 433 return nativeKeys | |
| 434 } | |
| 435 | |
| 436 func (bds *boundDatastore) nativeKeysToGAE(nativeKeys ...*datastore.Key) []*ds.K ey { | |
| 437 keys := make([]*ds.Key, len(nativeKeys)) | |
| 438 toks := make([]ds.KeyTok, 1) | |
| 439 for i, nativeKey := range nativeKeys { | |
| 440 toks = toks[:0] | |
| 441 cur := nativeKey | |
| 442 for { | |
| 443 toks = append(toks, ds.KeyTok{Kind: cur.Kind(), IntID: c ur.ID(), StringID: cur.Name()}) | |
| 444 cur = cur.Parent() | |
| 445 if cur == nil { | |
| 446 break | |
| 447 } | |
| 448 } | |
| 449 | |
| 450 // Reverse "toks" so we have ancestor-to-child lineage. | |
| 451 for i := 0; i < len(toks)/2; i++ { | |
| 452 ri := len(toks) - i - 1 | |
| 453 toks[i], toks[ri] = toks[ri], toks[i] | |
| 454 } | |
| 455 keys[i] = ds.NewKeyToks(bds.appID, nativeKey.Namespace(), toks) | |
| 456 } | |
| 457 return keys | |
| 458 } | |
| 459 | |
| 460 // nativePropertyLoadSaver is a ds.PropertyMap which implements | |
| 461 // datastore.PropertyLoadSaver. | |
| 462 // | |
| 463 // It naturally converts between native and GAE properties and values. | |
| 464 type nativePropertyLoadSaver struct { | |
| 465 bds *boundDatastore | |
| 466 pmap ds.PropertyMap | |
| 467 } | |
| 468 | |
| 469 var _ datastore.PropertyLoadSaver = (*nativePropertyLoadSaver)(nil) | |
| 470 | |
| 471 func (npls *nativePropertyLoadSaver) Load(props []datastore.Property) error { | |
| 472 if npls.pmap == nil { | |
| 473 // Allocate for common case: one property per property name. | |
| 474 npls.pmap = make(ds.PropertyMap, len(props)) | |
| 475 } | |
| 476 | |
| 477 for _, nativeProp := range props { | |
| 478 name, props, err := npls.bds.nativePropertyToGAE(nativeProp) | |
| 479 if err != nil { | |
| 480 return err | |
| 481 } | |
| 482 npls.pmap[name] = append(npls.pmap[name], props...) | |
| 483 } | |
| 484 return nil | |
| 485 } | |
| 486 | |
| 487 func (npls *nativePropertyLoadSaver) Save() ([]datastore.Property, error) { | |
| 488 if len(npls.pmap) == 0 { | |
| 489 return nil, nil | |
| 490 } | |
| 491 | |
| 492 props := make([]datastore.Property, 0, len(npls.pmap)) | |
| 493 for name, plist := range npls.pmap { | |
| 494 // Strip meta. | |
| 495 if strings.HasPrefix(name, "$") { | |
| 496 continue | |
| 497 } | |
| 498 | |
| 499 nativeProp, err := npls.bds.gaePropertyToNative(name, plist) | |
| 500 if err != nil { | |
| 501 return nil, err | |
| 502 } | |
| 503 props = append(props, nativeProp) | |
| 504 } | |
| 505 return props, nil | |
| 506 } | |
| 507 | |
| 508 var datastoreTransactionKey = "*datastore.Transaction" | |
| 509 | |
| 510 func withDatastoreTransaction(c context.Context, tx *datastore.Transaction) cont ext.Context { | |
| 511 return context.WithValue(c, &datastoreTransactionKey, tx) | |
| 512 } | |
| 513 | |
| 514 func datastoreTransaction(c context.Context) *datastore.Transaction { | |
| 515 if tx, ok := c.Value(&datastoreTransactionKey).(*datastore.Transaction); ok { | |
| 516 return tx | |
| 517 } | |
| 518 return nil | |
| 519 } | |
| 520 | |
| 521 func clonePropertyMap(pmap ds.PropertyMap) ds.PropertyMap { | |
| 522 if pmap == nil { | |
| 523 return nil | |
| 524 } | |
| 525 | |
| 526 clone := make(ds.PropertyMap, len(pmap)) | |
| 527 for k, props := range pmap { | |
| 528 clone[k] = append([]ds.Property(nil), props...) | |
| 529 } | |
| 530 return clone | |
| 531 } | |
| 532 | |
| 533 func normalizeError(err error) error { | |
| 534 switch err { | |
| 535 case datastore.ErrNoSuchEntity: | |
| 536 return ds.ErrNoSuchEntity | |
| 537 case datastore.ErrConcurrentTransaction: | |
| 538 return ds.ErrConcurrentTransaction | |
| 539 case datastore.ErrInvalidKey: | |
| 540 return ds.ErrInvalidKey | |
| 541 default: | |
| 542 return err | |
| 543 } | |
| 544 } | |
| OLD | NEW |