Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1120)

Unified Diff: scheduler/appengine/engine/dsset/dsset.go

Issue 2981143002: Add 'dsset' structure. (Closed)
Patch Set: Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: scheduler/appengine/engine/dsset/dsset.go
diff --git a/scheduler/appengine/engine/dsset/dsset.go b/scheduler/appengine/engine/dsset/dsset.go
new file mode 100644
index 0000000000000000000000000000000000000000..a09511025318a12e39b5ac8944d75f6355159b9a
--- /dev/null
+++ b/scheduler/appengine/engine/dsset/dsset.go
@@ -0,0 +1,431 @@
+// Copyright 2017 The LUCI Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package dsset implements a particular flavor of datastore-backed set.
+//
+// Due to its internal structure, it requires some maintenance on behalf of the
+// caller to periodically cleanup removed items (aka tombstones).
+//
+// Items added to set should have IDs unique in time (or at least unique for
tandrii(chromium) 2017/07/18 15:55:14 1. I **feel**: "the" set, and "the" duration. But
Vadim Sh. 2017/07/24 00:30:16 Done.
+// duration of some configurable time interval, as defined by 'TombstonesDelay'
+// property). It means removed items can be added back to the set right away.
+//
+// Properties:
+// * Batch 'Add' with configurable QPS limit.
+// * Transactional consistent 'Pop' (1 QPS limit).
+// * Non-transactional consistent 'List' (1 QPS limit).
+// * Popped items can't be re-added until their tombstones expire.
+//
+// These properties make dsset suitable for multiple producers/single consumer
tandrii(chromium) 2017/07/18 15:55:15 nit: no need for slash in MPSC
Vadim Sh. 2017/07/24 00:30:16 Done.
+// queues, where order of items is not important and each item has an identifier
+// unique in time.
tandrii(chromium) 2017/07/18 15:55:14 ditto for "in time". How about "unique in each tim
Vadim Sh. 2017/07/24 00:30:17 Done.
+//
+// Structurally dsset consists of N+1 entity groups:
+// * N separate entity groups that contain N shards of the set.
+// * 1 entity group (with a configurable root) that holds tombstones.
+//
+// It is safe to increase number of shards at any time. Decreasing number of
+// shards is dangerous (but can be done with some more coding).
+//
+// More shards make:
+// * Add() less contentious (so it can support more QPS).
+// * List() and CleanupStorage() slower and more expensive.
+// * Pop() is not affected by number of shards.
+package dsset
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "golang.org/x/net/context"
+
+ "github.com/luci/gae/service/datastore"
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/data/rand/mathrand"
+ "github.com/luci/luci-go/common/data/stringset"
+ "github.com/luci/luci-go/common/errors"
+ "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/retry/transient"
+)
+
+// Set holds a set of Items and uses Tombstones to achieve idempotency of Add.
+//
+// Produces just call Add(...).
tandrii(chromium) 2017/07/18 15:55:14 Producers
Vadim Sh. 2017/07/24 00:30:16 Done.
+//
+// A consumer need to run more elaborate algorithm that ensures atomicity of
tandrii(chromium) 2017/07/18 15:55:15 consider: s/A consumer need/The consumer (there sh
Vadim Sh. 2017/07/24 00:30:16 Done.
+// 'Pop' and takes care of cleanup up of the garbage. This requires a mix of
tandrii(chromium) 2017/07/18 15:55:15 cleanup up -> cleanup OR cleaning up
Vadim Sh. 2017/07/24 00:30:17 Done.
+// transactional and non-transactional actions:
+//
+// items, tombstones, err := set.List(ctx)
+// if err != nil {
+// return err
+// }
+//
+// if len(items) == 0 && len(tombstones) == 0 {
+// return nil
+// }
+//
+// if err := set.CleanupStorage(tombstones); err != nil {
+// return err
+// }
+//
+// ... Fetch any additional info associated with 'items' ...
+//
+// return datastore.RunInTransaction(ctx, func(ctx context.Context) error {
+// op, err := set.BeginPop(ctx)
+// if err != nil {
+// return err
+// }
+// op.CleanupTombstones(tombstones)
+// for _, itm := range items {
+// if op.Pop(item.ID) {
+// // The item was indeed in the set and we've just removed it!
+// } else {
+// // Some other transaction has popped it already.
+// }
+// }
+// return op.Submit()
+// }, nil)
+type Set struct {
+ ID string // global ID, used to construct datastore keys
+ ShardCount int // number of entity groups to use for storage
+ TombstonesRoot *datastore.Key // tombstones entity parent key
+ TombstonesDelay time.Duration // how long to keep tombstones in the set
+}
+
+// Item is what's stored in the set.
+type Item struct {
+ ID string // unique in time identifier of the item
+ Value []byte // arbitrary value (<1 MB, but preferably much smaller)
+}
+
+// Tombstone is a reference to a deleted item that still lingers in the set.
+//
+// Tombstones exist to make sure recently popped items do not reappear in the
+// set if producers attempt to re-add them.
+//
+// Its fields are intentionally private to force correct usage CleanupStorage
tandrii(chromium) 2017/07/18 15:55:14 usage of
Vadim Sh. 2017/07/24 00:30:17 Done.
+// and CleanupTombstones.
+type Tombstone struct {
+ set string // parent set name
tandrii(chromium) 2017/07/18 15:55:15 parent set ID, right?
Vadim Sh. 2017/07/24 00:30:17 Yes. 'ID' used to be 'Name', I changed it midway a
+ id string // deleted item ID
+ storage []*datastore.Key // itemEntity's to delete
+}
+
+// Add adds a bunch of items to the set.
tandrii(chromium) 2017/07/18 15:55:15 s/set./set. idempotently.
Vadim Sh. 2017/07/24 00:30:16 Done.
+//
+// Writes to some single entity group (not known in advance). If called outside
+// of a transaction and the call fails, may add only some subset of items.
+// Running inside a transaction makes this operation atomic.
+//
+// If items with given keys are already in the set, or has been deleted
tandrii(chromium) 2017/07/18 15:55:14 s/has/have
Vadim Sh. 2017/07/24 00:30:16 Done.
+// recently, they won't be re-added. No error is returned in this case.
+//
+// Returns only transient errors.
+func (s *Set) Add(c context.Context, items []Item) error {
+ // TODO: batch + parallel
tandrii(chromium) 2017/07/18 15:55:15 what's batch for? If # of items added is too large
Vadim Sh. 2017/07/24 00:30:17 Yes. There's a limit on number of entities in sing
tandrii(chromium) 2017/07/31 18:58:46 Acknowledged.
+
+ // Pick a random shard and add all new items there. If this is a retry, they
+ // may exist in some other shard already. We don't care, they'll be
+ // deduplicated in 'List'. If added items has been popped already (they have
tandrii(chromium) 2017/07/18 15:55:14 s/has/have
Vadim Sh. 2017/07/24 00:30:15 Done.
+ // tombstones), 'List' will omit them as well.
+ shardRoot := s.shardRoot(c, mathrand.Intn(c, s.ShardCount))
+ entities := make([]itemEntity, len(items))
+ for i, itm := range items {
+ entities[i] = itemEntity{
+ ID: itm.ID,
+ Parent: shardRoot,
+ Value: itm.Value,
+ }
+ }
+ return transient.Tag.Apply(datastore.Put(c, entities))
+}
+
+// List returns all items that are currently in the set (in arbitrary order),
+// as well as set of tombstones that points to items that were deleted
tandrii(chromium) 2017/07/18 15:55:15 as well as A set
Vadim Sh. 2017/07/24 00:30:16 Done.
+// sufficiently long ago and can be cleaned up now.
+//
+// Must be called outside of transactions (panics otherwise). Reads many entity
+// groups, including TombstonesRoot one.
+//
+// The set of tombstones to cleanup can be passed to CleanupStorage, and
+// later to CleanupTombstones (during pop operation), in that order. Not doing
+// so will lead to accumulation of garbage in the set that will slow down List
+// and Pop.
+//
+// Returns only transient errors.
+func (s *Set) List(c context.Context) (items []Item, cleanup []Tombstone, err error) {
+ if datastore.CurrentTransaction(c) != nil {
+ panic("dsset.Set.List must be called outside of a transaction")
+ }
+
+ // Fetch all shards (via consistent ancestor queries) and all tombstones.
+
+ shards := make([][]*itemEntity, s.ShardCount)
+ tombsEntity := tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot}
+
+ wg := sync.WaitGroup{}
+ errs := errors.NewLazyMultiError(s.ShardCount + 1)
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if err := datastore.Get(c, &tombsEntity); err != nil && err != datastore.ErrNoSuchEntity {
+ errs.Assign(0, err)
+ }
+ }()
+
+ wg.Add(s.ShardCount)
+ for i := 0; i < s.ShardCount; i++ {
+ go func(i int) {
+ defer wg.Done()
+ q := datastore.NewQuery("dsset.Item").Ancestor(s.shardRoot(c, i))
+ errs.Assign(i+1, datastore.GetAll(c, q, &shards[i]))
+ }(i)
+ }
+
+ wg.Wait()
+ if err := errs.Get(); err != nil {
+ return nil, nil, transient.Tag.Apply(err)
+ }
+
+ logging.Infof(c, "%s: %d tombs", s.ID, len(tombsEntity.Tombstones))
+ for i, shard := range shards {
+ logging.Infof(c, "%s: shard #%d - %d items", s.ID, i, len(shard))
+ }
+
+ // A set of items we pretend not to see. Initially all tombstoned ones.
+ //
+ // Since we are iterating over tombstone list anyway, find all sufficiently
+ // old tombstones to return them to the caller, so they can be cleaned up.
+ ignore := stringset.New(len(tombsEntity.Tombstones))
+ oldestTombs := map[string]*Tombstone{} // key is item ID
+ now := clock.Now(c).UTC()
+ for _, t := range tombsEntity.Tombstones {
+ ignore.Add(t.ID)
+ if now.Sub(t.Tombstoned) > s.TombstonesDelay {
+ oldestTombs[t.ID] = &Tombstone{set: s.ID, id: t.ID}
+ }
+ }
+
+ // Join all shards, throwing away tombstoned and duplicated items. For oldest
+ // tombstones collect references to entities that need to be deleted. There
+ // may be more than one such entity per ID due to possibility of different
+ // shards hosting same item (if Add call was retried)!
tandrii(chromium) 2017/07/18 17:54:18 the same item
Vadim Sh. 2017/07/24 00:30:18 Done.
+ for _, shard := range shards {
+ for _, itemEntity := range shard {
+ if !ignore.Has(itemEntity.ID) {
+ items = append(items, Item{
+ ID: itemEntity.ID,
+ Value: itemEntity.Value,
+ })
+ ignore.Add(itemEntity.ID)
+ }
+ if tomb := oldestTombs[itemEntity.ID]; tomb != nil {
+ tomb.storage = append(tomb.storage, datastore.KeyForObj(c, itemEntity))
+ }
+ }
+ }
+
+ if len(oldestTombs) != 0 {
+ cleanup = make([]Tombstone, 0, len(oldestTombs))
+ for _, tomb := range oldestTombs {
+ cleanup = append(cleanup, *tomb)
+ }
+ }
+
+ return items, cleanup, nil
+}
+
+// CleanupStorage deletes entities used to store items under given tombstones.
tandrii(chromium) 2017/07/18 17:54:18 nit: I'd call it GC :)
Vadim Sh. 2017/07/24 00:30:17 There are two "kinds" of GC: one removes itemEntit
+//
+// Touches many entity groups. Must be called outside of transactions.
+// Idempotent.
+//
+// This MUST be called before tombstones are popped. Failure to do so will make
tandrii(chromium) 2017/07/18 17:54:18 is this worth enforcing? If so, then we could reco
Vadim Sh. 2017/07/24 00:30:16 Done, sort of.
+// items reappear in the set.
+//
+// Returns only transient errors.
+func (s *Set) CleanupStorage(c context.Context, cleanup []Tombstone) error {
+ if datastore.CurrentTransaction(c) != nil {
+ panic("dsset.Set.CleanupStorage must be called outside of a transaction")
+ }
+
+ keys := []*datastore.Key{}
+ for _, old := range cleanup {
+ if old.set != s.ID {
+ panic("passed Tombstone from some other set")
+ }
+ keys = append(keys, old.storage...)
+ }
+
+ // TODO: parallel
+
+ merr := errors.MultiError{}
+ for len(keys) > 0 {
+ count := 100
+ if count > len(keys) {
+ count = len(keys)
+ }
+ logging.Infof(c, "Cleaning up storage for %q by removing %d keys", s.ID, count)
+ if err := datastore.Delete(c, keys[:count]); err != nil {
+ logging.Warningf(c, "RPC failed - %s", err)
+ merr = append(merr, err)
+ }
+ keys = keys[count:]
+ }
+
+ if len(merr) == 0 {
+ return nil
+ }
+
+ return transient.Tag.Apply(merr)
+}
+
+// PopOp is an in-progress Pop operation.
+//
+// See BeginPop.
+type PopOp struct {
+ ctx context.Context // datastore context to use for this op
+ set string // id of the set
+ now time.Time // popping time for all popped items
+ entity *tombstonesEntity // entity with tombstones
+ tombs map[string]tombstone // tombstones in a map form
+ dirty bool // true if the tombstone map was modified
+}
+
+// BeginPop initiates Pop operation.
+//
+// Pop operation is used to transactionally remove items from the set, as well
+// as cleanup old tombstones. It must be finished with Submit.
+//
+// Requires a transaction. Modifies TombstonesRoot entity group (and only it).
+//
+// Returns only transient errors.
+func (s *Set) BeginPop(c context.Context) (*PopOp, error) {
+ if datastore.CurrentTransaction(c) == nil {
+ panic("dsset.Set.BeginPop must be called inside a transaction")
+ }
+
+ entity := &tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot}
+ if err := datastore.Get(c, entity); err != nil && err != datastore.ErrNoSuchEntity {
+ return nil, transient.Tag.Apply(err)
+ }
+
+ // The data in tombstonesEntity, in map form.
+ tombs := make(map[string]tombstone, len(entity.Tombstones))
+ for _, t := range entity.Tombstones {
+ tombs[t.ID] = t
+ }
+
+ return &PopOp{
+ ctx: c,
+ set: s.ID,
+ now: clock.Now(c).UTC(),
+ entity: entity,
+ tombs: tombs,
+ dirty: false,
+ }, nil
+}
+
+// AlreadyPopped returns true if the given item has already been popped, perhaps
+// in another transaction.
+//
+// Returns false for items that are still in the set, or items that have never
+// been in the set to being with. Use List (prior to starting the transaction)
tandrii(chromium) 2017/07/18 17:54:18 to beGIN with
Vadim Sh. 2017/07/24 00:30:17 Done.
+// to distinguish this two cases.
tandrii(chromium) 2017/07/18 17:54:18 these
Vadim Sh. 2017/07/24 00:30:17 Done.
+func (p *PopOp) AlreadyPopped(id string) bool {
+ _, hasTomb := p.tombs[id]
+ return hasTomb
+}
+
+// Pop pop the item from the set and returns true if it was indeed
tandrii(chromium) 2017/07/18 17:54:18 Pop pops
Vadim Sh. 2017/07/24 00:30:17 Done.
+// popped.
+//
+// Returns false if this item has been popped before (perhaps in another
+// transaction).
+//
+// Returns true if item existed in the set and was popped, or it never existed
+// there at all. Popping items that have never been in the set leads to weird
+// behavior, don't do this. Use List (prior to starting the transaction)
+// to figure out what items can be potentially popped.
+func (p *PopOp) Pop(id string) bool {
+ if _, hasTomb := p.tombs[id]; hasTomb {
+ return false
+ }
+ p.tombs[id] = tombstone{ID: id, Tombstoned: p.now}
tandrii(chromium) 2017/07/18 17:54:19 Suppose List is called > TombstonesDelay seconds b
Vadim Sh. 2017/07/24 00:30:16 It is single consumer mostly because 1 QPS limit o
+ p.dirty = true
+ return true
+}
+
+// CleanupTombstones removes old tombstones.
+//
+// This must be called only after set's CleanupStorage method, otherwise removed
+// items may reappear again.
+func (p *PopOp) CleanupTombstones(tombstones []Tombstone) {
+ for _, old := range tombstones {
+ if old.set != p.set {
+ panic("passed Tombstone from some other set")
+ }
+ if _, hasTomb := p.tombs[old.id]; hasTomb {
+ delete(p.tombs, old.id)
+ p.dirty = true
+ }
+ }
+}
+
+// Submit completes this pop operation but submitting changes to datastore.
tandrii(chromium) 2017/07/18 15:55:14 s/but/by
Vadim Sh. 2017/07/24 00:30:15 Done.
+//
+// The operation is unusable after this.
tandrii(chromium) 2017/07/18 17:54:19 wdyt about adding a field "submitted"=false by def
Vadim Sh. 2017/07/24 00:30:17 I did it initially, but the removed to simplify co
+func (p *PopOp) Submit() error {
+ if p.dirty {
+ p.entity.Tombstones = p.entity.Tombstones[:0]
+ for _, tomb := range p.tombs {
+ p.entity.Tombstones = append(p.entity.Tombstones, tomb)
+ }
+ if err := datastore.Put(p.ctx, p.entity); err != nil {
+ return transient.Tag.Apply(err)
+ }
+ }
+ return nil
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+type itemEntity struct {
+ _kind string `gae:"$kind,dsset.Item"`
+
+ ID string `gae:"$id"`
+ Parent *datastore.Key `gae:"$parent"`
+ Value []byte `gae:",noindex"`
+}
+
+type tombstonesEntity struct {
+ _kind string `gae:"$kind,dsset.Tombstones"`
+
+ ID string `gae:"$id"`
+ Parent *datastore.Key `gae:"$parent"`
+ Tombstones []tombstone `gae:",noindex"`
+}
+
+type tombstone struct {
+ ID string // ID of tombstoned item
+ Tombstoned time.Time // when it was popped
+}
+
+// shardRoot returns entity group key to use for a given shard.
+func (s *Set) shardRoot(c context.Context, n int) *datastore.Key {
+ return datastore.NewKey(c, "dsset.Shard", fmt.Sprintf("%s:%d", s.ID, n), 0, nil)
+}

Powered by Google App Engine
This is Rietveld 408576698