Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(514)

Side by Side Diff: filter/txnBuf/state.go

Issue 1521823003: Clean up callback interfaces. (Closed) Base URL: https://github.com/luci/gae.git@extra
Patch Set: fixins Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « filter/txnBuf/query_merger.go ('k') | filter/txnBuf/txnbuf_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package txnBuf 5 package txnBuf
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "sync" 9 "sync"
10 10
(...skipping 246 matching lines...) Expand 10 before | Expand all | Expand 10 after
257 data[i].buffered = true 257 data[i].buffered = true
258 if size > 0 { 258 if size > 0 {
259 idxMap = append(idxMap, i) 259 idxMap = append(idxMap, i)
260 toGetKeys = append(toGetKeys, key) 260 toGetKeys = append(toGetKeys, key)
261 } 261 }
262 } 262 }
263 } 263 }
264 264
265 if len(toGetKeys) > 0 { 265 if len(toGetKeys) > 0 {
266 j := 0 266 j := 0
267 » » » t.bufDS.GetMulti(toGetKeys, nil, func(pm datastore.Prope rtyMap, err error) { 267 » » » t.bufDS.GetMulti(toGetKeys, nil, func(pm datastore.Prope rtyMap, err error) error {
268 impossible(err) 268 impossible(err)
269 data[idxMap[j]].data = pm 269 data[idxMap[j]].data = pm
270 j++ 270 j++
271 return nil
271 }) 272 })
272 } 273 }
273 274
274 idxMap = nil 275 idxMap = nil
275 getKeys := []*datastore.Key(nil) 276 getKeys := []*datastore.Key(nil)
276 getMetas := datastore.MultiMetaGetter(nil) 277 getMetas := datastore.MultiMetaGetter(nil)
277 278
278 for i, itm := range data { 279 for i, itm := range data {
279 if !itm.buffered { 280 if !itm.buffered {
280 idxMap = append(idxMap, i) 281 idxMap = append(idxMap, i)
281 getKeys = append(getKeys, itm.key) 282 getKeys = append(getKeys, itm.key)
282 getMetas = append(getMetas, metas.GetSingle(i)) 283 getMetas = append(getMetas, metas.GetSingle(i))
283 } 284 }
284 } 285 }
285 286
286 if len(idxMap) > 0 { 287 if len(idxMap) > 0 {
287 j := 0 288 j := 0
288 » » » err := t.parentDS.GetMulti(getKeys, getMetas, func(pm da tastore.PropertyMap, err error) { 289 » » » err := t.parentDS.GetMulti(getKeys, getMetas, func(pm da tastore.PropertyMap, err error) error {
289 if err != datastore.ErrNoSuchEntity { 290 if err != datastore.ErrNoSuchEntity {
290 i := idxMap[j] 291 i := idxMap[j]
291 if !lme.Assign(i, err) { 292 if !lme.Assign(i, err) {
292 data[i].data = pm 293 data[i].data = pm
293 } 294 }
294 } 295 }
295 j++ 296 j++
297 return nil
296 }) 298 })
297 if err != nil { 299 if err != nil {
298 return err 300 return err
299 } 301 }
300 } 302 }
301 return nil 303 return nil
302 }() 304 }()
303 if err != nil { 305 if err != nil {
304 return err 306 return err
305 } 307 }
(...skipping 18 matching lines...) Expand all
324 if !haveLock { 326 if !haveLock {
325 t.Lock() 327 t.Lock()
326 defer t.Unlock() 328 defer t.Unlock()
327 } 329 }
328 330
329 if err := t.updateRootsLocked(roots); err != nil { 331 if err := t.updateRootsLocked(roots); err != nil {
330 return err 332 return err
331 } 333 }
332 334
333 i := 0 335 i := 0
334 » » err := t.bufDS.DeleteMulti(keys, func(err error) { 336 » » err := t.bufDS.DeleteMulti(keys, func(err error) error {
335 impossible(err) 337 impossible(err)
336 t.entState.set(encKeys[i], 0) 338 t.entState.set(encKeys[i], 0)
337 i++ 339 i++
340 return nil
338 }) 341 })
339 impossible(err) 342 impossible(err)
340 return nil 343 return nil
341 }() 344 }()
342 if err != nil { 345 if err != nil {
343 return err 346 return err
344 } 347 }
345 348
346 for range keys { 349 for range keys {
347 cb(nil) 350 cb(nil)
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
392 if !haveLock { 395 if !haveLock {
393 t.Lock() 396 t.Lock()
394 defer t.Unlock() 397 defer t.Unlock()
395 } 398 }
396 399
397 if err := t.updateRootsLocked(roots); err != nil { 400 if err := t.updateRootsLocked(roots); err != nil {
398 return err 401 return err
399 } 402 }
400 403
401 i := 0 404 i := 0
402 » » err := t.bufDS.PutMulti(keys, vals, func(k *datastore.Key, err e rror) { 405 » » err := t.bufDS.PutMulti(keys, vals, func(k *datastore.Key, err e rror) error {
403 impossible(err) 406 impossible(err)
404 t.entState.set(encKeys[i], vals[i].EstimateSize()) 407 t.entState.set(encKeys[i], vals[i].EstimateSize())
405 i++ 408 i++
409 return nil
406 }) 410 })
407 impossible(err) 411 impossible(err)
408 return nil 412 return nil
409 }() 413 }()
410 if err != nil { 414 if err != nil {
411 return err 415 return err
412 } 416 }
413 417
414 for _, k := range keys { 418 for _, k := range keys {
415 cb(k, nil) 419 cb(k, nil)
416 } 420 }
417 return nil 421 return nil
418 } 422 }
419 423
420 func commitToReal(s *txnBufState) error { 424 func commitToReal(s *txnBufState) error {
421 toPut, toPutKeys, toDel := s.effect() 425 toPut, toPutKeys, toDel := s.effect()
422 426
423 return parallel.FanOutIn(func(ch chan<- func() error) { 427 return parallel.FanOutIn(func(ch chan<- func() error) {
424 if len(toPut) > 0 { 428 if len(toPut) > 0 {
425 ch <- func() error { 429 ch <- func() error {
426 mErr := errors.NewLazyMultiError(len(toPut)) 430 mErr := errors.NewLazyMultiError(len(toPut))
427 i := 0 431 i := 0
428 » » » » err := s.parentDS.PutMulti(toPutKeys, toPut, fun c(_ *datastore.Key, err error) { 432 » » » » err := s.parentDS.PutMulti(toPutKeys, toPut, fun c(_ *datastore.Key, err error) error {
429 mErr.Assign(i, err) 433 mErr.Assign(i, err)
430 i++ 434 i++
435 return nil
431 }) 436 })
432 if err == nil { 437 if err == nil {
433 err = mErr.Get() 438 err = mErr.Get()
434 } 439 }
435 return err 440 return err
436 } 441 }
437 } 442 }
438 if len(toDel) > 0 { 443 if len(toDel) > 0 {
439 ch <- func() error { 444 ch <- func() error {
440 mErr := errors.NewLazyMultiError(len(toDel)) 445 mErr := errors.NewLazyMultiError(len(toDel))
441 i := 0 446 i := 0
442 » » » » err := s.parentDS.DeleteMulti(toDel, func(err er ror) { 447 » » » » err := s.parentDS.DeleteMulti(toDel, func(err er ror) error {
443 mErr.Assign(i, err) 448 mErr.Assign(i, err)
444 i++ 449 i++
450 return nil
445 }) 451 })
446 if err == nil { 452 if err == nil {
447 err = mErr.Get() 453 err = mErr.Get()
448 } 454 }
449 return err 455 return err
450 } 456 }
451 } 457 }
452 }) 458 })
453 } 459 }
454 460
455 func (t *txnBufState) effect() (toPut []datastore.PropertyMap, toPutKeys, toDel []*datastore.Key) { 461 func (t *txnBufState) effect() (toPut []datastore.PropertyMap, toPutKeys, toDel []*datastore.Key) {
456 // TODO(riannucci): preallocate return slices 462 // TODO(riannucci): preallocate return slices
457 463
458 // need to pull all items out of the in-memory datastore. Fortunately we have 464 // need to pull all items out of the in-memory datastore. Fortunately we have
459 // kindless queries, and we disabled all the special entities, so just 465 // kindless queries, and we disabled all the special entities, so just
460 // run a kindless query without any filters and it will return all data 466 // run a kindless query without any filters and it will return all data
461 // currently in bufDS :). 467 // currently in bufDS :).
462 fq, err := datastore.NewQuery("").Finalize() 468 fq, err := datastore.NewQuery("").Finalize()
463 impossible(err) 469 impossible(err)
464 470
465 » err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa p, _ datastore.CursorCB) bool { 471 » err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa p, _ datastore.CursorCB) error {
466 toPutKeys = append(toPutKeys, key) 472 toPutKeys = append(toPutKeys, key)
467 toPut = append(toPut, data) 473 toPut = append(toPut, data)
468 » » return true 474 » » return nil
469 }) 475 })
470 memoryCorruption(err) 476 memoryCorruption(err)
471 477
472 for keyStr, size := range t.entState.keyToSize { 478 for keyStr, size := range t.entState.keyToSize {
473 if size == 0 { 479 if size == 0 {
474 k, err := serialize.ReadKey(bytes.NewBufferString(keyStr ), serialize.WithoutContext, t.aid, t.ns) 480 k, err := serialize.ReadKey(bytes.NewBufferString(keyStr ), serialize.WithoutContext, t.aid, t.ns)
475 memoryCorruption(err) 481 memoryCorruption(err)
476 toDel = append(toDel, k) 482 toDel = append(toDel, k)
477 } 483 }
478 } 484 }
(...skipping 11 matching lines...) Expand all
490 return ErrTransactionTooLarge 496 return ErrTransactionTooLarge
491 } 497 }
492 return nil 498 return nil
493 } 499 }
494 500
495 func (t *txnBufState) commitLocked(s *txnBufState) { 501 func (t *txnBufState) commitLocked(s *txnBufState) {
496 toPut, toPutKeys, toDel := s.effect() 502 toPut, toPutKeys, toDel := s.effect()
497 503
498 if len(toPut) > 0 { 504 if len(toPut) > 0 {
499 impossible(t.putMulti(toPutKeys, toPut, 505 impossible(t.putMulti(toPutKeys, toPut,
500 » » » func(_ *datastore.Key, err error) { impossible(err) }, t rue)) 506 » » » func(_ *datastore.Key, err error) error { return err }, true))
501 } 507 }
502 508
503 if len(toDel) > 0 { 509 if len(toDel) > 0 {
504 » » impossible(t.deleteMulti(toDel, impossible, true)) 510 » » impossible(t.deleteMulti(toDel, func(err error) error { return e rr }, true))
505 } 511 }
506 } 512 }
507 513
508 // toEncoded returns a list of all of the serialized versions of these keys, 514 // toEncoded returns a list of all of the serialized versions of these keys,
509 // plus a stringset of all the encoded root keys that `keys` represents. 515 // plus a stringset of all the encoded root keys that `keys` represents.
510 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { 516 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) {
511 roots = stringset.New(len(keys)) 517 roots = stringset.New(len(keys))
512 full = make([]string, len(keys)) 518 full = make([]string, len(keys))
513 for i, k := range keys { 519 for i, k := range keys {
514 roots.Add(string(serialize.ToBytes(k.Root()))) 520 roots.Add(string(serialize.ToBytes(k.Root())))
515 full[i] = string(serialize.ToBytes(k)) 521 full[i] = string(serialize.ToBytes(k))
516 } 522 }
517 return 523 return
518 } 524 }
OLDNEW
« no previous file with comments | « filter/txnBuf/query_merger.go ('k') | filter/txnBuf/txnbuf_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698