Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(146)

Side by Side Diff: memory/raw_datastore_query.go

Issue 1243323002: Refactor a bit. (Closed) Base URL: https://github.com/luci/gae.git@master
Patch Set: fix golint Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « memory/raw_datastore_data.go ('k') | memory/raw_datastore_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "bytes"
9 "errors"
10 "fmt"
11 "math"
12 "strings"
13
14 "github.com/luci/gae"
15 "github.com/luci/gae/helper"
16 "github.com/luci/gkvlite"
17 "github.com/luci/luci-go/common/cmpbin"
18 )
19
20 type qDirection bool
21
22 const (
23 qASC qDirection = true
24 qDEC = false
25 )
26
27 var builtinQueryPrefix = []byte{0}
28 var complexQueryPrefix = []byte{1}
29
30 type qSortBy struct {
31 prop string
32 dir qDirection
33 }
34
35 func (q qSortBy) WriteBinary(buf *bytes.Buffer) {
36 if q.dir == qASC {
37 buf.WriteByte(0)
38 } else {
39 buf.WriteByte(1)
40 }
41 cmpbin.WriteString(buf, q.prop)
42 }
43
44 func (q *qSortBy) ReadBinary(buf *bytes.Buffer) error {
45 dir, err := buf.ReadByte()
46 if err != nil {
47 return err
48 }
49 q.dir = dir == 0
50 q.prop, _, err = cmpbin.ReadString(buf)
51 return err
52 }
53
54 type qIndex struct {
55 kind string
56 ancestor bool
57 sortby []qSortBy
58 }
59
60 func (i *qIndex) Builtin() bool {
61 return !i.ancestor && len(i.sortby) <= 1
62 }
63
64 func (i *qIndex) Less(o *qIndex) bool {
65 ibuf, obuf := &bytes.Buffer{}, &bytes.Buffer{}
66 i.WriteBinary(ibuf)
67 o.WriteBinary(obuf)
68 return i.String() < o.String()
69 }
70
71 // Valid verifies that this qIndex doesn't have duplicate sortBy fields.
72 func (i *qIndex) Valid() bool {
73 names := map[string]bool{}
74 for _, sb := range i.sortby {
75 if names[sb.prop] {
76 return false
77 }
78 names[sb.prop] = true
79 }
80 return true
81 }
82
83 func (i *qIndex) WriteBinary(buf *bytes.Buffer) {
84 // TODO(riannucci): do a Grow call here?
85 if i.Builtin() {
86 buf.Write(builtinQueryPrefix)
87 } else {
88 buf.Write(complexQueryPrefix)
89 }
90 cmpbin.WriteString(buf, i.kind)
91 if i.ancestor {
92 buf.WriteByte(0)
93 } else {
94 buf.WriteByte(1)
95 }
96 cmpbin.WriteUint(buf, uint64(len(i.sortby)))
97 for _, sb := range i.sortby {
98 sb.WriteBinary(buf)
99 }
100 }
101
102 func (i *qIndex) String() string {
103 ret := &bytes.Buffer{}
104 if i.Builtin() {
105 ret.WriteRune('B')
106 } else {
107 ret.WriteRune('C')
108 }
109 ret.WriteRune(':')
110 ret.WriteString(i.kind)
111 if i.ancestor {
112 ret.WriteString("|A")
113 }
114 for _, sb := range i.sortby {
115 ret.WriteRune('/')
116 if sb.dir == qDEC {
117 ret.WriteRune('-')
118 }
119 ret.WriteString(sb.prop)
120 }
121 return ret.String()
122 }
123
124 func (i *qIndex) ReadBinary(buf *bytes.Buffer) error {
125 // discard builtin/complex byte
126 _, err := buf.ReadByte()
127 if err != nil {
128 return err
129 }
130
131 i.kind, _, err = cmpbin.ReadString(buf)
132 if err != nil {
133 return err
134 }
135 anc, err := buf.ReadByte()
136 if err != nil {
137 return err
138 }
139 i.ancestor = anc == 1
140
141 numSorts, _, err := cmpbin.ReadUint(buf)
142 if err != nil {
143 return err
144 }
145 if numSorts > 64 {
146 return fmt.Errorf("qIndex.ReadBinary: Got over 64 sort orders: % d", numSorts)
147 }
148 i.sortby = make([]qSortBy, numSorts)
149 for idx := range i.sortby {
150 err = (&i.sortby[idx]).ReadBinary(buf)
151 if err != nil {
152 return err
153 }
154 }
155
156 return nil
157 }
158
159 type queryOp int
160
161 const (
162 qInvalid queryOp = iota
163 qEqual
164 qLessThan
165 qLessEq
166 qGreaterEq
167 qGreaterThan
168 )
169
170 func (o queryOp) isEQOp() bool {
171 return o == qEqual
172 }
173
174 func (o queryOp) isINEQOp() bool {
175 return o >= qLessThan && o <= qGreaterThan
176 }
177
178 var queryOpMap = map[string]queryOp{
179 "=": qEqual,
180 "<": qLessThan,
181 "<=": qLessEq,
182 ">=": qGreaterEq,
183 ">": qGreaterThan,
184 }
185
186 type queryFilter struct {
187 field string
188 op queryOp
189 value interface{}
190 }
191
192 func parseFilter(f string, v interface{}) (ret queryFilter, err error) {
193 toks := strings.SplitN(strings.TrimSpace(f), " ", 2)
194 if len(toks) != 2 {
195 err = errors.New("datastore: invalid filter: " + f)
196 } else {
197 op := queryOpMap[toks[1]]
198 if op == qInvalid {
199 err = fmt.Errorf("datastore: invalid operator %q in filt er %q", toks[1], f)
200 } else {
201 ret.field = toks[0]
202 ret.op = op
203 ret.value = v
204 }
205 }
206 return
207 }
208
209 type queryOrder struct {
210 field string
211 direction qDirection
212 }
213
214 type queryCursor string
215
216 func (q queryCursor) String() string { return string(q) }
217 func (q queryCursor) Valid() bool { return q != "" }
218
219 type queryImpl struct {
220 ns string
221
222 kind string
223 ancestor gae.DSKey
224 filter []queryFilter
225 order []queryOrder
226 project []string
227
228 distinct bool
229 eventualConsistency bool
230 keysOnly bool
231 limit int32
232 offset int32
233
234 start queryCursor
235 end queryCursor
236
237 err error
238 }
239
240 var _ gae.DSQuery = (*queryImpl)(nil)
241
242 type queryIterImpl struct {
243 idx *queryImpl
244 }
245
246 var _ gae.RDSIterator = (*queryIterImpl)(nil)
247
248 func (q *queryIterImpl) Cursor() (gae.DSCursor, error) {
249 if q.idx.err != nil {
250 return nil, q.idx.err
251 }
252 return nil, nil
253 }
254
255 func (q *queryIterImpl) Next(dst gae.DSPropertyLoadSaver) (gae.DSKey, error) {
256 if q.idx.err != nil {
257 return nil, q.idx.err
258 }
259 return nil, nil
260 }
261
262 func (q *queryImpl) normalize() (ret *queryImpl) {
263 // ported from GAE SDK datastore_index.py;Normalize()
264 ret = q.clone()
265
266 bs := newMemStore()
267
268 eqProperties := bs.MakePrivateCollection(nil)
269
270 ineqProperties := bs.MakePrivateCollection(nil)
271
272 for _, f := range ret.filter {
273 // if we supported the IN operator, we would check to see if the re were
274 // multiple value operands here, but the go SDK doesn't support this.
275 if f.op.isEQOp() {
276 eqProperties.Set([]byte(f.field), []byte{})
277 } else if f.op.isINEQOp() {
278 ineqProperties.Set([]byte(f.field), []byte{})
279 }
280 }
281
282 ineqProperties.VisitItemsAscend(nil, false, func(i *gkvlite.Item) bool {
283 eqProperties.Delete(i.Key)
284 return true
285 })
286
287 removeSet := bs.MakePrivateCollection(nil)
288 eqProperties.VisitItemsAscend(nil, false, func(i *gkvlite.Item) bool {
289 removeSet.Set(i.Key, []byte{})
290 return true
291 })
292
293 newOrders := []queryOrder{}
294 for _, o := range ret.order {
295 if removeSet.Get([]byte(o.field)) == nil {
296 removeSet.Set([]byte(o.field), []byte{})
297 newOrders = append(newOrders, o)
298 }
299 }
300 ret.order = newOrders
301
302 // need to fix ret.filters if we ever support the EXISTS operator and/or
303 // projections.
304 //
305 // newFilters = []
306 // for f in ret.filters:
307 // if f.op != qExists:
308 // newFilters = append(newFilters, f)
309 // if !removeSet.Has(f.field):
310 // removeSet.InsertNoReplace(f.field)
311 // newFilters = append(newFilters, f)
312 //
313 // so ret.filters == newFilters becuase none of ret.filters has op == qE xists
314 //
315 // then:
316 //
317 // for prop in ret.project:
318 // if !removeSet.Has(prop):
319 // removeSet.InsertNoReplace(prop)
320 // ... make new EXISTS filters, add them to newFilters ...
321 // ret.filters = newFilters
322 //
323 // However, since we don't support projection queries, this is moot.
324
325 if eqProperties.Get([]byte("__key__")) != nil {
326 ret.order = []queryOrder{}
327 }
328
329 newOrders = []queryOrder{}
330 for _, o := range ret.order {
331 if o.field == "__key__" {
332 newOrders = append(newOrders, o)
333 break
334 }
335 newOrders = append(newOrders, o)
336 }
337 ret.order = newOrders
338
339 return
340 }
341
342 func (q *queryImpl) checkCorrectness(ns string, isTxn bool) (ret *queryImpl) {
343 // ported from GAE SDK datastore_stub_util.py;CheckQuery()
344 ret = q.clone()
345
346 if ns != ret.ns {
347 ret.err = errors.New(
348 "gae/memory: Namespace mismatched. Query and Datastore d on't agree " +
349 "on the current namespace")
350 return
351 }
352
353 if ret.err != nil {
354 return
355 }
356
357 // if projection && keys_only:
358 // "projection and keys_only cannot both be set"
359
360 // if projection props match /^__.*__$/:
361 // "projections are not supported for the property: %(prop)s"
362
363 if isTxn && ret.ancestor == nil {
364 ret.err = errors.New(
365 "gae/memory: Only ancestor queries are allowed inside tr ansactions")
366 return
367 }
368
369 numComponents := len(ret.filter) + len(ret.order)
370 if ret.ancestor != nil {
371 numComponents++
372 }
373 if numComponents > 100 {
374 ret.err = errors.New(
375 "gae/memory: query is too large. may not have more than " +
376 "100 filters + sort orders ancestor total")
377 }
378
379 // if ret.ancestor.appid() != current appid
380 // "query app is x but ancestor app is x"
381 // if ret.ancestor.namespace() != current namespace
382 // "query namespace is x but ancestor namespace is x"
383
384 // if not all(g in orders for g in group_by)
385 // "items in the group by clause must be specified first in the orderin g"
386
387 ineqPropName := ""
388 for _, f := range ret.filter {
389 if f.field == "__key__" {
390 k, ok := f.value.(gae.DSKey)
391 if !ok {
392 ret.err = errors.New(
393 "gae/memory: __key__ filter value must b e a Key")
394 return
395 }
396 if !helper.DSKeyValid(k, ret.ns, false) {
397 // See the comment in queryImpl.Ancestor; basica lly this check
398 // never happens in the real env because the SDK silently swallows
399 // this condition :/
400 ret.err = gae.ErrDSInvalidKey
401 return
402 }
403 // __key__ filter app is X but query app is X
404 // __key__ filter namespace is X but query namespace is X
405 }
406 // if f.op == qEqual and f.field in ret.project_fields
407 // "cannot use projection on a proprety with an equality filte r"
408
409 if f.op.isINEQOp() {
410 if ineqPropName == "" {
411 ineqPropName = f.field
412 } else if f.field != ineqPropName {
413 ret.err = fmt.Errorf(
414 "gae/memory: Only one inequality filter per query is supported. "+
415 "Encountered both %s and %s", in eqPropName, f.field)
416 return
417 }
418 }
419 }
420
421 // if ineqPropName != "" && len(group_by) > 0 && len(orders) ==0
422 // "Inequality filter on X must also be a group by property "+
423 // "when group by properties are set."
424
425 if ineqPropName != "" && len(ret.order) != 0 {
426 if ret.order[0].field != ineqPropName {
427 ret.err = fmt.Errorf(
428 "gae/memory: The first sort property must be the same as the property "+
429 "to which the inequality filter is appli ed. In your query "+
430 "the first sort property is %s but the i nequality filter "+
431 "is on %s", ret.order[0].field, ineqProp Name)
432 return
433 }
434 }
435
436 if ret.kind == "" {
437 for _, f := range ret.filter {
438 if f.field != "__key__" {
439 ret.err = errors.New(
440 "gae/memory: kind is required for non-__ key__ filters")
441 return
442 }
443 }
444 for _, o := range ret.order {
445 if o.field != "__key__" || o.direction != qASC {
446 ret.err = errors.New(
447 "gae/memory: kind is required for all or ders except __key__ ascending")
448 return
449 }
450 }
451 }
452 return
453 }
454
455 func (q *queryImpl) calculateIndex() *qIndex {
456 // as a nod to simplicity in this code, we'll require that a single inde x
457 // is able to service the entire query. E.g. no zigzag merge joins or
458 // multiqueries. This will mean that the user will need to rely on
459 // dev_appserver to tell them what indicies they need for real, and for thier
460 // tests they'll need to specify the missing composite indices manually.
461 //
462 // This COULD lead to an exploding indicies problem, but we can fix that when
463 // we get to it.
464
465 //sortOrders := []qSortBy{}
466
467 return nil
468 }
469
470 func (q *queryImpl) clone() *queryImpl {
471 ret := *q
472 ret.filter = append([]queryFilter(nil), q.filter...)
473 ret.order = append([]queryOrder(nil), q.order...)
474 ret.project = append([]string(nil), q.project...)
475 return &ret
476 }
477
478 func (q *queryImpl) Ancestor(k gae.DSKey) gae.DSQuery {
479 q = q.clone()
480 q.ancestor = k
481 if k == nil {
482 // SDK has an explicit nil-check
483 q.err = errors.New("datastore: nil query ancestor")
484 } else if !helper.DSKeyValid(k, q.ns, false) {
485 // technically the SDK implementation does a Weird Thing (tm) if both the
486 // stringID and intID are set on a key; it only serializes the s tringID in
487 // the proto. This means that if you set the Ancestor to an inva lid key,
488 // you'll never actually hear about it. Instead of doing that in sanity, we
489 // just swap to an error here.
490 q.err = gae.ErrDSInvalidKey
491 }
492 return q
493 }
494
495 func (q *queryImpl) Distinct() gae.DSQuery {
496 q = q.clone()
497 q.distinct = true
498 return q
499 }
500
501 func (q *queryImpl) Filter(fStr string, val interface{}) gae.DSQuery {
502 q = q.clone()
503 f, err := parseFilter(fStr, val)
504 if err != nil {
505 q.err = err
506 return q
507 }
508 q.filter = append(q.filter, f)
509 return q
510 }
511
512 func (q *queryImpl) Order(field string) gae.DSQuery {
513 q = q.clone()
514 field = strings.TrimSpace(field)
515 o := queryOrder{field, qASC}
516 if strings.HasPrefix(field, "-") {
517 o.direction = qDEC
518 o.field = strings.TrimSpace(field[1:])
519 } else if strings.HasPrefix(field, "+") {
520 q.err = fmt.Errorf("datastore: invalid order: %q", field)
521 return q
522 }
523 if len(o.field) == 0 {
524 q.err = errors.New("datastore: empty order")
525 return q
526 }
527 q.order = append(q.order, o)
528 return q
529 }
530
531 func (q *queryImpl) Project(fieldName ...string) gae.DSQuery {
532 q = q.clone()
533 q.project = append(q.project, fieldName...)
534 return q
535 }
536
537 func (q *queryImpl) KeysOnly() gae.DSQuery {
538 q = q.clone()
539 q.keysOnly = true
540 return q
541 }
542
543 func (q *queryImpl) Limit(limit int) gae.DSQuery {
544 q = q.clone()
545 if limit < math.MinInt32 || limit > math.MaxInt32 {
546 q.err = errors.New("datastore: query limit overflow")
547 return q
548 }
549 q.limit = int32(limit)
550 return q
551 }
552
553 func (q *queryImpl) Offset(offset int) gae.DSQuery {
554 q = q.clone()
555 if offset < 0 {
556 q.err = errors.New("datastore: negative query offset")
557 return q
558 }
559 if offset > math.MaxInt32 {
560 q.err = errors.New("datastore: query offset overflow")
561 return q
562 }
563 q.offset = int32(offset)
564 return q
565 }
566
567 func (q *queryImpl) Start(c gae.DSCursor) gae.DSQuery {
568 q = q.clone()
569 curs := c.(queryCursor)
570 if !curs.Valid() {
571 q.err = errors.New("datastore: invalid cursor")
572 return q
573 }
574 q.start = curs
575 return q
576 }
577
578 func (q *queryImpl) End(c gae.DSCursor) gae.DSQuery {
579 q = q.clone()
580 curs := c.(queryCursor)
581 if !curs.Valid() {
582 q.err = errors.New("datastore: invalid cursor")
583 return q
584 }
585 q.end = curs
586 return q
587 }
588
589 func (q *queryImpl) EventualConsistency() gae.DSQuery {
590 q = q.clone()
591 q.eventualConsistency = true
592 return q
593 }
OLDNEW
« no previous file with comments | « memory/raw_datastore_data.go ('k') | memory/raw_datastore_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698