OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. |
| 4 |
| 5 package cloud |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "reflect" |
| 10 "strings" |
| 11 "time" |
| 12 |
| 13 "github.com/luci/luci-go/common/errors" |
| 14 |
| 15 ds "github.com/luci/gae/service/datastore" |
| 16 infoS "github.com/luci/gae/service/info" |
| 17 "google.golang.org/cloud/datastore" |
| 18 |
| 19 "golang.org/x/net/context" |
| 20 ) |
| 21 |
| 22 type cloudDatastore struct { |
| 23 client *datastore.Client |
| 24 } |
| 25 |
| 26 func (cds *cloudDatastore) use(c context.Context) context.Context { |
| 27 return ds.SetRawFactory(c, func(ic context.Context, wantTxn bool) ds.Raw
Interface { |
| 28 inf := infoS.Get(ic) |
| 29 if ns, ok := inf.GetNamespace(); ok { |
| 30 ic = datastore.WithNamespace(ic, ns) |
| 31 } |
| 32 |
| 33 bds := boundDatastore{ |
| 34 Context: ic, |
| 35 cloudDatastore: cds, |
| 36 appID: inf.FullyQualifiedAppID(), |
| 37 } |
| 38 if wantTxn { |
| 39 bds.transaction = datastoreTransaction(ic) |
| 40 } |
| 41 return &bds |
| 42 }) |
| 43 } |
| 44 |
| 45 // boundDatastore is a bound instance of the cloudDatastore installed in the |
| 46 // Context. |
| 47 type boundDatastore struct { |
| 48 // Context is the bound user Context. It includes the datastore namespac
e, if |
| 49 // one is set. |
| 50 context.Context |
| 51 *cloudDatastore |
| 52 |
| 53 appID string |
| 54 transaction *datastore.Transaction |
| 55 } |
| 56 |
| 57 func (bds *boundDatastore) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error { |
| 58 nativeKeys, err := bds.client.AllocateIDs(bds, bds.gaeKeysToNative(keys.
..)) |
| 59 if err != nil { |
| 60 return normalizeError(err) |
| 61 } |
| 62 |
| 63 keys = bds.nativeKeysToGAE(nativeKeys...) |
| 64 for _, key := range keys { |
| 65 cb(key, nil) |
| 66 } |
| 67 return nil |
| 68 } |
| 69 |
| 70 func (bds *boundDatastore) RunInTransaction(fn func(context.Context) error, opts
*ds.TransactionOptions) error { |
| 71 if bds.transaction != nil { |
| 72 return errors.New("nested transactions are not supported") |
| 73 } |
| 74 |
| 75 // The cloud datastore SDK does not expose any transaction options. |
| 76 if opts != nil { |
| 77 switch { |
| 78 case opts.XG: |
| 79 return errors.New("cross-group transactions are not supp
orted") |
| 80 } |
| 81 } |
| 82 |
| 83 attempts := 3 |
| 84 if opts != nil && opts.Attempts > 0 { |
| 85 attempts = opts.Attempts |
| 86 } |
| 87 for i := 0; i < attempts; i++ { |
| 88 _, err := bds.client.RunInTransaction(bds, func(tx *datastore.Tr
ansaction) error { |
| 89 return fn(withDatastoreTransaction(bds, tx)) |
| 90 }) |
| 91 if err = normalizeError(err); err != ds.ErrConcurrentTransaction
{ |
| 92 return err |
| 93 } |
| 94 } |
| 95 return ds.ErrConcurrentTransaction |
| 96 } |
| 97 |
| 98 func (bds *boundDatastore) DecodeCursor(s string) (ds.Cursor, error) { |
| 99 cursor, err := datastore.DecodeCursor(s) |
| 100 return cursor, normalizeError(err) |
| 101 } |
| 102 |
| 103 func (bds *boundDatastore) Run(q *ds.FinalizedQuery, cb ds.RawRunCB) error { |
| 104 it := bds.client.Run(bds, bds.prepareNativeQuery(q)) |
| 105 cursorFn := func() (ds.Cursor, error) { |
| 106 return it.Cursor() |
| 107 } |
| 108 |
| 109 for { |
| 110 var npls *nativePropertyLoadSaver |
| 111 if !q.KeysOnly() { |
| 112 npls = bds.mkNPLS(nil) |
| 113 } |
| 114 nativeKey, err := it.Next(npls) |
| 115 if err != nil { |
| 116 if err == datastore.Done { |
| 117 return nil |
| 118 } |
| 119 return normalizeError(err) |
| 120 } |
| 121 |
| 122 if err := cb(bds.nativeKeysToGAE(nativeKey)[0], npls.pmap, curso
rFn); err != nil { |
| 123 if err == ds.Stop { |
| 124 return nil |
| 125 } |
| 126 return normalizeError(err) |
| 127 } |
| 128 } |
| 129 } |
| 130 |
| 131 func (bds *boundDatastore) Count(q *ds.FinalizedQuery) (int64, error) { |
| 132 v, err := bds.client.Count(bds, bds.prepareNativeQuery(q)) |
| 133 if err != nil { |
| 134 return -1, normalizeError(err) |
| 135 } |
| 136 return int64(v), nil |
| 137 } |
| 138 |
| 139 func idxCallbacker(err error, amt int, cb func(idx int, err error) error) error
{ |
| 140 if err == nil { |
| 141 for i := 0; i < amt; i++ { |
| 142 if err := cb(i, nil); err != nil { |
| 143 return err |
| 144 } |
| 145 } |
| 146 return nil |
| 147 } |
| 148 |
| 149 err = errors.Fix(err) |
| 150 if me, ok := err.(errors.MultiError); ok { |
| 151 for i, err := range me { |
| 152 if err := cb(i, normalizeError(err)); err != nil { |
| 153 return err |
| 154 } |
| 155 } |
| 156 return nil |
| 157 } |
| 158 return normalizeError(err) |
| 159 } |
| 160 |
| 161 func (bds *boundDatastore) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb
ds.GetMultiCB) error { |
| 162 nativeKeys := bds.gaeKeysToNative(keys...) |
| 163 nativePLS := make([]*nativePropertyLoadSaver, len(nativeKeys)) |
| 164 for i := range nativePLS { |
| 165 nativePLS[i] = bds.mkNPLS(nil) |
| 166 } |
| 167 |
| 168 var err error |
| 169 if tx := bds.transaction; tx != nil { |
| 170 // Transactional GetMulti. |
| 171 err = tx.GetMulti(nativeKeys, nativePLS) |
| 172 } else { |
| 173 // Non-transactional GetMulti. |
| 174 err = bds.client.GetMulti(bds, nativeKeys, nativePLS) |
| 175 } |
| 176 |
| 177 return idxCallbacker(err, len(nativePLS), func(idx int, err error) error
{ |
| 178 return cb(nativePLS[idx].pmap, err) |
| 179 }) |
| 180 } |
| 181 |
| 182 func (bds *boundDatastore) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds
.NewKeyCB) error { |
| 183 nativeKeys := bds.gaeKeysToNative(keys...) |
| 184 nativePLS := make([]*nativePropertyLoadSaver, len(vals)) |
| 185 for i := range nativePLS { |
| 186 nativePLS[i] = bds.mkNPLS(vals[i]) |
| 187 } |
| 188 |
| 189 var err error |
| 190 if tx := bds.transaction; tx != nil { |
| 191 // Transactional PutMulti. |
| 192 // |
| 193 // In order to simulate the presence of mid-transaction key allo
cation, we |
| 194 // will identify any incomplete keys and allocate IDs for them.
This is |
| 195 // potentially wasteful in the event of failed or retried transa
ctions, but |
| 196 // it is required to maintain API compatibility with the datasto
re |
| 197 // interface. |
| 198 var incompleteKeys []*datastore.Key |
| 199 var incompleteKeyMap map[int]int |
| 200 for i, k := range nativeKeys { |
| 201 if k.Incomplete() { |
| 202 if incompleteKeyMap == nil { |
| 203 // Optimization: if there are any incomp
lete keys, allocate room for |
| 204 // the full range. |
| 205 incompleteKeyMap = make(map[int]int, len
(nativeKeys)-i) |
| 206 incompleteKeys = make([]*datastore.Key,
0, len(nativeKeys)-i) |
| 207 } |
| 208 incompleteKeyMap[len(incompleteKeys)] = i |
| 209 incompleteKeys = append(incompleteKeys, k) |
| 210 } |
| 211 } |
| 212 if len(incompleteKeys) > 0 { |
| 213 idKeys, err := bds.client.AllocateIDs(bds, incompleteKey
s) |
| 214 if err != nil { |
| 215 return err |
| 216 } |
| 217 for i, idKey := range idKeys { |
| 218 nativeKeys[incompleteKeyMap[i]] = idKey |
| 219 } |
| 220 } |
| 221 |
| 222 _, err = tx.PutMulti(nativeKeys, nativePLS) |
| 223 } else { |
| 224 // Non-transactional PutMulti. |
| 225 nativeKeys, err = bds.client.PutMulti(bds, nativeKeys, nativePLS
) |
| 226 } |
| 227 |
| 228 return idxCallbacker(err, len(nativeKeys), func(idx int, err error) erro
r { |
| 229 if err == nil { |
| 230 return cb(bds.nativeKeysToGAE(nativeKeys[idx])[0], nil) |
| 231 } |
| 232 return cb(nil, err) |
| 233 }) |
| 234 } |
| 235 |
| 236 func (bds *boundDatastore) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) erro
r { |
| 237 nativeKeys := bds.gaeKeysToNative(keys...) |
| 238 |
| 239 var err error |
| 240 if tx := bds.transaction; tx != nil { |
| 241 // Transactional DeleteMulti. |
| 242 err = tx.DeleteMulti(nativeKeys) |
| 243 } else { |
| 244 // Non-transactional DeleteMulti. |
| 245 err = bds.client.DeleteMulti(bds, nativeKeys) |
| 246 } |
| 247 |
| 248 return idxCallbacker(err, len(nativeKeys), func(_ int, err error) error
{ |
| 249 return cb(err) |
| 250 }) |
| 251 } |
| 252 |
| 253 func (bds *boundDatastore) Testable() ds.Testable { |
| 254 return nil |
| 255 } |
| 256 |
| 257 func (bds *boundDatastore) prepareNativeQuery(fq *ds.FinalizedQuery) *datastore.
Query { |
| 258 nq := datastore.NewQuery(fq.Kind()) |
| 259 if bds.transaction != nil { |
| 260 nq = nq.Transaction(bds.transaction) |
| 261 } |
| 262 |
| 263 // nativeFilter translates a filter field. If the translation fails, we'
ll |
| 264 // pass the result through to the underlying datastore and allow it to |
| 265 // reject it. |
| 266 nativeFilter := func(prop ds.Property) interface{} { |
| 267 if np, err := bds.gaePropertyToNative("", []ds.Property{prop});
err == nil { |
| 268 return np.Value |
| 269 } |
| 270 return prop.Value() |
| 271 } |
| 272 |
| 273 // Equality filters. |
| 274 for field, props := range fq.EqFilters() { |
| 275 for _, prop := range props { |
| 276 nq = nq.Filter(fmt.Sprintf("%s =", field), nativeFilter(
prop)) |
| 277 } |
| 278 } |
| 279 |
| 280 // Inequality filters. |
| 281 if ineq := fq.IneqFilterProp(); ineq != "" { |
| 282 if field, op, prop := fq.IneqFilterLow(); field != "" { |
| 283 nq = nq.Filter(fmt.Sprintf("%s %s", field, op), nativeFi
lter(prop)) |
| 284 } |
| 285 |
| 286 if field, op, prop := fq.IneqFilterHigh(); field != "" { |
| 287 nq = nq.Filter(fmt.Sprintf("%s %s", field, op), nativeFi
lter(prop)) |
| 288 } |
| 289 } |
| 290 |
| 291 start, end := fq.Bounds() |
| 292 if start != nil { |
| 293 nq = nq.Start(start.(datastore.Cursor)) |
| 294 } |
| 295 if end != nil { |
| 296 nq = nq.End(end.(datastore.Cursor)) |
| 297 } |
| 298 |
| 299 if fq.Distinct() { |
| 300 nq = nq.Distinct() |
| 301 } |
| 302 if fq.KeysOnly() { |
| 303 nq = nq.KeysOnly() |
| 304 } |
| 305 if limit, ok := fq.Limit(); ok { |
| 306 nq = nq.Limit(int(limit)) |
| 307 } |
| 308 if offset, ok := fq.Offset(); ok { |
| 309 nq = nq.Offset(int(offset)) |
| 310 } |
| 311 if proj := fq.Project(); proj != nil { |
| 312 nq = nq.Project(proj...) |
| 313 } |
| 314 if ancestor := fq.Ancestor(); ancestor != nil { |
| 315 nq = nq.Ancestor(bds.gaeKeysToNative(ancestor)[0]) |
| 316 } |
| 317 if fq.EventuallyConsistent() { |
| 318 nq = nq.EventualConsistency() |
| 319 } |
| 320 |
| 321 for _, ic := range fq.Orders() { |
| 322 prop := ic.Property |
| 323 if ic.Descending { |
| 324 prop = "-" + prop |
| 325 } |
| 326 nq = nq.Order(prop) |
| 327 } |
| 328 |
| 329 return nq |
| 330 } |
| 331 |
| 332 func (bds *boundDatastore) mkNPLS(base ds.PropertyMap) *nativePropertyLoadSaver
{ |
| 333 return &nativePropertyLoadSaver{bds: bds, pmap: clonePropertyMap(base)} |
| 334 } |
| 335 |
| 336 func (bds *boundDatastore) gaePropertyToNative(name string, props []ds.Property)
(nativeProp datastore.Property, err error) { |
| 337 nativeProp.Name = name |
| 338 |
| 339 nativeValues := make([]interface{}, len(props)) |
| 340 for i, prop := range props { |
| 341 switch pt := prop.Type(); pt { |
| 342 case ds.PTNull, ds.PTInt, ds.PTTime, ds.PTBool, ds.PTBytes, ds.P
TString, ds.PTFloat: |
| 343 nativeValues[i] = prop.Value() |
| 344 break |
| 345 |
| 346 case ds.PTKey: |
| 347 nativeValues[i] = bds.gaeKeysToNative(prop.Value().(*ds.
Key))[0] |
| 348 |
| 349 default: |
| 350 err = fmt.Errorf("unsupported property type at %d: %v",
i, pt) |
| 351 return |
| 352 } |
| 353 } |
| 354 |
| 355 if len(nativeValues) == 1 { |
| 356 nativeProp.Value = nativeValues[0] |
| 357 nativeProp.NoIndex = (props[0].IndexSetting() != ds.ShouldIndex) |
| 358 } else { |
| 359 // We must always index list values. |
| 360 nativeProp.Value = nativeValues |
| 361 } |
| 362 return |
| 363 } |
| 364 |
| 365 func (bds *boundDatastore) nativePropertyToGAE(nativeProp datastore.Property) (n
ame string, props []ds.Property, err error) { |
| 366 name = nativeProp.Name |
| 367 |
| 368 var nativeValues []interface{} |
| 369 // Slice of supported native type. Break this into a slice of datastore |
| 370 // properties. |
| 371 // |
| 372 // It must be an []interface{}. |
| 373 if rv := reflect.ValueOf(nativeProp.Value); rv.Kind() == reflect.Slice &
& rv.Type().Elem().Kind() == reflect.Interface { |
| 374 nativeValues = rv.Interface().([]interface{}) |
| 375 } else { |
| 376 nativeValues = []interface{}{nativeProp.Value} |
| 377 } |
| 378 |
| 379 if len(nativeValues) == 0 { |
| 380 return |
| 381 } |
| 382 |
| 383 props = make([]ds.Property, len(nativeValues)) |
| 384 for i, nv := range nativeValues { |
| 385 switch nvt := nv.(type) { |
| 386 case int64, bool, string, float64, []byte: |
| 387 break |
| 388 |
| 389 case time.Time: |
| 390 // Cloud datastore library returns local time. |
| 391 nv = nvt.UTC() |
| 392 |
| 393 case *datastore.Key: |
| 394 nv = bds.nativeKeysToGAE(nvt)[0] |
| 395 |
| 396 default: |
| 397 err = fmt.Errorf("element %d has unsupported datastore.V
alue type %T", i, nv) |
| 398 return |
| 399 } |
| 400 |
| 401 indexSetting := ds.ShouldIndex |
| 402 if nativeProp.NoIndex { |
| 403 indexSetting = ds.NoIndex |
| 404 } |
| 405 props[i].SetValue(nv, indexSetting) |
| 406 } |
| 407 return |
| 408 } |
| 409 |
| 410 func (bds *boundDatastore) gaeKeysToNative(keys ...*ds.Key) []*datastore.Key { |
| 411 nativeKeys := make([]*datastore.Key, len(keys)) |
| 412 for i, key := range keys { |
| 413 _, _, toks := key.Split() |
| 414 |
| 415 var nativeKey *datastore.Key |
| 416 for _, tok := range toks { |
| 417 nativeKey = datastore.NewKey(bds, tok.Kind, tok.StringID
, tok.IntID, nativeKey) |
| 418 } |
| 419 nativeKeys[i] = nativeKey |
| 420 } |
| 421 return nativeKeys |
| 422 } |
| 423 |
| 424 func (bds *boundDatastore) nativeKeysToGAE(nativeKeys ...*datastore.Key) []*ds.K
ey { |
| 425 keys := make([]*ds.Key, len(nativeKeys)) |
| 426 toks := make([]ds.KeyTok, 1) |
| 427 for i, nativeKey := range nativeKeys { |
| 428 toks = toks[:0] |
| 429 cur := nativeKey |
| 430 for { |
| 431 toks = append(toks, ds.KeyTok{Kind: cur.Kind(), IntID: c
ur.ID(), StringID: cur.Name()}) |
| 432 cur = cur.Parent() |
| 433 if cur == nil { |
| 434 break |
| 435 } |
| 436 } |
| 437 |
| 438 // Reverse "toks" so we have ancestor-to-child lineage. |
| 439 for i := 0; i < len(toks)/2; i++ { |
| 440 ri := len(toks) - i - 1 |
| 441 toks[i], toks[ri] = toks[ri], toks[i] |
| 442 } |
| 443 keys[i] = ds.NewKeyToks(bds.appID, nativeKey.Namespace(), toks) |
| 444 } |
| 445 return keys |
| 446 } |
| 447 |
| 448 // nativePropertyLoadSaver is a ds.PropertyMap which implements |
| 449 // datastore.PropertyLoadSaver. |
| 450 // |
| 451 // It naturally converts between native and GAE properties and values. |
| 452 type nativePropertyLoadSaver struct { |
| 453 bds *boundDatastore |
| 454 pmap ds.PropertyMap |
| 455 } |
| 456 |
| 457 var _ datastore.PropertyLoadSaver = (*nativePropertyLoadSaver)(nil) |
| 458 |
| 459 func (npls *nativePropertyLoadSaver) Load(props []datastore.Property) error { |
| 460 if npls.pmap == nil { |
| 461 // Allocate for common case: one property per property name. |
| 462 npls.pmap = make(ds.PropertyMap, len(props)) |
| 463 } |
| 464 |
| 465 for _, nativeProp := range props { |
| 466 name, props, err := npls.bds.nativePropertyToGAE(nativeProp) |
| 467 if err != nil { |
| 468 return err |
| 469 } |
| 470 npls.pmap[name] = append(npls.pmap[name], props...) |
| 471 } |
| 472 return nil |
| 473 } |
| 474 |
| 475 func (npls *nativePropertyLoadSaver) Save() ([]datastore.Property, error) { |
| 476 if len(npls.pmap) == 0 { |
| 477 return nil, nil |
| 478 } |
| 479 |
| 480 props := make([]datastore.Property, 0, len(npls.pmap)) |
| 481 for name, plist := range npls.pmap { |
| 482 // Strip meta. |
| 483 if strings.HasPrefix(name, "$") { |
| 484 continue |
| 485 } |
| 486 |
| 487 nativeProp, err := npls.bds.gaePropertyToNative(name, plist) |
| 488 if err != nil { |
| 489 return nil, err |
| 490 } |
| 491 props = append(props, nativeProp) |
| 492 } |
| 493 return props, nil |
| 494 } |
| 495 |
| 496 var datastoreTransactionKey = "*datastore.Transaction" |
| 497 |
| 498 func withDatastoreTransaction(c context.Context, tx *datastore.Transaction) cont
ext.Context { |
| 499 return context.WithValue(c, &datastoreTransactionKey, tx) |
| 500 } |
| 501 |
| 502 func datastoreTransaction(c context.Context) *datastore.Transaction { |
| 503 if tx, ok := c.Value(&datastoreTransactionKey).(*datastore.Transaction);
ok { |
| 504 return tx |
| 505 } |
| 506 return nil |
| 507 } |
| 508 |
| 509 func clonePropertyMap(pmap ds.PropertyMap) ds.PropertyMap { |
| 510 if pmap == nil { |
| 511 return nil |
| 512 } |
| 513 |
| 514 clone := make(ds.PropertyMap, len(pmap)) |
| 515 for k, props := range pmap { |
| 516 clone[k] = append([]ds.Property(nil), props...) |
| 517 } |
| 518 return clone |
| 519 } |
| 520 |
| 521 func normalizeError(err error) error { |
| 522 switch err { |
| 523 case datastore.ErrNoSuchEntity: |
| 524 return ds.ErrNoSuchEntity |
| 525 case datastore.ErrConcurrentTransaction: |
| 526 return ds.ErrConcurrentTransaction |
| 527 case datastore.ErrInvalidKey: |
| 528 return ds.ErrInvalidKey |
| 529 default: |
| 530 return err |
| 531 } |
| 532 } |
OLD | NEW |