OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 | 10 |
11 ds "github.com/luci/gae/service/datastore" | 11 ds "github.com/luci/gae/service/datastore" |
12 "github.com/luci/gae/service/datastore/serialize" | 12 "github.com/luci/gae/service/datastore/serialize" |
13 "github.com/luci/luci-go/common/cmpbin" | 13 "github.com/luci/luci-go/common/cmpbin" |
14 "github.com/luci/luci-go/common/stringset" | 14 "github.com/luci/luci-go/common/stringset" |
15 ) | 15 ) |
16 | 16 |
17 type queryStrategy interface { | 17 type queryStrategy interface { |
18 // handle applies the strategy to the embedded user callback. | 18 // handle applies the strategy to the embedded user callback. |
19 // - rawData is the slice of encoded Properties from the index row | 19 // - rawData is the slice of encoded Properties from the index row |
20 // (correctly de-inverted). | 20 // (correctly de-inverted). |
21 // - decodedProps is the slice of decoded Properties from the index ro
w | 21 // - decodedProps is the slice of decoded Properties from the index ro
w |
22 // - key is the decoded Key from the index row (the last item in rawDa
ta and | 22 // - key is the decoded Key from the index row (the last item in rawDa
ta and |
23 // decodedProps) | 23 // decodedProps) |
24 // - gc is the getCursor function to be passed to the user's callback | 24 // - gc is the getCursor function to be passed to the user's callback |
25 » handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, gc fun
c() (ds.Cursor, error)) bool | 25 » handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, gc fun
c() (ds.Cursor, error)) error |
26 } | 26 } |
27 | 27 |
28 type projectionLookup struct { | 28 type projectionLookup struct { |
29 suffixIndex int | 29 suffixIndex int |
30 propertyName string | 30 propertyName string |
31 } | 31 } |
32 | 32 |
33 type projectionStrategy struct { | 33 type projectionStrategy struct { |
34 cb ds.RawRunCB | 34 cb ds.RawRunCB |
35 | 35 |
(...skipping 17 matching lines...) Expand all Loading... |
53 } | 53 } |
54 impossible(lookupErr) | 54 impossible(lookupErr) |
55 } | 55 } |
56 ret := &projectionStrategy{cb: cb, project: projectionLookups} | 56 ret := &projectionStrategy{cb: cb, project: projectionLookups} |
57 if fq.Distinct() { | 57 if fq.Distinct() { |
58 ret.distinct = stringset.New(0) | 58 ret.distinct = stringset.New(0) |
59 } | 59 } |
60 return ret | 60 return ret |
61 } | 61 } |
62 | 62 |
63 func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property
, key *ds.Key, gc func() (ds.Cursor, error)) bool { | 63 func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property
, key *ds.Key, gc func() (ds.Cursor, error)) error { |
64 projectedRaw := [][]byte(nil) | 64 projectedRaw := [][]byte(nil) |
65 if s.distinct != nil { | 65 if s.distinct != nil { |
66 projectedRaw = make([][]byte, len(decodedProps)) | 66 projectedRaw = make([][]byte, len(decodedProps)) |
67 } | 67 } |
68 pmap := make(ds.PropertyMap, len(s.project)) | 68 pmap := make(ds.PropertyMap, len(s.project)) |
69 for i, p := range s.project { | 69 for i, p := range s.project { |
70 if s.distinct != nil { | 70 if s.distinct != nil { |
71 projectedRaw[i] = rawData[p.suffixIndex] | 71 projectedRaw[i] = rawData[p.suffixIndex] |
72 } | 72 } |
73 pmap[p.propertyName] = []ds.Property{decodedProps[p.suffixIndex]
} | 73 pmap[p.propertyName] = []ds.Property{decodedProps[p.suffixIndex]
} |
74 } | 74 } |
75 if s.distinct != nil { | 75 if s.distinct != nil { |
76 if !s.distinct.Add(string(serialize.Join(projectedRaw...))) { | 76 if !s.distinct.Add(string(serialize.Join(projectedRaw...))) { |
77 » » » return true | 77 » » » return nil |
78 } | 78 } |
79 } | 79 } |
80 return s.cb(key, pmap, gc) | 80 return s.cb(key, pmap, gc) |
81 } | 81 } |
82 | 82 |
83 type keysOnlyStrategy struct { | 83 type keysOnlyStrategy struct { |
84 cb ds.RawRunCB | 84 cb ds.RawRunCB |
85 | 85 |
86 dedup stringset.Set | 86 dedup stringset.Set |
87 } | 87 } |
88 | 88 |
89 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key
, gc func() (ds.Cursor, error)) bool { | 89 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key
, gc func() (ds.Cursor, error)) error { |
90 if !s.dedup.Add(string(rawData[len(rawData)-1])) { | 90 if !s.dedup.Add(string(rawData[len(rawData)-1])) { |
91 » » return true | 91 » » return nil |
92 } | 92 } |
93 return s.cb(key, nil, gc) | 93 return s.cb(key, nil, gc) |
94 } | 94 } |
95 | 95 |
96 type normalStrategy struct { | 96 type normalStrategy struct { |
97 cb ds.RawRunCB | 97 cb ds.RawRunCB |
98 | 98 |
99 aid string | 99 aid string |
100 ns string | 100 ns string |
101 head *memCollection | 101 head *memCollection |
102 dedup stringset.Set | 102 dedup stringset.Set |
103 } | 103 } |
104 | 104 |
105 func newNormalStrategy(aid, ns string, cb ds.RawRunCB, head *memStore) queryStra
tegy { | 105 func newNormalStrategy(aid, ns string, cb ds.RawRunCB, head *memStore) queryStra
tegy { |
106 coll := head.GetCollection("ents:" + ns) | 106 coll := head.GetCollection("ents:" + ns) |
107 if coll == nil { | 107 if coll == nil { |
108 return nil | 108 return nil |
109 } | 109 } |
110 return &normalStrategy{cb, aid, ns, coll, stringset.New(0)} | 110 return &normalStrategy{cb, aid, ns, coll, stringset.New(0)} |
111 } | 111 } |
112 | 112 |
113 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key,
gc func() (ds.Cursor, error)) bool { | 113 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key,
gc func() (ds.Cursor, error)) error { |
114 rawKey := rawData[len(rawData)-1] | 114 rawKey := rawData[len(rawData)-1] |
115 if !s.dedup.Add(string(rawKey)) { | 115 if !s.dedup.Add(string(rawKey)) { |
116 » » return true | 116 » » return nil |
117 } | 117 } |
118 | 118 |
119 rawEnt := s.head.Get(rawKey) | 119 rawEnt := s.head.Get(rawKey) |
120 if rawEnt == nil { | 120 if rawEnt == nil { |
121 // entity doesn't exist at head | 121 // entity doesn't exist at head |
122 » » return true | 122 » » return nil |
123 } | 123 } |
124 pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.
WithoutContext, s.aid, s.ns) | 124 pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.
WithoutContext, s.aid, s.ns) |
125 memoryCorruption(err) | 125 memoryCorruption(err) |
126 | 126 |
127 return s.cb(key, pm, gc) | 127 return s.cb(key, pm, gc) |
128 } | 128 } |
129 | 129 |
130 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB,
head *memStore) queryStrategy { | 130 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB,
head *memStore) queryStrategy { |
131 if fq.KeysOnly() { | 131 if fq.KeysOnly() { |
132 return &keysOnlyStrategy{cb, stringset.New(0)} | 132 return &keysOnlyStrategy{cb, stringset.New(0)} |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
164 return | 164 return |
165 } | 165 } |
166 | 166 |
167 func countQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *me
mStore) (ret int64, err error) { | 167 func countQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *me
mStore) (ret int64, err error) { |
168 if len(fq.Project()) == 0 && !fq.KeysOnly() { | 168 if len(fq.Project()) == 0 && !fq.KeysOnly() { |
169 fq, err = fq.Original().KeysOnly(true).Finalize() | 169 fq, err = fq.Original().KeysOnly(true).Finalize() |
170 if err != nil { | 170 if err != nil { |
171 return | 171 return |
172 } | 172 } |
173 } | 173 } |
174 » err = executeQuery(fq, aid, ns, isTxn, idx, head, func(_ *ds.Key, _ ds.P
ropertyMap, _ ds.CursorCB) bool { | 174 » err = executeQuery(fq, aid, ns, isTxn, idx, head, func(_ *ds.Key, _ ds.P
ropertyMap, _ ds.CursorCB) error { |
175 ret++ | 175 ret++ |
176 » » return true | 176 » » return nil |
177 }) | 177 }) |
178 return | 178 return |
179 } | 179 } |
180 | 180 |
181 func executeQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *
memStore, cb ds.RawRunCB) error { | 181 func executeQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *
memStore, cb ds.RawRunCB) error { |
182 rq, err := reduce(fq, aid, ns, isTxn) | 182 rq, err := reduce(fq, aid, ns, isTxn) |
183 if err == ds.ErrNullQuery { | 183 if err == ds.ErrNullQuery { |
184 return nil | 184 return nil |
185 } | 185 } |
186 if err != nil { | 186 if err != nil { |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
218 memoryCorruption(err) | 218 memoryCorruption(err) |
219 } | 219 } |
220 cursorPrefix = buf.Bytes() | 220 cursorPrefix = buf.Bytes() |
221 } | 221 } |
222 // TODO(riannucci): Do we need to decrement suffix inste
ad of increment | 222 // TODO(riannucci): Do we need to decrement suffix inste
ad of increment |
223 // if we're sorting by __key__ DESCENDING? | 223 // if we're sorting by __key__ DESCENDING? |
224 return queryCursor(serialize.Join(cursorPrefix, incremen
t(suffix))), nil | 224 return queryCursor(serialize.Join(cursorPrefix, incremen
t(suffix))), nil |
225 } | 225 } |
226 } | 226 } |
227 | 227 |
228 » multiIterate(idxs, func(suffix []byte) bool { | 228 » return multiIterate(idxs, func(suffix []byte) error { |
229 if offset > 0 { | 229 if offset > 0 { |
230 offset-- | 230 offset-- |
231 » » » return true | 231 » » » return nil |
232 } | 232 } |
233 if hasLimit { | 233 if hasLimit { |
234 if limit <= 0 { | 234 if limit <= 0 { |
235 » » » » return false | 235 » » » » return ds.Stop |
236 } | 236 } |
237 limit-- | 237 limit-- |
238 } | 238 } |
239 | 239 |
240 rawData, decodedProps := parseSuffix(aid, ns, rq.suffixFormat, s
uffix, -1) | 240 rawData, decodedProps := parseSuffix(aid, ns, rq.suffixFormat, s
uffix, -1) |
241 | 241 |
242 keyProp := decodedProps[len(decodedProps)-1] | 242 keyProp := decodedProps[len(decodedProps)-1] |
243 if keyProp.Type() != ds.PTKey { | 243 if keyProp.Type() != ds.PTKey { |
244 impossible(fmt.Errorf("decoded index row doesn't end wit
h a Key: %#v", keyProp)) | 244 impossible(fmt.Errorf("decoded index row doesn't end wit
h a Key: %#v", keyProp)) |
245 } | 245 } |
246 | 246 |
247 return strategy.handle( | 247 return strategy.handle( |
248 rawData, decodedProps, keyProp.Value().(*ds.Key), | 248 rawData, decodedProps, keyProp.Value().(*ds.Key), |
249 getCursorFn(suffix)) | 249 getCursorFn(suffix)) |
250 }) | 250 }) |
251 | |
252 return nil | |
253 } | 251 } |
OLD | NEW |