Index: filter/dsQueryBatch/filter.go |
diff --git a/filter/dsQueryBatch/filter.go b/filter/dsQueryBatch/filter.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..e55f8eee6a9d5ff74eda0c405506fd6a4b5166b1 |
--- /dev/null |
+++ b/filter/dsQueryBatch/filter.go |
@@ -0,0 +1,81 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package dsQueryBatch |
+ |
+import ( |
+ "fmt" |
+ |
+ ds "github.com/luci/gae/service/datastore" |
+) |
+ |
+type iterQueryFilter struct { |
+ ds.RawInterface |
+ |
+ batchSize int32 |
+} |
+ |
+func (f *iterQueryFilter) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { |
+ limit, hasLimit := fq.Limit() |
+ |
+ var cursor ds.Cursor |
+ for { |
+ iterQuery := fq.Original() |
+ if cursor != nil { |
+ iterQuery = iterQuery.Start(cursor) |
+ cursor = nil |
+ } |
+ iterLimit := f.batchSize |
+ if hasLimit && limit < iterLimit { |
+ iterLimit = limit |
+ } |
+ iterQuery = iterQuery.Limit(iterLimit) |
+ |
+ iterFinalizedQuery, err := iterQuery.Finalize() |
+ if err != nil { |
+ panic(fmt.Errorf("failed to finalize internal query: %v", err)) |
+ } |
+ |
+ count := int32(0) |
+ err = f.RawInterface.Run(iterFinalizedQuery, func(key *ds.Key, val ds.PropertyMap, getCursor ds.CursorCB) error { |
+ if cursor != nil { |
+ // We're iterating past our batch size, which should never happen, since |
+ // we set a limit. This will only happen when our inner RawInterface |
+ // fails to honor the limit that we set. |
+ panic(fmt.Errorf("iterating past batch size")) |
+ } |
+ |
+ if err := cb(key, val, getCursor); err != nil { |
+ return err |
+ } |
+ |
+ // If this is the last entry in our batch, get the cursor and stop this |
+ // query round. |
+ count++ |
+ if count >= f.batchSize { |
+ if cursor, err = getCursor(); err != nil { |
+ return fmt.Errorf("failed to get cursor: %v", err) |
+ } |
+ } |
+ return nil |
+ }) |
+ if err != nil && err != ds.Stop { |
+ return err |
+ } |
+ |
+ // If we have no cursor, we're done. |
+ if cursor == nil { |
+ break |
+ } |
+ |
+ // Reduce our limit for the next round. |
+ if hasLimit { |
+ limit -= count |
+ if limit <= 0 { |
+ break |
+ } |
+ } |
+ } |
+ return nil |
+} |