OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package txnBuf | 5 package txnBuf |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "sync" | 9 "sync" |
10 | 10 |
(...skipping 246 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
257 data[i].buffered = true | 257 data[i].buffered = true |
258 if size > 0 { | 258 if size > 0 { |
259 idxMap = append(idxMap, i) | 259 idxMap = append(idxMap, i) |
260 toGetKeys = append(toGetKeys, key) | 260 toGetKeys = append(toGetKeys, key) |
261 } | 261 } |
262 } | 262 } |
263 } | 263 } |
264 | 264 |
265 if len(toGetKeys) > 0 { | 265 if len(toGetKeys) > 0 { |
266 j := 0 | 266 j := 0 |
267 » » » t.bufDS.GetMulti(toGetKeys, nil, func(pm datastore.Prope
rtyMap, err error) { | 267 » » » t.bufDS.GetMulti(toGetKeys, nil, func(pm datastore.Prope
rtyMap, err error) error { |
268 impossible(err) | 268 impossible(err) |
269 data[idxMap[j]].data = pm | 269 data[idxMap[j]].data = pm |
270 j++ | 270 j++ |
| 271 return nil |
271 }) | 272 }) |
272 } | 273 } |
273 | 274 |
274 idxMap = nil | 275 idxMap = nil |
275 getKeys := []*datastore.Key(nil) | 276 getKeys := []*datastore.Key(nil) |
276 getMetas := datastore.MultiMetaGetter(nil) | 277 getMetas := datastore.MultiMetaGetter(nil) |
277 | 278 |
278 for i, itm := range data { | 279 for i, itm := range data { |
279 if !itm.buffered { | 280 if !itm.buffered { |
280 idxMap = append(idxMap, i) | 281 idxMap = append(idxMap, i) |
281 getKeys = append(getKeys, itm.key) | 282 getKeys = append(getKeys, itm.key) |
282 getMetas = append(getMetas, metas.GetSingle(i)) | 283 getMetas = append(getMetas, metas.GetSingle(i)) |
283 } | 284 } |
284 } | 285 } |
285 | 286 |
286 if len(idxMap) > 0 { | 287 if len(idxMap) > 0 { |
287 j := 0 | 288 j := 0 |
288 » » » err := t.parentDS.GetMulti(getKeys, getMetas, func(pm da
tastore.PropertyMap, err error) { | 289 » » » err := t.parentDS.GetMulti(getKeys, getMetas, func(pm da
tastore.PropertyMap, err error) error { |
289 if err != datastore.ErrNoSuchEntity { | 290 if err != datastore.ErrNoSuchEntity { |
290 i := idxMap[j] | 291 i := idxMap[j] |
291 if !lme.Assign(i, err) { | 292 if !lme.Assign(i, err) { |
292 data[i].data = pm | 293 data[i].data = pm |
293 } | 294 } |
294 } | 295 } |
295 j++ | 296 j++ |
| 297 return nil |
296 }) | 298 }) |
297 if err != nil { | 299 if err != nil { |
298 return err | 300 return err |
299 } | 301 } |
300 } | 302 } |
301 return nil | 303 return nil |
302 }() | 304 }() |
303 if err != nil { | 305 if err != nil { |
304 return err | 306 return err |
305 } | 307 } |
(...skipping 18 matching lines...) Expand all Loading... |
324 if !haveLock { | 326 if !haveLock { |
325 t.Lock() | 327 t.Lock() |
326 defer t.Unlock() | 328 defer t.Unlock() |
327 } | 329 } |
328 | 330 |
329 if err := t.updateRootsLocked(roots); err != nil { | 331 if err := t.updateRootsLocked(roots); err != nil { |
330 return err | 332 return err |
331 } | 333 } |
332 | 334 |
333 i := 0 | 335 i := 0 |
334 » » err := t.bufDS.DeleteMulti(keys, func(err error) { | 336 » » err := t.bufDS.DeleteMulti(keys, func(err error) error { |
335 impossible(err) | 337 impossible(err) |
336 t.entState.set(encKeys[i], 0) | 338 t.entState.set(encKeys[i], 0) |
337 i++ | 339 i++ |
| 340 return nil |
338 }) | 341 }) |
339 impossible(err) | 342 impossible(err) |
340 return nil | 343 return nil |
341 }() | 344 }() |
342 if err != nil { | 345 if err != nil { |
343 return err | 346 return err |
344 } | 347 } |
345 | 348 |
346 for range keys { | 349 for range keys { |
347 cb(nil) | 350 cb(nil) |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
392 if !haveLock { | 395 if !haveLock { |
393 t.Lock() | 396 t.Lock() |
394 defer t.Unlock() | 397 defer t.Unlock() |
395 } | 398 } |
396 | 399 |
397 if err := t.updateRootsLocked(roots); err != nil { | 400 if err := t.updateRootsLocked(roots); err != nil { |
398 return err | 401 return err |
399 } | 402 } |
400 | 403 |
401 i := 0 | 404 i := 0 |
402 » » err := t.bufDS.PutMulti(keys, vals, func(k *datastore.Key, err e
rror) { | 405 » » err := t.bufDS.PutMulti(keys, vals, func(k *datastore.Key, err e
rror) error { |
403 impossible(err) | 406 impossible(err) |
404 t.entState.set(encKeys[i], vals[i].EstimateSize()) | 407 t.entState.set(encKeys[i], vals[i].EstimateSize()) |
405 i++ | 408 i++ |
| 409 return nil |
406 }) | 410 }) |
407 impossible(err) | 411 impossible(err) |
408 return nil | 412 return nil |
409 }() | 413 }() |
410 if err != nil { | 414 if err != nil { |
411 return err | 415 return err |
412 } | 416 } |
413 | 417 |
414 for _, k := range keys { | 418 for _, k := range keys { |
415 cb(k, nil) | 419 cb(k, nil) |
416 } | 420 } |
417 return nil | 421 return nil |
418 } | 422 } |
419 | 423 |
420 func commitToReal(s *txnBufState) error { | 424 func commitToReal(s *txnBufState) error { |
421 toPut, toPutKeys, toDel := s.effect() | 425 toPut, toPutKeys, toDel := s.effect() |
422 | 426 |
423 return parallel.FanOutIn(func(ch chan<- func() error) { | 427 return parallel.FanOutIn(func(ch chan<- func() error) { |
424 if len(toPut) > 0 { | 428 if len(toPut) > 0 { |
425 ch <- func() error { | 429 ch <- func() error { |
426 mErr := errors.NewLazyMultiError(len(toPut)) | 430 mErr := errors.NewLazyMultiError(len(toPut)) |
427 i := 0 | 431 i := 0 |
428 » » » » err := s.parentDS.PutMulti(toPutKeys, toPut, fun
c(_ *datastore.Key, err error) { | 432 » » » » err := s.parentDS.PutMulti(toPutKeys, toPut, fun
c(_ *datastore.Key, err error) error { |
429 mErr.Assign(i, err) | 433 mErr.Assign(i, err) |
430 i++ | 434 i++ |
| 435 return nil |
431 }) | 436 }) |
432 if err == nil { | 437 if err == nil { |
433 err = mErr.Get() | 438 err = mErr.Get() |
434 } | 439 } |
435 return err | 440 return err |
436 } | 441 } |
437 } | 442 } |
438 if len(toDel) > 0 { | 443 if len(toDel) > 0 { |
439 ch <- func() error { | 444 ch <- func() error { |
440 mErr := errors.NewLazyMultiError(len(toDel)) | 445 mErr := errors.NewLazyMultiError(len(toDel)) |
441 i := 0 | 446 i := 0 |
442 » » » » err := s.parentDS.DeleteMulti(toDel, func(err er
ror) { | 447 » » » » err := s.parentDS.DeleteMulti(toDel, func(err er
ror) error { |
443 mErr.Assign(i, err) | 448 mErr.Assign(i, err) |
444 i++ | 449 i++ |
| 450 return nil |
445 }) | 451 }) |
446 if err == nil { | 452 if err == nil { |
447 err = mErr.Get() | 453 err = mErr.Get() |
448 } | 454 } |
449 return err | 455 return err |
450 } | 456 } |
451 } | 457 } |
452 }) | 458 }) |
453 } | 459 } |
454 | 460 |
455 func (t *txnBufState) effect() (toPut []datastore.PropertyMap, toPutKeys, toDel
[]*datastore.Key) { | 461 func (t *txnBufState) effect() (toPut []datastore.PropertyMap, toPutKeys, toDel
[]*datastore.Key) { |
456 // TODO(riannucci): preallocate return slices | 462 // TODO(riannucci): preallocate return slices |
457 | 463 |
458 // need to pull all items out of the in-memory datastore. Fortunately we
have | 464 // need to pull all items out of the in-memory datastore. Fortunately we
have |
459 // kindless queries, and we disabled all the special entities, so just | 465 // kindless queries, and we disabled all the special entities, so just |
460 // run a kindless query without any filters and it will return all data | 466 // run a kindless query without any filters and it will return all data |
461 // currently in bufDS :). | 467 // currently in bufDS :). |
462 fq, err := datastore.NewQuery("").Finalize() | 468 fq, err := datastore.NewQuery("").Finalize() |
463 impossible(err) | 469 impossible(err) |
464 | 470 |
465 » err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa
p, _ datastore.CursorCB) bool { | 471 » err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa
p, _ datastore.CursorCB) error { |
466 toPutKeys = append(toPutKeys, key) | 472 toPutKeys = append(toPutKeys, key) |
467 toPut = append(toPut, data) | 473 toPut = append(toPut, data) |
468 » » return true | 474 » » return nil |
469 }) | 475 }) |
470 memoryCorruption(err) | 476 memoryCorruption(err) |
471 | 477 |
472 for keyStr, size := range t.entState.keyToSize { | 478 for keyStr, size := range t.entState.keyToSize { |
473 if size == 0 { | 479 if size == 0 { |
474 k, err := serialize.ReadKey(bytes.NewBufferString(keyStr
), serialize.WithoutContext, t.aid, t.ns) | 480 k, err := serialize.ReadKey(bytes.NewBufferString(keyStr
), serialize.WithoutContext, t.aid, t.ns) |
475 memoryCorruption(err) | 481 memoryCorruption(err) |
476 toDel = append(toDel, k) | 482 toDel = append(toDel, k) |
477 } | 483 } |
478 } | 484 } |
(...skipping 11 matching lines...) Expand all Loading... |
490 return ErrTransactionTooLarge | 496 return ErrTransactionTooLarge |
491 } | 497 } |
492 return nil | 498 return nil |
493 } | 499 } |
494 | 500 |
495 func (t *txnBufState) commitLocked(s *txnBufState) { | 501 func (t *txnBufState) commitLocked(s *txnBufState) { |
496 toPut, toPutKeys, toDel := s.effect() | 502 toPut, toPutKeys, toDel := s.effect() |
497 | 503 |
498 if len(toPut) > 0 { | 504 if len(toPut) > 0 { |
499 impossible(t.putMulti(toPutKeys, toPut, | 505 impossible(t.putMulti(toPutKeys, toPut, |
500 » » » func(_ *datastore.Key, err error) { impossible(err) }, t
rue)) | 506 » » » func(_ *datastore.Key, err error) error { return err },
true)) |
501 } | 507 } |
502 | 508 |
503 if len(toDel) > 0 { | 509 if len(toDel) > 0 { |
504 » » impossible(t.deleteMulti(toDel, impossible, true)) | 510 » » impossible(t.deleteMulti(toDel, func(err error) error { return e
rr }, true)) |
505 } | 511 } |
506 } | 512 } |
507 | 513 |
508 // toEncoded returns a list of all of the serialized versions of these keys, | 514 // toEncoded returns a list of all of the serialized versions of these keys, |
509 // plus a stringset of all the encoded root keys that `keys` represents. | 515 // plus a stringset of all the encoded root keys that `keys` represents. |
510 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { | 516 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { |
511 roots = stringset.New(len(keys)) | 517 roots = stringset.New(len(keys)) |
512 full = make([]string, len(keys)) | 518 full = make([]string, len(keys)) |
513 for i, k := range keys { | 519 for i, k := range keys { |
514 roots.Add(string(serialize.ToBytes(k.Root()))) | 520 roots.Add(string(serialize.ToBytes(k.Root()))) |
515 full[i] = string(serialize.ToBytes(k)) | 521 full[i] = string(serialize.ToBytes(k)) |
516 } | 522 } |
517 return | 523 return |
518 } | 524 } |
OLD | NEW |