Index: impl/memory/datastore_query_execution.go |
diff --git a/impl/memory/datastore_query_execution.go b/impl/memory/datastore_query_execution.go |
index 975b0f158f2b235dcf86bab9c8c23d277f0d93f9..cc1ea10b5542a28d0f1814fdad40e078390d63b6 100644 |
--- a/impl/memory/datastore_query_execution.go |
+++ b/impl/memory/datastore_query_execution.go |
@@ -96,17 +96,18 @@ func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key |
type normalStrategy struct { |
cb ds.RawRunCB |
+ aid string |
ns string |
head *memCollection |
dedup stringset.Set |
} |
-func newNormalStrategy(ns string, cb ds.RawRunCB, head *memStore) queryStrategy { |
+func newNormalStrategy(aid, ns string, cb ds.RawRunCB, head *memStore) queryStrategy { |
coll := head.GetCollection("ents:" + ns) |
if coll == nil { |
return nil |
} |
- return &normalStrategy{cb, ns, coll, stringset.New(0)} |
+ return &normalStrategy{cb, aid, ns, coll, stringset.New(0)} |
} |
func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) bool { |
@@ -120,7 +121,7 @@ func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, |
// entity doesn't exist at head |
return true |
} |
- pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, globalAppID, s.ns) |
+ pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, s.aid, s.ns) |
memoryCorruption(err) |
return s.cb(key, pm, gc) |
@@ -133,10 +134,10 @@ func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB, |
if len(fq.Project()) > 0 { |
return newProjectionStrategy(fq, rq, cb) |
} |
- return newNormalStrategy(rq.ns, cb, head) |
+ return newNormalStrategy(rq.aid, rq.ns, cb, head) |
} |
-func parseSuffix(ns string, suffixFormat []ds.IndexColumn, suffix []byte, count int) (raw [][]byte, decoded []ds.Property) { |
+func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, count int) (raw [][]byte, decoded []ds.Property) { |
buf := serialize.Invertible(bytes.NewBuffer(suffix)) |
decoded = make([]ds.Property, len(suffixFormat)) |
raw = make([][]byte, len(suffixFormat)) |
@@ -149,7 +150,7 @@ func parseSuffix(ns string, suffixFormat []ds.IndexColumn, suffix []byte, count |
needInvert := suffixFormat[i].Descending |
buf.SetInvert(needInvert) |
- decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutContext, globalAppID, ns) |
+ decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutContext, aid, ns) |
memoryCorruption(err) |
offset := len(suffix) - buf.Len() |
@@ -163,22 +164,22 @@ func parseSuffix(ns string, suffixFormat []ds.IndexColumn, suffix []byte, count |
return |
} |
-func countQuery(fq *ds.FinalizedQuery, ns string, isTxn bool, idx, head *memStore) (ret int64, err error) { |
+func countQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *memStore) (ret int64, err error) { |
if len(fq.Project()) == 0 && !fq.KeysOnly() { |
fq, err = fq.Original().KeysOnly(true).Finalize() |
if err != nil { |
return |
} |
} |
- err = executeQuery(fq, ns, isTxn, idx, head, func(_ *ds.Key, _ ds.PropertyMap, _ ds.CursorCB) bool { |
+ err = executeQuery(fq, aid, ns, isTxn, idx, head, func(_ *ds.Key, _ ds.PropertyMap, _ ds.CursorCB) bool { |
ret++ |
return true |
}) |
return |
} |
-func executeQuery(fq *ds.FinalizedQuery, ns string, isTxn bool, idx, head *memStore, cb ds.RawRunCB) error { |
- rq, err := reduce(fq, ns, isTxn) |
+func executeQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *memStore, cb ds.RawRunCB) error { |
+ rq, err := reduce(fq, aid, ns, isTxn) |
if err == ds.ErrNullQuery { |
return nil |
} |
@@ -236,7 +237,7 @@ func executeQuery(fq *ds.FinalizedQuery, ns string, isTxn bool, idx, head *memSt |
limit-- |
} |
- rawData, decodedProps := parseSuffix(ns, rq.suffixFormat, suffix, -1) |
+ rawData, decodedProps := parseSuffix(aid, ns, rq.suffixFormat, suffix, -1) |
keyProp := decodedProps[len(decodedProps)-1] |
if keyProp.Type() != ds.PTKey { |