| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| (...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 90 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key
, gc func() (ds.Cursor, error)) error { | 90 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key
, gc func() (ds.Cursor, error)) error { |
| 91 if !s.dedup.Add(string(rawData[len(rawData)-1])) { | 91 if !s.dedup.Add(string(rawData[len(rawData)-1])) { |
| 92 return nil | 92 return nil |
| 93 } | 93 } |
| 94 return s.cb(key, nil, gc) | 94 return s.cb(key, nil, gc) |
| 95 } | 95 } |
| 96 | 96 |
| 97 type normalStrategy struct { | 97 type normalStrategy struct { |
| 98 cb ds.RawRunCB | 98 cb ds.RawRunCB |
| 99 | 99 |
| 100 » aid string | 100 » kc ds.KeyContext |
| 101 » ns string | |
| 102 head memCollection | 101 head memCollection |
| 103 dedup stringset.Set | 102 dedup stringset.Set |
| 104 } | 103 } |
| 105 | 104 |
| 106 func newNormalStrategy(aid, ns string, cb ds.RawRunCB, head memStore) queryStrat
egy { | 105 func newNormalStrategy(kc ds.KeyContext, cb ds.RawRunCB, head memStore) queryStr
ategy { |
| 107 » coll := head.GetCollection("ents:" + ns) | 106 » coll := head.GetCollection("ents:" + kc.Namespace) |
| 108 if coll == nil { | 107 if coll == nil { |
| 109 return nil | 108 return nil |
| 110 } | 109 } |
| 111 » return &normalStrategy{cb, aid, ns, coll, stringset.New(0)} | 110 » return &normalStrategy{cb, kc, coll, stringset.New(0)} |
| 112 } | 111 } |
| 113 | 112 |
| 114 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key,
gc func() (ds.Cursor, error)) error { | 113 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key,
gc func() (ds.Cursor, error)) error { |
| 115 rawKey := rawData[len(rawData)-1] | 114 rawKey := rawData[len(rawData)-1] |
| 116 if !s.dedup.Add(string(rawKey)) { | 115 if !s.dedup.Add(string(rawKey)) { |
| 117 return nil | 116 return nil |
| 118 } | 117 } |
| 119 | 118 |
| 120 rawEnt := s.head.Get(rawKey) | 119 rawEnt := s.head.Get(rawKey) |
| 121 if rawEnt == nil { | 120 if rawEnt == nil { |
| 122 // entity doesn't exist at head | 121 // entity doesn't exist at head |
| 123 return nil | 122 return nil |
| 124 } | 123 } |
| 125 » pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.
WithoutContext, s.aid, s.ns) | 124 » pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.
WithoutContext, s.kc) |
| 126 memoryCorruption(err) | 125 memoryCorruption(err) |
| 127 | 126 |
| 128 return s.cb(key, pm, gc) | 127 return s.cb(key, pm, gc) |
| 129 } | 128 } |
| 130 | 129 |
| 131 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB,
head memStore) queryStrategy { | 130 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB,
head memStore) queryStrategy { |
| 132 if fq.KeysOnly() { | 131 if fq.KeysOnly() { |
| 133 return &keysOnlyStrategy{cb, stringset.New(0)} | 132 return &keysOnlyStrategy{cb, stringset.New(0)} |
| 134 } | 133 } |
| 135 if len(fq.Project()) > 0 { | 134 if len(fq.Project()) > 0 { |
| 136 return newProjectionStrategy(fq, rq, cb) | 135 return newProjectionStrategy(fq, rq, cb) |
| 137 } | 136 } |
| 138 » return newNormalStrategy(rq.aid, rq.ns, cb, head) | 137 » return newNormalStrategy(rq.kc, cb, head) |
| 139 } | 138 } |
| 140 | 139 |
| 141 func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c
ount int) (raw [][]byte, decoded []ds.Property) { | 140 func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c
ount int) (raw [][]byte, decoded []ds.Property) { |
| 142 buf := serialize.Invertible(bytes.NewBuffer(suffix)) | 141 buf := serialize.Invertible(bytes.NewBuffer(suffix)) |
| 143 decoded = make([]ds.Property, len(suffixFormat)) | 142 decoded = make([]ds.Property, len(suffixFormat)) |
| 144 raw = make([][]byte, len(suffixFormat)) | 143 raw = make([][]byte, len(suffixFormat)) |
| 145 | 144 |
| 146 err := error(nil) | 145 err := error(nil) |
| 146 kc := ds.KeyContext{aid, ns} |
| 147 for i := range decoded { | 147 for i := range decoded { |
| 148 if count >= 0 && i >= count { | 148 if count >= 0 && i >= count { |
| 149 break | 149 break |
| 150 } | 150 } |
| 151 needInvert := suffixFormat[i].Descending | 151 needInvert := suffixFormat[i].Descending |
| 152 | 152 |
| 153 buf.SetInvert(needInvert) | 153 buf.SetInvert(needInvert) |
| 154 » » decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutC
ontext, aid, ns) | 154 » » decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutC
ontext, kc) |
| 155 memoryCorruption(err) | 155 memoryCorruption(err) |
| 156 | 156 |
| 157 offset := len(suffix) - buf.Len() | 157 offset := len(suffix) - buf.Len() |
| 158 raw[i] = suffix[:offset] | 158 raw[i] = suffix[:offset] |
| 159 suffix = suffix[offset:] | 159 suffix = suffix[offset:] |
| 160 if needInvert { | 160 if needInvert { |
| 161 raw[i] = serialize.Invert(raw[i]) | 161 raw[i] = serialize.Invert(raw[i]) |
| 162 } | 162 } |
| 163 } | 163 } |
| 164 | 164 |
| 165 return | 165 return |
| 166 } | 166 } |
| 167 | 167 |
| 168 func countQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head mem
Store) (ret int64, err error) { | 168 func countQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head m
emStore) (ret int64, err error) { |
| 169 if len(fq.Project()) == 0 && !fq.KeysOnly() { | 169 if len(fq.Project()) == 0 && !fq.KeysOnly() { |
| 170 fq, err = fq.Original().KeysOnly(true).Finalize() | 170 fq, err = fq.Original().KeysOnly(true).Finalize() |
| 171 if err != nil { | 171 if err != nil { |
| 172 return | 172 return |
| 173 } | 173 } |
| 174 } | 174 } |
| 175 » err = executeQuery(fq, aid, ns, isTxn, idx, head, func(_ *ds.Key, _ ds.P
ropertyMap, _ ds.CursorCB) error { | 175 » err = executeQuery(fq, kc, isTxn, idx, head, func(_ *ds.Key, _ ds.Proper
tyMap, _ ds.CursorCB) error { |
| 176 ret++ | 176 ret++ |
| 177 return nil | 177 return nil |
| 178 }) | 178 }) |
| 179 return | 179 return |
| 180 } | 180 } |
| 181 | 181 |
| 182 func executeNamespaceQuery(fq *ds.FinalizedQuery, aid string, head memStore, cb
ds.RawRunCB) error { | 182 func executeNamespaceQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, head memStor
e, cb ds.RawRunCB) error { |
| 183 // these objects have no properties, so any filters on properties cause
an | 183 // these objects have no properties, so any filters on properties cause
an |
| 184 // empty result. | 184 // empty result. |
| 185 if len(fq.EqFilters()) > 0 || len(fq.Project()) > 0 || len(fq.Orders())
> 1 { | 185 if len(fq.EqFilters()) > 0 || len(fq.Project()) > 0 || len(fq.Orders())
> 1 { |
| 186 return nil | 186 return nil |
| 187 } | 187 } |
| 188 if !(fq.IneqFilterProp() == "" || fq.IneqFilterProp() == "__key__") { | 188 if !(fq.IneqFilterProp() == "" || fq.IneqFilterProp() == "__key__") { |
| 189 return nil | 189 return nil |
| 190 } | 190 } |
| 191 limit, hasLimit := fq.Limit() | 191 limit, hasLimit := fq.Limit() |
| 192 offset, hasOffset := fq.Offset() | 192 offset, hasOffset := fq.Offset() |
| 193 start, end := fq.Bounds() | 193 start, end := fq.Bounds() |
| 194 | 194 |
| 195 cursErr := errors.New("cursors not supported for __namespace__ query") | 195 cursErr := errors.New("cursors not supported for __namespace__ query") |
| 196 cursFn := func() (ds.Cursor, error) { return nil, cursErr } | 196 cursFn := func() (ds.Cursor, error) { return nil, cursErr } |
| 197 if !(start == nil && end == nil) { | 197 if !(start == nil && end == nil) { |
| 198 return cursErr | 198 return cursErr |
| 199 } | 199 } |
| 200 |
| 201 kc.Namespace = "" |
| 200 for _, ns := range namespaces(head) { | 202 for _, ns := range namespaces(head) { |
| 201 if hasOffset && offset > 0 { | 203 if hasOffset && offset > 0 { |
| 202 offset-- | 204 offset-- |
| 203 continue | 205 continue |
| 204 } | 206 } |
| 205 if hasLimit { | 207 if hasLimit { |
| 206 if limit <= 0 { | 208 if limit <= 0 { |
| 207 return ds.Stop | 209 return ds.Stop |
| 208 } | 210 } |
| 209 limit-- | 211 limit-- |
| 210 } | 212 } |
| 211 k := (*ds.Key)(nil) | 213 k := (*ds.Key)(nil) |
| 212 if ns == "" { | 214 if ns == "" { |
| 213 // Datastore uses an id of 1 to indicate the default nam
espace in its | 215 // Datastore uses an id of 1 to indicate the default nam
espace in its |
| 214 // metadata API. | 216 // metadata API. |
| 215 » » » k = ds.MakeKey(aid, "", "__namespace__", 1) | 217 » » » k = kc.MakeKey("__namespace__", 1) |
| 216 } else { | 218 } else { |
| 217 » » » k = ds.MakeKey(aid, "", "__namespace__", ns) | 219 » » » k = kc.MakeKey("__namespace__", ns) |
| 218 } | 220 } |
| 219 if err := cb(k, nil, cursFn); err != nil { | 221 if err := cb(k, nil, cursFn); err != nil { |
| 220 return err | 222 return err |
| 221 } | 223 } |
| 222 } | 224 } |
| 223 return nil | 225 return nil |
| 224 } | 226 } |
| 225 | 227 |
| 226 func executeQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head m
emStore, cb ds.RawRunCB) error { | 228 func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head
memStore, cb ds.RawRunCB) error { |
| 227 » rq, err := reduce(fq, aid, ns, isTxn) | 229 » rq, err := reduce(fq, kc, isTxn) |
| 228 if err == ds.ErrNullQuery { | 230 if err == ds.ErrNullQuery { |
| 229 return nil | 231 return nil |
| 230 } | 232 } |
| 231 if err != nil { | 233 if err != nil { |
| 232 return err | 234 return err |
| 233 } | 235 } |
| 234 | 236 |
| 235 if rq.kind == "__namespace__" { | 237 if rq.kind == "__namespace__" { |
| 236 » » return executeNamespaceQuery(fq, aid, head, cb) | 238 » » return executeNamespaceQuery(fq, kc, head, cb) |
| 237 } | 239 } |
| 238 | 240 |
| 239 idxs, err := getIndexes(rq, idx) | 241 idxs, err := getIndexes(rq, idx) |
| 240 if err == ds.ErrNullQuery { | 242 if err == ds.ErrNullQuery { |
| 241 return nil | 243 return nil |
| 242 } | 244 } |
| 243 if err != nil { | 245 if err != nil { |
| 244 return err | 246 return err |
| 245 } | 247 } |
| 246 | 248 |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 279 offset-- | 281 offset-- |
| 280 return nil | 282 return nil |
| 281 } | 283 } |
| 282 if hasLimit { | 284 if hasLimit { |
| 283 if limit <= 0 { | 285 if limit <= 0 { |
| 284 return ds.Stop | 286 return ds.Stop |
| 285 } | 287 } |
| 286 limit-- | 288 limit-- |
| 287 } | 289 } |
| 288 | 290 |
| 289 » » rawData, decodedProps := parseSuffix(aid, ns, rq.suffixFormat, s
uffix, -1) | 291 » » rawData, decodedProps := parseSuffix(kc.AppID, kc.Namespace, rq.
suffixFormat, suffix, -1) |
| 290 | 292 |
| 291 keyProp := decodedProps[len(decodedProps)-1] | 293 keyProp := decodedProps[len(decodedProps)-1] |
| 292 if keyProp.Type() != ds.PTKey { | 294 if keyProp.Type() != ds.PTKey { |
| 293 impossible(fmt.Errorf("decoded index row doesn't end wit
h a Key: %#v", keyProp)) | 295 impossible(fmt.Errorf("decoded index row doesn't end wit
h a Key: %#v", keyProp)) |
| 294 } | 296 } |
| 295 | 297 |
| 296 return strategy.handle( | 298 return strategy.handle( |
| 297 rawData, decodedProps, keyProp.Value().(*ds.Key), | 299 rawData, decodedProps, keyProp.Value().(*ds.Key), |
| 298 getCursorFn(suffix)) | 300 getCursorFn(suffix)) |
| 299 }) | 301 }) |
| 300 } | 302 } |
| OLD | NEW |