| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 | 10 |
| 11 ds "github.com/luci/gae/service/datastore" | 11 ds "github.com/luci/gae/service/datastore" |
| 12 "github.com/luci/gae/service/datastore/serialize" | 12 "github.com/luci/gae/service/datastore/serialize" |
| 13 "github.com/luci/luci-go/common/cmpbin" | 13 "github.com/luci/luci-go/common/cmpbin" |
| 14 "github.com/luci/luci-go/common/stringset" | 14 "github.com/luci/luci-go/common/stringset" |
| 15 ) | 15 ) |
| 16 | 16 |
| 17 type queryStrategy interface { | 17 type queryStrategy interface { |
| 18 // handle applies the strategy to the embedded user callback. | 18 // handle applies the strategy to the embedded user callback. |
| 19 // - rawData is the slice of encoded Properties from the index row | 19 // - rawData is the slice of encoded Properties from the index row |
| 20 // (correctly de-inverted). | 20 // (correctly de-inverted). |
| 21 // - decodedProps is the slice of decoded Properties from the index ro
w | 21 // - decodedProps is the slice of decoded Properties from the index ro
w |
| 22 // - key is the decoded Key from the index row (the last item in rawDa
ta and | 22 // - key is the decoded Key from the index row (the last item in rawDa
ta and |
| 23 // decodedProps) | 23 // decodedProps) |
| 24 // - gc is the getCursor function to be passed to the user's callback | 24 // - gc is the getCursor function to be passed to the user's callback |
| 25 » handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, gc fun
c() (ds.Cursor, error)) bool | 25 » handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, gc fun
c() (ds.Cursor, error)) error |
| 26 } | 26 } |
| 27 | 27 |
| 28 type projectionLookup struct { | 28 type projectionLookup struct { |
| 29 suffixIndex int | 29 suffixIndex int |
| 30 propertyName string | 30 propertyName string |
| 31 } | 31 } |
| 32 | 32 |
| 33 type projectionStrategy struct { | 33 type projectionStrategy struct { |
| 34 cb ds.RawRunCB | 34 cb ds.RawRunCB |
| 35 | 35 |
| (...skipping 17 matching lines...) Expand all Loading... |
| 53 } | 53 } |
| 54 impossible(lookupErr) | 54 impossible(lookupErr) |
| 55 } | 55 } |
| 56 ret := &projectionStrategy{cb: cb, project: projectionLookups} | 56 ret := &projectionStrategy{cb: cb, project: projectionLookups} |
| 57 if fq.Distinct() { | 57 if fq.Distinct() { |
| 58 ret.distinct = stringset.New(0) | 58 ret.distinct = stringset.New(0) |
| 59 } | 59 } |
| 60 return ret | 60 return ret |
| 61 } | 61 } |
| 62 | 62 |
| 63 func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property
, key *ds.Key, gc func() (ds.Cursor, error)) bool { | 63 func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property
, key *ds.Key, gc func() (ds.Cursor, error)) error { |
| 64 projectedRaw := [][]byte(nil) | 64 projectedRaw := [][]byte(nil) |
| 65 if s.distinct != nil { | 65 if s.distinct != nil { |
| 66 projectedRaw = make([][]byte, len(decodedProps)) | 66 projectedRaw = make([][]byte, len(decodedProps)) |
| 67 } | 67 } |
| 68 pmap := make(ds.PropertyMap, len(s.project)) | 68 pmap := make(ds.PropertyMap, len(s.project)) |
| 69 for i, p := range s.project { | 69 for i, p := range s.project { |
| 70 if s.distinct != nil { | 70 if s.distinct != nil { |
| 71 projectedRaw[i] = rawData[p.suffixIndex] | 71 projectedRaw[i] = rawData[p.suffixIndex] |
| 72 } | 72 } |
| 73 pmap[p.propertyName] = []ds.Property{decodedProps[p.suffixIndex]
} | 73 pmap[p.propertyName] = []ds.Property{decodedProps[p.suffixIndex]
} |
| 74 } | 74 } |
| 75 if s.distinct != nil { | 75 if s.distinct != nil { |
| 76 if !s.distinct.Add(string(serialize.Join(projectedRaw...))) { | 76 if !s.distinct.Add(string(serialize.Join(projectedRaw...))) { |
| 77 » » » return true | 77 » » » return nil |
| 78 } | 78 } |
| 79 } | 79 } |
| 80 return s.cb(key, pmap, gc) | 80 return s.cb(key, pmap, gc) |
| 81 } | 81 } |
| 82 | 82 |
| 83 type keysOnlyStrategy struct { | 83 type keysOnlyStrategy struct { |
| 84 cb ds.RawRunCB | 84 cb ds.RawRunCB |
| 85 | 85 |
| 86 dedup stringset.Set | 86 dedup stringset.Set |
| 87 } | 87 } |
| 88 | 88 |
| 89 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key
, gc func() (ds.Cursor, error)) bool { | 89 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key
, gc func() (ds.Cursor, error)) error { |
| 90 if !s.dedup.Add(string(rawData[len(rawData)-1])) { | 90 if !s.dedup.Add(string(rawData[len(rawData)-1])) { |
| 91 » » return true | 91 » » return nil |
| 92 } | 92 } |
| 93 return s.cb(key, nil, gc) | 93 return s.cb(key, nil, gc) |
| 94 } | 94 } |
| 95 | 95 |
| 96 type normalStrategy struct { | 96 type normalStrategy struct { |
| 97 cb ds.RawRunCB | 97 cb ds.RawRunCB |
| 98 | 98 |
| 99 aid string | 99 aid string |
| 100 ns string | 100 ns string |
| 101 head *memCollection | 101 head *memCollection |
| 102 dedup stringset.Set | 102 dedup stringset.Set |
| 103 } | 103 } |
| 104 | 104 |
| 105 func newNormalStrategy(aid, ns string, cb ds.RawRunCB, head *memStore) queryStra
tegy { | 105 func newNormalStrategy(aid, ns string, cb ds.RawRunCB, head *memStore) queryStra
tegy { |
| 106 coll := head.GetCollection("ents:" + ns) | 106 coll := head.GetCollection("ents:" + ns) |
| 107 if coll == nil { | 107 if coll == nil { |
| 108 return nil | 108 return nil |
| 109 } | 109 } |
| 110 return &normalStrategy{cb, aid, ns, coll, stringset.New(0)} | 110 return &normalStrategy{cb, aid, ns, coll, stringset.New(0)} |
| 111 } | 111 } |
| 112 | 112 |
| 113 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key,
gc func() (ds.Cursor, error)) bool { | 113 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key,
gc func() (ds.Cursor, error)) error { |
| 114 rawKey := rawData[len(rawData)-1] | 114 rawKey := rawData[len(rawData)-1] |
| 115 if !s.dedup.Add(string(rawKey)) { | 115 if !s.dedup.Add(string(rawKey)) { |
| 116 » » return true | 116 » » return nil |
| 117 } | 117 } |
| 118 | 118 |
| 119 rawEnt := s.head.Get(rawKey) | 119 rawEnt := s.head.Get(rawKey) |
| 120 if rawEnt == nil { | 120 if rawEnt == nil { |
| 121 // entity doesn't exist at head | 121 // entity doesn't exist at head |
| 122 » » return true | 122 » » return nil |
| 123 } | 123 } |
| 124 pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.
WithoutContext, s.aid, s.ns) | 124 pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.
WithoutContext, s.aid, s.ns) |
| 125 memoryCorruption(err) | 125 memoryCorruption(err) |
| 126 | 126 |
| 127 return s.cb(key, pm, gc) | 127 return s.cb(key, pm, gc) |
| 128 } | 128 } |
| 129 | 129 |
| 130 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB,
head *memStore) queryStrategy { | 130 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB,
head *memStore) queryStrategy { |
| 131 if fq.KeysOnly() { | 131 if fq.KeysOnly() { |
| 132 return &keysOnlyStrategy{cb, stringset.New(0)} | 132 return &keysOnlyStrategy{cb, stringset.New(0)} |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 164 return | 164 return |
| 165 } | 165 } |
| 166 | 166 |
| 167 func countQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *me
mStore) (ret int64, err error) { | 167 func countQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *me
mStore) (ret int64, err error) { |
| 168 if len(fq.Project()) == 0 && !fq.KeysOnly() { | 168 if len(fq.Project()) == 0 && !fq.KeysOnly() { |
| 169 fq, err = fq.Original().KeysOnly(true).Finalize() | 169 fq, err = fq.Original().KeysOnly(true).Finalize() |
| 170 if err != nil { | 170 if err != nil { |
| 171 return | 171 return |
| 172 } | 172 } |
| 173 } | 173 } |
| 174 » err = executeQuery(fq, aid, ns, isTxn, idx, head, func(_ *ds.Key, _ ds.P
ropertyMap, _ ds.CursorCB) bool { | 174 » err = executeQuery(fq, aid, ns, isTxn, idx, head, func(_ *ds.Key, _ ds.P
ropertyMap, _ ds.CursorCB) error { |
| 175 ret++ | 175 ret++ |
| 176 » » return true | 176 » » return nil |
| 177 }) | 177 }) |
| 178 return | 178 return |
| 179 } | 179 } |
| 180 | 180 |
| 181 func executeQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *
memStore, cb ds.RawRunCB) error { | 181 func executeQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *
memStore, cb ds.RawRunCB) error { |
| 182 rq, err := reduce(fq, aid, ns, isTxn) | 182 rq, err := reduce(fq, aid, ns, isTxn) |
| 183 if err == ds.ErrNullQuery { | 183 if err == ds.ErrNullQuery { |
| 184 return nil | 184 return nil |
| 185 } | 185 } |
| 186 if err != nil { | 186 if err != nil { |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 218 memoryCorruption(err) | 218 memoryCorruption(err) |
| 219 } | 219 } |
| 220 cursorPrefix = buf.Bytes() | 220 cursorPrefix = buf.Bytes() |
| 221 } | 221 } |
| 222 // TODO(riannucci): Do we need to decrement suffix inste
ad of increment | 222 // TODO(riannucci): Do we need to decrement suffix inste
ad of increment |
| 223 // if we're sorting by __key__ DESCENDING? | 223 // if we're sorting by __key__ DESCENDING? |
| 224 return queryCursor(serialize.Join(cursorPrefix, incremen
t(suffix))), nil | 224 return queryCursor(serialize.Join(cursorPrefix, incremen
t(suffix))), nil |
| 225 } | 225 } |
| 226 } | 226 } |
| 227 | 227 |
| 228 » multiIterate(idxs, func(suffix []byte) bool { | 228 » return multiIterate(idxs, func(suffix []byte) error { |
| 229 if offset > 0 { | 229 if offset > 0 { |
| 230 offset-- | 230 offset-- |
| 231 » » » return true | 231 » » » return nil |
| 232 } | 232 } |
| 233 if hasLimit { | 233 if hasLimit { |
| 234 if limit <= 0 { | 234 if limit <= 0 { |
| 235 » » » » return false | 235 » » » » return ds.Stop |
| 236 } | 236 } |
| 237 limit-- | 237 limit-- |
| 238 } | 238 } |
| 239 | 239 |
| 240 rawData, decodedProps := parseSuffix(aid, ns, rq.suffixFormat, s
uffix, -1) | 240 rawData, decodedProps := parseSuffix(aid, ns, rq.suffixFormat, s
uffix, -1) |
| 241 | 241 |
| 242 keyProp := decodedProps[len(decodedProps)-1] | 242 keyProp := decodedProps[len(decodedProps)-1] |
| 243 if keyProp.Type() != ds.PTKey { | 243 if keyProp.Type() != ds.PTKey { |
| 244 impossible(fmt.Errorf("decoded index row doesn't end wit
h a Key: %#v", keyProp)) | 244 impossible(fmt.Errorf("decoded index row doesn't end wit
h a Key: %#v", keyProp)) |
| 245 } | 245 } |
| 246 | 246 |
| 247 return strategy.handle( | 247 return strategy.handle( |
| 248 rawData, decodedProps, keyProp.Value().(*ds.Key), | 248 rawData, decodedProps, keyProp.Value().(*ds.Key), |
| 249 getCursorFn(suffix)) | 249 getCursorFn(suffix)) |
| 250 }) | 250 }) |
| 251 | |
| 252 return nil | |
| 253 } | 251 } |
| OLD | NEW |