Index: scheduler/appengine/engine/dsset/dsset.go |
diff --git a/scheduler/appengine/engine/dsset/dsset.go b/scheduler/appengine/engine/dsset/dsset.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..a09511025318a12e39b5ac8944d75f6355159b9a |
--- /dev/null |
+++ b/scheduler/appengine/engine/dsset/dsset.go |
@@ -0,0 +1,431 @@ |
+// Copyright 2017 The LUCI Authors. |
+// |
+// Licensed under the Apache License, Version 2.0 (the "License"); |
+// you may not use this file except in compliance with the License. |
+// You may obtain a copy of the License at |
+// |
+// http://www.apache.org/licenses/LICENSE-2.0 |
+// |
+// Unless required by applicable law or agreed to in writing, software |
+// distributed under the License is distributed on an "AS IS" BASIS, |
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+// See the License for the specific language governing permissions and |
+// limitations under the License. |
+ |
+// Package dsset implements a particular flavor of datastore-backed set. |
+// |
+// Due to its internal structure, it requires some maintenance on behalf of the |
+// caller to periodically cleanup removed items (aka tombstones). |
+// |
+// Items added to set should have IDs unique in time (or at least unique for |
tandrii(chromium)
2017/07/18 15:55:14
1. I **feel**: "the" set, and "the" duration. But
Vadim Sh.
2017/07/24 00:30:16
Done.
|
+// duration of some configurable time interval, as defined by 'TombstonesDelay' |
+// property). It means removed items can be added back to the set right away. |
+// |
+// Properties: |
+// * Batch 'Add' with configurable QPS limit. |
+// * Transactional consistent 'Pop' (1 QPS limit). |
+// * Non-transactional consistent 'List' (1 QPS limit). |
+// * Popped items can't be re-added until their tombstones expire. |
+// |
+// These properties make dsset suitable for multiple producers/single consumer |
tandrii(chromium)
2017/07/18 15:55:15
nit: no need for slash in MPSC
Vadim Sh.
2017/07/24 00:30:16
Done.
|
+// queues, where order of items is not important and each item has an identifier |
+// unique in time. |
tandrii(chromium)
2017/07/18 15:55:14
ditto for "in time". How about "unique in each tim
Vadim Sh.
2017/07/24 00:30:17
Done.
|
+// |
+// Structurally dsset consists of N+1 entity groups: |
+// * N separate entity groups that contain N shards of the set. |
+// * 1 entity group (with a configurable root) that holds tombstones. |
+// |
+// It is safe to increase number of shards at any time. Decreasing number of |
+// shards is dangerous (but can be done with some more coding). |
+// |
+// More shards make: |
+// * Add() less contentious (so it can support more QPS). |
+// * List() and CleanupStorage() slower and more expensive. |
+// * Pop() is not affected by number of shards. |
+package dsset |
+ |
+import ( |
+ "fmt" |
+ "sync" |
+ "time" |
+ |
+ "golang.org/x/net/context" |
+ |
+ "github.com/luci/gae/service/datastore" |
+ "github.com/luci/luci-go/common/clock" |
+ "github.com/luci/luci-go/common/data/rand/mathrand" |
+ "github.com/luci/luci-go/common/data/stringset" |
+ "github.com/luci/luci-go/common/errors" |
+ "github.com/luci/luci-go/common/logging" |
+ "github.com/luci/luci-go/common/retry/transient" |
+) |
+ |
+// Set holds a set of Items and uses Tombstones to achieve idempotency of Add. |
+// |
+// Produces just call Add(...). |
tandrii(chromium)
2017/07/18 15:55:14
Producers
Vadim Sh.
2017/07/24 00:30:16
Done.
|
+// |
+// A consumer need to run more elaborate algorithm that ensures atomicity of |
tandrii(chromium)
2017/07/18 15:55:15
consider: s/A consumer need/The consumer (there sh
Vadim Sh.
2017/07/24 00:30:16
Done.
|
+// 'Pop' and takes care of cleanup up of the garbage. This requires a mix of |
tandrii(chromium)
2017/07/18 15:55:15
cleanup up -> cleanup OR cleaning up
Vadim Sh.
2017/07/24 00:30:17
Done.
|
+// transactional and non-transactional actions: |
+// |
+// items, tombstones, err := set.List(ctx) |
+// if err != nil { |
+// return err |
+// } |
+// |
+// if len(items) == 0 && len(tombstones) == 0 { |
+// return nil |
+// } |
+// |
+// if err := set.CleanupStorage(tombstones); err != nil { |
+// return err |
+// } |
+// |
+// ... Fetch any additional info associated with 'items' ... |
+// |
+// return datastore.RunInTransaction(ctx, func(ctx context.Context) error { |
+// op, err := set.BeginPop(ctx) |
+// if err != nil { |
+// return err |
+// } |
+// op.CleanupTombstones(tombstones) |
+// for _, itm := range items { |
+// if op.Pop(item.ID) { |
+// // The item was indeed in the set and we've just removed it! |
+// } else { |
+// // Some other transaction has popped it already. |
+// } |
+// } |
+// return op.Submit() |
+// }, nil) |
+type Set struct { |
+ ID string // global ID, used to construct datastore keys |
+ ShardCount int // number of entity groups to use for storage |
+ TombstonesRoot *datastore.Key // tombstones entity parent key |
+ TombstonesDelay time.Duration // how long to keep tombstones in the set |
+} |
+ |
+// Item is what's stored in the set. |
+type Item struct { |
+ ID string // unique in time identifier of the item |
+ Value []byte // arbitrary value (<1 MB, but preferably much smaller) |
+} |
+ |
+// Tombstone is a reference to a deleted item that still lingers in the set. |
+// |
+// Tombstones exist to make sure recently popped items do not reappear in the |
+// set if producers attempt to re-add them. |
+// |
+// Its fields are intentionally private to force correct usage CleanupStorage |
tandrii(chromium)
2017/07/18 15:55:14
usage of
Vadim Sh.
2017/07/24 00:30:17
Done.
|
+// and CleanupTombstones. |
+type Tombstone struct { |
+ set string // parent set name |
tandrii(chromium)
2017/07/18 15:55:15
parent set ID, right?
Vadim Sh.
2017/07/24 00:30:17
Yes. 'ID' used to be 'Name', I changed it midway a
|
+ id string // deleted item ID |
+ storage []*datastore.Key // itemEntity's to delete |
+} |
+ |
+// Add adds a bunch of items to the set. |
tandrii(chromium)
2017/07/18 15:55:15
s/set./set. idempotently.
Vadim Sh.
2017/07/24 00:30:16
Done.
|
+// |
+// Writes to some single entity group (not known in advance). If called outside |
+// of a transaction and the call fails, may add only some subset of items. |
+// Running inside a transaction makes this operation atomic. |
+// |
+// If items with given keys are already in the set, or has been deleted |
tandrii(chromium)
2017/07/18 15:55:14
s/has/have
Vadim Sh.
2017/07/24 00:30:16
Done.
|
+// recently, they won't be re-added. No error is returned in this case. |
+// |
+// Returns only transient errors. |
+func (s *Set) Add(c context.Context, items []Item) error { |
+ // TODO: batch + parallel |
tandrii(chromium)
2017/07/18 15:55:15
what's batch for? If # of items added is too large
Vadim Sh.
2017/07/24 00:30:17
Yes. There's a limit on number of entities in sing
tandrii(chromium)
2017/07/31 18:58:46
Acknowledged.
|
+ |
+ // Pick a random shard and add all new items there. If this is a retry, they |
+ // may exist in some other shard already. We don't care, they'll be |
+ // deduplicated in 'List'. If added items has been popped already (they have |
tandrii(chromium)
2017/07/18 15:55:14
s/has/have
Vadim Sh.
2017/07/24 00:30:15
Done.
|
+ // tombstones), 'List' will omit them as well. |
+ shardRoot := s.shardRoot(c, mathrand.Intn(c, s.ShardCount)) |
+ entities := make([]itemEntity, len(items)) |
+ for i, itm := range items { |
+ entities[i] = itemEntity{ |
+ ID: itm.ID, |
+ Parent: shardRoot, |
+ Value: itm.Value, |
+ } |
+ } |
+ return transient.Tag.Apply(datastore.Put(c, entities)) |
+} |
+ |
+// List returns all items that are currently in the set (in arbitrary order), |
+// as well as set of tombstones that points to items that were deleted |
tandrii(chromium)
2017/07/18 15:55:15
as well as A set
Vadim Sh.
2017/07/24 00:30:16
Done.
|
+// sufficiently long ago and can be cleaned up now. |
+// |
+// Must be called outside of transactions (panics otherwise). Reads many entity |
+// groups, including TombstonesRoot one. |
+// |
+// The set of tombstones to cleanup can be passed to CleanupStorage, and |
+// later to CleanupTombstones (during pop operation), in that order. Not doing |
+// so will lead to accumulation of garbage in the set that will slow down List |
+// and Pop. |
+// |
+// Returns only transient errors. |
+func (s *Set) List(c context.Context) (items []Item, cleanup []Tombstone, err error) { |
+ if datastore.CurrentTransaction(c) != nil { |
+ panic("dsset.Set.List must be called outside of a transaction") |
+ } |
+ |
+ // Fetch all shards (via consistent ancestor queries) and all tombstones. |
+ |
+ shards := make([][]*itemEntity, s.ShardCount) |
+ tombsEntity := tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot} |
+ |
+ wg := sync.WaitGroup{} |
+ errs := errors.NewLazyMultiError(s.ShardCount + 1) |
+ |
+ wg.Add(1) |
+ go func() { |
+ defer wg.Done() |
+ if err := datastore.Get(c, &tombsEntity); err != nil && err != datastore.ErrNoSuchEntity { |
+ errs.Assign(0, err) |
+ } |
+ }() |
+ |
+ wg.Add(s.ShardCount) |
+ for i := 0; i < s.ShardCount; i++ { |
+ go func(i int) { |
+ defer wg.Done() |
+ q := datastore.NewQuery("dsset.Item").Ancestor(s.shardRoot(c, i)) |
+ errs.Assign(i+1, datastore.GetAll(c, q, &shards[i])) |
+ }(i) |
+ } |
+ |
+ wg.Wait() |
+ if err := errs.Get(); err != nil { |
+ return nil, nil, transient.Tag.Apply(err) |
+ } |
+ |
+ logging.Infof(c, "%s: %d tombs", s.ID, len(tombsEntity.Tombstones)) |
+ for i, shard := range shards { |
+ logging.Infof(c, "%s: shard #%d - %d items", s.ID, i, len(shard)) |
+ } |
+ |
+ // A set of items we pretend not to see. Initially all tombstoned ones. |
+ // |
+ // Since we are iterating over tombstone list anyway, find all sufficiently |
+ // old tombstones to return them to the caller, so they can be cleaned up. |
+ ignore := stringset.New(len(tombsEntity.Tombstones)) |
+ oldestTombs := map[string]*Tombstone{} // key is item ID |
+ now := clock.Now(c).UTC() |
+ for _, t := range tombsEntity.Tombstones { |
+ ignore.Add(t.ID) |
+ if now.Sub(t.Tombstoned) > s.TombstonesDelay { |
+ oldestTombs[t.ID] = &Tombstone{set: s.ID, id: t.ID} |
+ } |
+ } |
+ |
+ // Join all shards, throwing away tombstoned and duplicated items. For oldest |
+ // tombstones collect references to entities that need to be deleted. There |
+ // may be more than one such entity per ID due to possibility of different |
+ // shards hosting same item (if Add call was retried)! |
tandrii(chromium)
2017/07/18 17:54:18
the same item
Vadim Sh.
2017/07/24 00:30:18
Done.
|
+ for _, shard := range shards { |
+ for _, itemEntity := range shard { |
+ if !ignore.Has(itemEntity.ID) { |
+ items = append(items, Item{ |
+ ID: itemEntity.ID, |
+ Value: itemEntity.Value, |
+ }) |
+ ignore.Add(itemEntity.ID) |
+ } |
+ if tomb := oldestTombs[itemEntity.ID]; tomb != nil { |
+ tomb.storage = append(tomb.storage, datastore.KeyForObj(c, itemEntity)) |
+ } |
+ } |
+ } |
+ |
+ if len(oldestTombs) != 0 { |
+ cleanup = make([]Tombstone, 0, len(oldestTombs)) |
+ for _, tomb := range oldestTombs { |
+ cleanup = append(cleanup, *tomb) |
+ } |
+ } |
+ |
+ return items, cleanup, nil |
+} |
+ |
+// CleanupStorage deletes entities used to store items under given tombstones. |
tandrii(chromium)
2017/07/18 17:54:18
nit: I'd call it GC :)
Vadim Sh.
2017/07/24 00:30:17
There are two "kinds" of GC: one removes itemEntit
|
+// |
+// Touches many entity groups. Must be called outside of transactions. |
+// Idempotent. |
+// |
+// This MUST be called before tombstones are popped. Failure to do so will make |
tandrii(chromium)
2017/07/18 17:54:18
is this worth enforcing? If so, then we could reco
Vadim Sh.
2017/07/24 00:30:16
Done, sort of.
|
+// items reappear in the set. |
+// |
+// Returns only transient errors. |
+func (s *Set) CleanupStorage(c context.Context, cleanup []Tombstone) error { |
+ if datastore.CurrentTransaction(c) != nil { |
+ panic("dsset.Set.CleanupStorage must be called outside of a transaction") |
+ } |
+ |
+ keys := []*datastore.Key{} |
+ for _, old := range cleanup { |
+ if old.set != s.ID { |
+ panic("passed Tombstone from some other set") |
+ } |
+ keys = append(keys, old.storage...) |
+ } |
+ |
+ // TODO: parallel |
+ |
+ merr := errors.MultiError{} |
+ for len(keys) > 0 { |
+ count := 100 |
+ if count > len(keys) { |
+ count = len(keys) |
+ } |
+ logging.Infof(c, "Cleaning up storage for %q by removing %d keys", s.ID, count) |
+ if err := datastore.Delete(c, keys[:count]); err != nil { |
+ logging.Warningf(c, "RPC failed - %s", err) |
+ merr = append(merr, err) |
+ } |
+ keys = keys[count:] |
+ } |
+ |
+ if len(merr) == 0 { |
+ return nil |
+ } |
+ |
+ return transient.Tag.Apply(merr) |
+} |
+ |
+// PopOp is an in-progress Pop operation. |
+// |
+// See BeginPop. |
+type PopOp struct { |
+ ctx context.Context // datastore context to use for this op |
+ set string // id of the set |
+ now time.Time // popping time for all popped items |
+ entity *tombstonesEntity // entity with tombstones |
+ tombs map[string]tombstone // tombstones in a map form |
+ dirty bool // true if the tombstone map was modified |
+} |
+ |
+// BeginPop initiates Pop operation. |
+// |
+// Pop operation is used to transactionally remove items from the set, as well |
+// as cleanup old tombstones. It must be finished with Submit. |
+// |
+// Requires a transaction. Modifies TombstonesRoot entity group (and only it). |
+// |
+// Returns only transient errors. |
+func (s *Set) BeginPop(c context.Context) (*PopOp, error) { |
+ if datastore.CurrentTransaction(c) == nil { |
+ panic("dsset.Set.BeginPop must be called inside a transaction") |
+ } |
+ |
+ entity := &tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot} |
+ if err := datastore.Get(c, entity); err != nil && err != datastore.ErrNoSuchEntity { |
+ return nil, transient.Tag.Apply(err) |
+ } |
+ |
+ // The data in tombstonesEntity, in map form. |
+ tombs := make(map[string]tombstone, len(entity.Tombstones)) |
+ for _, t := range entity.Tombstones { |
+ tombs[t.ID] = t |
+ } |
+ |
+ return &PopOp{ |
+ ctx: c, |
+ set: s.ID, |
+ now: clock.Now(c).UTC(), |
+ entity: entity, |
+ tombs: tombs, |
+ dirty: false, |
+ }, nil |
+} |
+ |
+// AlreadyPopped returns true if the given item has already been popped, perhaps |
+// in another transaction. |
+// |
+// Returns false for items that are still in the set, or items that have never |
+// been in the set to being with. Use List (prior to starting the transaction) |
tandrii(chromium)
2017/07/18 17:54:18
to beGIN with
Vadim Sh.
2017/07/24 00:30:17
Done.
|
+// to distinguish this two cases. |
tandrii(chromium)
2017/07/18 17:54:18
these
Vadim Sh.
2017/07/24 00:30:17
Done.
|
+func (p *PopOp) AlreadyPopped(id string) bool { |
+ _, hasTomb := p.tombs[id] |
+ return hasTomb |
+} |
+ |
+// Pop pop the item from the set and returns true if it was indeed |
tandrii(chromium)
2017/07/18 17:54:18
Pop pops
Vadim Sh.
2017/07/24 00:30:17
Done.
|
+// popped. |
+// |
+// Returns false if this item has been popped before (perhaps in another |
+// transaction). |
+// |
+// Returns true if item existed in the set and was popped, or it never existed |
+// there at all. Popping items that have never been in the set leads to weird |
+// behavior, don't do this. Use List (prior to starting the transaction) |
+// to figure out what items can be potentially popped. |
+func (p *PopOp) Pop(id string) bool { |
+ if _, hasTomb := p.tombs[id]; hasTomb { |
+ return false |
+ } |
+ p.tombs[id] = tombstone{ID: id, Tombstoned: p.now} |
tandrii(chromium)
2017/07/18 17:54:19
Suppose List is called > TombstonesDelay seconds b
Vadim Sh.
2017/07/24 00:30:16
It is single consumer mostly because 1 QPS limit o
|
+ p.dirty = true |
+ return true |
+} |
+ |
+// CleanupTombstones removes old tombstones. |
+// |
+// This must be called only after set's CleanupStorage method, otherwise removed |
+// items may reappear again. |
+func (p *PopOp) CleanupTombstones(tombstones []Tombstone) { |
+ for _, old := range tombstones { |
+ if old.set != p.set { |
+ panic("passed Tombstone from some other set") |
+ } |
+ if _, hasTomb := p.tombs[old.id]; hasTomb { |
+ delete(p.tombs, old.id) |
+ p.dirty = true |
+ } |
+ } |
+} |
+ |
+// Submit completes this pop operation but submitting changes to datastore. |
tandrii(chromium)
2017/07/18 15:55:14
s/but/by
Vadim Sh.
2017/07/24 00:30:15
Done.
|
+// |
+// The operation is unusable after this. |
tandrii(chromium)
2017/07/18 17:54:19
wdyt about adding a field "submitted"=false by def
Vadim Sh.
2017/07/24 00:30:17
I did it initially, but the removed to simplify co
|
+func (p *PopOp) Submit() error { |
+ if p.dirty { |
+ p.entity.Tombstones = p.entity.Tombstones[:0] |
+ for _, tomb := range p.tombs { |
+ p.entity.Tombstones = append(p.entity.Tombstones, tomb) |
+ } |
+ if err := datastore.Put(p.ctx, p.entity); err != nil { |
+ return transient.Tag.Apply(err) |
+ } |
+ } |
+ return nil |
+} |
+ |
+//////////////////////////////////////////////////////////////////////////////// |
+ |
+type itemEntity struct { |
+ _kind string `gae:"$kind,dsset.Item"` |
+ |
+ ID string `gae:"$id"` |
+ Parent *datastore.Key `gae:"$parent"` |
+ Value []byte `gae:",noindex"` |
+} |
+ |
+type tombstonesEntity struct { |
+ _kind string `gae:"$kind,dsset.Tombstones"` |
+ |
+ ID string `gae:"$id"` |
+ Parent *datastore.Key `gae:"$parent"` |
+ Tombstones []tombstone `gae:",noindex"` |
+} |
+ |
+type tombstone struct { |
+ ID string // ID of tombstoned item |
+ Tombstoned time.Time // when it was popped |
+} |
+ |
+// shardRoot returns entity group key to use for a given shard. |
+func (s *Set) shardRoot(c context.Context, n int) *datastore.Key { |
+ return datastore.NewKey(c, "dsset.Shard", fmt.Sprintf("%s:%d", s.ID, n), 0, nil) |
+} |