| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package txnBuf | 5 package txnBuf |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "sync" | 9 "sync" |
| 10 | 10 |
| (...skipping 246 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 257 data[i].buffered = true | 257 data[i].buffered = true |
| 258 if size > 0 { | 258 if size > 0 { |
| 259 idxMap = append(idxMap, i) | 259 idxMap = append(idxMap, i) |
| 260 toGetKeys = append(toGetKeys, key) | 260 toGetKeys = append(toGetKeys, key) |
| 261 } | 261 } |
| 262 } | 262 } |
| 263 } | 263 } |
| 264 | 264 |
| 265 if len(toGetKeys) > 0 { | 265 if len(toGetKeys) > 0 { |
| 266 j := 0 | 266 j := 0 |
| 267 » » » t.bufDS.GetMulti(toGetKeys, nil, func(pm datastore.Prope
rtyMap, err error) { | 267 » » » t.bufDS.GetMulti(toGetKeys, nil, func(pm datastore.Prope
rtyMap, err error) error { |
| 268 impossible(err) | 268 impossible(err) |
| 269 data[idxMap[j]].data = pm | 269 data[idxMap[j]].data = pm |
| 270 j++ | 270 j++ |
| 271 return nil |
| 271 }) | 272 }) |
| 272 } | 273 } |
| 273 | 274 |
| 274 idxMap = nil | 275 idxMap = nil |
| 275 getKeys := []*datastore.Key(nil) | 276 getKeys := []*datastore.Key(nil) |
| 276 getMetas := datastore.MultiMetaGetter(nil) | 277 getMetas := datastore.MultiMetaGetter(nil) |
| 277 | 278 |
| 278 for i, itm := range data { | 279 for i, itm := range data { |
| 279 if !itm.buffered { | 280 if !itm.buffered { |
| 280 idxMap = append(idxMap, i) | 281 idxMap = append(idxMap, i) |
| 281 getKeys = append(getKeys, itm.key) | 282 getKeys = append(getKeys, itm.key) |
| 282 getMetas = append(getMetas, metas.GetSingle(i)) | 283 getMetas = append(getMetas, metas.GetSingle(i)) |
| 283 } | 284 } |
| 284 } | 285 } |
| 285 | 286 |
| 286 if len(idxMap) > 0 { | 287 if len(idxMap) > 0 { |
| 287 j := 0 | 288 j := 0 |
| 288 » » » err := t.parentDS.GetMulti(getKeys, getMetas, func(pm da
tastore.PropertyMap, err error) { | 289 » » » err := t.parentDS.GetMulti(getKeys, getMetas, func(pm da
tastore.PropertyMap, err error) error { |
| 289 if err != datastore.ErrNoSuchEntity { | 290 if err != datastore.ErrNoSuchEntity { |
| 290 i := idxMap[j] | 291 i := idxMap[j] |
| 291 if !lme.Assign(i, err) { | 292 if !lme.Assign(i, err) { |
| 292 data[i].data = pm | 293 data[i].data = pm |
| 293 } | 294 } |
| 294 } | 295 } |
| 295 j++ | 296 j++ |
| 297 return nil |
| 296 }) | 298 }) |
| 297 if err != nil { | 299 if err != nil { |
| 298 return err | 300 return err |
| 299 } | 301 } |
| 300 } | 302 } |
| 301 return nil | 303 return nil |
| 302 }() | 304 }() |
| 303 if err != nil { | 305 if err != nil { |
| 304 return err | 306 return err |
| 305 } | 307 } |
| (...skipping 18 matching lines...) Expand all Loading... |
| 324 if !haveLock { | 326 if !haveLock { |
| 325 t.Lock() | 327 t.Lock() |
| 326 defer t.Unlock() | 328 defer t.Unlock() |
| 327 } | 329 } |
| 328 | 330 |
| 329 if err := t.updateRootsLocked(roots); err != nil { | 331 if err := t.updateRootsLocked(roots); err != nil { |
| 330 return err | 332 return err |
| 331 } | 333 } |
| 332 | 334 |
| 333 i := 0 | 335 i := 0 |
| 334 » » err := t.bufDS.DeleteMulti(keys, func(err error) { | 336 » » err := t.bufDS.DeleteMulti(keys, func(err error) error { |
| 335 impossible(err) | 337 impossible(err) |
| 336 t.entState.set(encKeys[i], 0) | 338 t.entState.set(encKeys[i], 0) |
| 337 i++ | 339 i++ |
| 340 return nil |
| 338 }) | 341 }) |
| 339 impossible(err) | 342 impossible(err) |
| 340 return nil | 343 return nil |
| 341 }() | 344 }() |
| 342 if err != nil { | 345 if err != nil { |
| 343 return err | 346 return err |
| 344 } | 347 } |
| 345 | 348 |
| 346 for range keys { | 349 for range keys { |
| 347 cb(nil) | 350 cb(nil) |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 392 if !haveLock { | 395 if !haveLock { |
| 393 t.Lock() | 396 t.Lock() |
| 394 defer t.Unlock() | 397 defer t.Unlock() |
| 395 } | 398 } |
| 396 | 399 |
| 397 if err := t.updateRootsLocked(roots); err != nil { | 400 if err := t.updateRootsLocked(roots); err != nil { |
| 398 return err | 401 return err |
| 399 } | 402 } |
| 400 | 403 |
| 401 i := 0 | 404 i := 0 |
| 402 » » err := t.bufDS.PutMulti(keys, vals, func(k *datastore.Key, err e
rror) { | 405 » » err := t.bufDS.PutMulti(keys, vals, func(k *datastore.Key, err e
rror) error { |
| 403 impossible(err) | 406 impossible(err) |
| 404 t.entState.set(encKeys[i], vals[i].EstimateSize()) | 407 t.entState.set(encKeys[i], vals[i].EstimateSize()) |
| 405 i++ | 408 i++ |
| 409 return nil |
| 406 }) | 410 }) |
| 407 impossible(err) | 411 impossible(err) |
| 408 return nil | 412 return nil |
| 409 }() | 413 }() |
| 410 if err != nil { | 414 if err != nil { |
| 411 return err | 415 return err |
| 412 } | 416 } |
| 413 | 417 |
| 414 for _, k := range keys { | 418 for _, k := range keys { |
| 415 cb(k, nil) | 419 cb(k, nil) |
| 416 } | 420 } |
| 417 return nil | 421 return nil |
| 418 } | 422 } |
| 419 | 423 |
| 420 func commitToReal(s *txnBufState) error { | 424 func commitToReal(s *txnBufState) error { |
| 421 toPut, toPutKeys, toDel := s.effect() | 425 toPut, toPutKeys, toDel := s.effect() |
| 422 | 426 |
| 423 return parallel.FanOutIn(func(ch chan<- func() error) { | 427 return parallel.FanOutIn(func(ch chan<- func() error) { |
| 424 if len(toPut) > 0 { | 428 if len(toPut) > 0 { |
| 425 ch <- func() error { | 429 ch <- func() error { |
| 426 mErr := errors.NewLazyMultiError(len(toPut)) | 430 mErr := errors.NewLazyMultiError(len(toPut)) |
| 427 i := 0 | 431 i := 0 |
| 428 » » » » err := s.parentDS.PutMulti(toPutKeys, toPut, fun
c(_ *datastore.Key, err error) { | 432 » » » » err := s.parentDS.PutMulti(toPutKeys, toPut, fun
c(_ *datastore.Key, err error) error { |
| 429 mErr.Assign(i, err) | 433 mErr.Assign(i, err) |
| 430 i++ | 434 i++ |
| 435 return nil |
| 431 }) | 436 }) |
| 432 if err == nil { | 437 if err == nil { |
| 433 err = mErr.Get() | 438 err = mErr.Get() |
| 434 } | 439 } |
| 435 return err | 440 return err |
| 436 } | 441 } |
| 437 } | 442 } |
| 438 if len(toDel) > 0 { | 443 if len(toDel) > 0 { |
| 439 ch <- func() error { | 444 ch <- func() error { |
| 440 mErr := errors.NewLazyMultiError(len(toDel)) | 445 mErr := errors.NewLazyMultiError(len(toDel)) |
| 441 i := 0 | 446 i := 0 |
| 442 » » » » err := s.parentDS.DeleteMulti(toDel, func(err er
ror) { | 447 » » » » err := s.parentDS.DeleteMulti(toDel, func(err er
ror) error { |
| 443 mErr.Assign(i, err) | 448 mErr.Assign(i, err) |
| 444 i++ | 449 i++ |
| 450 return nil |
| 445 }) | 451 }) |
| 446 if err == nil { | 452 if err == nil { |
| 447 err = mErr.Get() | 453 err = mErr.Get() |
| 448 } | 454 } |
| 449 return err | 455 return err |
| 450 } | 456 } |
| 451 } | 457 } |
| 452 }) | 458 }) |
| 453 } | 459 } |
| 454 | 460 |
| 455 func (t *txnBufState) effect() (toPut []datastore.PropertyMap, toPutKeys, toDel
[]*datastore.Key) { | 461 func (t *txnBufState) effect() (toPut []datastore.PropertyMap, toPutKeys, toDel
[]*datastore.Key) { |
| 456 // TODO(riannucci): preallocate return slices | 462 // TODO(riannucci): preallocate return slices |
| 457 | 463 |
| 458 // need to pull all items out of the in-memory datastore. Fortunately we
have | 464 // need to pull all items out of the in-memory datastore. Fortunately we
have |
| 459 // kindless queries, and we disabled all the special entities, so just | 465 // kindless queries, and we disabled all the special entities, so just |
| 460 // run a kindless query without any filters and it will return all data | 466 // run a kindless query without any filters and it will return all data |
| 461 // currently in bufDS :). | 467 // currently in bufDS :). |
| 462 fq, err := datastore.NewQuery("").Finalize() | 468 fq, err := datastore.NewQuery("").Finalize() |
| 463 impossible(err) | 469 impossible(err) |
| 464 | 470 |
| 465 » err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa
p, _ datastore.CursorCB) bool { | 471 » err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa
p, _ datastore.CursorCB) error { |
| 466 toPutKeys = append(toPutKeys, key) | 472 toPutKeys = append(toPutKeys, key) |
| 467 toPut = append(toPut, data) | 473 toPut = append(toPut, data) |
| 468 » » return true | 474 » » return nil |
| 469 }) | 475 }) |
| 470 memoryCorruption(err) | 476 memoryCorruption(err) |
| 471 | 477 |
| 472 for keyStr, size := range t.entState.keyToSize { | 478 for keyStr, size := range t.entState.keyToSize { |
| 473 if size == 0 { | 479 if size == 0 { |
| 474 k, err := serialize.ReadKey(bytes.NewBufferString(keyStr
), serialize.WithoutContext, t.aid, t.ns) | 480 k, err := serialize.ReadKey(bytes.NewBufferString(keyStr
), serialize.WithoutContext, t.aid, t.ns) |
| 475 memoryCorruption(err) | 481 memoryCorruption(err) |
| 476 toDel = append(toDel, k) | 482 toDel = append(toDel, k) |
| 477 } | 483 } |
| 478 } | 484 } |
| (...skipping 11 matching lines...) Expand all Loading... |
| 490 return ErrTransactionTooLarge | 496 return ErrTransactionTooLarge |
| 491 } | 497 } |
| 492 return nil | 498 return nil |
| 493 } | 499 } |
| 494 | 500 |
| 495 func (t *txnBufState) commitLocked(s *txnBufState) { | 501 func (t *txnBufState) commitLocked(s *txnBufState) { |
| 496 toPut, toPutKeys, toDel := s.effect() | 502 toPut, toPutKeys, toDel := s.effect() |
| 497 | 503 |
| 498 if len(toPut) > 0 { | 504 if len(toPut) > 0 { |
| 499 impossible(t.putMulti(toPutKeys, toPut, | 505 impossible(t.putMulti(toPutKeys, toPut, |
| 500 » » » func(_ *datastore.Key, err error) { impossible(err) }, t
rue)) | 506 » » » func(_ *datastore.Key, err error) error { return err },
true)) |
| 501 } | 507 } |
| 502 | 508 |
| 503 if len(toDel) > 0 { | 509 if len(toDel) > 0 { |
| 504 » » impossible(t.deleteMulti(toDel, impossible, true)) | 510 » » impossible(t.deleteMulti(toDel, func(err error) error { return e
rr }, true)) |
| 505 } | 511 } |
| 506 } | 512 } |
| 507 | 513 |
| 508 // toEncoded returns a list of all of the serialized versions of these keys, | 514 // toEncoded returns a list of all of the serialized versions of these keys, |
| 509 // plus a stringset of all the encoded root keys that `keys` represents. | 515 // plus a stringset of all the encoded root keys that `keys` represents. |
| 510 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { | 516 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { |
| 511 roots = stringset.New(len(keys)) | 517 roots = stringset.New(len(keys)) |
| 512 full = make([]string, len(keys)) | 518 full = make([]string, len(keys)) |
| 513 for i, k := range keys { | 519 for i, k := range keys { |
| 514 roots.Add(string(serialize.ToBytes(k.Root()))) | 520 roots.Add(string(serialize.ToBytes(k.Root()))) |
| 515 full[i] = string(serialize.ToBytes(k)) | 521 full[i] = string(serialize.ToBytes(k)) |
| 516 } | 522 } |
| 517 return | 523 return |
| 518 } | 524 } |
| OLD | NEW |