| Index: scheduler/appengine/engine/dsset/dsset.go
|
| diff --git a/scheduler/appengine/engine/dsset/dsset.go b/scheduler/appengine/engine/dsset/dsset.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..d57c1ba9d8c1491039a9386d41bb829c7b8c207b
|
| --- /dev/null
|
| +++ b/scheduler/appengine/engine/dsset/dsset.go
|
| @@ -0,0 +1,583 @@
|
| +// Copyright 2017 The LUCI Authors.
|
| +//
|
| +// Licensed under the Apache License, Version 2.0 (the "License");
|
| +// you may not use this file except in compliance with the License.
|
| +// You may obtain a copy of the License at
|
| +//
|
| +// http://www.apache.org/licenses/LICENSE-2.0
|
| +//
|
| +// Unless required by applicable law or agreed to in writing, software
|
| +// distributed under the License is distributed on an "AS IS" BASIS,
|
| +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +// See the License for the specific language governing permissions and
|
| +// limitations under the License.
|
| +
|
| +// Package dsset implements a particular flavor of datastore-backed set.
|
| +//
|
| +// Due to its internal structure, it requires some maintenance on behalf of the
|
| +// caller to periodically cleanup removed items (aka tombstones).
|
| +//
|
| +// Items added to the set should have unique IDs, at least for the duration of
|
| +// some configurable time interval, as defined by TombstonesDelay property.
|
| +// It means removed items can't be added back to the set right away (the set
|
| +// will think they are already there). This is required to make 'Add' operation
|
| +// idempotent.
|
| +//
|
| +// TombstonesDelay is assumed to be much larger than time scale of all "fast"
|
| +// processes in the system, in particular all List+Pop processes. For example,
|
| +// if List+Pop is expected to take 1 min, TombstonesDelay should be >> 1 min
|
| +// (e.g. 5 min). Setting TombstonesDelay to very large value is harmful though,
|
| +// since it may slow down 'List' and 'Pop' (by allowing more garbage that will
|
| +// have to be filtered out).
|
| +//
|
| +// Properties (where N is current size of the set):
|
| +// * Batch 'Add' with configurable QPS limit, O(1) performance.
|
| +// * Transactional consistent 'Pop' (1 QPS limit), O(N) performance.
|
| +// * Non-transactional consistent 'List' (1 QPS limit), O(N) performance.
|
| +// * Popped items can't be re-added until their tombstones expire.
|
| +//
|
| +// These properties make dsset suitable for multiple producers, single consumer
|
| +// queues, where order of items is not important, each item has a unique
|
| +// identifier, and the queue size is small.
|
| +//
|
| +// Structurally dsset consists of N+1 entity groups:
|
| +// * N separate entity groups that contain N shards of the set.
|
| +// * 1 entity group (with a configurable root) that holds tombstones.
|
| +//
|
| +// It is safe to increase number of shards at any time. Decreasing number of
|
| +// shards is dangerous (but can be done with some more coding).
|
| +//
|
| +// More shards make:
|
| +// * Add() less contentious (so it can support more QPS).
|
| +// * List() and CleanupStorage() slower and more expensive.
|
| +// * Pop() is not affected by number of shards.
|
| +package dsset
|
| +
|
| +import (
|
| + "fmt"
|
| + "sync"
|
| + "time"
|
| +
|
| + "golang.org/x/net/context"
|
| +
|
| + "github.com/luci/gae/service/datastore"
|
| + "github.com/luci/luci-go/common/clock"
|
| + "github.com/luci/luci-go/common/data/rand/mathrand"
|
| + "github.com/luci/luci-go/common/data/stringset"
|
| + "github.com/luci/luci-go/common/errors"
|
| + "github.com/luci/luci-go/common/retry/transient"
|
| +)
|
| +
|
| +// batchSize is total number of items to pass to PutMulti or DeleteMulti RPCs.
|
| +const batchSize = 500
|
| +
|
| +// Set holds a set of Items and uses Tombstones to achieve idempotency of Add.
|
| +//
|
| +// Producers just call Add(...).
|
| +//
|
| +// The consumer must run more elaborate algorithm that ensures atomicity of
|
| +// 'Pop' and takes care of cleaning up of the garbage. This requires a mix of
|
| +// transactional and non-transactional actions:
|
| +//
|
| +// listing, err := set.List(ctx)
|
| +// if err != nil || listing.Empty() {
|
| +// return err
|
| +// }
|
| +//
|
| +// if err := dsset.CleanupStorage(ctx, listing.Tombstones); err != nil {
|
| +// return err
|
| +// }
|
| +//
|
| +// ... Fetch any additional info associated with 'listing.Items' ...
|
| +//
|
| +// var tombstones []*dsset.Tombstone
|
| +// err = datastore.RunInTransaction(ctx, func(ctx context.Context) error {
|
| +// op, err := set.BeginPop(ctx, listing)
|
| +// if err != nil {
|
| +// return err
|
| +// }
|
| +// for _, itm := range items {
|
| +// if op.Pop(item.ID) {
|
| +// // The item was indeed in the set and we've just removed it!
|
| +// } else {
|
| +// // Some other transaction has popped it already.
|
| +// }
|
| +// }
|
| +// tombstones, err = dsset.FinishPop(ctx, op)
|
| +// return err
|
| +// }, nil)
|
| +// if err == nil {
|
| +// dsset.CleanupStorage(ctx, tombstones) // best-effort cleanup
|
| +// }
|
| +// return err
|
| +type Set struct {
|
| + ID string // global ID, used to construct datastore keys
|
| + ShardCount int // number of entity groups to use for storage
|
| + TombstonesRoot *datastore.Key // tombstones entity parent key
|
| + TombstonesDelay time.Duration // how long to keep tombstones in the set
|
| +}
|
| +
|
| +// Item is what's stored in the set.
|
| +type Item struct {
|
| + ID string // unique in time identifier of the item
|
| + Value []byte // arbitrary value (<1 MB, but preferably much smaller)
|
| +}
|
| +
|
| +// Listing is returned by 'List' call.
|
| +//
|
| +// It contains actual listing of items in the set, as well as a bunch of service
|
| +// information used by other operations ('CleanupStorage' and 'Pop') to keep
|
| +// the set in a garbage-free and consistent state.
|
| +//
|
| +// The only way to construct a correct Listing is to call 'List' method.
|
| +//
|
| +// See comments for Set struct and List method for more info.
|
| +type Listing struct {
|
| + Items []Item // all items in the set, in arbitrary order
|
| + Tombstones []*Tombstone // tombstones that can be cleaned up now
|
| +
|
| + set string // parent set ID
|
| + producedAt time.Time // when 'List' call was initiated
|
| + idToKeys map[string][]*datastore.Key // ID -> datastore keys to cleanup
|
| +}
|
| +
|
| +// Empty is true if both 'Items' and 'Tombstones' are empty
|
| +func (l *Listing) Empty() bool {
|
| + return len(l.Items) == 0 && len(l.Tombstones) == 0
|
| +}
|
| +
|
| +// Tombstone is a reference to a deleted item that still lingers in the set.
|
| +//
|
| +// Tombstones exist to make sure recently popped items do not reappear in the
|
| +// set if producers attempt to re-add them.
|
| +//
|
| +// Its fields are intentionally private to force correct usage of Set's methods.
|
| +type Tombstone struct {
|
| + id string // deleted item ID
|
| + storage []*datastore.Key // itemEntity's to delete in 'CleanupStorage'
|
| + old bool // true if tombstone should be popped in 'Pop'
|
| + cleanedUp bool // true if 'CleanupStorage' processed the tombstone
|
| +}
|
| +
|
| +// Add idempotently adds a bunch of items to the set.
|
| +//
|
| +// If items with given keys are already in the set, or have been deleted
|
| +// recently, they won't be re-added. No error is returned in this case. When
|
| +// retrying the call like that, the caller is responsible to pass exact same
|
| +// Item.Value, otherwise 'List' may return random variant of the added item.
|
| +//
|
| +// Writes to some single entity group (not known in advance). If called outside
|
| +// of a transaction and the call fails, may add only some subset of items.
|
| +// Running inside a transaction makes this operation atomic.
|
| +//
|
| +// Returns only transient errors.
|
| +func (s *Set) Add(c context.Context, items []Item) error {
|
| + // Pick a random shard and add all new items there. If this is a retry, they
|
| + // may exist in some other shard already. We don't care, they'll be
|
| + // deduplicated in 'List'. If added items have been popped already (they have
|
| + // tombstones), 'List' will omit them as well.
|
| + shardRoot := s.shardRoot(c, mathrand.Intn(c, s.ShardCount))
|
| + entities := make([]itemEntity, len(items))
|
| + for i, itm := range items {
|
| + entities[i] = itemEntity{
|
| + ID: itm.ID,
|
| + Parent: shardRoot,
|
| + Value: itm.Value,
|
| + }
|
| + }
|
| + return transient.Tag.Apply(batchOp(len(entities), func(start, end int) error {
|
| + return datastore.Put(c, entities[start:end])
|
| + }))
|
| +}
|
| +
|
| +// List returns all items that are currently in the set (in arbitrary order),
|
| +// as well as a set of tombstones that points to items that were previously
|
| +// popped and can be cleaned up now.
|
| +//
|
| +// Must be called outside of transactions (panics otherwise). Reads many entity
|
| +// groups, including TombstonesRoot one.
|
| +//
|
| +// The set of tombstones to cleanup can be passed to 'CleanupStorage', and
|
| +// later to 'BeginPop' (as party of the listing), in that order. Not doing
|
| +// so will lead to accumulation of garbage in the set that will slow down 'List'
|
| +// and 'Pop'.
|
| +//
|
| +// Returns only transient errors.
|
| +func (s *Set) List(c context.Context) (*Listing, error) {
|
| + if datastore.CurrentTransaction(c) != nil {
|
| + panic("dsset.Set.List must be called outside of a transaction")
|
| + }
|
| + now := clock.Now(c).UTC()
|
| +
|
| + // Fetch all shards (via consistent ancestor queries) and all tombstones.
|
| +
|
| + shards := make([][]*itemEntity, s.ShardCount)
|
| + tombsEntity := tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot}
|
| +
|
| + wg := sync.WaitGroup{}
|
| + wg.Add(1 + s.ShardCount)
|
| + errs := errors.NewLazyMultiError(s.ShardCount + 1)
|
| +
|
| + go func() {
|
| + defer wg.Done()
|
| + if err := datastore.Get(c, &tombsEntity); err != nil && err != datastore.ErrNoSuchEntity {
|
| + errs.Assign(0, err)
|
| + }
|
| + }()
|
| +
|
| + for i := 0; i < s.ShardCount; i++ {
|
| + go func(i int) {
|
| + defer wg.Done()
|
| + q := datastore.NewQuery("dsset.Item").Ancestor(s.shardRoot(c, i))
|
| + errs.Assign(i+1, datastore.GetAll(c, q, &shards[i]))
|
| + }(i)
|
| + }
|
| +
|
| + wg.Wait()
|
| + if err := errs.Get(); err != nil {
|
| + return nil, transient.Tag.Apply(err)
|
| + }
|
| +
|
| + // Mapping "item ID" => "list of entities to delete to remove it". This is
|
| + // eventually used by 'CleanupStorage'. Under normal circumstances, the list
|
| + // has only one item, but there can be more if 'Add' call was retried (so the
|
| + // item ends up in multiple different shards).
|
| + idToKeys := map[string][]*datastore.Key{}
|
| + for _, shard := range shards {
|
| + for _, e := range shard {
|
| + idToKeys[e.ID] = append(idToKeys[e.ID], datastore.KeyForObj(c, e))
|
| + }
|
| + }
|
| +
|
| + // A set of items we pretend not to see. Initially all tombstoned ones.
|
| + //
|
| + // Since we are iterating over tombstone list anyway, find all sufficiently
|
| + // old tombstones or tombstones that still have storage associated with them.
|
| + // We return them to the caller, so they can be cleaned up:
|
| + // * 'CleanupStorage' makes sure 'storage' entities are deleted.
|
| + // * 'BeginPop' completely erases old tombstones.
|
| + var tombs []*Tombstone
|
| + ignore := stringset.New(len(tombsEntity.Tombstones))
|
| + for _, t := range tombsEntity.Tombstones {
|
| + ignore.Add(t.ID)
|
| + old := now.Sub(t.Tombstoned) > s.TombstonesDelay
|
| + if storage := idToKeys[t.ID]; len(storage) > 0 || old {
|
| + tombs = append(tombs, &Tombstone{
|
| + id: t.ID,
|
| + storage: storage,
|
| + old: old, // if true, BeginPop will delete this tombstone
|
| + })
|
| + }
|
| + }
|
| +
|
| + // Join all shards, throwing away tombstoned and duplicated items.
|
| + var items []Item
|
| + for _, shard := range shards {
|
| + for _, e := range shard {
|
| + if !ignore.Has(e.ID) {
|
| + items = append(items, Item{
|
| + ID: e.ID,
|
| + Value: e.Value,
|
| + })
|
| + ignore.Add(e.ID)
|
| + }
|
| + }
|
| + }
|
| +
|
| + return &Listing{
|
| + Items: items,
|
| + Tombstones: tombs,
|
| + set: s.ID,
|
| + producedAt: now,
|
| + idToKeys: idToKeys,
|
| + }, nil
|
| +}
|
| +
|
| +// PopOp is an in-progress 'Pop' operation.
|
| +//
|
| +// See BeginPop.
|
| +type PopOp struct {
|
| + ctx context.Context // datastore context to use for this op
|
| + txn datastore.Transaction // a transaction that started BeginPop
|
| + now time.Time // popping time for all popped items
|
| + dirty bool // true if the tombstone map was modified
|
| + finished bool // true if finished already
|
| + entity *tombstonesEntity // entity with tombstones
|
| + tombs map[string]tombstone // entity.Tombstones in a map form
|
| + idToKeys map[string][]*datastore.Key // ID -> datastore keys to cleanup
|
| + popped []*Tombstone // new tombstones for popped items
|
| +}
|
| +
|
| +// BeginPop initiates 'Pop' operation.
|
| +//
|
| +// Pop operation is used to transactionally remove items from the set, as well
|
| +// as cleanup old tombstones. It must be finished with 'dsset.FinishPop', even
|
| +// if no items have been popped: the internal state still can change in this
|
| +// case, since 'BeginPop' cleans up old tombstones. Even more, it is necessary
|
| +// to do 'Pop' if listing contains non-empty set of tombstones (regardless of
|
| +// whether the caller wants to actually pop any items from the set). This is
|
| +// part of the required set maintenance.
|
| +//
|
| +// Requires a transaction. Modifies TombstonesRoot entity group (and only it).
|
| +//
|
| +// Returns only transient errors. Such errors usually mean that the entire pop
|
| +// sequence ('List' + 'Pop') should be retried.
|
| +func (s *Set) BeginPop(c context.Context, listing *Listing) (*PopOp, error) {
|
| + if listing.set != s.ID {
|
| + panic("passed Listing from another set")
|
| + }
|
| + txn := datastore.CurrentTransaction(c)
|
| + if txn == nil {
|
| + panic("dsset.Set.BeginPop must be called inside a transaction")
|
| + }
|
| +
|
| + now := clock.Now(c).UTC()
|
| + if age := now.Sub(listing.producedAt); age > s.TombstonesDelay {
|
| + return nil, transient.Tag.Apply(fmt.Errorf("the listing is stale (%s > %s)", age, s.TombstonesDelay))
|
| + }
|
| +
|
| + entity := &tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot}
|
| + if err := datastore.Get(c, entity); err != nil && err != datastore.ErrNoSuchEntity {
|
| + return nil, transient.Tag.Apply(err)
|
| + }
|
| +
|
| + // The data in tombstonesEntity, in map form.
|
| + tombs := make(map[string]tombstone, len(entity.Tombstones))
|
| + for _, t := range entity.Tombstones {
|
| + tombs[t.ID] = t
|
| + }
|
| +
|
| + // Throw away old tombstones right away.
|
| + dirty := false
|
| + for _, tomb := range listing.Tombstones {
|
| + if tomb.old {
|
| + if !tomb.cleanedUp {
|
| + panic("trying to remove Tombstone that wasn't cleaned up")
|
| + }
|
| + if _, hasTomb := tombs[tomb.id]; hasTomb {
|
| + delete(tombs, tomb.id)
|
| + dirty = true
|
| + }
|
| + }
|
| + }
|
| +
|
| + return &PopOp{
|
| + ctx: c,
|
| + txn: txn,
|
| + now: now,
|
| + dirty: dirty,
|
| + entity: entity,
|
| + tombs: tombs,
|
| + idToKeys: listing.idToKeys,
|
| + }, nil
|
| +}
|
| +
|
| +// CanPop returns true if the given item can be popped from the set.
|
| +//
|
| +// Returns false if this item has been popped before (perhaps in another
|
| +// transaction), or it's not in the the listing passed to BeginPop.
|
| +func (p *PopOp) CanPop(id string) bool {
|
| + if _, hasTomb := p.tombs[id]; hasTomb {
|
| + return false // already popped by someone else
|
| + }
|
| + if _, present := p.idToKeys[id]; present {
|
| + return true // listed in the set
|
| + }
|
| + return false
|
| +}
|
| +
|
| +// Pop removed the item from the set and returns true if it was there.
|
| +//
|
| +// Returns false if this item has been popped before (perhaps in another
|
| +// transaction), or it's not in the the listing passed to BeginPop.
|
| +func (p *PopOp) Pop(id string) bool {
|
| + if p.finished {
|
| + panic("the operation has already been finished")
|
| + }
|
| + if !p.CanPop(id) {
|
| + return false
|
| + }
|
| + p.tombs[id] = tombstone{ID: id, Tombstoned: p.now}
|
| + p.popped = append(p.popped, &Tombstone{
|
| + id: id,
|
| + storage: p.idToKeys[id],
|
| + old: false, // BeingPop will ignore this fresh tombstone
|
| + })
|
| + p.dirty = true
|
| + return true
|
| +}
|
| +
|
| +// makeTombstonesEntity is used internally by FinishPop.
|
| +func (p *PopOp) makeTombstonesEntity() *tombstonesEntity {
|
| + p.entity.Tombstones = p.entity.Tombstones[:0]
|
| + for _, tomb := range p.tombs {
|
| + p.entity.Tombstones = append(p.entity.Tombstones, tomb)
|
| + }
|
| + return p.entity
|
| +}
|
| +
|
| +////////////////////////////////////////////////////////////////////////////////
|
| +
|
| +// FinishPop completes one or more pop operations (for different sets) by
|
| +// submitting changes to datastore.
|
| +//
|
| +// Must be called within same transaction that called BeginPop.
|
| +//
|
| +// It returns a list of tombstones for popped items. The storage used by the
|
| +// items can be reclaimed right away by calling 'CleanupStorage'. It is fine
|
| +// not to do so, 'List' will eventually return all tombstones that need cleaning
|
| +// anyway. Calling 'CleanupStorage' as best effort is still beneficial though,
|
| +// since it will reduce the amount of garbage in the set.
|
| +//
|
| +// Returns only transient errors.
|
| +func FinishPop(ctx context.Context, ops ...*PopOp) (tombs []*Tombstone, err error) {
|
| + txn := datastore.CurrentTransaction(ctx)
|
| +
|
| + entities := []*tombstonesEntity{}
|
| + tombsCount := 0
|
| + for _, op := range ops {
|
| + if op.finished {
|
| + panic("the operation has already been finished")
|
| + }
|
| + if op.txn != txn {
|
| + panic("wrong transaction")
|
| + }
|
| + if op.dirty {
|
| + entities = append(entities, op.makeTombstonesEntity())
|
| + tombsCount += len(op.popped)
|
| + }
|
| + }
|
| +
|
| + if err := datastore.Put(ctx, entities); err != nil {
|
| + return nil, transient.Tag.Apply(err)
|
| + }
|
| +
|
| + if tombsCount != 0 {
|
| + tombs = make([]*Tombstone, 0, tombsCount)
|
| + }
|
| + for _, op := range ops {
|
| + tombs = append(tombs, op.popped...)
|
| + op.finished = true
|
| + }
|
| +
|
| + return tombs, nil
|
| +}
|
| +
|
| +// CleanupStorage deletes entities used to store items under given tombstones.
|
| +//
|
| +// This is datastore's MultiDelete RPC in disguise. Touches many entity groups.
|
| +// Must be called outside of transactions. Idempotent.
|
| +//
|
| +// Can handle tombstones from multiple different sets at once. This is preferred
|
| +// over calling 'CleanupStorage' multiple times (once per set), since it
|
| +// collapses multiple datastore RPCs into one.
|
| +//
|
| +// This MUST be called before tombstones returned by 'List' are removed in
|
| +// 'Pop'. Failure to do so will make items reappear in the set.
|
| +//
|
| +// Returns only transient errors. There's no way to know which items were
|
| +// removed and which weren't in case of an error.
|
| +func CleanupStorage(c context.Context, cleanup ...[]*Tombstone) error {
|
| + if datastore.CurrentTransaction(c) != nil {
|
| + panic("dsset.CleanupStorage must be called outside of a transaction")
|
| + }
|
| +
|
| + keys := []*datastore.Key{}
|
| + for _, tombs := range cleanup {
|
| + for _, tomb := range tombs {
|
| + keys = append(keys, tomb.storage...)
|
| + }
|
| + }
|
| +
|
| + err := batchOp(len(keys), func(start, end int) error {
|
| + return datastore.Delete(c, keys[start:end])
|
| + })
|
| + if err != nil {
|
| + return transient.Tag.Apply(err)
|
| + }
|
| +
|
| + for _, tombs := range cleanup {
|
| + for _, tomb := range tombs {
|
| + tomb.cleanedUp = true
|
| + tomb.storage = nil
|
| + }
|
| + }
|
| + return nil
|
| +}
|
| +
|
| +////////////////////////////////////////////////////////////////////////////////
|
| +
|
| +type itemEntity struct {
|
| + _kind string `gae:"$kind,dsset.Item"`
|
| +
|
| + ID string `gae:"$id"`
|
| + Parent *datastore.Key `gae:"$parent"`
|
| + Value []byte `gae:",noindex"`
|
| +}
|
| +
|
| +type tombstonesEntity struct {
|
| + _kind string `gae:"$kind,dsset.Tombstones"`
|
| +
|
| + ID string `gae:"$id"`
|
| + Parent *datastore.Key `gae:"$parent"`
|
| + Tombstones []tombstone `gae:",noindex"`
|
| +}
|
| +
|
| +type tombstone struct {
|
| + ID string // ID of tombstoned item
|
| + Tombstoned time.Time // when it was popped
|
| +}
|
| +
|
| +// shardRoot returns entity group key to use for a given shard.
|
| +func (s *Set) shardRoot(c context.Context, n int) *datastore.Key {
|
| + return datastore.NewKey(c, "dsset.Shard", fmt.Sprintf("%s:%d", s.ID, n), 0, nil)
|
| +}
|
| +
|
| +// batchOp splits 'total' into batches and calls 'op' in parallel.
|
| +//
|
| +// Doesn't preserve order of returned errors! Don't try to deconstruct the
|
| +// returned multi error, the position of individual errors there does not
|
| +// correlate with the original array.
|
| +func batchOp(total int, op func(start, end int) error) error {
|
| + switch {
|
| + case total == 0:
|
| + return nil
|
| + case total <= batchSize:
|
| + return op(0, total)
|
| + }
|
| +
|
| + errs := make(chan error)
|
| + ops := 0
|
| + offset := 0
|
| + for total > 0 {
|
| + count := batchSize
|
| + if count > total {
|
| + count = total
|
| + }
|
| + go func(start, end int) {
|
| + errs <- op(start, end)
|
| + }(offset, offset+count)
|
| + offset += count
|
| + total -= count
|
| + ops++
|
| + }
|
| +
|
| + var all errors.MultiError
|
| + for i := 0; i < ops; i++ {
|
| + err := <-errs
|
| + if merr, yep := err.(errors.MultiError); yep {
|
| + for _, e := range merr {
|
| + if e != nil {
|
| + all = append(all, e)
|
| + }
|
| + }
|
| + } else if err != nil {
|
| + all = append(all, err)
|
| + }
|
| + }
|
| +
|
| + if len(all) == 0 {
|
| + return nil
|
| + }
|
| + return all
|
| +}
|
|
|