OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package txnBuf | 5 package txnBuf |
6 | 6 |
7 import ( | 7 import ( |
8 ds "github.com/luci/gae/service/datastore" | 8 ds "github.com/luci/gae/service/datastore" |
9 "github.com/luci/luci-go/common/errors" | 9 "github.com/luci/luci-go/common/errors" |
10 "golang.org/x/net/context" | 10 "golang.org/x/net/context" |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
51 func (d *dsTxnBuf) Count(fq *ds.FinalizedQuery) (count int64, err error) { | 51 func (d *dsTxnBuf) Count(fq *ds.FinalizedQuery) (count int64, err error) { |
52 // Unfortunately there's no fast-path here. We literally have to run the | 52 // Unfortunately there's no fast-path here. We literally have to run the |
53 // query and count. Fortunately we can optimize to count keys if it's no
t | 53 // query and count. Fortunately we can optimize to count keys if it's no
t |
54 // a projection query. This will save on bandwidth a bit. | 54 // a projection query. This will save on bandwidth a bit. |
55 if len(fq.Project()) == 0 && !fq.KeysOnly() { | 55 if len(fq.Project()) == 0 && !fq.KeysOnly() { |
56 fq, err = fq.Original().KeysOnly(true).Finalize() | 56 fq, err = fq.Original().KeysOnly(true).Finalize() |
57 if err != nil { | 57 if err != nil { |
58 return | 58 return |
59 } | 59 } |
60 } | 60 } |
61 » err = d.Run(fq, func(_ *ds.Key, _ ds.PropertyMap, _ ds.CursorCB) bool { | 61 » err = d.Run(fq, func(_ *ds.Key, _ ds.PropertyMap, _ ds.CursorCB) error { |
62 count++ | 62 count++ |
63 » » return true | 63 » » return nil |
64 }) | 64 }) |
65 return | 65 return |
66 } | 66 } |
67 | 67 |
68 func (d *dsTxnBuf) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { | 68 func (d *dsTxnBuf) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { |
69 if start, end := fq.Bounds(); start != nil || end != nil { | 69 if start, end := fq.Bounds(); start != nil || end != nil { |
70 return errors.New("txnBuf filter does not support query cursors"
) | 70 return errors.New("txnBuf filter does not support query cursors"
) |
71 } | 71 } |
72 | 72 |
73 limit, limitSet := fq.Limit() | 73 limit, limitSet := fq.Limit() |
74 offset, _ := fq.Offset() | 74 offset, _ := fq.Offset() |
75 keysOnly := fq.KeysOnly() | 75 keysOnly := fq.KeysOnly() |
76 | 76 |
77 project := fq.Project() | 77 project := fq.Project() |
78 | 78 |
79 bufDS, parentDS, sizes := func() (ds.RawInterface, ds.RawInterface, *siz
eTracker) { | 79 bufDS, parentDS, sizes := func() (ds.RawInterface, ds.RawInterface, *siz
eTracker) { |
80 if !d.haveLock { | 80 if !d.haveLock { |
81 d.state.Lock() | 81 d.state.Lock() |
82 defer d.state.Unlock() | 82 defer d.state.Unlock() |
83 } | 83 } |
84 return d.state.bufDS, d.state.parentDS, d.state.entState.dup() | 84 return d.state.bufDS, d.state.parentDS, d.state.entState.dup() |
85 }() | 85 }() |
86 | 86 |
87 » return runMergedQueries(fq, sizes, bufDS, parentDS, func(key *ds.Key, da
ta ds.PropertyMap) bool { | 87 » return runMergedQueries(fq, sizes, bufDS, parentDS, func(key *ds.Key, da
ta ds.PropertyMap) error { |
88 if offset > 0 { | 88 if offset > 0 { |
89 offset-- | 89 offset-- |
90 » » » return true | 90 » » » return nil |
91 } | 91 } |
92 if limitSet { | 92 if limitSet { |
93 if limit == 0 { | 93 if limit == 0 { |
94 » » » » return false | 94 » » » » return ds.Stop |
95 } | 95 } |
96 limit-- | 96 limit-- |
97 } | 97 } |
98 if keysOnly { | 98 if keysOnly { |
99 data = nil | 99 data = nil |
100 } else if len(project) > 0 { | 100 } else if len(project) > 0 { |
101 newData := make(ds.PropertyMap, len(project)) | 101 newData := make(ds.PropertyMap, len(project)) |
102 for _, p := range project { | 102 for _, p := range project { |
103 newData[p] = data[p] | 103 newData[p] = data[p] |
104 } | 104 } |
105 data = newData | 105 data = newData |
106 } | 106 } |
107 return cb(key, data, nil) | 107 return cb(key, data, nil) |
108 }) | 108 }) |
109 } | 109 } |
110 | 110 |
111 func (d *dsTxnBuf) RunInTransaction(cb func(context.Context) error, opts *ds.Tra
nsactionOptions) error { | 111 func (d *dsTxnBuf) RunInTransaction(cb func(context.Context) error, opts *ds.Tra
nsactionOptions) error { |
112 if !d.haveLock { | 112 if !d.haveLock { |
113 d.state.Lock() | 113 d.state.Lock() |
114 defer d.state.Unlock() | 114 defer d.state.Unlock() |
115 } | 115 } |
116 return withTxnBuf(d.ic, cb, opts) | 116 return withTxnBuf(d.ic, cb, opts) |
117 } | 117 } |
118 | 118 |
119 func (d *dsTxnBuf) Testable() ds.Testable { | 119 func (d *dsTxnBuf) Testable() ds.Testable { |
120 return d.state.parentDS.Testable() | 120 return d.state.parentDS.Testable() |
121 } | 121 } |
OLD | NEW |