| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package txnBuf | 5 package txnBuf |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "sync" | 9 "sync" |
| 10 | 10 |
| (...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 98 sync.Mutex | 98 sync.Mutex |
| 99 | 99 |
| 100 // encoded key -> size of entity. A size of 0 means that the entity is | 100 // encoded key -> size of entity. A size of 0 means that the entity is |
| 101 // deleted. | 101 // deleted. |
| 102 entState *sizeTracker | 102 entState *sizeTracker |
| 103 bufDS datastore.RawInterface | 103 bufDS datastore.RawInterface |
| 104 | 104 |
| 105 roots stringset.Set | 105 roots stringset.Set |
| 106 rootLimit int | 106 rootLimit int |
| 107 | 107 |
| 108 » aid string | 108 » kc datastore.KeyContext |
| 109 » ns string | |
| 110 parentDS datastore.RawInterface | 109 parentDS datastore.RawInterface |
| 111 | 110 |
| 112 // sizeBudget is the number of bytes that this transaction has to operat
e | 111 // sizeBudget is the number of bytes that this transaction has to operat
e |
| 113 // within. It's only used when attempting to apply() the transaction, an
d | 112 // within. It's only used when attempting to apply() the transaction, an
d |
| 114 // it is the threshold for the delta of applying this transaction to the | 113 // it is the threshold for the delta of applying this transaction to the |
| 115 // parent transaction. Note that a buffered transaction could actually h
ave | 114 // parent transaction. Note that a buffered transaction could actually h
ave |
| 116 // a negative delta if the parent transaction had many large entities wh
ich | 115 // a negative delta if the parent transaction had many large entities wh
ich |
| 117 // the inner transaction deleted. | 116 // the inner transaction deleted. |
| 118 sizeBudget int64 | 117 sizeBudget int64 |
| 119 // countBudget is the number of entity writes that this transaction has
to | 118 // countBudget is the number of entity writes that this transaction has
to |
| 120 // operate in. | 119 // operate in. |
| 121 writeCountBudget int | 120 writeCountBudget int |
| 122 } | 121 } |
| 123 | 122 |
| 124 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
tore.TransactionOptions) error { | 123 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
tore.TransactionOptions) error { |
| 125 inf := info.Get(ctx) | |
| 126 ns, _ := inf.GetNamespace() | |
| 127 | |
| 128 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState) | 124 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState) |
| 129 roots := stringset.New(0) | 125 roots := stringset.New(0) |
| 130 rootLimit := 1 | 126 rootLimit := 1 |
| 131 if opts != nil && opts.XG { | 127 if opts != nil && opts.XG { |
| 132 rootLimit = XGTransactionGroupLimit | 128 rootLimit = XGTransactionGroupLimit |
| 133 } | 129 } |
| 134 sizeBudget, writeCountBudget := DefaultSizeBudget, DefaultWriteCountBudg
et | 130 sizeBudget, writeCountBudget := DefaultSizeBudget, DefaultWriteCountBudg
et |
| 135 if parentState != nil { | 131 if parentState != nil { |
| 136 // TODO(riannucci): this is a bit wonky since it means that a ch
ild | 132 // TODO(riannucci): this is a bit wonky since it means that a ch
ild |
| 137 // transaction declaring XG=true will only get to modify 25 grou
ps IF | 133 // transaction declaring XG=true will only get to modify 25 grou
ps IF |
| 138 // they're same groups affected by the parent transactions. So i
nstead of | 134 // they're same groups affected by the parent transactions. So i
nstead of |
| 139 // respecting opts.XG for inner transactions, we just dup everyt
hing from | 135 // respecting opts.XG for inner transactions, we just dup everyt
hing from |
| 140 // the parent transaction. | 136 // the parent transaction. |
| 141 roots = parentState.roots.Dup() | 137 roots = parentState.roots.Dup() |
| 142 rootLimit = parentState.rootLimit | 138 rootLimit = parentState.rootLimit |
| 143 | 139 |
| 144 sizeBudget = parentState.sizeBudget - parentState.entState.total | 140 sizeBudget = parentState.sizeBudget - parentState.entState.total |
| 145 writeCountBudget = parentState.writeCountBudget - parentState.en
tState.numWrites() | 141 writeCountBudget = parentState.writeCountBudget - parentState.en
tState.numWrites() |
| 146 } | 142 } |
| 147 | 143 |
| 148 state := &txnBufState{ | 144 state := &txnBufState{ |
| 149 entState: &sizeTracker{}, | 145 entState: &sizeTracker{}, |
| 150 » » bufDS: memory.NewDatastore(inf).Raw(), | 146 » » bufDS: memory.NewDatastore(ctx, info.Raw(ctx)), |
| 151 roots: roots, | 147 roots: roots, |
| 152 rootLimit: rootLimit, | 148 rootLimit: rootLimit, |
| 153 » » ns: ns, | 149 » » kc: datastore.GetKeyContext(ctx), |
| 154 » » aid: inf.FullyQualifiedAppID(), | 150 » » parentDS: datastore.Raw(context.WithValue(ctx, dsTxnBufH
aveLock, true)), |
| 155 » » parentDS: datastore.Get(context.WithValue(ctx, dsTxnBufH
aveLock, true)).Raw(), | |
| 156 sizeBudget: sizeBudget, | 151 sizeBudget: sizeBudget, |
| 157 writeCountBudget: writeCountBudget, | 152 writeCountBudget: writeCountBudget, |
| 158 } | 153 } |
| 159 if err := cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil
{ | 154 if err := cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil
{ |
| 160 return err | 155 return err |
| 161 } | 156 } |
| 162 | 157 |
| 163 // no reason to unlock this ever. At this point it's toast. | 158 // no reason to unlock this ever. At this point it's toast. |
| 164 state.Lock() | 159 state.Lock() |
| 165 | 160 |
| (...skipping 339 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 505 | 500 |
| 506 err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa
p, _ datastore.CursorCB) error { | 501 err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa
p, _ datastore.CursorCB) error { |
| 507 toPutKeys = append(toPutKeys, key) | 502 toPutKeys = append(toPutKeys, key) |
| 508 toPut = append(toPut, data) | 503 toPut = append(toPut, data) |
| 509 return nil | 504 return nil |
| 510 }) | 505 }) |
| 511 memoryCorruption(err) | 506 memoryCorruption(err) |
| 512 | 507 |
| 513 for keyStr, size := range t.entState.keyToSize { | 508 for keyStr, size := range t.entState.keyToSize { |
| 514 if size == 0 { | 509 if size == 0 { |
| 515 » » » k, err := serialize.ReadKey(bytes.NewBufferString(keyStr
), serialize.WithoutContext, t.aid, t.ns) | 510 » » » k, err := serialize.ReadKey(bytes.NewBufferString(keyStr
), serialize.WithoutContext, t.kc) |
| 516 memoryCorruption(err) | 511 memoryCorruption(err) |
| 517 toDel = append(toDel, k) | 512 toDel = append(toDel, k) |
| 518 } | 513 } |
| 519 } | 514 } |
| 520 | 515 |
| 521 return | 516 return |
| 522 } | 517 } |
| 523 | 518 |
| 524 func (t *txnBufState) canApplyLocked(s *txnBufState) error { | 519 func (t *txnBufState) canApplyLocked(s *txnBufState) error { |
| 525 proposedState := t.entState.dup() | 520 proposedState := t.entState.dup() |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 564 // plus a stringset of all the encoded root keys that `keys` represents. | 559 // plus a stringset of all the encoded root keys that `keys` represents. |
| 565 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { | 560 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { |
| 566 roots = stringset.New(len(keys)) | 561 roots = stringset.New(len(keys)) |
| 567 full = make([]string, len(keys)) | 562 full = make([]string, len(keys)) |
| 568 for i, k := range keys { | 563 for i, k := range keys { |
| 569 roots.Add(string(serialize.ToBytes(k.Root()))) | 564 roots.Add(string(serialize.ToBytes(k.Root()))) |
| 570 full[i] = string(serialize.ToBytes(k)) | 565 full[i] = string(serialize.ToBytes(k)) |
| 571 } | 566 } |
| 572 return | 567 return |
| 573 } | 568 } |
| OLD | NEW |