Chromium Code Reviews| Index: scheduler/appengine/engine/dsset/dsset.go |
| diff --git a/scheduler/appengine/engine/dsset/dsset.go b/scheduler/appengine/engine/dsset/dsset.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..a09511025318a12e39b5ac8944d75f6355159b9a |
| --- /dev/null |
| +++ b/scheduler/appengine/engine/dsset/dsset.go |
| @@ -0,0 +1,431 @@ |
| +// Copyright 2017 The LUCI Authors. |
| +// |
| +// Licensed under the Apache License, Version 2.0 (the "License"); |
| +// you may not use this file except in compliance with the License. |
| +// You may obtain a copy of the License at |
| +// |
| +// http://www.apache.org/licenses/LICENSE-2.0 |
| +// |
| +// Unless required by applicable law or agreed to in writing, software |
| +// distributed under the License is distributed on an "AS IS" BASIS, |
| +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| +// See the License for the specific language governing permissions and |
| +// limitations under the License. |
| + |
| +// Package dsset implements a particular flavor of datastore-backed set. |
| +// |
| +// Due to its internal structure, it requires some maintenance on behalf of the |
| +// caller to periodically cleanup removed items (aka tombstones). |
| +// |
| +// Items added to set should have IDs unique in time (or at least unique for |
|
tandrii(chromium)
2017/07/18 15:55:14
1. I **feel**: "the" set, and "the" duration. But
Vadim Sh.
2017/07/24 00:30:16
Done.
|
| +// duration of some configurable time interval, as defined by 'TombstonesDelay' |
| +// property). It means removed items can be added back to the set right away. |
| +// |
| +// Properties: |
| +// * Batch 'Add' with configurable QPS limit. |
| +// * Transactional consistent 'Pop' (1 QPS limit). |
| +// * Non-transactional consistent 'List' (1 QPS limit). |
| +// * Popped items can't be re-added until their tombstones expire. |
| +// |
| +// These properties make dsset suitable for multiple producers/single consumer |
|
tandrii(chromium)
2017/07/18 15:55:15
nit: no need for slash in MPSC
Vadim Sh.
2017/07/24 00:30:16
Done.
|
| +// queues, where order of items is not important and each item has an identifier |
| +// unique in time. |
|
tandrii(chromium)
2017/07/18 15:55:14
ditto for "in time". How about "unique in each tim
Vadim Sh.
2017/07/24 00:30:17
Done.
|
| +// |
| +// Structurally dsset consists of N+1 entity groups: |
| +// * N separate entity groups that contain N shards of the set. |
| +// * 1 entity group (with a configurable root) that holds tombstones. |
| +// |
| +// It is safe to increase number of shards at any time. Decreasing number of |
| +// shards is dangerous (but can be done with some more coding). |
| +// |
| +// More shards make: |
| +// * Add() less contentious (so it can support more QPS). |
| +// * List() and CleanupStorage() slower and more expensive. |
| +// * Pop() is not affected by number of shards. |
| +package dsset |
| + |
| +import ( |
| + "fmt" |
| + "sync" |
| + "time" |
| + |
| + "golang.org/x/net/context" |
| + |
| + "github.com/luci/gae/service/datastore" |
| + "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/data/rand/mathrand" |
| + "github.com/luci/luci-go/common/data/stringset" |
| + "github.com/luci/luci-go/common/errors" |
| + "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/retry/transient" |
| +) |
| + |
| +// Set holds a set of Items and uses Tombstones to achieve idempotency of Add. |
| +// |
| +// Produces just call Add(...). |
|
tandrii(chromium)
2017/07/18 15:55:14
Producers
Vadim Sh.
2017/07/24 00:30:16
Done.
|
| +// |
| +// A consumer need to run more elaborate algorithm that ensures atomicity of |
|
tandrii(chromium)
2017/07/18 15:55:15
consider: s/A consumer need/The consumer (there sh
Vadim Sh.
2017/07/24 00:30:16
Done.
|
| +// 'Pop' and takes care of cleanup up of the garbage. This requires a mix of |
|
tandrii(chromium)
2017/07/18 15:55:15
cleanup up -> cleanup OR cleaning up
Vadim Sh.
2017/07/24 00:30:17
Done.
|
| +// transactional and non-transactional actions: |
| +// |
| +// items, tombstones, err := set.List(ctx) |
| +// if err != nil { |
| +// return err |
| +// } |
| +// |
| +// if len(items) == 0 && len(tombstones) == 0 { |
| +// return nil |
| +// } |
| +// |
| +// if err := set.CleanupStorage(tombstones); err != nil { |
| +// return err |
| +// } |
| +// |
| +// ... Fetch any additional info associated with 'items' ... |
| +// |
| +// return datastore.RunInTransaction(ctx, func(ctx context.Context) error { |
| +// op, err := set.BeginPop(ctx) |
| +// if err != nil { |
| +// return err |
| +// } |
| +// op.CleanupTombstones(tombstones) |
| +// for _, itm := range items { |
| +// if op.Pop(item.ID) { |
| +// // The item was indeed in the set and we've just removed it! |
| +// } else { |
| +// // Some other transaction has popped it already. |
| +// } |
| +// } |
| +// return op.Submit() |
| +// }, nil) |
| +type Set struct { |
| + ID string // global ID, used to construct datastore keys |
| + ShardCount int // number of entity groups to use for storage |
| + TombstonesRoot *datastore.Key // tombstones entity parent key |
| + TombstonesDelay time.Duration // how long to keep tombstones in the set |
| +} |
| + |
| +// Item is what's stored in the set. |
| +type Item struct { |
| + ID string // unique in time identifier of the item |
| + Value []byte // arbitrary value (<1 MB, but preferably much smaller) |
| +} |
| + |
| +// Tombstone is a reference to a deleted item that still lingers in the set. |
| +// |
| +// Tombstones exist to make sure recently popped items do not reappear in the |
| +// set if producers attempt to re-add them. |
| +// |
| +// Its fields are intentionally private to force correct usage CleanupStorage |
|
tandrii(chromium)
2017/07/18 15:55:14
usage of
Vadim Sh.
2017/07/24 00:30:17
Done.
|
| +// and CleanupTombstones. |
| +type Tombstone struct { |
| + set string // parent set name |
|
tandrii(chromium)
2017/07/18 15:55:15
parent set ID, right?
Vadim Sh.
2017/07/24 00:30:17
Yes. 'ID' used to be 'Name', I changed it midway a
|
| + id string // deleted item ID |
| + storage []*datastore.Key // itemEntity's to delete |
| +} |
| + |
| +// Add adds a bunch of items to the set. |
|
tandrii(chromium)
2017/07/18 15:55:15
s/set./set. idempotently.
Vadim Sh.
2017/07/24 00:30:16
Done.
|
| +// |
| +// Writes to some single entity group (not known in advance). If called outside |
| +// of a transaction and the call fails, may add only some subset of items. |
| +// Running inside a transaction makes this operation atomic. |
| +// |
| +// If items with given keys are already in the set, or has been deleted |
|
tandrii(chromium)
2017/07/18 15:55:14
s/has/have
Vadim Sh.
2017/07/24 00:30:16
Done.
|
| +// recently, they won't be re-added. No error is returned in this case. |
| +// |
| +// Returns only transient errors. |
| +func (s *Set) Add(c context.Context, items []Item) error { |
| + // TODO: batch + parallel |
|
tandrii(chromium)
2017/07/18 15:55:15
what's batch for? If # of items added is too large
Vadim Sh.
2017/07/24 00:30:17
Yes. There's a limit on number of entities in sing
tandrii(chromium)
2017/07/31 18:58:46
Acknowledged.
|
| + |
| + // Pick a random shard and add all new items there. If this is a retry, they |
| + // may exist in some other shard already. We don't care, they'll be |
| + // deduplicated in 'List'. If added items has been popped already (they have |
|
tandrii(chromium)
2017/07/18 15:55:14
s/has/have
Vadim Sh.
2017/07/24 00:30:15
Done.
|
| + // tombstones), 'List' will omit them as well. |
| + shardRoot := s.shardRoot(c, mathrand.Intn(c, s.ShardCount)) |
| + entities := make([]itemEntity, len(items)) |
| + for i, itm := range items { |
| + entities[i] = itemEntity{ |
| + ID: itm.ID, |
| + Parent: shardRoot, |
| + Value: itm.Value, |
| + } |
| + } |
| + return transient.Tag.Apply(datastore.Put(c, entities)) |
| +} |
| + |
| +// List returns all items that are currently in the set (in arbitrary order), |
| +// as well as set of tombstones that points to items that were deleted |
|
tandrii(chromium)
2017/07/18 15:55:15
as well as A set
Vadim Sh.
2017/07/24 00:30:16
Done.
|
| +// sufficiently long ago and can be cleaned up now. |
| +// |
| +// Must be called outside of transactions (panics otherwise). Reads many entity |
| +// groups, including TombstonesRoot one. |
| +// |
| +// The set of tombstones to cleanup can be passed to CleanupStorage, and |
| +// later to CleanupTombstones (during pop operation), in that order. Not doing |
| +// so will lead to accumulation of garbage in the set that will slow down List |
| +// and Pop. |
| +// |
| +// Returns only transient errors. |
| +func (s *Set) List(c context.Context) (items []Item, cleanup []Tombstone, err error) { |
| + if datastore.CurrentTransaction(c) != nil { |
| + panic("dsset.Set.List must be called outside of a transaction") |
| + } |
| + |
| + // Fetch all shards (via consistent ancestor queries) and all tombstones. |
| + |
| + shards := make([][]*itemEntity, s.ShardCount) |
| + tombsEntity := tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot} |
| + |
| + wg := sync.WaitGroup{} |
| + errs := errors.NewLazyMultiError(s.ShardCount + 1) |
| + |
| + wg.Add(1) |
| + go func() { |
| + defer wg.Done() |
| + if err := datastore.Get(c, &tombsEntity); err != nil && err != datastore.ErrNoSuchEntity { |
| + errs.Assign(0, err) |
| + } |
| + }() |
| + |
| + wg.Add(s.ShardCount) |
| + for i := 0; i < s.ShardCount; i++ { |
| + go func(i int) { |
| + defer wg.Done() |
| + q := datastore.NewQuery("dsset.Item").Ancestor(s.shardRoot(c, i)) |
| + errs.Assign(i+1, datastore.GetAll(c, q, &shards[i])) |
| + }(i) |
| + } |
| + |
| + wg.Wait() |
| + if err := errs.Get(); err != nil { |
| + return nil, nil, transient.Tag.Apply(err) |
| + } |
| + |
| + logging.Infof(c, "%s: %d tombs", s.ID, len(tombsEntity.Tombstones)) |
| + for i, shard := range shards { |
| + logging.Infof(c, "%s: shard #%d - %d items", s.ID, i, len(shard)) |
| + } |
| + |
| + // A set of items we pretend not to see. Initially all tombstoned ones. |
| + // |
| + // Since we are iterating over tombstone list anyway, find all sufficiently |
| + // old tombstones to return them to the caller, so they can be cleaned up. |
| + ignore := stringset.New(len(tombsEntity.Tombstones)) |
| + oldestTombs := map[string]*Tombstone{} // key is item ID |
| + now := clock.Now(c).UTC() |
| + for _, t := range tombsEntity.Tombstones { |
| + ignore.Add(t.ID) |
| + if now.Sub(t.Tombstoned) > s.TombstonesDelay { |
| + oldestTombs[t.ID] = &Tombstone{set: s.ID, id: t.ID} |
| + } |
| + } |
| + |
| + // Join all shards, throwing away tombstoned and duplicated items. For oldest |
| + // tombstones collect references to entities that need to be deleted. There |
| + // may be more than one such entity per ID due to possibility of different |
| + // shards hosting same item (if Add call was retried)! |
|
tandrii(chromium)
2017/07/18 17:54:18
the same item
Vadim Sh.
2017/07/24 00:30:18
Done.
|
| + for _, shard := range shards { |
| + for _, itemEntity := range shard { |
| + if !ignore.Has(itemEntity.ID) { |
| + items = append(items, Item{ |
| + ID: itemEntity.ID, |
| + Value: itemEntity.Value, |
| + }) |
| + ignore.Add(itemEntity.ID) |
| + } |
| + if tomb := oldestTombs[itemEntity.ID]; tomb != nil { |
| + tomb.storage = append(tomb.storage, datastore.KeyForObj(c, itemEntity)) |
| + } |
| + } |
| + } |
| + |
| + if len(oldestTombs) != 0 { |
| + cleanup = make([]Tombstone, 0, len(oldestTombs)) |
| + for _, tomb := range oldestTombs { |
| + cleanup = append(cleanup, *tomb) |
| + } |
| + } |
| + |
| + return items, cleanup, nil |
| +} |
| + |
| +// CleanupStorage deletes entities used to store items under given tombstones. |
|
tandrii(chromium)
2017/07/18 17:54:18
nit: I'd call it GC :)
Vadim Sh.
2017/07/24 00:30:17
There are two "kinds" of GC: one removes itemEntit
|
| +// |
| +// Touches many entity groups. Must be called outside of transactions. |
| +// Idempotent. |
| +// |
| +// This MUST be called before tombstones are popped. Failure to do so will make |
|
tandrii(chromium)
2017/07/18 17:54:18
is this worth enforcing? If so, then we could reco
Vadim Sh.
2017/07/24 00:30:16
Done, sort of.
|
| +// items reappear in the set. |
| +// |
| +// Returns only transient errors. |
| +func (s *Set) CleanupStorage(c context.Context, cleanup []Tombstone) error { |
| + if datastore.CurrentTransaction(c) != nil { |
| + panic("dsset.Set.CleanupStorage must be called outside of a transaction") |
| + } |
| + |
| + keys := []*datastore.Key{} |
| + for _, old := range cleanup { |
| + if old.set != s.ID { |
| + panic("passed Tombstone from some other set") |
| + } |
| + keys = append(keys, old.storage...) |
| + } |
| + |
| + // TODO: parallel |
| + |
| + merr := errors.MultiError{} |
| + for len(keys) > 0 { |
| + count := 100 |
| + if count > len(keys) { |
| + count = len(keys) |
| + } |
| + logging.Infof(c, "Cleaning up storage for %q by removing %d keys", s.ID, count) |
| + if err := datastore.Delete(c, keys[:count]); err != nil { |
| + logging.Warningf(c, "RPC failed - %s", err) |
| + merr = append(merr, err) |
| + } |
| + keys = keys[count:] |
| + } |
| + |
| + if len(merr) == 0 { |
| + return nil |
| + } |
| + |
| + return transient.Tag.Apply(merr) |
| +} |
| + |
| +// PopOp is an in-progress Pop operation. |
| +// |
| +// See BeginPop. |
| +type PopOp struct { |
| + ctx context.Context // datastore context to use for this op |
| + set string // id of the set |
| + now time.Time // popping time for all popped items |
| + entity *tombstonesEntity // entity with tombstones |
| + tombs map[string]tombstone // tombstones in a map form |
| + dirty bool // true if the tombstone map was modified |
| +} |
| + |
| +// BeginPop initiates Pop operation. |
| +// |
| +// Pop operation is used to transactionally remove items from the set, as well |
| +// as cleanup old tombstones. It must be finished with Submit. |
| +// |
| +// Requires a transaction. Modifies TombstonesRoot entity group (and only it). |
| +// |
| +// Returns only transient errors. |
| +func (s *Set) BeginPop(c context.Context) (*PopOp, error) { |
| + if datastore.CurrentTransaction(c) == nil { |
| + panic("dsset.Set.BeginPop must be called inside a transaction") |
| + } |
| + |
| + entity := &tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot} |
| + if err := datastore.Get(c, entity); err != nil && err != datastore.ErrNoSuchEntity { |
| + return nil, transient.Tag.Apply(err) |
| + } |
| + |
| + // The data in tombstonesEntity, in map form. |
| + tombs := make(map[string]tombstone, len(entity.Tombstones)) |
| + for _, t := range entity.Tombstones { |
| + tombs[t.ID] = t |
| + } |
| + |
| + return &PopOp{ |
| + ctx: c, |
| + set: s.ID, |
| + now: clock.Now(c).UTC(), |
| + entity: entity, |
| + tombs: tombs, |
| + dirty: false, |
| + }, nil |
| +} |
| + |
| +// AlreadyPopped returns true if the given item has already been popped, perhaps |
| +// in another transaction. |
| +// |
| +// Returns false for items that are still in the set, or items that have never |
| +// been in the set to being with. Use List (prior to starting the transaction) |
|
tandrii(chromium)
2017/07/18 17:54:18
to beGIN with
Vadim Sh.
2017/07/24 00:30:17
Done.
|
| +// to distinguish this two cases. |
|
tandrii(chromium)
2017/07/18 17:54:18
these
Vadim Sh.
2017/07/24 00:30:17
Done.
|
| +func (p *PopOp) AlreadyPopped(id string) bool { |
| + _, hasTomb := p.tombs[id] |
| + return hasTomb |
| +} |
| + |
| +// Pop pop the item from the set and returns true if it was indeed |
|
tandrii(chromium)
2017/07/18 17:54:18
Pop pops
Vadim Sh.
2017/07/24 00:30:17
Done.
|
| +// popped. |
| +// |
| +// Returns false if this item has been popped before (perhaps in another |
| +// transaction). |
| +// |
| +// Returns true if item existed in the set and was popped, or it never existed |
| +// there at all. Popping items that have never been in the set leads to weird |
| +// behavior, don't do this. Use List (prior to starting the transaction) |
| +// to figure out what items can be potentially popped. |
| +func (p *PopOp) Pop(id string) bool { |
| + if _, hasTomb := p.tombs[id]; hasTomb { |
| + return false |
| + } |
| + p.tombs[id] = tombstone{ID: id, Tombstoned: p.now} |
|
tandrii(chromium)
2017/07/18 17:54:19
Suppose List is called > TombstonesDelay seconds b
Vadim Sh.
2017/07/24 00:30:16
It is single consumer mostly because 1 QPS limit o
|
| + p.dirty = true |
| + return true |
| +} |
| + |
| +// CleanupTombstones removes old tombstones. |
| +// |
| +// This must be called only after set's CleanupStorage method, otherwise removed |
| +// items may reappear again. |
| +func (p *PopOp) CleanupTombstones(tombstones []Tombstone) { |
| + for _, old := range tombstones { |
| + if old.set != p.set { |
| + panic("passed Tombstone from some other set") |
| + } |
| + if _, hasTomb := p.tombs[old.id]; hasTomb { |
| + delete(p.tombs, old.id) |
| + p.dirty = true |
| + } |
| + } |
| +} |
| + |
| +// Submit completes this pop operation but submitting changes to datastore. |
|
tandrii(chromium)
2017/07/18 15:55:14
s/but/by
Vadim Sh.
2017/07/24 00:30:15
Done.
|
| +// |
| +// The operation is unusable after this. |
|
tandrii(chromium)
2017/07/18 17:54:19
wdyt about adding a field "submitted"=false by def
Vadim Sh.
2017/07/24 00:30:17
I did it initially, but the removed to simplify co
|
| +func (p *PopOp) Submit() error { |
| + if p.dirty { |
| + p.entity.Tombstones = p.entity.Tombstones[:0] |
| + for _, tomb := range p.tombs { |
| + p.entity.Tombstones = append(p.entity.Tombstones, tomb) |
| + } |
| + if err := datastore.Put(p.ctx, p.entity); err != nil { |
| + return transient.Tag.Apply(err) |
| + } |
| + } |
| + return nil |
| +} |
| + |
| +//////////////////////////////////////////////////////////////////////////////// |
| + |
| +type itemEntity struct { |
| + _kind string `gae:"$kind,dsset.Item"` |
| + |
| + ID string `gae:"$id"` |
| + Parent *datastore.Key `gae:"$parent"` |
| + Value []byte `gae:",noindex"` |
| +} |
| + |
| +type tombstonesEntity struct { |
| + _kind string `gae:"$kind,dsset.Tombstones"` |
| + |
| + ID string `gae:"$id"` |
| + Parent *datastore.Key `gae:"$parent"` |
| + Tombstones []tombstone `gae:",noindex"` |
| +} |
| + |
| +type tombstone struct { |
| + ID string // ID of tombstoned item |
| + Tombstoned time.Time // when it was popped |
| +} |
| + |
| +// shardRoot returns entity group key to use for a given shard. |
| +func (s *Set) shardRoot(c context.Context, n int) *datastore.Key { |
| + return datastore.NewKey(c, "dsset.Shard", fmt.Sprintf("%s:%d", s.ID, n), 0, nil) |
| +} |