Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package txnBuf | 5 package txnBuf |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "sync" | 9 "sync" |
| 10 | 10 |
| (...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 87 } | 87 } |
| 88 return &sizeTracker{k2s, s.total} | 88 return &sizeTracker{k2s, s.total} |
| 89 } | 89 } |
| 90 | 90 |
| 91 type txnBufState struct { | 91 type txnBufState struct { |
| 92 sync.Mutex | 92 sync.Mutex |
| 93 | 93 |
| 94 // encoded key -> size of entity. A size of 0 means that the entity is | 94 // encoded key -> size of entity. A size of 0 means that the entity is |
| 95 // deleted. | 95 // deleted. |
| 96 entState *sizeTracker | 96 entState *sizeTracker |
| 97 » memDS datastore.RawInterface | 97 » bufDS datastore.RawInterface |
| 98 | 98 |
| 99 roots stringset.Set | 99 roots stringset.Set |
| 100 rootLimit int | 100 rootLimit int |
| 101 | 101 |
| 102 » aid string | 102 » aid string |
| 103 » ns string | 103 » ns string |
| 104 » parentDS datastore.RawInterface | 104 » parentDS datastore.RawInterface |
| 105 » parentState *txnBufState | |
| 106 | 105 |
| 107 // sizeBudget is the number of bytes that this transaction has to operat e | 106 // sizeBudget is the number of bytes that this transaction has to operat e |
| 108 // within. It's only used when attempting to apply() the transaction, an d | 107 // within. It's only used when attempting to apply() the transaction, an d |
| 109 // it is the threshold for the delta of applying this transaction to the | 108 // it is the threshold for the delta of applying this transaction to the |
| 110 // parent transaction. Note that a buffered transaction could actually h ave | 109 // parent transaction. Note that a buffered transaction could actually h ave |
| 111 // a negative delta if the parent transaction had many large entities wh ich | 110 // a negative delta if the parent transaction had many large entities wh ich |
| 112 // the inner transaction deleted. | 111 // the inner transaction deleted. |
| 113 sizeBudget int64 | 112 sizeBudget int64 |
| 114 | |
| 115 // siblingLock is to prevent two nested transactions from running at the same | |
| 116 // time. | |
| 117 // | |
| 118 // Example: | |
| 119 // RunInTransaction() { // root | |
| 120 // RunInTransaction() // A | |
| 121 // RunInTransaction() // B | |
| 122 // } | |
| 123 // | |
| 124 // This will prevent A and B from running simulatneously. | |
| 125 siblingLock sync.Mutex | |
| 126 } | 113 } |
| 127 | 114 |
| 128 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas tore.TransactionOptions) error { | 115 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas tore.TransactionOptions) error { |
| 129 inf := info.Get(ctx) | 116 inf := info.Get(ctx) |
| 130 ns := inf.GetNamespace() | 117 ns := inf.GetNamespace() |
| 131 | 118 |
| 132 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState) | 119 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState) |
| 133 roots := stringset.New(0) | 120 roots := stringset.New(0) |
| 134 rootLimit := 1 | 121 rootLimit := 1 |
| 135 if opts != nil && opts.XG { | 122 if opts != nil && opts.XG { |
| 136 rootLimit = XGTransactionGroupLimit | 123 rootLimit = XGTransactionGroupLimit |
| 137 } | 124 } |
| 138 sizeBudget := DefaultSizeBudget | 125 sizeBudget := DefaultSizeBudget |
| 139 if parentState != nil { | 126 if parentState != nil { |
| 140 parentState.siblingLock.Lock() | |
| 141 defer parentState.siblingLock.Unlock() | |
| 142 | |
| 143 // TODO(riannucci): this is a bit wonky since it means that a ch ild | 127 // TODO(riannucci): this is a bit wonky since it means that a ch ild |
| 144 // transaction declaring XG=true will only get to modify 25 grou ps IF | 128 // transaction declaring XG=true will only get to modify 25 grou ps IF |
| 145 // they're same groups affected by the parent transactions. So i nstead of | 129 // they're same groups affected by the parent transactions. So i nstead of |
| 146 // respecting opts.XG for inner transactions, we just dup everyt hing from | 130 // respecting opts.XG for inner transactions, we just dup everyt hing from |
| 147 // the parent transaction. | 131 // the parent transaction. |
| 148 roots = parentState.roots.Dup() | 132 roots = parentState.roots.Dup() |
| 149 rootLimit = parentState.rootLimit | 133 rootLimit = parentState.rootLimit |
| 150 | 134 |
| 151 sizeBudget = parentState.sizeBudget - parentState.entState.total | 135 sizeBudget = parentState.sizeBudget - parentState.entState.total |
| 152 if sizeBudget < DefaultSizeThreshold { | 136 if sizeBudget < DefaultSizeThreshold { |
| 153 return ErrTransactionTooLarge | 137 return ErrTransactionTooLarge |
| 154 } | 138 } |
| 155 } | 139 } |
| 156 | 140 |
| 157 » memDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns) | 141 » bufDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns) |
| 158 if err != nil { | 142 if err != nil { |
| 159 return err | 143 return err |
| 160 } | 144 } |
| 161 | 145 |
| 162 state := &txnBufState{ | 146 state := &txnBufState{ |
| 163 » » entState: &sizeTracker{}, | 147 » » entState: &sizeTracker{}, |
| 164 » » memDS: memDS.Raw(), | 148 » » bufDS: bufDS.Raw(), |
| 165 » » roots: roots, | 149 » » roots: roots, |
| 166 » » rootLimit: rootLimit, | 150 » » rootLimit: rootLimit, |
| 167 » » ns: ns, | 151 » » ns: ns, |
| 168 » » aid: inf.AppID(), | 152 » » aid: inf.AppID(), |
| 169 » » parentDS: datastore.Get(ctx).Raw(), | 153 » » parentDS: datastore.Get(context.WithValue(ctx, dsTxnBufHaveLoc k, true)).Raw(), |
| 170 » » parentState: parentState, | 154 » » sizeBudget: sizeBudget, |
| 171 » » sizeBudget: sizeBudget, | |
| 172 } | 155 } |
| 173 if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil { | 156 if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil { |
| 174 return err | 157 return err |
| 175 } | 158 } |
| 176 » return state.apply() | 159 |
| 160 » // no reason to unlock this ever. At this point it's toast. | |
| 161 » state.Lock() | |
| 162 | |
| 163 » if parentState == nil { | |
| 164 » » return commitToReal(state) | |
| 165 » } | |
| 166 | |
| 167 » if err = parentState.canApply(state); err != nil { | |
| 168 » » return err | |
| 169 » } | |
| 170 | |
| 171 » parentState.commit(state) | |
| 172 » return nil | |
| 177 } | 173 } |
| 178 | 174 |
| 179 // item is a temporary object for representing key/entity pairs and their cache | 175 // item is a temporary object for representing key/entity pairs and their cache |
| 180 // state (e.g. if they exist in the in-memory datastore buffer or not). | 176 // state (e.g. if they exist in the in-memory datastore buffer or not). |
| 181 // Additionally item memoizes some common comparison strings. item objects | 177 // Additionally item memoizes some common comparison strings. item objects |
| 182 // must never be persisted outside of a single function/query context. | 178 // must never be persisted outside of a single function/query context. |
| 183 type item struct { | 179 type item struct { |
| 184 key *datastore.Key | 180 key *datastore.Key |
| 185 data datastore.PropertyMap | 181 data datastore.PropertyMap |
| 186 buffered bool | 182 buffered bool |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 229 // only need to update the roots if they did something that required upd ating | 225 // only need to update the roots if they did something that required upd ating |
| 230 if proposedRoots.Len() > 0 { | 226 if proposedRoots.Len() > 0 { |
| 231 proposedRoots.Iter(func(root string) bool { | 227 proposedRoots.Iter(func(root string) bool { |
| 232 t.roots.Add(root) | 228 t.roots.Add(root) |
| 233 return true | 229 return true |
| 234 }) | 230 }) |
| 235 } | 231 } |
| 236 return nil | 232 return nil |
| 237 } | 233 } |
| 238 | 234 |
| 239 func (t *txnBufState) getMulti(keys []*datastore.Key) ([]item, error) { | 235 func (t *txnBufState) getMulti(keys []*datastore.Key, metas datastore.MultiMetaG etter, cb datastore.GetMultiCB, haveLock bool) error { |
| 240 encKeys, roots := toEncoded(keys) | 236 encKeys, roots := toEncoded(keys) |
| 241 » ret := make([]item, len(keys)) | 237 » data := make([]item, len(keys)) |
| 242 | 238 |
| 243 idxMap := []int(nil) | 239 idxMap := []int(nil) |
| 244 toGetKeys := []*datastore.Key(nil) | 240 toGetKeys := []*datastore.Key(nil) |
| 245 | 241 |
| 246 » t.Lock() | 242 » lme := errors.NewLazyMultiError(len(keys)) |
| 247 » defer t.Unlock() | 243 » err := func() error { |
| 248 | 244 » » if !haveLock { |
| 249 » if err := t.updateRootsLocked(roots); err != nil { | 245 » » » t.Lock() |
| 250 » » return nil, err | 246 » » » defer t.Unlock() |
| 251 » } | 247 » » } |
| 252 | 248 |
| 249 » » if err := t.updateRootsLocked(roots); err != nil { | |
| 250 » » » return err | |
| 251 » » } | |
| 252 | |
| 253 » » for i, key := range keys { | |
| 254 » » » data[i].key = key | |
| 255 » » » data[i].encKey = encKeys[i] | |
| 256 » » » if size, ok := t.entState.get(data[i].getEncKey()); ok { | |
| 257 » » » » data[i].buffered = true | |
| 258 » » » » if size > 0 { | |
| 259 » » » » » idxMap = append(idxMap, i) | |
| 260 » » » » » toGetKeys = append(toGetKeys, key) | |
| 261 » » » » } | |
| 262 » » » } | |
| 263 » » } | |
| 264 | |
| 265 » » if len(toGetKeys) > 0 { | |
| 266 » » » j := 0 | |
| 267 » » » t.bufDS.GetMulti(toGetKeys, nil, func(pm datastore.Prope rtyMap, err error) { | |
| 268 » » » » impossible(err) | |
| 269 » » » » data[idxMap[j]].data = pm | |
| 270 » » » » j++ | |
| 271 » » » }) | |
| 272 » » } | |
| 273 | |
| 274 » » idxMap = nil | |
| 275 » » getKeys := []*datastore.Key(nil) | |
| 276 » » getMetas := datastore.MultiMetaGetter(nil) | |
| 277 | |
| 278 » » for i, itm := range data { | |
| 279 » » » if !itm.buffered { | |
| 280 » » » » idxMap = append(idxMap, i) | |
| 281 » » » » getKeys = append(getKeys, itm.key) | |
| 282 » » » » getMetas = append(getMetas, metas.GetSingle(i)) | |
| 283 » » » } | |
| 284 » » } | |
| 285 | |
| 286 » » if len(idxMap) > 0 { | |
| 287 » » » j := 0 | |
| 288 » » » err := t.parentDS.GetMulti(getKeys, getMetas, func(pm da tastore.PropertyMap, err error) { | |
| 289 » » » » if err != datastore.ErrNoSuchEntity { | |
| 290 » » » » » i := idxMap[j] | |
| 291 » » » » » if !lme.Assign(i, err) { | |
| 292 » » » » » » data[i].data = pm | |
| 293 » » » » » } | |
| 294 » » » » } | |
| 295 » » » » j++ | |
| 296 » » » }) | |
| 297 » » » if err != nil { | |
| 298 » » » » return err | |
| 299 » » » } | |
| 300 » » } | |
| 301 » » return nil | |
| 302 » }() | |
| 303 » if err != nil { | |
| 304 » » return err | |
| 305 » } | |
| 306 | |
| 307 » for i, itm := range data { | |
| 308 » » err := lme.GetOne(i) | |
| 309 » » if err != nil { | |
| 310 » » » cb(nil, err) | |
| 311 » » } else if itm.data == nil { | |
| 312 » » » cb(nil, datastore.ErrNoSuchEntity) | |
| 313 » » } else { | |
| 314 » » » cb(itm.data, nil) | |
| 315 » » } | |
| 316 » } | |
| 317 » return nil | |
| 318 } | |
| 319 | |
| 320 func (t *txnBufState) deleteMulti(keys []*datastore.Key, cb datastore.DeleteMult iCB, haveLock bool) error { | |
| 321 » encKeys, roots := toEncoded(keys) | |
| 322 | |
| 323 » err := func() error { | |
| 324 » » if !haveLock { | |
| 325 » » » t.Lock() | |
| 326 » » » defer t.Unlock() | |
| 327 » » } | |
| 328 | |
| 329 » » if err := t.updateRootsLocked(roots); err != nil { | |
| 330 » » » return err | |
| 331 » » } | |
| 332 | |
| 333 » » i := 0 | |
| 334 » » err := t.bufDS.DeleteMulti(keys, func(err error) { | |
| 335 » » » impossible(err) | |
| 336 » » » t.entState.set(encKeys[i], 0) | |
| 337 » » » i++ | |
| 338 » » }) | |
| 339 » » impossible(err) | |
| 340 » » return nil | |
| 341 » }() | |
| 342 » if err != nil { | |
| 343 » » return err | |
| 344 » } | |
| 345 | |
| 346 » for range keys { | |
| 347 » » cb(nil) | |
| 348 » } | |
| 349 | |
| 350 » return nil | |
| 351 } | |
| 352 | |
| 353 func (t *txnBufState) fixKeys(keys []*datastore.Key) ([]*datastore.Key, error) { | |
| 354 » lme := errors.NewLazyMultiError(len(keys)) | |
| 355 » realKeys := []*datastore.Key(nil) | |
| 253 for i, key := range keys { | 356 for i, key := range keys { |
| 254 » » ret[i].key = key | 357 » » if key.Incomplete() { |
| 255 » » ret[i].encKey = encKeys[i] | 358 » » » // intentionally call AllocateIDs without lock. |
| 256 » » if size, ok := t.entState.get(ret[i].getEncKey()); ok { | 359 » » » start, err := t.parentDS.AllocateIDs(key, 1) |
| 257 » » » ret[i].buffered = true | 360 » » » if !lme.Assign(i, err) { |
| 258 » » » if size > 0 { | 361 » » » » if realKeys == nil { |
| 259 » » » » idxMap = append(idxMap, i) | 362 » » » » » realKeys = make([]*datastore.Key, len(ke ys)) |
| 260 » » » » toGetKeys = append(toGetKeys, key) | 363 » » » » » copy(realKeys, keys) |
| 261 » » » } | 364 » » » » } |
| 262 » » } | 365 |
| 263 » } | 366 » » » » aid, ns, toks := key.Split() |
| 264 | 367 » » » » toks[len(toks)-1].IntID = start |
| 265 » if len(toGetKeys) > 0 { | 368 » » » » realKeys[i] = datastore.NewKeyToks(aid, ns, toks ) |
| 266 » » j := 0 | 369 » » » } |
| 267 » » t.memDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap, err error) { | 370 » » } |
| 371 » } | |
| 372 » err := lme.Get() | |
| 373 | |
| 374 » if realKeys != nil { | |
| 375 » » return realKeys, err | |
| 376 » } | |
| 377 » return keys, err | |
| 378 } | |
| 379 | |
| 380 func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyM ap, cb datastore.PutMultiCB, haveLock bool) error { | |
| 381 » keys, err := t.fixKeys(keys) | |
| 382 » if err != nil { | |
| 383 » » for _, e := range err.(errors.MultiError) { | |
| 384 » » » cb(nil, e) | |
| 385 » » } | |
| 386 » » return nil | |
| 387 » } | |
| 388 | |
| 389 » encKeys, roots := toEncoded(keys) | |
| 390 | |
| 391 » err = func() error { | |
| 392 » » if !haveLock { | |
| 393 » » » t.Lock() | |
| 394 » » » defer t.Unlock() | |
| 395 » » } | |
| 396 | |
| 397 » » if err := t.updateRootsLocked(roots); err != nil { | |
| 398 » » » return err | |
| 399 » » } | |
| 400 | |
| 401 » » i := 0 | |
| 402 » » err := t.bufDS.PutMulti(keys, vals, func(k *datastore.Key, err e rror) { | |
| 268 impossible(err) | 403 impossible(err) |
| 269 » » » ret[idxMap[j]].data = pm | 404 » » » t.entState.set(encKeys[i], vals[i].EstimateSize()) |
| 270 » » » j++ | 405 » » » i++ |
| 271 }) | 406 }) |
| 272 » } | 407 » » impossible(err) |
| 273 | 408 » » return nil |
| 274 » return ret, nil | 409 » }() |
| 275 } | 410 » if err != nil { |
| 276 | |
| 277 func (t *txnBufState) deleteMulti(keys []*datastore.Key) error { | |
| 278 » encKeys, roots := toEncoded(keys) | |
| 279 | |
| 280 » t.Lock() | |
| 281 » defer t.Unlock() | |
| 282 | |
| 283 » if err := t.updateRootsLocked(roots); err != nil { | |
| 284 return err | 411 return err |
| 285 } | 412 } |
| 286 | 413 |
| 287 » i := 0 | 414 » for _, k := range keys { |
| 288 » err := t.memDS.DeleteMulti(keys, func(err error) { | 415 » » cb(k, nil) |
| 289 » » impossible(err) | 416 » } |
| 290 » » t.entState.set(encKeys[i], 0) | 417 » return nil |
| 291 » » i++ | 418 } |
| 292 » }) | 419 |
| 293 » impossible(err) | 420 func commitToReal(s *txnBufState) error { |
| 294 » return nil | 421 » toPut, toPutKeys, toDel := s.effect() |
| 295 } | |
| 296 | |
| 297 func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyM ap) error { | |
| 298 » encKeys, roots := toEncoded(keys) | |
| 299 | |
| 300 » t.Lock() | |
| 301 » defer t.Unlock() | |
| 302 | |
| 303 » if err := t.updateRootsLocked(roots); err != nil { | |
| 304 » » return err | |
| 305 » } | |
| 306 | |
| 307 » i := 0 | |
| 308 » err := t.memDS.PutMulti(keys, vals, func(k *datastore.Key, err error) { | |
| 309 » » impossible(err) | |
| 310 » » t.entState.set(encKeys[i], vals[i].EstimateSize()) | |
| 311 » » i++ | |
| 312 » }) | |
| 313 » impossible(err) | |
| 314 » return nil | |
| 315 } | |
| 316 | |
| 317 // apply actually takes the buffered transaction and applies it to the parent | |
| 318 // transaction. It will only return an error if the underlying 'real' datastore | |
| 319 // returns an error on PutMulti or DeleteMulti. | |
| 320 func (t *txnBufState) apply() error { | |
| 321 » t.Lock() | |
| 322 » defer t.Unlock() | |
| 323 | |
| 324 » // if parentState is nil... just try to commit this anyway. The estimate s | |
| 325 » // we're using here are just educated guesses. If it fits for real, then | |
| 326 » // hooray. If not, then the underlying datastore will error. | |
| 327 » if t.parentState != nil { | |
| 328 » » t.parentState.Lock() | |
| 329 » » proposedState := t.parentState.entState.dup() | |
| 330 » » t.parentState.Unlock() | |
| 331 » » for k, v := range t.entState.keyToSize { | |
| 332 » » » proposedState.set(k, v) | |
| 333 » » } | |
| 334 » » if proposedState.total > t.sizeBudget { | |
| 335 » » » return ErrTransactionTooLarge | |
| 336 » » } | |
| 337 » } | |
| 338 | |
| 339 » toPutKeys := []*datastore.Key(nil) | |
| 340 » toPut := []datastore.PropertyMap(nil) | |
| 341 » toDel := []*datastore.Key(nil) | |
| 342 | |
| 343 » // need to pull all items out of the in-memory datastore. Fortunately we have | |
| 344 » // kindless queries, and we disabled all the special entities, so just | |
| 345 » // run a kindless query without any filters and it will return all data | |
| 346 » // currently in memDS :). | |
| 347 » fq, err := datastore.NewQuery("").Finalize() | |
| 348 » impossible(err) | |
| 349 | |
| 350 » err = t.memDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa p, _ datastore.CursorCB) bool { | |
| 351 » » toPutKeys = append(toPutKeys, key) | |
| 352 » » toPut = append(toPut, data) | |
| 353 » » return true | |
| 354 » }) | |
| 355 » memoryCorruption(err) | |
| 356 | |
| 357 » for keyStr, size := range t.entState.keyToSize { | |
| 358 » » if size == 0 { | |
| 359 » » » k, err := serialize.ReadKey(bytes.NewBufferString(keyStr ), serialize.WithoutContext, t.aid, t.ns) | |
| 360 » » » memoryCorruption(err) | |
| 361 » » » toDel = append(toDel, k) | |
| 362 » » } | |
| 363 » } | |
| 364 | |
| 365 » ds := t.parentDS | |
| 366 | 422 |
| 367 return parallel.FanOutIn(func(ch chan<- func() error) { | 423 return parallel.FanOutIn(func(ch chan<- func() error) { |
| 368 if len(toPut) > 0 { | 424 if len(toPut) > 0 { |
| 369 ch <- func() error { | 425 ch <- func() error { |
| 370 mErr := errors.NewLazyMultiError(len(toPut)) | 426 mErr := errors.NewLazyMultiError(len(toPut)) |
| 371 i := 0 | 427 i := 0 |
| 372 » » » » err := ds.PutMulti(toPutKeys, toPut, func(_ *dat astore.Key, err error) { | 428 » » » » err := s.parentDS.PutMulti(toPutKeys, toPut, fun c(_ *datastore.Key, err error) { |
| 373 mErr.Assign(i, err) | 429 mErr.Assign(i, err) |
| 374 i++ | 430 i++ |
| 375 }) | 431 }) |
| 376 if err == nil { | 432 if err == nil { |
| 377 err = mErr.Get() | 433 err = mErr.Get() |
| 378 } | 434 } |
| 379 return err | 435 return err |
| 380 } | 436 } |
| 381 } | 437 } |
| 382 if len(toDel) > 0 { | 438 if len(toDel) > 0 { |
| 383 ch <- func() error { | 439 ch <- func() error { |
| 384 mErr := errors.NewLazyMultiError(len(toDel)) | 440 mErr := errors.NewLazyMultiError(len(toDel)) |
| 385 i := 0 | 441 i := 0 |
| 386 » » » » err := ds.DeleteMulti(toDel, func(err error) { | 442 » » » » err := s.parentDS.DeleteMulti(toDel, func(err er ror) { |
| 387 mErr.Assign(i, err) | 443 mErr.Assign(i, err) |
| 388 i++ | 444 i++ |
| 389 }) | 445 }) |
| 390 if err == nil { | 446 if err == nil { |
| 391 err = mErr.Get() | 447 err = mErr.Get() |
| 392 } | 448 } |
| 393 return err | 449 return err |
| 394 } | 450 } |
| 395 } | 451 } |
| 396 }) | 452 }) |
| 397 } | 453 } |
| 398 | 454 |
| 455 func (t *txnBufState) effect() (toPut []datastore.PropertyMap, toPutKeys, toDel []*datastore.Key) { | |
| 456 // TODO(riannucci): preallocate return slices | |
| 457 | |
| 458 // need to pull all items out of the in-memory datastore. Fortunately we have | |
| 459 // kindless queries, and we disabled all the special entities, so just | |
| 460 // run a kindless query without any filters and it will return all data | |
| 461 // currently in bufDS :). | |
| 462 fq, err := datastore.NewQuery("").Finalize() | |
| 463 impossible(err) | |
| 464 | |
| 465 err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa p, _ datastore.CursorCB) bool { | |
| 466 toPutKeys = append(toPutKeys, key) | |
| 467 toPut = append(toPut, data) | |
| 468 return true | |
| 469 }) | |
| 470 memoryCorruption(err) | |
| 471 | |
| 472 for keyStr, size := range t.entState.keyToSize { | |
| 473 if size == 0 { | |
| 474 k, err := serialize.ReadKey(bytes.NewBufferString(keyStr ), serialize.WithoutContext, t.aid, t.ns) | |
| 475 memoryCorruption(err) | |
| 476 toDel = append(toDel, k) | |
| 477 } | |
| 478 } | |
| 479 | |
| 480 return | |
| 481 } | |
| 482 | |
| 483 func (t *txnBufState) canApply(s *txnBufState) error { | |
|
dnj
2015/11/11 16:08:28
canApplyLocked?
iannucci
2015/11/11 18:06:40
done
| |
| 484 proposedState := t.entState.dup() | |
| 485 | |
| 486 for k, v := range s.entState.keyToSize { | |
| 487 proposedState.set(k, v) | |
| 488 } | |
| 489 if proposedState.total > s.sizeBudget { | |
| 490 return ErrTransactionTooLarge | |
| 491 } | |
| 492 return nil | |
| 493 } | |
| 494 | |
| 495 func (t *txnBufState) commit(s *txnBufState) { | |
|
dnj
2015/11/11 16:08:28
commitLocked?
iannucci
2015/11/11 18:06:40
done
| |
| 496 toPut, toPutKeys, toDel := s.effect() | |
| 497 | |
| 498 if len(toPut) > 0 { | |
| 499 impossible(t.putMulti(toPutKeys, toPut, | |
| 500 func(_ *datastore.Key, err error) { impossible(err) }, t rue)) | |
| 501 } | |
| 502 | |
| 503 if len(toDel) > 0 { | |
| 504 impossible(t.deleteMulti(toDel, impossible, true)) | |
| 505 } | |
| 506 } | |
| 507 | |
| 399 // toEncoded returns a list of all of the serialized versions of these keys, | 508 // toEncoded returns a list of all of the serialized versions of these keys, |
| 400 // plus a stringset of all the encoded root keys that `keys` represents. | 509 // plus a stringset of all the encoded root keys that `keys` represents. |
| 401 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { | 510 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { |
| 402 roots = stringset.New(len(keys)) | 511 roots = stringset.New(len(keys)) |
| 403 full = make([]string, len(keys)) | 512 full = make([]string, len(keys)) |
| 404 for i, k := range keys { | 513 for i, k := range keys { |
| 405 roots.Add(string(serialize.ToBytes(k.Root()))) | 514 roots.Add(string(serialize.ToBytes(k.Root()))) |
| 406 full[i] = string(serialize.ToBytes(k)) | 515 full[i] = string(serialize.ToBytes(k)) |
| 407 } | 516 } |
| 408 return | 517 return |
| 409 } | 518 } |
| OLD | NEW |